From 222a0535b62719db69c6b57e66a94a45816ad164 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 27 Aug 2014 00:54:00 -0700 Subject: [PATCH] WIP changes to support multithreaded executor and timers --- rclcpp/include/rclcpp/callback_group.hpp | 21 +++- rclcpp/include/rclcpp/executor.hpp | 22 +++- .../executors/multi_threaded_executor.hpp | 45 ++++++- rclcpp/include/rclcpp/node.hpp | 31 +++-- rclcpp/include/rclcpp/node_impl.hpp | 60 +++++++-- rclcpp/include/rclcpp/rclcpp.hpp | 39 +++--- rclcpp/include/rclcpp/timer.hpp | 114 ++++++++++++++++++ 7 files changed, 287 insertions(+), 45 deletions(-) create mode 100644 rclcpp/include/rclcpp/timer.hpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 483eb62..c184655 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -20,6 +20,7 @@ #include #include +#include namespace rclcpp { @@ -31,7 +32,7 @@ namespace executor {class Executor;} namespace callback_group { -enum class CallbackGroupType {NonThreadSafe, ThreadSafe}; +enum class CallbackGroupType {MutuallyExclusive, Reentrant}; class CallbackGroup { @@ -40,21 +41,29 @@ class CallbackGroup public: RCLCPP_MAKE_SHARED_DEFINITIONS(CallbackGroup); -private: - CallbackGroup(std::string group_name, CallbackGroupType group_type) - : name_(group_name), type_(group_type) + CallbackGroup(CallbackGroupType group_type) + : type_(group_type) {} + +private: RCLCPP_DISABLE_COPY(CallbackGroup); void - add_subscription(subscription::SubscriptionBase::SharedPtr &subscription_ptr) + add_subscription( + const subscription::SubscriptionBase::SharedPtr &subscription_ptr) { subscription_ptrs_.push_back(subscription_ptr); } - std::string name_; + void + add_timer(const timer::TimerBase::SharedPtr &timer_ptr) + { + timer_ptrs_.push_back(timer_ptr); + } + CallbackGroupType type_; std::vector subscription_ptrs_; + std::vector timer_ptrs_; }; diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 11b91b3..89216c1 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -88,8 +88,13 @@ protected: std::malloc(sizeof(void *) * node.number_of_subscriptions_)); // Add subscriptions from groups size_t handles_index = 0; - for (auto group : node.callback_groups_) + for (auto weak_group : node.callback_groups_) { + auto group = weak_group.lock(); + if (!group) + { + continue; + } for (auto subscription : group->subscription_ptrs_) { assert(handles_index < node.number_of_subscriptions_); @@ -140,8 +145,13 @@ protected: size_t handles_index = 0; for (auto &node : nodes) { - for (auto group : node->callback_groups_) + for (auto weak_group : node->callback_groups_) { + auto group = weak_group.lock(); + if (!group) + { + continue; + } for (auto subscription : group->subscription_ptrs_) { assert(handles_index < number_of_subscriptions); @@ -151,7 +161,6 @@ protected: } } } - assert(handles_index == number_of_subscriptions); } struct AnyExecutable @@ -186,8 +195,13 @@ protected: { continue; } - for (auto group : node->callback_groups_) + for (auto weak_group : node->callback_groups_) { + auto group = weak_group.lock(); + if (!group) + { + continue; + } for (auto subscription : group->subscription_ptrs_) { if (subscription->subscription_handle_.data_ == subscriber_handle) diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 777bda7..db9b085 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -41,6 +42,10 @@ class MultiThreadedExecutor : public executor::Executor public: RCLCPP_MAKE_SHARED_DEFINITIONS(MultiThreadedExecutor); + MultiThreadedExecutor() {} + + ~MultiThreadedExecutor() {} + void add_node(rclcpp::node::Node::SharedPtr &node_ptr) { this->weak_nodes_.push_back(node_ptr); @@ -48,16 +53,48 @@ public: void spin() { - } - - void spin_some() - { + std::vector threads; + size_t number_of_threads = std::thread::hardware_concurrency(); + if (number_of_threads == 0) + { + number_of_threads = 1; + } + for (; number_of_threads > 0; --number_of_threads) + { + threads.emplace_back(std::thread(&MultiThreadedExecutor::run, this)); + } + for (auto &thread : threads) + { + thread.join(); + } } private: + void run() + { + while (rclcpp::utilities::ok()) + { + std::shared_ptr any_exec; + { + std::lock_guard wait_lock(wait_mutex_); + if (!rclcpp::utilities::ok()) + { + return; + } + any_exec = get_next_executable(); + } + if (any_exec && any_exec->subscription) + { + // Do callback + execute_subscription(any_exec->subscription); + } + } + } + RCLCPP_DISABLE_COPY(MultiThreadedExecutor); std::vector> weak_nodes_; + std::mutex wait_mutex_; }; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 181ae8a..fcbe525 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -16,6 +16,7 @@ #ifndef RCLCPP_RCLCPP_NODE_HPP_ #define RCLCPP_RCLCPP_NODE_HPP_ +#include #include #include @@ -24,6 +25,7 @@ #include #include #include +#include namespace rclcpp { @@ -53,15 +55,9 @@ public: std::string get_name(); - /* Creates a named callback group. */ - void - create_callback_group( - std::string group_name, - rclcpp::callback_group::CallbackGroupType callback_group_type); - - /* Return callback group if it exists, otherwise evaluates to false. */ + /* Create and return a callback group. */ rclcpp::callback_group::CallbackGroup::SharedPtr - get_callback_group(std::string group_name); + create_callback_group(rclcpp::callback_group::CallbackGroupType group_type); /* Create and return a Publisher. */ template rclcpp::publisher::Publisher::SharedPtr @@ -73,7 +69,21 @@ public: create_subscription( std::string topic_name, size_t queue_size, - std::function &)> callback); + std::function &)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); + + /* Create a timer. */ + rclcpp::timer::WallTimer::SharedPtr + create_wall_timer( + std::chrono::nanoseconds period, + rclcpp::timer::CallbackType callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); + + rclcpp::timer::WallTimer::SharedPtr + create_wall_timer( + std::chrono::duration period, + rclcpp::timer::CallbackType callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group=nullptr); private: RCLCPP_DISABLE_COPY(Node); @@ -85,9 +95,10 @@ private: rclcpp::context::Context::SharedPtr context_; rclcpp::callback_group::CallbackGroup::SharedPtr default_callback_group_; - std::vector callback_groups_; + std::list> callback_groups_; size_t number_of_subscriptions_; + size_t number_of_timers_; }; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 8da4a58..c8f69d6 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -43,14 +43,22 @@ Node::Node(std::string node_name, context::Context::SharedPtr context) : name_(node_name), context_(context), number_of_subscriptions_(0) { node_handle_ = ::ros_middleware_interface::create_node(); - using rclcpp::callback_group::CallbackGroup; using rclcpp::callback_group::CallbackGroupType; - default_callback_group_.reset(new CallbackGroup( - "default_group", CallbackGroupType::NonThreadSafe)); - callback_groups_.push_back(default_callback_group_); + default_callback_group_ = \ + create_callback_group(CallbackGroupType::MutuallyExclusive); } -template publisher::Publisher::SharedPtr +rclcpp::callback_group::CallbackGroup::SharedPtr +Node::create_callback_group( + rclcpp::callback_group::CallbackGroupType group_type) +{ + using rclcpp::callback_group::CallbackGroup; + using rclcpp::callback_group::CallbackGroupType; + return CallbackGroup::SharedPtr(new CallbackGroup(group_type)); +} + +template +publisher::Publisher::SharedPtr Node::create_publisher(std::string topic_name, size_t queue_size) { namespace rmi = ::ros_middleware_interface; @@ -68,7 +76,8 @@ typename subscription::Subscription::SharedPtr Node::create_subscription( std::string topic_name, size_t queue_size, - std::function &)> callback) + std::function &)> callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group) { namespace rmi = ::ros_middleware_interface; @@ -83,9 +92,46 @@ Node::create_subscription( topic_name, callback); auto sub_base_ptr = std::dynamic_pointer_cast(sub); - this->default_callback_group_->add_subscription(sub_base_ptr); + if (group) + { + group->add_subscription(sub_base_ptr); + } + else + { + default_callback_group_->add_subscription(sub_base_ptr); + } number_of_subscriptions_++; return sub; } +rclcpp::timer::WallTimer::SharedPtr +Node::create_wall_timer(std::chrono::nanoseconds period, + rclcpp::timer::CallbackType callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + auto timer = rclcpp::timer::WallTimer::make_shared(period); + if (group) + { + group->add_timer(timer); + } + else + { + default_callback_group_->add_timer(timer); + } + number_of_timers_++; + return timer; +} + +rclcpp::timer::WallTimer::SharedPtr +Node::create_wall_timer( + std::chrono::duration period, + rclcpp::timer::CallbackType callback, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + return create_wall_timer( + std::chrono::duration_cast(period), + callback, + group); +} + #endif /* RCLCPP_RCLCPP_NODE_IMPL_HPP_ */ diff --git a/rclcpp/include/rclcpp/rclcpp.hpp b/rclcpp/include/rclcpp/rclcpp.hpp index 89abfe3..fb02037 100644 --- a/rclcpp/include/rclcpp/rclcpp.hpp +++ b/rclcpp/include/rclcpp/rclcpp.hpp @@ -18,23 +18,39 @@ #include #include -#include #include #include #include #include -namespace -{ - std::shared_ptr g_executor(0); - std::mutex g_executor_mutex; -} - namespace rclcpp { +constexpr std::chrono::seconds operator ""_s(unsigned long long s) +{ + return std::chrono::seconds(s); +} +constexpr std::chrono::duration operator ""_s(long double s) +{ + return std::chrono::duration(s); +} + +constexpr std::chrono::nanoseconds +operator ""_ns(unsigned long long ns) +{ + return std::chrono::nanoseconds(ns); +} +constexpr std::chrono::duration +operator ""_ns(long double ns) +{ + return std::chrono::duration(ns); +} + using rclcpp::node::Node; +using rclcpp::publisher::Publisher; +using rclcpp::subscription::SubscriptionBase; +using rclcpp::subscription::Subscription; using rclcpp::rate::GenericRate; using rclcpp::rate::WallRate; using rclcpp::utilities::ok; @@ -42,13 +58,8 @@ using rclcpp::utilities::init; void spin_some(Node &node) { - std::lock_guard lock(g_executor_mutex); - if (!g_executor) - { - // Create an executor - g_executor.reset(new rclcpp::executors::SingleThreadedExecutor()); - } - g_executor->spin_node_some(node); + rclcpp::executors::SingleThreadedExecutor executor; + executor.spin_node_some(node); } void spin_some(Node::SharedPtr &node_ptr) diff --git a/rclcpp/include/rclcpp/timer.hpp b/rclcpp/include/rclcpp/timer.hpp new file mode 100644 index 0000000..14e9d33 --- /dev/null +++ b/rclcpp/include/rclcpp/timer.hpp @@ -0,0 +1,114 @@ +/* Copyright 2014 Open Source Robotics Foundation, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef RCLCPP_RCLCPP_TIMER_HPP_ +#define RCLCPP_RCLCPP_TIMER_HPP_ + +#include +#include +#include +#include + +#include + +namespace rclcpp +{ +namespace timer +{ + +typedef std::function CallbackType; + +class TimerBase +{ +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(TimerBase); + + virtual bool sleep() = 0; + virtual bool is_steady() = 0; + virtual void reset() = 0; +}; + +template +class GenericTimer : public TimerBase +{ +public: + RCLCPP_MAKE_SHARED_DEFINITIONS(GenericTimer); + + GenericTimer(std::chrono::nanoseconds period) + : period_(period) + { + } + + virtual bool + sleep() + { + // Time coming into sleep + auto now = Clock::now(); + // Time of next interval + auto next_interval = last_interval_ + period_; + // Detect backwards time flow + if (now < last_interval_) + { + // Best thing to do is to set the next_interval to now + period + next_interval = now + period_; + } + // Calculate the time to sleep + auto time_to_sleep = next_interval - now; + // Update the interval + last_interval_ += period_; + // If the time_to_sleep is negative or zero, don't sleep + if (time_to_sleep <= std::chrono::seconds(0)) + { + // If an entire cycle was missed then reset next interval. + // This might happen if the loop took more than a cycle. + // Or if time jumps forward. + if (now > next_interval + period_) + { + last_interval_ = now + period_; + } + // Either way do not sleep and return false + return false; + } + // Sleep + std::this_thread::sleep_for(time_to_sleep); + return true; + } + + virtual bool + is_steady() + { + return Clock::is_steady; + } + + virtual void + reset() + { + last_interval_ = Clock::now(); + } + +private: + RCLCPP_DISABLE_COPY(GenericTimer); + + std::chrono::nanoseconds period_; + std::chrono::time_point last_interval_; + +}; + +typedef GenericTimer WallTimer; + +} /* namespace timer */ +} /* namespace rclcpp */ + +#endif /* RCLCPP_RCLCPP_TIMER_HPP_ */