Merge pull request #166 from ros2/waitset_handle

Store handle of rmw_waitset_t in Executor
This commit is contained in:
Jackie Kay 2016-01-12 17:46:35 -08:00
commit 9d754a70a2
9 changed files with 87 additions and 39 deletions

View file

@ -44,6 +44,24 @@ namespace executor
*/ */
enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT}; enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT};
///
/**
* Options to be passed to the executor constructor.
*/
struct ExecutorArgs
{
memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
size_t max_conditions = 0;
};
static inline ExecutorArgs create_default_executor_arguments()
{
ExecutorArgs args;
args.memory_strategy = memory_strategies::create_default_strategy();
args.max_conditions = 0;
return args;
}
/// Coordinate the order and timing of available communication tasks. /// Coordinate the order and timing of available communication tasks.
/** /**
* Executor provides spin functions (including spin_node_once and spin_some). * Executor provides spin functions (including spin_node_once and spin_some).
@ -62,8 +80,7 @@ public:
/// Default constructor. /// Default constructor.
// \param[in] ms The memory strategy to be used with this executor. // \param[in] ms The memory strategy to be used with this executor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
explicit Executor( explicit Executor(const ExecutorArgs & args = create_default_executor_arguments());
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy());
/// Default destructor. /// Default destructor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
@ -262,9 +279,14 @@ protected:
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning; std::atomic_bool spinning;
rmw_guard_conditions_t fixed_guard_conditions_;
/// Guard condition for signaling the rmw layer to wake up for special events. /// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_; rmw_guard_condition_t * interrupt_guard_condition_;
/// Waitset for managing entities that the rmw layer waits on.
rmw_waitset_t * waitset_;
/// The memory strategy: an interface for handling user-defined memory allocation strategies. /// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

View file

@ -38,7 +38,7 @@ public:
RCLCPP_PUBLIC RCLCPP_PUBLIC
MultiThreadedExecutor( MultiThreadedExecutor(
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy()); const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
RCLCPP_PUBLIC RCLCPP_PUBLIC
virtual ~MultiThreadedExecutor(); virtual ~MultiThreadedExecutor();

View file

@ -47,7 +47,7 @@ public:
/// Default constructor. See the default constructor for Executor. /// Default constructor. See the default constructor for Executor.
RCLCPP_PUBLIC RCLCPP_PUBLIC
SingleThreadedExecutor( SingleThreadedExecutor(
memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy()); const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments());
/// Default destrcutor. /// Default destrcutor.
RCLCPP_PUBLIC RCLCPP_PUBLIC

View file

@ -252,6 +252,8 @@ public:
const CallbackGroupWeakPtrList & const CallbackGroupWeakPtrList &
get_callback_groups() const; get_callback_groups() const;
std::atomic_bool has_executor;
private: private:
RCLCPP_DISABLE_COPY(Node); RCLCPP_DISABLE_COPY(Node);

View file

@ -19,15 +19,57 @@
using rclcpp::executor::AnyExecutable; using rclcpp::executor::AnyExecutable;
using rclcpp::executor::Executor; using rclcpp::executor::Executor;
using rclcpp::executor::ExecutorArgs;
Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) Executor::Executor(const ExecutorArgs & args)
: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()), : spinning(false),
memory_strategy_(ms) memory_strategy_(args.memory_strategy)
{ {
interrupt_guard_condition_ = rmw_create_guard_condition();
if (!interrupt_guard_condition_) {
throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
}
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
// These guard conditions are permanently attached to the waitset.
const size_t number_of_guard_conds = 2;
fixed_guard_conditions_.guard_condition_count = number_of_guard_conds;
fixed_guard_conditions_.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
// Put the global ctrl-c guard condition in
assert(fixed_guard_conditions_.guard_condition_count > 1);
fixed_guard_conditions_.guard_conditions[0] = \
rclcpp::utilities::get_global_sigint_guard_condition()->data;
// Put the executor's guard condition in
fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data;
// The waitset adds the fixed guard conditions to the middleware waitset on initialization,
// and removes the guard conditions in rmw_destroy_waitset.
waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions);
if (!waitset_) {
fprintf(stderr,
"[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe());
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
}
throw std::runtime_error("Failed to create waitset in Executor constructor");
}
} }
Executor::~Executor() Executor::~Executor()
{ {
// Try to deallocate the waitset.
if (waitset_) {
rmw_ret_t status = rmw_destroy_waitset(waitset_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe());
}
}
// Try to deallocate the interrupt guard condition. // Try to deallocate the interrupt guard condition.
if (interrupt_guard_condition_ != nullptr) { if (interrupt_guard_condition_ != nullptr) {
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
@ -41,6 +83,10 @@ Executor::~Executor()
void void
Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
{ {
// If the node already has an executor
if (node_ptr->has_executor.exchange(true)) {
throw std::runtime_error("Node has already been added to an executor.");
}
// Check to ensure node not already added // Check to ensure node not already added
for (auto & weak_node : weak_nodes_) { for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock(); auto node = weak_node.lock();
@ -76,6 +122,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
// *INDENT-ON* // *INDENT-ON*
) )
); );
node_ptr->has_executor.store(false);
if (notify) { if (notify) {
// If the node was matched and removed, interrupt waiting // If the node was matched and removed, interrupt waiting
if (node_removed) { if (node_removed) {
@ -318,25 +365,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
client_handles.client_count = client_handles.client_count =
memory_strategy_->fill_client_handles(client_handles.clients); memory_strategy_->fill_client_handles(client_handles.clients);
// The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, // Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset
// and one for the executor's guard cond (interrupt_guard_condition_)
size_t number_of_guard_conds = 2;
rmw_guard_conditions_t guard_condition_handles;
guard_condition_handles.guard_condition_count = number_of_guard_conds;
guard_condition_handles.guard_conditions = static_cast<void **>(guard_cond_handles_.data());
if (guard_condition_handles.guard_conditions == NULL &&
number_of_guard_conds > 0)
{
// TODO(wjwwood): Use a different error here? maybe std::bad_alloc?
throw std::runtime_error("Could not malloc for guard condition pointers.");
}
// Put the global ctrl-c guard condition in
assert(guard_condition_handles.guard_condition_count > 1);
guard_condition_handles.guard_conditions[0] = \
rclcpp::utilities::get_global_sigint_guard_condition()->data;
// Put the executor's guard condition in
guard_condition_handles.guard_conditions[1] = \
interrupt_guard_condition_->data;
rmw_time_t * wait_timeout = NULL; rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout; rmw_time_t rmw_timeout;
@ -364,19 +393,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
// Now wait on the waitable subscriptions and timers // Now wait on the waitable subscriptions and timers
rmw_ret_t status = rmw_wait( rmw_ret_t status = rmw_wait(
&subscriber_handles, &subscriber_handles,
&guard_condition_handles, nullptr,
&service_handles, &service_handles,
&client_handles, &client_handles,
waitset_,
wait_timeout); wait_timeout);
if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) { if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
throw std::runtime_error(rmw_get_error_string_safe()); throw std::runtime_error(rmw_get_error_string_safe());
} }
// If ctrl-c guard condition, return directly
if (guard_condition_handles.guard_conditions[0] != 0) {
// Make sure to free or clean memory
memory_strategy_->clear_handles();
return;
}
memory_strategy_->remove_null_handles(); memory_strategy_->remove_null_handles();
} }

View file

@ -27,4 +27,5 @@ rclcpp::spin(node::Node::SharedPtr node_ptr)
rclcpp::executors::SingleThreadedExecutor exec; rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(node_ptr); exec.add_node(node_ptr);
exec.spin(); exec.spin();
exec.remove_node(node_ptr);
} }

View file

@ -23,8 +23,8 @@
using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor; using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor;
MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) MultiThreadedExecutor::MultiThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
: executor::Executor(ms) : executor::Executor(args)
{ {
number_of_threads_ = std::thread::hardware_concurrency(); number_of_threads_ = std::thread::hardware_concurrency();
if (number_of_threads_ == 0) { if (number_of_threads_ == 0) {

View file

@ -17,10 +17,8 @@
using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor; using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor;
SingleThreadedExecutor::SingleThreadedExecutor( SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args)
rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) : executor::Executor(args) {}
: executor::Executor(ms) {}
SingleThreadedExecutor::~SingleThreadedExecutor() {} SingleThreadedExecutor::~SingleThreadedExecutor() {}

View file

@ -38,6 +38,7 @@ Node::Node(
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms) use_intra_process_comms_(use_intra_process_comms)
{ {
has_executor.store(false);
size_t domain_id = 0; size_t domain_id = 0;
char * ros_domain_id = nullptr; char * ros_domain_id = nullptr;
const char * env_var = "ROS_DOMAIN_ID"; const char * env_var = "ROS_DOMAIN_ID";