From 06818ee78c9641f383ac6c603fcb85cc4a79711b Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Mon, 26 Oct 2015 18:29:53 -0700 Subject: [PATCH] move a lot to memory strategy --- rclcpp/include/rclcpp/executor.hpp | 53 +- rclcpp/include/rclcpp/memory_strategies.hpp | 17 +- rclcpp/include/rclcpp/memory_strategy.hpp | 158 ++---- .../strategies/allocator_memory_strategy.hpp | 495 ++++++++++++++---- 4 files changed, 478 insertions(+), 245 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 9a1658b..97fdd33 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -27,7 +27,7 @@ #include #include -#include +#include #include #include @@ -343,10 +343,17 @@ protected: void wait_for_work(std::chrono::duration timeout = std::chrono::duration(-1)) { + /* memory_strategy_->subs.clear(); memory_strategy_->services.clear(); memory_strategy_->clients.clear(); + */ + memory_strategy_->clear_active_entities(); + // Collect the subscriptions and timers to be waited on + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + /* bool has_invalid_weak_nodes = false; for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -373,6 +380,8 @@ protected: } } } + */ + // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) { weak_nodes_.erase( @@ -382,7 +391,22 @@ protected: return i.expired(); })); } + + // Use the number of subscriptions to allocate memory in the handles + rmw_subscriptions_t subscriber_handles; + subscriber_handles.subscriber_count = + memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers); + + rmw_services_t service_handles; + service_handles.service_count = + memory_strategy_->fill_service_handles(service_handles.services); + + rmw_clients_t client_handles; + client_handles.client_count = + memory_strategy_->fill_client_handles(client_handles.clients); + + /* size_t max_number_of_subscriptions = memory_strategy_->subs.size() * 2; // Times two for intra-process. rmw_subscriptions_t subscriber_handles; subscriber_handles.subscriber_count = 0; @@ -442,14 +466,16 @@ protected: client->get_client_handle()->data; client_handle_index += 1; } + */ // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, // and one for the executor's guard cond (interrupt_guard_condition_) size_t number_of_guard_conds = 2; rmw_guard_conditions_t guard_condition_handles; guard_condition_handles.guard_condition_count = number_of_guard_conds; - guard_condition_handles.guard_conditions = - memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds); + /*guard_condition_handles.guard_conditions = + memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds);*/ + guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); if (guard_condition_handles.guard_conditions == NULL && number_of_guard_conds > 0) { @@ -500,6 +526,7 @@ protected: // If ctrl-c guard condition, return directly if (guard_condition_handles.guard_conditions[0] != 0) { // Make sure to free or clean memory + /* memory_strategy_->return_handles(HandleType::subscription_handle, subscriber_handles.subscribers); memory_strategy_->return_handles(HandleType::service_handle, @@ -508,10 +535,13 @@ protected: client_handles.clients); memory_strategy_->return_handles(HandleType::guard_condition_handle, guard_condition_handles.guard_conditions); + */ + memory_strategy_->clear_handles(); return; } // Add the new work to the class's list of things waiting to be executed // Starting with the subscribers + /* for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) { void * handle = subscriber_handles.subscribers[i]; if (handle) { @@ -545,10 +575,13 @@ protected: memory_strategy_->subs.clear(); memory_strategy_->services.clear(); memory_strategy_->clients.clear(); + */ + memory_strategy_->clear_active_entities(); } /******************************/ +/* rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_by_handle(void * subscriber_handle) { @@ -579,6 +612,7 @@ protected: } return nullptr; } +*/ rclcpp::service::ServiceBase::SharedPtr get_service_by_handle(void * service_handle) @@ -729,6 +763,7 @@ protected: return latest; } + /* rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription( rclcpp::subscription::SubscriptionBase::SharedPtr subscription) @@ -909,28 +944,29 @@ protected: client_handles_.erase(it++); } } +*/ AnyExecutable::SharedPtr get_next_ready_executable() { - auto any_exec = this->memory_strategy_->instantiate_next_executable(); + auto any_exec = memory_strategy_->instantiate_next_executable(); // Check the timers to see if there are any that are ready, if so return get_next_timer(any_exec); if (any_exec->timer) { return any_exec; } // Check the subscriptions to see if there are any that are ready - get_next_subscription(any_exec); + memory_strategy_->get_next_subscription(any_exec, weak_nodes_); if (any_exec->subscription || any_exec->subscription_intra_process) { return any_exec; } // Check the services to see if there are any that are ready - get_next_service(any_exec); + memory_strategy_->get_next_service(any_exec, weak_nodes_); if (any_exec->service) { return any_exec; } // Check the clients to see if there are any that are ready - get_next_client(any_exec); + memory_strategy_->get_next_client(any_exec, weak_nodes_); if (any_exec->client) { return any_exec; } @@ -980,12 +1016,15 @@ private: RCLCPP_DISABLE_COPY(Executor); std::vector> weak_nodes_; + std::array guard_cond_handles_; + /* using SubscriberHandles = std::list; SubscriberHandles subscriber_handles_; using ServiceHandles = std::list; ServiceHandles service_handles_; using ClientHandles = std::list; ClientHandles client_handles_; + */ }; } /* executor */ diff --git a/rclcpp/include/rclcpp/memory_strategies.hpp b/rclcpp/include/rclcpp/memory_strategies.hpp index 598da69..2887426 100644 --- a/rclcpp/include/rclcpp/memory_strategies.hpp +++ b/rclcpp/include/rclcpp/memory_strategies.hpp @@ -15,18 +15,27 @@ #ifndef RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ #define RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_ -#include -#include +//#include +//#include +#include namespace rclcpp { namespace memory_strategies { -using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy; -using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy; +using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy; +//using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy; +//using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy; } /* memory_strategies */ + +namespace memory_strategy { + MemoryStrategy::SharedPtr create_default_strategy() { + return std::make_shared>(); + } +} + } /* rclcpp */ #endif diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index eed0e0e..86acfb8 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -14,19 +14,17 @@ #ifndef RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ #define RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_ + +#include +#include + #include +#include +#include namespace rclcpp { -// TODO move HandleType somewhere where it makes sense -enum class HandleType {subscription_handle, service_handle, client_handle, guard_condition_handle}; - -namespace executor -{ -class Executor; -} - namespace memory_strategy { @@ -38,137 +36,43 @@ namespace memory_strategy */ class MemoryStrategy { - - friend class executor::Executor; - public: - RCLCPP_SMART_PTR_DEFINITIONS(MemoryStrategy); + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy); + using WeakNodeVector = std::vector>>; - /// Borrow memory for storing data for subscriptions, services, clients, or guard conditions. - /** - * The default implementation stores std::vectors for each handle type and resizes the vectors - * as necessary based on the requested number of handles. - * \param[in] The type of entity that this function is requesting for. - * \param[in] The number of handles to borrow. - * \return Pointer to the allocated handles. - */ - virtual void ** borrow_handles(HandleType type, size_t number_of_handles) - { - switch (type) { - case HandleType::subscription_handle: - if (subscription_handles.size() < number_of_handles) { - subscription_handles.resize(number_of_handles, 0); - } - return static_cast(subscription_handles.data()); - case HandleType::service_handle: - if (service_handles.size() < number_of_handles) { - service_handles.resize(number_of_handles, 0); - } - return static_cast(service_handles.data()); - case HandleType::client_handle: - if (client_handles.size() < number_of_handles) { - client_handles.resize(number_of_handles, 0); - } - return static_cast(client_handles.data()); - case HandleType::guard_condition_handle: - if (number_of_handles > 2) { - throw std::runtime_error("Too many guard condition handles requested!"); - } - return guard_cond_handles.data(); - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); - } - } + // return the new number of subscribers + virtual size_t fill_subscriber_handles(void ** ptr) = 0; - /// Return the memory borrowed in borrow_handles. - /** - * return_handles should always mirror the way memory was borrowed in borrow_handles. - * \param[in] The type of entity that this function is returning. - * \param[in] Pointer to the handles returned. - */ - virtual void return_handles(HandleType type, void ** handles) - { - switch (type) { - case HandleType::subscription_handle: - if (handles != subscription_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, subscription_handles.size()); - break; - case HandleType::service_handle: - if (handles != service_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, service_handles.size()); - break; - case HandleType::client_handle: - if (handles != client_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - memset(handles, 0, client_handles.size()); - break; - case HandleType::guard_condition_handle: - if (handles != guard_cond_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this MemoryStrategy"); - } - guard_cond_handles.fill(0); - break; - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); - } - } + // return the new number of services + virtual size_t fill_service_handles(void ** ptr) = 0; + + // return the new number of clients + virtual size_t fill_client_handles(void ** ptr) = 0; + + virtual void clear_active_entities() = 0; + + virtual void clear_handles() = 0; + virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; /// Provide a newly initialized AnyExecutable object. // \return Shared pointer to the fresh executable. - virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() - { - return std::make_shared(); - } + virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; - /// Implementation of a general-purpose allocation function. - /** - * \param[in] size Number of bytes to allocate. - * \return Pointer to the allocated chunk of memory. - */ - virtual void * alloc(size_t size) - { - if (size == 0) { - return NULL; - } - return std::malloc(size); - } + virtual void + get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; - /// Implementation of a general-purpose free. - /** - * \param[in] Pointer to deallocate. - */ - virtual void free(void * ptr) - { - return std::free(ptr); - } + virtual void + get_next_service(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; + + virtual void + get_next_client(executor::AnyExecutable::SharedPtr any_exec, + const WeakNodeVector & weak_nodes) = 0; - std::vector subs; - std::vector services; - std::vector clients; - std::vector subscription_handles; - std::vector service_handles; - std::vector client_handles; - std::array guard_cond_handles; }; - -MemoryStrategy::SharedPtr create_default_strategy() -{ - return std::make_shared(MemoryStrategy()); -} - } /* memory_strategy */ } /* rclcpp */ diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index a6ebfc3..f47b95c 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -15,13 +15,15 @@ #ifndef RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_ #define RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_ +#include +#include + #include #include namespace rclcpp { - namespace memory_strategies { @@ -34,13 +36,16 @@ namespace allocator_memory_strategy * the rmw implementation after the executor waits for work, based on the number of entities that * come through. */ -template +template> class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy { public: RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy); + using WeakNode = std::weak_ptr>; + using NodeVector = std::vector; + using ExecAllocTraits = allocator::AllocRebind; using ExecAlloc = typename ExecAllocTraits::allocator_type; using ExecDeleter = allocator::Deleter; @@ -53,139 +58,415 @@ public: allocator_ = std::make_shared(*allocator.get()); } - /// Borrow memory for storing data for subscriptions, services, clients, or guard conditions. - /** - * The default implementation stores std::vectors for each handle type and resizes the vectors - * as necessary based on the requested number of handles. - * \param[in] The type of entity that this function is requesting for. - * \param[in] The number of handles to borrow. - * \return Pointer to the allocated handles. - */ - virtual void ** borrow_handles(HandleType type, size_t number_of_handles) + AllocatorMemoryStrategy() { - switch (type) { - case HandleType::subscription_handle: - if (subscription_handles.size() < number_of_handles) { - subscription_handles.resize(number_of_handles, 0); - } - return static_cast(subscription_handles.data()); - case HandleType::service_handle: - if (service_handles.size() < number_of_handles) { - service_handles.resize(number_of_handles, 0); - } - return static_cast(service_handles.data()); - case HandleType::client_handle: - if (client_handles.size() < number_of_handles) { - client_handles.resize(number_of_handles, 0); - } - return static_cast(client_handles.data()); - case HandleType::guard_condition_handle: - if (number_of_handles > 2) { - throw std::runtime_error("Too many guard condition handles requested!"); - } - return guard_cond_handles.data(); - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); - } + executable_allocator_ = std::make_shared(); + allocator_ = std::make_shared(); } - /// Return the memory borrowed in borrow_handles. - /** - * return_handles should always mirror the way memory was borrowed in borrow_handles. - * \param[in] The type of entity that this function is returning. - * \param[in] Pointer to the handles returned. - */ - virtual void return_handles(HandleType type, void ** handles) - { - switch (type) { - case HandleType::subscription_handle: - if (handles != subscription_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this AllocatorMemoryStrategy"); - } - memset(handles, 0, subscription_handles.size()); - break; - case HandleType::service_handle: - if (handles != service_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this AllocatorMemoryStrategy"); - } - memset(handles, 0, service_handles.size()); - break; - case HandleType::client_handle: - if (handles != client_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this AllocatorMemoryStrategy"); - } - memset(handles, 0, client_handles.size()); - break; - case HandleType::guard_condition_handle: - if (handles != guard_cond_handles.data()) { - throw std::runtime_error( - "tried to return memory that isn't handled by this AllocatorMemoryStrategy"); - } - guard_cond_handles.fill(0); - break; - default: - throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast(type)) + - ", could not borrow handle memory."); + size_t fill_subscriber_handles(void ** ptr) { + size_t max_size = subscriptions_.size(); + if (subscriber_handles_.size() < max_size) { + subscriber_handles_.resize(max_size); } + size_t count = 0; + for (auto & subscription : subscriptions_) { + subscriber_handles_[count++] = subscription->get_subscription_handle()->data; + if (subscription->get_intra_process_subscription_handle()) { + subscriber_handles_[count++] = + subscription->get_intra_process_subscription_handle()->data; + } + } + ptr = static_cast(subscriber_handles_.data()); + return count; } + // return the new number of services + size_t fill_service_handles(void ** ptr) { + size_t max_size = services_.size(); + if (service_handles_.size() < max_size) { + service_handles_.resize(max_size); + } + size_t count = 0; + for (auto & service : services_) { + service_handles_[count++] = service->get_service_handle()->data; + } + ptr = static_cast(service_handles_.data()); + return count; + } + + // return the new number of clients + size_t fill_client_handles(void ** ptr) { + size_t max_size = clients_.size(); + if (client_handles_.size() < max_size) { + client_handles_.resize(max_size); + } + size_t count = 0; + for (auto & client : clients_) { + client_handles_[count++] = client->get_client_handle()->data; + } + ptr = static_cast(client_handles_.data()); + return count; + } + + void clear_active_entities() { + subscriptions_.clear(); + services_.clear(); + clients_.clear(); + } + + void clear_handles() { + subscriber_handles_.clear(); + service_handles_.clear(); + client_handles_.clear(); + } + + bool collect_entities(const NodeVector & weak_nodes) { + bool has_invalid_weak_nodes = false; + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + has_invalid_weak_nodes = false; + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + subscriptions_.push_back(subscription); + } + } + for (auto & service : group->get_service_ptrs()) { + services_.push_back(service); + } + for (auto & client : group->get_client_ptrs()) { + clients_.push_back(client); + } + } + } + return has_invalid_weak_nodes; + } + + /// Provide a newly initialized AnyExecutable object. // \return Shared pointer to the fresh executable. - virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() + executor::AnyExecutable::SharedPtr instantiate_next_executable() { return std::allocate_shared(*executable_allocator_.get()); } - /// Implementation of a general-purpose allocation function. - /** - * \param[in] size Number of bytes to allocate. - * \return Pointer to the allocated chunk of memory. - */ - virtual void * alloc(size_t size) + static rclcpp::subscription::SubscriptionBase::SharedPtr + get_subscription_by_handle(void * subscriber_handle, + const NodeVector & weak_nodes) { - if (size == 0) { - return NULL; + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_subscription : group->get_subscription_ptrs()) { + auto subscription = weak_subscription.lock(); + if (subscription) { + if (subscription->get_subscription_handle()->data == subscriber_handle) { + return subscription; + } + if (subscription->get_intra_process_subscription_handle() && + subscription->get_intra_process_subscription_handle()->data == subscriber_handle) + { + return subscription; + } + } + } + } } - auto ptr = VoidAllocTraits::allocate(*allocator_.get(), size); - alloc_map[ptr] = size; - return ptr; + return nullptr; } - /// Implementation of a general-purpose free. - /** - * \param[in] Pointer to deallocate. - */ - virtual void free(void * ptr) + static rclcpp::service::ServiceBase::SharedPtr + get_service_by_handle(void * service_handle, + const NodeVector & weak_nodes) { - if (alloc_map.count(ptr) == 0) { - // do nothing, the pointer is not in the alloc'd map - return; + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & service : group->get_service_ptrs()) { + if (service->get_service_handle()->data == service_handle) { + return service; + } + } + } } - VoidAllocTraits::deallocate(*allocator_.get(), &ptr, alloc_map[ptr]); + return rclcpp::service::ServiceBase::SharedPtr(); } + static rclcpp::client::ClientBase::SharedPtr + get_client_by_handle(void * client_handle, + const NodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & client : group->get_client_ptrs()) { + if (client->get_client_handle()->data == client_handle) { + return client; + } + } + } + } + return rclcpp::client::ClientBase::SharedPtr(); + } + + static rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, + const NodeVector & weak_nodes) + { + if (!group) { + return rclcpp::node::Node::SharedPtr(); + } + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto callback_group = weak_group.lock(); + if (callback_group == group) { + return node; + } + } + } + return rclcpp::node::Node::SharedPtr(); + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr subscription, + const NodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & weak_sub : group->get_subscription_ptrs()) { + auto sub = weak_sub.lock(); + if (sub == subscription) { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + + + virtual void + get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, + const NodeVector & weak_nodes) + { + auto it = subscriber_handles_.begin(); + while (it != subscriber_handles_.end()) { + auto subscription = get_subscription_by_handle(*it, weak_nodes); + if (subscription) { + // Figure out if this is for intra-process or not. + bool is_intra_process = false; + if (subscription->get_intra_process_subscription_handle()) { + is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; + } + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_subscription(subscription, weak_nodes); + if (!group) { + // Group was not found, meaning the subscription is not valid... + // Remove it from the ready list and continue looking + subscriber_handles_.erase(it++); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + if (is_intra_process) { + any_exec->subscription_intra_process = subscription; + } else { + any_exec->subscription = subscription; + } + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + subscriber_handles_.erase(it++); + return; + } + // Else, the subscription is no longer valid, remove it and continue + subscriber_handles_.erase(it++); + } + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_service( + rclcpp::service::ServiceBase::SharedPtr service, + const NodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & serv : group->get_service_ptrs()) { + if (serv == service) { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + virtual void + get_next_service(executor::AnyExecutable::SharedPtr any_exec, + const NodeVector & weak_nodes) + { + auto it = service_handles_.begin(); + while (it != service_handles_.end()) { + auto service = get_service_by_handle(*it, weak_nodes); + if (service) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_service(service, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + service_handles_.erase(it++); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->service = service; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + service_handles_.erase(it++); + return; + } + // Else, the service is no longer valid, remove it and continue + service_handles_.erase(it++); + } + } + + static rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_client( + rclcpp::client::ClientBase::SharedPtr client, + const NodeVector & weak_nodes) + { + for (auto & weak_node : weak_nodes) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group) { + continue; + } + for (auto & cli : group->get_client_ptrs()) { + if (cli == client) { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + virtual void + get_next_client(executor::AnyExecutable::SharedPtr any_exec, + const NodeVector & weak_nodes) + { + auto it = client_handles_.begin(); + while (it != client_handles_.end()) { + auto client = get_client_by_handle(*it, weak_nodes); + if (client) { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_client(client, weak_nodes); + if (!group) { + // Group was not found, meaning the service is not valid... + // Remove it from the ready list and continue looking + client_handles_.erase(it++); + continue; + } + if (!group->can_be_taken_from().load()) { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + ++it; + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->client = client; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group, weak_nodes); + client_handles_.erase(it++); + return; + } + // Else, the service is no longer valid, remove it and continue + client_handles_.erase(it++); + } + } + + + +private: template using VectorRebind = typename std::allocator_traits::template rebind_alloc; std::vector> subs; + VectorRebind> subscriptions_; std::vector> services; + VectorRebind> services_; std::vector> clients; + VectorRebind> clients_; - std::vector subscription_handles; - std::vector service_handles; - std::vector client_handles; - std::array guard_cond_handles; + std::vector subscriber_handles_; + std::vector service_handles_; + std::vector client_handles_; - std::unordered_map alloc_map; - -private: std::shared_ptr executable_allocator_; std::shared_ptr allocator_; };