diff --git a/rclcpp/include/rclcpp/allocator/allocator_common.hpp b/rclcpp/include/rclcpp/allocator/allocator_common.hpp new file mode 100644 index 0000000..d32b200 --- /dev/null +++ b/rclcpp/include/rclcpp/allocator/allocator_common.hpp @@ -0,0 +1,33 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_ +#define RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_ + +#include + +#include "rclcpp/allocator/allocator_deleter.hpp" + +namespace rclcpp +{ +namespace allocator +{ + +template +using AllocRebind = typename std::allocator_traits::template rebind_traits; + +} // namespace allocator +} // namespace rclcpp + +#endif // RCLCPP__ALLOCATOR__ALLOCATOR_COMMON_HPP_ diff --git a/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp new file mode 100644 index 0000000..c3b59f2 --- /dev/null +++ b/rclcpp/include/rclcpp/allocator/allocator_deleter.hpp @@ -0,0 +1,105 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_ +#define RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_ + +#include + +namespace rclcpp +{ + +namespace allocator +{ + +template +class AllocatorDeleter +{ + template + using AllocRebind = typename std::allocator_traits::template rebind_alloc; + +public: + AllocatorDeleter() + : allocator_(nullptr) + { + } + + explicit AllocatorDeleter(Allocator * a) + : allocator_(a) + { + } + + template + AllocatorDeleter(const AllocatorDeleter & a) + { + allocator_ = a.get_allocator(); + } + + template + void operator()(T * ptr) + { + std::allocator_traits>::destroy(*allocator_, ptr); + std::allocator_traits>::deallocate(*allocator_, ptr, 1); + ptr = nullptr; + } + + Allocator * get_allocator() const + { + return allocator_; + } + + void set_allocator(Allocator * alloc) + { + allocator_ = alloc; + } + +private: + Allocator * allocator_; +}; + +template +void set_allocator_for_deleter(D * deleter, Alloc * alloc) +{ + (void) alloc; + (void) deleter; + throw std::runtime_error("Reached unexpected template specialization"); +} + +template +void set_allocator_for_deleter(std::default_delete * deleter, std::allocator * alloc) +{ + (void) deleter; + (void) alloc; +} + +template +void set_allocator_for_deleter(AllocatorDeleter * deleter, Alloc * alloc) +{ + if (!deleter || !alloc) { + throw std::invalid_argument("Argument was NULL to set_allocator_for_deleter"); + } + deleter->set_allocator(alloc); +} + +template +using Deleter = typename std::conditional< + std::is_same::template rebind_alloc, + typename std::allocator::template rebind::other>::value, + std::default_delete, + AllocatorDeleter + >::type; +} // namespace allocator +} // namespace rclcpp + +#endif // RCLCPP__ALLOCATOR__ALLOCATOR_DELETER_HPP_ diff --git a/rclcpp/include/rclcpp/any_subscription_callback.hpp b/rclcpp/include/rclcpp/any_subscription_callback.hpp index fdec603..94e26a1 100644 --- a/rclcpp/include/rclcpp/any_subscription_callback.hpp +++ b/rclcpp/include/rclcpp/any_subscription_callback.hpp @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ -#define RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ +#ifndef RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_ +#define RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_ -#include +#include #include #include #include -#include +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/function_traits.hpp" namespace rclcpp { @@ -29,18 +30,23 @@ namespace rclcpp namespace any_subscription_callback { -template +template class AnySubscriptionCallback { + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + using SharedPtrCallback = std::function)>; using SharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; using ConstSharedPtrCallback = std::function)>; using ConstSharedPtrWithInfoCallback = std::function, const rmw_message_info_t &)>; - using UniquePtrCallback = std::function)>; + using UniquePtrCallback = std::function; using UniquePtrWithInfoCallback = - std::function, const rmw_message_info_t &)>; + std::function; SharedPtrCallback shared_ptr_callback_; SharedPtrWithInfoCallback shared_ptr_with_info_callback_; @@ -50,11 +56,14 @@ class AnySubscriptionCallback UniquePtrWithInfoCallback unique_ptr_with_info_callback_; public: - AnySubscriptionCallback() + explicit AnySubscriptionCallback(std::shared_ptr allocator) : shared_ptr_callback_(nullptr), shared_ptr_with_info_callback_(nullptr), const_shared_ptr_callback_(nullptr), const_shared_ptr_with_info_callback_(nullptr), unique_ptr_callback_(nullptr), unique_ptr_with_info_callback_(nullptr) - {} + { + message_allocator_ = std::make_shared(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + } AnySubscriptionCallback(const AnySubscriptionCallback &) = default; @@ -155,17 +164,20 @@ public: } else if (const_shared_ptr_with_info_callback_) { const_shared_ptr_with_info_callback_(message, message_info); } else if (unique_ptr_callback_) { - unique_ptr_callback_(std::unique_ptr(new MessageT(*message))); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message); + unique_ptr_callback_(MessageUniquePtr(ptr, message_deleter_)); } else if (unique_ptr_with_info_callback_) { - unique_ptr_with_info_callback_(std::unique_ptr(new MessageT(* - message)), message_info); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *message); + unique_ptr_with_info_callback_(MessageUniquePtr(ptr, message_deleter_), message_info); } else { throw std::runtime_error("unexpected message without any callback set"); } } void dispatch_intra_process( - std::unique_ptr & message, const rmw_message_info_t & message_info) + MessageUniquePtr & message, const rmw_message_info_t & message_info) { (void)message_info; if (shared_ptr_callback_) { @@ -188,9 +200,13 @@ public: throw std::runtime_error("unexpected message without any callback set"); } } + +private: + std::shared_ptr message_allocator_; + MessageDeleter message_deleter_; }; -} /* namespace any_subscription_callback */ -} /* namespace rclcpp */ +} // namespace any_subscription_callback +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_ANY_SUBSCRIPTION_CALLBACK_HPP_ */ +#endif // RCLCPP__ANY_SUBSCRIPTION_CALLBACK_HPP_ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 9a1658b..cc3fe42 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_EXECUTOR_HPP_ -#define RCLCPP_RCLCPP_EXECUTOR_HPP_ +#ifndef RCLCPP__EXECUTOR_HPP_ +#define RCLCPP__EXECUTOR_HPP_ #include #include @@ -23,13 +23,14 @@ #include #include -#include +#include "rcl_interfaces/msg/intra_process_message.hpp" -#include -#include -#include -#include -#include +#include "rclcpp/any_executable.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" namespace rclcpp { @@ -48,15 +49,13 @@ namespace executor */ class Executor { - friend class memory_strategy::MemoryStrategy; - public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Executor); /// Default constructor. // \param[in] ms The memory strategy to be used with this executor. explicit Executor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : interrupt_guard_condition_(rmw_create_guard_condition()), memory_strategy_(ms) { @@ -94,7 +93,7 @@ public: for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); if (node == node_ptr) { - // TODO: Use a different error here? + // TODO(jacquelinekay): Use a different error here? throw std::runtime_error("Cannot add node to executor, node already added."); } } @@ -343,36 +342,11 @@ protected: void wait_for_work(std::chrono::duration timeout = std::chrono::duration(-1)) { - memory_strategy_->subs.clear(); - memory_strategy_->services.clear(); - memory_strategy_->clients.clear(); + memory_strategy_->clear_active_entities(); + // Collect the subscriptions and timers to be waited on - bool has_invalid_weak_nodes = false; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - has_invalid_weak_nodes = false; - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - memory_strategy_->subs.push_back(subscription); - } - } - for (auto & service : group->get_service_ptrs()) { - memory_strategy_->services.push_back(service); - } - for (auto & client : group->get_client_ptrs()) { - memory_strategy_->clients.push_back(client); - } - } - } + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) { weak_nodes_.erase( @@ -382,74 +356,26 @@ protected: return i.expired(); })); } + // Use the number of subscriptions to allocate memory in the handles - size_t max_number_of_subscriptions = memory_strategy_->subs.size() * 2; // Times two for intra-process. rmw_subscriptions_t subscriber_handles; - subscriber_handles.subscriber_count = 0; - // TODO(wjwwood): Avoid redundant malloc's - subscriber_handles.subscribers = memory_strategy_->borrow_handles( - HandleType::subscription_handle, max_number_of_subscriptions); - if (subscriber_handles.subscribers == NULL && - max_number_of_subscriptions > 0) - { - // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for subscriber pointers."); - } - // Then fill the SubscriberHandles with ready subscriptions - for (auto & subscription : memory_strategy_->subs) { - subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = - subscription->get_subscription_handle()->data; - if (subscription->get_intra_process_subscription_handle()) { - subscriber_handles.subscribers[subscriber_handles.subscriber_count++] = - subscription->get_intra_process_subscription_handle()->data; - } - } + subscriber_handles.subscriber_count = + memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers); - // Use the number of services to allocate memory in the handles - size_t number_of_services = memory_strategy_->services.size(); rmw_services_t service_handles; - service_handles.service_count = number_of_services; - service_handles.services = - memory_strategy_->borrow_handles(HandleType::service_handle, number_of_services); - if (service_handles.services == NULL && - number_of_services > 0) - { - // TODO(esteve): Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for service pointers."); - } - // Then fill the ServiceHandles with ready services - size_t service_handle_index = 0; - for (auto & service : memory_strategy_->services) { - service_handles.services[service_handle_index] = \ - service->get_service_handle()->data; - service_handle_index += 1; - } + service_handles.service_count = + memory_strategy_->fill_service_handles(service_handles.services); - // Use the number of clients to allocate memory in the handles - size_t number_of_clients = memory_strategy_->clients.size(); rmw_clients_t client_handles; - client_handles.client_count = number_of_clients; - client_handles.clients = - memory_strategy_->borrow_handles(HandleType::client_handle, number_of_clients); - if (client_handles.clients == NULL && number_of_clients > 0) { - // TODO: Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for client pointers."); - } - // Then fill the ServiceHandles with ready clients - size_t client_handle_index = 0; - for (auto & client : memory_strategy_->clients) { - client_handles.clients[client_handle_index] = \ - client->get_client_handle()->data; - client_handle_index += 1; - } + client_handles.client_count = + memory_strategy_->fill_client_handles(client_handles.clients); // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, // and one for the executor's guard cond (interrupt_guard_condition_) size_t number_of_guard_conds = 2; rmw_guard_conditions_t guard_condition_handles; guard_condition_handles.guard_condition_count = number_of_guard_conds; - guard_condition_handles.guard_conditions = - memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds); + guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); if (guard_condition_handles.guard_conditions == NULL && number_of_guard_conds > 0) { @@ -500,132 +426,14 @@ protected: // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions[0] != 0) { // Make sure to free or clean memory - memory_strategy_->return_handles(HandleType::subscription_handle, - subscriber_handles.subscribers); - memory_strategy_->return_handles(HandleType::service_handle, - service_handles.services); - memory_strategy_->return_handles(HandleType::client_handle, - client_handles.clients); - memory_strategy_->return_handles(HandleType::guard_condition_handle, - guard_condition_handles.guard_conditions); + memory_strategy_->clear_handles(); return; } - // Add the new work to the class's list of things waiting to be executed - // Starting with the subscribers - for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { - void * handle = subscriber_handles.subscribers[i]; - if (handle) { - subscriber_handles_.push_back(handle); - } - } - // Then the services - for (size_t i = 0; i < number_of_services; ++i) { - void * handle = service_handles.services[i]; - if (handle) { - service_handles_.push_back(handle); - } - } - // Then the clients - for (size_t i = 0; i < number_of_clients; ++i) { - void * handle = client_handles.clients[i]; - if (handle) { - client_handles_.push_back(handle); - } - } - memory_strategy_->return_handles(HandleType::subscription_handle, - subscriber_handles.subscribers); - memory_strategy_->return_handles(HandleType::service_handle, - service_handles.services); - memory_strategy_->return_handles(HandleType::client_handle, - client_handles.clients); - memory_strategy_->return_handles(HandleType::guard_condition_handle, - guard_condition_handles.guard_conditions); - - memory_strategy_->subs.clear(); - memory_strategy_->services.clear(); - memory_strategy_->clients.clear(); + memory_strategy_->revalidate_handles(); } /******************************/ - - rclcpp::subscription::SubscriptionBase::SharedPtr - get_subscription_by_handle(void * subscriber_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_subscription : group->get_subscription_ptrs()) { - auto subscription = weak_subscription.lock(); - if (subscription) { - if (subscription->get_subscription_handle()->data == subscriber_handle) { - return subscription; - } - if (subscription->get_intra_process_subscription_handle() && - subscription->get_intra_process_subscription_handle()->data == subscriber_handle) - { - return subscription; - } - } - } - } - } - return nullptr; - } - - rclcpp::service::ServiceBase::SharedPtr - get_service_by_handle(void * service_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & service : group->get_service_ptrs()) { - if (service->get_service_handle()->data == service_handle) { - return service; - } - } - } - } - return rclcpp::service::ServiceBase::SharedPtr(); - } - - rclcpp::client::ClientBase::SharedPtr - get_client_by_handle(void * client_handle) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & client : group->get_client_ptrs()) { - if (client->get_client_handle()->data == client_handle) { - return client; - } - } - } - } - return rclcpp::client::ClientBase::SharedPtr(); - } - rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group) { @@ -729,213 +537,32 @@ protected: return latest; } - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_subscription( - rclcpp::subscription::SubscriptionBase::SharedPtr subscription) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & weak_sub : group->get_subscription_ptrs()) { - auto sub = weak_sub.lock(); - if (sub == subscription) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_subscription(AnyExecutable::SharedPtr any_exec) - { - auto it = subscriber_handles_.begin(); - while (it != subscriber_handles_.end()) { - auto subscription = get_subscription_by_handle(*it); - if (subscription) { - // Figure out if this is for intra-process or not. - bool is_intra_process = false; - if (subscription->get_intra_process_subscription_handle()) { - is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; - } - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_subscription(subscription); - if (!group) { - // Group was not found, meaning the subscription is not valid... - // Remove it from the ready list and continue looking - subscriber_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - if (is_intra_process) { - any_exec->subscription_intra_process = subscription; - } else { - any_exec->subscription = subscription; - } - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - subscriber_handles_.erase(it++); - return; - } - // Else, the subscription is no longer valid, remove it and continue - subscriber_handles_.erase(it++); - } - } - - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_service( - rclcpp::service::ServiceBase::SharedPtr service) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & serv : group->get_service_ptrs()) { - if (serv == service) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_service(AnyExecutable::SharedPtr any_exec) - { - auto it = service_handles_.begin(); - while (it != service_handles_.end()) { - auto service = get_service_by_handle(*it); - if (service) { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_service(service); - if (!group) { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - service_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec->service = service; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - service_handles_.erase(it++); - return; - } - // Else, the service is no longer valid, remove it and continue - service_handles_.erase(it++); - } - } - - rclcpp::callback_group::CallbackGroup::SharedPtr - get_group_by_client( - rclcpp::client::ClientBase::SharedPtr client) - { - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group) { - continue; - } - for (auto & cli : group->get_client_ptrs()) { - if (cli == client) { - return group; - } - } - } - } - return rclcpp::callback_group::CallbackGroup::SharedPtr(); - } - - void - get_next_client(AnyExecutable::SharedPtr any_exec) - { - auto it = client_handles_.begin(); - while (it != client_handles_.end()) { - auto client = get_client_by_handle(*it); - if (client) { - // Find the group for this handle and see if it can be serviced - auto group = get_group_by_client(client); - if (!group) { - // Group was not found, meaning the service is not valid... - // Remove it from the ready list and continue looking - client_handles_.erase(it++); - continue; - } - if (!group->can_be_taken_from().load()) { - // Group is mutually exclusive and is being used, so skip it for now - // Leave it to be checked next time, but continue searching - ++it; - continue; - } - // Otherwise it is safe to set and return the any_exec - any_exec->client = client; - any_exec->callback_group = group; - any_exec->node = get_node_by_group(group); - client_handles_.erase(it++); - return; - } - // Else, the service is no longer valid, remove it and continue - client_handles_.erase(it++); - } - } - AnyExecutable::SharedPtr get_next_ready_executable() { - auto any_exec = this->memory_strategy_->instantiate_next_executable(); + auto any_exec = memory_strategy_->instantiate_next_executable(); // Check the timers to see if there are any that are ready, if so return get_next_timer(any_exec); if (any_exec->timer) { return any_exec; } // Check the subscriptions to see if there are any that are ready - get_next_subscription(any_exec); + memory_strategy_->get_next_subscription(any_exec, weak_nodes_); if (any_exec->subscription || any_exec->subscription_intra_process) { return any_exec; } // Check the services to see if there are any that are ready - get_next_service(any_exec); + memory_strategy_->get_next_service(any_exec, weak_nodes_); if (any_exec->service) { return any_exec; } // Check the clients to see if there are any that are ready - get_next_client(any_exec); + memory_strategy_->get_next_client(any_exec, weak_nodes_); if (any_exec->client) { return any_exec; } // If there is no ready executable, return a null ptr - return std::shared_ptr(); + return nullptr; } template @@ -980,15 +607,10 @@ private: RCLCPP_DISABLE_COPY(Executor); std::vector> weak_nodes_; - using SubscriberHandles = std::list; - SubscriberHandles subscriber_handles_; - using ServiceHandles = std::list; - ServiceHandles service_handles_; - using ClientHandles = std::list; - ClientHandles client_handles_; + std::array guard_cond_handles_; }; -} /* executor */ -} /* rclcpp */ +} // namespace executor +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_EXECUTOR_HPP_ */ +#endif // RCLCPP__EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index a251560..82b1f84 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ -#define RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ +#ifndef RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ + +#include #include #include @@ -21,12 +23,10 @@ #include #include -#include - -#include -#include -#include -#include +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" namespace rclcpp { @@ -41,7 +41,7 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(MultiThreadedExecutor); MultiThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : executor::Executor(ms) { number_of_threads_ = std::thread::hardware_concurrency(); @@ -97,11 +97,10 @@ private: std::mutex wait_mutex_; size_t number_of_threads_; - }; -} /* namespace multi_threaded_executor */ -} /* namespace executors */ -} /* namespace rclcpp */ +} // namespace multi_threaded_executor +} // namespace executors +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_EXECUTORS_MULTI_THREADED_EXECUTOR_HPP_ */ +#endif // RCLCPP__EXECUTORS__MULTI_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 9155956..be87e19 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -12,22 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ -#define RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ +#ifndef RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ + +#include #include #include #include #include -#include - -#include -#include -#include -#include -#include -#include +#include "rclcpp/executor.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/rate.hpp" namespace rclcpp { @@ -45,7 +45,7 @@ public: /// Default constructor. See the default constructor for Executor. SingleThreadedExecutor(memory_strategy::MemoryStrategy::SharedPtr ms = - memory_strategy::create_default_strategy()) + memory_strategies::create_default_strategy()) : executor::Executor(ms) {} /// Default destrcutor. @@ -64,11 +64,10 @@ public: private: RCLCPP_DISABLE_COPY(SingleThreadedExecutor); - }; -} /* namespace single_threaded_executor */ -} /* namespace executors */ -} /* namespace rclcpp */ +} // namespace single_threaded_executor +} // namespace executors +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_EXECUTORS_SINGLE_THREADED_EXECUTOR_HPP_ */ +#endif // RCLCPP__EXECUTORS__SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/intra_process_manager.hpp b/rclcpp/include/rclcpp/intra_process_manager.hpp index 340b527..d49d91c 100644 --- a/rclcpp/include/rclcpp/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/intra_process_manager.hpp @@ -12,24 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ -#define RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ +#ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_ +#define RCLCPP__INTRA_PROCESS_MANAGER_HPP_ -#include -#include -#include -#include +#include #include #include #include #include -#include #include +#include #include #include -#include +#include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/intra_process_manager_state.hpp" +#include "rclcpp/mapped_ring_buffer.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/subscription.hpp" namespace rclcpp { @@ -122,7 +124,12 @@ private: public: RCLCPP_SMART_PTR_DEFINITIONS(IntraProcessManager); - IntraProcessManager() = default; + explicit IntraProcessManager( + IntraProcessManagerStateBase::SharedPtr state = create_default_state() + ) + : state_(state) + { + } /// Register a subscription with the manager, returns subscriptions unique id. /* In addition to generating a unique intra process id for the subscription, @@ -140,8 +147,7 @@ public: add_subscription(subscription::SubscriptionBase::SharedPtr subscription) { auto id = IntraProcessManager::get_next_unique_id(); - subscriptions_[id] = subscription; - subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + state_->add_subscription(id, subscription); return id; } @@ -153,17 +159,7 @@ public: void remove_subscription(uint64_t intra_process_subscription_id) { - subscriptions_.erase(intra_process_subscription_id); - for (auto & pair : subscription_ids_by_topic_) { - pair.second.erase(intra_process_subscription_id); - } - // Iterate over all publisher infos and all stored subscription id's and - // remove references to this subscription's id. - for (auto & publisher_pair : publishers_) { - for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { - sub_pair.second.erase(intra_process_subscription_id); - } - } + state_->remove_subscription(intra_process_subscription_id); } /// Register a publisher with the manager, returns the publisher unique id. @@ -188,21 +184,17 @@ public: * \param buffer_size if 0 (default) a size is calculated based on the QoS. * \return an unsigned 64-bit integer which is the publisher's unique id. */ - template + template uint64_t - add_publisher(typename publisher::Publisher::SharedPtr publisher, + add_publisher(typename publisher::Publisher::SharedPtr publisher, size_t buffer_size = 0) { auto id = IntraProcessManager::get_next_unique_id(); - publishers_[id].publisher = publisher; size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size(); - // As long as the size of the ring buffer is less than the max sequence number, we're safe. - if (size > std::numeric_limits::max()) { - throw std::invalid_argument("the calculated buffer size is too large"); - } - publishers_[id].sequence_number.store(0); - publishers_[id].buffer = mapped_ring_buffer::MappedRingBuffer::make_shared(size); - publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + auto mrb = mapped_ring_buffer::MappedRingBuffer::MessageAlloc>::make_shared( + size, publisher->get_allocator()); + state_->add_publisher(id, publisher, mrb, size); return id; } @@ -214,7 +206,7 @@ public: void remove_publisher(uint64_t intra_process_publisher_id) { - publishers_.erase(intra_process_publisher_id); + state_->remove_publisher(intra_process_publisher_id); } /// Store a message in the manager, and return the message sequence number. @@ -247,42 +239,30 @@ public: * \param message the message that is being stored. * \return the message sequence number. */ - template + template, + typename Deleter = std::default_delete> uint64_t store_intra_process_message( uint64_t intra_process_publisher_id, - std::unique_ptr & message) + std::unique_ptr & message) { - auto it = publishers_.find(intra_process_publisher_id); - if (it == publishers_.end()) { - throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = typename mapped_ring_buffer::MappedRingBuffer; + uint64_t message_seq = 0; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->get_publisher_info_for_id( + intra_process_publisher_id, message_seq); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + throw std::runtime_error("Typecast failed due to incorrect message type"); } - PublisherInfo & info = it->second; - // Calculate the next message sequence number. - uint64_t message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + // Insert the message into the ring buffer using the message_seq to identify it. - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info.buffer); bool did_replace = typed_buffer->push_and_replace(message_seq, message); // TODO(wjwwood): do something when a message was displaced. log debug? (void)did_replace; // Avoid unused variable warning. - // Figure out what subscriptions should receive the message. - auto publisher = info.publisher.lock(); - if (!publisher) { - throw std::runtime_error("publisher has unexpectedly gone out of scope"); - } - auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; - // Store the list for later comparison. - info.target_subscriptions_by_message_sequence[message_seq].clear(); - std::copy( - destined_subscriptions.begin(), destined_subscriptions.end(), - // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] - std::inserter( - info.target_subscriptions_by_message_sequence[message_seq], - // This ends up only being a hint to std::set, could also be .begin(). - info.target_subscriptions_by_message_sequence[message_seq].end() - ) - ); + + state_->store_intra_process_message(intra_process_publisher_id, message_seq); + // Return the message sequence which is sent to the subscription. return message_seq; } @@ -321,48 +301,32 @@ public: * \param requesting_subscriptions_intra_process_id the subscription's id. * \param message the message typed unique_ptr used to return the message. */ - template + template, + typename Deleter = std::default_delete> void take_intra_process_message( uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, - std::unique_ptr & message) + std::unique_ptr & message) { + using MRBMessageAlloc = typename std::allocator_traits::template rebind_alloc; + using TypedMRB = mapped_ring_buffer::MappedRingBuffer; message = nullptr; - PublisherInfo * info; - { - auto it = publishers_.find(intra_process_publisher_id); - if (it == publishers_.end()) { - // Publisher is either invalid or no longer exists. - return; - } - info = &it->second; + + size_t target_subs_size = 0; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = state_->take_intra_process_message( + intra_process_publisher_id, + message_sequence_number, + requesting_subscriptions_intra_process_id, + target_subs_size + ); + typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(buffer); + if (!typed_buffer) { + return; } - // Figure out how many subscriptions are left. - std::set * target_subs; - { - auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); - if (it == info->target_subscriptions_by_message_sequence.end()) { - // Message is no longer being stored by this publisher. - return; - } - target_subs = &it->second; - } - { - auto it = std::find( - target_subs->begin(), target_subs->end(), - requesting_subscriptions_intra_process_id); - if (it == target_subs->end()) { - // This publisher id/message seq pair was not intended for this subscription. - return; - } - target_subs->erase(it); - } - typedef typename mapped_ring_buffer::MappedRingBuffer TypedMRB; - typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast(info->buffer); // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left. - if (target_subs->size()) { + if (target_subs_size) { // There are more subscriptions to serve, return a copy. typed_buffer->get_copy_at_key(message_sequence_number, message); } else { @@ -375,16 +339,7 @@ public: bool matches_any_publishers(const rmw_gid_t * id) const { - for (auto & publisher_pair : publishers_) { - auto publisher = publisher_pair.second.publisher.lock(); - if (!publisher) { - continue; - } - if (*publisher.get() == id) { - return true; - } - } - return false; + return state_->matches_any_publishers(id); } private: @@ -411,28 +366,12 @@ private: static std::atomic next_unique_id_; - std::unordered_map subscriptions_; - std::map> subscription_ids_by_topic_; - - struct PublisherInfo - { - RCLCPP_DISABLE_COPY(PublisherInfo); - - PublisherInfo() = default; - - publisher::PublisherBase::WeakPtr publisher; - std::atomic sequence_number; - mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; - std::unordered_map> target_subscriptions_by_message_sequence; - }; - - std::unordered_map publishers_; - + IntraProcessManagerStateBase::SharedPtr state_; }; std::atomic IntraProcessManager::next_unique_id_ {1}; -} /* namespace intra_process_manager */ -} /* namespace rclcpp */ +} // namespace intra_process_manager +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_INTRA_PROCESS_MANAGER_HPP_ */ +#endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_ diff --git a/rclcpp/include/rclcpp/intra_process_manager_state.hpp b/rclcpp/include/rclcpp/intra_process_manager_state.hpp new file mode 100644 index 0000000..8a4bc5e --- /dev/null +++ b/rclcpp/include/rclcpp/intra_process_manager_state.hpp @@ -0,0 +1,270 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ +#define RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace rclcpp +{ +namespace intra_process_manager +{ + +class IntraProcessManagerStateBase +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(IntraProcessManagerStateBase); + + virtual void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0; + + virtual void + remove_subscription(uint64_t intra_process_subscription_id) = 0; + + virtual void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) = 0; + + virtual void + remove_publisher(uint64_t intra_process_publisher_id) = 0; + + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + uint64_t & message_seq) = 0; + + virtual void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0; + + virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + size_t & size) = 0; + + virtual bool + matches_any_publishers(const rmw_gid_t * id) const = 0; +}; + +template> +class IntraProcessManagerState : public IntraProcessManagerStateBase +{ +public: + void + add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) + { + subscriptions_[id] = subscription; + subscription_ids_by_topic_[subscription->get_topic_name()].insert(id); + } + + void + remove_subscription(uint64_t intra_process_subscription_id) + { + subscriptions_.erase(intra_process_subscription_id); + for (auto & pair : subscription_ids_by_topic_) { + pair.second.erase(intra_process_subscription_id); + } + // Iterate over all publisher infos and all stored subscription id's and + // remove references to this subscription's id. + for (auto & publisher_pair : publishers_) { + for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) { + sub_pair.second.erase(intra_process_subscription_id); + } + } + } + + void add_publisher(uint64_t id, + publisher::PublisherBase::WeakPtr publisher, + mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, + size_t size) + { + publishers_[id].publisher = publisher; + // As long as the size of the ring buffer is less than the max sequence number, we're safe. + if (size > std::numeric_limits::max()) { + throw std::invalid_argument("the calculated buffer size is too large"); + } + publishers_[id].sequence_number.store(0); + + publishers_[id].buffer = mrb; + publishers_[id].target_subscriptions_by_message_sequence.reserve(size); + } + + void + remove_publisher(uint64_t intra_process_publisher_id) + { + publishers_.erase(intra_process_publisher_id); + } + + // return message_seq and mrb + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + get_publisher_info_for_id( + uint64_t intra_process_publisher_id, + uint64_t & message_seq) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + // Calculate the next message sequence number. + message_seq = info.sequence_number.fetch_add(1, std::memory_order_relaxed); + + return info.buffer; + } + + void + store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + throw std::runtime_error("store_intra_process_message called with invalid publisher id"); + } + PublisherInfo & info = it->second; + auto publisher = info.publisher.lock(); + if (!publisher) { + throw std::runtime_error("publisher has unexpectedly gone out of scope"); + } + + // Figure out what subscriptions should receive the message. + auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()]; + // Store the list for later comparison. + info.target_subscriptions_by_message_sequence[message_seq].clear(); + std::copy( + destined_subscriptions.begin(), destined_subscriptions.end(), + // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq] + std::inserter( + info.target_subscriptions_by_message_sequence[message_seq], + // This ends up only being a hint to std::set, could also be .begin(). + info.target_subscriptions_by_message_sequence[message_seq].end() + ) + ); + } + + mapped_ring_buffer::MappedRingBufferBase::SharedPtr + take_intra_process_message(uint64_t intra_process_publisher_id, + uint64_t message_sequence_number, + uint64_t requesting_subscriptions_intra_process_id, + size_t & size + ) + { + PublisherInfo * info; + { + auto it = publishers_.find(intra_process_publisher_id); + if (it == publishers_.end()) { + // Publisher is either invalid or no longer exists. + return 0; + } + info = &it->second; + } + // Figure out how many subscriptions are left. + AllocSet * target_subs; + { + auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number); + if (it == info->target_subscriptions_by_message_sequence.end()) { + // Message is no longer being stored by this publisher. + return 0; + } + target_subs = &it->second; + } + { + auto it = std::find( + target_subs->begin(), target_subs->end(), + requesting_subscriptions_intra_process_id); + if (it == target_subs->end()) { + // This publisher id/message seq pair was not intended for this subscription. + return 0; + } + target_subs->erase(it); + } + size = target_subs->size(); + return info->buffer; + } + + bool + matches_any_publishers(const rmw_gid_t * id) const + { + for (auto & publisher_pair : publishers_) { + auto publisher = publisher_pair.second.publisher.lock(); + if (!publisher) { + continue; + } + if (*publisher.get() == id) { + return true; + } + } + return false; + } + +private: + template + using RebindAlloc = typename std::allocator_traits::template rebind_alloc; + + using AllocSet = std::set, RebindAlloc>; + using SubscriptionMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + using IDTopicMap = std::map, RebindAlloc>>; + + SubscriptionMap subscriptions_; + + IDTopicMap subscription_ids_by_topic_; + + struct PublisherInfo + { + RCLCPP_DISABLE_COPY(PublisherInfo); + + PublisherInfo() = default; + + publisher::PublisherBase::WeakPtr publisher; + std::atomic sequence_number; + mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer; + + using TargetSubscriptionsMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + TargetSubscriptionsMap target_subscriptions_by_message_sequence; + }; + + using PublisherMap = std::unordered_map, std::equal_to, + RebindAlloc>>; + + PublisherMap publishers_; +}; + +static IntraProcessManagerStateBase::SharedPtr create_default_state() +{ + return std::make_shared>(); +} + +} // namespace intra_process_manager +} // namespace rclcpp + +#endif // RCLCPP__INTRA_PROCESS_MANAGER_STATE_HPP_ diff --git a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp index 4b5bb47..8a9b7f9 100644 --- a/rclcpp/include/rclcpp/mapped_ring_buffer.hpp +++ b/rclcpp/include/rclcpp/mapped_ring_buffer.hpp @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_RING_BUFFER_HPP_ -#define RCLCPP_RCLCPP_RING_BUFFER_HPP_ - -#include +#ifndef RCLCPP__MAPPED_RING_BUFFER_HPP_ +#define RCLCPP__MAPPED_RING_BUFFER_HPP_ #include #include @@ -23,6 +21,9 @@ #include #include +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/macros.hpp" + namespace rclcpp { namespace mapped_ring_buffer @@ -50,24 +51,35 @@ public: * there is no guarantee on which value is returned if a key is used multiple * times. */ -template +template> class MappedRingBuffer : public MappedRingBufferBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + RCLCPP_SMART_PTR_DEFINITIONS(MappedRingBuffer); + using ElemAllocTraits = allocator::AllocRebind; + using ElemAlloc = typename ElemAllocTraits::allocator_type; + using ElemDeleter = allocator::Deleter; + + using ElemUniquePtr = std::unique_ptr; /// Constructor. /* The constructor will allocate memory while reserving space. * * \param size size of the ring buffer; must be positive and non-zero. */ - MappedRingBuffer(size_t size) + explicit MappedRingBuffer(size_t size, std::shared_ptr allocator = nullptr) : elements_(size), head_(0) { if (size == 0) { throw std::invalid_argument("size must be a positive, non-zero value"); } + if (!allocator) { + allocator_ = std::make_shared(); + } else { + allocator_ = std::make_shared(*allocator.get()); + } } + virtual ~MappedRingBuffer() {} /// Return a copy of the value stored in the ring buffer at the given key. @@ -82,12 +94,14 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - get_copy_at_key(uint64_t key, std::unique_ptr & value) + get_copy_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { - value = std::unique_ptr(new T(*it->value)); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); + value = ElemUniquePtr(ptr); } } @@ -111,13 +125,15 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - get_ownership_at_key(uint64_t key, std::unique_ptr & value) + get_ownership_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; if (it != elements_.end() && it->in_use) { // Make a copy. - auto copy = std::unique_ptr(new T(*it->value)); + auto ptr = ElemAllocTraits::allocate(*allocator_.get(), 1); + ElemAllocTraits::construct(*allocator_.get(), ptr, *it->value); + auto copy = ElemUniquePtr(ptr); // Return the original. value.swap(it->value); // Store the copy. @@ -136,7 +152,7 @@ public: * \param value if the key is found, the value is stored in this parameter */ void - pop_at_key(uint64_t key, std::unique_ptr & value) + pop_at_key(uint64_t key, ElemUniquePtr & value) { auto it = get_iterator_of_key(key); value = nullptr; @@ -158,7 +174,7 @@ public: * \param value the value to store, and optionally the value displaced */ bool - push_and_replace(uint64_t key, std::unique_ptr & value) + push_and_replace(uint64_t key, ElemUniquePtr & value) { bool did_replace = elements_[head_].in_use; elements_[head_].key = key; @@ -169,9 +185,9 @@ public: } bool - push_and_replace(uint64_t key, std::unique_ptr && value) + push_and_replace(uint64_t key, ElemUniquePtr && value) { - std::unique_ptr temp = std::move(value); + ElemUniquePtr temp = std::move(value); return push_and_replace(key, temp); } @@ -183,16 +199,18 @@ public: } private: - RCLCPP_DISABLE_COPY(MappedRingBuffer); + RCLCPP_DISABLE_COPY(MappedRingBuffer); struct element { uint64_t key; - std::unique_ptr value; + ElemUniquePtr value; bool in_use; }; - typename std::vector::iterator + using VectorAlloc = typename std::allocator_traits::template rebind_alloc; + + typename std::vector::iterator get_iterator_of_key(uint64_t key) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) @@ -203,12 +221,12 @@ private: return it; } - std::vector elements_; + std::vector elements_; size_t head_; - + std::shared_ptr allocator_; }; -} /* namespace ring_buffer */ -} /* namespace rclcpp */ +} // namespace mapped_ring_buffer +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_RING_BUFFER_HPP_ */ +#endif // RCLCPP__MAPPED_RING_BUFFER_HPP_ diff --git a/rclcpp/include/rclcpp/memory_strategies.hpp b/rclcpp/include/rclcpp/memory_strategies.hpp index 598da69..f5a1a2a 100644 --- a/rclcpp/include/rclcpp/memory_strategies.hpp +++ b/rclcpp/include/rclcpp/memory_strategies.hpp @@ -12,21 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ -#define RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ +#ifndef RCLCPP__MEMORY_STRATEGIES_HPP_ +#define RCLCPP__MEMORY_STRATEGIES_HPP_ -#include -#include +#include +#include namespace rclcpp { namespace memory_strategies { -using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy; -using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy; +using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; -} /* memory_strategies */ -} /* rclcpp */ +static memory_strategy::MemoryStrategy::SharedPtr create_default_strategy() +{ + return std::make_shared>(); +} -#endif +} // namespace memory_strategies +} // namespace rclcpp + +#endif // RCLCPP__MEMORY_STRATEGIES_HPP_ diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index eed0e0e..895ea9d 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -12,21 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ -#include +#ifndef RCLCPP__MEMORY_STRATEGY_HPP_ +#define RCLCPP__MEMORY_STRATEGY_HPP_ + +#include +#include + +#include "rclcpp/any_executable.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/node.hpp" namespace rclcpp { - -// TODO move HandleType somewhere where it makes sense -enum class HandleType {subscription_handle, service_handle, client_handle, guard_condition_handle}; - -namespace executor -{ -class Executor; -} - namespace memory_strategy { @@ -38,139 +35,221 @@ namespace memory_strategy */ class MemoryStrategy { - - friend class executor::Executor; - public: - RCLCPP_SMART_PTR_DEFINITIONS(MemoryStrategy); + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy); + using WeakNodeVector = std::vector>; - /// Borrow memory for storing data for subscriptions, services, clients, or guard conditions. - /** - * The default implementation stores std::vectors for each handle type and resizes the vectors - * as necessary based on the requested number of handles. - * \param[in] The type of entity that this function is requesting for. - * \param[in] The number of handles to borrow. - * \return Pointer to the allocated handles. - */ - virtual void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (subscription_handles.size() < number_of_handles) { - subscription_handles.resize(number_of_handles, 0); - } - return static_cast(subscription_handles.data()); - case HandleType::service_handle: - if (service_handles.size() < number_of_handles) { - service_handles.resize(number_of_handles, 0); - } - return static_cast(service_handles.data()); - case HandleType::client_handle: - if (client_handles.size() < number_of_handles) { - client_handles.resize(number_of_handles, 0); - } - return static_cast(client_handles.data()); - case HandleType::guard_condition_handle: - if (number_of_handles > 2) { - throw std::runtime_error("Too many guard condition handles requested!"); - } - return guard_cond_handles.data(); - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); - } - } + // return the new number of subscribers + virtual size_t fill_subscriber_handles(void ** & ptr) = 0; - /// Return the memory borrowed in borrow_handles. - /** - * return_handles should always mirror the way memory was borrowed in borrow_handles. - * \param[in] The type of entity that this function is returning. - * \param[in] Pointer to the handles returned. - */ - virtual void return_handles(HandleType type, void ** handles) - { - switch (type) { - case HandleType::subscription_handle: - if (handles != subscription_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, subscription_handles.size()); - break; - case HandleType::service_handle: - if (handles != service_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, service_handles.size()); - break; - case HandleType::client_handle: - if (handles != client_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, client_handles.size()); - break; - case HandleType::guard_condition_handle: - if (handles != guard_cond_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - guard_cond_handles.fill(0); - break; - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); - } - } + // return the new number of services + virtual size_t fill_service_handles(void ** & ptr) = 0; + + // return the new number of clients + virtual size_t fill_client_handles(void ** & ptr) = 0; + + virtual void clear_active_entities() = 0; + + virtual void clear_handles() = 0; + virtual void revalidate_handles() = 0; + virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; /// Provide a newly initialized AnyExecutable object. // \return Shared pointer to the fresh executable. - virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - return std::make_shared(); - } + virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; - /// Implementation of a general-purpose allocation function. - /** - * \param[in] size Number of bytes to allocate. - * \return Pointer to the allocated chunk of memory. - */ - virtual void * alloc(size_t size) + virtual void + get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + virtual void + get_next_service(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + virtual void + get_next_client(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + static rclcpp::subscription::SubscriptionBase::SharedPtr + get_subscription_by_handle(void * subscriber_handle, + const WeakNodeVector & weak_nodes) { - if (size == 0) { - return NULL; + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + if (subscription->get_subscription_handle()->data == subscriber_handle) { + return subscription; + } + if (subscription->get_intra_process_subscription_handle() && + subscription->get_intra_process_subscription_handle()->data == subscriber_handle) + { + return subscription; + } + } + } + } } - return std::malloc(size); + return nullptr; } - /// Implementation of a general-purpose free. - /** - * \param[in] Pointer to deallocate. - */ - virtual void free(void * ptr) + static rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes) { - return std::free(ptr); + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & service : group->get_service_ptrs()) { + if (service->get_service_handle()->data == service_handle) { + return service; + } + } + } + } + return nullptr; + } + + static rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & client : group->get_client_ptrs()) { + if (client->get_client_handle()->data == client_handle) { + return client; + } + } + } + } + return nullptr; + } + + static rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, + const WeakNodeVector & weak_nodes) + { + if (!group) { + return nullptr; + } + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto callback_group = weak_group.lock(); + if (callback_group == group) { + return node; + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_sub : group->get_subscription_ptrs()) { + auto sub = weak_sub.lock(); + if (sub == subscription) { + return group; + } + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_service( + rclcpp::service::ServiceBase::SharedPtr service, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & serv : group->get_service_ptrs()) { + if (serv == service) { + return group; + } + } + } + } + return nullptr; + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client(rclcpp::client::ClientBase::SharedPtr client, + const WeakNodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & cli : group->get_client_ptrs()) { + if (cli == client) { + return group; + } + } + } + } + return nullptr; } - std::vector subs; - std::vector services; - std::vector clients; - std::vector subscription_handles; - std::vector service_handles; - std::vector client_handles; - std::array guard_cond_handles; }; +} // namespace memory_strategy -MemoryStrategy::SharedPtr create_default_strategy() -{ - return std::make_shared(MemoryStrategy()); -} +} // namespace rclcpp -} /* memory_strategy */ - -} /* rclcpp */ - -#endif +#endif // RCLCPP__MEMORY_STRATEGY_HPP_ diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index 3ef46d8..61b582d 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -12,12 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ +#ifndef RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_ +#define RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_ #include -#include +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/macros.hpp" namespace rclcpp { @@ -26,24 +27,37 @@ namespace message_memory_strategy /// Default allocation strategy for messages received by subscriptions. // A message memory strategy must be templated on the type of the subscription it belongs to. -template +template> class MessageMemoryStrategy { - public: RCLCPP_SMART_PTR_DEFINITIONS(MessageMemoryStrategy); + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + + MessageMemoryStrategy() + { + message_allocator_ = std::make_shared(); + } + + explicit MessageMemoryStrategy(std::shared_ptr allocator) + { + message_allocator_ = std::make_shared(*allocator.get()); + } + /// Default factory method static SharedPtr create_default() { - return SharedPtr(new MessageMemoryStrategy); + return std::make_shared>(std::make_shared()); } /// By default, dynamically allocate a new message. // \return Shared pointer to the new message. virtual std::shared_ptr borrow_message() { - return std::shared_ptr(new MessageT); + return std::allocate_shared(*message_allocator_.get()); } /// Release ownership of the message, which will deallocate it if it has no more owners. @@ -52,9 +66,12 @@ public: { msg.reset(); } + + std::shared_ptr message_allocator_; + MessageDeleter message_deleter_; }; -} /* message_memory_strategy */ -} /* rclcpp */ +} // namespace message_memory_strategy +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_MSG_MEMORY_STRATEGY_HPP_ */ +#endif // RCLCPP__MESSAGE_MEMORY_STRATEGY_HPP_ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 6b64bec..0e78b10 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -12,48 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_NODE_HPP_ -#define RCLCPP_RCLCPP_NODE_HPP_ +#ifndef RCLCPP__NODE_HPP_ +#define RCLCPP__NODE_HPP_ #include +#include #include #include #include +#include -#include -#include -#include -#include -#include +#include "rcl_interfaces/msg/list_parameters_result.hpp" +#include "rcl_interfaces/msg/parameter_descriptor.hpp" +#include "rcl_interfaces/msg/parameter_event.hpp" +#include "rcl_interfaces/msg/set_parameters_result.hpp" +#include "rosidl_generator_cpp/message_type_support.hpp" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "rclcpp/callback_group.hpp" +#include "rclcpp/client.hpp" +#include "rclcpp/context.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/parameter.hpp" +#include "rclcpp/publisher.hpp" +#include "rclcpp/service.hpp" +#include "rclcpp/subscription.hpp" +#include "rclcpp/timer.hpp" // Forward declaration of ROS middleware class namespace rmw { struct rmw_node_t; -} // namespace rmw +} // namespace rmw namespace rclcpp { - namespace node { /// Node is the single point of entry for creating publishers and subscribers. class Node { - public: RCLCPP_SMART_PTR_DEFINITIONS(Node); @@ -63,7 +63,7 @@ public: * \param[in] use_intra_process_comms True to use the optimized intra-process communication * pipeline to pass messages between nodes in the same process using shared memory. */ - Node(const std::string & node_name, bool use_intra_process_comms = false); + explicit Node(const std::string & node_name, bool use_intra_process_comms = false); /// Create a node based on the node name and a rclcpp::context::Context. /** @@ -91,10 +91,11 @@ public: * \param[in] qos_history_depth The depth of the publisher message queue. * \return Shared pointer to the created publisher. */ - template - typename rclcpp::publisher::Publisher::SharedPtr + template> + typename rclcpp::publisher::Publisher::SharedPtr create_publisher( - const std::string & topic_name, size_t qos_history_depth); + const std::string & topic_name, size_t qos_history_depth, + std::shared_ptr allocator = nullptr); /// Create and return a Publisher. /** @@ -102,11 +103,12 @@ public: * \param[in] qos_profile The quality of service profile to pass on to the rmw implementation. * \return Shared pointer to the created publisher. */ - template - typename rclcpp::publisher::Publisher::SharedPtr + template> + typename rclcpp::publisher::Publisher::SharedPtr create_publisher( const std::string & topic_name, - const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default); + const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, + std::shared_ptr allocator = nullptr); /// Create and return a Subscription. /** @@ -122,16 +124,17 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template - typename rclcpp::subscription::Subscription::SharedPtr + template> + typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, CallbackT callback, const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat = nullptr); + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr, + std::shared_ptr allocator = nullptr); /// Create and return a Subscription. /** @@ -147,16 +150,17 @@ public: Windows build breaks when static member function passed as default argument to msg_mem_strat, nullptr is a workaround. */ - template - typename rclcpp::subscription::Subscription::SharedPtr + template> + typename rclcpp::subscription::Subscription::SharedPtr create_subscription( const std::string & topic_name, size_t qos_history_depth, CallbackT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat = nullptr); + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat = nullptr, + std::shared_ptr allocator = nullptr); /// Create a timer. /** @@ -274,9 +278,9 @@ const rosidl_message_type_support_t * Node::ipm_ts_ = make_shared())); \ } -#ifndef RCLCPP_RCLCPP_NODE_IMPL_HPP_ +#ifndef RCLCPP__NODE_IMPL_HPP_ // Template implementations #include "node_impl.hpp" #endif -#endif /* RCLCPP_RCLCPP_NODE_HPP_ */ +#endif // RCLCPP__NODE_HPP_ diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 8a9c730..b55d87f 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -12,34 +12,38 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_NODE_IMPL_HPP_ -#define RCLCPP_RCLCPP_NODE_IMPL_HPP_ +#ifndef RCLCPP__NODE_IMPL_HPP_ +#define RCLCPP__NODE_IMPL_HPP_ + +#include +#include #include #include #include #include +#include #include #include #include #include +#include +#include -#include -#include -#include -#include -#include +#include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rosidl_generator_cpp/message_type_support.hpp" +#include "rosidl_generator_cpp/service_type_support.hpp" -#include -#include -#include +#include "rclcpp/contexts/default_context.hpp" +#include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/parameter.hpp" -#ifndef RCLCPP_RCLCPP_NODE_HPP_ +#ifndef RCLCPP__NODE_HPP_ #include "node.hpp" #endif using namespace rclcpp; -using namespace rclcpp::node; +using namespace node; Node::Node(const std::string & node_name, bool use_intra_process_comms) : Node( @@ -113,21 +117,29 @@ Node::create_callback_group( return group; } -template -typename rclcpp::publisher::Publisher::SharedPtr +template +typename rclcpp::publisher::Publisher::SharedPtr Node::create_publisher( - const std::string & topic_name, size_t qos_history_depth) + const std::string & topic_name, size_t qos_history_depth, + std::shared_ptr allocator) { + if (!allocator) { + allocator = std::make_shared(); + } rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_publisher(topic_name, qos); + return this->create_publisher(topic_name, qos, allocator); } -template -typename publisher::Publisher::SharedPtr +template +typename publisher::Publisher::SharedPtr Node::create_publisher( - const std::string & topic_name, const rmw_qos_profile_t & qos_profile) + const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + std::shared_ptr allocator) { + if (!allocator) { + allocator = std::make_shared(); + } using rosidl_generator_cpp::get_message_type_support_handle; auto type_support_handle = get_message_type_support_handle(); rmw_publisher_t * publisher_handle = rmw_create_publisher( @@ -140,8 +152,8 @@ Node::create_publisher( // *INDENT-ON* } - auto publisher = publisher::Publisher::make_shared( - node_handle_, publisher_handle, topic_name, qos_profile.depth); + auto publisher = publisher::Publisher::make_shared( + node_handle_, publisher_handle, topic_name, qos_profile.depth, allocator); if (use_intra_process_comms_) { rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( @@ -157,7 +169,7 @@ Node::create_publisher( auto intra_process_manager = context_->get_sub_context(); uint64_t intra_process_publisher_id = - intra_process_manager->add_publisher(publisher); + intra_process_manager->add_publisher(publisher); rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; // *INDENT-OFF* auto shared_publish_callback = @@ -179,8 +191,10 @@ Node::create_publisher( "' is incompatible from the publisher type '" + message_type_info.name() + "'"); } MessageT * typed_message_ptr = static_cast(msg); - std::unique_ptr unique_msg(typed_message_ptr); - uint64_t message_seq = ipm->store_intra_process_message(publisher_id, unique_msg); + using MessageDeleter = typename publisher::Publisher::MessageDeleter; + std::unique_ptr unique_msg(typed_message_ptr); + uint64_t message_seq = + ipm->store_intra_process_message(publisher_id, unique_msg); return message_seq; }; // *INDENT-ON* @@ -205,25 +219,31 @@ Node::group_in_node(callback_group::CallbackGroup::SharedPtr group) return group_belongs_to_this_node; } -template -typename rclcpp::subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, CallbackT callback, const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat) + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat, + typename std::shared_ptr allocator) { - rclcpp::subscription::AnySubscriptionCallback any_subscription_callback; + if (!allocator) { + allocator = std::make_shared(); + } + + rclcpp::subscription::AnySubscriptionCallback any_subscription_callback(allocator); any_subscription_callback.set(callback); using rosidl_generator_cpp::get_message_type_support_handle; if (!msg_mem_strat) { msg_mem_strat = - rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); + rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); } auto type_support_handle = get_message_type_support_handle(); @@ -239,7 +259,7 @@ Node::create_subscription( using namespace rclcpp::subscription; - auto sub = Subscription::make_shared( + auto sub = Subscription::make_shared( node_handle_, subscriber_handle, topic_name, @@ -271,7 +291,7 @@ Node::create_subscription( uint64_t publisher_id, uint64_t message_sequence, uint64_t subscription_id, - std::unique_ptr & message) + typename Subscription::MessageUniquePtr & message) { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -279,7 +299,8 @@ Node::create_subscription( throw std::runtime_error( "intra process take called after destruction of intra process manager"); } - ipm->take_intra_process_message(publisher_id, message_sequence, subscription_id, message); + ipm->take_intra_process_message( + publisher_id, message_sequence, subscription_id, message); }, [weak_ipm](const rmw_gid_t * sender_gid) -> bool { auto ipm = weak_ipm.lock(); @@ -295,7 +316,7 @@ Node::create_subscription( // Assign to a group. if (group) { if (!group_in_node(group)) { - // TODO: use custom exception + // TODO(jacquelinekay): use custom exception throw std::runtime_error("Cannot create subscription, group not in node."); } group->add_subscription(sub_base_ptr); @@ -306,26 +327,28 @@ Node::create_subscription( return sub; } -template -typename rclcpp::subscription::Subscription::SharedPtr +template +typename rclcpp::subscription::Subscription::SharedPtr Node::create_subscription( const std::string & topic_name, size_t qos_history_depth, CallbackT callback, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr - msg_mem_strat) + typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + msg_mem_strat, + std::shared_ptr allocator) { rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_subscription( + return this->create_subscription( topic_name, callback, qos, group, ignore_local_publications, - msg_mem_strat); + msg_mem_strat, + allocator); } rclcpp::timer::WallTimer::SharedPtr @@ -337,7 +360,7 @@ Node::create_wall_timer( auto timer = rclcpp::timer::WallTimer::make_shared(period, callback); if (group) { if (!group_in_node(group)) { - // TODO: use custom exception + // TODO(jacquelinekay): use custom exception throw std::runtime_error("Cannot create timer, group not in node."); } group->add_timer(timer); @@ -432,7 +455,7 @@ Node::create_service( auto serv_base_ptr = std::dynamic_pointer_cast(serv); if (group) { if (!group_in_node(group)) { - // TODO: use custom exception + // TODO(jacquelinekay): use custom exception throw std::runtime_error("Cannot create service, group not in node."); } group->add_service(serv_base_ptr); @@ -480,7 +503,7 @@ Node::set_parameters_atomically( tmp_map.insert(parameters_.begin(), parameters_.end()); std::swap(tmp_map, parameters_); - // TODO: handle parameter constraints + // TODO(jacquelinekay): handle parameter constraints rcl_interfaces::msg::SetParametersResult result; result.successful = true; @@ -650,4 +673,4 @@ Node::get_callback_groups() const return callback_groups_; } -#endif /* RCLCPP_RCLCPP_NODE_IMPL_HPP_ */ +#endif // RCLCPP__NODE_IMPL_HPP_ diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index d75ed05..d0acec5 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_PUBLISHER_HPP_ -#define RCLCPP_RCLCPP_PUBLISHER_HPP_ +#ifndef RCLCPP__PUBLISHER_HPP_ +#define RCLCPP__PUBLISHER_HPP_ -#include +#include +#include #include #include @@ -23,10 +24,11 @@ #include #include -#include -#include -#include -#include +#include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rmw/impl/cpp/demangle.hpp" + +#include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/macros.hpp" namespace rclcpp { @@ -35,7 +37,7 @@ namespace rclcpp namespace node { class Node; -} // namespace node +} // namespace node namespace publisher { @@ -203,21 +205,30 @@ protected: }; /// A publisher publishes messages of any type to a topic. -template +template> class Publisher : public PublisherBase { friend rclcpp::node::Node; public: - RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); Publisher( std::shared_ptr node_handle, rmw_publisher_t * publisher_handle, std::string topic, - size_t queue_size) + size_t queue_size, + std::shared_ptr allocator) : PublisherBase(node_handle, publisher_handle, topic, queue_size) - {} + { + message_allocator_ = std::make_shared(*allocator.get()); + allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + } /// Send a message to the topic for this publisher. @@ -226,7 +237,7 @@ public: * \param[in] msg A shared pointer to the message to send. */ void - publish(std::unique_ptr & msg) + publish(std::unique_ptr & msg) { this->do_inter_process_publish(msg.get()); if (store_intra_process_message_) { @@ -274,7 +285,9 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - std::unique_ptr unique_msg(new MessageT(*msg.get())); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -291,7 +304,9 @@ public: // The intra process manager should probably also be able to store // shared_ptr's and do the "smart" thing based on other intra process // subscriptions. For now call the other publish(). - std::unique_ptr unique_msg(new MessageT(*msg.get())); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get()); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } @@ -304,10 +319,17 @@ public: return this->do_inter_process_publish(&msg); } // Otherwise we have to allocate memory in a unique_ptr and pass it along. - std::unique_ptr unique_msg(new MessageT(msg)); + auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1); + MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg); + MessageUniquePtr unique_msg(ptr, message_deleter_); return this->publish(unique_msg); } + std::shared_ptr get_allocator() const + { + return message_allocator_; + } + protected: void do_inter_process_publish(const MessageT * msg) @@ -321,9 +343,12 @@ protected: } } + std::shared_ptr message_allocator_; + + MessageDeleter message_deleter_; }; -} /* namespace publisher */ -} /* namespace rclcpp */ +} // namespace publisher +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_PUBLISHER_HPP_ */ +#endif // RCLCPP__PUBLISHER_HPP_ diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp new file mode 100644 index 0000000..7622123 --- /dev/null +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -0,0 +1,318 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ +#define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ + +#include +#include + +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/node.hpp" + +namespace rclcpp +{ + +namespace memory_strategies +{ + +namespace allocator_memory_strategy +{ + +/// Delegate for handling memory allocations while the Executor is executing. +/** + * By default, the memory strategy dynamically allocates memory for structures that come in from + * the rmw implementation after the executor waits for work, based on the number of entities that + * come through. + */ +template> +class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy); + + using ExecAllocTraits = allocator::AllocRebind; + using ExecAlloc = typename ExecAllocTraits::allocator_type; + using ExecDeleter = allocator::Deleter; + using VoidAllocTraits = typename allocator::AllocRebind; + using VoidAlloc = typename VoidAllocTraits::allocator_type; + + using WeakNodeVector = std::vector>; + + explicit AllocatorMemoryStrategy(std::shared_ptr allocator) + { + executable_allocator_ = std::make_shared(*allocator.get()); + allocator_ = std::make_shared(*allocator.get()); + } + + AllocatorMemoryStrategy() + { + executable_allocator_ = std::make_shared(); + allocator_ = std::make_shared(); + } + + size_t fill_subscriber_handles(void ** & ptr) + { + for (auto & subscription : subscriptions_) { + subscriber_handles_.push_back(subscription->get_subscription_handle()->data); + if (subscription->get_intra_process_subscription_handle()) { + subscriber_handles_.push_back(subscription->get_intra_process_subscription_handle()->data); + } + } + ptr = subscriber_handles_.data(); + return subscriber_handles_.size(); + } + + // return the new number of services + size_t fill_service_handles(void ** & ptr) + { + for (auto & service : services_) { + service_handles_.push_back(service->get_service_handle()->data); + } + ptr = service_handles_.data(); + return service_handles_.size(); + } + + // return the new number of clients + size_t fill_client_handles(void ** & ptr) + { + for (auto & client : clients_) { + client_handles_.push_back(client->get_client_handle()->data); + } + ptr = client_handles_.data(); + return client_handles_.size(); + } + + void clear_active_entities() + { + subscriptions_.clear(); + services_.clear(); + clients_.clear(); + } + + void clear_handles() + { + subscriber_handles_.clear(); + service_handles_.clear(); + client_handles_.clear(); + } + + void revalidate_handles() + { + { + VectorRebind temp; + for (auto & subscriber_handle : subscriber_handles_) { + if (subscriber_handle) { + temp.push_back(subscriber_handle); + } + } + subscriber_handles_.swap(temp); + } + { + VectorRebind temp; + for (auto & service_handle : service_handles_) { + if (service_handle) { + temp.push_back(service_handle); + } + } + service_handles_.swap(temp); + } + { + VectorRebind temp; + for (auto & client_handle : client_handles_) { + if (client_handle) { + temp.push_back(client_handle); + } + } + client_handles_.swap(temp); + } + } + + bool collect_entities(const WeakNodeVector & weak_nodes) + { + bool has_invalid_weak_nodes = false; + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + has_invalid_weak_nodes = false; + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + subscriptions_.push_back(subscription); + } + } + for (auto & service : group->get_service_ptrs()) { + if (service) { + services_.push_back(service); + } + } + for (auto & client : group->get_client_ptrs()) { + if (client) { + clients_.push_back(client); + } + } + } + } + return has_invalid_weak_nodes; + } + + + /// Provide a newly initialized AnyExecutable object. + // \return Shared pointer to the fresh executable. + executor::AnyExecutable::SharedPtr instantiate_next_executable() + { + return std::allocate_shared(*executable_allocator_.get()); + } + + virtual void + get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) + { + auto it = subscriber_handles_.begin(); + while (it != subscriber_handles_.end()) { + auto subscription = get_subscription_by_handle(*it, weak_nodes); + if (subscription) { + // Figure out if this is for intra-process or not. + bool is_intra_process = false; + if (subscription->get_intra_process_subscription_handle()) { + is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; + } + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) { + // Group was not found, meaning the subscription is not valid... + // Remove it from the ready list and continue looking + subscriber_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + if (is_intra_process) { + any_exec->subscription_intra_process = subscription; + } else { + any_exec->subscription = subscription; + } + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + subscriber_handles_.erase(it); + return; + } + // Else, the subscription is no longer valid, remove it and continue + subscriber_handles_.erase(it); + } + } + + virtual void + get_next_service(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) + { + auto it = service_handles_.begin(); + while (it != service_handles_.end()) { + auto service = get_service_by_handle(*it, weak_nodes); + if (service) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + service_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->service = service; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + service_handles_.erase(it); + return; + } + // Else, the service is no longer valid, remove it and continue + service_handles_.erase(it); + } + } + + virtual void + get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) + { + auto it = client_handles_.begin(); + while (it != client_handles_.end()) { + auto client = get_client_by_handle(*it, weak_nodes); + if (client) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + client_handles_.erase(it); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->client = client; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + client_handles_.erase(it); + return; + } + // Else, the service is no longer valid, remove it and continue + client_handles_.erase(it); + } + } + +private: + template + using VectorRebind = + std::vector::template rebind_alloc>; + + VectorRebind subscriptions_; + VectorRebind services_; + VectorRebind clients_; + + VectorRebind subscriber_handles_; + VectorRebind service_handles_; + VectorRebind client_handles_; + + std::shared_ptr executable_allocator_; + std::shared_ptr allocator_; +}; + +} // namespace allocator_memory_strategy +} // namespace memory_strategies + +} // namespace rclcpp + +#endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_ diff --git a/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp deleted file mode 100644 index 8e7151c..0000000 --- a/rclcpp/include/rclcpp/strategies/heap_pool_memory_strategy.hpp +++ /dev/null @@ -1,336 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_HEAP_POOL_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_HEAP_POOL_MEMORY_STRATEGY_HPP_ - -#include - -#include - -namespace rclcpp -{ - -namespace memory_strategies -{ - -namespace heap_pool_memory_strategy -{ - -/// Representation of the upper bounds on the memory pools managed by StaticMemoryStrategy. -struct ObjectPoolBounds -{ -public: - size_t max_subscriptions; - size_t max_services; - size_t max_clients; - size_t max_executables; - size_t max_guard_conditions; - size_t pool_size; - - /// Default constructor attempts to set reasonable default limits on the fields. - ObjectPoolBounds() - : max_subscriptions(10), max_services(10), max_clients(10), - max_executables(1), max_guard_conditions(2), pool_size(1024) - {} - - // Setters implement named parameter idiom/method chaining - - /// Set the maximum number of subscriptions. - /** - * \param[in] subscriptions Maximum number of subscriptions. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_subscriptions(size_t subscriptions) - { - max_subscriptions = subscriptions; - return *this; - } - - /// Set the maximum number of services. - /** - * \param[in] services Maximum number of services. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_services(size_t services) - { - max_services = services; - return *this; - } - - /// Set the maximum number of clients. - /** - * \param[in] clients Maximum number of clients. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_clients(size_t clients) - { - max_clients = clients; - return *this; - } - - /// Set the maximum number of guard conditions. - /** - * \param[in] guard conditions Maximum number of guard conditions. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_guard_conditions(size_t guard_conditions) - { - max_guard_conditions = guard_conditions; - return *this; - } - - /// Set the maximum number of executables. - /** - * \param[in] executables Maximum number of executables. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_max_executables(size_t executables) - { - max_executables = executables; - return *this; - } - - /// Set the maximum memory pool size. - /** - * \param[in] executables Maximum memory pool size. - * \return Reference to this object, for method chaining. - */ - ObjectPoolBounds & set_memory_pool_size(size_t pool) - { - pool_size = pool; - return *this; - } -}; - - -/// Heap-based memory pool allocation strategy, an alternative to the default memory strategy. -/** - * The memory managed by this class is allocated dynamically in the constructor, but no subsequent - * accesses to the class (besides the destructor) allocate or free memory. - * HeapPoolMemoryStrategy puts a hard limit on the number of subscriptions, etc. that can be executed - * in one iteration of `Executor::spin`. Thus it allows for memory allocation optimization for - * situations where a limit on the number of such entities is known. - */ -class HeapPoolMemoryStrategy : public memory_strategy::MemoryStrategy -{ -public: - HeapPoolMemoryStrategy(ObjectPoolBounds bounds = ObjectPoolBounds()) - : bounds_(bounds), memory_pool_(nullptr), subscription_pool_(nullptr), - service_pool_(nullptr), guard_condition_pool_(nullptr), executable_pool_(nullptr) - { - if (bounds_.pool_size) { - memory_pool_ = new void *[bounds_.pool_size]; - memset(memory_pool_, 0, bounds_.pool_size * sizeof(void *)); - } - - if (bounds_.max_subscriptions) { - subscription_pool_ = new void *[bounds_.max_subscriptions]; - memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); - } - - if (bounds_.max_services) { - service_pool_ = new void *[bounds_.max_services]; - memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); - } - - if (bounds_.max_clients) { - client_pool_ = new void *[bounds_.max_clients]; - memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); - } - - if (bounds_.max_guard_conditions) { - guard_condition_pool_ = new void *[bounds_.max_guard_conditions]; - memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); - } - - if (bounds_.max_executables) { - executable_pool_ = new executor::AnyExecutable::SharedPtr[bounds_.max_executables]; - } - - for (size_t i = 0; i < bounds_.max_executables; ++i) { - executable_pool_[i] = std::make_shared(); - } - - pool_seq_ = 0; - exec_seq_ = 0; - - // Reserve pool_size_ buckets in the memory map. - memory_map_.reserve(bounds_.pool_size); - for (size_t i = 0; i < bounds_.pool_size; ++i) { - memory_map_[memory_pool_[i]] = 0; - } - - subs.reserve(bounds_.max_subscriptions); - clients.reserve(bounds_.max_clients); - services.reserve(bounds_.max_services); - } - - /// Default destructor. Free all allocated memory. - ~HeapPoolMemoryStrategy() - { - if (bounds_.pool_size) { - delete[] memory_pool_; - } - if (bounds_.max_subscriptions) { - delete[] subscription_pool_; - } - if (bounds_.max_services) { - delete[] service_pool_; - } - if (bounds_.max_clients) { - delete[] client_pool_; - } - if (bounds_.max_guard_conditions) { - delete[] guard_condition_pool_; - } - } - - void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (number_of_handles > bounds_.max_subscriptions) { - throw std::runtime_error("Requested size exceeded maximum subscriptions."); - } - - return subscription_pool_; - case HandleType::service_handle: - if (number_of_handles > bounds_.max_services) { - throw std::runtime_error("Requested size exceeded maximum services."); - } - - return service_pool_; - case HandleType::client_handle: - if (number_of_handles > bounds_.max_clients) { - throw std::runtime_error("Requested size exceeded maximum clients."); - } - - return client_pool_; - case HandleType::guard_condition_handle: - if (number_of_handles > bounds_.max_guard_conditions) { - throw std::runtime_error("Requested size exceeded maximum guard_conditions."); - } - - return guard_condition_pool_; - default: - break; - } - throw std::runtime_error("Unrecognized enum, could not borrow handle memory."); - } - - void return_handles(HandleType type, void ** handles) - { - (void)handles; - switch (type) { - case HandleType::subscription_handle: - if (bounds_.max_subscriptions) { - memset(subscription_pool_, 0, bounds_.max_subscriptions * sizeof(void *)); - } - break; - case HandleType::service_handle: - if (bounds_.max_services) { - memset(service_pool_, 0, bounds_.max_services * sizeof(void *)); - } - break; - case HandleType::client_handle: - if (bounds_.max_clients) { - memset(client_pool_, 0, bounds_.max_clients * sizeof(void *)); - } - break; - case HandleType::guard_condition_handle: - if (bounds_.max_guard_conditions) { - memset(guard_condition_pool_, 0, bounds_.max_guard_conditions * sizeof(void *)); - } - break; - default: - throw std::runtime_error("Unrecognized enum, could not return handle memory."); - } - } - - executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - if (exec_seq_ >= bounds_.max_executables) { - // wrap around - exec_seq_ = 0; - } - size_t prev_exec_seq_ = exec_seq_; - ++exec_seq_; - - if (!executable_pool_[prev_exec_seq_]) { - throw std::runtime_error("Executable pool member was NULL"); - } - - executable_pool_[prev_exec_seq_]->subscription.reset(); - executable_pool_[prev_exec_seq_]->timer.reset(); - executable_pool_[prev_exec_seq_]->service.reset(); - executable_pool_[prev_exec_seq_]->client.reset(); - executable_pool_[prev_exec_seq_]->callback_group.reset(); - executable_pool_[prev_exec_seq_]->node.reset(); - - return executable_pool_[prev_exec_seq_]; - } - - void * alloc(size_t size) - { - // Extremely naive static allocation strategy - // Keep track of block size at a given pointer - if (pool_seq_ + size > bounds_.pool_size) { - // Start at 0 - pool_seq_ = 0; - } - void * ptr = memory_pool_[pool_seq_]; - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in HeapPoolMemoryStrategy::alloc."); - } - memory_map_[ptr] = size; - size_t prev_pool_seq = pool_seq_; - pool_seq_ += size; - return memory_pool_[prev_pool_seq]; - } - - void free(void * ptr) - { - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in HeapPoolMemoryStrategy::free."); - } - - memset(ptr, 0, memory_map_[ptr]); - } - -private: - ObjectPoolBounds bounds_; - - void ** memory_pool_; - void ** subscription_pool_; - void ** service_pool_; - void ** client_pool_; - void ** guard_condition_pool_; - executor::AnyExecutable::SharedPtr * executable_pool_; - - size_t pool_seq_; - size_t exec_seq_; - - std::unordered_map memory_map_; -}; - -} /* heap_pool_memory_strategy */ - -} /* memory_strategies */ - -} /* rclcpp */ - -#endif diff --git a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp index 43e518c..8f9438c 100644 --- a/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/message_pool_memory_strategy.hpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ +#ifndef RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ +#define RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ #include #include @@ -96,10 +96,9 @@ protected: std::array pool_; size_t next_array_index_; - }; -} /* message_pool_memory_strategy */ -} /* strategies */ -} /* rclcpp */ -#endif /* RCLCPP_RCLCPP_MSG_POOL_MEMORY_STRATEGY_HPP_ */ +} // namespace message_pool_memory_strategy +} // namespace strategies +} // namespace rclcpp +#endif // RCLCPP__STRATEGIES__MESSAGE_POOL_MEMORY_STRATEGY_HPP_ diff --git a/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp deleted file mode 100644 index a7fd0a9..0000000 --- a/rclcpp/include/rclcpp/strategies/stack_pool_memory_strategy.hpp +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright 2015 Open Source Robotics Foundation, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RCLCPP_RCLCPP_STACK_POOL_MEMORY_STRATEGY_HPP_ -#define RCLCPP_RCLCPP_STACK_POOL_MEMORY_STRATEGY_HPP_ - -#include - -#include - -namespace rclcpp -{ - -namespace memory_strategies -{ - -namespace stack_pool_memory_strategy -{ - -/// Stack-based memory pool allocation strategy, an alternative to the default memory strategy. -/** - * The memory managed by this class is allocated statically (on the heap) in the constructor. - * StackPoolMemoryStrategy puts a hard limit on the number of subscriptions, etc. that can be - * executed in one iteration of `Executor::spin`. Thus it allows for memory allocation optimization - * for situations where a limit on the number of such entities is known. - * Because this class is templated on the sizes of the memory pools for each entity, the amount of - * memory required by this class is known at compile time. - */ -template -class StackPoolMemoryStrategy : public memory_strategy::MemoryStrategy -{ -public: - StackPoolMemoryStrategy() - { - if (PoolSize) { - memory_pool_.fill(0); - } - - if (MaxSubscriptions) { - subscription_pool_.fill(0); - } - - if (MaxServices) { - service_pool_.fill(0); - } - - if (MaxClients) { - client_pool_.fill(0); - } - - if (MaxGuardConditions) { - guard_condition_pool_.fill(0); - } - - for (size_t i = 0; i < MaxExecutables; ++i) { - executable_pool_[i] = std::make_shared(); - } - - pool_seq_ = 0; - exec_seq_ = 0; - - // Reserve pool_size_ buckets in the memory map. - memory_map_.reserve(PoolSize); - for (size_t i = 0; i < PoolSize; ++i) { - memory_map_[memory_pool_[i]] = 0; - } - subs.reserve(MaxSubscriptions); - clients.reserve(MaxClients); - services.reserve(MaxServices); - } - - void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (number_of_handles > MaxSubscriptions) { - throw std::runtime_error("Requested size exceeded maximum subscriptions."); - } - - return subscription_pool_.data(); - case HandleType::service_handle: - if (number_of_handles > MaxServices) { - throw std::runtime_error("Requested size exceeded maximum services."); - } - - return service_pool_.data(); - case HandleType::client_handle: - if (number_of_handles > MaxClients) { - throw std::runtime_error("Requested size exceeded maximum clients."); - } - - return client_pool_.data(); - case HandleType::guard_condition_handle: - if (number_of_handles > MaxGuardConditions) { - throw std::runtime_error("Requested size exceeded maximum guard_conditions."); - } - - return guard_condition_pool_.data(); - default: - break; - } - throw std::runtime_error("Unrecognized enum, could not borrow handle memory."); - } - - void return_handles(HandleType type, void ** handles) - { - (void)handles; - switch (type) { - case HandleType::subscription_handle: - if (MaxSubscriptions) { - subscription_pool_.fill(0); - } - break; - case HandleType::service_handle: - if (MaxServices) { - service_pool_.fill(0); - } - break; - case HandleType::client_handle: - if (MaxClients) { - client_pool_.fill(0); - } - break; - case HandleType::guard_condition_handle: - if (MaxGuardConditions) { - guard_condition_pool_.fill(0); - } - break; - default: - throw std::runtime_error("Unrecognized enum, could not return handle memory."); - } - } - - executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - if (exec_seq_ >= MaxExecutables) { - // wrap around - exec_seq_ = 0; - } - size_t prev_exec_seq_ = exec_seq_; - ++exec_seq_; - - if (!executable_pool_[prev_exec_seq_]) { - throw std::runtime_error("Executable pool member was NULL"); - } - - // Make sure to clear the executable fields. - executable_pool_[prev_exec_seq_]->subscription.reset(); - executable_pool_[prev_exec_seq_]->timer.reset(); - executable_pool_[prev_exec_seq_]->service.reset(); - executable_pool_[prev_exec_seq_]->client.reset(); - executable_pool_[prev_exec_seq_]->callback_group.reset(); - executable_pool_[prev_exec_seq_]->node.reset(); - - return executable_pool_[prev_exec_seq_]; - } - - void * alloc(size_t size) - { - // Extremely naive static allocation strategy - // Keep track of block size at a given pointer - if (pool_seq_ + size > PoolSize) { - // Start at 0 - pool_seq_ = 0; - } - void * ptr = memory_pool_[pool_seq_]; - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in StackPoolMemoryStrategy::alloc."); - } - memory_map_[ptr] = size; - size_t prev_pool_seq = pool_seq_; - pool_seq_ += size; - return memory_pool_[prev_pool_seq]; - } - - void free(void * ptr) - { - if (memory_map_.count(ptr) == 0) { - // We expect to have the state for all blocks pre-mapped into memory_map_ - throw std::runtime_error("Unexpected pointer in StackPoolMemoryStrategy::free."); - } - - memset(ptr, 0, memory_map_[ptr]); - } - -private: - std::array memory_pool_; - std::array subscription_pool_; - std::array service_pool_; - std::array client_pool_; - std::array guard_condition_pool_; - std::array executable_pool_; - - size_t pool_seq_; - size_t exec_seq_; - - std::unordered_map memory_map_; -}; - -} /* stack_pool_memory_strategy */ - -} /* memory_strategies */ - -} /* rclcpp */ - -#endif diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index b7250e6..fb9c952 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ -#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ +#ifndef RCLCPP__SUBSCRIPTION_HPP_ +#define RCLCPP__SUBSCRIPTION_HPP_ + +#include +#include #include #include @@ -21,13 +24,11 @@ #include #include -#include -#include -#include +#include "rcl_interfaces/msg/intra_process_message.hpp" -#include -#include -#include +#include "rclcpp/macros.hpp" +#include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/any_subscription_callback.hpp" namespace rclcpp { @@ -35,7 +36,7 @@ namespace rclcpp namespace node { class Node; -} // namespace node +} // namespace node namespace subscription { @@ -44,7 +45,6 @@ namespace subscription /// specializations of Subscription, among other things. class SubscriptionBase { - public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(SubscriptionBase); @@ -138,18 +138,22 @@ private: std::string topic_name_; bool ignore_local_publications_; - }; using namespace any_subscription_callback; /// Subscription implementation, templated on the type of message this subscription receives. -template +template> class Subscription : public SubscriptionBase { friend class rclcpp::node::Node; public: + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + RCLCPP_SMART_PTR_DEFINITIONS(Subscription); /// Default constructor. @@ -167,15 +171,17 @@ public: rmw_subscription_t * subscription_handle, const std::string & topic_name, bool ignore_local_publications, - AnySubscriptionCallback callback, - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = - message_memory_strategy::MessageMemoryStrategy::create_default()) + AnySubscriptionCallback callback, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = + message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) - {} + { + } /// Support dynamically setting the message memory strategy. /** @@ -183,7 +189,8 @@ public: * \param[in] message_memory_strategy Shared pointer to the memory strategy to set. */ void set_message_memory_strategy( - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) { message_memory_strategy_ = message_memory_strategy; } @@ -226,7 +233,7 @@ public: // However, this can only really happen if this node has it disabled, but the other doesn't. return; } - std::unique_ptr msg; + MessageUniquePtr msg; get_intra_process_message_callback_( ipm.publisher_id, ipm.message_sequence, @@ -244,7 +251,7 @@ public: private: typedef std::function< - void (uint64_t, uint64_t, uint64_t, std::unique_ptr &) + void (uint64_t, uint64_t, uint64_t, MessageUniquePtr &) > GetMessageCallbackType; typedef std::function MatchesAnyPublishersCallbackType; @@ -262,17 +269,16 @@ private: RCLCPP_DISABLE_COPY(Subscription); - AnySubscriptionCallback any_callback_; - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + AnySubscriptionCallback any_callback_; + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; GetMessageCallbackType get_intra_process_message_callback_; MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_; uint64_t intra_process_subscription_id_; - }; -} /* namespace subscription */ -} /* namespace rclcpp */ +} // namespace subscription +} // namespace rclcpp -#endif /* RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ */ +#endif // RCLCPP__SUBSCRIPTION_HPP_ diff --git a/rclcpp/test/test_intra_process_manager.cpp b/rclcpp/test/test_intra_process_manager.cpp index 4ce7f90..6f47c83 100644 --- a/rclcpp/test/test_intra_process_manager.cpp +++ b/rclcpp/test/test_intra_process_manager.cpp @@ -16,6 +16,7 @@ #include +#include #include #include @@ -55,11 +56,27 @@ public: size_t mock_queue_size; }; -template +template> class Publisher : public PublisherBase { public: - RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + using MessageAllocTraits = allocator::AllocRebind; + using MessageAlloc = typename MessageAllocTraits::allocator_type; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; + std::shared_ptr allocator_; + + RCLCPP_SMART_PTR_DEFINITIONS(Publisher); + + Publisher() + { + allocator_ = std::make_shared(); + } + + std::shared_ptr get_allocator() + { + return allocator_; + } }; } @@ -99,8 +116,8 @@ public: } // Prevent rclcpp/publisher.hpp and rclcpp/subscription.hpp from being imported. -#define RCLCPP_RCLCPP_PUBLISHER_HPP_ -#define RCLCPP_RCLCPP_SUBSCRIPTION_HPP_ +#define RCLCPP__PUBLISHER_HPP_ +#define RCLCPP__SUBSCRIPTION_HPP_ // Force ipm to use our mock publisher class. #define Publisher mock::Publisher #define PublisherBase mock::PublisherBase @@ -139,8 +156,10 @@ TEST(TestIntraProcessManager, nominal) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -221,7 +240,8 @@ TEST(TestIntraProcessManager, remove_publisher_before_trying_to_take) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -269,7 +289,8 @@ TEST(TestIntraProcessManager, removed_subscription_affects_take) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -338,7 +359,8 @@ TEST(TestIntraProcessManager, multiple_subscriptions_one_publisher) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -410,9 +432,12 @@ TEST(TestIntraProcessManager, multiple_publishers_one_subscription) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); - auto p3_id = ipm.add_publisher(p3); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); + auto p3_id = + ipm.add_publisher>(p3); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -512,9 +537,12 @@ TEST(TestIntraProcessManager, multiple_publishers_multiple_subscription) { s3->mock_topic_name = "nominal1"; s3->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); - auto p2_id = ipm.add_publisher(p2); - auto p3_id = ipm.add_publisher(p3); + auto p1_id = + ipm.add_publisher>(p1); + auto p2_id = + ipm.add_publisher>(p2); + auto p3_id = + ipm.add_publisher>(p3); auto s1_id = ipm.add_subscription(s1); auto s2_id = ipm.add_subscription(s2); auto s3_id = ipm.add_subscription(s3); @@ -655,7 +683,8 @@ TEST(TestIntraProcessManager, ring_buffer_displacement) { s1->mock_topic_name = "nominal1"; s1->mock_queue_size = 10; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto s1_id = ipm.add_subscription(s1); auto ipm_msg = std::make_shared(); @@ -720,7 +749,8 @@ TEST(TestIntraProcessManager, subscription_creation_race_condition) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - auto p1_id = ipm.add_publisher(p1); + auto p1_id = + ipm.add_publisher>(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -767,7 +797,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_take) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher(p1); + p1_id = ipm.add_publisher>(p1); auto ipm_msg = std::make_shared(); ipm_msg->message_sequence = 42; @@ -805,7 +835,7 @@ TEST(TestIntraProcessManager, publisher_out_of_scope_store) { p1->mock_topic_name = "nominal1"; p1->mock_queue_size = 2; - p1_id = ipm.add_publisher(p1); + p1_id = ipm.add_publisher>(p1); } auto ipm_msg = std::make_shared();