diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 89216c1..44ee554 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -40,19 +40,55 @@ public: RCLCPP_MAKE_SHARED_DEFINITIONS(Executor); Executor() - { - subscriber_handles_.subscriber_count_ = 0; - subscriber_handles_.subscribers_ = 0; - } + : interrupt_guard_condition_( + ros_middleware_interface::create_guard_condition()) + {} ~Executor() {} - void add_node(rclcpp::node::Node::SharedPtr &node_ptr) + void + add_node(rclcpp::node::Node::SharedPtr &node_ptr) { this->weak_nodes_.push_back(node_ptr); } protected: - static void execute_subscription( + + struct AnyExecutable + { + AnyExecutable() : subscription(0), timer(0), callback_group(0), node(0) {} + // Either the subscription or the timer will be set, but not both + rclcpp::subscription::SubscriptionBase::SharedPtr subscription; + rclcpp::timer::TimerBase::SharedPtr timer; + // These are used to keep the scope on the containing items + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; + rclcpp::node::Node::SharedPtr node; + }; + + void + execute_any_executable(std::shared_ptr &any_exec) + { + if (!any_exec) + { + return; + } + if (any_exec->timer) + { + execute_timer(any_exec->timer); + } + if (any_exec->subscription) + { + execute_subscription(any_exec->subscription); + } + // Reset the callback_group, reguardless of type + any_exec->callback_group->can_be_taken_from_.store(true); + // Wake the wait, because it may need to be recalculated or work that + // was previously blocked is now available. + using ros_middleware_interface::trigger_guard_condition; + trigger_guard_condition(interrupt_guard_condition_); + } + + static void + execute_subscription( rclcpp::subscription::SubscriptionBase::SharedPtr &subscription) { std::shared_ptr message = subscription->create_message(); @@ -71,47 +107,55 @@ protected: } } - void reset_subscriber_handles() + static void + execute_timer( + rclcpp::timer::TimerBase::SharedPtr &timer) { - if (subscriber_handles_.subscriber_count_ != 0) - { - subscriber_handles_.subscriber_count_ = 0; - std::free(subscriber_handles_.subscribers_); - } + timer->callback_(); } - void populate_subscriber_handles_with_node(rclcpp::node::Node &node) +/*** Reseting class storage ***/ + + void + reset_all_handles() { - // Use the number of subscriptions to pre allocate subscriber_handles - subscriber_handles_.subscriber_count_ = node.number_of_subscriptions_; - subscriber_handles_.subscribers_ = static_cast( - std::malloc(sizeof(void *) * node.number_of_subscriptions_)); - // Add subscriptions from groups - size_t handles_index = 0; - for (auto weak_group : node.callback_groups_) - { - auto group = weak_group.lock(); - if (!group) - { - continue; - } - for (auto subscription : group->subscription_ptrs_) - { - assert(handles_index < node.number_of_subscriptions_); - subscriber_handles_.subscribers_[handles_index] = \ - subscription->subscription_handle_.data_; - handles_index += 1; - } - } - assert(handles_index == node.number_of_subscriptions_); + reset_subscriber_handles(); + reset_guard_condition_handles(); } - void populate_subscriber_handles() + void + reset_subscriber_handles() { - // Calculate the number of subscriptions and create shared_ptrs - size_t number_of_subscriptions = 0; - std::vector nodes; + subscriber_handles_.clear(); + } + + void + reset_guard_condition_handles() + { + guard_condition_handles_.clear(); + } + +/******************************/ + +/*** Populating class storage from a single node ***/ + + void + populate_subscriber_handles_with_node(rclcpp::node::Node &node) + { + // TODO: reimplement + } + +/******************************/ + +/*** Populate class storage from stored weak node pointers and wait. ***/ + + void + populate_all_handles(bool nonblocking) + { + // Collect the subscriptions and timers to be waited on bool has_invalid_weak_nodes = false; + std::vector subs; + std::vector timers; for (auto &weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -120,8 +164,22 @@ protected: has_invalid_weak_nodes = false; continue; } - nodes.push_back(node); - number_of_subscriptions += node->number_of_subscriptions_; + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group || group->can_be_taken_from_.load() == false) + { + continue; + } + for (auto &subscription : group->subscription_ptrs_) + { + subs.push_back(subscription); + } + for (auto &timer : group->timer_ptrs_) + { + timers.push_back(timer); + } + } } // Clean up any invalid nodes, if they were detected if (has_invalid_weak_nodes) @@ -132,61 +190,86 @@ protected: return i.expired(); })); } - // Use the number of subscriptions to pre allocate subscriber_handles - subscriber_handles_.subscriber_count_ = number_of_subscriptions; - subscriber_handles_.subscribers_ = static_cast( + // Use the number of subscriptions to allocate memory in the handles + size_t number_of_subscriptions = subs.size(); + ros_middleware_interface::SubscriberHandles subscriber_handles; + subscriber_handles.subscriber_count_ = number_of_subscriptions; + subscriber_handles.subscribers_ = static_cast( std::malloc(sizeof(void *) * number_of_subscriptions)); - if (subscriber_handles_.subscribers_ == NULL) + if (subscriber_handles.subscribers_ == NULL) { // TODO: Use a different error here? maybe std::bad_alloc? throw std::runtime_error("Could not malloc for subscriber pointers."); } - // Now fill the subscriber_handles with subscribers from the nodes - size_t handles_index = 0; - for (auto &node : nodes) + // Then fill the SubscriberHandles with ready subscriptions + size_t subscriber_handle_index = 0; + for (auto &subscription : subs) { - for (auto weak_group : node->callback_groups_) + subscriber_handles.subscribers_[subscriber_handle_index] = \ + subscription->subscription_handle_.data_; + subscriber_handle_index += 1; + } + // Use the number of guard conditions to allocate memory in the handles + // Add 2 to the number for the ctrl-c guard cond and the executor's + size_t start_of_timer_guard_conds = 2; + size_t number_of_guard_conds = timers.size() + start_of_timer_guard_conds; + ros_middleware_interface::GuardConditionHandles guard_condition_handles; + guard_condition_handles.guard_condition_count_ = number_of_guard_conds; + guard_condition_handles.guard_conditions_ = static_cast( + std::malloc(sizeof(void *) * number_of_guard_conds)); + if (guard_condition_handles.guard_conditions_ == NULL) + { + // TODO: Use a different error here? maybe std::bad_alloc? + throw std::runtime_error( + "Could not malloc for guard condition pointers."); + } + // Put the global ctrl-c guard condition in + assert(guard_condition_handles.guard_condition_count_ > 1); + guard_condition_handles.guard_conditions_[0] = \ + rclcpp::utilities::get_global_sigint_guard_condition().data_; + // Put the executor's guard condition in + guard_condition_handles.guard_conditions_[1] = \ + interrupt_guard_condition_.data_; + // Then fill the SubscriberHandles with ready subscriptions + size_t guard_cond_handle_index = start_of_timer_guard_conds; + for (auto &timer : timers) + { + guard_condition_handles.guard_conditions_[guard_cond_handle_index] = \ + timer->guard_condition_.data_; + guard_cond_handle_index += 1; + } + // Now wait on the waitable subscriptions and timers + ros_middleware_interface::wait(subscriber_handles, + guard_condition_handles, + nonblocking); + // Add the new work to the class's list of things waiting to be executed + // Starting with the subscribers + for (size_t i = 0; i < number_of_subscriptions; ++i) + { + void *handle = subscriber_handles.subscribers_[i]; + if (handle) { - auto group = weak_group.lock(); - if (!group) - { - continue; - } - for (auto subscription : group->subscription_ptrs_) - { - assert(handles_index < number_of_subscriptions); - subscriber_handles_.subscribers_[handles_index] = \ - subscription->subscription_handle_.data_; - handles_index += 1; - } + subscriber_handles_.push_back(handle); } } - } - - struct AnyExecutable - { - AnyExecutable() : subscription(0), guard_condition_handle(0) {} - rclcpp::subscription::SubscriptionBase::SharedPtr subscription; - ros_middleware_interface::GuardConditionHandle *guard_condition_handle; - }; - - std::list get_ready_subscriber_handles() - { - std::list ready_subscriber_handles; - for (size_t i = 0; i < subscriber_handles_.subscriber_count_; ++i) + // Then the timers, start with start_of_timer_guard_conds + for (size_t i = start_of_timer_guard_conds; i < number_of_guard_conds; ++i) { - void *handle = subscriber_handles_.subscribers_[i]; - if (!handle) + void *handle = guard_condition_handles.guard_conditions_[i]; + if (handle) { - continue; + guard_condition_handles_.push_back(handle); } - ready_subscriber_handles.push_back(handle); } - return ready_subscriber_handles; + // Make sure to free memory + std::free(subscriber_handles.subscribers_); + std::free(guard_condition_handles.guard_conditions_); } - rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_by_handle( - void * subscriber_handle) +/******************************/ + + rclcpp::subscription::SubscriptionBase::SharedPtr + get_subscription_by_handle(void * subscriber_handle) { for (auto weak_node : weak_nodes_) { @@ -214,91 +297,263 @@ protected: return rclcpp::subscription::SubscriptionBase::SharedPtr(); } - void remove_subscriber_handle_from_subscriber_handles_(void *handle) + rclcpp::timer::TimerBase::SharedPtr + get_timer_by_handle(void * guard_condition_handle) { - for (size_t i = 0; i < subscriber_handles_.subscriber_count_; ++i) + for (auto weak_node : weak_nodes_) { - if (handle == subscriber_handles_.subscribers_[i]) + auto node = weak_node.lock(); + if (!node) { - // TODO: use nullptr here? - subscriber_handles_.subscribers_[i] = NULL; + continue; + } + for (auto weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + if (!group) + { + continue; + } + for (auto timer : group->timer_ptrs_) + { + if (timer->guard_condition_.data_ == guard_condition_handle) + { + return timer; + } + } } } + return rclcpp::timer::TimerBase::SharedPtr(); } - rclcpp::subscription::SubscriptionBase::SharedPtr get_next_subscription( - std::list &ready_subscriber_handles) + void + remove_subscriber_handle_from_subscriber_handles(void *handle) { - if (ready_subscriber_handles.size() == 0) + subscriber_handles_.remove(handle); + } + + void + remove_guard_condition_handle_from_guard_condition_handles(void *handle) + { + guard_condition_handles_.remove(handle); + } + + rclcpp::node::Node::SharedPtr + get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr &group) + { + if (!group) { - return rclcpp::subscription::SubscriptionBase::SharedPtr(); + return rclcpp::node::Node::SharedPtr(); } - while (ready_subscriber_handles.size() != 0) + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto callback_group = weak_group.lock(); + if (!callback_group) + { + continue; + } + if (callback_group == group) + { + return node; + } + } + } + return rclcpp::node::Node::SharedPtr(); + } + + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_timer( + rclcpp::timer::TimerBase::SharedPtr &timer) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &t : group->timer_ptrs_) + { + if (t == timer) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_timer(std::shared_ptr &any_exec) + { + for (auto handle : guard_condition_handles_) + { + auto timer = get_timer_by_handle(handle); + if (timer) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_timer(timer); + if (!group) + { + // Group was not found, meaning the timer is not valid... + // Remove it from the ready list and continue looking + remove_guard_condition_handle_from_guard_condition_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->timer = timer; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_guard_condition_handle_from_guard_condition_handles(handle); + return; + } + // Else, the timer is no longer valid, remove it and continue + remove_guard_condition_handle_from_guard_condition_handles(handle); + } + } + + rclcpp::callback_group::CallbackGroup::SharedPtr + get_group_by_subscription( + rclcpp::subscription::SubscriptionBase::SharedPtr &subscription) + { + for (auto &weak_node : weak_nodes_) + { + auto node = weak_node.lock(); + if (!node) + { + continue; + } + for (auto &weak_group : node->callback_groups_) + { + auto group = weak_group.lock(); + for (auto &sub : group->subscription_ptrs_) + { + if (sub == subscription) + { + return group; + } + } + } + } + return rclcpp::callback_group::CallbackGroup::SharedPtr(); + } + + void + get_next_subscription(std::shared_ptr &any_exec) + { + for (auto handle : subscriber_handles_) { - void *handle = ready_subscriber_handles.front(); - ready_subscriber_handles.pop_front(); - remove_subscriber_handle_from_subscriber_handles_(handle); auto subscription = get_subscription_by_handle(handle); - if (subscription) return subscription; + if (subscription) + { + // Find the group for this handle and see if it can be serviced + auto group = get_group_by_subscription(subscription); + if (!group) + { + // Group was not found, meaning the subscription is not valid... + // Remove it from the ready list and continue looking + remove_subscriber_handle_from_subscriber_handles(handle); + continue; + } + if (!group->can_be_taken_from_.load()) + { + // Group is mutually exclusive and is being used, so skip it for now + // Leave it to be checked next time, but continue searching + continue; + } + // Otherwise it is safe to set and return the any_exec + any_exec->subscription = subscription; + any_exec->callback_group = group; + any_exec->node = get_node_by_group(group); + remove_subscriber_handle_from_subscriber_handles(handle); + return; + } + // Else, the subscription is no longer valid, remove it and continue + remove_subscriber_handle_from_subscriber_handles(handle); } - return rclcpp::subscription::SubscriptionBase::SharedPtr(); } - std::shared_ptr get_next_executable(bool nonblocking=false) + std::shared_ptr + get_next_ready_executable() { - namespace rmi = ros_middleware_interface; - // Get any ready subscriber handles out of subscriber_handles_. - // TODO: operate directly on subscriber_handles_ or - // store ready_subscriber_handles to prevent rebuilding it every time - auto ready_subscriber_handles = get_ready_subscriber_handles(); - // If there are none - if (ready_subscriber_handles.size() == 0) - { - // Repopulate the subscriber handles and wait on them - reset_subscriber_handles(); - populate_subscriber_handles(); - // TODO: populate the guard handles correctly - rmi::GuardConditionHandles guard_condition_handles; - guard_condition_handles.guard_condition_count_ = 1; - guard_condition_handles.guard_conditions_ = static_cast( - std::malloc(sizeof(void *) * 1)); - guard_condition_handles.guard_conditions_[0] = \ - rclcpp::utilities::get_global_sigint_guard_cond().data_; - // Wait on the handles. - rmi::wait(subscriber_handles_, guard_condition_handles, nonblocking); - std::free(guard_condition_handles.guard_conditions_); - ready_subscriber_handles = get_ready_subscriber_handles(); - } - // At this point there should be something in either the subscriber handles - // Or in the guard handles. - // TODO: also check the timers here std::shared_ptr any_exec(new AnyExecutable()); - // TODO: actually check the guard handles - any_exec->guard_condition_handle = 0; - // If any_exec.guard_condition_handle is valid, return now before checking - // for valid subscriptions. - if (any_exec->guard_condition_handle) + // Check the timers to see if there are any that are ready, if so return + get_next_timer(any_exec); + if (any_exec->timer) { return any_exec; } - // Check for subscriptions which are ready to be handled - any_exec->subscription = get_next_subscription(ready_subscriber_handles); - // If any_exec.subscription is valid, return now + // Check the subscriptions to see if there are any that are ready + get_next_subscription(any_exec); if (any_exec->subscription) { return any_exec; } - // If there is neither a ready guard condition nor subscription, return an - // empty any_exec anyways + // If there is neither a ready timer nor subscription, return a null ptr + any_exec.reset(); return any_exec; } + std::shared_ptr + get_next_executable(bool nonblocking=false) + { + namespace rmi = ros_middleware_interface; + // Check to see if there are any subscriptions or timers needing service + // TODO: improve run to run efficiency of this function + auto any_exec = get_next_ready_executable(); + // If there are none + if (!any_exec) + { + // Repopulate the subscriber handles and wait on them + populate_all_handles(nonblocking); + // Try again + any_exec = get_next_ready_executable(); + } + // At this point any_exec should be valid with either a valid subscription + // or a valid timer, or it should be a null shared_ptr + if (any_exec) + { + // If it is valid, check to see if the group is mutually exclusive or + // not, then mark it accordingly + if (any_exec->callback_group->type_ == \ + callback_group::CallbackGroupType::MutuallyExclusive) + { + // It should not have been taken otherwise + assert(any_exec->callback_group->can_be_taken_from_.load() == true); + // Set to false to indicate something is being run from this group + any_exec->callback_group->can_be_taken_from_.store(false); + } + } + return any_exec; + } + + ros_middleware_interface::GuardConditionHandle interrupt_guard_condition_; + std::vector> weak_nodes_; + typedef std::list SubscriberHandles; + SubscriberHandles subscriber_handles_; + typedef std::list GuardConditionHandles; + GuardConditionHandles guard_condition_handles_; + private: RCLCPP_DISABLE_COPY(Executor); - std::vector> weak_nodes_; - ros_middleware_interface::SubscriberHandles subscriber_handles_; - }; } /* executor */ diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index db9b085..9648de2 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -46,11 +46,6 @@ public: ~MultiThreadedExecutor() {} - void add_node(rclcpp::node::Node::SharedPtr &node_ptr) - { - this->weak_nodes_.push_back(node_ptr); - } - void spin() { std::vector threads; @@ -59,9 +54,15 @@ public: { number_of_threads = 1; } - for (; number_of_threads > 0; --number_of_threads) { - threads.emplace_back(std::thread(&MultiThreadedExecutor::run, this)); + std::lock_guard wait_lock(wait_mutex_); + size_t thread_id = 1; + for (; number_of_threads > 0; --number_of_threads) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id++); + threads.emplace_back(func); + } } for (auto &thread : threads) { @@ -70,8 +71,9 @@ public: } private: - void run() + void run(size_t this_thread_id) { + rclcpp::thread_id = this_thread_id; while (rclcpp::utilities::ok()) { std::shared_ptr any_exec; @@ -83,17 +85,12 @@ private: } any_exec = get_next_executable(); } - if (any_exec && any_exec->subscription) - { - // Do callback - execute_subscription(any_exec->subscription); - } + execute_any_executable(any_exec); } } RCLCPP_DISABLE_COPY(MultiThreadedExecutor); - std::vector> weak_nodes_; std::mutex wait_mutex_; }; diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index dbdb700..e061ec1 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -51,11 +51,7 @@ public: while (rclcpp::utilities::ok()) { auto any_exec = get_next_executable(); - if (any_exec->subscription) - { - // Do callback - execute_subscription(any_exec->subscription); - } + execute_any_executable(any_exec); } }