Refactor to use rcl (#207)

This commit is contained in:
Jackie Kay 2016-04-24 21:25:19 +00:00
parent 6bcd9db4d6
commit e961189be8
24 changed files with 796 additions and 622 deletions

View file

@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3)
project(rclcpp)
find_package(ament_cmake REQUIRED)
find_package(rcl REQUIRED)
find_package(rcl_interfaces REQUIRED)
find_package(rmw REQUIRED)
find_package(rmw_implementation REQUIRED)
@ -42,13 +43,14 @@ set(${PROJECT_NAME}_SRCS
)
macro(target)
if(NOT "${target_suffix} " STREQUAL " ")
get_rcl_information("${rmw_implementation}" "rcl${target_suffix}")
endif()
add_library(${PROJECT_NAME}${target_suffix} SHARED
${${PROJECT_NAME}_SRCS})
ament_target_dependencies(${PROJECT_NAME}${target_suffix}
"rcl_interfaces"
"rmw"
"rosidl_generator_cpp"
"${rmw_implementation}")
"rcl${target_suffix}"
"rosidl_generator_cpp")
# Causes the visibility macros to use dllexport rather than dllimport,
# which is appropriate when building the dll but not consuming it.
@ -66,9 +68,7 @@ endmacro()
call_for_each_rmw_implementation(target GENERATE_DEFAULT)
ament_export_dependencies(ament_cmake)
ament_export_dependencies(rcl_interfaces)
ament_export_dependencies(rmw)
ament_export_dependencies(rmw_implementation)
ament_export_dependencies(rcl)
ament_export_dependencies(rosidl_generator_cpp)
ament_export_include_directories(include)
@ -90,6 +90,7 @@ if(AMENT_ENABLE_TESTING)
ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp)
if(TARGET test_mapped_ring_buffer)
target_include_directories(test_mapped_ring_buffer PUBLIC
${rcl_INCLUDE_DIRS}
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}
@ -98,6 +99,7 @@ if(AMENT_ENABLE_TESTING)
ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp)
if(TARGET test_intra_process_manager)
target_include_directories(test_intra_process_manager PUBLIC
${rcl_INCLUDE_DIRS}
${rcl_interfaces_INCLUDE_DIRS}
${rmw_INCLUDE_DIRS}
${rosidl_generator_cpp_INCLUDE_DIRS}

View file

@ -28,6 +28,11 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
# so that the variables can be used by various functions / macros
set(${var_prefix}_FOUND TRUE)
# Get rcl using the existing macro
if(NOT "${target_suffix} " STREQUAL " ")
get_rcl_information("${rmw_implementation}" "rcl${target_suffix}")
endif()
# include directories
normalize_path(${var_prefix}_INCLUDE_DIRS
"${rclcpp_DIR}/../../../include")
@ -63,8 +68,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix)
# dependencies
set(_exported_dependencies
"rcl_interfaces"
"rmw"
"${rmw_implementation}"
"rcl${target_suffix}"
"rosidl_generator_cpp")
set(${var_prefix}_DEFINITIONS)
foreach(_dep ${_exported_dependencies})

View file

@ -17,6 +17,8 @@
#include <memory>
#include "rcl/allocator.h"
#include "rclcpp/allocator/allocator_deleter.hpp"
namespace rclcpp
@ -27,6 +29,64 @@ namespace allocator
template<typename T, typename Alloc>
using AllocRebind = typename std::allocator_traits<Alloc>::template rebind_traits<T>;
template<typename Alloc>
void * retyped_allocate(size_t size, void * untyped_allocator)
{
auto typed_allocator = static_cast<Alloc *>(untyped_allocator);
if (!typed_allocator) {
throw std::runtime_error("Received incorrect allocator type");
}
return std::allocator_traits<Alloc>::allocate(*typed_allocator, size);
}
template<typename T, typename Alloc>
void retyped_deallocate(void * untyped_pointer, void * untyped_allocator)
{
auto typed_allocator = static_cast<Alloc *>(untyped_allocator);
if (!typed_allocator) {
throw std::runtime_error("Received incorrect allocator type");
}
auto typed_ptr = static_cast<T *>(untyped_pointer);
std::allocator_traits<Alloc>::deallocate(*typed_allocator, typed_ptr, 1);
}
template<typename T, typename Alloc>
void * retyped_reallocate(void * untyped_pointer, size_t size, void * untyped_allocator)
{
auto typed_allocator = static_cast<Alloc *>(untyped_allocator);
if (!typed_allocator) {
throw std::runtime_error("Received incorrect allocator type");
}
auto typed_ptr = static_cast<T *>(untyped_pointer);
std::allocator_traits<Alloc>::deallocate(*typed_allocator, typed_ptr, 1);
return std::allocator_traits<Alloc>::allocate(*typed_allocator, size);
}
// Convert a std::allocator_traits-formatted Allocator into an rcl allocator
template<typename T, typename Alloc,
typename std::enable_if<!std::is_same<Alloc, std::allocator<void>>::value>::type * = nullptr>
rcl_allocator_t get_rcl_allocator(Alloc & allocator)
{
rcl_allocator_t rcl_allocator = rcl_get_default_allocator();
#ifndef _WIN32
rcl_allocator.allocate = &retyped_allocate<Alloc>;
rcl_allocator.deallocate = &retyped_deallocate<T, Alloc>;
rcl_allocator.reallocate = &retyped_reallocate<T, Alloc>;
rcl_allocator.state = &allocator;
#endif
return rcl_allocator;
}
// TODO(jacquelinekay) Workaround for an incomplete implementation of std::allocator<void>
template<typename T, typename Alloc,
typename std::enable_if<std::is_same<Alloc, std::allocator<void>>::value>::type * = nullptr>
rcl_allocator_t get_rcl_allocator(Alloc & allocator)
{
(void)allocator;
return rcl_get_default_allocator();
}
} // namespace allocator
} // namespace rclcpp

View file

@ -23,10 +23,15 @@
#include <tuple>
#include <utility>
#include "rcl/client.h"
#include "rcl/error_handling.h"
#include "rclcpp/function_traits.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
@ -42,19 +47,18 @@ public:
RCLCPP_PUBLIC
ClientBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_client_t * client_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name);
RCLCPP_PUBLIC
virtual ~ClientBase();
~ClientBase();
RCLCPP_PUBLIC
const std::string &
get_service_name() const;
RCLCPP_PUBLIC
const rmw_client_t *
const rcl_client_t *
get_client_handle() const;
virtual std::shared_ptr<void> create_response() = 0;
@ -62,12 +66,12 @@ public:
virtual void handle_response(
std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;
private:
protected:
RCLCPP_DISABLE_COPY(ClientBase);
std::shared_ptr<rmw_node_t> node_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
rmw_client_t * client_handle_;
rcl_client_t client_handle_ = rcl_get_zero_initialized_client();
std::string service_name_;
};
@ -93,11 +97,32 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(Client);
Client(
std::shared_ptr<rmw_node_t> node_handle,
rmw_client_t * client_handle,
const std::string & service_name)
: ClientBase(node_handle, client_handle, service_name)
{}
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name,
rcl_client_options_t & client_options)
: ClientBase(node_handle, service_name)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
if (rcl_client_init(&client_handle_, this->node_handle_.get(),
service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create client: ") +
rcl_get_error_string_safe());
// *INDENT-ON*
}
}
virtual ~Client()
{
if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) {
fprintf(stderr,
"Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe());
}
}
std::shared_ptr<void> create_response()
{
@ -149,10 +174,10 @@ public:
{
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
int64_t sequence_number;
if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
if (RCL_RET_OK != rcl_send_request(get_client_handle(), request.get(), &sequence_number)) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to send request: ") + rmw_get_error_string_safe());
std::string("failed to send request: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}

View file

@ -25,6 +25,9 @@
#include <string>
#include <vector>
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/any_executable.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/memory_strategies.hpp"
@ -193,10 +196,12 @@ public:
}
auto end_time = std::chrono::steady_clock::now();
if (timeout > std::chrono::nanoseconds::zero()) {
end_time += timeout;
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
timeout);
if (timeout_ns > std::chrono::nanoseconds::zero()) {
end_time += timeout_ns;
}
auto timeout_left = timeout;
std::chrono::nanoseconds timeout_left = timeout_ns;
while (rclcpp::utilities::ok()) {
// Do one item of work.
@ -207,7 +212,7 @@ public:
return FutureReturnCode::SUCCESS;
}
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
if (timeout < std::chrono::nanoseconds::zero()) {
if (timeout_ns < std::chrono::nanoseconds::zero()) {
continue;
}
// Otherwise check if we still have time to wait, return TIMEOUT if not.
@ -216,8 +221,7 @@ public:
return FutureReturnCode::TIMEOUT;
}
// Subtract the elapsed time from the original timeout.
using duration_type = std::chrono::duration<int64_t, TimeT>;
timeout_left = std::chrono::duration_cast<duration_type>(end_time - now);
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
}
// The future did not complete before ok() returned false, return INTERRUPTED.
@ -291,10 +295,6 @@ protected:
void
get_next_timer(AnyExecutable::SharedPtr any_exec);
RCLCPP_PUBLIC
std::chrono::nanoseconds
get_earliest_timer();
RCLCPP_PUBLIC
AnyExecutable::SharedPtr
get_next_ready_executable();
@ -307,10 +307,10 @@ protected:
std::atomic_bool spinning;
/// Guard condition for signaling the rmw layer to wake up for special events.
rmw_guard_condition_t * interrupt_guard_condition_;
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
/// Waitset for managing entities that the rmw layer waits on.
rmw_waitset_t * waitset_;
rcl_wait_set_t waitset_ = rcl_get_zero_initialized_wait_set();
/// The memory strategy: an interface for handling user-defined memory allocation strategies.
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;

View file

@ -18,6 +18,9 @@
#include <memory>
#include <vector>
#include "rcl/allocator.h"
#include "rcl/wait.h"
#include "rclcpp/any_executable.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node.hpp"
@ -40,31 +43,25 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy);
using WeakNodeVector = std::vector<std::weak_ptr<rclcpp::node::Node>>;
// return the new number of subscribers
virtual size_t fill_subscriber_handles(void ** & ptr) = 0;
// return the new number of services
virtual size_t fill_service_handles(void ** & ptr) = 0;
// return the new number of clients
virtual size_t fill_client_handles(void ** & ptr) = 0;
// return the new number of guard_conditions
virtual size_t fill_guard_condition_handles(void ** & ptr) = 0;
virtual void clear_active_entities() = 0;
virtual void clear_handles() = 0;
virtual void remove_null_handles() = 0;
virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0;
virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
virtual size_t number_of_ready_clients() const = 0;
virtual size_t number_of_ready_timers() const = 0;
virtual size_t number_of_guard_conditions() const = 0;
virtual bool add_handles_to_waitset(rcl_wait_set_t * wait_set) = 0;
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0;
virtual void add_guard_condition(const rmw_guard_condition_t * guard_condition) = 0;
virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void remove_guard_condition(const rmw_guard_condition_t * guard_condition) = 0;
virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
@ -78,15 +75,18 @@ public:
get_next_client(rclcpp::executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes) = 0;
virtual rcl_allocator_t
get_allocator() = 0;
static rclcpp::subscription::SubscriptionBase::SharedPtr
get_subscription_by_handle(void * subscriber_handle,
get_subscription_by_handle(const rcl_subscription_t * subscriber_handle,
const WeakNodeVector & weak_nodes);
static rclcpp::service::ServiceBase::SharedPtr
get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes);
get_service_by_handle(const rcl_service_t * service_handle, const WeakNodeVector & weak_nodes);
static rclcpp::client::ClientBase::SharedPtr
get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes);
get_client_by_handle(const rcl_client_t * client_handle, const WeakNodeVector & weak_nodes);
static rclcpp::node::Node::SharedPtr
get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group,

View file

@ -22,6 +22,9 @@
#include <tuple>
#include <vector>
#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcl_interfaces/msg/list_parameters_result.hpp"
#include "rcl_interfaces/msg/parameter_descriptor.hpp"
#include "rcl_interfaces/msg/parameter_event.hpp"
@ -39,11 +42,10 @@
#include "rclcpp/timer.hpp"
#include "rclcpp/visibility_control.hpp"
// Forward declaration of ROS middleware class
namespace rmw
namespace rcl
{
struct rmw_node_t;
} // namespace rmw
struct rcl_node_t;
} // namespace rcl
namespace rclcpp
{
@ -256,7 +258,7 @@ public:
get_callback_groups() const;
RCLCPP_PUBLIC
rmw_guard_condition_t * get_notify_guard_condition() const;
const rcl_guard_condition_t * get_notify_guard_condition() const;
std::atomic_bool has_executor;
@ -269,7 +271,7 @@ private:
std::string name_;
std::shared_ptr<rmw_node_t> node_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
rclcpp::context::Context::SharedPtr context_;
@ -286,7 +288,7 @@ private:
mutable std::mutex mutex_;
/// Guard condition for notifying the Executor of changes to this node.
rmw_guard_condition_t * notify_guard_condition_;
rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition();
std::map<std::string, rclcpp::parameter::ParameterVariant> parameters_;

View file

@ -30,6 +30,9 @@
#include <utility>
#include <vector>
#include "rcl/publisher.h"
#include "rcl/subscription.h"
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rclcpp/contexts/default_context.hpp"
@ -70,33 +73,20 @@ Node::create_publisher(
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
using rosidl_generator_cpp::get_message_type_support_handle;
auto type_support_handle = get_message_type_support_handle<MessageT>();
rmw_publisher_t * publisher_handle = rmw_create_publisher(
node_handle_.get(), type_support_handle, topic_name.c_str(), &qos_profile);
if (!publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create publisher: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
auto publisher_options = rcl_publisher_get_default_options();
publisher_options.qos = qos_profile;
auto message_alloc =
std::make_shared<typename publisher::Publisher<MessageT, Alloc>::MessageAlloc>(
*allocator.get());
publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());
auto publisher = publisher::Publisher<MessageT, Alloc>::make_shared(
node_handle_, publisher_handle, topic_name, qos_profile.depth, allocator);
node_handle_, topic_name, publisher_options, message_alloc);
if (use_intra_process_comms_) {
rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher(
node_handle_.get(), rclcpp::type_support::get_intra_process_message_msg_type_support(),
(topic_name + "__intra").c_str(), &qos_profile);
if (!intra_process_publisher_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process publisher: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
uint64_t intra_process_publisher_id =
@ -132,9 +122,9 @@ Node::create_publisher(
publisher->setup_intra_process(
intra_process_publisher_id,
shared_publish_callback,
intra_process_publisher_handle);
publisher_options);
}
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&notify_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on publisher creation: ") + rmw_get_error_string());
@ -162,46 +152,37 @@ Node::create_subscription(
Alloc> any_subscription_callback(allocator);
any_subscription_callback.set(std::forward<CallbackT>(callback));
using rosidl_generator_cpp::get_message_type_support_handle;
if (!msg_mem_strat) {
msg_mem_strat =
rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::create_default();
}
auto message_alloc =
std::make_shared<typename subscription::Subscription<MessageT, Alloc>::MessageAlloc>();
auto type_support_handle = get_message_type_support_handle<MessageT>();
rmw_subscription_t * subscriber_handle = rmw_create_subscription(
node_handle_.get(), type_support_handle,
topic_name.c_str(), &qos_profile, ignore_local_publications);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
auto subscription_options = rcl_subscription_get_default_options();
subscription_options.qos = qos_profile;
subscription_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());
subscription_options.ignore_local_publications = ignore_local_publications;
using rclcpp::subscription::Subscription;
using rclcpp::subscription::SubscriptionBase;
auto sub = Subscription<MessageT, Alloc>::make_shared(
node_handle_,
subscriber_handle,
topic_name,
ignore_local_publications,
subscription_options,
any_subscription_callback,
msg_mem_strat);
auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
// Setup intra process.
if (use_intra_process_comms_) {
rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription(
node_handle_.get(), rclcpp::type_support::get_intra_process_message_msg_type_support(),
(topic_name + "__intra").c_str(), &qos_profile, false);
if (!subscriber_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process subscription: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
auto intra_process_options = rcl_subscription_get_default_options();
intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
*message_alloc.get());
intra_process_options.qos = qos_profile;
intra_process_options.ignore_local_publications = false;
auto intra_process_manager =
context_->get_sub_context<rclcpp::intra_process_manager::IntraProcessManager>();
rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager;
@ -210,7 +191,6 @@ Node::create_subscription(
// *INDENT-OFF*
sub->setup_intra_process(
intra_process_subscription_id,
intra_process_subscriber_handle,
[weak_ipm](
uint64_t publisher_id,
uint64_t message_sequence,
@ -233,7 +213,8 @@ Node::create_subscription(
"intra process publisher check called after destruction of intra process manager");
}
return ipm->matches_any_publishers(sender_gid);
}
},
intra_process_options
);
// *INDENT-ON*
}
@ -248,7 +229,7 @@ Node::create_subscription(
default_callback_group_->add_subscription(sub_base_ptr);
}
number_of_subscriptions_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&notify_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on subscription creation: ") + rmw_get_error_string());
@ -299,7 +280,7 @@ Node::create_wall_timer(
default_callback_group_->add_timer(timer);
}
number_of_timers_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&notify_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on timer creation: ") + rmw_get_error_string());
@ -314,27 +295,16 @@ Node::create_client(
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
rmw_client_t * client_handle = rmw_create_client(
this->node_handle_.get(), service_type_support_handle, service_name.c_str(), &qos_profile);
if (!client_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create client: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
rcl_client_options_t options = rcl_client_get_default_options();
options.qos = qos_profile;
using rclcpp::client::Client;
using rclcpp::client::ClientBase;
auto cli = Client<ServiceT>::make_shared(
node_handle_,
client_handle,
service_name);
service_name,
options);
auto cli_base_ptr = std::dynamic_pointer_cast<ClientBase>(cli);
if (group) {
@ -348,7 +318,7 @@ Node::create_client(
}
number_of_clients_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&notify_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on client creation: ") + rmw_get_error_string());
@ -364,25 +334,15 @@ Node::create_service(
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle =
get_service_type_support_handle<ServiceT>();
rclcpp::service::AnyServiceCallback<ServiceT> any_service_callback;
any_service_callback.set(std::forward<CallbackT>(callback));
rmw_service_t * service_handle = rmw_create_service(
node_handle_.get(), service_type_support_handle, service_name.c_str(), &qos_profile);
if (!service_handle) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create service: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
}
rcl_service_options_t service_options = rcl_service_get_default_options();
service_options.qos = qos_profile;
auto serv = service::Service<ServiceT>::make_shared(
node_handle_, service_handle, service_name, any_service_callback);
node_handle_,
service_name, any_service_callback, service_options);
auto serv_base_ptr = std::dynamic_pointer_cast<service::ServiceBase>(serv);
if (group) {
if (!group_in_node(group)) {
@ -394,7 +354,7 @@ Node::create_service(
default_callback_group_->add_service(serv_base_ptr);
}
number_of_services_++;
if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&notify_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(
std::string(
"Failed to notify waitset on service creation: ") + rmw_get_error_string());

View file

@ -24,12 +24,16 @@
#include <sstream>
#include <string>
#include "rcl/error_handling.h"
#include "rcl/publisher.h"
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rmw/impl/cpp/demangle.hpp"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
@ -52,21 +56,18 @@ public:
/**
* Typically, a publisher is not created through this method, but instead is created through a
* call to `Node::create_publisher`.
* \param[in] node_handle The corresponding rmw representation of the owner node.
* \param[in] publisher_handle The rmw publisher handle corresponding to this publisher.
* \param[in] node_handle The corresponding rcl representation of the owner node.
* \param[in] topic The topic that this publisher publishes on.
* \param[in] queue_size The maximum number of unpublished messages to queue.
*/
RCLCPP_PUBLIC
PublisherBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle,
std::shared_ptr<rcl_node_t> node_handle,
std::string topic,
size_t queue_size);
/// Default destructor.
RCLCPP_PUBLIC
virtual ~PublisherBase();
~PublisherBase();
/// Get the topic that this publisher publishes on.
// \return The topic name.
@ -120,12 +121,13 @@ protected:
setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
rmw_publisher_t * intra_process_publisher_handle);
const rcl_publisher_options_t & intra_process_options);
std::shared_ptr<rmw_node_t> node_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
rmw_publisher_t * publisher_handle_;
rmw_publisher_t * intra_process_publisher_handle_;
rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_allocator_t rcl_allocator_ = rcl_get_default_allocator();
std::string topic_;
size_t queue_size_;
@ -152,15 +154,54 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(Publisher<MessageT, Alloc>);
Publisher(
std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle,
std::shared_ptr<rcl_node_t> node_handle,
std::string topic,
size_t queue_size,
std::shared_ptr<Alloc> allocator)
: PublisherBase(node_handle, publisher_handle, topic, queue_size)
const rcl_publisher_options_t & publisher_options,
std::shared_ptr<MessageAlloc> allocator)
: PublisherBase(node_handle, topic, publisher_options.qos.depth), message_allocator_(allocator)
{
message_allocator_ = std::make_shared<MessageAlloc>(*allocator.get());
using rosidl_generator_cpp::get_message_type_support_handle;
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
rcl_allocator_ = publisher_options.allocator;
auto type_support_handle = get_message_type_support_handle<MessageT>();
if (rcl_publisher_init(
&publisher_handle_, node_handle_.get(), type_support_handle,
topic.c_str(), &publisher_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("could not create publisher: ") +
rcl_get_error_string_safe());
}
// Life time of this object is tied to the publisher handle.
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(&publisher_handle_);
if (!publisher_rmw_handle) {
throw std::runtime_error(
std::string("failed to get rmw handle: ") + rcl_get_error_string_safe());
}
if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to get publisher gid: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
virtual ~Publisher()
{
if (rcl_publisher_fini(&intra_process_publisher_handle_, node_handle_.get()) != RCL_RET_OK) {
fprintf(
stderr,
"Error in destruction of intra process rcl publisher handle: %s\n",
rcl_get_error_string_safe());
}
if (rcl_publisher_fini(&publisher_handle_, node_handle_.get()) != RCL_RET_OK) {
fprintf(
stderr,
"Error in destruction of rcl publisher handle: %s\n",
rcl_get_error_string_safe());
}
}
@ -188,11 +229,11 @@ public:
rcl_interfaces::msg::IntraProcessMessage ipm;
ipm.publisher_id = intra_process_publisher_id_;
ipm.message_sequence = message_seq;
auto status = rmw_publish(intra_process_publisher_handle_, &ipm);
if (status != RMW_RET_OK) {
auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish intra process message: ") + rmw_get_error_string_safe());
std::string("failed to publish intra process message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
} else {
@ -263,11 +304,11 @@ protected:
void
do_inter_process_publish(const MessageT * msg)
{
auto status = rmw_publish(publisher_handle_, msg);
if (status != RMW_RET_OK) {
auto status = rcl_publish(&publisher_handle_, msg);
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to publish message: ") + rmw_get_error_string_safe());
std::string("failed to publish message: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
}

View file

@ -21,8 +21,12 @@
#include <sstream>
#include <string>
#include "rcl/error_handling.h"
#include "rcl/service.h"
#include "rclcpp/any_service_callback.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
@ -39,19 +43,18 @@ public:
RCLCPP_PUBLIC
ServiceBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_service_t * service_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string service_name);
RCLCPP_PUBLIC
virtual ~ServiceBase();
~ServiceBase();
RCLCPP_PUBLIC
std::string
get_service_name();
RCLCPP_PUBLIC
const rmw_service_t *
const rcl_service_t *
get_service_handle();
virtual std::shared_ptr<void> create_request() = 0;
@ -60,12 +63,12 @@ public:
std::shared_ptr<rmw_request_id_t> request_header,
std::shared_ptr<void> request) = 0;
private:
protected:
RCLCPP_DISABLE_COPY(ServiceBase);
std::shared_ptr<rmw_node_t> node_handle_;
std::shared_ptr<rcl_node_t> node_handle_;
rmw_service_t * service_handle_;
rcl_service_t service_handle_ = rcl_get_zero_initialized_service();
std::string service_name_;
};
@ -88,15 +91,37 @@ public:
RCLCPP_SMART_PTR_DEFINITIONS(Service);
Service(
std::shared_ptr<rmw_node_t> node_handle,
rmw_service_t * service_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name,
AnyServiceCallback<ServiceT> any_callback)
: ServiceBase(node_handle, service_handle, service_name), any_callback_(any_callback)
{}
AnyServiceCallback<ServiceT> any_callback,
rcl_service_options_t & service_options)
: ServiceBase(node_handle, service_name), any_callback_(any_callback)
{
using rosidl_generator_cpp::get_service_type_support_handle;
auto service_type_support_handle = get_service_type_support_handle<ServiceT>();
if (rcl_service_init(
&service_handle_, node_handle.get(), service_type_support_handle, service_name.c_str(),
&service_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("could not create service: ") +
rcl_get_error_string_safe());
}
}
Service() = delete;
virtual ~Service()
{
if (rcl_service_fini(&service_handle_, node_handle_.get()) != RCL_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rcl service_handle_ handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
std::shared_ptr<void> create_request()
{
return std::shared_ptr<void>(new typename ServiceT::Request());
@ -122,11 +147,12 @@ public:
std::shared_ptr<rmw_request_id_t> req_id,
std::shared_ptr<typename ServiceT::Response> response)
{
rmw_ret_t status = rmw_send_response(get_service_handle(), req_id.get(), response.get());
if (status != RMW_RET_OK) {
rcl_ret_t status = rcl_send_response(get_service_handle(), req_id.get(), response.get());
if (status != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to send response: ") + rmw_get_error_string_safe());
std::string("failed to send response: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
}

View file

@ -18,6 +18,8 @@
#include <memory>
#include <vector>
#include "rcl/allocator.h"
#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
@ -64,7 +66,7 @@ public:
allocator_ = std::make_shared<VoidAlloc>();
}
void add_guard_condition(const rmw_guard_condition_t * guard_condition)
void add_guard_condition(const rcl_guard_condition_t * guard_condition)
{
for (const auto & existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) {
@ -74,7 +76,7 @@ public:
guard_conditions_.push_back(guard_condition);
}
void remove_guard_condition(const rmw_guard_condition_t * guard_condition)
void remove_guard_condition(const rcl_guard_condition_t * guard_condition)
{
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) {
@ -84,69 +86,40 @@ public:
}
}
size_t fill_subscriber_handles(void ** & ptr)
{
for (auto & subscription : subscriptions_) {
subscriber_handles_.push_back(subscription->get_subscription_handle()->data);
if (subscription->get_intra_process_subscription_handle()) {
subscriber_handles_.push_back(subscription->get_intra_process_subscription_handle()->data);
}
}
ptr = subscriber_handles_.data();
return subscriber_handles_.size();
}
// return the new number of services
size_t fill_service_handles(void ** & ptr)
{
for (auto & service : services_) {
service_handles_.push_back(service->get_service_handle()->data);
}
ptr = service_handles_.data();
return service_handles_.size();
}
// return the new number of clients
size_t fill_client_handles(void ** & ptr)
{
for (auto & client : clients_) {
client_handles_.push_back(client->get_client_handle()->data);
}
ptr = client_handles_.data();
return client_handles_.size();
}
size_t fill_guard_condition_handles(void ** & ptr)
{
for (const auto & guard_condition : guard_conditions_) {
if (guard_condition) {
guard_condition_handles_.push_back(guard_condition->data);
}
}
ptr = guard_condition_handles_.data();
return guard_condition_handles_.size();
}
void clear_active_entities()
{
subscriptions_.clear();
services_.clear();
clients_.clear();
}
void clear_handles()
{
subscriber_handles_.clear();
subscription_handles_.clear();
service_handles_.clear();
client_handles_.clear();
guard_condition_handles_.clear();
timer_handles_.clear();
}
void remove_null_handles()
virtual void remove_null_handles(rcl_wait_set_t * wait_set)
{
subscriber_handles_.erase(
std::remove(subscriber_handles_.begin(), subscriber_handles_.end(), nullptr),
subscriber_handles_.end()
for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
if (!wait_set->subscriptions[i]) {
subscription_handles_[i] = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_services; ++i) {
if (!wait_set->services[i]) {
service_handles_[i] = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_clients; ++i) {
if (!wait_set->clients[i]) {
client_handles_[i] = nullptr;
}
}
for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
if (!wait_set->timers[i]) {
timer_handles_[i] = nullptr;
}
}
subscription_handles_.erase(
std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
subscription_handles_.end()
);
service_handles_.erase(
@ -159,9 +132,9 @@ public:
client_handles_.end()
);
guard_condition_handles_.erase(
std::remove(guard_condition_handles_.begin(), guard_condition_handles_.end(), nullptr),
guard_condition_handles_.end()
timer_handles_.erase(
std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
timer_handles_.end()
);
}
@ -182,18 +155,28 @@ public:
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
subscriptions_.push_back(subscription);
subscription_handles_.push_back(subscription->get_subscription_handle());
if (subscription->get_intra_process_subscription_handle()) {
subscription_handles_.push_back(
subscription->get_intra_process_subscription_handle());
}
}
}
for (auto & service : group->get_service_ptrs()) {
if (service) {
services_.push_back(service);
service_handles_.push_back(service->get_service_handle());
}
}
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client) {
clients_.push_back(client);
client_handles_.push_back(client->get_client_handle());
}
}
for (auto & weak_timer : group->get_timer_ptrs()) {
auto timer = weak_timer.lock();
if (timer) {
timer_handles_.push_back(timer->get_timer_handle());
}
}
}
@ -201,6 +184,45 @@ public:
return has_invalid_weak_nodes;
}
bool add_handles_to_waitset(rcl_wait_set_t * wait_set)
{
for (auto subscription : subscription_handles_) {
if (rcl_wait_set_add_subscription(wait_set, subscription) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add subscription to waitset: %s\n", rcl_get_error_string_safe());
return false;
}
}
for (auto client : client_handles_) {
if (rcl_wait_set_add_client(wait_set, client) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add client to waitset: %s\n", rcl_get_error_string_safe());
return false;
}
}
for (auto service : service_handles_) {
if (rcl_wait_set_add_service(wait_set, service) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add service to waitset: %s\n", rcl_get_error_string_safe());
return false;
}
}
for (auto timer : timer_handles_) {
if (rcl_wait_set_add_timer(wait_set, timer) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add timer to waitset: %s\n", rcl_get_error_string_safe());
return false;
}
}
for (auto guard_condition : guard_conditions_) {
if (rcl_wait_set_add_guard_condition(wait_set, guard_condition) != RCL_RET_OK) {
fprintf(stderr, "Couldn't add guard_condition to waitset: %s\n",
rcl_get_error_string_safe());
return false;
}
}
return true;
}
/// Provide a newly initialized AnyExecutable object.
// \return Shared pointer to the fresh executable.
@ -213,21 +235,21 @@ public:
get_next_subscription(executor::AnyExecutable::SharedPtr any_exec,
const WeakNodeVector & weak_nodes)
{
auto it = subscriber_handles_.begin();
while (it != subscriber_handles_.end()) {
auto it = subscription_handles_.begin();
while (it != subscription_handles_.end()) {
auto subscription = get_subscription_by_handle(*it, weak_nodes);
if (subscription) {
// Figure out if this is for intra-process or not.
bool is_intra_process = false;
if (subscription->get_intra_process_subscription_handle()) {
is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it;
is_intra_process = subscription->get_intra_process_subscription_handle() == *it;
}
// Find the group for this handle and see if it can be serviced
auto group = get_group_by_subscription(subscription, weak_nodes);
if (!group) {
// Group was not found, meaning the subscription is not valid...
// Remove it from the ready list and continue looking
subscriber_handles_.erase(it);
subscription_handles_.erase(it);
continue;
}
if (!group->can_be_taken_from().load()) {
@ -244,11 +266,11 @@ public:
}
any_exec->callback_group = group;
any_exec->node = get_node_by_group(group, weak_nodes);
subscriber_handles_.erase(it);
subscription_handles_.erase(it);
return;
}
// Else, the subscription is no longer valid, remove it and continue
subscriber_handles_.erase(it);
subscription_handles_.erase(it);
}
}
@ -319,20 +341,47 @@ public:
}
}
virtual rcl_allocator_t get_allocator()
{
return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
}
size_t number_of_ready_subscriptions() const
{
return subscription_handles_.size();
}
size_t number_of_ready_services() const
{
return service_handles_.size();
}
size_t number_of_ready_clients() const
{
return client_handles_.size();
}
size_t number_of_guard_conditions() const
{
return guard_conditions_.size();
}
size_t number_of_ready_timers() const
{
return timer_handles_.size();
}
private:
template<typename T>
using VectorRebind =
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;
VectorRebind<rclcpp::subscription::SubscriptionBase::SharedPtr> subscriptions_;
VectorRebind<rclcpp::service::ServiceBase::SharedPtr> services_;
VectorRebind<rclcpp::client::ClientBase::SharedPtr> clients_;
VectorRebind<const rmw_guard_condition_t *> guard_conditions_;
VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<void *> subscriber_handles_;
VectorRebind<void *> service_handles_;
VectorRebind<void *> client_handles_;
VectorRebind<void *> guard_condition_handles_;
VectorRebind<const rcl_subscription_t *> subscription_handles_;
VectorRebind<const rcl_service_t *> service_handles_;
VectorRebind<const rcl_client_t *> client_handles_;
VectorRebind<const rcl_timer_t *> timer_handles_;
std::shared_ptr<ExecAlloc> executable_allocator_;
std::shared_ptr<VoidAlloc> allocator_;

View file

@ -24,11 +24,15 @@
#include <sstream>
#include <string>
#include "rcl/error_handling.h"
#include "rcl/subscription.h"
#include "rcl_interfaces/msg/intra_process_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_memory_strategy.hpp"
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
namespace rclcpp
@ -51,14 +55,13 @@ public:
/// Default constructor.
/**
* \param[in] node_handle The rmw representation of the node that owns this subscription.
* \param[in] node_handle The rcl representation of the node that owns this subscription.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] ignore_local_publications True to ignore local publications (unused).
*/
RCLCPP_PUBLIC
SubscriptionBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_subscription_t * subscription_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & topic_name,
bool ignore_local_publications);
@ -72,11 +75,11 @@ public:
get_topic_name() const;
RCLCPP_PUBLIC
const rmw_subscription_t *
const rcl_subscription_t *
get_subscription_handle() const;
RCLCPP_PUBLIC
const rmw_subscription_t *
virtual const rcl_subscription_t *
get_intra_process_subscription_handle() const;
/// Borrow a new message.
@ -101,15 +104,12 @@ public:
const rmw_message_info_t & message_info) = 0;
protected:
rmw_subscription_t * intra_process_subscription_handle_;
rcl_subscription_t intra_process_subscription_handle_ = rcl_get_zero_initialized_subscription();
rcl_subscription_t subscription_handle_ = rcl_get_zero_initialized_subscription();
std::shared_ptr<rcl_node_t> node_handle_;
private:
RCLCPP_DISABLE_COPY(SubscriptionBase);
std::shared_ptr<rmw_node_t> node_handle_;
rmw_subscription_t * subscription_handle_;
std::string topic_name_;
bool ignore_local_publications_;
};
@ -134,27 +134,37 @@ public:
/**
* The constructor for a subscription is almost never called directly. Instead, subscriptions
* should be instantiated through Node::create_subscription.
* \param[in] node_handle rmw representation of the node that owns this subscription.
* \param[in] node_handle rcl representation of the node that owns this subscription.
* \param[in] topic_name Name of the topic to subscribe to.
* \param[in] ignore_local_publications True to ignore local publications (unused).
* \param[in] callback User-defined callback to call when a message is received.
* \param[in] memory_strategy The memory strategy to be used for managing message memory.
*/
Subscription(
std::shared_ptr<rmw_node_t> node_handle,
rmw_subscription_t * subscription_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & topic_name,
bool ignore_local_publications,
const rcl_subscription_options_t & subscription_options,
AnySubscriptionCallback<MessageT, Alloc> callback,
typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
memory_strategy = message_memory_strategy::MessageMemoryStrategy<MessageT,
Alloc>::create_default())
: SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications),
: SubscriptionBase(
node_handle, topic_name, subscription_options.ignore_local_publications),
any_callback_(callback),
message_memory_strategy_(memory_strategy),
get_intra_process_message_callback_(nullptr),
matches_any_intra_process_publishers_(nullptr)
{
using rosidl_generator_cpp::get_message_type_support_handle;
auto type_support_handle = get_message_type_support_handle<MessageT>();
if (rcl_subscription_init(
&subscription_handle_, node_handle_.get(), type_support_handle, topic_name.c_str(),
&subscription_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("could not create subscription: ") + rcl_get_error_string_safe());
}
}
/// Support dynamically setting the message memory strategy.
@ -231,16 +241,36 @@ private:
void setup_intra_process(
uint64_t intra_process_subscription_id,
rmw_subscription_t * intra_process_subscription,
GetMessageCallbackType get_message_callback,
MatchesAnyPublishersCallbackType matches_any_publisher_callback)
MatchesAnyPublishersCallbackType matches_any_publisher_callback,
const rcl_subscription_options_t & intra_process_options)
{
if (rcl_subscription_init(
&intra_process_subscription_handle_, node_handle_.get(),
rclcpp::type_support::get_intra_process_message_msg_type_support(),
(get_topic_name() + "__intra").c_str(),
&intra_process_options) != RCL_RET_OK)
{
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("could not create intra process subscription: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
intra_process_subscription_id_ = intra_process_subscription_id;
intra_process_subscription_handle_ = intra_process_subscription;
get_intra_process_message_callback_ = get_message_callback;
matches_any_intra_process_publishers_ = matches_any_publisher_callback;
}
const rcl_subscription_t *
get_intra_process_subscription_handle() const
{
if (!get_intra_process_message_callback_) {
return nullptr;
}
return &intra_process_subscription_handle_;
}
RCLCPP_DISABLE_COPY(Subscription);
AnySubscriptionCallback<MessageT, Alloc> any_callback_;

View file

@ -27,6 +27,10 @@
#include "rclcpp/rate.hpp"
#include "rclcpp/utilities.hpp"
#include "rclcpp/visibility_control.hpp"
#include "rcl/error_handling.h"
#include "rcl/timer.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
@ -44,7 +48,7 @@ public:
explicit TimerBase(std::chrono::nanoseconds period);
RCLCPP_PUBLIC
virtual ~TimerBase();
~TimerBase();
RCLCPP_PUBLIC
void
@ -54,27 +58,31 @@ public:
virtual void
execute_callback() = 0;
RCLCPP_PUBLIC
const rcl_timer_t *
get_timer_handle();
/// Check how long the timer has until its next scheduled callback.
// \return A std::chrono::duration representing the relative time until the next callback.
virtual std::chrono::nanoseconds
time_until_trigger() = 0;
RCLCPP_PUBLIC
std::chrono::nanoseconds
time_until_trigger();
/// Is the clock steady (i.e. is the time between ticks constant?)
// \return True if the clock used by this timer is steady.
virtual bool is_steady() = 0;
/// Check if the timer needs to trigger the callback.
/// Check if the timer is ready to trigger the callback.
/**
* This function expects its caller to immediately trigger the callback after this function,
* since it maintains the last time the callback was triggered.
* \return True if the timer needs to trigger.
*/
virtual bool check_and_trigger() = 0;
RCLCPP_PUBLIC
bool is_ready();
protected:
std::chrono::nanoseconds period_;
bool canceled_;
rcl_timer_t timer_handle_ = rcl_get_zero_initialized_timer();
};
@ -102,10 +110,8 @@ public:
* \param[in] callback User-specified callback function.
*/
GenericTimer(std::chrono::nanoseconds period, FunctorT && callback)
: TimerBase(period), callback_(std::forward<FunctorT>(callback)), loop_rate_(period)
: TimerBase(period), callback_(std::forward<FunctorT>(callback))
{
/* Set last_triggered_time_ so that the timer fires at least one period after being created. */
last_triggered_time_ = Clock::now();
}
/// Default destructor.
@ -113,11 +119,21 @@ public:
{
// Stop the timer from running.
cancel();
if (rcl_timer_fini(&timer_handle_) != RCL_RET_OK) {
fprintf(stderr, "Failed to clean up rcl timer handle: %s\n", rcl_get_error_string_safe());
}
}
void
execute_callback()
{
rcl_ret_t ret = rcl_timer_call(&timer_handle_);
if (ret == RCL_RET_TIMER_CANCELED) {
return;
}
if (ret != RCL_RET_OK) {
throw std::runtime_error("Failed to notify timer that callback occurred");
}
execute_callback_delegate<>();
}
@ -146,39 +162,6 @@ public:
callback_(*this);
}
bool
check_and_trigger()
{
if (canceled_) {
return false;
}
if (Clock::now() < last_triggered_time_) {
return false;
}
if (std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now() - last_triggered_time_) >=
loop_rate_.period())
{
last_triggered_time_ = Clock::now();
return true;
}
return false;
}
std::chrono::nanoseconds
time_until_trigger()
{
std::chrono::nanoseconds time_until_trigger;
// Calculate the time between the next trigger and the current time
if (last_triggered_time_ + loop_rate_.period() < Clock::now()) {
// time is overdue, need to trigger immediately
time_until_trigger = std::chrono::nanoseconds::zero();
} else {
time_until_trigger = std::chrono::duration_cast<std::chrono::nanoseconds>(
last_triggered_time_ - Clock::now()) + loop_rate_.period();
}
return time_until_trigger;
}
virtual bool
is_steady()
{
@ -189,8 +172,6 @@ protected:
RCLCPP_DISABLE_COPY(GenericTimer);
FunctorT callback_;
rclcpp::rate::GenericRate<Clock> loop_rate_;
std::chrono::time_point<Clock> last_triggered_time_;
};
template<typename CallbackType>

View file

@ -18,6 +18,9 @@
#include <chrono>
#include "rclcpp/visibility_control.hpp"
#include "rcl/guard_condition.h"
#include "rmw/macros.h"
#include "rmw/rmw.h"
@ -48,7 +51,7 @@ shutdown();
/// Get a handle to the rmw guard condition that manages the signal handler.
RCLCPP_PUBLIC
rmw_guard_condition_t *
rcl_guard_condition_t *
get_global_sigint_guard_condition();
/// Use the global condition variable to block for the specified amount of time.

View file

@ -16,6 +16,7 @@
<build_export_depend>rcl_interfaces</build_export_depend>
<build_export_depend>rosidl_generator_cpp</build_export_depend>
<depend>rcl</depend>
<depend>rmw_implementation</depend>
<exec_depend>ament_cmake</exec_depend>

View file

@ -22,20 +22,13 @@
using rclcpp::client::ClientBase;
ClientBase::ClientBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_client_t * client_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & service_name)
: node_handle_(node_handle), client_handle_(client_handle), service_name_(service_name)
: node_handle_(node_handle), service_name_(service_name)
{}
ClientBase::~ClientBase()
{
if (client_handle_) {
if (rmw_destroy_client(client_handle_) != RMW_RET_OK) {
fprintf(stderr,
"Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe());
}
}
}
const std::string &
@ -44,8 +37,8 @@ ClientBase::get_service_name() const
return this->service_name_;
}
const rmw_client_t *
const rcl_client_t *
ClientBase::get_client_handle() const
{
return this->client_handle_;
return &client_handle_;
}

View file

@ -16,6 +16,9 @@
#include <string>
#include <type_traits>
#include "rcl/allocator.h"
#include "rcl/error_handling.h"
#include "rclcpp/executor.hpp"
#include "rclcpp/scope_exit.hpp"
@ -30,29 +33,33 @@ Executor::Executor(const ExecutorArgs & args)
: spinning(false),
memory_strategy_(args.memory_strategy)
{
interrupt_guard_condition_ = rmw_create_guard_condition();
if (!interrupt_guard_condition_) {
throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor");
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
if (rcl_guard_condition_init(
&interrupt_guard_condition_, guard_condition_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Failed to create interrupt guard condition in Executor constructor: ") +
rcl_get_error_string_safe());
}
// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
// The size of guard conditions is variable because the number of nodes can vary
// Put the global ctrl-c guard condition in
memory_strategy_->add_guard_condition(rclcpp::utilities::get_global_sigint_guard_condition());
// Put the executor's guard condition in
memory_strategy_->add_guard_condition(interrupt_guard_condition_);
memory_strategy_->add_guard_condition(&interrupt_guard_condition_);
rcl_allocator_t allocator = memory_strategy_->get_allocator();
waitset_ = rmw_create_waitset(args.max_conditions);
if (!waitset_) {
if (rcl_wait_set_init(
&waitset_, 0, 2, 0, 0, 0, allocator) != RCL_RET_OK)
{
fprintf(stderr,
"[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe());
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
"[rclcpp::error] failed to create waitset: %s\n", rcl_get_error_string_safe());
if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
throw std::runtime_error("Failed to create waitset in Executor constructor");
}
@ -60,21 +67,15 @@ Executor::Executor(const ExecutorArgs & args)
Executor::~Executor()
{
// Try to deallocate the waitset.
if (waitset_) {
rmw_ret_t status = rmw_destroy_waitset(waitset_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe());
}
// Finalize the waitset.
if (rcl_wait_set_fini(&waitset_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy waitset: %s\n", rcl_get_error_string_safe());
}
// Try to deallocate the interrupt guard condition.
if (interrupt_guard_condition_ != nullptr) {
rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
}
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
}
@ -96,9 +97,8 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
weak_nodes_.push_back(node_ptr);
if (notify) {
// Interrupt waiting to handle new node
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe());
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string_safe());
}
}
// Add the node's notify condition to the guard condition handles
@ -126,9 +126,8 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify)
if (notify) {
// If the node was matched and removed, interrupt waiting
if (node_removed) {
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe());
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string_safe());
}
}
}
@ -184,9 +183,8 @@ void
Executor::cancel()
{
spinning.store(false);
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe());
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string_safe());
}
}
@ -224,9 +222,8 @@ Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec)
any_exec->callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_);
if (status != RMW_RET_OK) {
throw std::runtime_error(rmw_get_error_string_safe());
if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) {
throw std::runtime_error(rcl_get_error_string_safe());
}
}
@ -235,20 +232,17 @@ Executor::execute_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{
std::shared_ptr<void> message = subscription->create_message();
bool taken = false;
rmw_message_info_t message_info;
auto ret =
rmw_take_with_info(subscription->get_subscription_handle(),
message.get(), &taken, &message_info);
if (ret == RMW_RET_OK) {
if (taken) {
message_info.from_intra_process = false;
subscription->handle_message(message, message_info);
}
} else {
auto ret = rcl_take(subscription->get_subscription_handle(),
message.get(), &message_info);
if (ret == RCL_RET_OK) {
message_info.from_intra_process = false;
subscription->handle_message(message, message_info);
} else if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
fprintf(stderr,
"[rclcpp::error] take failed for subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
subscription->get_topic_name().c_str(), rcl_get_error_string_safe());
}
subscription->return_message(message);
}
@ -258,22 +252,19 @@ Executor::execute_intra_process_subscription(
rclcpp::subscription::SubscriptionBase::SharedPtr subscription)
{
rcl_interfaces::msg::IntraProcessMessage ipm;
bool taken = false;
rmw_message_info_t message_info;
rmw_ret_t status = rmw_take_with_info(
rcl_ret_t status = rcl_take(
subscription->get_intra_process_subscription_handle(),
&ipm,
&taken,
&message_info);
if (status == RMW_RET_OK) {
if (taken) {
message_info.from_intra_process = true;
subscription->handle_intra_process_message(ipm, message_info);
}
} else {
if (status == RCL_RET_OK) {
message_info.from_intra_process = true;
subscription->handle_intra_process_message(ipm, message_info);
} else if (status != RCL_RET_SUBSCRIPTION_TAKE_FAILED) {
fprintf(stderr,
"[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n",
subscription->get_topic_name().c_str(), rmw_get_error_string_safe());
subscription->get_topic_name().c_str(), rcl_get_error_string_safe());
}
}
@ -290,20 +281,18 @@ Executor::execute_service(
{
auto request_header = service->create_request_header();
std::shared_ptr<void> request = service->create_request();
bool taken = false;
rmw_ret_t status = rmw_take_request(
rcl_ret_t status = rcl_take_request(
service->get_service_handle(),
request_header.get(),
request.get(),
&taken);
if (status == RMW_RET_OK) {
if (taken) {
request.get());
if (status != RCL_RET_SERVICE_TAKE_FAILED) {
if (status == RCL_RET_OK) {
service->handle_request(request_header, request);
}
} else {
fprintf(stderr,
"[rclcpp::error] take request failed for server of service '%s': %s\n",
service->get_service_name().c_str(), rmw_get_error_string_safe());
service->get_service_name().c_str(), rcl_get_error_string_safe());
}
}
@ -313,29 +302,26 @@ Executor::execute_client(
{
auto request_header = client->create_request_header();
std::shared_ptr<void> response = client->create_response();
bool taken = false;
rmw_ret_t status = rmw_take_response(
rcl_ret_t status = rcl_take_response(
client->get_client_handle(),
request_header.get(),
response.get(),
&taken);
if (status == RMW_RET_OK) {
if (taken) {
response.get());
if (status != RCL_RET_SERVICE_TAKE_FAILED) {
if (status == RCL_RET_OK) {
client->handle_response(request_header, response);
}
} else {
fprintf(stderr,
"[rclcpp::error] take response failed for client of service '%s': %s\n",
client->get_service_name().c_str(), rmw_get_error_string_safe());
client->get_service_name().c_str(), rcl_get_error_string_safe());
}
}
void
Executor::wait_for_work(std::chrono::nanoseconds timeout)
{
memory_strategy_->clear_active_entities();
// Collect the subscriptions and timers to be waited on
memory_strategy_->clear_handles();
bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_);
// Clean up any invalid nodes, if they were detected
@ -353,61 +339,75 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
);
}
memory_strategy_->clear_handles();
// Use the number of subscriptions to allocate memory in the handles
rmw_subscriptions_t subscriber_handles;
subscriber_handles.subscriber_count =
memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers);
rmw_services_t service_handles;
service_handles.service_count =
memory_strategy_->fill_service_handles(service_handles.services);
rmw_clients_t client_handles;
client_handles.client_count =
memory_strategy_->fill_client_handles(client_handles.clients);
// construct a guard conditions struct
rmw_guard_conditions_t guard_conditions;
guard_conditions.guard_condition_count =
memory_strategy_->fill_guard_condition_handles(guard_conditions.guard_conditions);
rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;
auto next_timer_duration = get_earliest_timer();
// If the next timer timeout must preempt the requested timeout
// or if the requested timeout blocks forever, and there exists a valid timer,
// replace the requested timeout with the next timeout.
bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero();
if ((next_timer_duration < timeout ||
timeout < std::chrono::nanoseconds::zero()) && has_valid_timer)
if (rcl_wait_set_resize_subscriptions(
&waitset_, memory_strategy_->number_of_ready_subscriptions()) != RCL_RET_OK)
{
rmw_timeout.sec =
std::chrono::duration_cast<std::chrono::seconds>(next_timer_duration).count();
rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
} else if (timeout >= std::chrono::nanoseconds::zero()) {
// Convert timeout representation to rmw_time
rmw_timeout.sec = std::chrono::duration_cast<std::chrono::seconds>(timeout).count();
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
(1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
throw std::runtime_error(
std::string("Couldn't resize the number of subscriptions in waitset : ") +
rcl_get_error_string_safe());
}
// Now wait on the waitable subscriptions and timers
rmw_ret_t status = rmw_wait(
&subscriber_handles,
&guard_conditions,
&service_handles,
&client_handles,
waitset_,
wait_timeout);
if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
throw std::runtime_error(rmw_get_error_string_safe());
if (rcl_wait_set_resize_services(
&waitset_, memory_strategy_->number_of_ready_services()) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Couldn't resize the number of services in waitset : ") +
rcl_get_error_string_safe());
}
memory_strategy_->remove_null_handles();
if (rcl_wait_set_resize_clients(
&waitset_, memory_strategy_->number_of_ready_clients()) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Couldn't resize the number of clients in waitset : ") +
rcl_get_error_string_safe());
}
if (rcl_wait_set_resize_guard_conditions(
&waitset_, memory_strategy_->number_of_guard_conditions()) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Couldn't resize the number of guard_conditions in waitset : ") +
rcl_get_error_string_safe());
}
if (rcl_wait_set_resize_timers(
&waitset_, memory_strategy_->number_of_ready_timers()) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Couldn't resize the number of timers in waitset : ") +
rcl_get_error_string_safe());
}
if (!memory_strategy_->add_handles_to_waitset(&waitset_)) {
throw std::runtime_error("Couldn't fill waitset");
}
rcl_ret_t status =
rcl_wait(&waitset_, std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count());
if (status == RCL_RET_WAIT_SET_EMPTY) {
fprintf(stderr, "Warning: empty waitset received in rcl_wait(). This should never happen.\n");
} else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) {
throw std::runtime_error(std::string("rcl_wait() failed: ") + rcl_get_error_string_safe());
}
// check the null handles in the waitset and remove them from the handles in memory strategy
// for callback-based entities
memory_strategy_->remove_null_handles(&waitset_);
if (rcl_wait_set_clear_subscriptions(&waitset_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear subscriptions from waitset");
}
if (rcl_wait_set_clear_services(&waitset_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear servicess from waitset");
}
if (rcl_wait_set_clear_clients(&waitset_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear clients from waitset");
}
if (rcl_wait_set_clear_guard_conditions(&waitset_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear guard conditions from waitset");
}
if (rcl_wait_set_clear_timers(&waitset_) != RCL_RET_OK) {
throw std::runtime_error("Couldn't clear timers from waitset");
}
}
rclcpp::node::Node::SharedPtr
@ -470,7 +470,7 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
}
for (auto & timer_ref : group->get_timer_ptrs()) {
auto timer = timer_ref.lock();
if (timer && timer->check_and_trigger()) {
if (timer && timer->is_ready()) {
any_exec->timer = timer;
any_exec->callback_group = group;
node = get_node_by_group(group);
@ -481,37 +481,6 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec)
}
}
std::chrono::nanoseconds
Executor::get_earliest_timer()
{
std::chrono::nanoseconds latest = std::chrono::nanoseconds::max();
bool timers_empty = true;
for (auto & weak_node : weak_nodes_) {
auto node = weak_node.lock();
if (!node) {
continue;
}
for (auto & weak_group : node->get_callback_groups()) {
auto group = weak_group.lock();
if (!group || !group->can_be_taken_from().load()) {
continue;
}
for (auto & timer_ref : group->get_timer_ptrs()) {
timers_empty = false;
// Check the expected trigger time
auto timer = timer_ref.lock();
if (timer && timer->time_until_trigger() < latest) {
latest = timer->time_until_trigger();
}
}
}
}
if (timers_empty) {
return std::chrono::nanoseconds(-1);
}
return latest;
}
AnyExecutable::SharedPtr
Executor::get_next_ready_executable()
{

View file

@ -18,7 +18,7 @@ using rclcpp::memory_strategy::MemoryStrategy;
rclcpp::subscription::SubscriptionBase::SharedPtr
MemoryStrategy::get_subscription_by_handle(
void * subscriber_handle, const WeakNodeVector & weak_nodes)
const rcl_subscription_t * subscriber_handle, const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
@ -33,12 +33,10 @@ MemoryStrategy::get_subscription_by_handle(
for (auto & weak_subscription : group->get_subscription_ptrs()) {
auto subscription = weak_subscription.lock();
if (subscription) {
if (subscription->get_subscription_handle()->data == subscriber_handle) {
if (subscription->get_subscription_handle() == subscriber_handle) {
return subscription;
}
if (subscription->get_intra_process_subscription_handle() &&
subscription->get_intra_process_subscription_handle()->data == subscriber_handle)
{
if (subscription->get_intra_process_subscription_handle() == subscriber_handle) {
return subscription;
}
}
@ -49,7 +47,8 @@ MemoryStrategy::get_subscription_by_handle(
}
rclcpp::service::ServiceBase::SharedPtr
MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes)
MemoryStrategy::get_service_by_handle(const rcl_service_t * service_handle,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
@ -62,7 +61,7 @@ MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVecto
continue;
}
for (auto & service : group->get_service_ptrs()) {
if (service->get_service_handle()->data == service_handle) {
if (service->get_service_handle() == service_handle) {
return service;
}
}
@ -72,7 +71,8 @@ MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVecto
}
rclcpp::client::ClientBase::SharedPtr
MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes)
MemoryStrategy::get_client_by_handle(const rcl_client_t * client_handle,
const WeakNodeVector & weak_nodes)
{
for (auto & weak_node : weak_nodes) {
auto node = weak_node.lock();
@ -86,7 +86,7 @@ MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector
}
for (auto & weak_client : group->get_client_ptrs()) {
auto client = weak_client.lock();
if (client && client->get_client_handle()->data == client_handle) {
if (client && client->get_client_handle() == client_handle) {
return client;
}
}

View file

@ -36,13 +36,17 @@ Node::Node(
bool use_intra_process_comms)
: name_(node_name), context_(context),
number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0),
use_intra_process_comms_(use_intra_process_comms),
notify_guard_condition_(rmw_create_guard_condition())
use_intra_process_comms_(use_intra_process_comms)
{
if (!notify_guard_condition_) {
throw std::runtime_error("Failed to create guard condition for node: " +
std::string(rmw_get_error_string_safe()));
rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
if (rcl_guard_condition_init(
&notify_guard_condition_, guard_condition_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("Failed to create interrupt guard condition in Executor constructor: ") +
rcl_get_error_string_safe());
}
has_executor.store(false);
size_t domain_id = 0;
char * ros_domain_id = nullptr;
@ -56,9 +60,10 @@ Node::Node(
if (ros_domain_id) {
uint32_t number = strtoul(ros_domain_id, NULL, 0);
if (number == (std::numeric_limits<uint32_t>::max)()) {
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
fprintf(
stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe());
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
throw std::runtime_error("failed to interpret ROS_DOMAIN_ID as integral number");
}
@ -68,27 +73,30 @@ Node::Node(
#endif
}
auto node = rmw_create_node(name_.c_str(), domain_id);
if (!node) {
// *INDENT-OFF*
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
rcl_node_t * rcl_node = new rcl_node_t(rcl_get_zero_initialized_node());
node_handle_.reset(rcl_node, [](rcl_node_t * node) {
if (rcl_node_fini(node) != RCL_RET_OK) {
fprintf(
stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe());
stderr, "Error in destruction of rmw node handle: %s\n", rcl_get_error_string_safe());
}
throw std::runtime_error(
std::string("could not create node: ") +
rmw_get_error_string_safe());
// *INDENT-ON*
delete node;
});
rcl_node_options_t options = rcl_node_get_default_options();
// TODO(jacquelinekay): Allocator options
options.domain_id = domain_id;
if (rcl_node_init(node_handle_.get(), name_.c_str(), &options) != RCL_RET_OK) {
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
throw std::runtime_error(std::string(
"Could not initialize rcl node: ") + rcl_get_error_string_safe());
}
// Initialize node handle shared_ptr with custom deleter.
// *INDENT-OFF*
node_handle_.reset(node, [](rmw_node_t * node) {
auto ret = rmw_destroy_node(node);
if (ret != RMW_RET_OK) {
fprintf(
stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe());
}
});
// *INDENT-ON*
using rclcpp::callback_group::CallbackGroupType;
@ -100,9 +108,10 @@ Node::Node(
Node::~Node()
{
if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) {
fprintf(stderr, "Warning! Failed to destroy guard condition in Node destructor: %s\n",
rmw_get_error_string_safe());
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&notify_guard_condition_) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe());
}
}
@ -297,7 +306,8 @@ Node::get_topic_names_and_types() const
topic_names_and_types.topic_names = nullptr;
topic_names_and_types.type_names = nullptr;
auto ret = rmw_get_topic_names_and_types(node_handle_.get(), &topic_names_and_types);
auto ret = rmw_get_topic_names_and_types(rcl_node_get_rmw_handle(node_handle_.get()),
&topic_names_and_types);
if (ret != RMW_RET_OK) {
// *INDENT-OFF*
throw std::runtime_error(
@ -325,7 +335,8 @@ size_t
Node::count_publishers(const std::string & topic_name) const
{
size_t count;
auto ret = rmw_count_publishers(node_handle_.get(), topic_name.c_str(), &count);
auto ret = rmw_count_publishers(rcl_node_get_rmw_handle(node_handle_.get()),
topic_name.c_str(), &count);
if (ret != RMW_RET_OK) {
// *INDENT-OFF*
throw std::runtime_error(
@ -339,7 +350,8 @@ size_t
Node::count_subscribers(const std::string & topic_name) const
{
size_t count;
auto ret = rmw_count_subscribers(node_handle_.get(), topic_name.c_str(), &count);
auto ret = rmw_count_subscribers(rcl_node_get_rmw_handle(node_handle_.get()),
topic_name.c_str(), &count);
if (ret != RMW_RET_OK) {
// *INDENT-OFF*
throw std::runtime_error(
@ -356,7 +368,7 @@ Node::get_callback_groups() const
return callback_groups_;
}
rmw_guard_condition_t * Node::get_notify_guard_condition() const
const rcl_guard_condition_t * Node::get_notify_guard_condition() const
{
return notify_guard_condition_;
return &notify_guard_condition_;
}

View file

@ -33,42 +33,17 @@
using rclcpp::publisher::PublisherBase;
PublisherBase::PublisherBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_publisher_t * publisher_handle,
std::shared_ptr<rcl_node_t> node_handle,
std::string topic,
size_t queue_size)
: node_handle_(node_handle), publisher_handle_(publisher_handle),
intra_process_publisher_handle_(nullptr),
: node_handle_(node_handle),
topic_(topic), queue_size_(queue_size),
intra_process_publisher_id_(0), store_intra_process_message_(nullptr)
{
// Life time of this object is tied to the publisher handle.
if (rmw_get_gid_for_publisher(publisher_handle_, &rmw_gid_) != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to get publisher gid: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}
}
PublisherBase::~PublisherBase()
{
if (intra_process_publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) {
fprintf(
stderr,
"Error in destruction of intra process rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
if (publisher_handle_) {
if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) {
fprintf(
stderr,
"Error in destruction of rmw publisher handle: %s\n",
rmw_get_error_string_safe());
}
}
}
const std::string &
@ -124,13 +99,29 @@ void
PublisherBase::setup_intra_process(
uint64_t intra_process_publisher_id,
StoreMessageCallbackT callback,
rmw_publisher_t * intra_process_publisher_handle)
const rcl_publisher_options_t & intra_process_options)
{
if (rcl_publisher_init(
&intra_process_publisher_handle_, node_handle_.get(),
rclcpp::type_support::get_intra_process_message_msg_type_support(),
(topic_ + "__intra").c_str(), &intra_process_options) != RCL_RET_OK)
{
throw std::runtime_error(
std::string("could not create intra process publisher: ") +
rcl_get_error_string_safe());
}
intra_process_publisher_id_ = intra_process_publisher_id;
store_intra_process_message_ = callback;
intra_process_publisher_handle_ = intra_process_publisher_handle;
// Life time of this object is tied to the publisher handle.
auto ret = rmw_get_gid_for_publisher(intra_process_publisher_handle_, &intra_process_rmw_gid_);
rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(
&intra_process_publisher_handle_);
if (publisher_rmw_handle == nullptr) {
throw std::runtime_error(std::string(
"Failed to get rmw publisher handle") + rcl_get_error_string_safe());
}
auto ret = rmw_get_gid_for_publisher(
publisher_rmw_handle, &intra_process_rmw_gid_);
if (ret != RMW_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(

View file

@ -28,23 +28,13 @@
using rclcpp::service::ServiceBase;
ServiceBase::ServiceBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_service_t * service_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string service_name)
: node_handle_(node_handle), service_handle_(service_handle), service_name_(service_name)
: node_handle_(node_handle), service_name_(service_name)
{}
ServiceBase::~ServiceBase()
{
if (service_handle_) {
if (rmw_destroy_service(service_handle_) != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rmw service_handle_ handle: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
}
{}
std::string
ServiceBase::get_service_name()
@ -52,8 +42,8 @@ ServiceBase::get_service_name()
return this->service_name_;
}
const rmw_service_t *
const rcl_service_t *
ServiceBase::get_service_handle()
{
return this->service_handle_;
return &service_handle_;
}

View file

@ -24,13 +24,10 @@
using rclcpp::subscription::SubscriptionBase;
SubscriptionBase::SubscriptionBase(
std::shared_ptr<rmw_node_t> node_handle,
rmw_subscription_t * subscription_handle,
std::shared_ptr<rcl_node_t> node_handle,
const std::string & topic_name,
bool ignore_local_publications)
: intra_process_subscription_handle_(nullptr),
node_handle_(node_handle),
subscription_handle_(subscription_handle),
: node_handle_(node_handle),
topic_name_(topic_name),
ignore_local_publications_(ignore_local_publications)
{
@ -40,22 +37,19 @@ SubscriptionBase::SubscriptionBase(
SubscriptionBase::~SubscriptionBase()
{
if (subscription_handle_) {
if (rmw_destroy_subscription(node_handle_.get(), subscription_handle_) != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rmw subscription handle: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
if (rcl_subscription_fini(&subscription_handle_, node_handle_.get()) != RCL_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rcl subscription handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
if (intra_process_subscription_handle_) {
auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_);
if (ret != RMW_RET_OK) {
std::stringstream ss;
ss << "Error in destruction of rmw intra process subscription handle: " <<
rmw_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
if (rcl_subscription_fini(
&intra_process_subscription_handle_, node_handle_.get()) != RCL_RET_OK)
{
std::stringstream ss;
ss << "Error in destruction of rmw intra process subscription handle: " <<
rcl_get_error_string_safe() << '\n';
(std::cerr << ss.str()).flush();
}
}
@ -65,14 +59,14 @@ SubscriptionBase::get_topic_name() const
return this->topic_name_;
}
const rmw_subscription_t *
const rcl_subscription_t *
SubscriptionBase::get_subscription_handle() const
{
return subscription_handle_;
return &subscription_handle_;
}
const rmw_subscription_t *
const rcl_subscription_t *
SubscriptionBase::get_intra_process_subscription_handle() const
{
return intra_process_subscription_handle_;
return &intra_process_subscription_handle_;
}

View file

@ -15,13 +15,19 @@
#include "rclcpp/timer.hpp"
#include <chrono>
#include <string>
using rclcpp::timer::TimerBase;
TimerBase::TimerBase(std::chrono::nanoseconds period)
: period_(period),
canceled_(false)
{}
{
if (rcl_timer_init(
&timer_handle_, period.count(), nullptr,
rcl_get_default_allocator()) != RCL_RET_OK)
{
fprintf(stderr, "Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string_safe());
}
}
TimerBase::~TimerBase()
{}
@ -29,5 +35,35 @@ TimerBase::~TimerBase()
void
TimerBase::cancel()
{
this->canceled_ = true;
if (rcl_timer_cancel(&timer_handle_) != RCL_RET_OK) {
throw std::runtime_error(std::string("Couldn't cancel timer: ") + rcl_get_error_string_safe());
}
}
bool
TimerBase::is_ready()
{
bool ready = false;
if (rcl_timer_is_ready(&timer_handle_, &ready) != RCL_RET_OK) {
throw std::runtime_error(std::string("Failed to check timer: ") + rcl_get_error_string_safe());
}
return ready;
}
std::chrono::nanoseconds
TimerBase::time_until_trigger()
{
int64_t time_until_next_call = 0;
if (rcl_timer_get_time_until_next_call(&timer_handle_, &time_until_next_call) != RCL_RET_OK) {
throw std::runtime_error(
std::string("Timer could not get time until next call: ") +
rcl_get_error_string_safe());
}
return std::chrono::nanoseconds(time_until_next_call);
}
const rcl_timer_t *
TimerBase::get_timer_handle()
{
return &timer_handle_;
}

View file

@ -22,6 +22,9 @@
#include <mutex>
#include <string>
#include "rcl/error_handling.h"
#include "rcl/rcl.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
@ -33,7 +36,8 @@
/// Represent the status of the global interrupt signal.
static volatile sig_atomic_t g_signal_status = 0;
/// Guard condition for interrupting the rmw implementation when the global interrupt signal fired.
static rmw_guard_condition_t * g_sigint_guard_cond_handle = rmw_create_guard_condition();
static rcl_guard_condition_t g_sigint_guard_cond_handle =
rcl_get_zero_initialized_guard_condition();
/// Condition variable for timed sleep (see sleep_for).
static std::condition_variable g_interrupt_condition_variable;
static std::atomic<bool> g_is_interrupted(false);
@ -77,10 +81,10 @@ signal_handler(int signal_value)
}
#endif
g_signal_status = signal_value;
rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle);
if (status != RMW_RET_OK) {
rcl_ret_t status = rcl_trigger_guard_condition(&g_sigint_guard_cond_handle);
if (status != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe());
"[rclcpp::error] failed to trigger guard condition: %s\n", rcl_get_error_string_safe());
}
g_is_interrupted.store(true);
g_interrupt_condition_variable.notify_all();
@ -89,14 +93,11 @@ signal_handler(int signal_value)
void
rclcpp::utilities::init(int argc, char * argv[])
{
(void)argc;
(void)argv;
g_is_interrupted.store(false);
rmw_ret_t status = rmw_init();
if (status != RMW_RET_OK) {
if (rcl_init(argc, argv, rcl_get_default_allocator()) != RCL_RET_OK) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to initialize rmw implementation: ") + rmw_get_error_string_safe());
std::string("failed to initialize rmw implementation: ") + rcl_get_error_string_safe());
// *INDENT-ON*
}
#ifdef HAS_SIGACTION
@ -138,6 +139,11 @@ rclcpp::utilities::init(int argc, char * argv[])
error_string);
// *INDENT-ON*
}
rcl_guard_condition_options_t options = rcl_guard_condition_get_default_options();
if (rcl_guard_condition_init(&g_sigint_guard_cond_handle, options) != RCL_RET_OK) {
throw std::runtime_error(std::string(
"Couldn't initialize guard condition: ") + rcl_get_error_string_safe());
}
}
bool
@ -150,8 +156,7 @@ void
rclcpp::utilities::shutdown()
{
g_signal_status = SIGINT;
rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle);
if (status != RMW_RET_OK) {
if (rcl_trigger_guard_condition(&g_sigint_guard_cond_handle) != RCL_RET_OK) {
fprintf(stderr,
"[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe());
}
@ -159,10 +164,10 @@ rclcpp::utilities::shutdown()
g_interrupt_condition_variable.notify_all();
}
rmw_guard_condition_t *
rcl_guard_condition_t *
rclcpp::utilities::get_global_sigint_guard_condition()
{
return ::g_sigint_guard_cond_handle;
return &::g_sigint_guard_cond_handle;
}
bool