From 99286978f92c30fe171313bf0785d6b6272c3257 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Sat, 28 Mar 2020 00:37:51 -0700 Subject: [PATCH] [rclcpp] adding a "static" single threaded executor (#1034) * Added static single threaded executor functionality Signed-off-by: Ishu Goel executor enhanced to run clients and waitable Signed-off-by: Ishu Goel tested executor Signed-off-by: Ishu Goel added semi-dynamic feature to the executor Signed-off-by: Ishu Goel Jenkins error fixes Signed-off-by: Ishu Goel Added static single threaded executor functionality Signed-off-by: Ishu Goel * Added semi-dynamic feature and made changes based on review comments Signed-off-by: Ishu Goel * re-added accidentally deleted code in node.hpp, fixed static_single_threaded_executor.cpp w.r.t. intra-process change since last commit Signed-off-by: MartinCornelis2 * Remove not needed comparison wait_set_.size_of_* is always different than '0' if we are inside the for loop Signed-off-by: Mauro * If new entity added to a node: re-collect entities Now we check ONLY node guard_conditions_ Some possible guard conditions to be triggered HERE are: 1. Ctrl+C guard condition 2. Executor interrupt_guard_condition_ 3. Node guard_conditions_ 4. Waitables guard conditions 5. ..more The previous approach was only checking if NOT (1 & 2), so if a Waitable was triggered, it would re-collect all entities, even if no new node entity was added. This was the case of the intra process manager, who relies on waitables. Every time a subscriber got a message, all the entities were collected. Signed-off-by: Mauro * Implement static executor entities collector Signed-off-by: Mauro * fixup and style Signed-off-by: William Woodall * mark new classes as final, with non-virtual destructors Signed-off-by: William Woodall * adding copyright to static executor files Signed-off-by: Ishu Goel * fixup Signed-off-by: William Woodall * cpplint fixes Signed-off-by: William Woodall Co-authored-by: Ishu Goel Co-authored-by: MartinCornelis2 Co-authored-by: Mauro --- rclcpp/CMakeLists.txt | 3 + rclcpp/include/rclcpp/executable_list.hpp | 92 ++++++ rclcpp/include/rclcpp/executor.hpp | 1 - rclcpp/include/rclcpp/executors.hpp | 1 + .../static_executor_entities_collector.hpp | 167 ++++++++++ .../static_single_threaded_executor.hpp | 200 ++++++++++++ rclcpp/include/rclcpp/memory_strategy.hpp | 1 + .../strategies/allocator_memory_strategy.hpp | 9 +- rclcpp/src/rclcpp/executable_list.cpp | 84 +++++ .../static_executor_entities_collector.cpp | 286 ++++++++++++++++++ .../static_single_threaded_executor.cpp | 144 +++++++++ 11 files changed, 986 insertions(+), 2 deletions(-) create mode 100644 rclcpp/include/rclcpp/executable_list.hpp create mode 100644 rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp create mode 100644 rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp create mode 100644 rclcpp/src/rclcpp/executable_list.cpp create mode 100644 rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp create mode 100644 rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 26361a2..fb80b4b 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -40,11 +40,14 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/duration.cpp src/rclcpp/event.cpp src/rclcpp/exceptions.cpp + src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp + src/rclcpp/executors/static_executor_entities_collector.cpp + src/rclcpp/executors/static_single_threaded_executor.cpp src/rclcpp/graph_listener.cpp src/rclcpp/init_options.cpp src/rclcpp/intra_process_manager.cpp diff --git a/rclcpp/include/rclcpp/executable_list.hpp b/rclcpp/include/rclcpp/executable_list.hpp new file mode 100644 index 0000000..2929b54 --- /dev/null +++ b/rclcpp/include/rclcpp/executable_list.hpp @@ -0,0 +1,92 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTABLE_LIST_HPP_ +#define RCLCPP__EXECUTABLE_LIST_HPP_ + +#include +#include + +#include "rclcpp/client.hpp" +#include "rclcpp/service.hpp" +#include "rclcpp/subscription_base.hpp" +#include "rclcpp/timer.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executor +{ + +/// This class contains subscriptionbase, timerbase, etc. which can be used to run callbacks. +class ExecutableList final +{ +public: + RCLCPP_PUBLIC + ExecutableList(); + + RCLCPP_PUBLIC + ~ExecutableList(); + + RCLCPP_PUBLIC + void + clear(); + + RCLCPP_PUBLIC + void + add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription); + + RCLCPP_PUBLIC + void + add_timer(rclcpp::TimerBase::SharedPtr timer); + + RCLCPP_PUBLIC + void + add_service(rclcpp::ServiceBase::SharedPtr service); + + RCLCPP_PUBLIC + void + add_client(rclcpp::ClientBase::SharedPtr client); + + RCLCPP_PUBLIC + void + add_waitable(rclcpp::Waitable::SharedPtr waitable); + + // Vector containing the SubscriptionBase of all the subscriptions added to the executor. + std::vector subscription; + // Contains the count of added subscriptions + size_t number_of_subscriptions; + // Vector containing the TimerBase of all the timers added to the executor. + std::vector timer; + // Contains the count of added timers + size_t number_of_timers; + // Vector containing the ServiceBase of all the services added to the executor. + std::vector service; + // Contains the count of added services + size_t number_of_services; + // Vector containing the ClientBase of all the clients added to the executor. + std::vector client; + // Contains the count of added clients + size_t number_of_clients; + // Vector containing all the waitables added to the executor. + std::vector waitable; + // Contains the count of added waitables + size_t number_of_waitables; +}; + +} // namespace executor +} // namespace rclcpp + +#endif // RCLCPP__EXECUTABLE_LIST_HPP_ diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index b5ff96d..aa94c01 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -357,7 +357,6 @@ protected: /// The context associated with this executor. std::shared_ptr context_; -private: RCLCPP_DISABLE_COPY(Executor) std::list weak_nodes_; diff --git a/rclcpp/include/rclcpp/executors.hpp b/rclcpp/include/rclcpp/executors.hpp index 6c73994..ac91141 100644 --- a/rclcpp/include/rclcpp/executors.hpp +++ b/rclcpp/include/rclcpp/executors.hpp @@ -20,6 +20,7 @@ #include "rclcpp/executors/multi_threaded_executor.hpp" #include "rclcpp/executors/single_threaded_executor.hpp" +#include "rclcpp/executors/static_single_threaded_executor.hpp" #include "rclcpp/node.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" diff --git a/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp new file mode 100644 index 0000000..49d069e --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_executor_entities_collector.hpp @@ -0,0 +1,167 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ + +#include +#include + +#include "rcl/guard_condition.h" +#include "rcl/wait.h" + +#include "rclcpp/executable_list.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategy.hpp" +#include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ +namespace executors +{ + +class StaticExecutorEntitiesCollector final + : public rclcpp::Waitable, + public std::enable_shared_from_this +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticExecutorEntitiesCollector) + + // Constructor + RCLCPP_PUBLIC + StaticExecutorEntitiesCollector() = default; + + // Destructor + ~StaticExecutorEntitiesCollector(); + + RCLCPP_PUBLIC + void + init( + rcl_wait_set_t * p_wait_set, + rclcpp::memory_strategy::MemoryStrategy::SharedPtr & memory_strategy, + rcl_guard_condition_t * executor_guard_condition); + + RCLCPP_PUBLIC + void + execute() override; + + RCLCPP_PUBLIC + void + fill_memory_strategy(); + + RCLCPP_PUBLIC + void + fill_executable_list(); + + /// Function to reallocate space for entities in the wait set. + RCLCPP_PUBLIC + void + prepare_wait_set(); + + /// Function to add_handles_to_wait_set and wait for work and + // block until the wait set is ready or until the timeout has been exceeded. + RCLCPP_PUBLIC + void + refresh_wait_set(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + size_t + get_number_of_ready_guard_conditions() override; + + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + RCLCPP_PUBLIC + bool + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr); + + /// Complete all available queued work without blocking. + /** + * This function checks if after the guard condition was triggered + * (or a spurious wakeup happened) we are really ready to execute + * i.e. re-collect entities + */ + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + + RCLCPP_PUBLIC + size_t + get_number_of_timers() {return exec_list_.number_of_timers;} + + RCLCPP_PUBLIC + size_t + get_number_of_subscriptions() {return exec_list_.number_of_subscriptions;} + + RCLCPP_PUBLIC + size_t + get_number_of_services() {return exec_list_.number_of_services;} + + RCLCPP_PUBLIC + size_t + get_number_of_clients() {return exec_list_.number_of_clients;} + + RCLCPP_PUBLIC + size_t + get_number_of_waitables() {return exec_list_.number_of_waitables;} + + RCLCPP_PUBLIC + rclcpp::SubscriptionBase::SharedPtr + get_subscription(size_t i) {return exec_list_.subscription[i];} + + RCLCPP_PUBLIC + rclcpp::TimerBase::SharedPtr + get_timer(size_t i) {return exec_list_.timer[i];} + + RCLCPP_PUBLIC + rclcpp::ServiceBase::SharedPtr + get_service(size_t i) {return exec_list_.service[i];} + + RCLCPP_PUBLIC + rclcpp::ClientBase::SharedPtr + get_client(size_t i) {return exec_list_.client[i];} + + RCLCPP_PUBLIC + rclcpp::Waitable::SharedPtr + get_waitable(size_t i) {return exec_list_.waitable[i];} + +private: + /// Nodes guard conditions which trigger this waitable + std::list guard_conditions_; + + /// Memory strategy: an interface for handling user-defined memory allocation strategies. + rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; + + /// List of weak nodes registered in the static executor + std::list weak_nodes_; + + /// Wait set for managing entities that the rmw layer waits on. + rcl_wait_set_t * p_wait_set_ = nullptr; + + /// Executable list: timers, subscribers, clients, services and waitables + rclcpp::executor::ExecutableList exec_list_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_EXECUTOR_ENTITIES_COLLECTOR_HPP_ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp new file mode 100644 index 0000000..43d086f --- /dev/null +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -0,0 +1,200 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ +#define RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ + +#include +#include +#include +#include +#include + +#include "rmw/rmw.h" + +#include "rclcpp/executable_list.hpp" +#include "rclcpp/executor.hpp" +#include "rclcpp/executors/static_executor_entities_collector.hpp" +#include "rclcpp/macros.hpp" +#include "rclcpp/memory_strategies.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/rate.hpp" +#include "rclcpp/utilities.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace executors +{ + +/// Static executor implementation +/** + * This executor is a static version of the original single threaded executor. + * It's static because it doesn't reconstruct the executable list for every iteration. + * All nodes, callbackgroups, timers, subscriptions etc. are created before + * spin() is called, and modified only when an entity is added/removed to/from a node. + * + * To run this executor instead of SingleThreadedExecutor replace: + * rclcpp::executors::SingleThreadedExecutor exec; + * by + * rclcpp::executors::StaticSingleThreadedExecutor exec; + * in your source code and spin node(s) in the following way: + * exec.add_node(node); + * exec.spin(); + * exec.remove_node(node); + */ +class StaticSingleThreadedExecutor : public executor::Executor +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(StaticSingleThreadedExecutor) + + /// Default constructor. See the default constructor for Executor. + RCLCPP_PUBLIC + explicit StaticSingleThreadedExecutor( + const executor::ExecutorArgs & args = executor::ExecutorArgs()); + + /// Default destrcutor. + RCLCPP_PUBLIC + virtual ~StaticSingleThreadedExecutor(); + + /// Static executor implementation of spin. + // This function will block until work comes in, execute it, and keep blocking. + // It will only be interrupt by a CTRL-C (managed by the global signal handler). + RCLCPP_PUBLIC + void + spin() override; + + /// Add a node to the executor. + /** + * An executor can have zero or more nodes which provide work during `spin` functions. + * \param[in] node_ptr Shared pointer to the node to be added. + * \param[in] notify True to trigger the interrupt guard condition during this function. If + * the executor is blocked at the rmw layer while waiting for work and it is notified that a new + * node was added, it will wake up. + */ + RCLCPP_PUBLIC + void + add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + RCLCPP_PUBLIC + void + add_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Remove a node from the executor. + /** + * \param[in] node_ptr Shared pointer to the node to remove. + * \param[in] notify True to trigger the interrupt guard condition and wake up the executor. + * This is useful if the last node was removed from the executor while the executor was blocked + * waiting for work in another thread, because otherwise the executor would never be notified. + */ + RCLCPP_PUBLIC + void + remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, + bool notify = true) override; + + /// Convenience function which takes Node and forwards NodeBaseInterface. + RCLCPP_PUBLIC + void + remove_node(std::shared_ptr node_ptr, bool notify = true) override; + + /// Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted. + /** + * \param[in] future The future to wait on. If this function returns SUCCESS, the future can be + * accessed without blocking (though it may still throw an exception). + * \param[in] timeout Optional timeout parameter, which gets passed to + * Executor::execute_ready_executables. + * `-1` is block forever, `0` is non-blocking. + * If the time spent inside the blocking loop exceeds this timeout, return a TIMEOUT return + * code. + * \return The return code, one of `SUCCESS`, `INTERRUPTED`, or `TIMEOUT`. + * + * Example usage: + * rclcpp::executors::StaticSingleThreadedExecutor exec; + * // ... other part of code like creating node + * // define future + * exec.add_node(node); + * exec.spin_until_future_complete(future); + */ + template + rclcpp::executor::FutureReturnCode + spin_until_future_complete( + std::shared_future & future, + std::chrono::duration timeout = std::chrono::duration(-1)) + { + std::future_status status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return rclcpp::executor::FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + entities_collector_ = std::make_shared(); + entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_); + + while (rclcpp::ok(this->context_)) { + // Do one set of work. + entities_collector_->refresh_wait_set(timeout_left); + execute_ready_executables(); + // Check if the future is set, return SUCCESS if it is. + status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return rclcpp::executor::FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return rclcpp::executor::FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return rclcpp::executor::FutureReturnCode::INTERRUPTED; + } + +protected: + /// Check which executables in ExecutableList struct are ready from wait_set and execute them. + /** + * \param[in] exec_list Structure that can hold subscriptionbases, timerbases, etc + * \param[in] timeout Optional timeout parameter. + */ + RCLCPP_PUBLIC + void + execute_ready_executables(); + +private: + RCLCPP_DISABLE_COPY(StaticSingleThreadedExecutor) + + StaticExecutorEntitiesCollector::SharedPtr entities_collector_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__STATIC_SINGLE_THREADED_EXECUTOR_HPP_ diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index aaf730b..3549802 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -56,6 +56,7 @@ public: virtual size_t number_of_guard_conditions() const = 0; virtual size_t number_of_waitables() const = 0; + virtual void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) = 0; virtual bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) = 0; virtual void clear_handles() = 0; virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index dae7370..bc703f5 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -169,7 +169,6 @@ public: subscription_handles_.push_back(subscription->get_subscription_handle()); return false; }); - group->find_service_ptrs_if( [this](const rclcpp::ServiceBase::SharedPtr & service) { service_handles_.push_back(service->get_service_handle()); @@ -195,6 +194,14 @@ public: return has_invalid_weak_nodes; } + void add_waitable_handle(const rclcpp::Waitable::SharedPtr & waitable) + { + if (nullptr == waitable) { + throw std::runtime_error("waitable object unexpectedly nullptr"); + } + waitable_handles_.push_back(waitable); + } + bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override { for (auto subscription : subscription_handles_) { diff --git a/rclcpp/src/rclcpp/executable_list.cpp b/rclcpp/src/rclcpp/executable_list.cpp new file mode 100644 index 0000000..d9dae57 --- /dev/null +++ b/rclcpp/src/rclcpp/executable_list.cpp @@ -0,0 +1,84 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/executable_list.hpp" + +using rclcpp::executor::ExecutableList; + +ExecutableList::ExecutableList() +: number_of_subscriptions(0), + number_of_timers(0), + number_of_services(0), + number_of_clients(0), + number_of_waitables(0) +{} + +ExecutableList::~ExecutableList() +{} + +void +ExecutableList::clear() +{ + this->timer.clear(); + this->number_of_timers = 0; + + this->subscription.clear(); + this->number_of_subscriptions = 0; + + this->service.clear(); + this->number_of_services = 0; + + this->client.clear(); + this->number_of_clients = 0; + + this->waitable.clear(); + this->number_of_waitables = 0; +} + +void +ExecutableList::add_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) +{ + this->subscription.push_back(std::move(subscription)); + this->number_of_subscriptions++; +} + +void +ExecutableList::add_timer(rclcpp::TimerBase::SharedPtr timer) +{ + this->timer.push_back(std::move(timer)); + this->number_of_timers++; +} + +void +ExecutableList::add_service(rclcpp::ServiceBase::SharedPtr service) +{ + this->service.push_back(std::move(service)); + this->number_of_services++; +} + +void +ExecutableList::add_client(rclcpp::ClientBase::SharedPtr client) +{ + this->client.push_back(std::move(client)); + this->number_of_clients++; +} + +void +ExecutableList::add_waitable(rclcpp::Waitable::SharedPtr waitable) +{ + this->waitable.push_back(std::move(waitable)); + this->number_of_waitables++; +} diff --git a/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp new file mode 100644 index 0000000..f8e32da --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_executor_entities_collector.cpp @@ -0,0 +1,286 @@ +// Copyright 2020 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_executor_entities_collector.hpp" + +#include +#include + +#include "rclcpp/executors/static_single_threaded_executor.hpp" + +using rclcpp::executors::StaticExecutorEntitiesCollector; + +StaticExecutorEntitiesCollector::~StaticExecutorEntitiesCollector() +{ + // Disassociate all nodes + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node) { + std::atomic_bool & has_executor = node->get_associated_with_executor_atomic(); + has_executor.store(false); + } + } + exec_list_.clear(); + weak_nodes_.clear(); + guard_conditions_.clear(); +} + +void +StaticExecutorEntitiesCollector::init( + rcl_wait_set_t * p_wait_set, + memory_strategy::MemoryStrategy::SharedPtr & memory_strategy, + rcl_guard_condition_t * executor_guard_condition) +{ + // Empty initialize executable list + exec_list_ = executor::ExecutableList(); + // Get executor's wait_set_ pointer + p_wait_set_ = p_wait_set; + // Get executor's memory strategy ptr + if (memory_strategy == nullptr) { + throw std::runtime_error("Received NULL memory strategy in executor waitable."); + } + memory_strategy_ = memory_strategy; + + // Add executor's guard condition + guard_conditions_.push_back(executor_guard_condition); + + // Get memory strategy and executable list. Prepare wait_set_ + execute(); +} + +void +StaticExecutorEntitiesCollector::execute() +{ + // Fill memory strategy with entities coming from weak_nodes_ + fill_memory_strategy(); + // Fill exec_list_ with entities coming from weak_nodes_ (same as memory strategy) + fill_executable_list(); + // Resize the wait_set_ based on memory_strategy handles (rcl_wait_set_resize) + prepare_wait_set(); +} + +void +StaticExecutorEntitiesCollector::fill_memory_strategy() +{ + memory_strategy_->clear_handles(); + bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); + + // Clean up any invalid nodes, if they were detected + if (has_invalid_weak_nodes) { + auto node_it = weak_nodes_.begin(); + while (node_it != weak_nodes_.end()) { + if (node_it->expired()) { + node_it = weak_nodes_.erase(node_it); + } else { + ++node_it; + } + } + } + + // Add the static executor waitable to the memory strategy + memory_strategy_->add_waitable_handle(this->shared_from_this()); +} + +void +StaticExecutorEntitiesCollector::fill_executable_list() +{ + exec_list_.clear(); + + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (!node) { + continue; + } + // Check in all the callback groups + for (auto & weak_group : node->get_callback_groups()) { + auto group = weak_group.lock(); + if (!group || !group->can_be_taken_from().load()) { + continue; + } + + group->find_timer_ptrs_if( + [this](const rclcpp::TimerBase::SharedPtr & timer) { + if (timer) { + exec_list_.add_timer(timer); + } + return false; + }); + group->find_subscription_ptrs_if( + [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { + if (subscription) { + exec_list_.add_subscription(subscription); + } + return false; + }); + group->find_service_ptrs_if( + [this](const rclcpp::ServiceBase::SharedPtr & service) { + if (service) { + exec_list_.add_service(service); + } + return false; + }); + group->find_client_ptrs_if( + [this](const rclcpp::ClientBase::SharedPtr & client) { + if (client) { + exec_list_.add_client(client); + } + return false; + }); + group->find_waitable_ptrs_if( + [this](const rclcpp::Waitable::SharedPtr & waitable) { + if (waitable) { + exec_list_.add_waitable(waitable); + } + return false; + }); + } + } + + // Add the executor's waitable to the executable list + exec_list_.add_waitable(shared_from_this()); +} + +void +StaticExecutorEntitiesCollector::prepare_wait_set() +{ + // clear wait set + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + // The size of waitables are accounted for in size of the other entities + rcl_ret_t ret = rcl_wait_set_resize( + p_wait_set_, memory_strategy_->number_of_ready_subscriptions(), + memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); + + if (RCL_RET_OK != ret) { + throw std::runtime_error( + std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); + } +} + +void +StaticExecutorEntitiesCollector::refresh_wait_set(std::chrono::nanoseconds timeout) +{ + // clear wait set (memeset to '0' all wait_set_ entities + // but keeps the wait_set_ number of entities) + if (rcl_wait_set_clear(p_wait_set_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear wait set"); + } + + if (!memory_strategy_->add_handles_to_wait_set(p_wait_set_)) { + throw std::runtime_error("Couldn't fill wait set"); + } + + rcl_ret_t status = + rcl_wait(p_wait_set_, std::chrono::duration_cast(timeout).count()); + + if (status == RCL_RET_WAIT_SET_EMPTY) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } +} + +bool +StaticExecutorEntitiesCollector::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + // Add waitable guard conditions (one for each registered node) into the wait set. + for (const auto & gc : guard_conditions_) { + rcl_ret_t ret = rcl_wait_set_add_guard_condition(wait_set, gc, NULL); + if (ret != RCL_RET_OK) { + throw std::runtime_error("Executor waitable: couldn't add guard condition to wait set"); + } + } + return true; +} + +size_t StaticExecutorEntitiesCollector::get_number_of_ready_guard_conditions() +{ + return guard_conditions_.size(); +} + +void +StaticExecutorEntitiesCollector::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + // Check to ensure node not already added + for (auto & weak_node : weak_nodes_) { + auto node = weak_node.lock(); + if (node == node_ptr) { + // TODO(jacquelinekay): Use a different error here? + throw std::runtime_error("Cannot add node to executor, node already added."); + } + } + + weak_nodes_.push_back(node_ptr); + guard_conditions_.push_back(node_ptr->get_notify_guard_condition()); +} + +bool +StaticExecutorEntitiesCollector::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr) +{ + auto node_it = weak_nodes_.begin(); + + while (node_it != weak_nodes_.end()) { + bool matched = (node_it->lock() == node_ptr); + if (matched) { + // Find and remove node and its guard condition + auto gc_it = std::find( + guard_conditions_.begin(), + guard_conditions_.end(), + node_ptr->get_notify_guard_condition()); + + if (gc_it != guard_conditions_.end()) { + guard_conditions_.erase(gc_it); + weak_nodes_.erase(node_it); + return true; + } + + throw std::runtime_error("Didn't find guard condition associated with node."); + + } else { + ++node_it; + } + } + + return false; +} + +bool +StaticExecutorEntitiesCollector::is_ready(rcl_wait_set_t * p_wait_set) +{ + // Check wait_set guard_conditions for added/removed entities to/from a node + for (size_t i = 0; i < p_wait_set->size_of_guard_conditions; ++i) { + if (p_wait_set->guard_conditions[i] != NULL) { + // Check if the guard condition triggered belongs to a node + auto it = std::find( + guard_conditions_.begin(), guard_conditions_.end(), + p_wait_set->guard_conditions[i]); + + // If it does, we are ready to re-collect entities + if (it != guard_conditions_.end()) { + return true; + } + } + } + // None of the guard conditions triggered belong to a registered node + return false; +} diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp new file mode 100644 index 0000000..c6c1d1b --- /dev/null +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -0,0 +1,144 @@ +// Copyright 2019 Nobleo Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/static_single_threaded_executor.hpp" + +#include + +#include "rclcpp/scope_exit.hpp" + +using rclcpp::executors::StaticSingleThreadedExecutor; +using rclcpp::executor::ExecutableList; + +StaticSingleThreadedExecutor::StaticSingleThreadedExecutor( + const rclcpp::executor::ExecutorArgs & args) +: executor::Executor(args) +{ + entities_collector_ = std::make_shared(); +} + +StaticSingleThreadedExecutor::~StaticSingleThreadedExecutor() {} + +void +StaticSingleThreadedExecutor::spin() +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); + + // Set memory_strategy_ and exec_list_ based on weak_nodes_ + // Prepare wait_set_ based on memory_strategy_ + entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_); + + while (rclcpp::ok(this->context_) && spinning.load()) { + // Refresh wait set and wait for work + entities_collector_->refresh_wait_set(); + execute_ready_executables(); + } +} + +void +StaticSingleThreadedExecutor::add_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + // If the node already has an executor + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + if (has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } + + if (notify) { + // Interrupt waiting to handle new node + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } + } + + entities_collector_->add_node(node_ptr); +} + +void +StaticSingleThreadedExecutor::add_node(std::shared_ptr node_ptr, bool notify) +{ + this->add_node(node_ptr->get_node_base_interface(), notify); +} + +void +StaticSingleThreadedExecutor::remove_node( + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify) +{ + bool node_removed = entities_collector_->remove_node(node_ptr); + + if (notify) { + // If the node was matched and removed, interrupt waiting + if (node_removed) { + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string().str); + } + } + } + + std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic(); + has_executor.store(false); +} + +void +StaticSingleThreadedExecutor::remove_node(std::shared_ptr node_ptr, bool notify) +{ + this->remove_node(node_ptr->get_node_base_interface(), notify); +} + +void +StaticSingleThreadedExecutor::execute_ready_executables() +{ + // Execute all the ready subscriptions + for (size_t i = 0; i < wait_set_.size_of_subscriptions; ++i) { + if (i < entities_collector_->get_number_of_subscriptions()) { + if (wait_set_.subscriptions[i]) { + execute_subscription(entities_collector_->get_subscription(i)); + } + } + } + // Execute all the ready timers + for (size_t i = 0; i < wait_set_.size_of_timers; ++i) { + if (i < entities_collector_->get_number_of_timers()) { + if (wait_set_.timers[i] && entities_collector_->get_timer(i)->is_ready()) { + execute_timer(entities_collector_->get_timer(i)); + } + } + } + // Execute all the ready services + for (size_t i = 0; i < wait_set_.size_of_services; ++i) { + if (i < entities_collector_->get_number_of_services()) { + if (wait_set_.services[i]) { + execute_service(entities_collector_->get_service(i)); + } + } + } + // Execute all the ready clients + for (size_t i = 0; i < wait_set_.size_of_clients; ++i) { + if (i < entities_collector_->get_number_of_clients()) { + if (wait_set_.clients[i]) { + execute_client(entities_collector_->get_client(i)); + } + } + } + // Execute all the ready waitables + for (size_t i = 0; i < entities_collector_->get_number_of_waitables(); ++i) { + if (entities_collector_->get_waitable(i)->is_ready(&wait_set_)) { + entities_collector_->get_waitable(i)->execute(); + } + } +}