From e961189be84c4d873dd88301c06483d971b62d51 Mon Sep 17 00:00:00 2001 From: Jackie Kay Date: Sun, 24 Apr 2016 21:25:19 +0000 Subject: [PATCH] Refactor to use rcl (#207) --- rclcpp/CMakeLists.txt | 16 +- rclcpp/cmake/get_rclcpp_information.cmake | 8 +- .../rclcpp/allocator/allocator_common.hpp | 60 ++++ rclcpp/include/rclcpp/client.hpp | 53 +++- rclcpp/include/rclcpp/executor.hpp | 24 +- rclcpp/include/rclcpp/memory_strategy.hpp | 42 +-- rclcpp/include/rclcpp/node.hpp | 16 +- rclcpp/include/rclcpp/node_impl.hpp | 126 +++----- rclcpp/include/rclcpp/publisher.hpp | 85 ++++-- rclcpp/include/rclcpp/service.hpp | 56 +++- .../strategies/allocator_memory_strategy.hpp | 205 ++++++++----- rclcpp/include/rclcpp/subscription.hpp | 68 +++-- rclcpp/include/rclcpp/timer.hpp | 73 ++--- rclcpp/include/rclcpp/utilities.hpp | 5 +- rclcpp/package.xml | 1 + rclcpp/src/rclcpp/client.cpp | 15 +- rclcpp/src/rclcpp/executor.cpp | 283 ++++++++---------- rclcpp/src/rclcpp/memory_strategy.cpp | 18 +- rclcpp/src/rclcpp/node.cpp | 76 +++-- rclcpp/src/rclcpp/publisher.cpp | 51 ++-- rclcpp/src/rclcpp/service.cpp | 20 +- rclcpp/src/rclcpp/subscription.cpp | 42 ++- rclcpp/src/rclcpp/timer.cpp | 44 ++- rclcpp/src/rclcpp/utilities.cpp | 31 +- 24 files changed, 796 insertions(+), 622 deletions(-) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index d5bee53..25a4b1b 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 2.8.3) project(rclcpp) find_package(ament_cmake REQUIRED) +find_package(rcl REQUIRED) find_package(rcl_interfaces REQUIRED) find_package(rmw REQUIRED) find_package(rmw_implementation REQUIRED) @@ -42,13 +43,14 @@ set(${PROJECT_NAME}_SRCS ) macro(target) + if(NOT "${target_suffix} " STREQUAL " ") + get_rcl_information("${rmw_implementation}" "rcl${target_suffix}") + endif() add_library(${PROJECT_NAME}${target_suffix} SHARED ${${PROJECT_NAME}_SRCS}) ament_target_dependencies(${PROJECT_NAME}${target_suffix} - "rcl_interfaces" - "rmw" - "rosidl_generator_cpp" - "${rmw_implementation}") + "rcl${target_suffix}" + "rosidl_generator_cpp") # Causes the visibility macros to use dllexport rather than dllimport, # which is appropriate when building the dll but not consuming it. @@ -66,9 +68,7 @@ endmacro() call_for_each_rmw_implementation(target GENERATE_DEFAULT) ament_export_dependencies(ament_cmake) -ament_export_dependencies(rcl_interfaces) -ament_export_dependencies(rmw) -ament_export_dependencies(rmw_implementation) +ament_export_dependencies(rcl) ament_export_dependencies(rosidl_generator_cpp) ament_export_include_directories(include) @@ -90,6 +90,7 @@ if(AMENT_ENABLE_TESTING) ament_add_gtest(test_mapped_ring_buffer test/test_mapped_ring_buffer.cpp) if(TARGET test_mapped_ring_buffer) target_include_directories(test_mapped_ring_buffer PUBLIC + ${rcl_INCLUDE_DIRS} ${rcl_interfaces_INCLUDE_DIRS} ${rmw_INCLUDE_DIRS} ${rosidl_generator_cpp_INCLUDE_DIRS} @@ -98,6 +99,7 @@ if(AMENT_ENABLE_TESTING) ament_add_gtest(test_intra_process_manager test/test_intra_process_manager.cpp) if(TARGET test_intra_process_manager) target_include_directories(test_intra_process_manager PUBLIC + ${rcl_INCLUDE_DIRS} ${rcl_interfaces_INCLUDE_DIRS} ${rmw_INCLUDE_DIRS} ${rosidl_generator_cpp_INCLUDE_DIRS} diff --git a/rclcpp/cmake/get_rclcpp_information.cmake b/rclcpp/cmake/get_rclcpp_information.cmake index 309c6b7..fff5371 100644 --- a/rclcpp/cmake/get_rclcpp_information.cmake +++ b/rclcpp/cmake/get_rclcpp_information.cmake @@ -28,6 +28,11 @@ macro(get_rclcpp_information rmw_implementation var_prefix) # so that the variables can be used by various functions / macros set(${var_prefix}_FOUND TRUE) + # Get rcl using the existing macro + if(NOT "${target_suffix} " STREQUAL " ") + get_rcl_information("${rmw_implementation}" "rcl${target_suffix}") + endif() + # include directories normalize_path(${var_prefix}_INCLUDE_DIRS "${rclcpp_DIR}/../../../include") @@ -63,8 +68,7 @@ macro(get_rclcpp_information rmw_implementation var_prefix) # dependencies set(_exported_dependencies "rcl_interfaces" - "rmw" - "${rmw_implementation}" + "rcl${target_suffix}" "rosidl_generator_cpp") set(${var_prefix}_DEFINITIONS) foreach(_dep ${_exported_dependencies}) diff --git a/rclcpp/include/rclcpp/allocator/allocator_common.hpp b/rclcpp/include/rclcpp/allocator/allocator_common.hpp index d32b200..01333e7 100644 --- a/rclcpp/include/rclcpp/allocator/allocator_common.hpp +++ b/rclcpp/include/rclcpp/allocator/allocator_common.hpp @@ -17,6 +17,8 @@ #include +#include "rcl/allocator.h" + #include "rclcpp/allocator/allocator_deleter.hpp" namespace rclcpp @@ -27,6 +29,64 @@ namespace allocator template using AllocRebind = typename std::allocator_traits::template rebind_traits; +template +void * retyped_allocate(size_t size, void * untyped_allocator) +{ + auto typed_allocator = static_cast(untyped_allocator); + if (!typed_allocator) { + throw std::runtime_error("Received incorrect allocator type"); + } + return std::allocator_traits::allocate(*typed_allocator, size); +} + +template +void retyped_deallocate(void * untyped_pointer, void * untyped_allocator) +{ + auto typed_allocator = static_cast(untyped_allocator); + if (!typed_allocator) { + throw std::runtime_error("Received incorrect allocator type"); + } + auto typed_ptr = static_cast(untyped_pointer); + std::allocator_traits::deallocate(*typed_allocator, typed_ptr, 1); +} + +template +void * retyped_reallocate(void * untyped_pointer, size_t size, void * untyped_allocator) +{ + auto typed_allocator = static_cast(untyped_allocator); + if (!typed_allocator) { + throw std::runtime_error("Received incorrect allocator type"); + } + auto typed_ptr = static_cast(untyped_pointer); + std::allocator_traits::deallocate(*typed_allocator, typed_ptr, 1); + return std::allocator_traits::allocate(*typed_allocator, size); +} + + +// Convert a std::allocator_traits-formatted Allocator into an rcl allocator +template>::value>::type * = nullptr> +rcl_allocator_t get_rcl_allocator(Alloc & allocator) +{ + rcl_allocator_t rcl_allocator = rcl_get_default_allocator(); +#ifndef _WIN32 + rcl_allocator.allocate = &retyped_allocate; + rcl_allocator.deallocate = &retyped_deallocate; + rcl_allocator.reallocate = &retyped_reallocate; + rcl_allocator.state = &allocator; +#endif + return rcl_allocator; +} + +// TODO(jacquelinekay) Workaround for an incomplete implementation of std::allocator +template>::value>::type * = nullptr> +rcl_allocator_t get_rcl_allocator(Alloc & allocator) +{ + (void)allocator; + return rcl_get_default_allocator(); +} + } // namespace allocator } // namespace rclcpp diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index ad176df..02372a9 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -23,10 +23,15 @@ #include #include +#include "rcl/client.h" +#include "rcl/error_handling.h" + #include "rclcpp/function_traits.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" + #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -42,19 +47,18 @@ public: RCLCPP_PUBLIC ClientBase( - std::shared_ptr node_handle, - rmw_client_t * client_handle, + std::shared_ptr node_handle, const std::string & service_name); RCLCPP_PUBLIC - virtual ~ClientBase(); + ~ClientBase(); RCLCPP_PUBLIC const std::string & get_service_name() const; RCLCPP_PUBLIC - const rmw_client_t * + const rcl_client_t * get_client_handle() const; virtual std::shared_ptr create_response() = 0; @@ -62,12 +66,12 @@ public: virtual void handle_response( std::shared_ptr request_header, std::shared_ptr response) = 0; -private: +protected: RCLCPP_DISABLE_COPY(ClientBase); - std::shared_ptr node_handle_; + std::shared_ptr node_handle_; - rmw_client_t * client_handle_; + rcl_client_t client_handle_ = rcl_get_zero_initialized_client(); std::string service_name_; }; @@ -93,11 +97,32 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(Client); Client( - std::shared_ptr node_handle, - rmw_client_t * client_handle, - const std::string & service_name) - : ClientBase(node_handle, client_handle, service_name) - {} + std::shared_ptr node_handle, + const std::string & service_name, + rcl_client_options_t & client_options) + : ClientBase(node_handle, service_name) + { + using rosidl_generator_cpp::get_service_type_support_handle; + auto service_type_support_handle = + get_service_type_support_handle(); + if (rcl_client_init(&client_handle_, this->node_handle_.get(), + service_type_support_handle, service_name.c_str(), &client_options) != RCL_RET_OK) + { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create client: ") + + rcl_get_error_string_safe()); + // *INDENT-ON* + } + } + + virtual ~Client() + { + if (rcl_client_fini(&client_handle_, node_handle_.get()) != RCL_RET_OK) { + fprintf(stderr, + "Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe()); + } + } std::shared_ptr create_response() { @@ -149,10 +174,10 @@ public: { std::lock_guard lock(pending_requests_mutex_); int64_t sequence_number; - if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) { + if (RCL_RET_OK != rcl_send_request(get_client_handle(), request.get(), &sequence_number)) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("failed to send request: ") + rmw_get_error_string_safe()); + std::string("failed to send request: ") + rcl_get_error_string_safe()); // *INDENT-ON* } diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index de38dde..14cb8e3 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -25,6 +25,9 @@ #include #include +#include "rcl/guard_condition.h" +#include "rcl/wait.h" + #include "rclcpp/any_executable.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" @@ -193,10 +196,12 @@ public: } auto end_time = std::chrono::steady_clock::now(); - if (timeout > std::chrono::nanoseconds::zero()) { - end_time += timeout; + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; } - auto timeout_left = timeout; + std::chrono::nanoseconds timeout_left = timeout_ns; while (rclcpp::utilities::ok()) { // Do one item of work. @@ -207,7 +212,7 @@ public: return FutureReturnCode::SUCCESS; } // If the original timeout is < 0, then this is blocking, never TIMEOUT. - if (timeout < std::chrono::nanoseconds::zero()) { + if (timeout_ns < std::chrono::nanoseconds::zero()) { continue; } // Otherwise check if we still have time to wait, return TIMEOUT if not. @@ -216,8 +221,7 @@ public: return FutureReturnCode::TIMEOUT; } // Subtract the elapsed time from the original timeout. - using duration_type = std::chrono::duration; - timeout_left = std::chrono::duration_cast(end_time - now); + timeout_left = std::chrono::duration_cast(end_time - now); } // The future did not complete before ok() returned false, return INTERRUPTED. @@ -291,10 +295,6 @@ protected: void get_next_timer(AnyExecutable::SharedPtr any_exec); - RCLCPP_PUBLIC - std::chrono::nanoseconds - get_earliest_timer(); - RCLCPP_PUBLIC AnyExecutable::SharedPtr get_next_ready_executable(); @@ -307,10 +307,10 @@ protected: std::atomic_bool spinning; /// Guard condition for signaling the rmw layer to wake up for special events. - rmw_guard_condition_t * interrupt_guard_condition_; + rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition(); /// Waitset for managing entities that the rmw layer waits on. - rmw_waitset_t * waitset_; + rcl_wait_set_t waitset_ = rcl_get_zero_initialized_wait_set(); /// The memory strategy: an interface for handling user-defined memory allocation strategies. memory_strategy::MemoryStrategy::SharedPtr memory_strategy_; diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 9386a80..30f91cf 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -18,6 +18,9 @@ #include #include +#include "rcl/allocator.h" +#include "rcl/wait.h" + #include "rclcpp/any_executable.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node.hpp" @@ -40,31 +43,25 @@ public: RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(MemoryStrategy); using WeakNodeVector = std::vector>; - // return the new number of subscribers - virtual size_t fill_subscriber_handles(void ** & ptr) = 0; - - // return the new number of services - virtual size_t fill_service_handles(void ** & ptr) = 0; - - // return the new number of clients - virtual size_t fill_client_handles(void ** & ptr) = 0; - - // return the new number of guard_conditions - virtual size_t fill_guard_condition_handles(void ** & ptr) = 0; - - virtual void clear_active_entities() = 0; - - virtual void clear_handles() = 0; - virtual void remove_null_handles() = 0; virtual bool collect_entities(const WeakNodeVector & weak_nodes) = 0; + virtual size_t number_of_ready_subscriptions() const = 0; + virtual size_t number_of_ready_services() const = 0; + virtual size_t number_of_ready_clients() const = 0; + virtual size_t number_of_ready_timers() const = 0; + virtual size_t number_of_guard_conditions() const = 0; + + virtual bool add_handles_to_waitset(rcl_wait_set_t * wait_set) = 0; + virtual void clear_handles() = 0; + virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0; + /// Provide a newly initialized AnyExecutable object. // \return Shared pointer to the fresh executable. virtual rclcpp::executor::AnyExecutable::SharedPtr instantiate_next_executable() = 0; - virtual void add_guard_condition(const rmw_guard_condition_t * guard_condition) = 0; + virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0; - virtual void remove_guard_condition(const rmw_guard_condition_t * guard_condition) = 0; + virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0; virtual void get_next_subscription(rclcpp::executor::AnyExecutable::SharedPtr any_exec, @@ -78,15 +75,18 @@ public: get_next_client(rclcpp::executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) = 0; + virtual rcl_allocator_t + get_allocator() = 0; + static rclcpp::subscription::SubscriptionBase::SharedPtr - get_subscription_by_handle(void * subscriber_handle, + get_subscription_by_handle(const rcl_subscription_t * subscriber_handle, const WeakNodeVector & weak_nodes); static rclcpp::service::ServiceBase::SharedPtr - get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes); + get_service_by_handle(const rcl_service_t * service_handle, const WeakNodeVector & weak_nodes); static rclcpp::client::ClientBase::SharedPtr - get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes); + get_client_by_handle(const rcl_client_t * client_handle, const WeakNodeVector & weak_nodes); static rclcpp::node::Node::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 3f9eaf0..987c333 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -22,6 +22,9 @@ #include #include +#include "rcl/error_handling.h" +#include "rcl/node.h" + #include "rcl_interfaces/msg/list_parameters_result.hpp" #include "rcl_interfaces/msg/parameter_descriptor.hpp" #include "rcl_interfaces/msg/parameter_event.hpp" @@ -39,11 +42,10 @@ #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" -// Forward declaration of ROS middleware class -namespace rmw +namespace rcl { -struct rmw_node_t; -} // namespace rmw +struct rcl_node_t; +} // namespace rcl namespace rclcpp { @@ -256,7 +258,7 @@ public: get_callback_groups() const; RCLCPP_PUBLIC - rmw_guard_condition_t * get_notify_guard_condition() const; + const rcl_guard_condition_t * get_notify_guard_condition() const; std::atomic_bool has_executor; @@ -269,7 +271,7 @@ private: std::string name_; - std::shared_ptr node_handle_; + std::shared_ptr node_handle_; rclcpp::context::Context::SharedPtr context_; @@ -286,7 +288,7 @@ private: mutable std::mutex mutex_; /// Guard condition for notifying the Executor of changes to this node. - rmw_guard_condition_t * notify_guard_condition_; + rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition(); std::map parameters_; diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 7148571..f13d241 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -30,6 +30,9 @@ #include #include +#include "rcl/publisher.h" +#include "rcl/subscription.h" + #include "rcl_interfaces/msg/intra_process_message.hpp" #include "rclcpp/contexts/default_context.hpp" @@ -70,33 +73,20 @@ Node::create_publisher( if (!allocator) { allocator = std::make_shared(); } - using rosidl_generator_cpp::get_message_type_support_handle; - auto type_support_handle = get_message_type_support_handle(); - rmw_publisher_t * publisher_handle = rmw_create_publisher( - node_handle_.get(), type_support_handle, topic_name.c_str(), &qos_profile); - if (!publisher_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create publisher: ") + - rmw_get_error_string_safe()); - // *INDENT-ON* - } + + auto publisher_options = rcl_publisher_get_default_options(); + publisher_options.qos = qos_profile; + + auto message_alloc = + std::make_shared::MessageAlloc>( + *allocator.get()); + publisher_options.allocator = allocator::get_rcl_allocator( + *message_alloc.get()); auto publisher = publisher::Publisher::make_shared( - node_handle_, publisher_handle, topic_name, qos_profile.depth, allocator); + node_handle_, topic_name, publisher_options, message_alloc); if (use_intra_process_comms_) { - rmw_publisher_t * intra_process_publisher_handle = rmw_create_publisher( - node_handle_.get(), rclcpp::type_support::get_intra_process_message_msg_type_support(), - (topic_name + "__intra").c_str(), &qos_profile); - if (!intra_process_publisher_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create intra process publisher: ") + - rmw_get_error_string_safe()); - // *INDENT-ON* - } - auto intra_process_manager = context_->get_sub_context(); uint64_t intra_process_publisher_id = @@ -132,9 +122,9 @@ Node::create_publisher( publisher->setup_intra_process( intra_process_publisher_id, shared_publish_callback, - intra_process_publisher_handle); + publisher_options); } - if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + if (rcl_trigger_guard_condition(¬ify_guard_condition_) != RCL_RET_OK) { throw std::runtime_error( std::string( "Failed to notify waitset on publisher creation: ") + rmw_get_error_string()); @@ -162,46 +152,37 @@ Node::create_subscription( Alloc> any_subscription_callback(allocator); any_subscription_callback.set(std::forward(callback)); - using rosidl_generator_cpp::get_message_type_support_handle; - if (!msg_mem_strat) { msg_mem_strat = rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); } + auto message_alloc = + std::make_shared::MessageAlloc>(); - auto type_support_handle = get_message_type_support_handle(); - rmw_subscription_t * subscriber_handle = rmw_create_subscription( - node_handle_.get(), type_support_handle, - topic_name.c_str(), &qos_profile, ignore_local_publications); - if (!subscriber_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create subscription: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } + auto subscription_options = rcl_subscription_get_default_options(); + subscription_options.qos = qos_profile; + subscription_options.allocator = rclcpp::allocator::get_rcl_allocator( + *message_alloc.get()); + subscription_options.ignore_local_publications = ignore_local_publications; using rclcpp::subscription::Subscription; using rclcpp::subscription::SubscriptionBase; auto sub = Subscription::make_shared( node_handle_, - subscriber_handle, topic_name, - ignore_local_publications, + subscription_options, any_subscription_callback, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); // Setup intra process. if (use_intra_process_comms_) { - rmw_subscription_t * intra_process_subscriber_handle = rmw_create_subscription( - node_handle_.get(), rclcpp::type_support::get_intra_process_message_msg_type_support(), - (topic_name + "__intra").c_str(), &qos_profile, false); - if (!subscriber_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create intra process subscription: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } + auto intra_process_options = rcl_subscription_get_default_options(); + intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( + *message_alloc.get()); + intra_process_options.qos = qos_profile; + intra_process_options.ignore_local_publications = false; + auto intra_process_manager = context_->get_sub_context(); rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = intra_process_manager; @@ -210,7 +191,6 @@ Node::create_subscription( // *INDENT-OFF* sub->setup_intra_process( intra_process_subscription_id, - intra_process_subscriber_handle, [weak_ipm]( uint64_t publisher_id, uint64_t message_sequence, @@ -233,7 +213,8 @@ Node::create_subscription( "intra process publisher check called after destruction of intra process manager"); } return ipm->matches_any_publishers(sender_gid); - } + }, + intra_process_options ); // *INDENT-ON* } @@ -248,7 +229,7 @@ Node::create_subscription( default_callback_group_->add_subscription(sub_base_ptr); } number_of_subscriptions_++; - if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + if (rcl_trigger_guard_condition(¬ify_guard_condition_) != RCL_RET_OK) { throw std::runtime_error( std::string( "Failed to notify waitset on subscription creation: ") + rmw_get_error_string()); @@ -299,7 +280,7 @@ Node::create_wall_timer( default_callback_group_->add_timer(timer); } number_of_timers_++; - if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + if (rcl_trigger_guard_condition(¬ify_guard_condition_) != RCL_RET_OK) { throw std::runtime_error( std::string( "Failed to notify waitset on timer creation: ") + rmw_get_error_string()); @@ -314,27 +295,16 @@ Node::create_client( const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group) { - using rosidl_generator_cpp::get_service_type_support_handle; - auto service_type_support_handle = - get_service_type_support_handle(); - - rmw_client_t * client_handle = rmw_create_client( - this->node_handle_.get(), service_type_support_handle, service_name.c_str(), &qos_profile); - if (!client_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create client: ") + - rmw_get_error_string_safe()); - // *INDENT-ON* - } + rcl_client_options_t options = rcl_client_get_default_options(); + options.qos = qos_profile; using rclcpp::client::Client; using rclcpp::client::ClientBase; auto cli = Client::make_shared( node_handle_, - client_handle, - service_name); + service_name, + options); auto cli_base_ptr = std::dynamic_pointer_cast(cli); if (group) { @@ -348,7 +318,7 @@ Node::create_client( } number_of_clients_++; - if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + if (rcl_trigger_guard_condition(¬ify_guard_condition_) != RCL_RET_OK) { throw std::runtime_error( std::string( "Failed to notify waitset on client creation: ") + rmw_get_error_string()); @@ -364,25 +334,15 @@ Node::create_service( const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group) { - using rosidl_generator_cpp::get_service_type_support_handle; - auto service_type_support_handle = - get_service_type_support_handle(); - rclcpp::service::AnyServiceCallback any_service_callback; any_service_callback.set(std::forward(callback)); - rmw_service_t * service_handle = rmw_create_service( - node_handle_.get(), service_type_support_handle, service_name.c_str(), &qos_profile); - if (!service_handle) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("could not create service: ") + - rmw_get_error_string_safe()); - // *INDENT-ON* - } + rcl_service_options_t service_options = rcl_service_get_default_options(); + service_options.qos = qos_profile; auto serv = service::Service::make_shared( - node_handle_, service_handle, service_name, any_service_callback); + node_handle_, + service_name, any_service_callback, service_options); auto serv_base_ptr = std::dynamic_pointer_cast(serv); if (group) { if (!group_in_node(group)) { @@ -394,7 +354,7 @@ Node::create_service( default_callback_group_->add_service(serv_base_ptr); } number_of_services_++; - if (rmw_trigger_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + if (rcl_trigger_guard_condition(¬ify_guard_condition_) != RCL_RET_OK) { throw std::runtime_error( std::string( "Failed to notify waitset on service creation: ") + rmw_get_error_string()); diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 7ec46e8..bc50635 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -24,12 +24,16 @@ #include #include +#include "rcl/error_handling.h" +#include "rcl/publisher.h" + #include "rcl_interfaces/msg/intra_process_message.hpp" #include "rmw/impl/cpp/demangle.hpp" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -52,21 +56,18 @@ public: /** * Typically, a publisher is not created through this method, but instead is created through a * call to `Node::create_publisher`. - * \param[in] node_handle The corresponding rmw representation of the owner node. - * \param[in] publisher_handle The rmw publisher handle corresponding to this publisher. + * \param[in] node_handle The corresponding rcl representation of the owner node. * \param[in] topic The topic that this publisher publishes on. * \param[in] queue_size The maximum number of unpublished messages to queue. */ RCLCPP_PUBLIC PublisherBase( - std::shared_ptr node_handle, - rmw_publisher_t * publisher_handle, + std::shared_ptr node_handle, std::string topic, size_t queue_size); - /// Default destructor. RCLCPP_PUBLIC - virtual ~PublisherBase(); + ~PublisherBase(); /// Get the topic that this publisher publishes on. // \return The topic name. @@ -120,12 +121,13 @@ protected: setup_intra_process( uint64_t intra_process_publisher_id, StoreMessageCallbackT callback, - rmw_publisher_t * intra_process_publisher_handle); + const rcl_publisher_options_t & intra_process_options); - std::shared_ptr node_handle_; + std::shared_ptr node_handle_; - rmw_publisher_t * publisher_handle_; - rmw_publisher_t * intra_process_publisher_handle_; + rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); + rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); + rcl_allocator_t rcl_allocator_ = rcl_get_default_allocator(); std::string topic_; size_t queue_size_; @@ -152,15 +154,54 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(Publisher); Publisher( - std::shared_ptr node_handle, - rmw_publisher_t * publisher_handle, + std::shared_ptr node_handle, std::string topic, - size_t queue_size, - std::shared_ptr allocator) - : PublisherBase(node_handle, publisher_handle, topic, queue_size) + const rcl_publisher_options_t & publisher_options, + std::shared_ptr allocator) + : PublisherBase(node_handle, topic, publisher_options.qos.depth), message_allocator_(allocator) { - message_allocator_ = std::make_shared(*allocator.get()); + using rosidl_generator_cpp::get_message_type_support_handle; allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + + rcl_allocator_ = publisher_options.allocator; + auto type_support_handle = get_message_type_support_handle(); + if (rcl_publisher_init( + &publisher_handle_, node_handle_.get(), type_support_handle, + topic.c_str(), &publisher_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("could not create publisher: ") + + rcl_get_error_string_safe()); + } + // Life time of this object is tied to the publisher handle. + rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle(&publisher_handle_); + if (!publisher_rmw_handle) { + throw std::runtime_error( + std::string("failed to get rmw handle: ") + rcl_get_error_string_safe()); + } + if (rmw_get_gid_for_publisher(publisher_rmw_handle, &rmw_gid_) != RMW_RET_OK) { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); + // *INDENT-ON* + } + } + + virtual ~Publisher() + { + if (rcl_publisher_fini(&intra_process_publisher_handle_, node_handle_.get()) != RCL_RET_OK) { + fprintf( + stderr, + "Error in destruction of intra process rcl publisher handle: %s\n", + rcl_get_error_string_safe()); + } + + if (rcl_publisher_fini(&publisher_handle_, node_handle_.get()) != RCL_RET_OK) { + fprintf( + stderr, + "Error in destruction of rcl publisher handle: %s\n", + rcl_get_error_string_safe()); + } } @@ -188,11 +229,11 @@ public: rcl_interfaces::msg::IntraProcessMessage ipm; ipm.publisher_id = intra_process_publisher_id_; ipm.message_sequence = message_seq; - auto status = rmw_publish(intra_process_publisher_handle_, &ipm); - if (status != RMW_RET_OK) { + auto status = rcl_publish(&intra_process_publisher_handle_, &ipm); + if (status != RCL_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("failed to publish intra process message: ") + rmw_get_error_string_safe()); + std::string("failed to publish intra process message: ") + rcl_get_error_string_safe()); // *INDENT-ON* } } else { @@ -263,11 +304,11 @@ protected: void do_inter_process_publish(const MessageT * msg) { - auto status = rmw_publish(publisher_handle_, msg); - if (status != RMW_RET_OK) { + auto status = rcl_publish(&publisher_handle_, msg); + if (status != RCL_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("failed to publish message: ") + rmw_get_error_string_safe()); + std::string("failed to publish message: ") + rcl_get_error_string_safe()); // *INDENT-ON* } } diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index b267cbc..19ae8f3 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -21,8 +21,12 @@ #include #include +#include "rcl/error_handling.h" +#include "rcl/service.h" + #include "rclcpp/any_service_callback.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -39,19 +43,18 @@ public: RCLCPP_PUBLIC ServiceBase( - std::shared_ptr node_handle, - rmw_service_t * service_handle, + std::shared_ptr node_handle, const std::string service_name); RCLCPP_PUBLIC - virtual ~ServiceBase(); + ~ServiceBase(); RCLCPP_PUBLIC std::string get_service_name(); RCLCPP_PUBLIC - const rmw_service_t * + const rcl_service_t * get_service_handle(); virtual std::shared_ptr create_request() = 0; @@ -60,12 +63,12 @@ public: std::shared_ptr request_header, std::shared_ptr request) = 0; -private: +protected: RCLCPP_DISABLE_COPY(ServiceBase); - std::shared_ptr node_handle_; + std::shared_ptr node_handle_; - rmw_service_t * service_handle_; + rcl_service_t service_handle_ = rcl_get_zero_initialized_service(); std::string service_name_; }; @@ -88,15 +91,37 @@ public: RCLCPP_SMART_PTR_DEFINITIONS(Service); Service( - std::shared_ptr node_handle, - rmw_service_t * service_handle, + std::shared_ptr node_handle, const std::string & service_name, - AnyServiceCallback any_callback) - : ServiceBase(node_handle, service_handle, service_name), any_callback_(any_callback) - {} + AnyServiceCallback any_callback, + rcl_service_options_t & service_options) + : ServiceBase(node_handle, service_name), any_callback_(any_callback) + { + using rosidl_generator_cpp::get_service_type_support_handle; + auto service_type_support_handle = get_service_type_support_handle(); + + if (rcl_service_init( + &service_handle_, node_handle.get(), service_type_support_handle, service_name.c_str(), + &service_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("could not create service: ") + + rcl_get_error_string_safe()); + } + } Service() = delete; + virtual ~Service() + { + if (rcl_service_fini(&service_handle_, node_handle_.get()) != RCL_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rcl service_handle_ handle: " << + rcl_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); + } + } + std::shared_ptr create_request() { return std::shared_ptr(new typename ServiceT::Request()); @@ -122,11 +147,12 @@ public: std::shared_ptr req_id, std::shared_ptr response) { - rmw_ret_t status = rmw_send_response(get_service_handle(), req_id.get(), response.get()); - if (status != RMW_RET_OK) { + rcl_ret_t status = rcl_send_response(get_service_handle(), req_id.get(), response.get()); + + if (status != RCL_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("failed to send response: ") + rmw_get_error_string_safe()); + std::string("failed to send response: ") + rcl_get_error_string_safe()); // *INDENT-ON* } } diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index c0a0a4b..10510b1 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -18,6 +18,8 @@ #include #include +#include "rcl/allocator.h" + #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/memory_strategy.hpp" #include "rclcpp/node.hpp" @@ -64,7 +66,7 @@ public: allocator_ = std::make_shared(); } - void add_guard_condition(const rmw_guard_condition_t * guard_condition) + void add_guard_condition(const rcl_guard_condition_t * guard_condition) { for (const auto & existing_guard_condition : guard_conditions_) { if (existing_guard_condition == guard_condition) { @@ -74,7 +76,7 @@ public: guard_conditions_.push_back(guard_condition); } - void remove_guard_condition(const rmw_guard_condition_t * guard_condition) + void remove_guard_condition(const rcl_guard_condition_t * guard_condition) { for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) { if (*it == guard_condition) { @@ -84,69 +86,40 @@ public: } } - size_t fill_subscriber_handles(void ** & ptr) - { - for (auto & subscription : subscriptions_) { - subscriber_handles_.push_back(subscription->get_subscription_handle()->data); - if (subscription->get_intra_process_subscription_handle()) { - subscriber_handles_.push_back(subscription->get_intra_process_subscription_handle()->data); - } - } - ptr = subscriber_handles_.data(); - return subscriber_handles_.size(); - } - - // return the new number of services - size_t fill_service_handles(void ** & ptr) - { - for (auto & service : services_) { - service_handles_.push_back(service->get_service_handle()->data); - } - ptr = service_handles_.data(); - return service_handles_.size(); - } - - // return the new number of clients - size_t fill_client_handles(void ** & ptr) - { - for (auto & client : clients_) { - client_handles_.push_back(client->get_client_handle()->data); - } - ptr = client_handles_.data(); - return client_handles_.size(); - } - - size_t fill_guard_condition_handles(void ** & ptr) - { - for (const auto & guard_condition : guard_conditions_) { - if (guard_condition) { - guard_condition_handles_.push_back(guard_condition->data); - } - } - ptr = guard_condition_handles_.data(); - return guard_condition_handles_.size(); - } - - void clear_active_entities() - { - subscriptions_.clear(); - services_.clear(); - clients_.clear(); - } - void clear_handles() { - subscriber_handles_.clear(); + subscription_handles_.clear(); service_handles_.clear(); client_handles_.clear(); - guard_condition_handles_.clear(); + timer_handles_.clear(); } - void remove_null_handles() + virtual void remove_null_handles(rcl_wait_set_t * wait_set) { - subscriber_handles_.erase( - std::remove(subscriber_handles_.begin(), subscriber_handles_.end(), nullptr), - subscriber_handles_.end() + for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) { + if (!wait_set->subscriptions[i]) { + subscription_handles_[i] = nullptr; + } + } + for (size_t i = 0; i < wait_set->size_of_services; ++i) { + if (!wait_set->services[i]) { + service_handles_[i] = nullptr; + } + } + for (size_t i = 0; i < wait_set->size_of_clients; ++i) { + if (!wait_set->clients[i]) { + client_handles_[i] = nullptr; + } + } + for (size_t i = 0; i < wait_set->size_of_timers; ++i) { + if (!wait_set->timers[i]) { + timer_handles_[i] = nullptr; + } + } + + subscription_handles_.erase( + std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr), + subscription_handles_.end() ); service_handles_.erase( @@ -159,9 +132,9 @@ public: client_handles_.end() ); - guard_condition_handles_.erase( - std::remove(guard_condition_handles_.begin(), guard_condition_handles_.end(), nullptr), - guard_condition_handles_.end() + timer_handles_.erase( + std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr), + timer_handles_.end() ); } @@ -182,18 +155,28 @@ public: for (auto & weak_subscription : group->get_subscription_ptrs()) { auto subscription = weak_subscription.lock(); if (subscription) { - subscriptions_.push_back(subscription); + subscription_handles_.push_back(subscription->get_subscription_handle()); + if (subscription->get_intra_process_subscription_handle()) { + subscription_handles_.push_back( + subscription->get_intra_process_subscription_handle()); + } } } for (auto & service : group->get_service_ptrs()) { if (service) { - services_.push_back(service); + service_handles_.push_back(service->get_service_handle()); } } for (auto & weak_client : group->get_client_ptrs()) { auto client = weak_client.lock(); if (client) { - clients_.push_back(client); + client_handles_.push_back(client->get_client_handle()); + } + } + for (auto & weak_timer : group->get_timer_ptrs()) { + auto timer = weak_timer.lock(); + if (timer) { + timer_handles_.push_back(timer->get_timer_handle()); } } } @@ -201,6 +184,45 @@ public: return has_invalid_weak_nodes; } + bool add_handles_to_waitset(rcl_wait_set_t * wait_set) + { + for (auto subscription : subscription_handles_) { + if (rcl_wait_set_add_subscription(wait_set, subscription) != RCL_RET_OK) { + fprintf(stderr, "Couldn't add subscription to waitset: %s\n", rcl_get_error_string_safe()); + return false; + } + } + + for (auto client : client_handles_) { + if (rcl_wait_set_add_client(wait_set, client) != RCL_RET_OK) { + fprintf(stderr, "Couldn't add client to waitset: %s\n", rcl_get_error_string_safe()); + return false; + } + } + + for (auto service : service_handles_) { + if (rcl_wait_set_add_service(wait_set, service) != RCL_RET_OK) { + fprintf(stderr, "Couldn't add service to waitset: %s\n", rcl_get_error_string_safe()); + return false; + } + } + + for (auto timer : timer_handles_) { + if (rcl_wait_set_add_timer(wait_set, timer) != RCL_RET_OK) { + fprintf(stderr, "Couldn't add timer to waitset: %s\n", rcl_get_error_string_safe()); + return false; + } + } + + for (auto guard_condition : guard_conditions_) { + if (rcl_wait_set_add_guard_condition(wait_set, guard_condition) != RCL_RET_OK) { + fprintf(stderr, "Couldn't add guard_condition to waitset: %s\n", + rcl_get_error_string_safe()); + return false; + } + } + return true; + } /// Provide a newly initialized AnyExecutable object. // \return Shared pointer to the fresh executable. @@ -213,21 +235,21 @@ public: get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes) { - auto it = subscriber_handles_.begin(); - while (it != subscriber_handles_.end()) { + auto it = subscription_handles_.begin(); + while (it != subscription_handles_.end()) { auto subscription = get_subscription_by_handle(*it, weak_nodes); if (subscription) { // Figure out if this is for intra-process or not. bool is_intra_process = false; if (subscription->get_intra_process_subscription_handle()) { - is_intra_process = subscription->get_intra_process_subscription_handle()->data == *it; + is_intra_process = subscription->get_intra_process_subscription_handle() == *it; } // Find the group for this handle and see if it can be serviced auto group = get_group_by_subscription(subscription, weak_nodes); if (!group) { // Group was not found, meaning the subscription is not valid... // Remove it from the ready list and continue looking - subscriber_handles_.erase(it); + subscription_handles_.erase(it); continue; } if (!group->can_be_taken_from().load()) { @@ -244,11 +266,11 @@ public: } any_exec->callback_group = group; any_exec->node = get_node_by_group(group, weak_nodes); - subscriber_handles_.erase(it); + subscription_handles_.erase(it); return; } // Else, the subscription is no longer valid, remove it and continue - subscriber_handles_.erase(it); + subscription_handles_.erase(it); } } @@ -319,20 +341,47 @@ public: } } + virtual rcl_allocator_t get_allocator() + { + return rclcpp::allocator::get_rcl_allocator(*allocator_.get()); + } + + size_t number_of_ready_subscriptions() const + { + return subscription_handles_.size(); + } + + size_t number_of_ready_services() const + { + return service_handles_.size(); + } + + size_t number_of_ready_clients() const + { + return client_handles_.size(); + } + + size_t number_of_guard_conditions() const + { + return guard_conditions_.size(); + } + + size_t number_of_ready_timers() const + { + return timer_handles_.size(); + } + private: template using VectorRebind = std::vector::template rebind_alloc>; - VectorRebind subscriptions_; - VectorRebind services_; - VectorRebind clients_; - VectorRebind guard_conditions_; + VectorRebind guard_conditions_; - VectorRebind subscriber_handles_; - VectorRebind service_handles_; - VectorRebind client_handles_; - VectorRebind guard_condition_handles_; + VectorRebind subscription_handles_; + VectorRebind service_handles_; + VectorRebind client_handles_; + VectorRebind timer_handles_; std::shared_ptr executable_allocator_; std::shared_ptr allocator_; diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 0044b20..7725916 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -24,11 +24,15 @@ #include #include +#include "rcl/error_handling.h" +#include "rcl/subscription.h" + #include "rcl_interfaces/msg/intra_process_message.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -51,14 +55,13 @@ public: /// Default constructor. /** - * \param[in] node_handle The rmw representation of the node that owns this subscription. + * \param[in] node_handle The rcl representation of the node that owns this subscription. * \param[in] topic_name Name of the topic to subscribe to. * \param[in] ignore_local_publications True to ignore local publications (unused). */ RCLCPP_PUBLIC SubscriptionBase( - std::shared_ptr node_handle, - rmw_subscription_t * subscription_handle, + std::shared_ptr node_handle, const std::string & topic_name, bool ignore_local_publications); @@ -72,11 +75,11 @@ public: get_topic_name() const; RCLCPP_PUBLIC - const rmw_subscription_t * + const rcl_subscription_t * get_subscription_handle() const; RCLCPP_PUBLIC - const rmw_subscription_t * + virtual const rcl_subscription_t * get_intra_process_subscription_handle() const; /// Borrow a new message. @@ -101,15 +104,12 @@ public: const rmw_message_info_t & message_info) = 0; protected: - rmw_subscription_t * intra_process_subscription_handle_; + rcl_subscription_t intra_process_subscription_handle_ = rcl_get_zero_initialized_subscription(); + rcl_subscription_t subscription_handle_ = rcl_get_zero_initialized_subscription(); + std::shared_ptr node_handle_; private: RCLCPP_DISABLE_COPY(SubscriptionBase); - - std::shared_ptr node_handle_; - - rmw_subscription_t * subscription_handle_; - std::string topic_name_; bool ignore_local_publications_; }; @@ -134,27 +134,37 @@ public: /** * The constructor for a subscription is almost never called directly. Instead, subscriptions * should be instantiated through Node::create_subscription. - * \param[in] node_handle rmw representation of the node that owns this subscription. + * \param[in] node_handle rcl representation of the node that owns this subscription. * \param[in] topic_name Name of the topic to subscribe to. * \param[in] ignore_local_publications True to ignore local publications (unused). * \param[in] callback User-defined callback to call when a message is received. * \param[in] memory_strategy The memory strategy to be used for managing message memory. */ Subscription( - std::shared_ptr node_handle, - rmw_subscription_t * subscription_handle, + std::shared_ptr node_handle, const std::string & topic_name, - bool ignore_local_publications, + const rcl_subscription_options_t & subscription_options, AnySubscriptionCallback callback, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) - : SubscriptionBase(node_handle, subscription_handle, topic_name, ignore_local_publications), + : SubscriptionBase( + node_handle, topic_name, subscription_options.ignore_local_publications), any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), matches_any_intra_process_publishers_(nullptr) { + using rosidl_generator_cpp::get_message_type_support_handle; + + auto type_support_handle = get_message_type_support_handle(); + if (rcl_subscription_init( + &subscription_handle_, node_handle_.get(), type_support_handle, topic_name.c_str(), + &subscription_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("could not create subscription: ") + rcl_get_error_string_safe()); + } } /// Support dynamically setting the message memory strategy. @@ -231,16 +241,36 @@ private: void setup_intra_process( uint64_t intra_process_subscription_id, - rmw_subscription_t * intra_process_subscription, GetMessageCallbackType get_message_callback, - MatchesAnyPublishersCallbackType matches_any_publisher_callback) + MatchesAnyPublishersCallbackType matches_any_publisher_callback, + const rcl_subscription_options_t & intra_process_options) { + if (rcl_subscription_init( + &intra_process_subscription_handle_, node_handle_.get(), + rclcpp::type_support::get_intra_process_message_msg_type_support(), + (get_topic_name() + "__intra").c_str(), + &intra_process_options) != RCL_RET_OK) + { + // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) + throw std::runtime_error( + std::string("could not create intra process subscription: ") + rcl_get_error_string_safe()); + // *INDENT-ON* + } + intra_process_subscription_id_ = intra_process_subscription_id; - intra_process_subscription_handle_ = intra_process_subscription; get_intra_process_message_callback_ = get_message_callback; matches_any_intra_process_publishers_ = matches_any_publisher_callback; } + const rcl_subscription_t * + get_intra_process_subscription_handle() const + { + if (!get_intra_process_message_callback_) { + return nullptr; + } + return &intra_process_subscription_handle_; + } + RCLCPP_DISABLE_COPY(Subscription); AnySubscriptionCallback any_callback_; diff --git a/rclcpp/include/rclcpp/timer.hpp b/rclcpp/include/rclcpp/timer.hpp index d70a2e9..afa683d 100644 --- a/rclcpp/include/rclcpp/timer.hpp +++ b/rclcpp/include/rclcpp/timer.hpp @@ -27,6 +27,10 @@ #include "rclcpp/rate.hpp" #include "rclcpp/utilities.hpp" #include "rclcpp/visibility_control.hpp" + +#include "rcl/error_handling.h" +#include "rcl/timer.h" + #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -44,7 +48,7 @@ public: explicit TimerBase(std::chrono::nanoseconds period); RCLCPP_PUBLIC - virtual ~TimerBase(); + ~TimerBase(); RCLCPP_PUBLIC void @@ -54,27 +58,31 @@ public: virtual void execute_callback() = 0; + RCLCPP_PUBLIC + const rcl_timer_t * + get_timer_handle(); + /// Check how long the timer has until its next scheduled callback. // \return A std::chrono::duration representing the relative time until the next callback. - virtual std::chrono::nanoseconds - time_until_trigger() = 0; + RCLCPP_PUBLIC + std::chrono::nanoseconds + time_until_trigger(); /// Is the clock steady (i.e. is the time between ticks constant?) // \return True if the clock used by this timer is steady. virtual bool is_steady() = 0; - /// Check if the timer needs to trigger the callback. + /// Check if the timer is ready to trigger the callback. /** * This function expects its caller to immediately trigger the callback after this function, * since it maintains the last time the callback was triggered. * \return True if the timer needs to trigger. */ - virtual bool check_and_trigger() = 0; + RCLCPP_PUBLIC + bool is_ready(); protected: - std::chrono::nanoseconds period_; - - bool canceled_; + rcl_timer_t timer_handle_ = rcl_get_zero_initialized_timer(); }; @@ -102,10 +110,8 @@ public: * \param[in] callback User-specified callback function. */ GenericTimer(std::chrono::nanoseconds period, FunctorT && callback) - : TimerBase(period), callback_(std::forward(callback)), loop_rate_(period) + : TimerBase(period), callback_(std::forward(callback)) { - /* Set last_triggered_time_ so that the timer fires at least one period after being created. */ - last_triggered_time_ = Clock::now(); } /// Default destructor. @@ -113,11 +119,21 @@ public: { // Stop the timer from running. cancel(); + if (rcl_timer_fini(&timer_handle_) != RCL_RET_OK) { + fprintf(stderr, "Failed to clean up rcl timer handle: %s\n", rcl_get_error_string_safe()); + } } void execute_callback() { + rcl_ret_t ret = rcl_timer_call(&timer_handle_); + if (ret == RCL_RET_TIMER_CANCELED) { + return; + } + if (ret != RCL_RET_OK) { + throw std::runtime_error("Failed to notify timer that callback occurred"); + } execute_callback_delegate<>(); } @@ -146,39 +162,6 @@ public: callback_(*this); } - bool - check_and_trigger() - { - if (canceled_) { - return false; - } - if (Clock::now() < last_triggered_time_) { - return false; - } - if (std::chrono::duration_cast(Clock::now() - last_triggered_time_) >= - loop_rate_.period()) - { - last_triggered_time_ = Clock::now(); - return true; - } - return false; - } - - std::chrono::nanoseconds - time_until_trigger() - { - std::chrono::nanoseconds time_until_trigger; - // Calculate the time between the next trigger and the current time - if (last_triggered_time_ + loop_rate_.period() < Clock::now()) { - // time is overdue, need to trigger immediately - time_until_trigger = std::chrono::nanoseconds::zero(); - } else { - time_until_trigger = std::chrono::duration_cast( - last_triggered_time_ - Clock::now()) + loop_rate_.period(); - } - return time_until_trigger; - } - virtual bool is_steady() { @@ -189,8 +172,6 @@ protected: RCLCPP_DISABLE_COPY(GenericTimer); FunctorT callback_; - rclcpp::rate::GenericRate loop_rate_; - std::chrono::time_point last_triggered_time_; }; template diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index dbcac72..ace7200 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -18,6 +18,9 @@ #include #include "rclcpp/visibility_control.hpp" + +#include "rcl/guard_condition.h" + #include "rmw/macros.h" #include "rmw/rmw.h" @@ -48,7 +51,7 @@ shutdown(); /// Get a handle to the rmw guard condition that manages the signal handler. RCLCPP_PUBLIC -rmw_guard_condition_t * +rcl_guard_condition_t * get_global_sigint_guard_condition(); /// Use the global condition variable to block for the specified amount of time. diff --git a/rclcpp/package.xml b/rclcpp/package.xml index db04382..a16a756 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -16,6 +16,7 @@ rcl_interfaces rosidl_generator_cpp + rcl rmw_implementation ament_cmake diff --git a/rclcpp/src/rclcpp/client.cpp b/rclcpp/src/rclcpp/client.cpp index 29aa62a..0fd26a8 100644 --- a/rclcpp/src/rclcpp/client.cpp +++ b/rclcpp/src/rclcpp/client.cpp @@ -22,20 +22,13 @@ using rclcpp::client::ClientBase; ClientBase::ClientBase( - std::shared_ptr node_handle, - rmw_client_t * client_handle, + std::shared_ptr node_handle, const std::string & service_name) -: node_handle_(node_handle), client_handle_(client_handle), service_name_(service_name) +: node_handle_(node_handle), service_name_(service_name) {} ClientBase::~ClientBase() { - if (client_handle_) { - if (rmw_destroy_client(client_handle_) != RMW_RET_OK) { - fprintf(stderr, - "Error in destruction of rmw client handle: %s\n", rmw_get_error_string_safe()); - } - } } const std::string & @@ -44,8 +37,8 @@ ClientBase::get_service_name() const return this->service_name_; } -const rmw_client_t * +const rcl_client_t * ClientBase::get_client_handle() const { - return this->client_handle_; + return &client_handle_; } diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 041bee7..ad05e50 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -16,6 +16,9 @@ #include #include +#include "rcl/allocator.h" +#include "rcl/error_handling.h" + #include "rclcpp/executor.hpp" #include "rclcpp/scope_exit.hpp" @@ -30,29 +33,33 @@ Executor::Executor(const ExecutorArgs & args) : spinning(false), memory_strategy_(args.memory_strategy) { - interrupt_guard_condition_ = rmw_create_guard_condition(); - if (!interrupt_guard_condition_) { - throw std::runtime_error("Failed to create interrupt guard condition in Executor constructor"); + rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options(); + if (rcl_guard_condition_init( + &interrupt_guard_condition_, guard_condition_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Failed to create interrupt guard condition in Executor constructor: ") + + rcl_get_error_string_safe()); } // The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond, // and one for the executor's guard cond (interrupt_guard_condition_) - // The size of guard conditions is variable because the number of nodes can vary // Put the global ctrl-c guard condition in memory_strategy_->add_guard_condition(rclcpp::utilities::get_global_sigint_guard_condition()); + // Put the executor's guard condition in - memory_strategy_->add_guard_condition(interrupt_guard_condition_); + memory_strategy_->add_guard_condition(&interrupt_guard_condition_); + rcl_allocator_t allocator = memory_strategy_->get_allocator(); - waitset_ = rmw_create_waitset(args.max_conditions); - - if (!waitset_) { + if (rcl_wait_set_init( + &waitset_, 0, 2, 0, 0, 0, allocator) != RCL_RET_OK) + { fprintf(stderr, - "[rclcpp::error] failed to create waitset: %s\n", rmw_get_error_string_safe()); - rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { + "[rclcpp::error] failed to create waitset: %s\n", rcl_get_error_string_safe()); + if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) { fprintf(stderr, - "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); } throw std::runtime_error("Failed to create waitset in Executor constructor"); } @@ -60,21 +67,15 @@ Executor::Executor(const ExecutorArgs & args) Executor::~Executor() { - // Try to deallocate the waitset. - if (waitset_) { - rmw_ret_t status = rmw_destroy_waitset(waitset_); - if (status != RMW_RET_OK) { - fprintf(stderr, - "[rclcpp::error] failed to destroy waitset: %s\n", rmw_get_error_string_safe()); - } + // Finalize the waitset. + if (rcl_wait_set_fini(&waitset_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy waitset: %s\n", rcl_get_error_string_safe()); } - // Try to deallocate the interrupt guard condition. - if (interrupt_guard_condition_ != nullptr) { - rmw_ret_t status = rmw_destroy_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { - fprintf(stderr, - "[rclcpp::error] failed to destroy guard condition: %s\n", rmw_get_error_string_safe()); - } + // Finalize the interrupt guard condition. + if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); } } @@ -96,9 +97,8 @@ Executor::add_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) weak_nodes_.push_back(node_ptr); if (notify) { // Interrupt waiting to handle new node - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { - throw std::runtime_error(rmw_get_error_string_safe()); + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string_safe()); } } // Add the node's notify condition to the guard condition handles @@ -126,9 +126,8 @@ Executor::remove_node(rclcpp::node::Node::SharedPtr node_ptr, bool notify) if (notify) { // If the node was matched and removed, interrupt waiting if (node_removed) { - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { - throw std::runtime_error(rmw_get_error_string_safe()); + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string_safe()); } } } @@ -184,9 +183,8 @@ void Executor::cancel() { spinning.store(false); - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { - throw std::runtime_error(rmw_get_error_string_safe()); + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string_safe()); } } @@ -224,9 +222,8 @@ Executor::execute_any_executable(AnyExecutable::SharedPtr any_exec) any_exec->callback_group->can_be_taken_from().store(true); // Wake the wait, because it may need to be recalculated or work that // was previously blocked is now available. - rmw_ret_t status = rmw_trigger_guard_condition(interrupt_guard_condition_); - if (status != RMW_RET_OK) { - throw std::runtime_error(rmw_get_error_string_safe()); + if (rcl_trigger_guard_condition(&interrupt_guard_condition_) != RCL_RET_OK) { + throw std::runtime_error(rcl_get_error_string_safe()); } } @@ -235,20 +232,17 @@ Executor::execute_subscription( rclcpp::subscription::SubscriptionBase::SharedPtr subscription) { std::shared_ptr message = subscription->create_message(); - bool taken = false; rmw_message_info_t message_info; - auto ret = - rmw_take_with_info(subscription->get_subscription_handle(), - message.get(), &taken, &message_info); - if (ret == RMW_RET_OK) { - if (taken) { - message_info.from_intra_process = false; - subscription->handle_message(message, message_info); - } - } else { + + auto ret = rcl_take(subscription->get_subscription_handle(), + message.get(), &message_info); + if (ret == RCL_RET_OK) { + message_info.from_intra_process = false; + subscription->handle_message(message, message_info); + } else if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) { fprintf(stderr, "[rclcpp::error] take failed for subscription on topic '%s': %s\n", - subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + subscription->get_topic_name().c_str(), rcl_get_error_string_safe()); } subscription->return_message(message); } @@ -258,22 +252,19 @@ Executor::execute_intra_process_subscription( rclcpp::subscription::SubscriptionBase::SharedPtr subscription) { rcl_interfaces::msg::IntraProcessMessage ipm; - bool taken = false; rmw_message_info_t message_info; - rmw_ret_t status = rmw_take_with_info( + rcl_ret_t status = rcl_take( subscription->get_intra_process_subscription_handle(), &ipm, - &taken, &message_info); - if (status == RMW_RET_OK) { - if (taken) { - message_info.from_intra_process = true; - subscription->handle_intra_process_message(ipm, message_info); - } - } else { + + if (status == RCL_RET_OK) { + message_info.from_intra_process = true; + subscription->handle_intra_process_message(ipm, message_info); + } else if (status != RCL_RET_SUBSCRIPTION_TAKE_FAILED) { fprintf(stderr, "[rclcpp::error] take failed for intra process subscription on topic '%s': %s\n", - subscription->get_topic_name().c_str(), rmw_get_error_string_safe()); + subscription->get_topic_name().c_str(), rcl_get_error_string_safe()); } } @@ -290,20 +281,18 @@ Executor::execute_service( { auto request_header = service->create_request_header(); std::shared_ptr request = service->create_request(); - bool taken = false; - rmw_ret_t status = rmw_take_request( + rcl_ret_t status = rcl_take_request( service->get_service_handle(), request_header.get(), - request.get(), - &taken); - if (status == RMW_RET_OK) { - if (taken) { + request.get()); + if (status != RCL_RET_SERVICE_TAKE_FAILED) { + if (status == RCL_RET_OK) { service->handle_request(request_header, request); } } else { fprintf(stderr, "[rclcpp::error] take request failed for server of service '%s': %s\n", - service->get_service_name().c_str(), rmw_get_error_string_safe()); + service->get_service_name().c_str(), rcl_get_error_string_safe()); } } @@ -313,29 +302,26 @@ Executor::execute_client( { auto request_header = client->create_request_header(); std::shared_ptr response = client->create_response(); - bool taken = false; - rmw_ret_t status = rmw_take_response( + rcl_ret_t status = rcl_take_response( client->get_client_handle(), request_header.get(), - response.get(), - &taken); - if (status == RMW_RET_OK) { - if (taken) { + response.get()); + if (status != RCL_RET_SERVICE_TAKE_FAILED) { + if (status == RCL_RET_OK) { client->handle_response(request_header, response); } } else { fprintf(stderr, "[rclcpp::error] take response failed for client of service '%s': %s\n", - client->get_service_name().c_str(), rmw_get_error_string_safe()); + client->get_service_name().c_str(), rcl_get_error_string_safe()); } } void Executor::wait_for_work(std::chrono::nanoseconds timeout) { - memory_strategy_->clear_active_entities(); - // Collect the subscriptions and timers to be waited on + memory_strategy_->clear_handles(); bool has_invalid_weak_nodes = memory_strategy_->collect_entities(weak_nodes_); // Clean up any invalid nodes, if they were detected @@ -353,61 +339,75 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) ); } - memory_strategy_->clear_handles(); - // Use the number of subscriptions to allocate memory in the handles - rmw_subscriptions_t subscriber_handles; - subscriber_handles.subscriber_count = - memory_strategy_->fill_subscriber_handles(subscriber_handles.subscribers); - - rmw_services_t service_handles; - service_handles.service_count = - memory_strategy_->fill_service_handles(service_handles.services); - - rmw_clients_t client_handles; - client_handles.client_count = - memory_strategy_->fill_client_handles(client_handles.clients); - - // construct a guard conditions struct - rmw_guard_conditions_t guard_conditions; - guard_conditions.guard_condition_count = - memory_strategy_->fill_guard_condition_handles(guard_conditions.guard_conditions); - - rmw_time_t * wait_timeout = NULL; - rmw_time_t rmw_timeout; - - auto next_timer_duration = get_earliest_timer(); - // If the next timer timeout must preempt the requested timeout - // or if the requested timeout blocks forever, and there exists a valid timer, - // replace the requested timeout with the next timeout. - bool has_valid_timer = next_timer_duration >= std::chrono::nanoseconds::zero(); - if ((next_timer_duration < timeout || - timeout < std::chrono::nanoseconds::zero()) && has_valid_timer) + if (rcl_wait_set_resize_subscriptions( + &waitset_, memory_strategy_->number_of_ready_subscriptions()) != RCL_RET_OK) { - rmw_timeout.sec = - std::chrono::duration_cast(next_timer_duration).count(); - rmw_timeout.nsec = next_timer_duration.count() % (1000 * 1000 * 1000); - wait_timeout = &rmw_timeout; - } else if (timeout >= std::chrono::nanoseconds::zero()) { - // Convert timeout representation to rmw_time - rmw_timeout.sec = std::chrono::duration_cast(timeout).count(); - rmw_timeout.nsec = std::chrono::duration_cast(timeout).count() % - (1000 * 1000 * 1000); - wait_timeout = &rmw_timeout; + throw std::runtime_error( + std::string("Couldn't resize the number of subscriptions in waitset : ") + + rcl_get_error_string_safe()); } - // Now wait on the waitable subscriptions and timers - rmw_ret_t status = rmw_wait( - &subscriber_handles, - &guard_conditions, - &service_handles, - &client_handles, - waitset_, - wait_timeout); - if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) { - throw std::runtime_error(rmw_get_error_string_safe()); + if (rcl_wait_set_resize_services( + &waitset_, memory_strategy_->number_of_ready_services()) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Couldn't resize the number of services in waitset : ") + + rcl_get_error_string_safe()); } - memory_strategy_->remove_null_handles(); + if (rcl_wait_set_resize_clients( + &waitset_, memory_strategy_->number_of_ready_clients()) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Couldn't resize the number of clients in waitset : ") + + rcl_get_error_string_safe()); + } + + if (rcl_wait_set_resize_guard_conditions( + &waitset_, memory_strategy_->number_of_guard_conditions()) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Couldn't resize the number of guard_conditions in waitset : ") + + rcl_get_error_string_safe()); + } + + if (rcl_wait_set_resize_timers( + &waitset_, memory_strategy_->number_of_ready_timers()) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Couldn't resize the number of timers in waitset : ") + + rcl_get_error_string_safe()); + } + + if (!memory_strategy_->add_handles_to_waitset(&waitset_)) { + throw std::runtime_error("Couldn't fill waitset"); + } + rcl_ret_t status = + rcl_wait(&waitset_, std::chrono::duration_cast(timeout).count()); + if (status == RCL_RET_WAIT_SET_EMPTY) { + fprintf(stderr, "Warning: empty waitset received in rcl_wait(). This should never happen.\n"); + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + throw std::runtime_error(std::string("rcl_wait() failed: ") + rcl_get_error_string_safe()); + } + + // check the null handles in the waitset and remove them from the handles in memory strategy + // for callback-based entities + memory_strategy_->remove_null_handles(&waitset_); + if (rcl_wait_set_clear_subscriptions(&waitset_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear subscriptions from waitset"); + } + if (rcl_wait_set_clear_services(&waitset_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear servicess from waitset"); + } + if (rcl_wait_set_clear_clients(&waitset_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear clients from waitset"); + } + if (rcl_wait_set_clear_guard_conditions(&waitset_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear guard conditions from waitset"); + } + if (rcl_wait_set_clear_timers(&waitset_) != RCL_RET_OK) { + throw std::runtime_error("Couldn't clear timers from waitset"); + } } rclcpp::node::Node::SharedPtr @@ -470,7 +470,7 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) } for (auto & timer_ref : group->get_timer_ptrs()) { auto timer = timer_ref.lock(); - if (timer && timer->check_and_trigger()) { + if (timer && timer->is_ready()) { any_exec->timer = timer; any_exec->callback_group = group; node = get_node_by_group(group); @@ -481,37 +481,6 @@ Executor::get_next_timer(AnyExecutable::SharedPtr any_exec) } } -std::chrono::nanoseconds -Executor::get_earliest_timer() -{ - std::chrono::nanoseconds latest = std::chrono::nanoseconds::max(); - bool timers_empty = true; - for (auto & weak_node : weak_nodes_) { - auto node = weak_node.lock(); - if (!node) { - continue; - } - for (auto & weak_group : node->get_callback_groups()) { - auto group = weak_group.lock(); - if (!group || !group->can_be_taken_from().load()) { - continue; - } - for (auto & timer_ref : group->get_timer_ptrs()) { - timers_empty = false; - // Check the expected trigger time - auto timer = timer_ref.lock(); - if (timer && timer->time_until_trigger() < latest) { - latest = timer->time_until_trigger(); - } - } - } - } - if (timers_empty) { - return std::chrono::nanoseconds(-1); - } - return latest; -} - AnyExecutable::SharedPtr Executor::get_next_ready_executable() { diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index a196fa3..b068e68 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -18,7 +18,7 @@ using rclcpp::memory_strategy::MemoryStrategy; rclcpp::subscription::SubscriptionBase::SharedPtr MemoryStrategy::get_subscription_by_handle( - void * subscriber_handle, const WeakNodeVector & weak_nodes) + const rcl_subscription_t * subscriber_handle, const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -33,12 +33,10 @@ MemoryStrategy::get_subscription_by_handle( for (auto & weak_subscription : group->get_subscription_ptrs()) { auto subscription = weak_subscription.lock(); if (subscription) { - if (subscription->get_subscription_handle()->data == subscriber_handle) { + if (subscription->get_subscription_handle() == subscriber_handle) { return subscription; } - if (subscription->get_intra_process_subscription_handle() && - subscription->get_intra_process_subscription_handle()->data == subscriber_handle) - { + if (subscription->get_intra_process_subscription_handle() == subscriber_handle) { return subscription; } } @@ -49,7 +47,8 @@ MemoryStrategy::get_subscription_by_handle( } rclcpp::service::ServiceBase::SharedPtr -MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVector & weak_nodes) +MemoryStrategy::get_service_by_handle(const rcl_service_t * service_handle, + const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -62,7 +61,7 @@ MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVecto continue; } for (auto & service : group->get_service_ptrs()) { - if (service->get_service_handle()->data == service_handle) { + if (service->get_service_handle() == service_handle) { return service; } } @@ -72,7 +71,8 @@ MemoryStrategy::get_service_by_handle(void * service_handle, const WeakNodeVecto } rclcpp::client::ClientBase::SharedPtr -MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector & weak_nodes) +MemoryStrategy::get_client_by_handle(const rcl_client_t * client_handle, + const WeakNodeVector & weak_nodes) { for (auto & weak_node : weak_nodes) { auto node = weak_node.lock(); @@ -86,7 +86,7 @@ MemoryStrategy::get_client_by_handle(void * client_handle, const WeakNodeVector } for (auto & weak_client : group->get_client_ptrs()) { auto client = weak_client.lock(); - if (client && client->get_client_handle()->data == client_handle) { + if (client && client->get_client_handle() == client_handle) { return client; } } diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index df4ab3d..ce4c7ac 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -36,13 +36,17 @@ Node::Node( bool use_intra_process_comms) : name_(node_name), context_(context), number_of_subscriptions_(0), number_of_timers_(0), number_of_services_(0), - use_intra_process_comms_(use_intra_process_comms), - notify_guard_condition_(rmw_create_guard_condition()) + use_intra_process_comms_(use_intra_process_comms) { - if (!notify_guard_condition_) { - throw std::runtime_error("Failed to create guard condition for node: " + - std::string(rmw_get_error_string_safe())); + rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options(); + if (rcl_guard_condition_init( + ¬ify_guard_condition_, guard_condition_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("Failed to create interrupt guard condition in Executor constructor: ") + + rcl_get_error_string_safe()); } + has_executor.store(false); size_t domain_id = 0; char * ros_domain_id = nullptr; @@ -56,9 +60,10 @@ Node::Node( if (ros_domain_id) { uint32_t number = strtoul(ros_domain_id, NULL, 0); if (number == (std::numeric_limits::max)()) { - if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { - fprintf( - stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe()); + // Finalize the interrupt guard condition. + if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); } throw std::runtime_error("failed to interpret ROS_DOMAIN_ID as integral number"); } @@ -68,27 +73,30 @@ Node::Node( #endif } - auto node = rmw_create_node(name_.c_str(), domain_id); - if (!node) { - // *INDENT-OFF* - if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { + rcl_node_t * rcl_node = new rcl_node_t(rcl_get_zero_initialized_node()); + node_handle_.reset(rcl_node, [](rcl_node_t * node) { + if (rcl_node_fini(node) != RCL_RET_OK) { fprintf( - stderr, "Failed to destroy notify guard condition: %s\n", rmw_get_error_string_safe()); + stderr, "Error in destruction of rmw node handle: %s\n", rcl_get_error_string_safe()); } - throw std::runtime_error( - std::string("could not create node: ") + - rmw_get_error_string_safe()); - // *INDENT-ON* + delete node; + }); + rcl_node_options_t options = rcl_node_get_default_options(); + // TODO(jacquelinekay): Allocator options + options.domain_id = domain_id; + if (rcl_node_init(node_handle_.get(), name_.c_str(), &options) != RCL_RET_OK) { + // Finalize the interrupt guard condition. + if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); + } + + throw std::runtime_error(std::string( + "Could not initialize rcl node: ") + rcl_get_error_string_safe()); } + // Initialize node handle shared_ptr with custom deleter. // *INDENT-OFF* - node_handle_.reset(node, [](rmw_node_t * node) { - auto ret = rmw_destroy_node(node); - if (ret != RMW_RET_OK) { - fprintf( - stderr, "Error in destruction of rmw node handle: %s\n", rmw_get_error_string_safe()); - } - }); // *INDENT-ON* using rclcpp::callback_group::CallbackGroupType; @@ -100,9 +108,10 @@ Node::Node( Node::~Node() { - if (rmw_destroy_guard_condition(notify_guard_condition_) != RMW_RET_OK) { - fprintf(stderr, "Warning! Failed to destroy guard condition in Node destructor: %s\n", - rmw_get_error_string_safe()); + // Finalize the interrupt guard condition. + if (rcl_guard_condition_fini(¬ify_guard_condition_) != RCL_RET_OK) { + fprintf(stderr, + "[rclcpp::error] failed to destroy guard condition: %s\n", rcl_get_error_string_safe()); } } @@ -297,7 +306,8 @@ Node::get_topic_names_and_types() const topic_names_and_types.topic_names = nullptr; topic_names_and_types.type_names = nullptr; - auto ret = rmw_get_topic_names_and_types(node_handle_.get(), &topic_names_and_types); + auto ret = rmw_get_topic_names_and_types(rcl_node_get_rmw_handle(node_handle_.get()), + &topic_names_and_types); if (ret != RMW_RET_OK) { // *INDENT-OFF* throw std::runtime_error( @@ -325,7 +335,8 @@ size_t Node::count_publishers(const std::string & topic_name) const { size_t count; - auto ret = rmw_count_publishers(node_handle_.get(), topic_name.c_str(), &count); + auto ret = rmw_count_publishers(rcl_node_get_rmw_handle(node_handle_.get()), + topic_name.c_str(), &count); if (ret != RMW_RET_OK) { // *INDENT-OFF* throw std::runtime_error( @@ -339,7 +350,8 @@ size_t Node::count_subscribers(const std::string & topic_name) const { size_t count; - auto ret = rmw_count_subscribers(node_handle_.get(), topic_name.c_str(), &count); + auto ret = rmw_count_subscribers(rcl_node_get_rmw_handle(node_handle_.get()), + topic_name.c_str(), &count); if (ret != RMW_RET_OK) { // *INDENT-OFF* throw std::runtime_error( @@ -356,7 +368,7 @@ Node::get_callback_groups() const return callback_groups_; } -rmw_guard_condition_t * Node::get_notify_guard_condition() const +const rcl_guard_condition_t * Node::get_notify_guard_condition() const { - return notify_guard_condition_; + return ¬ify_guard_condition_; } diff --git a/rclcpp/src/rclcpp/publisher.cpp b/rclcpp/src/rclcpp/publisher.cpp index 4ad9b87..d2d5728 100644 --- a/rclcpp/src/rclcpp/publisher.cpp +++ b/rclcpp/src/rclcpp/publisher.cpp @@ -33,42 +33,17 @@ using rclcpp::publisher::PublisherBase; PublisherBase::PublisherBase( - std::shared_ptr node_handle, - rmw_publisher_t * publisher_handle, + std::shared_ptr node_handle, std::string topic, size_t queue_size) -: node_handle_(node_handle), publisher_handle_(publisher_handle), - intra_process_publisher_handle_(nullptr), +: node_handle_(node_handle), topic_(topic), queue_size_(queue_size), intra_process_publisher_id_(0), store_intra_process_message_(nullptr) { - // Life time of this object is tied to the publisher handle. - if (rmw_get_gid_for_publisher(publisher_handle_, &rmw_gid_) != RMW_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) - throw std::runtime_error( - std::string("failed to get publisher gid: ") + rmw_get_error_string_safe()); - // *INDENT-ON* - } } PublisherBase::~PublisherBase() { - if (intra_process_publisher_handle_) { - if (rmw_destroy_publisher(node_handle_.get(), intra_process_publisher_handle_)) { - fprintf( - stderr, - "Error in destruction of intra process rmw publisher handle: %s\n", - rmw_get_error_string_safe()); - } - } - if (publisher_handle_) { - if (rmw_destroy_publisher(node_handle_.get(), publisher_handle_) != RMW_RET_OK) { - fprintf( - stderr, - "Error in destruction of rmw publisher handle: %s\n", - rmw_get_error_string_safe()); - } - } } const std::string & @@ -124,13 +99,29 @@ void PublisherBase::setup_intra_process( uint64_t intra_process_publisher_id, StoreMessageCallbackT callback, - rmw_publisher_t * intra_process_publisher_handle) + const rcl_publisher_options_t & intra_process_options) { + if (rcl_publisher_init( + &intra_process_publisher_handle_, node_handle_.get(), + rclcpp::type_support::get_intra_process_message_msg_type_support(), + (topic_ + "__intra").c_str(), &intra_process_options) != RCL_RET_OK) + { + throw std::runtime_error( + std::string("could not create intra process publisher: ") + + rcl_get_error_string_safe()); + } + intra_process_publisher_id_ = intra_process_publisher_id; store_intra_process_message_ = callback; - intra_process_publisher_handle_ = intra_process_publisher_handle; // Life time of this object is tied to the publisher handle. - auto ret = rmw_get_gid_for_publisher(intra_process_publisher_handle_, &intra_process_rmw_gid_); + rmw_publisher_t * publisher_rmw_handle = rcl_publisher_get_rmw_handle( + &intra_process_publisher_handle_); + if (publisher_rmw_handle == nullptr) { + throw std::runtime_error(std::string( + "Failed to get rmw publisher handle") + rcl_get_error_string_safe()); + } + auto ret = rmw_get_gid_for_publisher( + publisher_rmw_handle, &intra_process_rmw_gid_); if (ret != RMW_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( diff --git a/rclcpp/src/rclcpp/service.cpp b/rclcpp/src/rclcpp/service.cpp index f2f588c..24d0ad6 100644 --- a/rclcpp/src/rclcpp/service.cpp +++ b/rclcpp/src/rclcpp/service.cpp @@ -28,23 +28,13 @@ using rclcpp::service::ServiceBase; ServiceBase::ServiceBase( - std::shared_ptr node_handle, - rmw_service_t * service_handle, + std::shared_ptr node_handle, const std::string service_name) -: node_handle_(node_handle), service_handle_(service_handle), service_name_(service_name) +: node_handle_(node_handle), service_name_(service_name) {} ServiceBase::~ServiceBase() -{ - if (service_handle_) { - if (rmw_destroy_service(service_handle_) != RMW_RET_OK) { - std::stringstream ss; - ss << "Error in destruction of rmw service_handle_ handle: " << - rmw_get_error_string_safe() << '\n'; - (std::cerr << ss.str()).flush(); - } - } -} +{} std::string ServiceBase::get_service_name() @@ -52,8 +42,8 @@ ServiceBase::get_service_name() return this->service_name_; } -const rmw_service_t * +const rcl_service_t * ServiceBase::get_service_handle() { - return this->service_handle_; + return &service_handle_; } diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription.cpp index caf6578..0bd7cc9 100644 --- a/rclcpp/src/rclcpp/subscription.cpp +++ b/rclcpp/src/rclcpp/subscription.cpp @@ -24,13 +24,10 @@ using rclcpp::subscription::SubscriptionBase; SubscriptionBase::SubscriptionBase( - std::shared_ptr node_handle, - rmw_subscription_t * subscription_handle, + std::shared_ptr node_handle, const std::string & topic_name, bool ignore_local_publications) -: intra_process_subscription_handle_(nullptr), - node_handle_(node_handle), - subscription_handle_(subscription_handle), +: node_handle_(node_handle), topic_name_(topic_name), ignore_local_publications_(ignore_local_publications) { @@ -40,22 +37,19 @@ SubscriptionBase::SubscriptionBase( SubscriptionBase::~SubscriptionBase() { - if (subscription_handle_) { - if (rmw_destroy_subscription(node_handle_.get(), subscription_handle_) != RMW_RET_OK) { - std::stringstream ss; - ss << "Error in destruction of rmw subscription handle: " << - rmw_get_error_string_safe() << '\n'; - (std::cerr << ss.str()).flush(); - } + if (rcl_subscription_fini(&subscription_handle_, node_handle_.get()) != RCL_RET_OK) { + std::stringstream ss; + ss << "Error in destruction of rcl subscription handle: " << + rcl_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); } - if (intra_process_subscription_handle_) { - auto ret = rmw_destroy_subscription(node_handle_.get(), intra_process_subscription_handle_); - if (ret != RMW_RET_OK) { - std::stringstream ss; - ss << "Error in destruction of rmw intra process subscription handle: " << - rmw_get_error_string_safe() << '\n'; - (std::cerr << ss.str()).flush(); - } + if (rcl_subscription_fini( + &intra_process_subscription_handle_, node_handle_.get()) != RCL_RET_OK) + { + std::stringstream ss; + ss << "Error in destruction of rmw intra process subscription handle: " << + rcl_get_error_string_safe() << '\n'; + (std::cerr << ss.str()).flush(); } } @@ -65,14 +59,14 @@ SubscriptionBase::get_topic_name() const return this->topic_name_; } -const rmw_subscription_t * +const rcl_subscription_t * SubscriptionBase::get_subscription_handle() const { - return subscription_handle_; + return &subscription_handle_; } -const rmw_subscription_t * +const rcl_subscription_t * SubscriptionBase::get_intra_process_subscription_handle() const { - return intra_process_subscription_handle_; + return &intra_process_subscription_handle_; } diff --git a/rclcpp/src/rclcpp/timer.cpp b/rclcpp/src/rclcpp/timer.cpp index 8b1b09c..1128bbf 100644 --- a/rclcpp/src/rclcpp/timer.cpp +++ b/rclcpp/src/rclcpp/timer.cpp @@ -15,13 +15,19 @@ #include "rclcpp/timer.hpp" #include +#include using rclcpp::timer::TimerBase; TimerBase::TimerBase(std::chrono::nanoseconds period) -: period_(period), - canceled_(false) -{} +{ + if (rcl_timer_init( + &timer_handle_, period.count(), nullptr, + rcl_get_default_allocator()) != RCL_RET_OK) + { + fprintf(stderr, "Couldn't initialize rcl timer handle: %s\n", rcl_get_error_string_safe()); + } +} TimerBase::~TimerBase() {} @@ -29,5 +35,35 @@ TimerBase::~TimerBase() void TimerBase::cancel() { - this->canceled_ = true; + if (rcl_timer_cancel(&timer_handle_) != RCL_RET_OK) { + throw std::runtime_error(std::string("Couldn't cancel timer: ") + rcl_get_error_string_safe()); + } +} + +bool +TimerBase::is_ready() +{ + bool ready = false; + if (rcl_timer_is_ready(&timer_handle_, &ready) != RCL_RET_OK) { + throw std::runtime_error(std::string("Failed to check timer: ") + rcl_get_error_string_safe()); + } + return ready; +} + +std::chrono::nanoseconds +TimerBase::time_until_trigger() +{ + int64_t time_until_next_call = 0; + if (rcl_timer_get_time_until_next_call(&timer_handle_, &time_until_next_call) != RCL_RET_OK) { + throw std::runtime_error( + std::string("Timer could not get time until next call: ") + + rcl_get_error_string_safe()); + } + return std::chrono::nanoseconds(time_until_next_call); +} + +const rcl_timer_t * +TimerBase::get_timer_handle() +{ + return &timer_handle_; } diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp index bc0b365..b3c7c2b 100644 --- a/rclcpp/src/rclcpp/utilities.cpp +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -22,6 +22,9 @@ #include #include +#include "rcl/error_handling.h" +#include "rcl/rcl.h" + #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -33,7 +36,8 @@ /// Represent the status of the global interrupt signal. static volatile sig_atomic_t g_signal_status = 0; /// Guard condition for interrupting the rmw implementation when the global interrupt signal fired. -static rmw_guard_condition_t * g_sigint_guard_cond_handle = rmw_create_guard_condition(); +static rcl_guard_condition_t g_sigint_guard_cond_handle = + rcl_get_zero_initialized_guard_condition(); /// Condition variable for timed sleep (see sleep_for). static std::condition_variable g_interrupt_condition_variable; static std::atomic g_is_interrupted(false); @@ -77,10 +81,10 @@ signal_handler(int signal_value) } #endif g_signal_status = signal_value; - rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle); - if (status != RMW_RET_OK) { + rcl_ret_t status = rcl_trigger_guard_condition(&g_sigint_guard_cond_handle); + if (status != RCL_RET_OK) { fprintf(stderr, - "[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe()); + "[rclcpp::error] failed to trigger guard condition: %s\n", rcl_get_error_string_safe()); } g_is_interrupted.store(true); g_interrupt_condition_variable.notify_all(); @@ -89,14 +93,11 @@ signal_handler(int signal_value) void rclcpp::utilities::init(int argc, char * argv[]) { - (void)argc; - (void)argv; g_is_interrupted.store(false); - rmw_ret_t status = rmw_init(); - if (status != RMW_RET_OK) { + if (rcl_init(argc, argv, rcl_get_default_allocator()) != RCL_RET_OK) { // *INDENT-OFF* (prevent uncrustify from making unecessary indents here) throw std::runtime_error( - std::string("failed to initialize rmw implementation: ") + rmw_get_error_string_safe()); + std::string("failed to initialize rmw implementation: ") + rcl_get_error_string_safe()); // *INDENT-ON* } #ifdef HAS_SIGACTION @@ -138,6 +139,11 @@ rclcpp::utilities::init(int argc, char * argv[]) error_string); // *INDENT-ON* } + rcl_guard_condition_options_t options = rcl_guard_condition_get_default_options(); + if (rcl_guard_condition_init(&g_sigint_guard_cond_handle, options) != RCL_RET_OK) { + throw std::runtime_error(std::string( + "Couldn't initialize guard condition: ") + rcl_get_error_string_safe()); + } } bool @@ -150,8 +156,7 @@ void rclcpp::utilities::shutdown() { g_signal_status = SIGINT; - rmw_ret_t status = rmw_trigger_guard_condition(g_sigint_guard_cond_handle); - if (status != RMW_RET_OK) { + if (rcl_trigger_guard_condition(&g_sigint_guard_cond_handle) != RCL_RET_OK) { fprintf(stderr, "[rclcpp::error] failed to trigger guard condition: %s\n", rmw_get_error_string_safe()); } @@ -159,10 +164,10 @@ rclcpp::utilities::shutdown() g_interrupt_condition_variable.notify_all(); } -rmw_guard_condition_t * +rcl_guard_condition_t * rclcpp::utilities::get_global_sigint_guard_condition() { - return ::g_sigint_guard_cond_handle; + return &::g_sigint_guard_cond_handle; } bool