From 4b0ad21b3dd465da4da7907b56aebd724907267e Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Wed, 2 Dec 2015 11:40:21 -0800 Subject: [PATCH] Adjust for new rmw_waitset_t API --- rclcpp/include/rclcpp/executor.hpp | 26 +++++- .../executors/multi_threaded_executor.hpp | 2 +- .../executors/single_threaded_executor.hpp | 2 +- rclcpp/include/rclcpp/node.hpp | 2 + rclcpp/src/rclcpp/executor.cpp | 82 ++++++++++++------- rclcpp/src/rclcpp/executors.cpp | 1 + .../executors/multi_threaded_executor.cpp | 4 +- .../executors/single_threaded_executor.cpp | 6 +- rclcpp/src/rclcpp/node.cpp | 1 + 9 files changed, 87 insertions(+), 39 deletions(-) diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 1e6bd95..e7f4e4e 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -44,6 +44,24 @@ namespace executor */ enum FutureReturnCode {SUCCESS, INTERRUPTED, TIMEOUT}; +/// +/** + * Options to be passed to the executor constructor. + */ +struct ExecutorArgs +{ + memory_strategy::MemoryStrategy::SharedPtr memory_strategy; + size_t max_conditions = 0; +}; + +static inline ExecutorArgs create_default_executor_arguments() +{ + ExecutorArgs args; + args.memory_strategy = memory_strategies::create_default_strategy(); + args.max_conditions = 0; + return args; +} + /// Coordinate the order and timing of available communication tasks. /** * Executor provides spin functions (including spin_node_once and spin_some). @@ -62,8 +80,7 @@ public: /// Default constructor. // \param[in] ms The memory strategy to be used with this executor. RCLCPP_PUBLIC - explicit Executor( - memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy()); + explicit Executor(const ExecutorArgs & args = create_default_executor_arguments()); /// Default destructor. RCLCPP_PUBLIC @@ -262,9 +279,14 @@ protected: /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; + rmw_guard_conditions_t fixed_guard_conditions_; + /// Guard condition for signaling the rmw layer to wake up for special events. rmw_guard_condition_t * interrupt_guard_condition_; + /// Waitset for managing entities that the rmw layer waits on. + rmw_waitset_t * waitset_; + /// The memory strategy: an interface for handling user-defined memory allocation strategies. memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 9eb78a1..6d35496 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -38,7 +38,7 @@ public: RCLCPP_PUBLIC MultiThreadedExecutor( - memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy()); + const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments()); RCLCPP_PUBLIC virtual ~MultiThreadedExecutor(); diff --git a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp index 3302a68..4981d03 100644 --- a/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/single_threaded_executor.hpp @@ -47,7 +47,7 @@ public: /// Default constructor. See the default constructor for Executor. RCLCPP_PUBLIC SingleThreadedExecutor( - memory_strategy::MemoryStrategy::SharedPtr ms = memory_strategies::create_default_strategy()); + const executor::ExecutorArgs & args = rclcpp::executor::create_default_executor_arguments()); /// Default destrcutor. RCLCPP_PUBLIC diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 9a5901d..b947169 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -252,6 +252,8 @@ public: const CallbackGroupWeakPtrList & get_callback_groups() const; + std::atomic_bool has_executor; + private: RCLCPP_DISABLE_COPY(Node); diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 3e77d95..16a1ea8 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -19,15 +19,57 @@ using rclcpp::executor::AnyExecutable; using rclcpp::executor::Executor; +using rclcpp::executor::ExecutorArgs; -Executor::Executor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) -: spinning(false), interrupt_guard_condition_(rmw_create_guard_condition()), - memory_strategy_(ms) +Executor::Executor(const ExecutorArgs & args) +: spinning(false), + memory_strategy_(args.memory_strategy) { + interrupt_guard_condition_ = rmw_create_guard_condition(); + if (!interrupt_guard_condition_) { + throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor"); + } + + // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, + // and one for the executor's guard cond (interrupt_guard_condition_) + // These guard conditions are permanently attached to the waitset. + const size_t number_of_guard_conds = 2; + fixed_guard_conditions_.guard_condition_count = number_of_guard_conds; + fixed_guard_conditions_.guard_conditions = static_cast(guard_cond_handles_.data()); + + // Put the global ctrl-c guard condition in + assert(fixed_guard_conditions_.guard_condition_count > 1); + fixed_guard_conditions_.guard_conditions[0] = \ + rclcpp::utilities::get_global_sigint_guard_condition()->data; + // Put the executor's guard condition in + fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data; + + // The waitset adds the fixed guard conditions to the middleware waitset on initialization, + // and removes the guard conditions in rmw_destroy_waitset. + waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions); + + if (!waitset_) { + fprintf(stderr, + "[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe()); + rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); + if (status != RMW_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); + } + throw std::runtime_error("Failed to create waitset in Executor constructor"); + } } Executor::~Executor() { + // Try to deallocate the waitset. + if (waitset_) { + rmw_ret_t status = rmw_destroy_waitset(waitset_); + if (status != RMW_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe()); + } + } // Try to deallocate the interrupt guard condition. if (interrupt_guard_condition_ != nullptr) { rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); @@ -41,6 +83,10 @@ Executor::~Executor() void Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) { + // If the node already has an executor + if (node_ptr->has_executor.exchange(true)) { + throw std::runtime_error("Node has already been added to an executor."); + } // Check to ensure node not already added for (auto & weak_node : weak_nodes_) { auto node = weak_node.lock(); @@ -76,6 +122,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) // *INDENT-ON* ) ); + node_ptr->has_executor.store(false); if (notify) { // If the node was matched and removed, interrupt waiting if (node_removed) { @@ -318,25 +365,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) client_handles.client_count = memory_strategy_->fill_client_handles(client_handles.clients); - // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, - // and one for the executor's guard cond (interrupt_guard_condition_) - size_t number_of_guard_conds = 2; - rmw_guard_conditions_t guard_condition_handles; - guard_condition_handles.guard_condition_count = number_of_guard_conds; - guard_condition_handles.guard_conditions = static_cast(guard_cond_handles_.data()); - if (guard_condition_handles.guard_conditions == NULL && - number_of_guard_conds > 0) - { - // TODO(wjwwood): Use a different error here? maybe std::bad_alloc? - throw std::runtime_error("Could not malloc for guard condition pointers."); - } - // Put the global ctrl-c guard condition in - assert(guard_condition_handles.guard_condition_count > 1); - guard_condition_handles.guard_conditions[0] = \ - rclcpp::utilities::get_global_sigint_guard_condition()->data; - // Put the executor's guard condition in - guard_condition_handles.guard_conditions[1] = \ - interrupt_guard_condition_->data; + // Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset rmw_time_t * wait_timeout = NULL; rmw_time_t rmw_timeout; @@ -364,19 +393,14 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // Now wait on the waitable subscriptions and timers rmw_ret_t status = rmw_wait( &subscriber_handles, - &guard_condition_handles, + nullptr, &service_handles, &client_handles, + waitset_, wait_timeout); if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) { throw std::runtime_error(rmw_get_error_string_safe()); } - // If ctrl-c guard condition, return directly - if (guard_condition_handles.guard_conditions[0] != 0) { - // Make sure to free or clean memory - memory_strategy_->clear_handles(); - return; - } memory_strategy_->remove_null_handles(); } diff --git a/rclcpp/src/rclcpp/executors.cpp b/rclcpp/src/rclcpp/executors.cpp index 49d4197..1dd2f18 100644 --- a/rclcpp/src/rclcpp/executors.cpp +++ b/rclcpp/src/rclcpp/executors.cpp @@ -27,4 +27,5 @@ rclcpp::spin(node::Node::SharedPtr node_ptr) rclcpp::executors::SingleThreadedExecutor exec; exec.add_node(node_ptr); exec.spin(); + exec.remove_node(node_ptr); } diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 1be631a..d488909 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -23,8 +23,8 @@ using rclcpp::executors::multi_threaded_executor::MultiThreadedExecutor; -MultiThreadedExecutor::MultiThreadedExecutor(rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) -: executor::Executor(ms) +MultiThreadedExecutor::MultiThreadedExecutor(const rclcpp::executor::ExecutorArgs & args) +: executor::Executor(args) { number_of_threads_ = std::thread::hardware_concurrency(); if (number_of_threads_ == 0) { diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 24b6f97..744d5f0 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -17,10 +17,8 @@ using rclcpp::executors::single_threaded_executor::SingleThreadedExecutor; -SingleThreadedExecutor::SingleThreadedExecutor( - rclcpp::memory_strategy::MemoryStrategy::SharedPtr ms) -: executor::Executor(ms) {} - +SingleThreadedExecutor::SingleThreadedExecutor(const rclcpp::executor::ExecutorArgs & args) +: executor::Executor(args) {} SingleThreadedExecutor::~SingleThreadedExecutor() {} diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index 5d250f7..738fab7 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -38,6 +38,7 @@ Node::Node( number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), use_intra_process_comms_(use_intra_process_comms) { + has_executor.store(false); size_t domain_id = 0; char * ros_domain_id = nullptr; const char * env_var = "ROS_DOMAIN_ID";