diff --git a/rclcpp/include/rclcpp/any_executable.hpp b/rclcpp/include/rclcpp/any_executable.hpp index 6f886fd..656b2c7 100644 --- a/rclcpp/include/rclcpp/any_executable.hpp +++ b/rclcpp/include/rclcpp/any_executable.hpp @@ -33,6 +33,9 @@ struct AnyExecutable RCLCPP_PUBLIC AnyExecutable(); + RCLCPP_PUBLIC + virtual ~AnyExecutable(); + // Only one of the following pointers will be set. rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index d620334..1e6bd95 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -180,6 +180,12 @@ public: return FutureReturnCode::INTERRUPTED; } + /// Cancel any running spin* function, causing it to return. + /* This function can be called asynchonously from any thread. */ + RCLCPP_PUBLIC + void + cancel(); + /// Support dynamic switching of the memory strategy. /** * Switching the memory strategy while the executor is spinning in another threading could have @@ -253,6 +259,9 @@ protected: AnyExecutable::SharedPtr get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. + std::atomic_bool spinning; + /// Guard condition for signaling the rmw layer to wake up for special events. rmw_guard_condition_t * interrupt_guard_condition_; diff --git a/rclcpp/include/rclcpp/macros.hpp b/rclcpp/include/rclcpp/macros.hpp index b22067d..b6cc888 100644 --- a/rclcpp/include/rclcpp/macros.hpp +++ b/rclcpp/include/rclcpp/macros.hpp @@ -82,6 +82,9 @@ __RCLCPP_UNIQUE_PTR_ALIAS(__VA_ARGS__) \ __RCLCPP_MAKE_UNIQUE_DEFINITION(__VA_ARGS__) +#define RCLCPP_STRING_JOIN(arg1, arg2) RCLCPP_DO_STRING_JOIN(arg1, arg2) +#define RCLCPP_DO_STRING_JOIN(arg1, arg2) arg1 ## arg2 + #define RCLCPP_INFO(Args) std::cout << Args << std::endl; #endif // RCLCPP__MACROS_HPP_ diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index 1006152..dbcac72 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -46,7 +46,6 @@ RCLCPP_PUBLIC void shutdown(); - /// Get a handle to the rmw guard condition that manages the signal handler. RCLCPP_PUBLIC rmw_guard_condition_t * diff --git a/rclcpp/src/rclcpp/any_executable.cpp b/rclcpp/src/rclcpp/any_executable.cpp index 0566140..0cef731 100644 --- a/rclcpp/src/rclcpp/any_executable.cpp +++ b/rclcpp/src/rclcpp/any_executable.cpp @@ -25,3 +25,13 @@ AnyExecutable::AnyExecutable() callback_group(nullptr), node(nullptr) {} + +AnyExecutable::~AnyExecutable() +{ + // Make sure that discarded (taken but not executed) AnyExecutable's have + // their callback groups reset. This can happen when an executor is canceled + // between taking an AnyExecutable and executing it. + if (callback_group) { + callback_group->can_be_taken_from().store(true); + } +} diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index bfe27a3..981934c 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -16,11 +16,13 @@ #include "rcl_interfaces/msg/intra_process_message.hpp" +#include "./scope_exit.hpp" + using rclcpp::executor::AnyExecutable; using rclcpp::executor::Executor; Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) -: interrupt_guard_condition_(rmw_create_guard_condition()), +: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()), memory_strategy_(ms) { } @@ -108,9 +110,12 @@ Executor::spin_node_some(rclcpp::node::Node::SharedPtr node) void Executor::spin_some() { - while (AnyExecutable::SharedPtr any_exec = - get_next_executable(std::chrono::milliseconds::zero())) - { + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); + AnyExecutable::SharedPtr any_exec; + while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) { execute_any_executable(any_exec); } } @@ -118,12 +123,26 @@ Executor::spin_some() void Executor::spin_once(std::chrono::nanoseconds timeout) { + if (spinning.exchange(true)) { + throw std::runtime_error("spin_once() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); auto any_exec = get_next_executable(timeout); if (any_exec) { execute_any_executable(any_exec); } } +void +Executor::cancel() +{ + spinning.store(false); + rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + throw std::runtime_error(rmw_get_error_string_safe()); + } +} + void Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) { @@ -136,7 +155,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr void Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) { - if (!any_exec) { + if (!any_exec || !spinning.load()) { return; } if (any_exec->timer) { @@ -503,6 +522,9 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout) if (!any_exec) { // Wait for subscriptions or timers to work on wait_for_work(timeout); + if (!spinning.load()) { + return nullptr; + } // Try again any_exec = get_next_ready_executable(); } @@ -517,6 +539,8 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout) // It should not have been taken otherwise assert(any_exec->callback_group->can_be_taken_from().load()); // Set to false to indicate something is being run from this group + // This is reset to true either when the any_exec is executed or when the + // any_exec is destructued any_exec->callback_group->can_be_taken_from().store(false); } } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 5684213..63d555f 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -20,6 +20,8 @@ #include "rclcpp/utilities.hpp" +#include "../scope_exit.hpp" + using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor; MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) @@ -36,6 +38,10 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {} void MultiThreadedExecutor::spin() { + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); std::vector threads; { std::lock_guard wait_lock(wait_mutex_); @@ -61,11 +67,11 @@ void MultiThreadedExecutor::run(size_t this_thread_number) { thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number; - while (rclcpp::utilities::ok()) { + while (rclcpp::utilities::ok() && spinning.load()) { executor::AnyExecutable::SharedPtr any_exec; { std::lock_guard wait_lock(wait_mutex_); - if (!rclcpp::utilities::ok()) { + if (!rclcpp::utilities::ok() || !spinning.load()) { return; } any_exec = get_next_executable(); diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 39f04bf..204d422 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -14,6 +14,8 @@ #include +#include "../scope_exit.hpp" + using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor; SingleThreadedExecutor::SingleThreadedExecutor( @@ -26,7 +28,11 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {} void SingleThreadedExecutor::spin() { - while (rclcpp::utilities::ok()) { + if (spinning.exchange(true)) { + throw std::runtime_error("spin_some() called while already spinning"); + } + RCLCPP_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::utilities::ok() && spinning.load()) { auto any_exec = get_next_executable(); execute_any_executable(any_exec); } diff --git a/rclcpp/src/rclcpp/scope_exit.hpp b/rclcpp/src/rclcpp/scope_exit.hpp new file mode 100644 index 0000000..334b790 --- /dev/null +++ b/rclcpp/src/rclcpp/scope_exit.hpp @@ -0,0 +1,52 @@ +// Copyright 2015 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Based on: http://the-witness.net/news/2012/11/scopeexit-in-c11/ +// But I changed the lambda to include by reference rather than value, see: +// http://the-witness.net/news/2012/11/scopeexit-in-c11/comment-page-1/#comment-86873 + +#ifndef RCLCPP__SCOPE_EXIT_HPP_ +#define RCLCPP__SCOPE_EXIT_HPP_ + +#include + +#include "rclcpp/macros.hpp" + +namespace rclcpp +{ + +template +struct ScopeExit +{ + explicit ScopeExit(Callable callable) + : callable_(callable) {} + ~ScopeExit() {callable_(); } + +private: + Callable callable_; +}; + +template +ScopeExit +make_scope_exit(Callable callable) +{ + return ScopeExit(callable); +} + +} // namespace rclcpp + +#define RCLCPP_SCOPE_EXIT(code) \ + auto RCLCPP_STRING_JOIN(scope_exit_, __LINE__) = rclcpp::make_scope_exit([&]() {code; }) + +#endif // RCLCPP__SCOPE_EXIT_HPP_