Implement cancel

This commit is contained in:
Jackie Kay 2015-11-12 16:29:17 -08:00
parent fabea6b4b6
commit 0a478e5233
5 changed files with 34 additions and 6 deletions

View file

@ -180,6 +180,13 @@ public:
return FutureReturnCode::INTERRUPTED; return FutureReturnCode::INTERRUPTED;
} }
/// Stop everything
/**
*/
RCLCPP_PUBLIC
void
cancel();
/// Support dynamic switching of the memory strategy. /// Support dynamic switching of the memory strategy.
/** /**
* Switching the memory strategy while the executor is spinning in another threading could have * Switching the memory strategy while the executor is spinning in another threading could have
@ -253,6 +260,9 @@ protected:
AnyExecutable::SharedPtr AnyExecutable::SharedPtr
get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)); get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
/// For cancelling execution mid-spin.
std::atomic_bool canceled;
/// Guard condition for signaling the rmw layer to wake up for special events. /// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_; rmw_guard_condition_t * interrupt_guard_condition_;

View file

@ -46,7 +46,6 @@ RCLCPP_PUBLIC
void void
shutdown(); shutdown();
/// Get a handle to the rmw guard condition that manages the signal handler. /// Get a handle to the rmw guard condition that manages the signal handler.
RCLCPP_PUBLIC RCLCPP_PUBLIC
rmw_guard_condition_t * rmw_guard_condition_t *

View file

@ -21,6 +21,7 @@ using rclcpp::executor::Executor;
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms)
: interrupt_guard_condition_(rmw_create_guard_condition()), : interrupt_guard_condition_(rmw_create_guard_condition()),
canceled(false),
memory_strategy_(ms) memory_strategy_(ms)
{ {
} }
@ -108,9 +109,9 @@ Executor::spin_node_some(rclcpp::node::Node::SharedPtr node)
void void
Executor::spin_some() Executor::spin_some()
{ {
while (AnyExecutable::SharedPtr any_exec = canceled = false;
get_next_executable(std::chrono::milliseconds::zero())) AnyExecutable::SharedPtr any_exec;
{ while ((any_exec = get_next_executable(std::chrono::milliseconds::zero())) && !canceled) {
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }
} }
@ -124,6 +125,16 @@ Executor::spin_once(std::chrono::nanoseconds timeout)
} }
} }
void
Executor::cancel()
{
canceled = true;
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe());
}
}
void void
Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy) Executor::set_memory_strategy(rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
{ {
@ -503,9 +514,15 @@ Executor::get_next_executable(std::chrono::nanoseconds timeout)
if (!any_exec) { if (!any_exec) {
// Wait for subscriptions or timers to work on // Wait for subscriptions or timers to work on
wait_for_work(timeout); wait_for_work(timeout);
if (canceled) {
return nullptr;
}
// Try again // Try again
any_exec = get_next_ready_executable(); any_exec = get_next_ready_executable();
} }
if (canceled) {
return nullptr;
}
// At this point any_exec should be valid with either a valid subscription // At this point any_exec should be valid with either a valid subscription
// or a valid timer, or it should be a null shared_ptr // or a valid timer, or it should be a null shared_ptr
if (any_exec) { if (any_exec) {

View file

@ -36,6 +36,7 @@ MultiThreadedExecutor::~MultiThreadedExecutor() {}
void void
MultiThreadedExecutor::spin() MultiThreadedExecutor::spin()
{ {
canceled = false;
std::vector<std::thread> threads; std::vector<std::thread> threads;
{ {
std::lock_guard<std::mutex> wait_lock(wait_mutex_); std::lock_guard<std::mutex> wait_lock(wait_mutex_);
@ -61,7 +62,7 @@ void
MultiThreadedExecutor::run(size_t this_thread_number) MultiThreadedExecutor::run(size_t this_thread_number)
{ {
thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number; thread_number_by_thread_id_[std::this_thread::get_id()] = this_thread_number;
while (rclcpp::utilities::ok()) { while (rclcpp::utilities::ok() && !canceled) {
executor::AnyExecutable::SharedPtr any_exec; executor::AnyExecutable::SharedPtr any_exec;
{ {
std::lock_guard<std::mutex> wait_lock(wait_mutex_); std::lock_guard<std::mutex> wait_lock(wait_mutex_);

View file

@ -26,7 +26,8 @@ SingleThreadedExecutor::~SingleThreadedExecutor() {}
void void
SingleThreadedExecutor::spin() SingleThreadedExecutor::spin()
{ {
while (rclcpp::utilities::ok()) { canceled = false;
while (rclcpp::utilities::ok() && !canceled) {
auto any_exec = get_next_executable(); auto any_exec = get_next_executable();
execute_any_executable(any_exec); execute_any_executable(any_exec);
} }