move a lot to memory strategy

This commit is contained in:
Jackie Kay 2015-10-26 18:29:53 -07:00
parent c663d892a4
commit 06818ee78c
4 changed files with 478 additions and 245 deletions

View file

@ -27,7 +27,7 @@
#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/memory_strategy.hpp>
#include <rclcpp/memory_strategies.hpp>
#include <rclcpp/node.hpp>
#include <rclcpp/utilities.hpp>
@ -343,10 +343,17 @@ protected:
void
wait_for_work(std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
{
/*
memory_strategy_->subs.clear();
memory_strategy_->services.clear();
memory_strategy_->clients.clear();
*/
memory_strategy_->clear_active_entities();
// Collect the subscriptions and timers to be waited on
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
/*
bool has_invalid_weak_nodes = false;
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
@ -373,6 +380,8 @@ protected:
}
}
}
*/
// Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) {
weak_nodes_.erase(
@ -382,7 +391,22 @@ protected:
return i.expired();
}));
}
// Use the number of subscriptions to allocate memory in the handles
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count =
memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers);
rmw_services_t service_handles;
service_handles.service_count =
memory_strategy_->fill_service_handles(service_handles.services);
rmw_clients_t client_handles;
client_handles.client_count =
memory_strategy_->fill_client_handles(client_handles.clients);
/*
size_t max_number_of_subscriptions = memory_strategy_->subs.size() * 2; // Times two for intra-process.
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count = 0;
@ -442,14 +466,16 @@ protected:
client->get_client_handle()->data;
client_handle_index += 1;
}
*/
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
size_t number_of_guard_conds = 2;
rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds;
guard_condition_handles.guard_conditions =
memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds);
/*guard_condition_handles.guard_conditions =
memory_strategy_->borrow_handles(HandleType::guard_condition_handle, number_of_guard_conds);*/
guard_condition_handles.guard_conditions = static_cast<void**>(guard_cond_handles_.data());
if (guard_condition_handles.guard_conditions == NULL &&
number_of_guard_conds > 0)
{
@ -500,6 +526,7 @@ protected:
// If ctrl-c guard condition, return directly
if (guard_condition_handles.guard_conditions[0] != 0) {
// Make sure to free or clean memory
/*
memory_strategy_->return_handles(HandleType::subscription_handle,
subscriber_handles.subscribers);
memory_strategy_->return_handles(HandleType::service_handle,
@ -508,10 +535,13 @@ protected:
client_handles.clients);
memory_strategy_->return_handles(HandleType::guard_condition_handle,
guard_condition_handles.guard_conditions);
*/
memory_strategy_->clear_handles();
return;
}
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
/*
for (size_t i = 0; i < subscriber_handles.subscriber_count; ++i) {
void * handle = subscriber_handles.subscribers[i];
if (handle) {
@ -545,10 +575,13 @@ protected:
memory_strategy_->subs.clear();
memory_strategy_->services.clear();
memory_strategy_->clients.clear();
*/
memory_strategy_->clear_active_entities();
}
/******************************/
/*
rclcpp::subscription::SubscriptionBase::SharedPtr
get_subscription_by_handle(void * subscriber_handle)
{
@ -579,6 +612,7 @@ protected:
}
return nullptr;
}
*/
rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle)
@ -729,6 +763,7 @@ protected:
return latest;
}
/*
rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
@ -909,28 +944,29 @@ protected:
client_handles_.erase(it++);
}
}
*/
AnyExecutable::SharedPtr
get_next_ready_executable()
{
auto any_exec = this->memory_strategy_->instantiate_next_executable();
auto any_exec = memory_strategy_->instantiate_next_executable();
// Check the timers to see if there are any that are ready, if so return
get_next_timer(any_exec);
if (any_exec->timer) {
return any_exec;
}
// Check the subscriptions to see if there are any that are ready
get_next_subscription(any_exec);
memory_strategy_->get_next_subscription(any_exec, weak_nodes_);
if (any_exec->subscription || any_exec->subscription_intra_process) {
return any_exec;
}
// Check the services to see if there are any that are ready
get_next_service(any_exec);
memory_strategy_->get_next_service(any_exec, weak_nodes_);
if (any_exec->service) {
return any_exec;
}
// Check the clients to see if there are any that are ready
get_next_client(any_exec);
memory_strategy_->get_next_client(any_exec, weak_nodes_);
if (any_exec->client) {
return any_exec;
}
@ -980,12 +1016,15 @@ private:
RCLCPP_DISABLE_COPY(Executor);
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
std::array<void*, 2> guard_cond_handles_;
/*
using SubscriberHandles = std::list<void *>;
SubscriberHandles subscriber_handles_;
using ServiceHandles = std::list<void *>;
ServiceHandles service_handles_;
using ClientHandles = std::list<void *>;
ClientHandles client_handles_;
*/
};
} /* executor */

View file

@ -15,18 +15,27 @@
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGIES_HPP_
#include <rclcpp/strategies/heap_pool_memory_strategy.hpp>
#include <rclcpp/strategies/stack_pool_memory_strategy.hpp>
//#include <rclcpp/strategies/heap_pool_memory_strategy.hpp>
//#include <rclcpp/strategies/stack_pool_memory_strategy.hpp>
#include <rclcpp/strategies/allocator_memory_strategy.hpp>
namespace rclcpp
{
namespace memory_strategies
{
using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy;
using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy;
using rclcpp::memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy;
//using rclcpp::memory_strategies::heap_pool_memory_strategy::HeapPoolMemoryStrategy;
//using rclcpp::memory_strategies::stack_pool_memory_strategy::StackPoolMemoryStrategy;
} /* memory_strategies */
namespace memory_strategy {
MemoryStrategy::SharedPtr create_default_strategy() {
return std::make_shared<memory_strategies::allocator_memory_strategy::AllocatorMemoryStrategy<>>();
}
}
} /* rclcpp */
#endif

View file

@ -14,19 +14,17 @@
#ifndef RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_MEMORY_STRATEGY_HPP_
#include <memory>
#include <vector>
#include <rclcpp/any_executable.hpp>
#include <rclcpp/macros.hpp>
#include <rclcpp/node.hpp>
namespace rclcpp
{
// TODO move HandleType somewhere where it makes sense
enum class HandleType {subscription_handle, service_handle, client_handle, guard_condition_handle};
namespace executor
{
class Executor;
}
namespace memory_strategy
{
@ -38,137 +36,43 @@ namespace memory_strategy
*/
class MemoryStrategy
{
friend class executor::Executor;
public:
RCLCPP_SMART_PTR_DEFINITIONS(MemoryStrategy);
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy);
using WeakNodeVector = std::vector<std::weak_ptr<rclcpp::node::Node>>>;
/// Borrow memory for storing data for subscriptions, services, clients, or guard conditions.
/**
* The default implementation stores std::vectors for each handle type and resizes the vectors
* as necessary based on the requested number of handles.
* \param[in] The type of entity that this function is requesting for.
* \param[in] The number of handles to borrow.
* \return Pointer to the allocated handles.
*/
virtual void ** borrow_handles(HandleType type, size_t number_of_handles)
{
switch (type) {
case HandleType::subscription_handle:
if (subscription_handles.size() < number_of_handles) {
subscription_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(subscription_handles.data());
case HandleType::service_handle:
if (service_handles.size() < number_of_handles) {
service_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(service_handles.data());
case HandleType::client_handle:
if (client_handles.size() < number_of_handles) {
client_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(client_handles.data());
case HandleType::guard_condition_handle:
if (number_of_handles > 2) {
throw std::runtime_error("Too many guard condition handles requested!");
}
return guard_cond_handles.data();
default:
throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast<int>(type)) +
", could not borrow handle memory.");
}
}
// return the new number of subscribers
virtual size_t fill_subscriber_handles(void ** ptr) = 0;
/// Return the memory borrowed in borrow_handles.
/**
* return_handles should always mirror the way memory was borrowed in borrow_handles.
* \param[in] The type of entity that this function is returning.
* \param[in] Pointer to the handles returned.
*/
virtual void return_handles(HandleType type, void ** handles)
{
switch (type) {
case HandleType::subscription_handle:
if (handles != subscription_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this MemoryStrategy");
}
memset(handles, 0, subscription_handles.size());
break;
case HandleType::service_handle:
if (handles != service_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this MemoryStrategy");
}
memset(handles, 0, service_handles.size());
break;
case HandleType::client_handle:
if (handles != client_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this MemoryStrategy");
}
memset(handles, 0, client_handles.size());
break;
case HandleType::guard_condition_handle:
if (handles != guard_cond_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this MemoryStrategy");
}
guard_cond_handles.fill(0);
break;
default:
throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast<int>(type)) +
", could not borrow handle memory.");
}
}
// return the new number of services
virtual size_t fill_service_handles(void ** ptr) = 0;
// return the new number of clients
virtual size_t fill_client_handles(void ** ptr) = 0;
virtual void clear_active_entities() = 0;
virtual void clear_handles() = 0;
virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0;
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return std::make_shared<executor::AnyExecutable>();
}
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;
/// Implementation of a general-purpose allocation function.
/**
* \param[in] size Number of bytes to allocate.
* \return Pointer to the allocated chunk of memory.
*/
virtual void * alloc(size_t size)
{
if (size == 0) {
return NULL;
}
return std::malloc(size);
}
virtual void
get_next_subscription(executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;
/// Implementation of a general-purpose free.
/**
* \param[in] Pointer to deallocate.
*/
virtual void free(void * ptr)
{
return std::free(ptr);
}
virtual void
get_next_service(executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;
virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
std::vector<rclcpp::service::ServiceBase::SharedPtr> services;
std::vector<rclcpp::client::ClientBase::SharedPtr> clients;
std::vector<void *> subscription_handles;
std::vector<void *> service_handles;
std::vector<void *> client_handles;
std::array<void *, 2> guard_cond_handles;
};
MemoryStrategy::SharedPtr create_default_strategy()
{
return std::make_shared<MemoryStrategy>(MemoryStrategy());
}
} /* memory_strategy */
} /* rclcpp */

View file

@ -15,13 +15,15 @@
#ifndef RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_
#define RCLCPP_RCLCPP_ALLOCATOR_MEMORY_STRATEGY_HPP_
#include <memory>
#include <vector>
#include <rclcpp/allocator/allocator_common.hpp>
#include <rclcpp/memory_strategy.hpp>
namespace rclcpp
{
namespace memory_strategies
{
@ -34,13 +36,16 @@ namespace allocator_memory_strategy
* the rmw implementation after the executor waits for work, based on the number of entities that
* come through.
*/
template<typename Alloc>
template<typename Alloc = std::allocator<void>>
class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(AllocatorMemoryStrategy<Alloc>);
using WeakNode = std::weak_ptr<rclcpp::node::Node>>;
using NodeVector = std::vector<WeakNode>;
using ExecAllocTraits = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
using ExecAlloc = typename ExecAllocTraits::allocator_type;
using ExecDeleter = allocator::Deleter<ExecAlloc, executor::AnyExecutable>;
@ -53,139 +58,415 @@ public:
allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
}
/// Borrow memory for storing data for subscriptions, services, clients, or guard conditions.
/**
* The default implementation stores std::vectors for each handle type and resizes the vectors
* as necessary based on the requested number of handles.
* \param[in] The type of entity that this function is requesting for.
* \param[in] The number of handles to borrow.
* \return Pointer to the allocated handles.
*/
virtual void ** borrow_handles(HandleType type, size_t number_of_handles)
AllocatorMemoryStrategy()
{
switch (type) {
case HandleType::subscription_handle:
if (subscription_handles.size() < number_of_handles) {
subscription_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(subscription_handles.data());
case HandleType::service_handle:
if (service_handles.size() < number_of_handles) {
service_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(service_handles.data());
case HandleType::client_handle:
if (client_handles.size() < number_of_handles) {
client_handles.resize(number_of_handles, 0);
}
return static_cast<void **>(client_handles.data());
case HandleType::guard_condition_handle:
if (number_of_handles > 2) {
throw std::runtime_error("Too many guard condition handles requested!");
}
return guard_cond_handles.data();
default:
throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast<int>(type)) +
", could not borrow handle memory.");
}
executable_allocator_ = std::make_shared<ExecAlloc>();
allocator_ = std::make_shared<VoidAlloc>();
}
/// Return the memory borrowed in borrow_handles.
/**
* return_handles should always mirror the way memory was borrowed in borrow_handles.
* \param[in] The type of entity that this function is returning.
* \param[in] Pointer to the handles returned.
*/
virtual void return_handles(HandleType type, void ** handles)
{
switch (type) {
case HandleType::subscription_handle:
if (handles != subscription_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this AllocatorMemoryStrategy");
}
memset(handles, 0, subscription_handles.size());
break;
case HandleType::service_handle:
if (handles != service_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this AllocatorMemoryStrategy");
}
memset(handles, 0, service_handles.size());
break;
case HandleType::client_handle:
if (handles != client_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this AllocatorMemoryStrategy");
}
memset(handles, 0, client_handles.size());
break;
case HandleType::guard_condition_handle:
if (handles != guard_cond_handles.data()) {
throw std::runtime_error(
"tried to return memory that isn't handled by this AllocatorMemoryStrategy");
}
guard_cond_handles.fill(0);
break;
default:
throw std::runtime_error("Unknown HandleType " + std::to_string(static_cast<int>(type)) +
", could not borrow handle memory.");
size_t fill_subscriber_handles(void ** ptr) {
size_t max_size = subscriptions_.size();
if (subscriber_handles_.size() < max_size) {
subscriber_handles_.resize(max_size);
}
size_t count = 0;
for (auto & subscription : subscriptions_) {
subscriber_handles_[count++] = subscription->get_subscription_handle()->data;
if (subscription->get_intra_process_subscription_handle()) {
subscriber_handles_[count++] =
subscription->get_intra_process_subscription_handle()->data;
}
}
ptr = static_cast<void**>(subscriber_handles_.data());
return count;
}
// return the new number of services
size_t fill_service_handles(void ** ptr) {
size_t max_size = services_.size();
if (service_handles_.size() < max_size) {
service_handles_.resize(max_size);
}
size_t count = 0;
for (auto & service : services_) {
service_handles_[count++] = service->get_service_handle()->data;
}
ptr = static_cast<void**>(service_handles_.data());
return count;
}
// return the new number of clients
size_t fill_client_handles(void ** ptr) {
size_t max_size = clients_.size();
if (client_handles_.size() < max_size) {
client_handles_.resize(max_size);
}
size_t count = 0;
for (auto & client : clients_) {
client_handles_[count++] = client->get_client_handle()->data;
}
ptr = static_cast<void**>(client_handles_.data());
return count;
}
void clear_active_entities() {
subscriptions_.clear();
services_.clear();
clients_.clear();
}
void clear_handles() {
subscriber_handles_.clear();
service_handles_.clear();
client_handles_.clear();
}
bool collect_entities(const NodeVector & weak_nodes) {
bool has_invalid_weak_nodes = false;
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
has_invalid_weak_nodes = false;
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
subscriptions_.push_back(subscription);
}
}
for (auto & service : group->get_service_ptrs()) {
services_.push_back(service);
}
for (auto & client : group->get_client_ptrs()) {
clients_.push_back(client);
}
}
}
return has_invalid_weak_nodes;
}
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual executor::AnyExecutable::SharedPtr instantiate_next_executable()
executor::AnyExecutable::SharedPtr instantiate_next_executable()
{
return std::allocate_shared<executor::AnyExecutable>(*executable_allocator_.get());
}
/// Implementation of a general-purpose allocation function.
/**
* \param[in] size Number of bytes to allocate.
* \return Pointer to the allocated chunk of memory.
*/
virtual void * alloc(size_t size)
static rclcpp::subscription::SubscriptionBase::SharedPtr
get_subscription_by_handle(void * subscriber_handle,
const NodeVector & weak_nodes)
{
if (size == 0) {
return NULL;
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
if (subscription->get_subscription_handle()->data == subscriber_handle) {
return subscription;
}
if (subscription->get_intra_process_subscription_handle() &&
subscription->get_intra_process_subscription_handle()->data == subscriber_handle)
{
return subscription;
}
}
}
}
}
auto ptr = VoidAllocTraits::allocate(*allocator_.get(), size);
alloc_map[ptr] = size;
return ptr;
return nullptr;
}
/// Implementation of a general-purpose free.
/**
* \param[in] Pointer to deallocate.
*/
virtual void free(void * ptr)
static rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle,
const NodeVector & weak_nodes)
{
if (alloc_map.count(ptr) == 0) {
// do nothing, the pointer is not in the alloc'd map
return;
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & service : group->get_service_ptrs()) {
if (service->get_service_handle()->data == service_handle) {
return service;
}
}
}
}
VoidAllocTraits::deallocate(*allocator_.get(), &ptr, alloc_map[ptr]);
return rclcpp::service::ServiceBase::SharedPtr();
}
static rclcpp::client::ClientBase::SharedPtr
get_client_by_handle(void * client_handle,
const NodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & client : group->get_client_ptrs()) {
if (client->get_client_handle()->data == client_handle) {
return client;
}
}
}
}
return rclcpp::client::ClientBase::SharedPtr();
}
static rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group,
const NodeVector & weak_nodes)
{
if (!group) {
return rclcpp::node::Node::SharedPtr();
}
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto callback_group = weak_group.lock();
if (callback_group == group) {
return node;
}
}
}
return rclcpp::node::Node::SharedPtr();
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
const NodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & weak_sub : group->get_subscription_ptrs()) {
auto sub = weak_sub.lock();
if (sub == subscription) {
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
virtual void
get_next_subscription(executor::AnyExecutable::SharedPtr any_exec,
const NodeVector & weak_nodes)
{
auto it = subscriber_handles_.begin();
while (it != subscriber_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_nodes);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->get_intra_process_subscription_handle()) {
is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_nodes);
if (!group) {
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
subscriber_handles_.erase(it++);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
if (is_intra_process) {
any_exec->subscription_intra_process = subscription;
} else {
any_exec->subscription = subscription;
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
subscriber_handles_.erase(it++);
return;
}
// Else, the subscription is no longer valid, remove it and continue
subscriber_handles_.erase(it++);
}
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_service(
rclcpp::service::ServiceBase::SharedPtr service,
const NodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & serv : group->get_service_ptrs()) {
if (serv == service) {
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
virtual void
get_next_service(executor::AnyExecutable::SharedPtr any_exec,
const NodeVector & weak_nodes)
{
auto it = service_handles_.begin();
while (it != service_handles_.end()) {
auto service = get_service_by_handle(*it, weak_nodes);
if (service) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_service(service, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
service_handles_.erase(it++);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->service = service;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
service_handles_.erase(it++);
return;
}
// Else, the service is no longer valid, remove it and continue
service_handles_.erase(it++);
}
}
static rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_client(
rclcpp::client::ClientBase::SharedPtr client,
const NodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group) {
continue;
}
for (auto & cli : group->get_client_ptrs()) {
if (cli == client) {
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
virtual void
get_next_client(executor::AnyExecutable::SharedPtr any_exec,
const NodeVector & weak_nodes)
{
auto it = client_handles_.begin();
while (it != client_handles_.end()) {
auto client = get_client_by_handle(*it, weak_nodes);
if (client) {
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_client(client, weak_nodes);
if (!group) {
// Group was not found, meaning the service is not valid...
// Remove it from the ready list and continue looking
client_handles_.erase(it++);
continue;
}
if (!group->can_be_taken_from().load()) {
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
++it;
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->client = client;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
client_handles_.erase(it++);
return;
}
// Else, the service is no longer valid, remove it and continue
client_handles_.erase(it++);
}
}
private:
template<typename U>
using VectorRebind = typename std::allocator_traits<Alloc>::template rebind_alloc<U>;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr,
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr>> subs;
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr>> subscriptions_;
std::vector<rclcpp::service::ServiceBase::SharedPtr,
VectorRebind<rclcpp::service::ServiceBase::SharedPtr>> services;
VectorRebind<rclcpp::service::ServiceBase::SharedPtr>> services_;
std::vector<rclcpp::client::ClientBase::SharedPtr,
VectorRebind<rclcpp::client::ClientBase::SharedPtr>> clients;
VectorRebind<rclcpp::client::ClientBase::SharedPtr>> clients_;
std::vector<void *, VoidAlloc> subscription_handles;
std::vector<void *, VoidAlloc> service_handles;
std::vector<void *, VoidAlloc> client_handles;
std::array<void *, 2> guard_cond_handles;
std::vector<void *, VoidAlloc> subscriber_handles_;
std::vector<void *, VoidAlloc> service_handles_;
std::vector<void *, VoidAlloc> client_handles_;
std::unordered_map<void *, size_t> alloc_map;
private:
std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;
};