refactor executor::cancel to use a spinning state

This commit is contained in:
William Woodall 2015-11-12 17:20:05 -08:00
parent 0a478e5233
commit 5bd71c1f80
6 changed files with 43 additions and 20 deletions

View file

@ -33,6 +33,9 @@ struct AnyExecutable
RCLCPP_PUBLIC RCLCPP_PUBLIC
AnyExecutable(); AnyExecutable();
RCLCPP_PUBLIC
virtual ~AnyExecutable();
// Only one of the following pointers will be set. // Only one of the following pointers will be set.
rclcpp::subscription::SubscriptionBase::SharedPtr subscription; rclcpp::subscription::SubscriptionBase::SharedPtr subscription;
rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process; rclcpp::subscription::SubscriptionBase::SharedPtr subscription_intra_process;

View file

@ -180,9 +180,8 @@ public:
return FutureReturnCode::INTERRUPTED; return FutureReturnCode::INTERRUPTED;
} }
/// Stop everything /// Cancels any running spin* function, causing it to return.
/** /* This function can be called asynchonously from any thread. */
*/
RCLCPP_PUBLIC RCLCPP_PUBLIC
void void
cancel(); cancel();
@ -260,8 +259,8 @@ protected:
AnyExecutable::SharedPtr AnyExecutable::SharedPtr
get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// For cancelling execution mid-spin. /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool canceled; std::atomic_bool spinning;
/// Guard condition for signaling the rmw layer to wake up for special events. /// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_; rmw_guard_condition_t * interrupt_guard_condition_;

View file

@ -25,3 +25,13 @@ AnyExecutable::AnyExecutable()
callback_group(nullptr), callback_group(nullptr),
node(nullptr) node(nullptr)
{} {}
AnyExecutable::~AnyExecutable()
{
// Make sure that discarded (taken but not executed) AnyExecutable's have
// their callback groups reset. This can happen when an executor is canceled
// between taking an AnyExecutable and executing it.
if (callback_group) {
callback_group->can_be_taken_from().store(true);
}
}

View file

@ -20,8 +20,7 @@ using rclcpp::executor::AnyExecutable;
using rclcpp::executor::Executor; using rclcpp::executor::Executor;
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
: interrupt_guard_condition_(rmw_create_guard_condition()), : spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()),
canceled(false),
memory_strategy_(ms) memory_strategy_(ms)
{ {
} }
@ -109,26 +108,33 @@ Executor::spin_node_some(rclcpp::node::Node::SharedPtr node)
void void
Executor::spin_some() Executor::spin_some()
{ {
canceled = false; if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
AnyExecutable::SharedPtr any_exec; AnyExecutable::SharedPtr any_exec;
while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && !canceled) { while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && spinning.load()) {
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
spinning.store(false);
} }
void void
Executor::spin_once(std::chrono::nanoseconds timeout) Executor::spin_once(std::chrono::nanoseconds timeout)
{ {
if (spinning.exchange(true)) {
throw std::runtime_error("spin_once() called while already spinning");
}
auto any_exec = get_next_executable(timeout); auto any_exec = get_next_executable(timeout);
if (any_exec) { if (any_exec) {
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
spinning.store(false);
} }
void void
Executor::cancel() Executor::cancel()
{ {
canceled = true; spinning.store(false);
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) { if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe()); throw std::runtime_error(rmw_get_error_string_safe());
@ -147,7 +153,7 @@ Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr
void void
Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec)
{ {
if (!any_exec) { if (!any_exec || !spinning.load()) {
return; return;
} }
if (any_exec->timer) { if (any_exec->timer) {
@ -514,15 +520,12 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout)
if (!any_exec) { if (!any_exec) {
// Wait for subscriptions or timers to work on // Wait for subscriptions or timers to work on
wait_for_work(timeout); wait_for_work(timeout);
if (canceled) { if (!spinning.load()) {
return nullptr; return nullptr;
} }
// Try again // Try again
any_exec = get_next_ready_executable(); any_exec = get_next_ready_executable();
} }
if (canceled) {
return nullptr;
}
// At this point any_exec should be valid with either a valid subscription // At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr // or a valid timer, or it should be a null shared_ptr
if (any_exec) { if (any_exec) {
@ -534,6 +537,8 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout)
// It should not have been taken otherwise // It should not have been taken otherwise
assert(any_exec->callback_group->can_be_taken_from().load()); assert(any_exec->callback_group->can_be_taken_from().load());
// Set to false to indicate something is being run from this group // Set to false to indicate something is being run from this group
// This is reset to true either when the any_exec is executed or when the
// any_exec is destructued
any_exec->callback_group->can_be_taken_from().store(false); any_exec->callback_group->can_be_taken_from().store(false);
} }
} }

View file

@ -36,7 +36,9 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
void void
MultiThreadedExecutor::spin() MultiThreadedExecutor::spin()
{ {
canceled = false; if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
std::vector<std::thread> threads; std::vector<std::thread> threads;
{ {
std::lock_guard<std::mutex> wait_lock(wait_mutex_); std::lock_guard<std::mutex> wait_lock(wait_mutex_);
@ -50,6 +52,7 @@ MultiThreadedExecutor::spin()
for (auto & thread : threads) { for (auto & thread : threads) {
thread.join(); thread.join();
} }
spinning.store(false);
} }
size_t size_t
@ -62,11 +65,11 @@ void
MultiThreadedExecutor::run(size_t this_thread_number) MultiThreadedExecutor::run(size_t this_thread_number)
{ {
thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number; thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number;
while (rclcpp::utilities::ok() && !canceled) { while (rclcpp::utilities::ok() && spinning.load()) {
executor::AnyExecutable::SharedPtr any_exec; executor::AnyExecutable::SharedPtr any_exec;
{ {
std::lock_guard<std::mutex> wait_lock(wait_mutex_); std::lock_guard<std::mutex> wait_lock(wait_mutex_);
if (!rclcpp::utilities::ok()) { if (!rclcpp::utilities::ok() || !spinning.load()) {
return; return;
} }
any_exec = get_next_executable(); any_exec = get_next_executable();

View file

@ -26,9 +26,12 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {}
void void
SingleThreadedExecutor::spin() SingleThreadedExecutor::spin()
{ {
canceled = false; if (spinning.exchange(true)) {
while (rclcpp::utilities::ok() && !canceled) { throw std::runtime_error("spin_some() called while already spinning");
}
while (rclcpp::utilities::ok() && spinning.load()) {
auto any_exec = get_next_executable(); auto any_exec = get_next_executable();
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
spinning.store(false);
} }