refactor of executors for mutually exclusive cb's

This commit is contained in:
William Woodall 2014-08-29 17:49:23 -07:00
parent b5b845153f
commit b91dd6ef00
3 changed files with 403 additions and 155 deletions

View file

@ -40,19 +40,55 @@ public:
RCLCPP_MAKE_SHARED_DEFINITIONS(Executor); RCLCPP_MAKE_SHARED_DEFINITIONS(Executor);
Executor() Executor()
{ : interrupt_guard_condition_(
subscriber_handles_.subscriber_count_ = 0; ros_middleware_interface::create_guard_condition())
subscriber_handles_.subscribers_ = 0; {}
}
~Executor() {} ~Executor() {}
void add_node(rclcpp::node::Node::SharedPtr &node_ptr) void
add_node(rclcpp::node::Node::SharedPtr &node_ptr)
{ {
this->weak_nodes_.push_back(node_ptr); this->weak_nodes_.push_back(node_ptr);
} }
protected: protected:
static void execute_subscription(
struct AnyExecutable
{
AnyExecutable() : subscription(0), timer(0), callback_group(0), node(0) {}
// Either the subscription or the timer will be set, but not both
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::timer::TimerBase::SharedPtr timer;
// These are used to keep the scope on the containing items
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
rclcpp::node::Node::SharedPtr node;
};
void
execute_any_executable(std::shared_ptr<AnyExecutable> &any_exec)
{
if (!any_exec)
{
return;
}
if (any_exec->timer)
{
execute_timer(any_exec->timer);
}
if (any_exec->subscription)
{
execute_subscription(any_exec->subscription);
}
// Reset the callback_group, reguardless of type
any_exec->callback_group->can_be_taken_from_.store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
using ros_middleware_interface::trigger_guard_condition;
trigger_guard_condition(interrupt_guard_condition_);
}
static void
execute_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr &subscription) rclcpp::subscription::SubscriptionBase::SharedPtr &subscription)
{ {
std::shared_ptr<void> message = subscription->create_message(); std::shared_ptr<void> message = subscription->create_message();
@ -71,47 +107,55 @@ protected:
} }
} }
void reset_subscriber_handles() static void
execute_timer(
rclcpp::timer::TimerBase::SharedPtr &timer)
{ {
if (subscriber_handles_.subscriber_count_ != 0) timer->callback_();
{
subscriber_handles_.subscriber_count_ = 0;
std::free(subscriber_handles_.subscribers_);
}
} }
void populate_subscriber_handles_with_node(rclcpp::node::Node &node) /*** Reseting class storage ***/
void
reset_all_handles()
{ {
// Use the number of subscriptions to pre allocate subscriber_handles reset_subscriber_handles();
subscriber_handles_.subscriber_count_ = node.number_of_subscriptions_; reset_guard_condition_handles();
subscriber_handles_.subscribers_ = static_cast<void **>(
std::malloc(sizeof(void *) * node.number_of_subscriptions_));
// Add subscriptions from groups
size_t handles_index = 0;
for (auto weak_group : node.callback_groups_)
{
auto group = weak_group.lock();
if (!group)
{
continue;
}
for (auto subscription : group->subscription_ptrs_)
{
assert(handles_index < node.number_of_subscriptions_);
subscriber_handles_.subscribers_[handles_index] = \
subscription->subscription_handle_.data_;
handles_index += 1;
}
}
assert(handles_index == node.number_of_subscriptions_);
} }
void populate_subscriber_handles() void
reset_subscriber_handles()
{ {
// Calculate the number of subscriptions and create shared_ptrs subscriber_handles_.clear();
size_t number_of_subscriptions = 0; }
std::vector<rclcpp::node::Node::SharedPtr> nodes;
void
reset_guard_condition_handles()
{
guard_condition_handles_.clear();
}
/******************************/
/*** Populating class storage from a single node ***/
void
populate_subscriber_handles_with_node(rclcpp::node::Node &node)
{
// TODO: reimplement
}
/******************************/
/*** Populate class storage from stored weak node pointers and wait. ***/
void
populate_all_handles(bool nonblocking)
{
// Collect the subscriptions and timers to be waited on
bool has_invalid_weak_nodes = false; bool has_invalid_weak_nodes = false;
std::vector<rclcpp::subscription::SubscriptionBase::SharedPtr> subs;
std::vector<rclcpp::timer::TimerBase::SharedPtr> timers;
for (auto &weak_node : weak_nodes_) for (auto &weak_node : weak_nodes_)
{ {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -120,8 +164,22 @@ protected:
has_invalid_weak_nodes = false; has_invalid_weak_nodes = false;
continue; continue;
} }
nodes.push_back(node); for (auto &weak_group : node->callback_groups_)
number_of_subscriptions += node->number_of_subscriptions_; {
auto group = weak_group.lock();
if (!group || group->can_be_taken_from_.load() == false)
{
continue;
}
for (auto &subscription : group->subscription_ptrs_)
{
subs.push_back(subscription);
}
for (auto &timer : group->timer_ptrs_)
{
timers.push_back(timer);
}
}
} }
// Clean up any invalid nodes, if they were detected // Clean up any invalid nodes, if they were detected
if (has_invalid_weak_nodes) if (has_invalid_weak_nodes)
@ -132,61 +190,86 @@ protected:
return i.expired(); return i.expired();
})); }));
} }
// Use the number of subscriptions to pre allocate subscriber_handles // Use the number of subscriptions to allocate memory in the handles
subscriber_handles_.subscriber_count_ = number_of_subscriptions; size_t number_of_subscriptions = subs.size();
subscriber_handles_.subscribers_ = static_cast<void **>( ros_middleware_interface::SubscriberHandles subscriber_handles;
subscriber_handles.subscriber_count_ = number_of_subscriptions;
subscriber_handles.subscribers_ = static_cast<void **>(
std::malloc(sizeof(void *) * number_of_subscriptions)); std::malloc(sizeof(void *) * number_of_subscriptions));
if (subscriber_handles_.subscribers_ == NULL) if (subscriber_handles.subscribers_ == NULL)
{ {
// TODO: Use a different error here? maybe std::bad_alloc? // TODO: Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for subscriber pointers."); throw std::runtime_error("Could not malloc for subscriber pointers.");
} }
// Now fill the subscriber_handles with subscribers from the nodes // Then fill the SubscriberHandles with ready subscriptions
size_t handles_index = 0; size_t subscriber_handle_index = 0;
for (auto &node : nodes) for (auto &subscription : subs)
{ {
for (auto weak_group : node->callback_groups_) subscriber_handles.subscribers_[subscriber_handle_index] = \
subscription->subscription_handle_.data_;
subscriber_handle_index += 1;
}
// Use the number of guard conditions to allocate memory in the handles
// Add 2 to the number for the ctrl-c guard cond and the executor's
size_t start_of_timer_guard_conds = 2;
size_t number_of_guard_conds = timers.size() + start_of_timer_guard_conds;
ros_middleware_interface::GuardConditionHandles guard_condition_handles;
guard_condition_handles.guard_condition_count_ = number_of_guard_conds;
guard_condition_handles.guard_conditions_ = static_cast<void **>(
std::malloc(sizeof(void *) * number_of_guard_conds));
if (guard_condition_handles.guard_conditions_ == NULL)
{
// TODO: Use a different error here? maybe std::bad_alloc?
throw std::runtime_error(
"Could not malloc for guard condition pointers.");
}
// Put the global ctrl-c guard condition in
assert(guard_condition_handles.guard_condition_count_ > 1);
guard_condition_handles.guard_conditions_[0] = \
rclcpp::utilities::get_global_sigint_guard_condition().data_;
// Put the executor's guard condition in
guard_condition_handles.guard_conditions_[1] = \
interrupt_guard_condition_.data_;
// Then fill the SubscriberHandles with ready subscriptions
size_t guard_cond_handle_index = start_of_timer_guard_conds;
for (auto &timer : timers)
{
guard_condition_handles.guard_conditions_[guard_cond_handle_index] = \
timer->guard_condition_.data_;
guard_cond_handle_index += 1;
}
// Now wait on the waitable subscriptions and timers
ros_middleware_interface::wait(subscriber_handles,
guard_condition_handles,
nonblocking);
// Add the new work to the class's list of things waiting to be executed
// Starting with the subscribers
for (size_t i = 0; i < number_of_subscriptions; ++i)
{
void *handle = subscriber_handles.subscribers_[i];
if (handle)
{ {
auto group = weak_group.lock(); subscriber_handles_.push_back(handle);
if (!group)
{
continue;
}
for (auto subscription : group->subscription_ptrs_)
{
assert(handles_index < number_of_subscriptions);
subscriber_handles_.subscribers_[handles_index] = \
subscription->subscription_handle_.data_;
handles_index += 1;
}
} }
} }
} // Then the timers, start with start_of_timer_guard_conds
for (size_t i = start_of_timer_guard_conds; i < number_of_guard_conds; ++i)
struct AnyExecutable
{
AnyExecutable() : subscription(0), guard_condition_handle(0) {}
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
ros_middleware_interface::GuardConditionHandle *guard_condition_handle;
};
std::list<void *> get_ready_subscriber_handles()
{
std::list<void *> ready_subscriber_handles;
for (size_t i = 0; i < subscriber_handles_.subscriber_count_; ++i)
{ {
void *handle = subscriber_handles_.subscribers_[i]; void *handle = guard_condition_handles.guard_conditions_[i];
if (!handle) if (handle)
{ {
continue; guard_condition_handles_.push_back(handle);
} }
ready_subscriber_handles.push_back(handle);
} }
return ready_subscriber_handles; // Make sure to free memory
std::free(subscriber_handles.subscribers_);
std::free(guard_condition_handles.guard_conditions_);
} }
rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_by_handle( /******************************/
void * subscriber_handle)
rclcpp::subscription::SubscriptionBase::SharedPtr
get_subscription_by_handle(void * subscriber_handle)
{ {
for (auto weak_node : weak_nodes_) for (auto weak_node : weak_nodes_)
{ {
@ -214,91 +297,263 @@ protected:
return rclcpp::subscription::SubscriptionBase::SharedPtr(); return rclcpp::subscription::SubscriptionBase::SharedPtr();
} }
void remove_subscriber_handle_from_subscriber_handles_(void *handle) rclcpp::timer::TimerBase::SharedPtr
get_timer_by_handle(void * guard_condition_handle)
{ {
for (size_t i = 0; i < subscriber_handles_.subscriber_count_; ++i) for (auto weak_node : weak_nodes_)
{ {
if (handle == subscriber_handles_.subscribers_[i]) auto node = weak_node.lock();
if (!node)
{ {
// TODO: use nullptr here? continue;
subscriber_handles_.subscribers_[i] = NULL; }
for (auto weak_group : node->callback_groups_)
{
auto group = weak_group.lock();
if (!group)
{
continue;
}
for (auto timer : group->timer_ptrs_)
{
if (timer->guard_condition_.data_ == guard_condition_handle)
{
return timer;
}
}
} }
} }
return rclcpp::timer::TimerBase::SharedPtr();
} }
rclcpp::subscription::SubscriptionBase::SharedPtr get_next_subscription( void
std::list<void *> &ready_subscriber_handles) remove_subscriber_handle_from_subscriber_handles(void *handle)
{ {
if (ready_subscriber_handles.size() == 0) subscriber_handles_.remove(handle);
}
void
remove_guard_condition_handle_from_guard_condition_handles(void *handle)
{
guard_condition_handles_.remove(handle);
}
rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group)
{
if (!group)
{ {
return rclcpp::subscription::SubscriptionBase::SharedPtr(); return rclcpp::node::Node::SharedPtr();
} }
while (ready_subscriber_handles.size() != 0) for (auto &weak_node : weak_nodes_)
{
auto node = weak_node.lock();
if (!node)
{
continue;
}
for (auto &weak_group : node->callback_groups_)
{
auto callback_group = weak_group.lock();
if (!callback_group)
{
continue;
}
if (callback_group == group)
{
return node;
}
}
}
return rclcpp::node::Node::SharedPtr();
}
rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_timer(
rclcpp::timer::TimerBase::SharedPtr &timer)
{
for (auto &weak_node : weak_nodes_)
{
auto node = weak_node.lock();
if (!node)
{
continue;
}
for (auto &weak_group : node->callback_groups_)
{
auto group = weak_group.lock();
for (auto &t : group->timer_ptrs_)
{
if (t == timer)
{
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
void
get_next_timer(std::shared_ptr<AnyExecutable> &any_exec)
{
for (auto handle : guard_condition_handles_)
{
auto timer = get_timer_by_handle(handle);
if (timer)
{
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_timer(timer);
if (!group)
{
// Group was not found, meaning the timer is not valid...
// Remove it from the ready list and continue looking
remove_guard_condition_handle_from_guard_condition_handles(handle);
continue;
}
if (!group->can_be_taken_from_.load())
{
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->timer = timer;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
remove_guard_condition_handle_from_guard_condition_handles(handle);
return;
}
// Else, the timer is no longer valid, remove it and continue
remove_guard_condition_handle_from_guard_condition_handles(handle);
}
}
rclcpp::callback_group::CallbackGroup::SharedPtr
get_group_by_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr &subscription)
{
for (auto &weak_node : weak_nodes_)
{
auto node = weak_node.lock();
if (!node)
{
continue;
}
for (auto &weak_group : node->callback_groups_)
{
auto group = weak_group.lock();
for (auto &sub : group->subscription_ptrs_)
{
if (sub == subscription)
{
return group;
}
}
}
}
return rclcpp::callback_group::CallbackGroup::SharedPtr();
}
void
get_next_subscription(std::shared_ptr<AnyExecutable> &any_exec)
{
for (auto handle : subscriber_handles_)
{ {
void *handle = ready_subscriber_handles.front();
ready_subscriber_handles.pop_front();
remove_subscriber_handle_from_subscriber_handles_(handle);
auto subscription = get_subscription_by_handle(handle); auto subscription = get_subscription_by_handle(handle);
if (subscription) return subscription; if (subscription)
{
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription);
if (!group)
{
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
remove_subscriber_handle_from_subscriber_handles(handle);
continue;
}
if (!group->can_be_taken_from_.load())
{
// Group is mutually exclusive and is being used, so skip it for now
// Leave it to be checked next time, but continue searching
continue;
}
// Otherwise it is safe to set and return the any_exec
any_exec->subscription = subscription;
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group);
remove_subscriber_handle_from_subscriber_handles(handle);
return;
}
// Else, the subscription is no longer valid, remove it and continue
remove_subscriber_handle_from_subscriber_handles(handle);
} }
return rclcpp::subscription::SubscriptionBase::SharedPtr();
} }
std::shared_ptr<AnyExecutable> get_next_executable(bool nonblocking=false) std::shared_ptr<AnyExecutable>
get_next_ready_executable()
{ {
namespace rmi = ros_middleware_interface;
// Get any ready subscriber handles out of subscriber_handles_.
// TODO: operate directly on subscriber_handles_ or
// store ready_subscriber_handles to prevent rebuilding it every time
auto ready_subscriber_handles = get_ready_subscriber_handles();
// If there are none
if (ready_subscriber_handles.size() == 0)
{
// Repopulate the subscriber handles and wait on them
reset_subscriber_handles();
populate_subscriber_handles();
// TODO: populate the guard handles correctly
rmi::GuardConditionHandles guard_condition_handles;
guard_condition_handles.guard_condition_count_ = 1;
guard_condition_handles.guard_conditions_ = static_cast<void **>(
std::malloc(sizeof(void *) * 1));
guard_condition_handles.guard_conditions_[0] = \
rclcpp::utilities::get_global_sigint_guard_cond().data_;
// Wait on the handles.
rmi::wait(subscriber_handles_, guard_condition_handles, nonblocking);
std::free(guard_condition_handles.guard_conditions_);
ready_subscriber_handles = get_ready_subscriber_handles();
}
// At this point there should be something in either the subscriber handles
// Or in the guard handles.
// TODO: also check the timers here
std::shared_ptr<AnyExecutable> any_exec(new AnyExecutable()); std::shared_ptr<AnyExecutable> any_exec(new AnyExecutable());
// TODO: actually check the guard handles // Check the timers to see if there are any that are ready, if so return
any_exec->guard_condition_handle = 0; get_next_timer(any_exec);
// If any_exec.guard_condition_handle is valid, return now before checking if (any_exec->timer)
// for valid subscriptions.
if (any_exec->guard_condition_handle)
{ {
return any_exec; return any_exec;
} }
// Check for subscriptions which are ready to be handled // Check the subscriptions to see if there are any that are ready
any_exec->subscription = get_next_subscription(ready_subscriber_handles); get_next_subscription(any_exec);
// If any_exec.subscription is valid, return now
if (any_exec->subscription) if (any_exec->subscription)
{ {
return any_exec; return any_exec;
} }
// If there is neither a ready guard condition nor subscription, return an // If there is neither a ready timer nor subscription, return a null ptr
// empty any_exec anyways any_exec.reset();
return any_exec; return any_exec;
} }
std::shared_ptr<AnyExecutable>
get_next_executable(bool nonblocking=false)
{
namespace rmi = ros_middleware_interface;
// Check to see if there are any subscriptions or timers needing service
// TODO: improve run to run efficiency of this function
auto any_exec = get_next_ready_executable();
// If there are none
if (!any_exec)
{
// Repopulate the subscriber handles and wait on them
populate_all_handles(nonblocking);
// Try again
any_exec = get_next_ready_executable();
}
// At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr
if (any_exec)
{
// If it is valid, check to see if the group is mutually exclusive or
// not, then mark it accordingly
if (any_exec->callback_group->type_ == \
callback_group::CallbackGroupType::MutuallyExclusive)
{
// It should not have been taken otherwise
assert(any_exec->callback_group->can_be_taken_from_.load() == true);
// Set to false to indicate something is being run from this group
any_exec->callback_group->can_be_taken_from_.store(false);
}
}
return any_exec;
}
ros_middleware_interface::GuardConditionHandle interrupt_guard_condition_;
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
typedef std::list<void*> SubscriberHandles;
SubscriberHandles subscriber_handles_;
typedef std::list<void*> GuardConditionHandles;
GuardConditionHandles guard_condition_handles_;
private: private:
RCLCPP_DISABLE_COPY(Executor); RCLCPP_DISABLE_COPY(Executor);
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
ros_middleware_interface::SubscriberHandles subscriber_handles_;
}; };
} /* executor */ } /* executor */

View file

@ -46,11 +46,6 @@ public:
~MultiThreadedExecutor() {} ~MultiThreadedExecutor() {}
void add_node(rclcpp::node::Node::SharedPtr &node_ptr)
{
this->weak_nodes_.push_back(node_ptr);
}
void spin() void spin()
{ {
std::vector<std::thread> threads; std::vector<std::thread> threads;
@ -59,9 +54,15 @@ public:
{ {
number_of_threads = 1; number_of_threads = 1;
} }
for (; number_of_threads > 0; --number_of_threads)
{ {
threads.emplace_back(std::thread(&MultiThreadedExecutor::run, this)); std::lock_guard<std::mutex> wait_lock(wait_mutex_);
size_t thread_id = 1;
for (; number_of_threads > 0; --number_of_threads)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id++);
threads.emplace_back(func);
}
} }
for (auto &thread : threads) for (auto &thread : threads)
{ {
@ -70,8 +71,9 @@ public:
} }
private: private:
void run() void run(size_t this_thread_id)
{ {
rclcpp::thread_id = this_thread_id;
while (rclcpp::utilities::ok()) while (rclcpp::utilities::ok())
{ {
std::shared_ptr<AnyExecutable> any_exec; std::shared_ptr<AnyExecutable> any_exec;
@ -83,17 +85,12 @@ private:
} }
any_exec = get_next_executable(); any_exec = get_next_executable();
} }
if (any_exec && any_exec->subscription) execute_any_executable(any_exec);
{
// Do callback
execute_subscription(any_exec->subscription);
}
} }
} }
RCLCPP_DISABLE_COPY(MultiThreadedExecutor); RCLCPP_DISABLE_COPY(MultiThreadedExecutor);
std::vector<std::weak_ptr<rclcpp::node::Node>> weak_nodes_;
std::mutex wait_mutex_; std::mutex wait_mutex_;
}; };

View file

@ -51,11 +51,7 @@ public:
while (rclcpp::utilities::ok()) while (rclcpp::utilities::ok())
{ {
auto any_exec = get_next_executable(); auto any_exec = get_next_executable();
if (any_exec->subscription) execute_any_executable(any_exec);
{
// Do callback
execute_subscription(any_exec->subscription);
}
} }
} }