From fc48cf5fa2fbaae4e50262586eb0ae4b746b4ab3 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Tue, 27 Oct 2015 11:55:01 -0700 Subject: [PATCH] remove pool memory strategies --- .../rclcpp/any_subscription_callback.hpp | 3 +- rclcpp/include/rclcpp/executor.hpp | 422 +----------------- .../executors/multi_threaded_executor.hpp | 2 +- .../executors/single_threaded_executor.hpp | 2 +- .../include/rclcpp/intra_process_manager.hpp | 5 +- .../rclcpp/intra_process_manager_state.hpp | 46 +- rclcpp/include/rclcpp/memory_strategies.hpp | 13 +- rclcpp/include/rclcpp/memory_strategy.hpp | 138 +++++- .../strategies/allocator_memory_strategy.hpp | 214 ++------- .../strategies/heap_pool_memory_strategy.hpp | 336 -------------- .../strategies/stack_pool_memory_strategy.hpp | 219 --------- rclcpp/test/test_intra_process_manager.cpp | 6 +- 12 files changed, 227 insertions(+), 1179 deletions(-) delete mode 100644 rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp delete mode 100644 rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index cb9a23c..bdce88b 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -45,7 +45,8 @@ class AnySubscriptionCallback using ConstSharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; using UniquePtrCallback = std::function; - using UniquePtrWithInfoCallback = std::function; + using UniquePtrWithInfoCallback = + std::function; SharedPtrCallback shared_ptr_callback_; SharedPtrWithInfoCallback shared_ptr_with_info_callback_; diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 97fdd33..b567aef 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -48,7 +49,6 @@ namespace executor */ class Executor { - friend class memory_strategy::MemoryStrategy; public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor); @@ -56,7 +56,7 @@ public: /// Default constructor. // \param[in] ms The memory strategy to be used with this executor. explicit Executor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : interrupt_guard_condition_(rmw_create_guard_condition()), memory_strategy_(ms) { @@ -343,45 +343,11 @@ protected: void wait_for_work(std::chrono::duration timeout = std::chrono::duration(-1)) { - /* - memory_strategy_->subs.clear(); - memory_strategy_->services.clear(); - memory_strategy_->clients.clear(); - */ memory_strategy_->clear_active_entities(); // Collect the subscriptions and timers to be waited on bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); - /* - bool has_invalid_weak_nodes = false; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - has_invalid_weak_nodes = false; - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - memory_strategy_->subs.push_back(subscription); - } - } - for (auto & service : group->get_service_ptrs()) { - memory_strategy_->services.push_back(service); - } - for (auto & client : group->get_client_ptrs()) { - memory_strategy_->clients.push_back(client); - } - } - } - */ - // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) { weak_nodes_.erase( @@ -406,76 +372,12 @@ protected: client_handles.client_count = memory_strategy_->fill_client_handles(client_handles.clients); - /* - size_t max_number_of_subscriptions = memory_strategy_->subs.size() * 2; // Times two for intra-process. - rmw_subscriptions_t subscriber_handles; - subscriber_handles.subscriber_count = 0; - // TODO(wjwwood): Avoid redundant malloc's - subscriber_handles.subscribers = memory_strategy_->borrow_handles( - HandleType::subscription_handle, max_number_of_subscriptions); - if (subscriber_handles.subscribers == NULL && - max_number_of_subscriptions > 0) - { - // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for subscriber pointers."); - } - // Then fill the SubscriberHandles with ready subscriptions - for (auto & subscription : memory_strategy_->subs) { - subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = - subscription->get_subscription_handle()->data; - if (subscription->get_intra_process_subscription_handle()) { - subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = - subscription->get_intra_process_subscription_handle()->data; - } - } - - // Use the number of services to allocate memory in the handles - size_t number_of_services = memory_strategy_->services.size(); - rmw_services_t service_handles; - service_handles.service_count = number_of_services; - service_handles.services = - memory_strategy_->borrow_handles(HandleType::service_handle, number_of_services); - if (service_handles.services == NULL && - number_of_services > 0) - { - // TODO(esteve): Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for service pointers."); - } - // Then fill the ServiceHandles with ready services - size_t service_handle_index = 0; - for (auto & service : memory_strategy_->services) { - service_handles.services[service_handle_index] = \ - service->get_service_handle()->data; - service_handle_index += 1; - } - - // Use the number of clients to allocate memory in the handles - size_t number_of_clients = memory_strategy_->clients.size(); - rmw_clients_t client_handles; - client_handles.client_count = number_of_clients; - client_handles.clients = - memory_strategy_->borrow_handles(HandleType::client_handle, number_of_clients); - if (client_handles.clients == NULL && number_of_clients > 0) { - // TODO: Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for client pointers."); - } - // Then fill the ServiceHandles with ready clients - size_t client_handle_index = 0; - for (auto & client : memory_strategy_->clients) { - client_handles.clients[client_handle_index] = \ - client->get_client_handle()->data; - client_handle_index += 1; - } - */ - // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, // and one for the executor's guard cond (interrupt_guard_condition_) size_t number_of_guard_conds = 2; rmw_guard_conditions_t guard_condition_handles; guard_condition_handles.guard_condition_count = number_of_guard_conds; - /*guard_condition_handles.guard_conditions = - memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds);*/ - guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); + guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); if (guard_condition_handles.guard_conditions == NULL && number_of_guard_conds > 0) { @@ -526,140 +428,15 @@ protected: // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions[0] != 0) { // Make sure to free or clean memory - /* - memory_strategy_->return_handles(HandleType::subscription_handle, - subscriber_handles.subscribers); - memory_strategy_->return_handles(HandleType::service_handle, - service_handles.services); - memory_strategy_->return_handles(HandleType::client_handle, - client_handles.clients); - memory_strategy_->return_handles(HandleType::guard_condition_handle, - guard_condition_handles.guard_conditions); - */ memory_strategy_->clear_handles(); return; } // Add the new work to the class's list of things waiting to be executed // Starting with the subscribers - /* - for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { - void * handle = subscriber_handles.subscribers[i]; - if (handle) { - subscriber_handles_.push_back(handle); - } - } - // Then the services - for (size_t i = 0; i < number_of_services; ++i) { - void * handle = service_handles.services[i]; - if (handle) { - service_handles_.push_back(handle); - } - } - // Then the clients - for (size_t i = 0; i < number_of_clients; ++i) { - void * handle = client_handles.clients[i]; - if (handle) { - client_handles_.push_back(handle); - } - } - - memory_strategy_->return_handles(HandleType::subscription_handle, - subscriber_handles.subscribers); - memory_strategy_->return_handles(HandleType::service_handle, - service_handles.services); - memory_strategy_->return_handles(HandleType::client_handle, - client_handles.clients); - memory_strategy_->return_handles(HandleType::guard_condition_handle, - guard_condition_handles.guard_conditions); - - memory_strategy_->subs.clear(); - memory_strategy_->services.clear(); - memory_strategy_->clients.clear(); - */ memory_strategy_->clear_active_entities(); } /******************************/ - -/* - rclcpp::subscription::SubscriptionBase::SharedPtr - get_subscription_by_handle(void * subscriber_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle()->data == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() && - subscription->get_intra_process_subscription_handle()->data == subscriber_handle) - { - return subscription; - } - } - } - } - } - return nullptr; - } -*/ - - rclcpp::service::ServiceBase::SharedPtr - get_service_by_handle(void * service_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & service : group->get_service_ptrs()) { - if (service->get_service_handle()->data == service_handle) { - return service; - } - } - } - } - return rclcpp::service::ServiceBase::SharedPtr(); - } - - rclcpp::client::ClientBase::SharedPtr - get_client_by_handle(void * client_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & client : group->get_client_ptrs()) { - if (client->get_client_handle()->data == client_handle) { - return client; - } - } - } - } - return rclcpp::client::ClientBase::SharedPtr(); - } - rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group) { @@ -763,189 +540,6 @@ protected: return latest; } - /* - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_subscription(AnyExecutable::SharedPtr any_exec) - { - auto it = subscriber_handles_.begin(); - while (it != subscriber_handles_.end()) { - auto subscription = get_subscription_by_handle(*it); - if (subscription) { - // Figure out if this is for intra-process or not. - bool is_intra_process = false; - if (subscription->get_intra_process_subscription_handle()) { - is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; - } - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_subscription(subscription); - if (!group) { - // Group was not found, meaning the subscription is not valid... - // Remove it from the ready list and continue looking - subscriber_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - if (is_intra_process) { - any_exec->subscription_intra_process = subscription; - } else { - any_exec->subscription = subscription; - } - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - subscriber_handles_.erase(it++); - return; - } - // Else, the subscription is no longer valid, remove it and continue - subscriber_handles_.erase(it++); - } - } - - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_service( - rclcpp::service::ServiceBase::SharedPtr service) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & serv : group->get_service_ptrs()) { - if (serv == service) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_service(AnyExecutable::SharedPtr any_exec) - { - auto it = service_handles_.begin(); - while (it != service_handles_.end()) { - auto service = get_service_by_handle(*it); - if (service) { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_service(service); - if (!group) { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - service_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec->service = service; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - service_handles_.erase(it++); - return; - } - // Else, the service is no longer valid, remove it and continue - service_handles_.erase(it++); - } - } - - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_client( - rclcpp::client::ClientBase::SharedPtr client) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & cli : group->get_client_ptrs()) { - if (cli == client) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_client(AnyExecutable::SharedPtr any_exec) - { - auto it = client_handles_.begin(); - while (it != client_handles_.end()) { - auto client = get_client_by_handle(*it); - if (client) { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_client(client); - if (!group) { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - client_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec->client = client; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - client_handles_.erase(it++); - return; - } - // Else, the service is no longer valid, remove it and continue - client_handles_.erase(it++); - } - } -*/ - AnyExecutable::SharedPtr get_next_ready_executable() { @@ -1016,15 +610,7 @@ private: RCLCPP_DISABLE_COPY(Executor); std::vector> weak_nodes_; - std::array guard_cond_handles_; - /* - using SubscriberHandles = std::list; - SubscriberHandles subscriber_handles_; - using ServiceHandles = std::list; - ServiceHandles service_handles_; - using ClientHandles = std::list; - ClientHandles client_handles_; - */ + std::array guard_cond_handles_; }; } /* executor */ diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index a251560..7cfea87 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -41,7 +41,7 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor); MultiThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : executor::Executor(ms) { number_of_threads_ = std::thread::hardware_concurrency(); diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9155956..c082296 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -45,7 +45,7 @@ public: /// Default constructor. See the default constructor for Executor. SingleThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : executor::Executor(ms) {} /// Default destrcutor. diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 6ef4f18..4790ae1 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -124,7 +124,8 @@ public: //IntraProcessManager() = default; IntraProcessManager(IntraProcessManagerStateBase::SharedPtr state = create_default_state()) - : state_(state) { + : state_(state) + { } /// Register a subscription with the manager, returns subscriptions unique id. @@ -317,7 +318,7 @@ public: message_sequence_number, requesting_subscriptions_intra_process_id, buffer - ); + ); typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); if (!typed_buffer) { return; diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp b/rclcpp/include/rclcpp/intra_process_manager_state.hpp index 906d9a2..cc59e6f 100644 --- a/rclcpp/include/rclcpp/intra_process_manager_state.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp @@ -32,7 +32,8 @@ namespace rclcpp namespace intra_process_manager { -class IntraProcessManagerStateBase { +class IntraProcessManagerStateBase +{ public: //RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManagerStateBase); using SharedPtr = std::shared_ptr; @@ -44,9 +45,9 @@ public: remove_subscription(uint64_t intra_process_subscription_id) = 0; virtual void add_publisher(uint64_t id, - publisher::PublisherBase::WeakPtr publisher, - mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, - size_t size) = 0; + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) = 0; virtual void remove_publisher(uint64_t intra_process_publisher_id) = 0; @@ -70,17 +71,19 @@ public: }; template> -class IntraProcessManagerState : public IntraProcessManagerStateBase { +class IntraProcessManagerState : public IntraProcessManagerStateBase +{ public: - void - add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) { + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) + { subscriptions_[id] = subscription; subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); } void - remove_subscription(uint64_t intra_process_subscription_id) { + remove_subscription(uint64_t intra_process_subscription_id) + { subscriptions_.erase(intra_process_subscription_id); for (auto & pair : subscription_ids_by_topic_) { pair.second.erase(intra_process_subscription_id); @@ -96,9 +99,10 @@ public: } void add_publisher(uint64_t id, - publisher::PublisherBase::WeakPtr publisher, - mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, - size_t size) { + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) + { publishers_[id].publisher = publisher; // As long as the size of the ring buffer is less than the max sequence number, we're safe. @@ -112,7 +116,8 @@ public: } void - remove_publisher(uint64_t intra_process_publisher_id) { + remove_publisher(uint64_t intra_process_publisher_id) + { publishers_.erase(intra_process_publisher_id); } @@ -220,7 +225,6 @@ public: return false; } - private: template using RebindAlloc = typename std::allocator_traits::template rebind_alloc; @@ -229,10 +233,10 @@ private: using AllocString = std::string; using AllocSet = std::set, RebindAlloc>; using SubscriptionMap = std::unordered_map, std::equal_to, - RebindAlloc>>; + std::hash, std::equal_to, + RebindAlloc>>; using IDTopicMap = std::map, RebindAlloc>>; + std::less, RebindAlloc>>; SubscriptionMap subscriptions_; @@ -249,19 +253,21 @@ private: mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; using TargetSubscriptionsMap = std::unordered_map, std::equal_to, RebindAlloc>>; + std::hash, std::equal_to, + RebindAlloc>>; TargetSubscriptionsMap target_subscriptions_by_message_sequence; }; using PublisherMap = std::unordered_map, std::equal_to, - RebindAlloc>>; + std::hash, std::equal_to, + RebindAlloc>>; PublisherMap publishers_; }; -static IntraProcessManagerStateBase::SharedPtr create_default_state() { +static IntraProcessManagerStateBase::SharedPtr create_default_state() +{ return std::make_shared>(); } diff --git a/rclcpp/include/rclcpp/memory_strategies.hpp b/rclcpp/include/rclcpp/memory_strategies.hpp index b69ed51..fee7c25 100644 --- a/rclcpp/include/rclcpp/memory_strategies.hpp +++ b/rclcpp/include/rclcpp/memory_strategies.hpp @@ -17,6 +17,7 @@ //#include //#include +#include #include namespace rclcpp @@ -28,14 +29,12 @@ using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrat //using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy; //using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy; +static memory_strategy::MemoryStrategy::SharedPtr create_default_strategy() +{ + return std::make_shared>(); +} + } /* memory_strategies */ - -namespace memory_strategy { - static MemoryStrategy::SharedPtr create_default_strategy() { - return std::make_shared>(); - } -} /* memory_strategy */ - } /* rclcpp */ #endif diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 22291bd..cdc27cf 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -24,10 +24,10 @@ namespace rclcpp { - namespace memory_strategy { + /// Delegate for handling memory allocations while the Executor is executing. /** * By default, the memory strategy dynamically allocates memory for structures that come in from @@ -38,16 +38,16 @@ class MemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy); - using WeakNodeVector = typename std::vector>>; + using WeakNodeVector = std::vector>; // return the new number of subscribers - virtual size_t fill_subscriber_handles(void ** ptr) = 0; + virtual size_t fill_subscriber_handles(void ** & ptr) = 0; // return the new number of services - virtual size_t fill_service_handles(void ** ptr) = 0; + virtual size_t fill_service_handles(void ** & ptr) = 0; // return the new number of clients - virtual size_t fill_client_handles(void ** ptr) = 0; + virtual size_t fill_client_handles(void ** & ptr) = 0; virtual void clear_active_entities() = 0; @@ -70,6 +70,134 @@ public: get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; + static rclcpp::subscription::SubscriptionBase::SharedPtr + get_subscription_by_handle(void * subscriber_handle, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + if (subscription->get_subscription_handle()->data == subscriber_handle) { + return subscription; + } + if (subscription->get_intra_process_subscription_handle() && + subscription->get_intra_process_subscription_handle()->data == subscriber_handle) + { + return subscription; + } + } + } + } + } + return nullptr; + } + + static rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & service : group->get_service_ptrs()) { + if (service->get_service_handle()->data == service_handle) { + return service; + } + } + } + } + return rclcpp::service::ServiceBase::SharedPtr(); + } + + static rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & client : group->get_client_ptrs()) { + if (client->get_client_handle()->data == client_handle) { + return client; + } + } + } + } + return rclcpp::client::ClientBase::SharedPtr(); + } + + static rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, + const WeakNodeVector & weak_nodes) + { + if (!group) { + return rclcpp::node::Node::SharedPtr(); + } + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto callback_group = weak_group.lock(); + if (callback_group == group) { + return node; + } + } + } + return rclcpp::node::Node::SharedPtr(); + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_sub : group->get_subscription_ptrs()) { + auto sub = weak_sub.lock(); + if (sub == subscription) { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + }; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index c0f86fc..1117ca6 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -20,6 +20,7 @@ #include #include +#include namespace rclcpp { @@ -43,14 +44,13 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy public: RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy); - using ExecAllocTraits = allocator::AllocRebind; using ExecAlloc = typename ExecAllocTraits::allocator_type; using ExecDeleter = allocator::Deleter; using VoidAllocTraits = typename allocator::AllocRebind; using VoidAlloc = typename VoidAllocTraits::allocator_type; - using NodeVector = typename std::vector>>; + using WeakNodeVector = std::vector>; AllocatorMemoryStrategy(std::shared_ptr allocator) { @@ -64,11 +64,10 @@ public: allocator_ = std::make_shared(); } - size_t fill_subscriber_handles(void ** ptr) { + size_t fill_subscriber_handles(void ** & ptr) + { size_t max_size = subscriptions_.size(); - if (subscriber_handles_.size() < max_size) { - subscriber_handles_.resize(max_size); - } + subscriber_handles_.resize(max_size); size_t count = 0; for (auto & subscription : subscriptions_) { subscriber_handles_[count++] = subscription->get_subscription_handle()->data; @@ -77,12 +76,13 @@ public: subscription->get_intra_process_subscription_handle()->data; } } - ptr = static_cast(subscriber_handles_.data()); + ptr = subscriber_handles_.data(); return count; } // return the new number of services - size_t fill_service_handles(void ** ptr) { + size_t fill_service_handles(void ** & ptr) + { size_t max_size = services_.size(); if (service_handles_.size() < max_size) { service_handles_.resize(max_size); @@ -91,12 +91,13 @@ public: for (auto & service : services_) { service_handles_[count++] = service->get_service_handle()->data; } - ptr = static_cast(service_handles_.data()); + ptr = service_handles_.data(); return count; } // return the new number of clients - size_t fill_client_handles(void ** ptr) { + size_t fill_client_handles(void ** & ptr) + { size_t max_size = clients_.size(); if (client_handles_.size() < max_size) { client_handles_.resize(max_size); @@ -105,23 +106,26 @@ public: for (auto & client : clients_) { client_handles_[count++] = client->get_client_handle()->data; } - ptr = static_cast(client_handles_.data()); + ptr = client_handles_.data(); return count; } - void clear_active_entities() { + void clear_active_entities() + { subscriptions_.clear(); services_.clear(); clients_.clear(); } - void clear_handles() { + void clear_handles() + { subscriber_handles_.clear(); service_handles_.clear(); client_handles_.clear(); } - bool collect_entities(const NodeVector & weak_nodes) { + bool collect_entities(const WeakNodeVector & weak_nodes) + { bool has_invalid_weak_nodes = false; for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -159,143 +163,15 @@ public: return std::allocate_shared(*executable_allocator_.get()); } - static rclcpp::subscription::SubscriptionBase::SharedPtr - get_subscription_by_handle(void * subscriber_handle, - const NodeVector & weak_nodes) - { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle()->data == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() && - subscription->get_intra_process_subscription_handle()->data == subscriber_handle) - { - return subscription; - } - } - } - } - } - return nullptr; - } - - static rclcpp::service::ServiceBase::SharedPtr - get_service_by_handle(void * service_handle, - const NodeVector & weak_nodes) - { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & service : group->get_service_ptrs()) { - if (service->get_service_handle()->data == service_handle) { - return service; - } - } - } - } - return rclcpp::service::ServiceBase::SharedPtr(); - } - - static rclcpp::client::ClientBase::SharedPtr - get_client_by_handle(void * client_handle, - const NodeVector & weak_nodes) - { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & client : group->get_client_ptrs()) { - if (client->get_client_handle()->data == client_handle) { - return client; - } - } - } - } - return rclcpp::client::ClientBase::SharedPtr(); - } - - static rclcpp::node::Node::SharedPtr - get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, - const NodeVector & weak_nodes) - { - if (!group) { - return rclcpp::node::Node::SharedPtr(); - } - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto callback_group = weak_group.lock(); - if (callback_group == group) { - return node; - } - } - } - return rclcpp::node::Node::SharedPtr(); - } - - static rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription, - const NodeVector & weak_nodes) - { - for (auto & weak_node : weak_nodes) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - - virtual void get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, - const NodeVector & weak_nodes) + const WeakNodeVector & weak_nodes) { auto it = subscriber_handles_.begin(); + VectorRebind::iterator> to_erase; while (it != subscriber_handles_.end()) { - auto subscription = get_subscription_by_handle(*it, weak_nodes); + auto subscription = memory_strategy::MemoryStrategy::get_subscription_by_handle(*it, + weak_nodes); if (subscription) { // Figure out if this is for intra-process or not. bool is_intra_process = false; @@ -307,7 +183,8 @@ public: if (!group) { // Group was not found, meaning the subscription is not valid... // Remove it from the ready list and continue looking - subscriber_handles_.erase(it++); + //subscriber_handles_.erase(it++); + to_erase.push_back(it++); continue; } if (!group->can_be_taken_from().load()) { @@ -324,18 +201,25 @@ public: } any_exec->callback_group = group; any_exec->node = get_node_by_group(group, weak_nodes); - subscriber_handles_.erase(it++); + to_erase.push_back(it++); return; } // Else, the subscription is no longer valid, remove it and continue - subscriber_handles_.erase(it++); + // TODO + to_erase.push_back(it++); } + for (auto & entry : to_erase) { + if (entry != subscriber_handles_.end()) { + subscriber_handles_.erase(entry); + } + } + } static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_service( rclcpp::service::ServiceBase::SharedPtr service, - const NodeVector & weak_nodes) + const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -359,11 +243,11 @@ public: virtual void get_next_service(executor::AnyExecutable::SharedPtr any_exec, - const NodeVector & weak_nodes) + const WeakNodeVector & weak_nodes) { auto it = service_handles_.begin(); while (it != service_handles_.end()) { - auto service = get_service_by_handle(*it, weak_nodes); + auto service = memory_strategy::MemoryStrategy::get_service_by_handle(*it, weak_nodes); if (service) { // Find the group for this handle and see if it can be serviced auto group = get_group_by_service(service, weak_nodes); @@ -394,7 +278,7 @@ public: static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_client( rclcpp::client::ClientBase::SharedPtr client, - const NodeVector & weak_nodes) + const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -418,11 +302,11 @@ public: virtual void get_next_client(executor::AnyExecutable::SharedPtr any_exec, - const NodeVector & weak_nodes) + const WeakNodeVector & weak_nodes) { auto it = client_handles_.begin(); while (it != client_handles_.end()) { - auto client = get_client_by_handle(*it, weak_nodes); + auto client = memory_strategy::MemoryStrategy::get_client_by_handle(*it, weak_nodes); if (client) { // Find the group for this handle and see if it can be serviced auto group = get_group_by_client(client, weak_nodes); @@ -450,22 +334,18 @@ public: } } - - private: template - using VectorRebind = typename std::allocator_traits::template rebind_alloc; + using VectorRebind = + std::vector::template rebind_alloc>; - std::vector> subscriptions_; - std::vector> services_; - std::vector> clients_; + VectorRebind subscriptions_; + VectorRebind services_; + VectorRebind clients_; - std::vector subscriber_handles_; - std::vector service_handles_; - std::vector client_handles_; + VectorRebind subscriber_handles_; + VectorRebind service_handles_; + VectorRebind client_handles_; std::shared_ptr executable_allocator_; std::shared_ptr allocator_; diff --git a/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp deleted file mode 100644 index 8e7151c..0000000 --- a/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_HEAP_POOL_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_HEAP_POOL_MEMORY_STRATEGY_HPP_ - -#include - -#include - -namespace rclcpp -{ - -namespace memory_strategies -{ - -namespace heap_pool_memory_strategy -{ - -/// Representation of the upper bounds on the memory pools managed by StaticMemoryStrategy. -struct ObjectPoolBounds -{ -public: - size_t max_subscriptions; - size_t max_services; - size_t max_clients; - size_t max_executables; - size_t max_guard_conditions; - size_t pool_size; - - /// Default constructor attempts to set reasonable default limits on the fields. - ObjectPoolBounds() - : max_subscriptions(10), max_services(10), max_clients(10), - max_executables(1), max_guard_conditions(2), pool_size(1024) - {} - - // Setters implement named parameter idiom/method chaining - - /// Set the maximum number of subscriptions. - /** - * \param[in] subscriptions Maximum number of subscriptions. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_subscriptions(size_t subscriptions) - { - max_subscriptions = subscriptions; - return *this; - } - - /// Set the maximum number of services. - /** - * \param[in] services Maximum number of services. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_services(size_t services) - { - max_services = services; - return *this; - } - - /// Set the maximum number of clients. - /** - * \param[in] clients Maximum number of clients. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_clients(size_t clients) - { - max_clients = clients; - return *this; - } - - /// Set the maximum number of guard conditions. - /** - * \param[in] guard conditions Maximum number of guard conditions. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions) - { - max_guard_conditions = guard_conditions; - return *this; - } - - /// Set the maximum number of executables. - /** - * \param[in] executables Maximum number of executables. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_executables(size_t executables) - { - max_executables = executables; - return *this; - } - - /// Set the maximum memory pool size. - /** - * \param[in] executables Maximum memory pool size. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_memory_pool_size(size_t pool) - { - pool_size = pool; - return *this; - } -}; - - -/// Heap-based memory pool allocation strategy, an alternative to the default memory strategy. -/** - * The memory managed by this class is allocated dynamically in the constructor, but no subsequent - * accesses to the class (besides the destructor) allocate or free memory. - * HeapPoolMemoryStrategy puts a hard limit on the number of subscriptions, etc. that can be executed - * in one iteration of `Executor::spin`. Thus it allows for memory allocation optimization for - * situations where a limit on the number of such entities is known. - */ -class HeapPoolMemoryStrategy : public memory_strategy::MemoryStrategy -{ -public: - HeapPoolMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds()) - : bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr), - service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr) - { - if (bounds_.pool_size) { - memory_pool_ = new void *[bounds_.pool_size]; - memset(memory_pool_, 0, bounds_.pool_size * sizeof(void *)); - } - - if (bounds_.max_subscriptions) { - subscription_pool_ = new void *[bounds_.max_subscriptions]; - memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); - } - - if (bounds_.max_services) { - service_pool_ = new void *[bounds_.max_services]; - memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); - } - - if (bounds_.max_clients) { - client_pool_ = new void *[bounds_.max_clients]; - memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); - } - - if (bounds_.max_guard_conditions) { - guard_condition_pool_ = new void *[bounds_.max_guard_conditions]; - memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); - } - - if (bounds_.max_executables) { - executable_pool_ = new executor::AnyExecutable::SharedPtr[bounds_.max_executables]; - } - - for (size_t i = 0; i < bounds_.max_executables; ++i) { - executable_pool_[i] = std::make_shared(); - } - - pool_seq_ = 0; - exec_seq_ = 0; - - // Reserve pool_size_ buckets in the memory map. - memory_map_.reserve(bounds_.pool_size); - for (size_t i = 0; i < bounds_.pool_size; ++i) { - memory_map_[memory_pool_[i]] = 0; - } - - subs.reserve(bounds_.max_subscriptions); - clients.reserve(bounds_.max_clients); - services.reserve(bounds_.max_services); - } - - /// Default destructor. Free all allocated memory. - ~HeapPoolMemoryStrategy() - { - if (bounds_.pool_size) { - delete[] memory_pool_; - } - if (bounds_.max_subscriptions) { - delete[] subscription_pool_; - } - if (bounds_.max_services) { - delete[] service_pool_; - } - if (bounds_.max_clients) { - delete[] client_pool_; - } - if (bounds_.max_guard_conditions) { - delete[] guard_condition_pool_; - } - } - - void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (number_of_handles > bounds_.max_subscriptions) { - throw std::runtime_error("Requested size exceeded maximum subscriptions."); - } - - return subscription_pool_; - case HandleType::service_handle: - if (number_of_handles > bounds_.max_services) { - throw std::runtime_error("Requested size exceeded maximum services."); - } - - return service_pool_; - case HandleType::client_handle: - if (number_of_handles > bounds_.max_clients) { - throw std::runtime_error("Requested size exceeded maximum clients."); - } - - return client_pool_; - case HandleType::guard_condition_handle: - if (number_of_handles > bounds_.max_guard_conditions) { - throw std::runtime_error("Requested size exceeded maximum guard_conditions."); - } - - return guard_condition_pool_; - default: - break; - } - throw std::runtime_error("Unrecognized enum, could not borrow handle memory."); - } - - void return_handles(HandleType type, void ** handles) - { - (void)handles; - switch (type) { - case HandleType::subscription_handle: - if (bounds_.max_subscriptions) { - memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); - } - break; - case HandleType::service_handle: - if (bounds_.max_services) { - memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); - } - break; - case HandleType::client_handle: - if (bounds_.max_clients) { - memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); - } - break; - case HandleType::guard_condition_handle: - if (bounds_.max_guard_conditions) { - memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); - } - break; - default: - throw std::runtime_error("Unrecognized enum, could not return handle memory."); - } - } - - executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - if (exec_seq_ >= bounds_.max_executables) { - // wrap around - exec_seq_ = 0; - } - size_t prev_exec_seq_ = exec_seq_; - ++exec_seq_; - - if (!executable_pool_[prev_exec_seq_]) { - throw std::runtime_error("Executable pool member was NULL"); - } - - executable_pool_[prev_exec_seq_]->subscription.reset(); - executable_pool_[prev_exec_seq_]->timer.reset(); - executable_pool_[prev_exec_seq_]->service.reset(); - executable_pool_[prev_exec_seq_]->client.reset(); - executable_pool_[prev_exec_seq_]->callback_group.reset(); - executable_pool_[prev_exec_seq_]->node.reset(); - - return executable_pool_[prev_exec_seq_]; - } - - void * alloc(size_t size) - { - // Extremely naive static allocation strategy - // Keep track of block size at a given pointer - if (pool_seq_ + size > bounds_.pool_size) { - // Start at 0 - pool_seq_ = 0; - } - void * ptr = memory_pool_[pool_seq_]; - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in HeapPoolMemoryStrategy::alloc."); - } - memory_map_[ptr] = size; - size_t prev_pool_seq = pool_seq_; - pool_seq_ += size; - return memory_pool_[prev_pool_seq]; - } - - void free(void * ptr) - { - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in HeapPoolMemoryStrategy::free."); - } - - memset(ptr, 0, memory_map_[ptr]); - } - -private: - ObjectPoolBounds bounds_; - - void ** memory_pool_; - void ** subscription_pool_; - void ** service_pool_; - void ** client_pool_; - void ** guard_condition_pool_; - executor::AnyExecutable::SharedPtr * executable_pool_; - - size_t pool_seq_; - size_t exec_seq_; - - std::unordered_map memory_map_; -}; - -} /* heap_pool_memory_strategy */ - -} /* memory_strategies */ - -} /* rclcpp */ - -#endif diff --git a/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp deleted file mode 100644 index a7fd0a9..0000000 --- a/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_STACK_POOL_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_STACK_POOL_MEMORY_STRATEGY_HPP_ - -#include - -#include - -namespace rclcpp -{ - -namespace memory_strategies -{ - -namespace stack_pool_memory_strategy -{ - -/// Stack-based memory pool allocation strategy, an alternative to the default memory strategy. -/** - * The memory managed by this class is allocated statically (on the heap) in the constructor. - * StackPoolMemoryStrategy puts a hard limit on the number of subscriptions, etc. that can be - * executed in one iteration of `Executor::spin`. Thus it allows for memory allocation optimization - * for situations where a limit on the number of such entities is known. - * Because this class is templated on the sizes of the memory pools for each entity, the amount of - * memory required by this class is known at compile time. - */ -template -class StackPoolMemoryStrategy : public memory_strategy::MemoryStrategy -{ -public: - StackPoolMemoryStrategy() - { - if (PoolSize) { - memory_pool_.fill(0); - } - - if (MaxSubscriptions) { - subscription_pool_.fill(0); - } - - if (MaxServices) { - service_pool_.fill(0); - } - - if (MaxClients) { - client_pool_.fill(0); - } - - if (MaxGuardConditions) { - guard_condition_pool_.fill(0); - } - - for (size_t i = 0; i < MaxExecutables; ++i) { - executable_pool_[i] = std::make_shared(); - } - - pool_seq_ = 0; - exec_seq_ = 0; - - // Reserve pool_size_ buckets in the memory map. - memory_map_.reserve(PoolSize); - for (size_t i = 0; i < PoolSize; ++i) { - memory_map_[memory_pool_[i]] = 0; - } - subs.reserve(MaxSubscriptions); - clients.reserve(MaxClients); - services.reserve(MaxServices); - } - - void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (number_of_handles > MaxSubscriptions) { - throw std::runtime_error("Requested size exceeded maximum subscriptions."); - } - - return subscription_pool_.data(); - case HandleType::service_handle: - if (number_of_handles > MaxServices) { - throw std::runtime_error("Requested size exceeded maximum services."); - } - - return service_pool_.data(); - case HandleType::client_handle: - if (number_of_handles > MaxClients) { - throw std::runtime_error("Requested size exceeded maximum clients."); - } - - return client_pool_.data(); - case HandleType::guard_condition_handle: - if (number_of_handles > MaxGuardConditions) { - throw std::runtime_error("Requested size exceeded maximum guard_conditions."); - } - - return guard_condition_pool_.data(); - default: - break; - } - throw std::runtime_error("Unrecognized enum, could not borrow handle memory."); - } - - void return_handles(HandleType type, void ** handles) - { - (void)handles; - switch (type) { - case HandleType::subscription_handle: - if (MaxSubscriptions) { - subscription_pool_.fill(0); - } - break; - case HandleType::service_handle: - if (MaxServices) { - service_pool_.fill(0); - } - break; - case HandleType::client_handle: - if (MaxClients) { - client_pool_.fill(0); - } - break; - case HandleType::guard_condition_handle: - if (MaxGuardConditions) { - guard_condition_pool_.fill(0); - } - break; - default: - throw std::runtime_error("Unrecognized enum, could not return handle memory."); - } - } - - executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - if (exec_seq_ >= MaxExecutables) { - // wrap around - exec_seq_ = 0; - } - size_t prev_exec_seq_ = exec_seq_; - ++exec_seq_; - - if (!executable_pool_[prev_exec_seq_]) { - throw std::runtime_error("Executable pool member was NULL"); - } - - // Make sure to clear the executable fields. - executable_pool_[prev_exec_seq_]->subscription.reset(); - executable_pool_[prev_exec_seq_]->timer.reset(); - executable_pool_[prev_exec_seq_]->service.reset(); - executable_pool_[prev_exec_seq_]->client.reset(); - executable_pool_[prev_exec_seq_]->callback_group.reset(); - executable_pool_[prev_exec_seq_]->node.reset(); - - return executable_pool_[prev_exec_seq_]; - } - - void * alloc(size_t size) - { - // Extremely naive static allocation strategy - // Keep track of block size at a given pointer - if (pool_seq_ + size > PoolSize) { - // Start at 0 - pool_seq_ = 0; - } - void * ptr = memory_pool_[pool_seq_]; - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in StackPoolMemoryStrategy::alloc."); - } - memory_map_[ptr] = size; - size_t prev_pool_seq = pool_seq_; - pool_seq_ += size; - return memory_pool_[prev_pool_seq]; - } - - void free(void * ptr) - { - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in StackPoolMemoryStrategy::free."); - } - - memset(ptr, 0, memory_map_[ptr]); - } - -private: - std::array memory_pool_; - std::array subscription_pool_; - std::array service_pool_; - std::array client_pool_; - std::array guard_condition_pool_; - std::array executable_pool_; - - size_t pool_seq_; - size_t exec_seq_; - - std::unordered_map memory_map_; -}; - -} /* stack_pool_memory_strategy */ - -} /* memory_strategies */ - -} /* rclcpp */ - -#endif diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 24d3f9b..8291668 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -67,11 +67,13 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); - Publisher() { + Publisher() + { allocator_ = std::make_shared(); } - std::shared_ptr get_allocator() { + std::shared_ptr get_allocator() + { return allocator_; } };