Clean up StaticMemoryStrategy, implement method chaining for ObjectPoolBounds struct

This commit is contained in:
Jackie Kay 2015-08-10 18:05:44 -07:00
parent e57b049aeb
commit 872edfc743
5 changed files with 163 additions and 55 deletions

View file

@ -187,6 +187,7 @@ protected:
"[rclcpp::error] take failed for subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
}
subscription->return_message(message);
}
static void
@ -791,12 +792,7 @@ protected:
AnyExecutable::SharedPtr
get_next_ready_executable()
{
return get_next_ready_executable(this->memory_strategy_->instantiate_next_executable());
}
AnyExecutable::SharedPtr
get_next_ready_executable(AnyExecutable::SharedPtr any_exec)
{
auto any_exec = this->memory_strategy_->instantiate_next_executable();
// Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec);
if (any_exec->timer) {
@ -817,9 +813,8 @@ protected:
if (any_exec->client) {
return any_exec;
}
// If there is neither a ready timer nor subscription, return a null ptr
any_exec.reset();
return any_exec;
// If there is no ready executable, return a null ptr
return std::shared_ptr<AnyExecutable>();
}
template<typename T = std::milli>
@ -842,7 +837,7 @@ protected:
if (any_exec) {
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
if (any_exec->callback_group->type_ == \
if (any_exec->callback_group && any_exec->callback_group->type_ == \
callback_group::CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise

View file

@ -51,7 +51,7 @@ public:
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return executor::AnyExecutable::SharedPtr(new executor::AnyExecutable);
return std::make_shared<executor::AnyExecutable>();
}
virtual void * alloc(size_t size)

View file

@ -25,7 +25,7 @@ namespace strategies
namespace message_pool_memory_strategy
{
template<typename MessageT, size_t size,
template<typename MessageT, size_t Size,
typename std::enable_if<rosidl_generator_traits::has_fixed_size<MessageT>::value>::type * =
nullptr>
class MessagePoolMemoryStrategy
@ -36,7 +36,7 @@ public:
MessagePoolMemoryStrategy()
: next_array_index_(0)
{
for (size_t i = 0; i < size; ++i) {
for (size_t i = 0; i < Size; ++i) {
pool_[i].msg_ptr_ = std::make_shared<MessageT>();
pool_[i].used = false;
}
@ -45,7 +45,7 @@ public:
std::shared_ptr<MessageT> borrow_message()
{
size_t current_index = next_array_index_;
next_array_index_ = (next_array_index_ + 1) % size;
next_array_index_ = (next_array_index_ + 1) % Size;
if (pool_[current_index].used) {
throw std::runtime_error("Tried to access message that was still in use! Abort.");
}
@ -58,7 +58,7 @@ public:
void return_message(std::shared_ptr<MessageT> & msg)
{
for (size_t i = 0; i < size; ++i) {
for (size_t i = 0; i < Size; ++i) {
if (pool_[i].msg_ptr_ == msg) {
pool_[i].used = false;
return;
@ -74,7 +74,7 @@ protected:
bool used;
};
std::array<PoolMember, size> pool_;
std::array<PoolMember, Size> pool_;
size_t next_array_index_;
};

View file

@ -27,71 +27,155 @@ namespace memory_strategies
namespace static_memory_strategy
{
struct ObjectPoolBounds
{
public:
size_t max_subscriptions_;
size_t max_services_;
size_t max_clients_;
size_t max_executables_;
size_t pool_size_;
struct ObjectPoolBounds
{
public:
size_t max_subscriptions;
size_t max_services;
size_t max_clients;
size_t max_executables;
size_t max_guard_conditions;
size_t pool_size;
ObjectPoolBounds(size_t subs = 10, size_t services = 10, size_t clients = 10,
size_t executables = 1, size_t pool = 1024)
: max_subscriptions_(subs), max_services_(services), max_clients_(clients), max_executables_(
executables), pool_size_(pool)
{}
};
ObjectPoolBounds()
: max_subscriptions(10), max_services(10), max_clients(10),
max_executables(1), max_guard_conditions(2), pool_size(1024)
{}
ObjectPoolBounds & set_max_subscriptions(size_t subscriptions)
{
max_subscriptions = subscriptions;
return *this;
}
ObjectPoolBounds & set_max_services(size_t services)
{
max_services = services;
return *this;
}
ObjectPoolBounds & set_max_clients(size_t clients)
{
max_clients = clients;
return *this;
}
ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions)
{
max_guard_conditions = guard_conditions;
return *this;
}
ObjectPoolBounds & set_max_executables(size_t executables)
{
max_executables = executables;
return *this;
}
ObjectPoolBounds & set_memory_pool_size(size_t pool)
{
pool_size = pool;
return *this;
}
};
class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
{
public:
StaticMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds())
: bounds_(bounds)
: bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr),
service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr)
{
memory_pool_ = static_cast<void **>(malloc(bounds_.pool_size_));
subscription_pool_ = static_cast<void **>(malloc(bounds_.max_subscriptions_));
service_pool_ = static_cast<void **>(malloc(bounds_.max_services_));
client_pool_ = static_cast<void **>(malloc(bounds_.max_clients_));
executable_pool_ = static_cast<executor::AnyExecutable *>(malloc(bounds_.max_executables_));
if (bounds_.pool_size) {
memory_pool_ = new void *[bounds_.pool_size];
memset(memory_pool_, 0, bounds_.pool_size * sizeof(void *));
}
if (bounds_.max_subscriptions) {
subscription_pool_ = new void *[bounds_.max_subscriptions];
memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *));
}
if (bounds_.max_services) {
service_pool_ = new void *[bounds_.max_services];
memset(service_pool_, 0, bounds_.max_services * sizeof(void *));
}
if (bounds_.max_clients) {
client_pool_ = new void *[bounds_.max_clients];
memset(client_pool_, 0, bounds_.max_clients * sizeof(void *));
}
if (bounds_.max_guard_conditions) {
guard_condition_pool_ = new void *[bounds_.max_guard_conditions];
memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *));
}
if (bounds_.max_executables) {
executable_pool_ = new executor::AnyExecutable::SharedPtr[bounds_.max_executables];
}
for (size_t i = 0; i < bounds_.max_executables; ++i) {
executable_pool_[i] = std::make_shared<executor::AnyExecutable>();
}
memset(memory_pool_, 0, bounds_.pool_size_);
memset(subscription_pool_, 0, bounds_.max_subscriptions_);
memset(service_pool_, 0, bounds_.max_services_);
memset(client_pool_, 0, bounds_.max_clients_);
memset(executable_pool_, 0, bounds_.max_executables_);
pool_seq_ = 0;
exec_seq_ = 0;
// Reserve pool_size_ buckets in the memory map.
memory_map_.reserve(bounds_.pool_size_);
for (size_t i = 0; i < bounds_.pool_size_; ++i) {
memory_map_.reserve(bounds_.pool_size);
for (size_t i = 0; i < bounds_.pool_size; ++i) {
memory_map_[memory_pool_[i]] = 0;
}
}
~StaticMemoryStrategy()
{
if (bounds_.pool_size) {
delete[] memory_pool_;
}
if (bounds_.max_subscriptions) {
delete[] subscription_pool_;
}
if (bounds_.max_services) {
delete[] service_pool_;
}
if (bounds_.max_clients) {
delete[] client_pool_;
}
if (bounds_.max_guard_conditions) {
delete[] guard_condition_pool_;
}
}
void ** borrow_handles(HandleType type, size_t number_of_handles)
{
switch (type) {
case HandleType::subscription_handle:
if (number_of_handles > bounds_.max_subscriptions_) {
if (number_of_handles > bounds_.max_subscriptions) {
throw std::runtime_error("Requested size exceeded maximum subscriptions.");
}
return subscription_pool_;
case HandleType::service_handle:
if (number_of_handles > bounds_.max_services_) {
if (number_of_handles > bounds_.max_services) {
throw std::runtime_error("Requested size exceeded maximum services.");
}
return service_pool_;
case HandleType::client_handle:
if (number_of_handles > bounds_.max_clients_) {
if (number_of_handles > bounds_.max_clients) {
throw std::runtime_error("Requested size exceeded maximum clients.");
}
return client_pool_;
case HandleType::guard_condition_handle:
if (number_of_handles > bounds_.max_guard_conditions) {
throw std::runtime_error("Requested size exceeded maximum guard_conditions.");
}
return guard_condition_pool_;
default:
break;
}
@ -103,13 +187,24 @@ public:
(void)handles;
switch (type) {
case HandleType::subscription_handle:
memset(subscription_pool_, 0, bounds_.max_subscriptions_);
if (bounds_.max_subscriptions) {
memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *));
}
break;
case HandleType::service_handle:
memset(service_pool_, 0, bounds_.max_services_);
if (bounds_.max_services) {
memset(service_pool_, 0, bounds_.max_services * sizeof(void *));
}
break;
case HandleType::client_handle:
memset(client_pool_, 0, bounds_.max_clients_);
if (bounds_.max_clients) {
memset(client_pool_, 0, bounds_.max_clients * sizeof(void *));
}
break;
case HandleType::guard_condition_handle:
if (bounds_.max_guard_conditions) {
memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *));
}
break;
default:
throw std::runtime_error("Unrecognized enum, could not return handle memory.");
@ -118,28 +213,39 @@ public:
executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
if (exec_seq_ >= bounds_.max_executables_) {
if (exec_seq_ >= bounds_.max_executables) {
// wrap around
exec_seq_ = 0;
}
size_t prev_exec_seq_ = exec_seq_;
++exec_seq_;
return std::make_shared<executor::AnyExecutable>(executable_pool_[prev_exec_seq_]);
if (!executable_pool_[prev_exec_seq_]) {
throw std::runtime_error("Executable pool member was NULL");
}
executable_pool_[prev_exec_seq_]->subscription.reset();
executable_pool_[prev_exec_seq_]->timer.reset();
executable_pool_[prev_exec_seq_]->service.reset();
executable_pool_[prev_exec_seq_]->client.reset();
executable_pool_[prev_exec_seq_]->callback_group.reset();
executable_pool_[prev_exec_seq_]->node.reset();
return executable_pool_[prev_exec_seq_];
}
void * alloc(size_t size)
{
// Extremely naive static allocation strategy
// Keep track of block size at a given pointer
if (pool_seq_ + size > bounds_.pool_size_) {
if (pool_seq_ + size > bounds_.pool_size) {
// Start at 0
pool_seq_ = 0;
}
void * ptr = memory_pool_[pool_seq_];
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_malloc.");
throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::alloc.");
}
memory_map_[ptr] = size;
size_t prev_pool_seq = pool_seq_;
@ -151,7 +257,7 @@ public:
{
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_free.");
throw std::runtime_error("Unexpected pointer in StaticMemoryStrategy::free.");
}
memset(ptr, 0, memory_map_[ptr]);
@ -164,7 +270,8 @@ private:
void ** subscription_pool_;
void ** service_pool_;
void ** client_pool_;
executor::AnyExecutable * executable_pool_;
void ** guard_condition_pool_;
executor::AnyExecutable::SharedPtr * executable_pool_;
size_t pool_seq_;
size_t exec_seq_;

View file

@ -80,6 +80,7 @@ public:
virtual std::shared_ptr<void> create_message() = 0;
virtual void handle_message(std::shared_ptr<void> & message) = 0;
virtual void return_message(std::shared_ptr<void> & message) = 0;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase);
@ -128,6 +129,11 @@ public:
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
callback_(typed_message);
}
void return_message(std::shared_ptr<void> & message)
{
auto typed_message = std::static_pointer_cast<MessageT>(message);
message_memory_strategy_->return_message(typed_message);
}