add wait_for_service() and service_is_ready() to Client (#222)

* add wait_for_service() and service_is_ready() to Client

* fix compile on Linux (maybe Windows)

* use visibility macros for Windows

* prevent unreasonable uncrustify change

* fixup comment

* add GraphListener::is_shutdown()

* disable copy on GraphListener

* use weak_ptr<Node> in client, throw if invalid

* ensure blocking wait_for_service wakes on rclcpp::shutdown/sigint

* rethrow exceptions after reporting them in thread

* lock ~Node() against notify_graph_change()

this essentially protects the notify_guard_condition_

* adjust thread sync strategy

* style

* moving initialization of wait set around, fix double free

* only fini wait set if started

* use rclcpp::shutdown to ensure graph listener resources clean up before static destruction

* uncrustify
This commit is contained in:
William Woodall 2016-06-22 20:18:46 -07:00 committed by GitHub
parent 3553107823
commit 5e2a76cc20
15 changed files with 1227 additions and 41 deletions

View file

@ -22,10 +22,13 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/client.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/event.cpp
src/rclcpp/exceptions.cpp
src/rclcpp/executor.cpp
src/rclcpp/executors.cpp
src/rclcpp/executors/multi_threaded_executor.cpp
src/rclcpp/executors/single_threaded_executor.cpp
src/rclcpp/graph_listener.cpp
src/rclcpp/intra_process_manager.cpp
src/rclcpp/intra_process_manager_impl.cpp
src/rclcpp/memory_strategies.cpp

View file

@ -25,6 +25,7 @@
#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rcl/wait.h"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/macros.hpp"
@ -37,6 +38,12 @@
namespace rclcpp
{
namespace node
{
class Node;
} // namespace node
namespace client
{
@ -47,7 +54,7 @@ public:
RCLCPP_PUBLIC
ClientBase(
std::shared_ptr<rcl_node_t> node_handle,
std::shared_ptr<rclcpp::node::Node> parent_node,
const std::string & service_name);
RCLCPP_PUBLIC
@ -61,6 +68,20 @@ public:
const rcl_client_t *
get_client_handle() const;
RCLCPP_PUBLIC
bool
service_is_ready() const;
template<typename RatioT = std::milli>
bool
wait_for_service(
std::chrono::duration<int64_t, RatioT> timeout = std::chrono::duration<int64_t, RatioT>(-1))
{
return wait_for_service_nanoseconds(
std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
);
}
virtual std::shared_ptr<void> create_response() = 0;
virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
virtual void handle_response(
@ -69,6 +90,15 @@ public:
protected:
RCLCPP_DISABLE_COPY(ClientBase);
RCLCPP_PUBLIC
bool
wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);
RCLCPP_PUBLIC
rcl_node_t *
get_rcl_node_handle() const;
std::weak_ptr<rclcpp::node::Node> node_;
std::shared_ptr<rcl_node_t> node_handle_;
rcl_client_t client_handle_ = rcl_get_zero_initialized_client();
@ -97,15 +127,15 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(Client);
Client(
std::shared_ptr<rcl_node_t> node_handle,
std::shared_ptr<rclcpp::node::Node> parent_node,
const std::string & service_name,
rcl_client_options_t & client_options)
: ClientBase(node_handle, service_name)
: ClientBase(parent_node, service_name)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
if (rcl_client_init(&client_handle_, this->node_handle_.get(),
if (rcl_client_init(&client_handle_, this->get_rcl_node_handle(),
service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK)
{
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
@ -118,25 +148,28 @@ public:
virtual ~Client()
{
if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) {
if (rcl_client_fini(&client_handle_, this->get_rcl_node_handle()) != RCL_RET_OK) {
fprintf(stderr,
"Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe());
}
}
std::shared_ptr<void> create_response()
std::shared_ptr<void>
create_response()
{
return std::shared_ptr<void>(new typename ServiceT::Response());
}
std::shared_ptr<rmw_request_id_t> create_request_header()
std::shared_ptr<rmw_request_id_t>
create_request_header()
{
// TODO(wjwwood): This should probably use rmw_request_id's allocator.
// (since it is a C type)
return std::shared_ptr<rmw_request_id_t>(new rmw_request_id_t);
}
void handle_response(std::shared_ptr<rmw_request_id_t> request_header,
void
handle_response(std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> response)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
@ -156,7 +189,8 @@ public:
callback(future);
}
SharedFuture async_send_request(SharedRequest request)
SharedFuture
async_send_request(SharedRequest request)
{
return async_send_request(request, [](SharedFuture) {});
}
@ -170,7 +204,8 @@ public:
>::value
>::type * = nullptr
>
SharedFuture async_send_request(SharedRequest request, CallbackT && cb)
SharedFuture
async_send_request(SharedRequest request, CallbackT && cb)
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number;
@ -197,7 +232,8 @@ public:
>::value
>::type * = nullptr
>
SharedFutureWithRequest async_send_request(SharedRequest request, CallbackT && cb)
SharedFutureWithRequest
async_send_request(SharedRequest request, CallbackT && cb)
{
SharedPromiseWithRequest promise = std::make_shared<PromiseWithRequest>();
SharedFutureWithRequest future_with_request(promise->get_future());

View file

@ -0,0 +1,58 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EVENT_HPP_
#define RCLCPP__EVENT_HPP_
#include <atomic>
#include <memory>
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace event
{
class Event
{
public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(Event);
RCLCPP_PUBLIC
Event();
RCLCPP_PUBLIC
bool
set();
RCLCPP_PUBLIC
bool
check();
RCLCPP_PUBLIC
bool
check_and_clear();
private:
RCLCPP_DISABLE_COPY(Event);
std::atomic_bool state_;
};
} // namespace event
} // namespace rclcpp
#endif // RCLCPP__EVENT_HPP_

View file

@ -0,0 +1,101 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__EXCEPTIONS_HPP_
#define RCLCPP__EXCEPTIONS_HPP_
#include <stdexcept>
#include <string>
#include "rcl/error_handling.h"
#include "rcl/types.h"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace exceptions
{
/// Thrown when a method is trying to use a node, but it is invalid.
class InvalidNodeError : public std::runtime_error
{
public:
InvalidNodeError()
: std::runtime_error("node is invalid") {}
};
/// Throw a C++ std::exception which was created based on an rcl error.
/*
* \param ret the return code for the current error state
* \param prefix string to prefix to the error if applicable (not all errors have custom messages)
* \param reset_error if true rcl_reset_error() is called before returning
* \throws std::invalid_argument if ret is RCL_RET_OK
* \throws std::runtime_error if the rcl_get_error_state returns 0
* \throws RCLErrorBase some child class exception based on ret
*/
RCLCPP_PUBLIC
void
throw_from_rcl_error(rcl_ret_t ret, const std::string & prefix = "", bool reset_error = true);
class RCLErrorBase
{
public:
RCLCPP_PUBLIC
RCLErrorBase(rcl_ret_t ret, const rcl_error_state_t * error_state);
virtual ~RCLErrorBase() {}
rcl_ret_t ret;
std::string message;
std::string file;
size_t line;
std::string formatted_message;
};
/// Created when the return code does not match one of the other specialized exceptions.
class RCLError : public RCLErrorBase, public std::runtime_error
{
public:
RCLCPP_PUBLIC
RCLError(rcl_ret_t ret, const rcl_error_state_t * error_state, const std::string & prefix);
RCLCPP_PUBLIC
RCLError(const RCLErrorBase & base_exc, const std::string & prefix);
};
/// Created when the ret is RCL_RET_BAD_ALLOC.
class RCLBadAlloc : public RCLErrorBase, public std::bad_alloc
{
public:
RCLCPP_PUBLIC
RCLBadAlloc(rcl_ret_t ret, const rcl_error_state_t * error_state);
RCLCPP_PUBLIC
explicit RCLBadAlloc(const RCLErrorBase & base_exc);
};
/// Created when the ret is RCL_RET_INVALID_ARGUMENT.
class RCLInvalidArgument : public RCLErrorBase, public std::invalid_argument
{
public:
RCLCPP_PUBLIC
RCLInvalidArgument(
rcl_ret_t ret,
const rcl_error_state_t * error_state,
const std::string & prefix);
RCLCPP_PUBLIC
RCLInvalidArgument(const RCLErrorBase & base_exc, const std::string & prefix);
};
} // namespace exceptions
} // namespace rclcpp
#endif // RCLCPP__EXCEPTIONS_HPP_

View file

@ -0,0 +1,174 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__GRAPH_LISTENER_HPP_
#define RCLCPP__GRAPH_LISTENER_HPP_
#include <atomic>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
{
namespace node
{
class Node;
} // namespace node
namespace graph_listener
{
/// Thrown when a function is called on a GraphListener that is already shutdown.
class GraphListenerShutdownError : public std::runtime_error
{
public:
GraphListenerShutdownError()
: std::runtime_error("GraphListener already shutdown") {}
};
/// Thrown when a node has already been added to the GraphListener.
class NodeAlreadyAddedError : public std::runtime_error
{
public:
NodeAlreadyAddedError()
: std::runtime_error("node already added") {}
};
/// Thrown when the given node is not in the GraphListener.
class NodeNotFoundError : public std::runtime_error
{
public:
NodeNotFoundError()
: std::runtime_error("node not found") {}
};
/// Notifies many nodes of graph changes by listening in a thread.
class GraphListener : public std::enable_shared_from_this<GraphListener>
{
public:
RCLCPP_PUBLIC
GraphListener();
RCLCPP_PUBLIC
virtual ~GraphListener();
/// Start the graph listener's listen thread if it hasn't been started.
/* This function is thread-safe.
*
* \throws GraphListenerShutdownError if the GraphListener is shutdown
*/
RCLCPP_PUBLIC
virtual
void
start_if_not_started();
/// Add a node to the graph listener's list of nodes.
/*
* \throws GraphListenerShutdownError if the GraphListener is shutdown
* \throws NodeAlreadyAddedError if the given node is already in the list
* \throws std::invalid_argument if node is nullptr
* \throws std::system_error anything std::mutex::lock() throws
*/
RCLCPP_PUBLIC
virtual
void
add_node(rclcpp::node::Node * node);
/// Return true if the given node is in the graph listener's list of nodes.
/* Also return false if given nullptr.
*
* \throws std::system_error anything std::mutex::lock() throws
*/
RCLCPP_PUBLIC
virtual
bool
has_node(rclcpp::node::Node * node);
/// Remove a node from the graph listener's list of nodes.
/*
* \throws NodeNotFoundError if the given node is not in the list
* \throws std::invalid_argument if node is nullptr
* \throws std::system_error anything std::mutex::lock() throws
*/
RCLCPP_PUBLIC
virtual
void
remove_node(rclcpp::node::Node * node);
/// Stop the listening thread.
/* The thread cannot be restarted, and the class is defunct after calling.
* This function is called by the ~GraphListener() and does nothing if
* shutdown() was already called.
* This function exists separately from the ~GraphListener() so that it can
* be called before and exceptions can be caught.
*
* If start_if_not_started() was never called, this function still succeeds,
* but start_if_not_started() still cannot be called after this function.
*
* \throws rclcpp::execptions::RCLError from rcl_guard_condition_fini()
* \throws rclcpp::execptions::RCLError from rcl_wait_set_fini()
* \throws std::system_error anything std::mutex::lock() throws
*/
RCLCPP_PUBLIC
virtual
void
shutdown();
/// Return true if shutdown() has been called, else false.
RCLCPP_PUBLIC
virtual
bool
is_shutdown();
protected:
/// Main function for the listening thread.
RCLCPP_PUBLIC
virtual
void
run();
RCLCPP_PUBLIC
virtual
void
run_loop();
private:
RCLCPP_DISABLE_COPY(GraphListener);
std::thread listener_thread_;
bool is_started_;
std::atomic_bool is_shutdown_;
mutable std::mutex shutdown_mutex_;
mutable std::mutex nodes_barrier_mutex_;
mutable std::mutex nodes_mutex_;
std::vector<rclcpp::node::Node *> nodes_;
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rcl_guard_condition_t * shutdown_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};
} // namespace graph_listener
} // namespace rclcpp
#endif // RCLCPP__GRAPH_LISTENER_HPP_

View file

@ -15,9 +15,12 @@
#ifndef RCLCPP__NODE_HPP_
#define RCLCPP__NODE_HPP_
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <tuple>
#include <vector>
@ -33,6 +36,7 @@
#include "rclcpp/callback_group.hpp"
#include "rclcpp/client.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/event.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/parameter.hpp"
@ -50,10 +54,30 @@ struct rcl_node_t;
namespace rclcpp
{
namespace graph_listener
{
class GraphListener;
} // namespace graph_listener
namespace node
{
class InvalidEventError : public std::runtime_error
{
public:
InvalidEventError()
: std::runtime_error("event is invalid") {}
};
class EventNotRegisteredError : public std::runtime_error
{
public:
EventNotRegisteredError()
: std::runtime_error("event already registered") {}
};
/// Node is the single point of entry for creating publishers and subscribers.
class Node
class Node : public std::enable_shared_from_this<Node>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(Node);
@ -258,7 +282,76 @@ public:
get_callback_groups() const;
RCLCPP_PUBLIC
const rcl_guard_condition_t * get_notify_guard_condition() const;
const rcl_guard_condition_t *
get_notify_guard_condition() const;
RCLCPP_PUBLIC
const rcl_guard_condition_t *
get_graph_guard_condition() const;
RCLCPP_PUBLIC
const rcl_node_t *
get_rcl_node_handle() const;
/// Return the rcl_node_t node handle (non-const version).
RCLCPP_PUBLIC
rcl_node_t *
get_rcl_node_handle();
/// Return the rcl_node_t node handle in a std::shared_ptr.
/* This handle remains valid after the Node is destroyed.
* The actual rcl node is not finalized until it is out of scope everywhere.
*/
RCLCPP_PUBLIC
std::shared_ptr<rcl_node_t>
get_shared_node_handle();
/// Notify threads waiting on graph changes.
/* Affects threads waiting on the notify guard condition, see:
* get_notify_guard_condition(), as well as the threads waiting on graph
* changes using a graph Event, see: wait_for_graph_change().
*
* This is typically only used by the rclcpp::graph_listener::GraphListener.
*
* \throws RCLBaseError (a child of that exception) when an rcl error occurs
*/
RCLCPP_PUBLIC
void
notify_graph_change();
/// Notify any and all blocking node actions that shutdown has occurred.
RCLCPP_PUBLIC
void
notify_shutdown();
/// Return a graph event, which will be set anytime a graph change occurs.
/* The graph Event object is a loan which must be returned.
* The Event object is scoped and therefore to return the load just let it go
* out of scope.
*/
RCLCPP_PUBLIC
rclcpp::event::Event::SharedPtr
get_graph_event();
/// Wait for a graph event to occur by waiting on an Event to become set.
/* The given Event must be acquire through the get_graph_event() method.
*
* \throws InvalidEventError if the given event is nullptr
* \throws EventNotRegisteredError if the given event was not acquired with
* get_graph_event().
*/
RCLCPP_PUBLIC
void
wait_for_graph_change(
rclcpp::event::Event::SharedPtr event,
std::chrono::nanoseconds timeout);
/// Return the number of on loan graph events, see get_graph_event().
/* This is typically only used by the rclcpp::graph_listener::GraphListener.
*/
RCLCPP_PUBLIC
size_t
count_graph_users();
std::atomic_bool has_executor;
@ -288,7 +381,24 @@ private:
mutable std::mutex mutex_;
/// Guard condition for notifying the Executor of changes to this node.
mutable std::mutex notify_guard_condition_mutex_;
rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition();
bool notify_guard_condition_is_valid_;
/// Graph Listener which waits on graph changes for the node and is shared across nodes.
std::shared_ptr<rclcpp::graph_listener::GraphListener> graph_listener_;
/// Whether or not this node needs to be added to the graph listener.
std::atomic_bool should_add_to_graph_listener_;
/// Mutex to guard the graph event related data structures.
std::mutex graph_mutex_;
/// For notifying waiting threads (wait_for_graph_change()) on changes (notify_graph_change()).
std::condition_variable graph_cv_;
/// Weak references to graph events out on loan.
std::vector<rclcpp::event::Event::WeakPtr> graph_events_;
/// Number of graph events out on loan, used to determine if the graph should be monitored.
/* graph_users_count_ is atomic so that it can be accessed without acquiring the graph_mutex_ */
std::atomic_size_t graph_users_count_;
std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;

View file

@ -302,7 +302,7 @@ Node::create_client(
using rclcpp::client::ClientBase;
auto cli = Client<ServiceT>::make_shared(
node_handle_,
shared_from_this(),
service_name,
options);

View file

@ -119,8 +119,7 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(SyncParametersClient);
RCLCPP_PUBLIC
SyncParametersClient(
rclcpp::node::Node::SharedPtr node);
explicit SyncParametersClient(rclcpp::node::Node::SharedPtr node);
RCLCPP_PUBLIC
SyncParametersClient(

View file

@ -16,6 +16,7 @@
#define RCLCPP__UTILITIES_HPP_
#include <chrono>
#include <functional>
#include "rclcpp/visibility_control.hpp"
@ -50,6 +51,12 @@ RCLCPP_PUBLIC
void
shutdown();
/// Register a function to be called when shutdown is called.
/* Calling the callbacks is the last thing shutdown() does. */
RCLCPP_PUBLIC
void
on_shutdown(std::function<void(void)> callback);
/// Get a handle to the rmw guard condition that manages the signal handler.
/**
* The first time that this function is called for a given waitset a new guard

View file

@ -14,22 +14,29 @@
#include "rclcpp/client.hpp"
#include <chrono>
#include <cstdio>
#include <string>
#include "rmw/rmw.h"
#include "rcl/graph.h"
#include "rcl/node.h"
#include "rcl/wait.h"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/utilities.hpp"
using rclcpp::client::ClientBase;
using rclcpp::exceptions::InvalidNodeError;
using rclcpp::exceptions::throw_from_rcl_error;
ClientBase::ClientBase(
std::shared_ptr<rcl_node_t> node_handle,
std::shared_ptr<rclcpp::node::Node> parent_node,
const std::string & service_name)
: node_handle_(node_handle), service_name_(service_name)
: node_(parent_node), node_handle_(parent_node->get_shared_node_handle()),
service_name_(service_name)
{}
ClientBase::~ClientBase()
{
}
ClientBase::~ClientBase() {}
const std::string &
ClientBase::get_service_name() const
@ -42,3 +49,65 @@ ClientBase::get_client_handle() const
{
return &client_handle_;
}
bool
ClientBase::service_is_ready() const
{
bool is_ready;
rcl_ret_t ret =
rcl_service_server_is_available(this->get_rcl_node_handle(), &client_handle_, &is_ready);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "rcl_service_server_is_available failed");
}
return is_ready;
}
bool
ClientBase::wait_for_service_nanoseconds(std::chrono::nanoseconds timeout)
{
auto start = std::chrono::steady_clock::now();
// check to see if the server is ready immediately
if (this->service_is_ready()) {
return true;
}
if (timeout == std::chrono::nanoseconds(0)) {
// check was non-blocking, return immediately
return false;
}
// make an event to reuse, rather than create a new one each time
auto node_ptr = node_.lock();
if (!node_ptr) {
throw InvalidNodeError();
}
auto event = node_ptr->get_graph_event();
// update the time even on the first loop to account for time in first server_is_read()
std::chrono::nanoseconds time_to_wait = timeout - (std::chrono::steady_clock::now() - start);
if (timeout > std::chrono::nanoseconds(0) && time_to_wait < std::chrono::nanoseconds(0)) {
// Do not allow the time_to_wait to become negative when timeout was originally positive.
// Setting time_to_wait to 0 will allow one non-blocking wait because of the do-while.
time_to_wait = std::chrono::nanoseconds(0);
}
// continue forever if timeout is negative, otherwise continue until out of time_to_wait
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
do {
if (!rclcpp::utilities::ok()) {
return false;
}
node_ptr->wait_for_graph_change(event, time_to_wait);
if (event->check_and_clear()) {
if (this->service_is_ready()) {
return true;
}
}
// server is not ready, loop if there is time left
time_to_wait = timeout - (std::chrono::steady_clock::now() - start);
} while (timeout < std::chrono::nanoseconds(0) || time_to_wait > std::chrono::nanoseconds(0));
// *INDENT-ON*
return false; // timeout exceeded while waiting for the server to be ready
}
rcl_node_t *
ClientBase::get_rcl_node_handle() const
{
return node_handle_.get();
}

View file

@ -0,0 +1,44 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/event.hpp"
namespace rclcpp
{
namespace event
{
Event::Event()
: state_(false) {}
bool
Event::set()
{
return state_.exchange(true);
}
bool
Event::check()
{
return state_.load();
}
bool
Event::check_and_clear()
{
return state_.exchange(false);
}
} // namespace event
} // namespace rclcpp

View file

@ -0,0 +1,94 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/exceptions.hpp"
#include <cstdio>
#include <functional>
#include <string>
namespace rclcpp
{
namespace exceptions
{
void
throw_from_rcl_error(rcl_ret_t ret, const std::string & prefix, bool reset_error)
{
if (RCL_RET_OK == ret) {
throw std::invalid_argument("ret is RCL_RET_OK");
}
const rcl_error_state_t * error_state = rcl_get_error_state();
if (!error_state) {
throw std::runtime_error("rcl error state is not set");
}
std::string formated_prefix = prefix;
if (!prefix.empty()) {
formated_prefix += ": ";
}
RCLErrorBase base_exc(ret, error_state);
if (reset_error) {
rcl_reset_error();
}
switch (ret) {
case RCL_RET_BAD_ALLOC:
throw RCLBadAlloc(base_exc);
case RCL_RET_INVALID_ARGUMENT:
throw RCLInvalidArgument(base_exc, formated_prefix);
default:
throw RCLError(base_exc, formated_prefix);
}
}
RCLErrorBase::RCLErrorBase(rcl_ret_t ret, const rcl_error_state_t * error_state)
: ret(ret), message(error_state->message), file(error_state->file), line(error_state->line_number),
formatted_message(rcl_get_error_string_safe())
{}
RCLError::RCLError(
rcl_ret_t ret,
const rcl_error_state_t * error_state,
const std::string & prefix)
: RCLError(RCLErrorBase(ret, error_state), prefix)
{}
RCLError::RCLError(
const RCLErrorBase & base_exc,
const std::string & prefix)
: RCLErrorBase(base_exc), std::runtime_error(prefix + base_exc.formatted_message)
{}
RCLBadAlloc::RCLBadAlloc(rcl_ret_t ret, const rcl_error_state_t * error_state)
: RCLBadAlloc(RCLErrorBase(ret, error_state))
{}
RCLBadAlloc::RCLBadAlloc(const RCLErrorBase & base_exc)
: RCLErrorBase(base_exc), std::bad_alloc()
{}
RCLInvalidArgument::RCLInvalidArgument(
rcl_ret_t ret,
const rcl_error_state_t * error_state,
const std::string & prefix)
: RCLInvalidArgument(RCLErrorBase(ret, error_state), prefix)
{}
RCLInvalidArgument::RCLInvalidArgument(
const RCLErrorBase & base_exc,
const std::string & prefix)
: RCLErrorBase(base_exc), std::invalid_argument(prefix + base_exc.formatted_message)
{}
} // namespace exceptions
} // namespace rclcpp

View file

@ -0,0 +1,342 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rclcpp/graph_listener.hpp"
#include <cstdio>
#include <exception>
#include <string>
#include <vector>
#include "rcl/error_handling.h"
#include "rcl/types.h"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/node.hpp"
#include "rmw/impl/cpp/demangle.hpp"
using rclcpp::exceptions::throw_from_rcl_error;
namespace rclcpp
{
namespace graph_listener
{
GraphListener::GraphListener()
: is_started_(false), is_shutdown_(false), shutdown_guard_condition_(nullptr)
{
rcl_ret_t ret = rcl_guard_condition_init(
&interrupt_guard_condition_,
rcl_guard_condition_get_default_options());
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to create interrupt guard condition");
}
shutdown_guard_condition_ = rclcpp::utilities::get_sigint_guard_condition(&wait_set_);
}
GraphListener::~GraphListener()
{
this->shutdown();
}
void
GraphListener::start_if_not_started()
{
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (is_shutdown_.load()) {
throw GraphListenerShutdownError();
}
if (!is_started_) {
// Initialize the wait set before starting.
rcl_ret_t ret = rcl_wait_set_init(
&wait_set_,
0, // number_of_subscriptions
2, // number_of_guard_conditions
0, // number_of_timers
0, // number_of_clients
0, // number_of_services
rcl_get_default_allocator());
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to initialize wait set");
}
// Register an on_shutdown hook to shtudown the graph listener.
// This is important to ensure that the wait set is finalized before
// destruction of static objects occurs.
std::weak_ptr<GraphListener> weak_this = shared_from_this();
rclcpp::utilities::on_shutdown([weak_this]() {
auto shared_this = weak_this.lock();
if (shared_this) {
shared_this->shutdown();
}
});
// Start the listener thread.
listener_thread_ = std::thread(&GraphListener::run, this);
is_started_ = true;
}
}
void
GraphListener::run()
{
try {
run_loop();
} catch (const std::exception & exc) {
fprintf(stderr,
"[rclcpp] caught %s exception in GraphListener thread: %s\n",
rmw::impl::cpp::demangle(exc).c_str(),
exc.what());
std::rethrow_exception(std::current_exception());
} catch (...) {
fprintf(stderr, "[rclcpp] unknown error in GraphListener thread\n");
std::rethrow_exception(std::current_exception());
}
}
void
GraphListener::run_loop()
{
while (true) {
// If shutdown() was called, exit.
if (is_shutdown_.load()) {
return;
}
rcl_ret_t ret;
{
// This "barrier" lock ensures that other functions can acquire the
// nodes_mutex_ after waking up rcl_wait.
std::lock_guard<std::mutex> nodes_barrier_lock(nodes_barrier_mutex_);
// This is ownership is passed to nodes_lock in the next line.
nodes_mutex_.lock();
}
// This lock is released when the loop continues or exits.
std::lock_guard<std::mutex> nodes_lock(nodes_mutex_, std::adopt_lock);
// Resize the wait set if necessary.
if (wait_set_.size_of_guard_conditions < (nodes_.size() + 2)) {
ret = rcl_wait_set_resize_guard_conditions(&wait_set_, nodes_.size() + 2);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to resize wait set");
}
}
// Clear the wait set's guard conditions.
ret = rcl_wait_set_clear_guard_conditions(&wait_set_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to clear wait set");
}
// Put the interrupt guard condition in the wait set.
ret = rcl_wait_set_add_guard_condition(&wait_set_, &interrupt_guard_condition_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to add interrupt guard condition to wait set");
}
// Put the shutdown guard condition in the wait set.
ret = rcl_wait_set_add_guard_condition(&wait_set_, shutdown_guard_condition_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to add shutdown guard condition to wait set");
}
// Put graph guard conditions for each node into the wait set.
for (const auto node_ptr : nodes_) {
// Only wait on graph changes if some user of the node is watching.
if (node_ptr->count_graph_users() == 0) {
continue;
}
// Add the graph guard condition for the node to the wait set.
auto graph_gc = rcl_node_get_graph_guard_condition(node_ptr->get_rcl_node_handle());
if (!graph_gc) {
throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
}
ret = rcl_wait_set_add_guard_condition(&wait_set_, graph_gc);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to add graph guard condition to wait set");
}
}
// Wait for: graph changes, interrupt, or shutdown/SIGINT
ret = rcl_wait(&wait_set_, -1); // block for ever until a guard condition is triggered
if (RCL_RET_TIMEOUT == ret) {
throw std::runtime_error("rcl_wait unexpectedly timed out");
}
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to wait on wait set");
}
bool shutdown_guard_condition_triggered = false;
// Check to see if the shutdown guard condition has been triggered.
for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) {
if (shutdown_guard_condition_ == wait_set_.guard_conditions[i]) {
shutdown_guard_condition_triggered = true;
}
}
// Notify nodes who's guard conditions are set (triggered).
for (const auto node_ptr : nodes_) {
auto graph_gc = rcl_node_get_graph_guard_condition(node_ptr->get_rcl_node_handle());
if (!graph_gc) {
throw_from_rcl_error(RCL_RET_ERROR, "failed to get graph guard condition");
}
for (size_t i = 0; i < wait_set_.size_of_guard_conditions; ++i) {
if (graph_gc == wait_set_.guard_conditions[i]) {
node_ptr->notify_graph_change();
}
}
if (shutdown_guard_condition_triggered) {
// If shutdown, then notify the node of this as well.
node_ptr->notify_shutdown();
}
}
} // while (true)
}
static void
interrupt_(rcl_guard_condition_t * interrupt_guard_condition)
{
rcl_ret_t ret = rcl_trigger_guard_condition(interrupt_guard_condition);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to trigger the interrupt guard condition");
}
}
static void
acquire_nodes_lock_(
std::mutex * nodes_barrier_mutex,
std::mutex * nodes_mutex,
rcl_guard_condition_t * interrupt_guard_condition)
{
{
// Acquire this lock to prevent the run loop from re-locking the
// nodes_mutext_ after being woken up.
std::lock_guard<std::mutex> nodes_barrier_lock(*nodes_barrier_mutex);
// Trigger the interrupt guard condition to wake up rcl_wait.
interrupt_(interrupt_guard_condition);
nodes_mutex->lock();
}
}
static bool
has_node_(std::vector<rclcpp::node::Node *> * nodes, rclcpp::node::Node * node)
{
for (const auto node_ptr : (*nodes)) {
if (node == node_ptr) {
return true;
}
}
return false;
}
bool
GraphListener::has_node(rclcpp::node::Node * node)
{
if (!node) {
return false;
}
// Acquire the nodes mutex using the barrier to prevent the run loop from
// re-locking the nodes mutex after being interrupted.
acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_);
// Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock.
std::lock_guard<std::mutex> nodes_lock(nodes_mutex_, std::adopt_lock);
return has_node_(&nodes_, node);
}
void
GraphListener::add_node(rclcpp::node::Node * node)
{
if (!node) {
throw std::invalid_argument("node is nullptr");
}
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (is_shutdown_.load()) {
throw GraphListenerShutdownError();
}
// Acquire the nodes mutex using the barrier to prevent the run loop from
// re-locking the nodes mutex after being interrupted.
acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_);
// Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock.
std::lock_guard<std::mutex> nodes_lock(nodes_mutex_, std::adopt_lock);
if (has_node_(&nodes_, node)) {
throw NodeAlreadyAddedError();
}
nodes_.push_back(node);
// The run loop has already been interrupted by acquire_nodes_lock_() and
// will evaluate the new node when nodes_lock releases the nodes_mutex_.
}
static void
remove_node_(std::vector<rclcpp::node::Node *> * nodes, rclcpp::node::Node * node)
{
// Remove the node if it is found.
for (auto it = nodes->begin(); it != nodes->end(); ++it) {
if (node == *it) {
// Found the node, remove it.
nodes->erase(it);
// Now trigger the interrupt guard condition to make sure
return;
}
}
// Not found in the loop.
throw NodeNotFoundError();
}
void
GraphListener::remove_node(rclcpp::node::Node * node)
{
if (!node) {
throw std::invalid_argument("node is nullptr");
}
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (is_shutdown()) {
// If shutdown, then the run loop has been joined, so we can remove them directly.
return remove_node_(&nodes_, node);
}
// Otherwise, first interrupt and lock against the run loop to safely remove the node.
// Acquire the nodes mutex using the barrier to prevent the run loop from
// re-locking the nodes mutex after being interrupted.
acquire_nodes_lock_(&nodes_barrier_mutex_, &nodes_mutex_, &interrupt_guard_condition_);
// Store the now acquired nodes_mutex_ in the scoped lock using adopt_lock.
std::lock_guard<std::mutex> nodes_lock(nodes_mutex_, std::adopt_lock);
remove_node_(&nodes_, node);
}
void
GraphListener::shutdown()
{
std::lock_guard<std::mutex> shutdown_lock(shutdown_mutex_);
if (!is_shutdown_.exchange(true)) {
if (is_started_) {
interrupt_(&interrupt_guard_condition_);
listener_thread_.join();
}
rcl_ret_t ret = rcl_guard_condition_fini(&interrupt_guard_condition_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to finalize interrupt guard condition");
}
if (shutdown_guard_condition_) {
rclcpp::utilities::release_sigint_guard_condition(&wait_set_);
shutdown_guard_condition_ = nullptr;
}
if (is_started_) {
ret = rcl_wait_set_fini(&wait_set_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to finalize wait set");
}
}
}
}
bool
GraphListener::is_shutdown()
{
return is_shutdown_.load();
}
} // namespace graph_listener
} // namespace rclcpp

View file

@ -20,9 +20,12 @@
#include <vector>
#include "rcl_interfaces/srv/list_parameters.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/graph_listener.hpp"
#include "rclcpp/node.hpp"
using rclcpp::node::Node;
using rclcpp::exceptions::throw_from_rcl_error;
Node::Node(const std::string & node_name, bool use_intra_process_comms)
: Node(
@ -37,15 +40,14 @@ Node::Node(
bool use_intra_process_comms)
: name_(node_name), context_(context),
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms)
use_intra_process_comms_(use_intra_process_comms), notify_guard_condition_is_valid_(false),
graph_listener_(context->get_sub_context<rclcpp::graph_listener::GraphListener>()),
should_add_to_graph_listener_(true), graph_users_count_(0)
{
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
if (rcl_guard_condition_init(
&notify_guard_condition_, guard_condition_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Failed to create interrupt guard condition in Executor constructor: ") +
rcl_get_error_string_safe());
rcl_ret_t ret = rcl_guard_condition_init(&notify_guard_condition_, guard_condition_options);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "failed to create interrupt guard condition");
}
has_executor.store(false);
@ -85,34 +87,42 @@ Node::Node(
rcl_node_options_t options = rcl_node_get_default_options();
// TODO(jacquelinekay): Allocator options
options.domain_id = domain_id;
if (rcl_node_init(node_handle_.get(), name_.c_str(), &options) != RCL_RET_OK) {
ret = rcl_node_init(node_handle_.get(), name_.c_str(), &options);
if (ret != RCL_RET_OK) {
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
throw std::runtime_error(std::string(
"Could not initialize rcl node: ") + rcl_get_error_string_safe());
throw_from_rcl_error(ret, "failed to initialize rcl node");
}
// Initialize node handle shared_ptr with custom deleter.
// *INDENT-OFF*
// *INDENT-ON*
using rclcpp::callback_group::CallbackGroupType;
default_callback_group_ = create_callback_group(
CallbackGroupType::MutuallyExclusive);
events_publisher_ = create_publisher<rcl_interfaces::msg::ParameterEvent>(
"parameter_events", rmw_qos_profile_parameter_events);
notify_guard_condition_is_valid_ = true;
}
Node::~Node()
{
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
// Remove self from graph listener.
// Exchange with false to prevent others from trying to add this node to the
// graph listener after checking that it was not here.
if (!should_add_to_graph_listener_.exchange(false)) {
// If it was already false, then it needs to now be removed.
graph_listener_->remove_node(this);
}
// Finalize the interrupt guard condition after removing self from graph listener.
{
std::lock_guard<std::mutex> notify_guard_condition_lock(notify_guard_condition_mutex_);
notify_guard_condition_is_valid_ = false;
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
}
}
@ -373,7 +383,129 @@ Node::get_callback_groups() const
return callback_groups_;
}
const rcl_guard_condition_t * Node::get_notify_guard_condition() const
const rcl_guard_condition_t *
Node::get_notify_guard_condition() const
{
std::lock_guard<std::mutex> notify_guard_condition_lock(notify_guard_condition_mutex_);
if (!notify_guard_condition_is_valid_) {
return nullptr;
}
return &notify_guard_condition_;
}
const rcl_guard_condition_t *
Node::get_graph_guard_condition() const
{
return rcl_node_get_graph_guard_condition(node_handle_.get());
}
const rcl_node_t *
Node::get_rcl_node_handle() const
{
return node_handle_.get();
}
rcl_node_t *
Node::get_rcl_node_handle()
{
return node_handle_.get();
}
std::shared_ptr<rcl_node_t>
Node::get_shared_node_handle()
{
return node_handle_;
}
void
Node::notify_graph_change()
{
{
std::lock_guard<std::mutex> graph_changed_lock(graph_mutex_);
bool bad_ptr_encountered = false;
for (auto & event_wptr : graph_events_) {
auto event_ptr = event_wptr.lock();
if (event_ptr) {
event_ptr->set();
} else {
bad_ptr_encountered = true;
}
}
if (bad_ptr_encountered) {
// remove invalid pointers with the erase-remove idiom
graph_events_.erase(
std::remove_if(
graph_events_.begin(),
graph_events_.end(),
[](const rclcpp::event::Event::WeakPtr & wptr) {
return wptr.expired();
}),
graph_events_.end());
// update graph_users_count_
graph_users_count_.store(graph_events_.size());
}
}
graph_cv_.notify_all();
{
std::lock_guard<std::mutex> notify_guard_condition_lock(notify_guard_condition_mutex_);
rcl_ret_t ret = rcl_trigger_guard_condition(&notify_guard_condition_);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "failed to trigger notify guard condition");
}
}
}
void
Node::notify_shutdown()
{
// notify here anything that will not be woken up by ctrl-c or rclcpp::shutdown().
graph_cv_.notify_all();
}
rclcpp::event::Event::SharedPtr
Node::get_graph_event()
{
auto event = rclcpp::event::Event::make_shared();
std::lock_guard<std::mutex> graph_changed_lock(graph_mutex_);
// on first call, add node to graph_listener_
if (should_add_to_graph_listener_.exchange(false)) {
graph_listener_->add_node(this);
graph_listener_->start_if_not_started();
}
graph_events_.push_back(event);
graph_users_count_++;
return event;
}
void
Node::wait_for_graph_change(
rclcpp::event::Event::SharedPtr event,
std::chrono::nanoseconds timeout)
{
if (!event) {
throw InvalidEventError();
}
{
std::lock_guard<std::mutex> graph_changed_lock(graph_mutex_);
bool event_in_graph_events = false;
for (const auto & event_wptr : graph_events_) {
if (event == event_wptr.lock()) {
event_in_graph_events = true;
break;
}
}
if (!event_in_graph_events) {
throw EventNotRegisteredError();
}
}
std::unique_lock<std::mutex> graph_lock(graph_mutex_);
graph_cv_.wait_for(graph_lock, timeout, [&event]() {
return event->check() || !rclcpp::utilities::ok();
});
}
size_t
Node::count_graph_users()
{
return graph_users_count_.load();
}

View file

@ -22,6 +22,7 @@
#include <map>
#include <mutex>
#include <string>
#include <vector>
#include "rcl/error_handling.h"
#include "rcl/rcl.h"
@ -154,6 +155,9 @@ rclcpp::utilities::ok()
return ::g_signal_status == 0;
}
static std::mutex on_shutdown_mutex_;
static std::vector<std::function<void(void)>> on_shutdown_callbacks_;
void
rclcpp::utilities::shutdown()
{
@ -170,6 +174,19 @@ rclcpp::utilities::shutdown()
}
g_is_interrupted.store(true);
g_interrupt_condition_variable.notify_all();
{
std::lock_guard<std::mutex> lock(on_shutdown_mutex_);
for (auto & on_shutdown_callback : on_shutdown_callbacks_) {
on_shutdown_callback();
}
}
}
void
rclcpp::utilities::on_shutdown(std::function<void(void)> callback)
{
std::lock_guard<std::mutex> lock(on_shutdown_mutex_);
on_shutdown_callbacks_.push_back(callback);
}
rcl_guard_condition_t *