Merge pull request #56 from ros2/memory_dependency_injection

Add "MemoryStrategy" to Executor with dep. injection
This commit is contained in:
Jackie Kay 2015-07-15 09:47:24 -07:00
commit 5a426d3064
7 changed files with 396 additions and 54 deletions

View file

@ -0,0 +1,47 @@
// Copyright 2014 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_ANY_EXECUTABLE_HPP_
#define RCLCPP_RCLCPP_ANY_EXECUTABLE_HPP_
#include <memory>
#include <rclcpp/macros.hpp>
#include <rclcpp/node.hpp>
namespace rclcpp
{
namespace executor
{
struct AnyExecutable
{
RCLCPP_MAKE_SHARED_DEFINITIONS(AnyExecutable);
AnyExecutable()
: subscription(0), timer(0), callback_group(0), node(0)
{}
// Either the subscription or the timer will be set, but not both
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
};
} /* executor */
} /* rclcpp */
#endif

View file

@ -24,7 +24,9 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
#include <rclcpp/node.hpp> #include <rclcpp/node.hpp>
#include <rclcpp/utilities.hpp> #include <rclcpp/utilities.hpp>
@ -35,12 +37,17 @@ namespace executor
class Executor class Executor
{ {
friend class memory_strategy::MemoryStrategy;
public: public:
RCLCPP_MAKE_SHARED_DEFINITIONS(Executor); RCLCPP_MAKE_SHARED_DEFINITIONS(Executor);
Executor() Executor(memory_strategy::MemoryStrategy::SharedPtr ms =
: interrupt_guard_condition_(rmw_create_guard_condition()) memory_strategy::create_default_strategy())
{} : interrupt_guard_condition_(rmw_create_guard_condition()),
memory_strategy_(ms)
{
}
virtual ~Executor() virtual ~Executor()
{ {
@ -91,7 +98,7 @@ public:
{ {
this->add_node(node); this->add_node(node);
// non-blocking = true // non-blocking = true
std::shared_ptr<AnyExecutable> any_exec = get_next_executable(nonblocking); auto any_exec = get_next_executable(nonblocking);
if (any_exec) { if (any_exec) {
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
@ -102,31 +109,25 @@ public:
{ {
this->add_node(node); this->add_node(node);
// non-blocking = true // non-blocking = true
std::shared_ptr<AnyExecutable> any_exec; while (AnyExecutable::SharedPtr any_exec = get_next_executable(true)) {
while ((any_exec = get_next_executable(true))) {
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
this->remove_node(node); this->remove_node(node);
} }
protected: // Support dynamic switching of memory strategy
struct AnyExecutable
{
AnyExecutable()
: subscription(0), timer(0), callback_group(0), node(0)
{}
// Either the subscription or the timer will be set, but not both
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::timer::TimerBase::SharedPtr timer;
rclcpp::service::ServiceBase::SharedPtr service;
rclcpp::client::ClientBase::SharedPtr client;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
};
void void
execute_any_executable(std::shared_ptr<AnyExecutable> & any_exec) set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{
if (memory_strategy == nullptr) {
throw std::runtime_error("Received NULL memory strategy in executor.");
}
memory_strategy_ = memory_strategy;
}
protected:
void
execute_any_executable(AnyExecutable::SharedPtr & any_exec)
{ {
if (!any_exec) { if (!any_exec) {
return; return;
@ -265,8 +266,8 @@ protected:
rmw_subscriptions_t subscriber_handles; rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = number_of_subscriptions; subscriber_handles.subscriber_count = number_of_subscriptions;
// TODO(wjwwood): Avoid redundant malloc's // TODO(wjwwood): Avoid redundant malloc's
subscriber_handles.subscribers = static_cast<void **>( subscriber_handles.subscribers =
std::malloc(sizeof(void *) * number_of_subscriptions)); memory_strategy_->borrow_handles(HandleType::subscriber_handle, number_of_subscriptions);
if (subscriber_handles.subscribers == NULL) { if (subscriber_handles.subscribers == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc? // TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers."); throw std::runtime_error("Could not malloc for subscriber pointers.");
@ -283,9 +284,8 @@ protected:
unsigned long number_of_services = services.size(); unsigned long number_of_services = services.size();
rmw_services_t service_handles; rmw_services_t service_handles;
service_handles.service_count = number_of_services; service_handles.service_count = number_of_services;
// TODO(esteve): Avoid redundant malloc's service_handles.services =
service_handles.services = static_cast<void **>( memory_strategy_->borrow_handles(HandleType::service_handle, number_of_services);
std::malloc(sizeof(void *) * number_of_services));
if (service_handles.services == NULL) { if (service_handles.services == NULL) {
// TODO(esteve): Use a different error here? maybe std::bad_alloc? // TODO(esteve): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for service pointers."); throw std::runtime_error("Could not malloc for service pointers.");
@ -302,9 +302,8 @@ protected:
unsigned long number_of_clients = clients.size(); unsigned long number_of_clients = clients.size();
rmw_clients_t client_handles; rmw_clients_t client_handles;
client_handles.client_count = number_of_clients; client_handles.client_count = number_of_clients;
// TODO: Avoid redundant malloc's client_handles.clients =
client_handles.clients = static_cast<void **>( memory_strategy_->borrow_handles(HandleType::client_handle, number_of_clients);
std::malloc(sizeof(void *) * number_of_clients));
if (client_handles.clients == NULL) { if (client_handles.clients == NULL) {
// TODO: Use a different error here? maybe std::bad_alloc? // TODO: Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for client pointers."); throw std::runtime_error("Could not malloc for client pointers.");
@ -324,9 +323,8 @@ protected:
timers.size() + start_of_timer_guard_conds; timers.size() + start_of_timer_guard_conds;
rmw_guard_conditions_t guard_condition_handles; rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds; guard_condition_handles.guard_condition_count = number_of_guard_conds;
// TODO(wjwwood): Avoid redundant malloc's guard_condition_handles.guard_conditions =
guard_condition_handles.guard_conditions = static_cast<void **>( memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds);
std::malloc(sizeof(void *) * number_of_guard_conds));
if (guard_condition_handles.guard_conditions == NULL) { if (guard_condition_handles.guard_conditions == NULL) {
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc? // TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for guard condition pointers."); throw std::runtime_error("Could not malloc for guard condition pointers.");
@ -355,12 +353,15 @@ protected:
nonblocking); nonblocking);
// If ctrl-c guard condition, return directly // If ctrl-c guard condition, return directly
if (guard_condition_handles.guard_conditions[0] != 0) { if (guard_condition_handles.guard_conditions[0] != 0) {
// Make sure to free memory // Make sure to free or clean memory
// TODO(wjwwood): Remove theses when the "Avoid redundant malloc's" memory_strategy_->return_handles(HandleType::subscriber_handle,
// todo has been addressed. subscriber_handles.subscribers);
std::free(subscriber_handles.subscribers); memory_strategy_->return_handles(HandleType::service_handle,
std::free(service_handles.services); service_handles.services);
std::free(guard_condition_handles.guard_conditions); memory_strategy_->return_handles(HandleType::client_handle,
client_handles.clients);
memory_strategy_->return_handles(HandleType::guard_condition_handle,
guard_condition_handles.guard_conditions);
return; return;
} }
// Add the new work to the class's list of things waiting to be executed // Add the new work to the class's list of things waiting to be executed
@ -393,12 +394,15 @@ protected:
} }
} }
// Make sure to free memory memory_strategy_->return_handles(HandleType::subscriber_handle,
// TODO(wjwwood): Remove theses when the "Avoid redundant malloc's" subscriber_handles.subscribers);
// todo has been addressed. memory_strategy_->return_handles(HandleType::service_handle,
std::free(subscriber_handles.subscribers); service_handles.services);
std::free(service_handles.services); memory_strategy_->return_handles(HandleType::client_handle,
std::free(guard_condition_handles.guard_conditions); client_handles.clients);
memory_strategy_->return_handles(HandleType::guard_condition_handle,
guard_condition_handles.guard_conditions);
} }
/******************************/ /******************************/
@ -566,7 +570,7 @@ protected:
} }
void void
get_next_timer(std::shared_ptr<AnyExecutable> & any_exec) get_next_timer(AnyExecutable::SharedPtr & any_exec)
{ {
for (auto handle : guard_condition_handles_) { for (auto handle : guard_condition_handles_) {
auto timer = get_timer_by_handle(handle); auto timer = get_timer_by_handle(handle);
@ -618,7 +622,7 @@ protected:
} }
void void
get_next_subscription(std::shared_ptr<AnyExecutable> & any_exec) get_next_subscription(AnyExecutable::SharedPtr & any_exec)
{ {
for (auto handle : subscriber_handles_) { for (auto handle : subscriber_handles_) {
auto subscription = get_subscription_by_handle(handle); auto subscription = get_subscription_by_handle(handle);
@ -670,7 +674,7 @@ protected:
} }
void void
get_next_service(std::shared_ptr<AnyExecutable> & any_exec) get_next_service(AnyExecutable::SharedPtr & any_exec)
{ {
for (auto handle : service_handles_) { for (auto handle : service_handles_) {
auto service = get_service_by_handle(handle); auto service = get_service_by_handle(handle);
@ -722,7 +726,7 @@ protected:
} }
void void
get_next_client(std::shared_ptr<AnyExecutable> & any_exec) get_next_client(AnyExecutable::SharedPtr & any_exec)
{ {
for (auto handle : client_handles_) { for (auto handle : client_handles_) {
auto client = get_client_by_handle(handle); auto client = get_client_by_handle(handle);
@ -752,10 +756,15 @@ protected:
} }
} }
std::shared_ptr<AnyExecutable> AnyExecutable::SharedPtr
get_next_ready_executable() get_next_ready_executable()
{ {
std::shared_ptr<AnyExecutable> any_exec(new AnyExecutable()); return get_next_ready_executable(this->memory_strategy_->instantiate_next_executable());
}
AnyExecutable::SharedPtr
get_next_ready_executable(AnyExecutable::SharedPtr any_exec)
{
// Check the timers to see if there are any that are ready, if so return // Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec); get_next_timer(any_exec);
if (any_exec->timer) { if (any_exec->timer) {
@ -781,7 +790,7 @@ protected:
return any_exec; return any_exec;
} }
std::shared_ptr<AnyExecutable> AnyExecutable::SharedPtr
get_next_executable(bool nonblocking = false) get_next_executable(bool nonblocking = false)
{ {
// Check to see if there are any subscriptions or timers needing service // Check to see if there are any subscriptions or timers needing service
@ -813,6 +822,8 @@ protected:
rmw_guard_condition_t * interrupt_guard_condition_; rmw_guard_condition_t * interrupt_guard_condition_;
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
private: private:
RCLCPP_DISABLE_COPY(Executor); RCLCPP_DISABLE_COPY(Executor);

View file

@ -79,7 +79,7 @@ private:
{ {
rclcpp::thread_id = this_thread_id; rclcpp::thread_id = this_thread_id;
while (rclcpp::utilities::ok()) { while (rclcpp::utilities::ok()) {
std::shared_ptr<AnyExecutable> any_exec; executor::AnyExecutable::SharedPtr any_exec;
{ {
std::lock_guard<std::mutex> wait_lock(wait_mutex_); std::lock_guard<std::mutex> wait_lock(wait_mutex_);
if (!rclcpp::utilities::ok()) { if (!rclcpp::utilities::ok()) {

View file

@ -24,6 +24,7 @@
#include <rclcpp/executor.hpp> #include <rclcpp/executor.hpp>
#include <rclcpp/macros.hpp> #include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategies.hpp>
#include <rclcpp/node.hpp> #include <rclcpp/node.hpp>
#include <rclcpp/utilities.hpp> #include <rclcpp/utilities.hpp>
#include <rclcpp/rate.hpp> #include <rclcpp/rate.hpp>

View file

@ -0,0 +1,30 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_
#include <rclcpp/strategies/static_memory_strategy.hpp>
namespace rclcpp
{
namespace memory_strategies
{
using rclcpp::memory_strategies::static_memory_strategy::StaticMemoryStrategy;
} /* memory_strategies */
} /* rclcpp */
#endif

View file

@ -0,0 +1,76 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_
#include <rclcpp/any_executable.hpp>
namespace rclcpp
{
// TODO move HandleType somewhere where it makes sense
enum class HandleType {subscriber_handle, service_handle, client_handle, guard_condition_handle};
namespace executor
{
class Executor;
}
namespace memory_strategy
{
class MemoryStrategy
{
friend class executor::Executor;
public:
RCLCPP_MAKE_SHARED_DEFINITIONS(MemoryStrategy);
virtual void ** borrow_handles(HandleType type, size_t number_of_handles)
{
return static_cast<void **>(alloc(sizeof(void *) * number_of_handles));
}
virtual void return_handles(HandleType type, void ** handles)
{
this->free(handles);
}
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return executor::AnyExecutable::SharedPtr(new executor::AnyExecutable);
}
virtual void * alloc(size_t size)
{
return std::malloc(size);
}
virtual void free(void * ptr)
{
return std::free(ptr);
}
};
MemoryStrategy::SharedPtr create_default_strategy()
{
return std::make_shared<MemoryStrategy>(MemoryStrategy());
}
} /* memory_strategy */
} /* rclcpp */
#endif

View file

@ -0,0 +1,177 @@
// Copyright 2015 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP_RCLCPP_STATIC_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_STATIC_MEMORY_STRATEGY_HPP_
#include <unordered_map>
#include <rclcpp/memory_strategy.hpp>
namespace rclcpp
{
namespace memory_strategies
{
namespace static_memory_strategy
{
class StaticMemoryStrategy : public memory_strategy::MemoryStrategy
{
public:
StaticMemoryStrategy()
{
memset(memory_pool_, 0, pool_size_);
memset(subscriber_pool_, 0, max_subscribers_);
memset(service_pool_, 0, max_services_);
memset(client_pool_, 0, max_clients_);
memset(guard_condition_pool_, 0, max_guard_conditions_);
pool_seq_ = 0;
exec_seq_ = 0;
// Reserve pool_size_ buckets in the memory map.
memory_map_.reserve(pool_size_);
for (size_t i = 0; i < pool_size_; ++i) {
memory_map_[memory_pool_[i]] = 0;
}
for (size_t i = 0; i < max_executables_; ++i) {
executable_pool_[i] = std::make_shared<executor::AnyExecutable>(executor::AnyExecutable());
}
}
void ** borrow_handles(HandleType type, size_t number_of_handles)
{
switch (type) {
case HandleType::subscriber_handle:
if (number_of_handles > max_subscribers_) {
throw std::runtime_error("Requested size exceeded maximum subscribers.");
}
return subscriber_pool_;
case HandleType::service_handle:
if (number_of_handles > max_services_) {
throw std::runtime_error("Requested size exceeded maximum services.");
}
return service_pool_;
case HandleType::client_handle:
if (number_of_handles > max_clients_) {
throw std::runtime_error("Requested size exceeded maximum clients.");
}
return client_pool_;
case HandleType::guard_condition_handle:
if (number_of_handles > max_guard_conditions_) {
throw std::runtime_error("Requested size exceeded maximum guard conditions.");
}
return guard_condition_pool_;
default:
break;
}
throw std::runtime_error("Unrecognized enum, could not borrow handle memory.");
}
void return_handles(HandleType type, void ** handles)
{
switch (type) {
case HandleType::subscriber_handle:
memset(subscriber_pool_, 0, max_subscribers_);
break;
case HandleType::service_handle:
memset(service_pool_, 0, max_services_);
break;
case HandleType::client_handle:
memset(client_pool_, 0, max_clients_);
break;
case HandleType::guard_condition_handle:
memset(guard_condition_pool_, 0, max_guard_conditions_);
break;
default:
throw std::runtime_error("Unrecognized enum, could not return handle memory.");
}
}
executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
if (exec_seq_ >= max_executables_) {
// wrap around
exec_seq_ = 0;
}
size_t prev_exec_seq_ = exec_seq_;
++exec_seq_;
return executable_pool_[prev_exec_seq_];
}
void * alloc(size_t size)
{
// Extremely naive static allocation strategy
// Keep track of block size at a given pointer
if (pool_seq_ + size > pool_size_) {
// Start at 0
pool_seq_ = 0;
}
void * ptr = memory_pool_[pool_seq_];
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_malloc.");
}
memory_map_[ptr] = size;
size_t prev_pool_seq = pool_seq_;
pool_seq_ += size;
return memory_pool_[prev_pool_seq];
}
void free(void * ptr)
{
if (memory_map_.count(ptr) == 0) {
// We expect to have the state for all blocks pre-mapped into memory_map_
throw std::runtime_error("Unexpected pointer in rcl_free.");
}
memset(ptr, 0, memory_map_[ptr]);
}
private:
static const size_t pool_size_ = 1024;
static const size_t max_subscribers_ = 10;
static const size_t max_services_ = 5;
static const size_t max_clients_ = 10;
static const size_t max_guard_conditions_ = 50;
static const size_t max_executables_ = 1;
void * memory_pool_[pool_size_];
void * subscriber_pool_[max_subscribers_];
void * service_pool_[max_services_];
void * client_pool_[max_clients_];
void * guard_condition_pool_[max_guard_conditions_];
executor::AnyExecutable::SharedPtr executable_pool_[max_executables_];
size_t pool_seq_;
size_t exec_seq_;
std::unordered_map<void *, size_t> memory_map_;
};
} /* static_memory_strategy */
} /* memory_strategies */
} /* rclcpp */
#endif