commit
39c663ea64
9 changed files with 121 additions and 9 deletions
|
@ -33,6 +33,9 @@ struct AnyExecutable
|
|||
RCLCPP_PUBLIC
|
||||
AnyExecutable();
|
||||
|
||||
RCLCPP_PUBLIC
|
||||
virtual ~AnyExecutable();
|
||||
|
||||
// Only one of the following pointers will be set.
|
||||
rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
|
||||
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;
|
||||
|
|
|
@ -180,6 +180,12 @@ public:
|
|||
return FutureReturnCode::INTERRUPTED;
|
||||
}
|
||||
|
||||
/// Cancel any running spin* function, causing it to return.
|
||||
/* This function can be called asynchonously from any thread. */
|
||||
RCLCPP_PUBLIC
|
||||
void
|
||||
cancel();
|
||||
|
||||
/// Support dynamic switching of the memory strategy.
|
||||
/**
|
||||
* Switching the memory strategy while the executor is spinning in another threading could have
|
||||
|
@ -253,6 +259,9 @@ protected:
|
|||
AnyExecutable::SharedPtr
|
||||
get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
|
||||
|
||||
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
|
||||
std::atomic_bool spinning;
|
||||
|
||||
/// Guard condition for signaling the rmw layer to wake up for special events.
|
||||
rmw_guard_condition_t * interrupt_guard_condition_;
|
||||
|
||||
|
|
|
@ -82,6 +82,9 @@
|
|||
__RCLCPP_UNIQUE_PTR_ALIAS(__VA_ARGS__) \
|
||||
__RCLCPP_MAKE_UNIQUE_DEFINITION(__VA_ARGS__)
|
||||
|
||||
#define RCLCPP_STRING_JOIN(arg1, arg2) RCLCPP_DO_STRING_JOIN(arg1, arg2)
|
||||
#define RCLCPP_DO_STRING_JOIN(arg1, arg2) arg1 ## arg2
|
||||
|
||||
#define RCLCPP_INFO(Args) std::cout << Args << std::endl;
|
||||
|
||||
#endif // RCLCPP__MACROS_HPP_
|
||||
|
|
|
@ -46,7 +46,6 @@ RCLCPP_PUBLIC
|
|||
void
|
||||
shutdown();
|
||||
|
||||
|
||||
/// Get a handle to the rmw guard condition that manages the signal handler.
|
||||
RCLCPP_PUBLIC
|
||||
rmw_guard_condition_t *
|
||||
|
|
|
@ -25,3 +25,13 @@ AnyExecutable::AnyExecutable()
|
|||
callback_group(nullptr),
|
||||
node(nullptr)
|
||||
{}
|
||||
|
||||
AnyExecutable::~AnyExecutable()
|
||||
{
|
||||
// Make sure that discarded (taken but not executed) AnyExecutable's have
|
||||
// their callback groups reset. This can happen when an executor is canceled
|
||||
// between taking an AnyExecutable and executing it.
|
||||
if (callback_group) {
|
||||
callback_group->can_be_taken_from().store(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,11 +16,13 @@
|
|||
|
||||
#include "rcl_interfaces/msg/intra_process_message.hpp"
|
||||
|
||||
#include "./scope_exit.hpp"
|
||||
|
||||
using rclcpp::executor::AnyExecutable;
|
||||
using rclcpp::executor::Executor;
|
||||
|
||||
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
|
||||
: interrupt_guard_condition_(rmw_create_guard_condition()),
|
||||
: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()),
|
||||
memory_strategy_(ms)
|
||||
{
|
||||
}
|
||||
|
@ -108,9 +110,12 @@ Executor::spin_node_some(rclcpp::node::Node::SharedPtr node)
|
|||
void
|
||||
Executor::spin_some()
|
||||
{
|
||||
while (AnyExecutable::SharedPtr any_exec =
|
||||
get_next_executable(std::chrono::milliseconds::zero()))
|
||||
{
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin_some() called while already spinning");
|
||||
}
|
||||
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
|
||||
AnyExecutable::SharedPtr any_exec;
|
||||
while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) {
|
||||
execute_any_executable(any_exec);
|
||||
}
|
||||
}
|
||||
|
@ -118,12 +123,26 @@ Executor::spin_some()
|
|||
void
|
||||
Executor::spin_once(std::chrono::nanoseconds timeout)
|
||||
{
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin_once() called while already spinning");
|
||||
}
|
||||
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
|
||||
auto any_exec = get_next_executable(timeout);
|
||||
if (any_exec) {
|
||||
execute_any_executable(any_exec);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Executor::cancel()
|
||||
{
|
||||
spinning.store(false);
|
||||
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
|
||||
if (status != RMW_RET_OK) {
|
||||
throw std::runtime_error(rmw_get_error_string_safe());
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
|
||||
{
|
||||
|
@ -136,7 +155,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
|
|||
void
|
||||
Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec)
|
||||
{
|
||||
if (!any_exec) {
|
||||
if (!any_exec || !spinning.load()) {
|
||||
return;
|
||||
}
|
||||
if (any_exec->timer) {
|
||||
|
@ -503,6 +522,9 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout)
|
|||
if (!any_exec) {
|
||||
// Wait for subscriptions or timers to work on
|
||||
wait_for_work(timeout);
|
||||
if (!spinning.load()) {
|
||||
return nullptr;
|
||||
}
|
||||
// Try again
|
||||
any_exec = get_next_ready_executable();
|
||||
}
|
||||
|
@ -517,6 +539,8 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout)
|
|||
// It should not have been taken otherwise
|
||||
assert(any_exec->callback_group->can_be_taken_from().load());
|
||||
// Set to false to indicate something is being run from this group
|
||||
// This is reset to true either when the any_exec is executed or when the
|
||||
// any_exec is destructued
|
||||
any_exec->callback_group->can_be_taken_from().store(false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
|
||||
#include "rclcpp/utilities.hpp"
|
||||
|
||||
#include "../scope_exit.hpp"
|
||||
|
||||
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
|
||||
|
||||
MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
|
||||
|
@ -36,6 +38,10 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
|
|||
void
|
||||
MultiThreadedExecutor::spin()
|
||||
{
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin() called while already spinning");
|
||||
}
|
||||
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
|
||||
std::vector<std::thread> threads;
|
||||
{
|
||||
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
|
||||
|
@ -61,11 +67,11 @@ void
|
|||
MultiThreadedExecutor::run(size_t this_thread_number)
|
||||
{
|
||||
thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number;
|
||||
while (rclcpp::utilities::ok()) {
|
||||
while (rclcpp::utilities::ok() && spinning.load()) {
|
||||
executor::AnyExecutable::SharedPtr any_exec;
|
||||
{
|
||||
std::lock_guard<std::mutex> wait_lock(wait_mutex_);
|
||||
if (!rclcpp::utilities::ok()) {
|
||||
if (!rclcpp::utilities::ok() || !spinning.load()) {
|
||||
return;
|
||||
}
|
||||
any_exec = get_next_executable();
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include <rclcpp/executors/single_threaded_executor.hpp>
|
||||
|
||||
#include "../scope_exit.hpp"
|
||||
|
||||
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
|
||||
|
||||
SingleThreadedExecutor::SingleThreadedExecutor(
|
||||
|
@ -26,7 +28,11 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {}
|
|||
void
|
||||
SingleThreadedExecutor::spin()
|
||||
{
|
||||
while (rclcpp::utilities::ok()) {
|
||||
if (spinning.exchange(true)) {
|
||||
throw std::runtime_error("spin_some() called while already spinning");
|
||||
}
|
||||
RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
|
||||
while (rclcpp::utilities::ok() && spinning.load()) {
|
||||
auto any_exec = get_next_executable();
|
||||
execute_any_executable(any_exec);
|
||||
}
|
||||
|
|
52
rclcpp/src/rclcpp/scope_exit.hpp
Normal file
52
rclcpp/src/rclcpp/scope_exit.hpp
Normal file
|
@ -0,0 +1,52 @@
|
|||
// Copyright 2015 Open Source Robotics Foundation, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Based on: http://the-witness.net/news/2012/11/scopeexit-in-c11/
|
||||
// But I changed the lambda to include by reference rather than value, see:
|
||||
// http://the-witness.net/news/2012/11/scopeexit-in-c11/comment-page-1/#comment-86873
|
||||
|
||||
#ifndef RCLCPP__SCOPE_EXIT_HPP_
|
||||
#define RCLCPP__SCOPE_EXIT_HPP_
|
||||
|
||||
#include <functional>
|
||||
|
||||
#include "rclcpp/macros.hpp"
|
||||
|
||||
namespace rclcpp
|
||||
{
|
||||
|
||||
template<typename Callable>
|
||||
struct ScopeExit
|
||||
{
|
||||
explicit ScopeExit(Callable callable)
|
||||
: callable_(callable) {}
|
||||
~ScopeExit() {callable_(); }
|
||||
|
||||
private:
|
||||
Callable callable_;
|
||||
};
|
||||
|
||||
template<typename Callable>
|
||||
ScopeExit<Callable>
|
||||
make_scope_exit(Callable callable)
|
||||
{
|
||||
return ScopeExit<Callable>(callable);
|
||||
}
|
||||
|
||||
} // namespace rclcpp
|
||||
|
||||
#define RCLCPP_SCOPE_EXIT(code) \
|
||||
auto RCLCPP_STRING_JOIN(scope_exit_, __LINE__) = rclcpp::make_scope_exit([&]() {code; })
|
||||
|
||||
#endif // RCLCPP__SCOPE_EXIT_HPP_
|
Loading…
Add table
Add a link
Reference in a new issue