diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index ae2dbdb..de38dde 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -306,8 +306,6 @@ protected: /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; - rmw_guard_conditions_t fixed_guard_conditions_; - /// Guard condition for signaling the rmw layer to wake up for special events. rmw_guard_condition_t * interrupt_guard_condition_; @@ -321,7 +319,6 @@ private: RCLCPP_DISABLE_COPY(Executor); std::vector> weak_nodes_; - std::array guard_cond_handles_; }; } // namespace executor diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index e363914..9386a80 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -49,6 +49,9 @@ public: // return the new number of clients virtual size_t fill_client_handles(void ** & ptr) = 0; + // return the new number of guard_conditions + virtual size_t fill_guard_condition_handles(void ** & ptr) = 0; + virtual void clear_active_entities() = 0; virtual void clear_handles() = 0; @@ -59,6 +62,10 @@ public: // \return Shared pointer to the fresh executable. virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; + virtual void add_guard_condition(const rmw_guard_condition_t * guard_condition) = 0; + + virtual void remove_guard_condition(const rmw_guard_condition_t * guard_condition) = 0; + virtual void get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index b947169..3f9eaf0 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -77,6 +77,9 @@ public: const std::string & node_name, rclcpp::context::Context::SharedPtr context, bool use_intra_process_comms = false); + RCLCPP_PUBLIC + ~Node(); + /// Get the name of the node. // \return The name of the node. RCLCPP_PUBLIC @@ -252,6 +255,9 @@ public: const CallbackGroupWeakPtrList & get_callback_groups() const; + RCLCPP_PUBLIC + rmw_guard_condition_t * get_notify_guard_condition() const; + std::atomic_bool has_executor; private: @@ -279,6 +285,9 @@ private: mutable std::mutex mutex_; + /// Guard condition for notifying the Executor of changes to this node. + rmw_guard_condition_t * notify_guard_condition_; + std::map parameters_; publisher::Publisher::SharedPtr events_publisher_; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index d4e0892..7148571 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -134,6 +134,11 @@ Node::create_publisher( shared_publish_callback, intra_process_publisher_handle); } + if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + throw std::runtime_error( + std::string( + "Failed to notify waitset on publisher creation: ") + rmw_get_error_string()); + } return publisher; } @@ -243,6 +248,11 @@ Node::create_subscription( default_callback_group_->add_subscription(sub_base_ptr); } number_of_subscriptions_++; + if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + throw std::runtime_error( + std::string( + "Failed to notify waitset on subscription creation: ") + rmw_get_error_string()); + } return sub; } @@ -289,6 +299,11 @@ Node::create_wall_timer( default_callback_group_->add_timer(timer); } number_of_timers_++; + if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + throw std::runtime_error( + std::string( + "Failed to notify waitset on timer creation: ") + rmw_get_error_string()); + } return timer; } @@ -333,6 +348,11 @@ Node::create_client( } number_of_clients_++; + if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + throw std::runtime_error( + std::string( + "Failed to notify waitset on client creation: ") + rmw_get_error_string()); + } return cli; } @@ -374,6 +394,11 @@ Node::create_service( default_callback_group_->add_service(serv_base_ptr); } number_of_services_++; + if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + throw std::runtime_error( + std::string( + "Failed to notify waitset on service creation: ") + rmw_get_error_string()); + } return serv; } diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index d4d4975..c0a0a4b 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -23,6 +23,8 @@ #include "rclcpp/node.hpp" #include "rclcpp/visibility_control.hpp" +#include "rmw/types.h" + namespace rclcpp { namespace memory_strategies @@ -62,6 +64,26 @@ public: allocator_ = std::make_shared(); } + void add_guard_condition(const rmw_guard_condition_t * guard_condition) + { + for (const auto & existing_guard_condition : guard_conditions_) { + if (existing_guard_condition == guard_condition) { + return; + } + } + guard_conditions_.push_back(guard_condition); + } + + void remove_guard_condition(const rmw_guard_condition_t * guard_condition) + { + for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) { + if (*it == guard_condition) { + guard_conditions_.erase(it); + break; + } + } + } + size_t fill_subscriber_handles(void ** & ptr) { for (auto & subscription : subscriptions_) { @@ -94,6 +116,17 @@ public: return client_handles_.size(); } + size_t fill_guard_condition_handles(void ** & ptr) + { + for (const auto & guard_condition : guard_conditions_) { + if (guard_condition) { + guard_condition_handles_.push_back(guard_condition->data); + } + } + ptr = guard_condition_handles_.data(); + return guard_condition_handles_.size(); + } + void clear_active_entities() { subscriptions_.clear(); @@ -106,6 +139,7 @@ public: subscriber_handles_.clear(); service_handles_.clear(); client_handles_.clear(); + guard_condition_handles_.clear(); } void remove_null_handles() @@ -124,6 +158,11 @@ public: std::remove(client_handles_.begin(), client_handles_.end(), nullptr), client_handles_.end() ); + + guard_condition_handles_.erase( + std::remove(guard_condition_handles_.begin(), guard_condition_handles_.end(), nullptr), + guard_condition_handles_.end() + ); } bool collect_entities(const WeakNodeVector & weak_nodes) @@ -288,10 +327,12 @@ private: VectorRebind subscriptions_; VectorRebind services_; VectorRebind clients_; + VectorRebind guard_conditions_; VectorRebind subscriber_handles_; VectorRebind service_handles_; VectorRebind client_handles_; + VectorRebind guard_condition_handles_; std::shared_ptr executable_allocator_; std::shared_ptr allocator_; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index e734f48..041bee7 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -34,23 +35,16 @@ Executor::Executor(const ExecutorArgs & args) throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor"); } - // The number of guard conditions is fixed at 2: 1 for the ctrl-c guard cond, + // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond, // and one for the executor's guard cond (interrupt_guard_condition_) - // These guard conditions are permanently attached to the waitset. - const size_t number_of_guard_conds = 2; - fixed_guard_conditions_.guard_condition_count = number_of_guard_conds; - fixed_guard_conditions_.guard_conditions = static_cast(guard_cond_handles_.data()); + // The size of guard conditions is variable because the number of nodes can vary // Put the global ctrl-c guard condition in - assert(fixed_guard_conditions_.guard_condition_count > 1); - fixed_guard_conditions_.guard_conditions[0] = \ - rclcpp::utilities::get_global_sigint_guard_condition()->data; + memory_strategy_->add_guard_condition(rclcpp::utilities::get_global_sigint_guard_condition()); // Put the executor's guard condition in - fixed_guard_conditions_.guard_conditions[1] = interrupt_guard_condition_->data; + memory_strategy_->add_guard_condition(interrupt_guard_condition_); - // The waitset adds the fixed guard conditions to the middleware waitset on initialization, - // and removes the guard conditions in rmw_destroy_waitset. - waitset_ = rmw_create_waitset(&fixed_guard_conditions_, args.max_conditions); + waitset_ = rmw_create_waitset(args.max_conditions); if (!waitset_) { fprintf(stderr, @@ -107,6 +101,8 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) throw std::runtime_error(rmw_get_error_string_safe()); } } + // Add the node's notify condition to the guard condition handles + memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition()); } void @@ -136,6 +132,7 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) } } } + memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition()); } void @@ -356,6 +353,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) ); } + memory_strategy_->clear_handles(); // Use the number of subscriptions to allocate memory in the handles rmw_subscriptions_t subscriber_handles; subscriber_handles.subscriber_count = @@ -369,7 +367,10 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) client_handles.client_count = memory_strategy_->fill_client_handles(client_handles.clients); - // Don't pass guard conditions to rmw_wait; they are permanent fixtures of the waitset + // construct a guard conditions struct + rmw_guard_conditions_t guard_conditions; + guard_conditions.guard_condition_count = + memory_strategy_->fill_guard_condition_handles(guard_conditions.guard_conditions); rmw_time_t * wait_timeout = NULL; rmw_time_t rmw_timeout; @@ -397,7 +398,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) // Now wait on the waitable subscriptions and timers rmw_ret_t status = rmw_wait( &subscriber_handles, - nullptr, + &guard_conditions, &service_handles, &client_handles, waitset_, diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index 738fab7..df4ab3d 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -36,8 +36,13 @@ Node::Node( bool use_intra_process_comms) : name_(node_name), context_(context), number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), - use_intra_process_comms_(use_intra_process_comms) + use_intra_process_comms_(use_intra_process_comms), + notify_guard_condition_(rmw_create_guard_condition()) { + if (!notify_guard_condition_) { + throw std::runtime_error("Failed to create guard condition for node: " + + std::string(rmw_get_error_string_safe())); + } has_executor.store(false); size_t domain_id = 0; char * ros_domain_id = nullptr; @@ -51,6 +56,10 @@ Node::Node( if (ros_domain_id) { uint32_t number = strtoul(ros_domain_id, NULL, 0); if (number == (std::numeric_limits::max)()) { + if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + fprintf( + stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe()); + } throw std::runtime_error("failed to interpret ROS_DOMAIN_ID as integral number"); } domain_id = static_cast(number); @@ -62,6 +71,10 @@ Node::Node( auto node = rmw_create_node(name_.c_str(), domain_id); if (!node) { // *INDENT-OFF* + if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + fprintf( + stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe()); + } throw std::runtime_error( std::string("could not create node: ") + rmw_get_error_string_safe()); @@ -85,6 +98,14 @@ Node::Node( "parameter_events", rmw_qos_profile_parameter_events); } +Node::~Node() +{ + if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + fprintf(stderr, "Warning! Failed to destroy guard condition in Node destructor: %s\n", + rmw_get_error_string_safe()); + } +} + const std::string & Node::get_name() const { @@ -334,3 +355,8 @@ Node::get_callback_groups() const { return callback_groups_; } + +rmw_guard_condition_t * Node::get_notify_guard_condition() const +{ + return notify_guard_condition_; +}