From 5e2a76cc202315d81d96e28c01ba6ddbd3fd6b62 Mon Sep 17 00:00:00 2001 From: William Woodall Date: Wed, 22 Jun 2016 20:18:46 -0700 Subject: [PATCH] add wait_for_service() and service_is_ready() to Client (#222) * add wait_for_service() and service_is_ready() to Client * fix compile on Linux (maybe Windows) * use visibility macros for Windows * prevent unreasonable uncrustify change * fixup comment * add GraphListener::is_shutdown() * disable copy on GraphListener * use weak_ptr in client, throw if invalid * ensure blocking wait_for_service wakes on rclcpp::shutdown/sigint * rethrow exceptions after reporting them in thread * lock ~Node() against notify_graph_change() this essentially protects the notify_guard_condition_ * adjust thread sync strategy * style * moving initialization of wait set around, fix double free * only fini wait set if started * use rclcpp::shutdown to ensure graph listener resources clean up before static destruction * uncrustify --- rclcpp/CMakeLists.txt | 3 + rclcpp/include/rclcpp/client.hpp | 58 +++- rclcpp/include/rclcpp/event.hpp | 58 ++++ rclcpp/include/rclcpp/exceptions.hpp | 101 ++++++ rclcpp/include/rclcpp/graph_listener.hpp | 174 +++++++++++ rclcpp/include/rclcpp/node.hpp | 114 ++++++- rclcpp/include/rclcpp/node_impl.hpp | 2 +- rclcpp/include/rclcpp/parameter_client.hpp | 3 +- rclcpp/include/rclcpp/utilities.hpp | 7 + rclcpp/src/rclcpp/client.cpp | 81 ++++- rclcpp/src/rclcpp/event.cpp | 44 +++ rclcpp/src/rclcpp/exceptions.cpp | 94 ++++++ rclcpp/src/rclcpp/graph_listener.cpp | 342 +++++++++++++++++++++ rclcpp/src/rclcpp/node.cpp | 170 ++++++++-- rclcpp/src/rclcpp/utilities.cpp | 17 + 15 files changed, 1227 insertions(+), 41 deletions(-) create mode 100644 rclcpp/include/rclcpp/event.hpp create mode 100644 rclcpp/include/rclcpp/exceptions.hpp create mode 100644 rclcpp/include/rclcpp/graph_listener.hpp create mode 100644 rclcpp/src/rclcpp/event.cpp create mode 100644 rclcpp/src/rclcpp/exceptions.cpp create mode 100644 rclcpp/src/rclcpp/graph_listener.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 57ddc4a..6cf748a 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -22,10 +22,13 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/client.cpp src/rclcpp/context.cpp src/rclcpp/contexts/default_context.cpp + src/rclcpp/event.cpp + src/rclcpp/exceptions.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp + src/rclcpp/graph_listener.cpp src/rclcpp/intra_process_manager.cpp src/rclcpp/intra_process_manager_impl.cpp src/rclcpp/memory_strategies.cpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index 4f11327..b170310 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -25,6 +25,7 @@ #include "rcl/client.h" #include "rcl/error_handling.h" +#include "rcl/wait.h" #include "rclcpp/function_traits.hpp" #include "rclcpp/macros.hpp" @@ -37,6 +38,12 @@ namespace rclcpp { + +namespace node +{ +class Node; +} // namespace node + namespace client { @@ -47,7 +54,7 @@ public: RCLCPP_PUBLIC ClientBase( - std::shared_ptr node_handle, + std::shared_ptr parent_node, const std::string & service_name); RCLCPP_PUBLIC @@ -61,6 +68,20 @@ public: const rcl_client_t * get_client_handle() const; + RCLCPP_PUBLIC + bool + service_is_ready() const; + + template + bool + wait_for_service( + std::chrono::duration timeout = std::chrono::duration(-1)) + { + return wait_for_service_nanoseconds( + std::chrono::duration_cast(timeout) + ); + } + virtual std::shared_ptr create_response() = 0; virtual std::shared_ptr create_request_header() = 0; virtual void handle_response( @@ -69,6 +90,15 @@ public: protected: RCLCPP_DISABLE_COPY(ClientBase); + RCLCPP_PUBLIC + bool + wait_for_service_nanoseconds(std::chrono::nanoseconds timeout); + + RCLCPP_PUBLIC + rcl_node_t * + get_rcl_node_handle() const; + + std::weak_ptr node_; std::shared_ptr node_handle_; rcl_client_t client_handle_ = rcl_get_zero_initialized_client(); @@ -97,15 +127,15 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(Client); Client( - std::shared_ptr node_handle, + std::shared_ptr parent_node, const std::string & service_name, rcl_client_options_t & client_options) - : ClientBase(node_handle, service_name) + : ClientBase(parent_node, service_name) { using rosidl_generator_cpp::get_service_type_support_handle; auto service_type_support_handle = get_service_type_support_handle(); - if (rcl_client_init(&client_handle_, this->node_handle_.get(), + if (rcl_client_init(&client_handle_, this->get_rcl_node_handle(), service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) @@ -118,25 +148,28 @@ public: virtual ~Client() { - if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) { + if (rcl_client_fini(&client_handle_, this->get_rcl_node_handle()) != RCL_RET_OK) { fprintf(stderr, "Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe()); } } - std::shared_ptr create_response() + std::shared_ptr + create_response() { return std::shared_ptr(new typename ServiceT::Response()); } - std::shared_ptr create_request_header() + std::shared_ptr + create_request_header() { // TODO(wjwwood): This should probably use rmw_request_id's allocator. // (since it is a C type) return std::shared_ptr(new rmw_request_id_t); } - void handle_response(std::shared_ptr request_header, + void + handle_response(std::shared_ptr request_header, std::shared_ptr response) { std::lock_guard lock(pending_requests_mutex_); @@ -156,7 +189,8 @@ public: callback(future); } - SharedFuture async_send_request(SharedRequest request) + SharedFuture + async_send_request(SharedRequest request) { return async_send_request(request, [](SharedFuture) {}); } @@ -170,7 +204,8 @@ public: >::value >::type * = nullptr > - SharedFuture async_send_request(SharedRequest request, CallbackT && cb) + SharedFuture + async_send_request(SharedRequest request, CallbackT && cb) { std::lock_guard lock(pending_requests_mutex_); int64_t sequence_number; @@ -197,7 +232,8 @@ public: >::value >::type * = nullptr > - SharedFutureWithRequest async_send_request(SharedRequest request, CallbackT && cb) + SharedFutureWithRequest + async_send_request(SharedRequest request, CallbackT && cb) { SharedPromiseWithRequest promise = std::make_shared(); SharedFutureWithRequest future_with_request(promise->get_future()); diff --git a/rclcpp/include/rclcpp/event.hpp b/rclcpp/include/rclcpp/event.hpp new file mode 100644 index 0000000..0de55ef --- /dev/null +++ b/rclcpp/include/rclcpp/event.hpp @@ -0,0 +1,58 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EVENT_HPP_ +#define RCLCPP__EVENT_HPP_ + +#include +#include + +#include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace event +{ + +class Event +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Event); + + RCLCPP_PUBLIC + Event(); + + RCLCPP_PUBLIC + bool + set(); + + RCLCPP_PUBLIC + bool + check(); + + RCLCPP_PUBLIC + bool + check_and_clear(); + +private: + RCLCPP_DISABLE_COPY(Event); + + std::atomic_bool state_; +}; + +} // namespace event +} // namespace rclcpp + +#endif // RCLCPP__EVENT_HPP_ diff --git a/rclcpp/include/rclcpp/exceptions.hpp b/rclcpp/include/rclcpp/exceptions.hpp new file mode 100644 index 0000000..4700029 --- /dev/null +++ b/rclcpp/include/rclcpp/exceptions.hpp @@ -0,0 +1,101 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXCEPTIONS_HPP_ +#define RCLCPP__EXCEPTIONS_HPP_ + +#include +#include + +#include "rcl/error_handling.h" +#include "rcl/types.h" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ +namespace exceptions +{ + +/// Thrown when a method is trying to use a node, but it is invalid. +class InvalidNodeError : public std::runtime_error +{ +public: + InvalidNodeError() + : std::runtime_error("node is invalid") {} +}; + +/// Throw a C++ std::exception which was created based on an rcl error. +/* + * \param ret the return code for the current error state + * \param prefix string to prefix to the error if applicable (not all errors have custom messages) + * \param reset_error if true rcl_reset_error() is called before returning + * \throws std::invalid_argument if ret is RCL_RET_OK + * \throws std::runtime_error if the rcl_get_error_state returns 0 + * \throws RCLErrorBase some child class exception based on ret + */ +RCLCPP_PUBLIC +void +throw_from_rcl_error(rcl_ret_t ret, const std::string & prefix = "", bool reset_error = true); + +class RCLErrorBase +{ +public: + RCLCPP_PUBLIC + RCLErrorBase(rcl_ret_t ret, const rcl_error_state_t * error_state); + virtual ~RCLErrorBase() {} + + rcl_ret_t ret; + std::string message; + std::string file; + size_t line; + std::string formatted_message; +}; + +/// Created when the return code does not match one of the other specialized exceptions. +class RCLError : public RCLErrorBase, public std::runtime_error +{ +public: + RCLCPP_PUBLIC + RCLError(rcl_ret_t ret, const rcl_error_state_t * error_state, const std::string & prefix); + RCLCPP_PUBLIC + RCLError(const RCLErrorBase & base_exc, const std::string & prefix); +}; + +/// Created when the ret is RCL_RET_BAD_ALLOC. +class RCLBadAlloc : public RCLErrorBase, public std::bad_alloc +{ +public: + RCLCPP_PUBLIC + RCLBadAlloc(rcl_ret_t ret, const rcl_error_state_t * error_state); + RCLCPP_PUBLIC + explicit RCLBadAlloc(const RCLErrorBase & base_exc); +}; + +/// Created when the ret is RCL_RET_INVALID_ARGUMENT. +class RCLInvalidArgument : public RCLErrorBase, public std::invalid_argument +{ +public: + RCLCPP_PUBLIC + RCLInvalidArgument( + rcl_ret_t ret, + const rcl_error_state_t * error_state, + const std::string & prefix); + RCLCPP_PUBLIC + RCLInvalidArgument(const RCLErrorBase & base_exc, const std::string & prefix); +}; + +} // namespace exceptions +} // namespace rclcpp + +#endif // RCLCPP__EXCEPTIONS_HPP_ diff --git a/rclcpp/include/rclcpp/graph_listener.hpp b/rclcpp/include/rclcpp/graph_listener.hpp new file mode 100644 index 0000000..8927b99 --- /dev/null +++ b/rclcpp/include/rclcpp/graph_listener.hpp @@ -0,0 +1,174 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__GRAPH_LISTENER_HPP_ +#define RCLCPP__GRAPH_LISTENER_HPP_ + +#include +#include +#include +#include +#include + +#include "rcl/guard_condition.h" +#include "rcl/wait.h" +#include "rclcpp/macros.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace node +{ +class Node; +} // namespace node + +namespace graph_listener +{ + +/// Thrown when a function is called on a GraphListener that is already shutdown. +class GraphListenerShutdownError : public std::runtime_error +{ +public: + GraphListenerShutdownError() + : std::runtime_error("GraphListener already shutdown") {} +}; + +/// Thrown when a node has already been added to the GraphListener. +class NodeAlreadyAddedError : public std::runtime_error +{ +public: + NodeAlreadyAddedError() + : std::runtime_error("node already added") {} +}; + +/// Thrown when the given node is not in the GraphListener. +class NodeNotFoundError : public std::runtime_error +{ +public: + NodeNotFoundError() + : std::runtime_error("node not found") {} +}; + +/// Notifies many nodes of graph changes by listening in a thread. +class GraphListener : public std::enable_shared_from_this +{ +public: + RCLCPP_PUBLIC + GraphListener(); + + RCLCPP_PUBLIC + virtual ~GraphListener(); + + /// Start the graph listener's listen thread if it hasn't been started. + /* This function is thread-safe. + * + * \throws GraphListenerShutdownError if the GraphListener is shutdown + */ + RCLCPP_PUBLIC + virtual + void + start_if_not_started(); + + /// Add a node to the graph listener's list of nodes. + /* + * \throws GraphListenerShutdownError if the GraphListener is shutdown + * \throws NodeAlreadyAddedError if the given node is already in the list + * \throws std::invalid_argument if node is nullptr + * \throws std::system_error anything std::mutex::lock() throws + */ + RCLCPP_PUBLIC + virtual + void + add_node(rclcpp::node::Node * node); + + /// Return true if the given node is in the graph listener's list of nodes. + /* Also return false if given nullptr. + * + * \throws std::system_error anything std::mutex::lock() throws + */ + RCLCPP_PUBLIC + virtual + bool + has_node(rclcpp::node::Node * node); + + /// Remove a node from the graph listener's list of nodes. + /* + * \throws NodeNotFoundError if the given node is not in the list + * \throws std::invalid_argument if node is nullptr + * \throws std::system_error anything std::mutex::lock() throws + */ + RCLCPP_PUBLIC + virtual + void + remove_node(rclcpp::node::Node * node); + + /// Stop the listening thread. + /* The thread cannot be restarted, and the class is defunct after calling. + * This function is called by the ~GraphListener() and does nothing if + * shutdown() was already called. + * This function exists separately from the ~GraphListener() so that it can + * be called before and exceptions can be caught. + * + * If start_if_not_started() was never called, this function still succeeds, + * but start_if_not_started() still cannot be called after this function. + * + * \throws rclcpp::execptions::RCLError from rcl_guard_condition_fini() + * \throws rclcpp::execptions::RCLError from rcl_wait_set_fini() + * \throws std::system_error anything std::mutex::lock() throws + */ + RCLCPP_PUBLIC + virtual + void + shutdown(); + + /// Return true if shutdown() has been called, else false. + RCLCPP_PUBLIC + virtual + bool + is_shutdown(); + +protected: + /// Main function for the listening thread. + RCLCPP_PUBLIC + virtual + void + run(); + + RCLCPP_PUBLIC + virtual + void + run_loop(); + +private: + RCLCPP_DISABLE_COPY(GraphListener); + + std::thread listener_thread_; + bool is_started_; + std::atomic_bool is_shutdown_; + mutable std::mutex shutdown_mutex_; + + mutable std::mutex nodes_barrier_mutex_; + mutable std::mutex nodes_mutex_; + std::vector nodes_; + + rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition(); + rcl_guard_condition_t * shutdown_guard_condition_; + rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); +}; + +} // namespace graph_listener +} // namespace rclcpp + +#endif // RCLCPP__GRAPH_LISTENER_HPP_ diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 988fbaa..7433ce6 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -15,9 +15,12 @@ #ifndef RCLCPP__NODE_HPP_ #define RCLCPP__NODE_HPP_ +#include +#include #include #include #include +#include #include #include #include @@ -33,6 +36,7 @@ #include "rclcpp/callback_group.hpp" #include "rclcpp/client.hpp" #include "rclcpp/context.hpp" +#include "rclcpp/event.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/parameter.hpp" @@ -50,10 +54,30 @@ struct rcl_node_t; namespace rclcpp { +namespace graph_listener +{ +class GraphListener; +} // namespace graph_listener + namespace node { + +class InvalidEventError : public std::runtime_error +{ +public: + InvalidEventError() + : std::runtime_error("event is invalid") {} +}; + +class EventNotRegisteredError : public std::runtime_error +{ +public: + EventNotRegisteredError() + : std::runtime_error("event already registered") {} +}; + /// Node is the single point of entry for creating publishers and subscribers. -class Node +class Node : public std::enable_shared_from_this { public: RCLCPP_SMART_PTR_DEFINITIONS(Node); @@ -258,7 +282,76 @@ public: get_callback_groups() const; RCLCPP_PUBLIC - const rcl_guard_condition_t * get_notify_guard_condition() const; + const rcl_guard_condition_t * + get_notify_guard_condition() const; + + RCLCPP_PUBLIC + const rcl_guard_condition_t * + get_graph_guard_condition() const; + + RCLCPP_PUBLIC + const rcl_node_t * + get_rcl_node_handle() const; + + /// Return the rcl_node_t node handle (non-const version). + RCLCPP_PUBLIC + rcl_node_t * + get_rcl_node_handle(); + + /// Return the rcl_node_t node handle in a std::shared_ptr. + /* This handle remains valid after the Node is destroyed. + * The actual rcl node is not finalized until it is out of scope everywhere. + */ + RCLCPP_PUBLIC + std::shared_ptr + get_shared_node_handle(); + + /// Notify threads waiting on graph changes. + /* Affects threads waiting on the notify guard condition, see: + * get_notify_guard_condition(), as well as the threads waiting on graph + * changes using a graph Event, see: wait_for_graph_change(). + * + * This is typically only used by the rclcpp::graph_listener::GraphListener. + * + * \throws RCLBaseError (a child of that exception) when an rcl error occurs + */ + RCLCPP_PUBLIC + void + notify_graph_change(); + + /// Notify any and all blocking node actions that shutdown has occurred. + RCLCPP_PUBLIC + void + notify_shutdown(); + + /// Return a graph event, which will be set anytime a graph change occurs. + /* The graph Event object is a loan which must be returned. + * The Event object is scoped and therefore to return the load just let it go + * out of scope. + */ + RCLCPP_PUBLIC + rclcpp::event::Event::SharedPtr + get_graph_event(); + + /// Wait for a graph event to occur by waiting on an Event to become set. + /* The given Event must be acquire through the get_graph_event() method. + * + * \throws InvalidEventError if the given event is nullptr + * \throws EventNotRegisteredError if the given event was not acquired with + * get_graph_event(). + */ + RCLCPP_PUBLIC + void + wait_for_graph_change( + rclcpp::event::Event::SharedPtr event, + std::chrono::nanoseconds timeout); + + /// Return the number of on loan graph events, see get_graph_event(). + /* This is typically only used by the rclcpp::graph_listener::GraphListener. + */ + RCLCPP_PUBLIC + size_t + count_graph_users(); std::atomic_bool has_executor; @@ -288,7 +381,24 @@ private: mutable std::mutex mutex_; /// Guard condition for notifying the Executor of changes to this node. + mutable std::mutex notify_guard_condition_mutex_; rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition(); + bool notify_guard_condition_is_valid_; + + /// Graph Listener which waits on graph changes for the node and is shared across nodes. + std::shared_ptr graph_listener_; + /// Whether or not this node needs to be added to the graph listener. + std::atomic_bool should_add_to_graph_listener_; + + /// Mutex to guard the graph event related data structures. + std::mutex graph_mutex_; + /// For notifying waiting threads (wait_for_graph_change()) on changes (notify_graph_change()). + std::condition_variable graph_cv_; + /// Weak references to graph events out on loan. + std::vector graph_events_; + /// Number of graph events out on loan, used to determine if the graph should be monitored. + /* graph_users_count_ is atomic so that it can be accessed without acquiring the graph_mutex_ */ + std::atomic_size_t graph_users_count_; std::map parameters_; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index f13d241..f0f81a5 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -302,7 +302,7 @@ Node::create_client( using rclcpp::client::ClientBase; auto cli = Client::make_shared( - node_handle_, + shared_from_this(), service_name, options); diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 9f1ef1b..94dd62b 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -119,8 +119,7 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(SyncParametersClient); RCLCPP_PUBLIC - SyncParametersClient( - rclcpp::node::Node::SharedPtr node); + explicit SyncParametersClient(rclcpp::node::Node::SharedPtr node); RCLCPP_PUBLIC SyncParametersClient( diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index 82c2717..1bed7ac 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -16,6 +16,7 @@ #define RCLCPP__UTILITIES_HPP_ #include +#include #include "rclcpp/visibility_control.hpp" @@ -50,6 +51,12 @@ RCLCPP_PUBLIC void shutdown(); +/// Register a function to be called when shutdown is called. +/* Calling the callbacks is the last thing shutdown() does. */ +RCLCPP_PUBLIC +void +on_shutdown(std::function callback); + /// Get a handle to the rmw guard condition that manages the signal handler. /** * The first time that this function is called for a given waitset a new guard diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 0fd26a8..3755e97 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -14,22 +14,29 @@ #include "rclcpp/client.hpp" +#include #include #include -#include "rmw/rmw.h" +#include "rcl/graph.h" +#include "rcl/node.h" +#include "rcl/wait.h" +#include "rclcpp/exceptions.hpp" +#include "rclcpp/node.hpp" +#include "rclcpp/utilities.hpp" using rclcpp::client::ClientBase; +using rclcpp::exceptions::InvalidNodeError; +using rclcpp::exceptions::throw_from_rcl_error; ClientBase::ClientBase( - std::shared_ptr node_handle, + std::shared_ptr parent_node, const std::string & service_name) -: node_handle_(node_handle), service_name_(service_name) +: node_(parent_node), node_handle_(parent_node->get_shared_node_handle()), + service_name_(service_name) {} -ClientBase::~ClientBase() -{ -} +ClientBase::~ClientBase() {} const std::string & ClientBase::get_service_name() const @@ -42,3 +49,65 @@ ClientBase::get_client_handle() const { return &client_handle_; } + +bool +ClientBase::service_is_ready() const +{ + bool is_ready; + rcl_ret_t ret = + rcl_service_server_is_available(this->get_rcl_node_handle(), &client_handle_, &is_ready); + if (ret != RCL_RET_OK) { + throw_from_rcl_error(ret, "rcl_service_server_is_available failed"); + } + return is_ready; +} + +bool +ClientBase::wait_for_service_nanoseconds(std::chrono::nanoseconds timeout) +{ + auto start = std::chrono::steady_clock::now(); + // check to see if the server is ready immediately + if (this->service_is_ready()) { + return true; + } + if (timeout == std::chrono::nanoseconds(0)) { + // check was non-blocking, return immediately + return false; + } + // make an event to reuse, rather than create a new one each time + auto node_ptr = node_.lock(); + if (!node_ptr) { + throw InvalidNodeError(); + } + auto event = node_ptr->get_graph_event(); + // update the time even on the first loop to account for time in first server_is_read() + std::chrono::nanoseconds time_to_wait = timeout - (std::chrono::steady_clock::now() - start); + if (timeout > std::chrono::nanoseconds(0) && time_to_wait < std::chrono::nanoseconds(0)) { + // Do not allow the time_to_wait to become negative when timeout was originally positive. + // Setting time_to_wait to 0 will allow one non-blocking wait because of the do-while. + time_to_wait = std::chrono::nanoseconds(0); + } + // continue forever if timeout is negative, otherwise continue until out of time_to_wait + // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) + do { + if (!rclcpp::utilities::ok()) { + return false; + } + node_ptr->wait_for_graph_change(event, time_to_wait); + if (event->check_and_clear()) { + if (this->service_is_ready()) { + return true; + } + } + // server is not ready, loop if there is time left + time_to_wait = timeout - (std::chrono::steady_clock::now() - start); + } while (timeout < std::chrono::nanoseconds(0) || time_to_wait > std::chrono::nanoseconds(0)); + // *INDENT-ON* + return false; // timeout exceeded while waiting for the server to be ready +} + +rcl_node_t * +ClientBase::get_rcl_node_handle() const +{ + return node_handle_.get(); +} diff --git a/rclcpp/src/rclcpp/event.cpp b/rclcpp/src/rclcpp/event.cpp new file mode 100644 index 0000000..ab6e06c --- /dev/null +++ b/rclcpp/src/rclcpp/event.cpp @@ -0,0 +1,44 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/event.hpp" + +namespace rclcpp +{ +namespace event +{ + +Event::Event() +: state_(false) {} + +bool +Event::set() +{ + return state_.exchange(true); +} + +bool +Event::check() +{ + return state_.load(); +} + +bool +Event::check_and_clear() +{ + return state_.exchange(false); +} + +} // namespace event +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/exceptions.cpp b/rclcpp/src/rclcpp/exceptions.cpp new file mode 100644 index 0000000..175226c --- /dev/null +++ b/rclcpp/src/rclcpp/exceptions.cpp @@ -0,0 +1,94 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/exceptions.hpp" + +#include +#include +#include + +namespace rclcpp +{ +namespace exceptions +{ + +void +throw_from_rcl_error(rcl_ret_t ret, const std::string & prefix, bool reset_error) +{ + if (RCL_RET_OK == ret) { + throw std::invalid_argument("ret is RCL_RET_OK"); + } + const rcl_error_state_t * error_state = rcl_get_error_state(); + if (!error_state) { + throw std::runtime_error("rcl error state is not set"); + } + std::string formated_prefix = prefix; + if (!prefix.empty()) { + formated_prefix += ": "; + } + RCLErrorBase base_exc(ret, error_state); + if (reset_error) { + rcl_reset_error(); + } + switch (ret) { + case RCL_RET_BAD_ALLOC: + throw RCLBadAlloc(base_exc); + case RCL_RET_INVALID_ARGUMENT: + throw RCLInvalidArgument(base_exc, formated_prefix); + default: + throw RCLError(base_exc, formated_prefix); + } +} + +RCLErrorBase::RCLErrorBase(rcl_ret_t ret, const rcl_error_state_t * error_state) +: ret(ret), message(error_state->message), file(error_state->file), line(error_state->line_number), + formatted_message(rcl_get_error_string_safe()) +{} + +RCLError::RCLError( + rcl_ret_t ret, + const rcl_error_state_t * error_state, + const std::string & prefix) +: RCLError(RCLErrorBase(ret, error_state), prefix) +{} + +RCLError::RCLError( + const RCLErrorBase & base_exc, + const std::string & prefix) +: RCLErrorBase(base_exc), std::runtime_error(prefix + base_exc.formatted_message) +{} + +RCLBadAlloc::RCLBadAlloc(rcl_ret_t ret, const rcl_error_state_t * error_state) +: RCLBadAlloc(RCLErrorBase(ret, error_state)) +{} + +RCLBadAlloc::RCLBadAlloc(const RCLErrorBase & base_exc) +: RCLErrorBase(base_exc), std::bad_alloc() +{} + +RCLInvalidArgument::RCLInvalidArgument( + rcl_ret_t ret, + const rcl_error_state_t * error_state, + const std::string & prefix) +: RCLInvalidArgument(RCLErrorBase(ret, error_state), prefix) +{} + +RCLInvalidArgument::RCLInvalidArgument( + const RCLErrorBase & base_exc, + const std::string & prefix) +: RCLErrorBase(base_exc), std::invalid_argument(prefix + base_exc.formatted_message) +{} + +} // namespace exceptions +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/graph_listener.cpp b/rclcpp/src/rclcpp/graph_listener.cpp new file mode 100644 index 0000000..e82c42a --- /dev/null +++ b/rclcpp/src/rclcpp/graph_listener.cpp @@ -0,0 +1,342 @@ +// Copyright 2016 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/graph_listener.hpp" + +#include +#include +#include +#include + +#include "rcl/error_handling.h" +#include "rcl/types.h" +#include "rclcpp/exceptions.hpp" +#include "rclcpp/node.hpp" +#include "rmw/impl/cpp/demangle.hpp" + +using rclcpp::exceptions::throw_from_rcl_error; + +namespace rclcpp +{ +namespace graph_listener +{ + +GraphListener::GraphListener() +: is_started_(false), is_shutdown_(false), shutdown_guard_condition_(nullptr) +{ + rcl_ret_t ret = rcl_guard_condition_init( + &interrupt_guard_condition_, + rcl_guard_condition_get_default_options()); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to create interrupt guard condition"); + } + + shutdown_guard_condition_ = rclcpp::utilities::get_sigint_guard_condition(&wait_set_); +} + +GraphListener::~GraphListener() +{ + this->shutdown(); +} + +void +GraphListener::start_if_not_started() +{ + std::lock_guard shutdown_lock(shutdown_mutex_); + if (is_shutdown_.load()) { + throw GraphListenerShutdownError(); + } + if (!is_started_) { + // Initialize the wait set before starting. + rcl_ret_t ret = rcl_wait_set_init( + &wait_set_, + 0, // number_of_subscriptions + 2, // number_of_guard_conditions + 0, // number_of_timers + 0, // number_of_clients + 0, // number_of_services + rcl_get_default_allocator()); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to initialize wait set"); + } + // Register an on_shutdown hook to shtudown the graph listener. + // This is important to ensure that the wait set is finalized before + // destruction of static objects occurs. + std::weak_ptr weak_this = shared_from_this(); + rclcpp::utilities::on_shutdown([weak_this]() { + auto shared_this = weak_this.lock(); + if (shared_this) { + shared_this->shutdown(); + } + }); + // Start the listener thread. + listener_thread_ = std::thread(&GraphListener::run, this); + is_started_ = true; + } +} + +void +GraphListener::run() +{ + try { + run_loop(); + } catch (const std::exception & exc) { + fprintf(stderr, + "[rclcpp] caught %s exception in GraphListener thread: %s\n", + rmw::impl::cpp::demangle(exc).c_str(), + exc.what()); + std::rethrow_exception(std::current_exception()); + } catch (...) { + fprintf(stderr, "[rclcpp] unknown error in GraphListener thread\n"); + std::rethrow_exception(std::current_exception()); + } +} + +void +GraphListener::run_loop() +{ + while (true) { + // If shutdown() was called, exit. + if (is_shutdown_.load()) { + return; + } + rcl_ret_t ret; + { + // This "barrier" lock ensures that other functions can acquire the + // nodes_mutex_ after waking up rcl_wait. + std::lock_guard nodes_barrier_lock(nodes_barrier_mutex_); + // This is ownership is passed to nodes_lock in the next line. + nodes_mutex_.lock(); + } + // This lock is released when the loop continues or exits. + std::lock_guard nodes_lock(nodes_mutex_, std::adopt_lock); + + // Resize the wait set if necessary. + if (wait_set_.size_of_guard_conditions < (nodes_.size() + 2)) { + ret = rcl_wait_set_resize_guard_conditions(&wait_set_, nodes_.size() + 2); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to resize wait set"); + } + } + // Clear the wait set's guard conditions. + ret = rcl_wait_set_clear_guard_conditions(&wait_set_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to clear wait set"); + } + // Put the interrupt guard condition in the wait set. + ret = rcl_wait_set_add_guard_condition(&wait_set_, &interrupt_guard_condition_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to add interrupt guard condition to wait set"); + } + // Put the shutdown guard condition in the wait set. + ret = rcl_wait_set_add_guard_condition(&wait_set_, shutdown_guard_condition_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to add shutdown guard condition to wait set"); + } + // Put graph guard conditions for each node into the wait set. + for (const auto node_ptr : nodes_) { + // Only wait on graph changes if some user of the node is watching. + if (node_ptr->count_graph_users() == 0) { + continue; + } + // Add the graph guard condition for the node to the wait set. + auto graph_gc = rcl_node_get_graph_guard_condition(node_ptr->get_rcl_node_handle()); + if (!graph_gc) { + throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition"); + } + ret = rcl_wait_set_add_guard_condition(&wait_set_, graph_gc); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to add graph guard condition to wait set"); + } + } + + // Wait for: graph changes, interrupt, or shutdown/SIGINT + ret = rcl_wait(&wait_set_, -1); // block for ever until a guard condition is triggered + if (RCL_RET_TIMEOUT == ret) { + throw std::runtime_error("rcl_wait unexpectedly timed out"); + } + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to wait on wait set"); + } + + bool shutdown_guard_condition_triggered = false; + // Check to see if the shutdown guard condition has been triggered. + for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { + if (shutdown_guard_condition_ == wait_set_.guard_conditions[i]) { + shutdown_guard_condition_triggered = true; + } + } + // Notify nodes who's guard conditions are set (triggered). + for (const auto node_ptr : nodes_) { + auto graph_gc = rcl_node_get_graph_guard_condition(node_ptr->get_rcl_node_handle()); + if (!graph_gc) { + throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition"); + } + for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) { + if (graph_gc == wait_set_.guard_conditions[i]) { + node_ptr->notify_graph_change(); + } + } + if (shutdown_guard_condition_triggered) { + // If shutdown, then notify the node of this as well. + node_ptr->notify_shutdown(); + } + } + } // while (true) +} + +static void +interrupt_(rcl_guard_condition_t * interrupt_guard_condition) +{ + rcl_ret_t ret = rcl_trigger_guard_condition(interrupt_guard_condition); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to trigger the interrupt guard condition"); + } +} + +static void +acquire_nodes_lock_( + std::mutex * nodes_barrier_mutex, + std::mutex * nodes_mutex, + rcl_guard_condition_t * interrupt_guard_condition) +{ + { + // Acquire this lock to prevent the run loop from re-locking the + // nodes_mutext_ after being woken up. + std::lock_guard nodes_barrier_lock(*nodes_barrier_mutex); + // Trigger the interrupt guard condition to wake up rcl_wait. + interrupt_(interrupt_guard_condition); + nodes_mutex->lock(); + } +} + +static bool +has_node_(std::vector * nodes, rclcpp::node::Node * node) +{ + for (const auto node_ptr : (*nodes)) { + if (node == node_ptr) { + return true; + } + } + return false; +} + +bool +GraphListener::has_node(rclcpp::node::Node * node) +{ + if (!node) { + return false; + } + // Acquire the nodes mutex using the barrier to prevent the run loop from + // re-locking the nodes mutex after being interrupted. + acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_); + // Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock. + std::lock_guard nodes_lock(nodes_mutex_, std::adopt_lock); + return has_node_(&nodes_, node); +} + +void +GraphListener::add_node(rclcpp::node::Node * node) +{ + if (!node) { + throw std::invalid_argument("node is nullptr"); + } + std::lock_guard shutdown_lock(shutdown_mutex_); + if (is_shutdown_.load()) { + throw GraphListenerShutdownError(); + } + + // Acquire the nodes mutex using the barrier to prevent the run loop from + // re-locking the nodes mutex after being interrupted. + acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_); + // Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock. + std::lock_guard nodes_lock(nodes_mutex_, std::adopt_lock); + if (has_node_(&nodes_, node)) { + throw NodeAlreadyAddedError(); + } + nodes_.push_back(node); + // The run loop has already been interrupted by acquire_nodes_lock_() and + // will evaluate the new node when nodes_lock releases the nodes_mutex_. +} + +static void +remove_node_(std::vector * nodes, rclcpp::node::Node * node) +{ + // Remove the node if it is found. + for (auto it = nodes->begin(); it != nodes->end(); ++it) { + if (node == *it) { + // Found the node, remove it. + nodes->erase(it); + // Now trigger the interrupt guard condition to make sure + return; + } + } + // Not found in the loop. + throw NodeNotFoundError(); +} + +void +GraphListener::remove_node(rclcpp::node::Node * node) +{ + if (!node) { + throw std::invalid_argument("node is nullptr"); + } + std::lock_guard shutdown_lock(shutdown_mutex_); + if (is_shutdown()) { + // If shutdown, then the run loop has been joined, so we can remove them directly. + return remove_node_(&nodes_, node); + } + // Otherwise, first interrupt and lock against the run loop to safely remove the node. + // Acquire the nodes mutex using the barrier to prevent the run loop from + // re-locking the nodes mutex after being interrupted. + acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_); + // Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock. + std::lock_guard nodes_lock(nodes_mutex_, std::adopt_lock); + remove_node_(&nodes_, node); +} + +void +GraphListener::shutdown() +{ + std::lock_guard shutdown_lock(shutdown_mutex_); + if (!is_shutdown_.exchange(true)) { + if (is_started_) { + interrupt_(&interrupt_guard_condition_); + listener_thread_.join(); + } + rcl_ret_t ret = rcl_guard_condition_fini(&interrupt_guard_condition_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to finalize interrupt guard condition"); + } + if (shutdown_guard_condition_) { + rclcpp::utilities::release_sigint_guard_condition(&wait_set_); + shutdown_guard_condition_ = nullptr; + } + if (is_started_) { + ret = rcl_wait_set_fini(&wait_set_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to finalize wait set"); + } + } + } +} + +bool +GraphListener::is_shutdown() +{ + return is_shutdown_.load(); +} + +} // namespace graph_listener +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index e4a6fd7..6a25f52 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -20,9 +20,12 @@ #include #include "rcl_interfaces/srv/list_parameters.hpp" +#include "rclcpp/exceptions.hpp" +#include "rclcpp/graph_listener.hpp" #include "rclcpp/node.hpp" using rclcpp::node::Node; +using rclcpp::exceptions::throw_from_rcl_error; Node::Node(const std::string & node_name, bool use_intra_process_comms) : Node( @@ -37,15 +40,14 @@ Node::Node( bool use_intra_process_comms) : name_(node_name), context_(context), number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), - use_intra_process_comms_(use_intra_process_comms) + use_intra_process_comms_(use_intra_process_comms), notify_guard_condition_is_valid_(false), + graph_listener_(context->get_sub_context()), + should_add_to_graph_listener_(true), graph_users_count_(0) { rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options(); - if (rcl_guard_condition_init( - ¬ify_guard_condition_, guard_condition_options) != RCL_RET_OK) - { - throw std::runtime_error( - std::string("Failed to create interrupt guard condition in Executor constructor: ") + - rcl_get_error_string_safe()); + rcl_ret_t ret = rcl_guard_condition_init(¬ify_guard_condition_, guard_condition_options); + if (ret != RCL_RET_OK) { + throw_from_rcl_error(ret, "failed to create interrupt guard condition"); } has_executor.store(false); @@ -85,34 +87,42 @@ Node::Node( rcl_node_options_t options = rcl_node_get_default_options(); // TODO(jacquelinekay): Allocator options options.domain_id = domain_id; - if (rcl_node_init(node_handle_.get(), name_.c_str(), &options) != RCL_RET_OK) { + ret = rcl_node_init(node_handle_.get(), name_.c_str(), &options); + if (ret != RCL_RET_OK) { // Finalize the interrupt guard condition. if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { fprintf(stderr, "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); } - throw std::runtime_error(std::string( - "Could not initialize rcl node: ") + rcl_get_error_string_safe()); + throw_from_rcl_error(ret, "failed to initialize rcl node"); } - // Initialize node handle shared_ptr with custom deleter. - // *INDENT-OFF* - // *INDENT-ON* - using rclcpp::callback_group::CallbackGroupType; default_callback_group_ = create_callback_group( CallbackGroupType::MutuallyExclusive); events_publisher_ = create_publisher( "parameter_events", rmw_qos_profile_parameter_events); + notify_guard_condition_is_valid_ = true; } Node::~Node() { - // Finalize the interrupt guard condition. - if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { - fprintf(stderr, - "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); + // Remove self from graph listener. + // Exchange with false to prevent others from trying to add this node to the + // graph listener after checking that it was not here. + if (!should_add_to_graph_listener_.exchange(false)) { + // If it was already false, then it needs to now be removed. + graph_listener_->remove_node(this); + } + // Finalize the interrupt guard condition after removing self from graph listener. + { + std::lock_guard notify_guard_condition_lock(notify_guard_condition_mutex_); + notify_guard_condition_is_valid_ = false; + if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); + } } } @@ -373,7 +383,129 @@ Node::get_callback_groups() const return callback_groups_; } -const rcl_guard_condition_t * Node::get_notify_guard_condition() const +const rcl_guard_condition_t * +Node::get_notify_guard_condition() const { + std::lock_guard notify_guard_condition_lock(notify_guard_condition_mutex_); + if (!notify_guard_condition_is_valid_) { + return nullptr; + } return ¬ify_guard_condition_; } + +const rcl_guard_condition_t * +Node::get_graph_guard_condition() const +{ + return rcl_node_get_graph_guard_condition(node_handle_.get()); +} + +const rcl_node_t * +Node::get_rcl_node_handle() const +{ + return node_handle_.get(); +} + +rcl_node_t * +Node::get_rcl_node_handle() +{ + return node_handle_.get(); +} + +std::shared_ptr +Node::get_shared_node_handle() +{ + return node_handle_; +} + +void +Node::notify_graph_change() +{ + { + std::lock_guard graph_changed_lock(graph_mutex_); + bool bad_ptr_encountered = false; + for (auto & event_wptr : graph_events_) { + auto event_ptr = event_wptr.lock(); + if (event_ptr) { + event_ptr->set(); + } else { + bad_ptr_encountered = true; + } + } + if (bad_ptr_encountered) { + // remove invalid pointers with the erase-remove idiom + graph_events_.erase( + std::remove_if( + graph_events_.begin(), + graph_events_.end(), + [](const rclcpp::event::Event::WeakPtr & wptr) { + return wptr.expired(); + }), + graph_events_.end()); + // update graph_users_count_ + graph_users_count_.store(graph_events_.size()); + } + } + graph_cv_.notify_all(); + { + std::lock_guard notify_guard_condition_lock(notify_guard_condition_mutex_); + rcl_ret_t ret = rcl_trigger_guard_condition(¬ify_guard_condition_); + if (RCL_RET_OK != ret) { + throw_from_rcl_error(ret, "failed to trigger notify guard condition"); + } + } +} + +void +Node::notify_shutdown() +{ + // notify here anything that will not be woken up by ctrl-c or rclcpp::shutdown(). + graph_cv_.notify_all(); +} + +rclcpp::event::Event::SharedPtr +Node::get_graph_event() +{ + auto event = rclcpp::event::Event::make_shared(); + std::lock_guard graph_changed_lock(graph_mutex_); + // on first call, add node to graph_listener_ + if (should_add_to_graph_listener_.exchange(false)) { + graph_listener_->add_node(this); + graph_listener_->start_if_not_started(); + } + graph_events_.push_back(event); + graph_users_count_++; + return event; +} + +void +Node::wait_for_graph_change( + rclcpp::event::Event::SharedPtr event, + std::chrono::nanoseconds timeout) +{ + if (!event) { + throw InvalidEventError(); + } + { + std::lock_guard graph_changed_lock(graph_mutex_); + bool event_in_graph_events = false; + for (const auto & event_wptr : graph_events_) { + if (event == event_wptr.lock()) { + event_in_graph_events = true; + break; + } + } + if (!event_in_graph_events) { + throw EventNotRegisteredError(); + } + } + std::unique_lock graph_lock(graph_mutex_); + graph_cv_.wait_for(graph_lock, timeout, [&event]() { + return event->check() || !rclcpp::utilities::ok(); + }); +} + +size_t +Node::count_graph_users() +{ + return graph_users_count_.load(); +} diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp index 4beb601..73297ee 100644 --- a/rclcpp/src/rclcpp/utilities.cpp +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "rcl/error_handling.h" #include "rcl/rcl.h" @@ -154,6 +155,9 @@ rclcpp::utilities::ok() return ::g_signal_status == 0; } +static std::mutex on_shutdown_mutex_; +static std::vector> on_shutdown_callbacks_; + void rclcpp::utilities::shutdown() { @@ -170,6 +174,19 @@ rclcpp::utilities::shutdown() } g_is_interrupted.store(true); g_interrupt_condition_variable.notify_all(); + { + std::lock_guard lock(on_shutdown_mutex_); + for (auto & on_shutdown_callback : on_shutdown_callbacks_) { + on_shutdown_callback(); + } + } +} + +void +rclcpp::utilities::on_shutdown(std::function callback) +{ + std::lock_guard lock(on_shutdown_mutex_); + on_shutdown_callbacks_.push_back(callback); } rcl_guard_condition_t *