diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3da0e46..1bd484f 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -260,6 +260,17 @@ if(BUILD_TESTING) ${PROJECT_NAME} ) endif() + ament_add_gtest(test_serialized_message_allocator test/test_serialized_message_allocator.cpp + ENV RCL_ASSERT_RMW_ID_MATCHES=${rmw_implementation}) + if(TARGET test_serialized_message_allocator) + target_include_directories(test_serialized_message_allocator PUBLIC + ${test_msgs_INCLUDE_DIRS} + ) + target_link_libraries(test_serialized_message_allocator + ${PROJECT_NAME} + ${test_msgs_LIBRARIES} + ) + endif() ament_add_gtest(test_service test/test_service.cpp) if(TARGET test_service) target_include_directories(test_service PUBLIC @@ -280,6 +291,16 @@ if(BUILD_TESTING) ) target_link_libraries(test_subscription ${PROJECT_NAME}) endif() + find_package(test_msgs REQUIRED) + ament_add_gtest(test_subscription_traits test/test_subscription_traits.cpp) + if(TARGET test_subscription_traits) + target_include_directories(test_subscription_traits PUBLIC + ${rcl_INCLUDE_DIRS} + ) + ament_target_dependencies(test_subscription_traits + "test_msgs" + ) + endif() ament_add_gtest(test_find_weak_nodes test/test_find_weak_nodes.cpp) if(TARGET test_find_weak_nodes) target_include_directories(test_find_weak_nodes PUBLIC diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index 5125d9b..a4ea581 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -26,8 +26,13 @@ namespace rclcpp { -template -typename rclcpp::Subscription::SharedPtr +template< + typename MessageT, + typename CallbackT, + typename AllocatorT, + typename CallbackMessageT, + typename SubscriptionT = rclcpp::Subscription> +typename std::shared_ptr create_subscription( rclcpp::node_interfaces::NodeTopicsInterface * node_topics, const std::string & topic_name, @@ -36,7 +41,8 @@ create_subscription( rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, bool use_intra_process_comms, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, AllocatorT>::SharedPtr msg_mem_strat, typename std::shared_ptr allocator) { @@ -44,8 +50,8 @@ create_subscription( subscription_options.qos = qos_profile; subscription_options.ignore_local_publications = ignore_local_publications; - auto factory = - rclcpp::create_subscription_factory( + auto factory = rclcpp::create_subscription_factory + ( std::forward(callback), msg_mem_strat, allocator); auto sub = node_topics->create_subscription( diff --git a/rclcpp/include/rclcpp/message_memory_strategy.hpp b/rclcpp/include/rclcpp/message_memory_strategy.hpp index fafd002..c4de703 100644 --- a/rclcpp/include/rclcpp/message_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/message_memory_strategy.hpp @@ -18,10 +18,15 @@ #include #include +#include "rcl/types.h" + #include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/exceptions.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/visibility_control.hpp" +#include "rmw/serialized_message.h" + namespace rclcpp { namespace message_memory_strategy @@ -39,14 +44,29 @@ public: using MessageAlloc = typename MessageAllocTraits::allocator_type; using MessageDeleter = allocator::Deleter; + using SerializedMessageAllocTraits = allocator::AllocRebind; + using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type; + using SerializedMessageDeleter = + allocator::Deleter; + + using BufferAllocTraits = allocator::AllocRebind; + using BufferAlloc = typename BufferAllocTraits::allocator_type; + using BufferDeleter = allocator::Deleter; + MessageMemoryStrategy() { message_allocator_ = std::make_shared(); + serialized_message_allocator_ = std::make_shared(); + buffer_allocator_ = std::make_shared(); + rcutils_allocator_ = allocator::get_rcl_allocator(*buffer_allocator_.get()); } explicit MessageMemoryStrategy(std::shared_ptr allocator) { message_allocator_ = std::make_shared(*allocator.get()); + serialized_message_allocator_ = std::make_shared(*allocator.get()); + buffer_allocator_ = std::make_shared(*allocator.get()); + rcutils_allocator_ = allocator::get_rcl_allocator(*buffer_allocator_.get()); } /// Default factory method @@ -62,6 +82,37 @@ public: return std::allocate_shared(*message_allocator_.get()); } + virtual std::shared_ptr borrow_serialized_message(size_t capacity) + { + auto msg = new rcl_serialized_message_t; + *msg = rmw_get_zero_initialized_serialized_message(); + auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + + auto serialized_msg = std::shared_ptr(msg, + [](rmw_serialized_message_t * msg) { + auto ret = rmw_serialized_message_fini(msg); + delete msg; + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "leaking memory"); + } + }); + + return serialized_msg; + } + + virtual std::shared_ptr borrow_serialized_message() + { + return borrow_serialized_message(default_buffer_capacity_); + } + + virtual void set_default_buffer_capacity(size_t capacity) + { + default_buffer_capacity_ = capacity; + } + /// Release ownership of the message, which will deallocate it if it has no more owners. /** \param[in] msg Shared pointer to the message we are returning. */ virtual void return_message(std::shared_ptr & msg) @@ -69,8 +120,22 @@ public: msg.reset(); } + virtual void return_serialized_message(std::shared_ptr & serialized_msg) + { + serialized_msg.reset(); + } + std::shared_ptr message_allocator_; MessageDeleter message_deleter_; + + std::shared_ptr serialized_message_allocator_; + SerializedMessageDeleter serialized_message_deleter_; + + std::shared_ptr buffer_allocator_; + BufferDeleter buffer_deleter_; + size_t default_buffer_capacity_ = 0; + + rcutils_allocator_t rcutils_allocator_; }; } // namespace message_memory_strategy diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index 87974ef..1dfa12e 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -53,6 +53,7 @@ #include "rclcpp/publisher.hpp" #include "rclcpp/service.hpp" #include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_traits.hpp" #include "rclcpp/time.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" @@ -183,7 +184,8 @@ public: typename MessageT, typename CallbackT, typename Alloc = std::allocator, - typename SubscriptionT = rclcpp::Subscription> + typename SubscriptionT = rclcpp::Subscription< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>> std::shared_ptr create_subscription( const std::string & topic_name, @@ -191,7 +193,8 @@ public: const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat = nullptr, std::shared_ptr allocator = nullptr); @@ -214,15 +217,17 @@ public: typename MessageT, typename CallbackT, typename Alloc = std::allocator, - typename SubscriptionT = rclcpp::Subscription> + typename SubscriptionT = rclcpp::Subscription< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>> std::shared_ptr create_subscription( const std::string & topic_name, - size_t qos_history_depth, CallbackT && callback, + size_t qos_history_depth, rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr, bool ignore_local_publications = false, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat = nullptr, std::shared_ptr allocator = nullptr); diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index e841f61..716266f 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -82,7 +82,11 @@ Node::create_publisher( allocator); } -template +template< + typename MessageT, + typename CallbackT, + typename Alloc, + typename SubscriptionT> std::shared_ptr Node::create_subscription( const std::string & topic_name, @@ -90,20 +94,23 @@ Node::create_subscription( const rmw_qos_profile_t & qos_profile, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat, std::shared_ptr allocator) { + using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type::type; + if (!allocator) { allocator = std::make_shared(); } if (!msg_mem_strat) { using rclcpp::message_memory_strategy::MessageMemoryStrategy; - msg_mem_strat = MessageMemoryStrategy::create_default(); + msg_mem_strat = MessageMemoryStrategy::create_default(); } - return rclcpp::create_subscription( + return rclcpp::create_subscription( this->node_topics_.get(), topic_name, std::forward(callback), @@ -115,21 +122,26 @@ Node::create_subscription( allocator); } -template +template< + typename MessageT, + typename CallbackT, + typename Alloc, + typename SubscriptionT> std::shared_ptr Node::create_subscription( const std::string & topic_name, - size_t qos_history_depth, CallbackT && callback, + size_t qos_history_depth, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr msg_mem_strat, std::shared_ptr allocator) { rmw_qos_profile_t qos = rmw_qos_profile_default; qos.depth = qos_history_depth; - return this->create_subscription( + return this->create_subscription( topic_name, std::forward(callback), qos, diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index cfd9a8e..ac3250b 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -120,8 +120,9 @@ public: auto msg_mem_strat = MessageMemoryStrategy::create_default(); + using rcl_interfaces::msg::ParameterEvent; return rclcpp::create_subscription< - rcl_interfaces::msg::ParameterEvent, CallbackT, Alloc, SubscriptionT>( + ParameterEvent, CallbackT, Alloc, ParameterEvent, SubscriptionT>( this->node_topics_interface_.get(), "parameter_events", std::forward(callback), diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index f505bef..ec26790 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -205,11 +205,8 @@ public: ipm.publisher_id = intra_process_publisher_id_; ipm.message_sequence = message_seq; auto status = rcl_publish(&intra_process_publisher_handle_, &ipm); - if (status != RCL_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) - throw std::runtime_error( - std::string("failed to publish intra process message: ") + rcl_get_error_string_safe()); - // *INDENT-ON* + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message"); } } else { // Always destroy the message, even if we don't consume it, for consistency. @@ -279,6 +276,19 @@ public: return this->publish(*msg); } + void + publish(const rcl_serialized_message_t * serialized_msg) + { + if (store_intra_process_message_) { + // TODO(Karsten1987): support serialized message passed by intraprocess + throw std::runtime_error("storing serialized messages in intra process is not supported yet"); + } + auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg); + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message"); + } + } + std::shared_ptr get_allocator() const { return message_allocator_; @@ -289,11 +299,8 @@ protected: do_inter_process_publish(const MessageT * msg) { auto status = rcl_publish(&publisher_handle_, msg); - if (status != RCL_RET_OK) { - // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here) - throw std::runtime_error( - std::string("failed to publish message: ") + rcl_get_error_string_safe()); - // *INDENT-ON* + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); } } diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 91c1248..e368550 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -29,12 +29,13 @@ #include "rcl_interfaces/msg/intra_process_message.hpp" +#include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/exceptions.hpp" +#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" -#include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" -#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -64,7 +65,8 @@ public: std::shared_ptr node_handle, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, - const rcl_subscription_options_t & subscription_options); + const rcl_subscription_options_t & subscription_options, + bool is_serialized = false); /// Default destructor. RCLCPP_PUBLIC @@ -91,6 +93,12 @@ public: /** \return Shared pointer to the fresh message. */ virtual std::shared_ptr create_message() = 0; + + /// Borrow a new serialized message + /** \return Shared pointer to a rcl_message_serialized_t. */ + virtual std::shared_ptr + create_serialized_message() = 0; + /// Check if we need to handle the message, and execute the callback if we do. /** * \param[in] message Shared pointer to the message to handle. @@ -104,11 +112,22 @@ public: virtual void return_message(std::shared_ptr & message) = 0; + /// Return the message borrowed in create_serialized_message. + /** \param[in] message Shared pointer to the returned message. */ + virtual void + return_serialized_message(std::shared_ptr & message) = 0; + virtual void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, const rmw_message_info_t & message_info) = 0; + const rosidl_message_type_support_t & + get_message_type_support_handle() const; + + bool + is_serialized() const; + protected: std::shared_ptr intra_process_subscription_handle_; std::shared_ptr subscription_handle_; @@ -116,19 +135,24 @@ protected: private: RCLCPP_DISABLE_COPY(SubscriptionBase) + + rosidl_message_type_support_t type_support_; + bool is_serialized_; }; /// Subscription implementation, templated on the type of message this subscription receives. -template> +template< + typename CallbackMessageT, + typename Alloc = std::allocator> class Subscription : public SubscriptionBase { friend class rclcpp::node_interfaces::NodeTopicsInterface; public: - using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocTraits = allocator::AllocRebind; using MessageAlloc = typename MessageAllocTraits::allocator_type; - using MessageDeleter = allocator::Deleter; - using MessageUniquePtr = std::unique_ptr; + using MessageDeleter = allocator::Deleter; + using MessageUniquePtr = std::unique_ptr; RCLCPP_SMART_PTR_DEFINITIONS(Subscription) @@ -144,17 +168,19 @@ public: */ Subscription( std::shared_ptr node_handle, + const rosidl_message_type_support_t & ts, const std::string & topic_name, const rcl_subscription_options_t & subscription_options, - AnySubscriptionCallback callback, - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr - memory_strategy = message_memory_strategy::MessageMemoryStrategy callback, + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) : SubscriptionBase( node_handle, - *rosidl_typesupport_cpp::get_message_type_support_handle(), + ts, topic_name, - subscription_options), + subscription_options, + rclcpp::subscription_traits::is_serialized_subscription_argument::value), any_callback_(callback), message_memory_strategy_(memory_strategy), get_intra_process_message_callback_(nullptr), @@ -167,11 +193,12 @@ public: * \param[in] message_memory_strategy Shared pointer to the memory strategy to set. */ void set_message_memory_strategy( - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy) { message_memory_strategy_ = message_memory_strategy; } + std::shared_ptr create_message() { /* The default message memory strategy provides a dynamically allocated message on each call to @@ -181,6 +208,11 @@ public: return message_memory_strategy_->borrow_message(); } + std::shared_ptr create_serialized_message() + { + return message_memory_strategy_->borrow_serialized_message(); + } + void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) { if (matches_any_intra_process_publishers_) { @@ -190,7 +222,7 @@ public: return; } } - auto typed_message = std::static_pointer_cast(message); + auto typed_message = std::static_pointer_cast(message); any_callback_.dispatch(typed_message, message_info); } @@ -198,10 +230,15 @@ public: /** \param message message to be returned */ void return_message(std::shared_ptr & message) { - auto typed_message = std::static_pointer_cast(message); + auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } + void return_serialized_message(std::shared_ptr & message) + { + message_memory_strategy_->return_serialized_message(message); + } + void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, const rmw_message_info_t & message_info) @@ -278,8 +315,8 @@ public: private: RCLCPP_DISABLE_COPY(Subscription) - AnySubscriptionCallback any_callback_; - typename message_memory_strategy::MessageMemoryStrategy::SharedPtr + AnySubscriptionCallback any_callback_; + typename message_memory_strategy::MessageMemoryStrategy::SharedPtr message_memory_strategy_; GetMessageCallbackType get_intra_process_message_callback_; diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index 024f329..66b452d 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -25,6 +25,7 @@ #include "rosidl_typesupport_cpp/message_type_support.hpp" #include "rclcpp/subscription.hpp" +#include "rclcpp/subscription_traits.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/visibility_control.hpp" @@ -65,22 +66,28 @@ struct SubscriptionFactory }; /// Return a SubscriptionFactory with functions for creating a SubscriptionT. -template +template< + typename MessageT, + typename CallbackT, + typename Alloc, + typename CallbackMessageT, + typename SubscriptionT> SubscriptionFactory create_subscription_factory( CallbackT && callback, - typename rclcpp::message_memory_strategy::MessageMemoryStrategy::SharedPtr + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + CallbackMessageT, Alloc>::SharedPtr msg_mem_strat, std::shared_ptr allocator) { SubscriptionFactory factory; using rclcpp::AnySubscriptionCallback; - AnySubscriptionCallback any_subscription_callback(allocator); + AnySubscriptionCallback any_subscription_callback(allocator); any_subscription_callback.set(std::forward(callback)); auto message_alloc = - std::make_shared::MessageAlloc>(); + std::make_shared::MessageAlloc>(); // factory function that creates a MessageT specific SubscriptionT factory.create_typed_subscription = @@ -91,13 +98,14 @@ create_subscription_factory( ) -> rclcpp::SubscriptionBase::SharedPtr { subscription_options.allocator = - rclcpp::allocator::get_rcl_allocator(*message_alloc.get()); + rclcpp::allocator::get_rcl_allocator(*message_alloc.get()); using rclcpp::Subscription; using rclcpp::SubscriptionBase; - auto sub = Subscription::make_shared( + auto sub = Subscription::make_shared( node_base->get_shared_rcl_node_handle(), + *rosidl_typesupport_cpp::get_message_type_support_handle(), topic_name, subscription_options, any_subscription_callback, @@ -117,7 +125,7 @@ create_subscription_factory( uint64_t intra_process_subscription_id = ipm->add_subscription(subscription); auto intra_process_options = rcl_subscription_get_default_options(); - intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( + intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator( *message_alloc.get()); intra_process_options.qos = subscription_options.qos; intra_process_options.ignore_local_publications = false; @@ -128,7 +136,7 @@ create_subscription_factory( uint64_t publisher_id, uint64_t message_sequence, uint64_t subscription_id, - typename rclcpp::Subscription::MessageUniquePtr & message) + typename rclcpp::Subscription::MessageUniquePtr & message) { auto ipm = weak_ipm.lock(); if (!ipm) { @@ -136,7 +144,7 @@ create_subscription_factory( throw std::runtime_error( "intra process take called after destruction of intra process manager"); } - ipm->take_intra_process_message( + ipm->take_intra_process_message( publisher_id, message_sequence, subscription_id, message); }; diff --git a/rclcpp/include/rclcpp/subscription_traits.hpp b/rclcpp/include/rclcpp/subscription_traits.hpp new file mode 100644 index 0000000..3a73228 --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_traits.hpp @@ -0,0 +1,80 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_TRAITS_HPP_ +#define RCLCPP__SUBSCRIPTION_TRAITS_HPP_ + +#include + +#include "rclcpp/function_traits.hpp" + +namespace rclcpp +{ +namespace subscription_traits +{ + +/* + * The current version of uncrustify has a misinterpretion here + * between `:` used for inheritance vs for initializer list + * The result is that whenever a templated struct is used, + * the colon has to be without any whitespace next to it whereas + * when no template is used, the colon has to be separated by a space. + * Cheers! + */ +template +struct is_serialized_subscription_argument : std::false_type +{}; + +template<> +struct is_serialized_subscription_argument: std::true_type +{}; + +template<> +struct is_serialized_subscription_argument> + : std::true_type +{}; + +template +struct is_serialized_subscription : is_serialized_subscription_argument +{}; + +template +struct is_serialized_callback + : is_serialized_subscription_argument< + typename rclcpp::function_traits::function_traits::template argument_type<0>> +{}; + +template +struct extract_message_type +{ + using type = typename std::remove_cv::type; +}; + +template +struct extract_message_type>: extract_message_type +{}; + +template +struct extract_message_type>: extract_message_type +{}; + +template +struct has_message_type : extract_message_type< + typename rclcpp::function_traits::function_traits::template argument_type<0>> +{}; + +} // namespace subscription_traits +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_TRAITS_HPP_ diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 6e4d4c7..2ff16f9 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -36,6 +36,7 @@ ament_lint_common rmw rmw_implementation_cmake + test_msgs ament_cmake diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 13b2bd8..fc46f74 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -281,22 +281,41 @@ void Executor::execute_subscription( rclcpp::SubscriptionBase::SharedPtr subscription) { - std::shared_ptr message = subscription->create_message(); rmw_message_info_t message_info; + message_info.from_intra_process = false; - auto ret = rcl_take(subscription->get_subscription_handle().get(), + if (subscription->is_serialized()) { + auto serialized_msg = subscription->create_serialized_message(); + auto ret = rcl_take_serialized_message( + subscription->get_subscription_handle().get(), + serialized_msg.get(), &message_info); + if (RCL_RET_OK == ret) { + auto void_serialized_msg = std::static_pointer_cast(serialized_msg); + subscription->handle_message(void_serialized_msg, message_info); + } else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "take_serialized failed for subscription on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string_safe()); + rcl_reset_error(); + } + subscription->return_serialized_message(serialized_msg); + } else { + std::shared_ptr message = subscription->create_message(); + auto ret = rcl_take( + subscription->get_subscription_handle().get(), message.get(), &message_info); - if (ret == RCL_RET_OK) { - message_info.from_intra_process = false; - subscription->handle_message(message, message_info); - } else if (ret != RCL_RET_SUBSCRIPTION_TAKE_FAILED) { - RCUTILS_LOG_ERROR_NAMED( - "rclcpp", - "take failed for subscription on topic '%s': %s", - subscription->get_topic_name(), rcl_get_error_string_safe()); - rcl_reset_error(); + if (RCL_RET_OK == ret) { + subscription->handle_message(message, message_info); + } else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "could not deserialize serialized message on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string_safe()); + rcl_reset_error(); + } + subscription->return_message(message); } - subscription->return_message(message); } void diff --git a/rclcpp/src/rclcpp/parameter_client.cpp b/rclcpp/src/rclcpp/parameter_client.cpp index 3ba62ff..be3666a 100644 --- a/rclcpp/src/rclcpp/parameter_client.cpp +++ b/rclcpp/src/rclcpp/parameter_client.cpp @@ -399,8 +399,9 @@ SyncParametersClient::set_parameters( { auto f = async_parameters_client_->set_parameters(parameters); + auto node_base_interface = node_->get_node_base_interface(); using rclcpp::executors::spin_node_until_future_complete; - if (spin_node_until_future_complete(*executor_, node_->get_node_base_interface(), f) == + if (spin_node_until_future_complete(*executor_, node_base_interface, f) == rclcpp::executor::FutureReturnCode::SUCCESS) { return f.get(); diff --git a/rclcpp/src/rclcpp/subscription.cpp b/rclcpp/src/rclcpp/subscription.cpp index 83401c0..f1448e1 100644 --- a/rclcpp/src/rclcpp/subscription.cpp +++ b/rclcpp/src/rclcpp/subscription.cpp @@ -31,8 +31,11 @@ SubscriptionBase::SubscriptionBase( std::shared_ptr node_handle, const rosidl_message_type_support_t & type_support_handle, const std::string & topic_name, - const rcl_subscription_options_t & subscription_options) -: node_handle_(node_handle) + const rcl_subscription_options_t & subscription_options, + bool is_serialized) +: node_handle_(node_handle), + type_support_(type_support_handle), + is_serialized_(is_serialized) { std::weak_ptr weak_node_handle(node_handle_); auto custom_deletor = [weak_node_handle](rcl_subscription_t * rcl_subs) @@ -111,3 +114,15 @@ SubscriptionBase::get_intra_process_subscription_handle() const { return intra_process_subscription_handle_; } + +const rosidl_message_type_support_t & +SubscriptionBase::get_message_type_support_handle() const +{ + return type_support_; +} + +bool +SubscriptionBase::is_serialized() const +{ + return is_serialized_; +} diff --git a/rclcpp/src/rclcpp/time_source.cpp b/rclcpp/src/rclcpp/time_source.cpp index 3a09716..b18b56c 100644 --- a/rclcpp/src/rclcpp/time_source.cpp +++ b/rclcpp/src/rclcpp/time_source.cpp @@ -74,7 +74,9 @@ void TimeSource::attachNode( auto cb = std::bind(&TimeSource::clock_cb, this, std::placeholders::_1); - clock_subscription_ = rclcpp::create_subscription( + clock_subscription_ = rclcpp::create_subscription< + MessageT, decltype(cb), Alloc, MessageT, SubscriptionT + >( node_topics_.get(), topic_name, std::move(cb), diff --git a/rclcpp/test/test_serialized_message_allocator.cpp b/rclcpp/test/test_serialized_message_allocator.cpp new file mode 100644 index 0000000..ae65f7f --- /dev/null +++ b/rclcpp/test/test_serialized_message_allocator.cpp @@ -0,0 +1,67 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rcl/types.h" + +#include "test_msgs/msg/empty.hpp" + +TEST(TestSerializedMessageAllocator, default_allocator) { + using DummyMessageT = float; + auto mem_strategy = + rclcpp::message_memory_strategy::MessageMemoryStrategy::create_default(); + + auto msg0 = mem_strategy->borrow_serialized_message(); + ASSERT_EQ(msg0->buffer_capacity, 0u); + mem_strategy->return_serialized_message(msg0); + + auto msg100 = mem_strategy->borrow_serialized_message(100); + ASSERT_EQ(msg100->buffer_capacity, 100u); + mem_strategy->return_serialized_message(msg100); + + auto msg200 = mem_strategy->borrow_serialized_message(); + auto ret = rmw_serialized_message_resize(msg200.get(), 200); + ASSERT_EQ(RCL_RET_OK, ret); + EXPECT_EQ(0u, msg200->buffer_length); + EXPECT_EQ(200u, msg200->buffer_capacity); + mem_strategy->return_serialized_message(msg200); + + auto msg1000 = mem_strategy->borrow_serialized_message(1000); + ASSERT_EQ(msg1000->buffer_capacity, 1000u); + ret = rmw_serialized_message_resize(msg1000.get(), 2000); + ASSERT_EQ(RCL_RET_OK, ret); + EXPECT_EQ(2000u, msg1000->buffer_capacity); + mem_strategy->return_serialized_message(msg1000); +} + +TEST(TestSerializedMessageAllocator, borrow_from_subscription) { + rclcpp::init(0, NULL); + + auto node = std::make_shared("test_serialized_message_allocator_node"); + std::shared_ptr sub = + node->create_subscription("~/dummy_topic", []( + std::shared_ptr test_msg) {(void) test_msg;}); + + auto msg0 = sub->create_serialized_message(); + EXPECT_EQ(0u, msg0->buffer_capacity); + sub->return_serialized_message(msg0); + + rclcpp::shutdown(); +} diff --git a/rclcpp/test/test_subscription_traits.cpp b/rclcpp/test/test_subscription_traits.cpp new file mode 100644 index 0000000..3112caf --- /dev/null +++ b/rclcpp/test/test_subscription_traits.cpp @@ -0,0 +1,178 @@ +// Copyright 2017 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rcl/types.h" + +#include "rclcpp/allocator/allocator_common.hpp" +#include "rclcpp/subscription_traits.hpp" + +#include "test_msgs/msg/empty.hpp" + +void serialized_callback_copy(rcl_serialized_message_t unused) +{ + (void) unused; +} + +void serialized_callback_shared_ptr(std::shared_ptr unused) +{ + (void) unused; +} + +void not_serialized_callback(char * unused) +{ + (void) unused; +} + +void not_serialized_shared_ptr_callback(std::shared_ptr unused) +{ + (void) unused; +} + +void not_serialized_unique_ptr_callback( + test_msgs::msg::Empty::UniquePtrWithDeleter, + test_msgs::msg::Empty>> unused) +{ + (void) unused; +} + +TEST(TestSubscriptionTraits, is_serialized_callback) { + // Test regular functions + auto cb1 = &serialized_callback_copy; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "rcl_serialized_message_t in a first argument callback makes it a serialized callback"); + + auto cb2 = &serialized_callback_shared_ptr; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "std::shared_ptr in a callback makes it a serialized callback"); + + auto cb3 = ¬_serialized_callback; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == false, + "passing a char * is not a serialized callback"); + + auto cb4 = ¬_serialized_shared_ptr_callback; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == false, + "passing a std::shared_tr is not a serialized callback"); + + auto cb5 = [](rcl_serialized_message_t unused) -> void + { + (void) unused; + }; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == true, + "rcl_serialized_message_t in a first argument callback makes it a serialized callback"); + + using MessageT = test_msgs::msg::Empty; + using MessageTAllocator = std::allocator; + using MessageTDeallocator = rclcpp::allocator::Deleter; + auto cb6 = [](MessageT::UniquePtrWithDeleter unique_msg_ptr) -> void + { + (void) unique_msg_ptr; + }; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == false, + "passing a std::unique_ptr of test_msgs::msg::Empty is not a serialized callback"); + + auto cb7 = ¬_serialized_unique_ptr_callback; + static_assert( + rclcpp::subscription_traits::is_serialized_callback::value == false, + "passing a fancy unique_ptr of test_msgs::msg::Empty is not a serialized callback"); +} + +TEST(TestSubscriptionTraits, callback_messages) { + static_assert( + std::is_same< + std::shared_ptr, + rclcpp::function_traits::function_traits< + decltype(not_serialized_shared_ptr_callback) + >::template argument_type<0> + >::value, "wrong!"); + + static_assert( + std::is_same< + char, + rclcpp::subscription_traits::extract_message_type< + rclcpp::function_traits::function_traits< + decltype(not_serialized_shared_ptr_callback) + >::template argument_type<0> + >::type + >::value, "wrong!"); + + auto cb1 = &serialized_callback_copy; + static_assert( + std::is_same< + rcl_serialized_message_t, + rclcpp::subscription_traits::has_message_type::type>::value, + "serialized callback message type is rcl_serialized_message_t"); + + auto cb2 = &serialized_callback_shared_ptr; + static_assert( + std::is_same< + rcl_serialized_message_t, + rclcpp::subscription_traits::has_message_type::type>::value, + "serialized callback message type is rcl_serialized_message_t"); + + auto cb3 = ¬_serialized_callback; + static_assert( + std::is_same< + char *, + rclcpp::subscription_traits::has_message_type::type>::value, + "not serialized callback message type is char"); + + auto cb4 = ¬_serialized_shared_ptr_callback; + static_assert( + std::is_same< + char, + rclcpp::subscription_traits::has_message_type::type>::value, + "not serialized shared_ptr callback message type is std::shared_ptr"); + + auto cb5 = [](rcl_serialized_message_t unused) -> void + { + (void) unused; + }; + static_assert( + std::is_same< + rcl_serialized_message_t, + rclcpp::subscription_traits::has_message_type::type>::value, + "serialized callback message type is rcl_serialized_message_t"); + + using MessageT = test_msgs::msg::Empty; + using MessageTAllocator = std::allocator; + using MessageTDeallocator = rclcpp::allocator::Deleter; + auto cb6 = [](std::unique_ptr unique_msg_ptr) -> void + { + (void) unique_msg_ptr; + }; + static_assert( + std::is_same< + test_msgs::msg::Empty, + rclcpp::subscription_traits::has_message_type::type>::value, + "passing a std::unique_ptr of test_msgs::msg::Empty has message type Empty"); + + auto cb7 = ¬_serialized_unique_ptr_callback; + static_assert( + std::is_same< + test_msgs::msg::Empty, + rclcpp::subscription_traits::has_message_type::type>::value, + "passing a fancy std::unique_ptr of test_msgs::msg::Empty has message type Empty"); +} diff --git a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp index b3c947e..9fbe83e 100644 --- a/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp +++ b/rclcpp_lifecycle/src/lifecycle_node_interface_impl.hpp @@ -297,11 +297,14 @@ public: return RCL_RET_ERROR; } + constexpr bool publish_update = true; // keep the initial state to pass to a transition callback State initial_state(state_machine_.current_state); uint8_t transition_id = lifecycle_transition; - if (rcl_lifecycle_trigger_transition(&state_machine_, transition_id, true) != RCL_RET_OK) { + if (rcl_lifecycle_trigger_transition( + &state_machine_, transition_id, publish_update) != RCL_RET_OK) + { RCUTILS_LOG_ERROR("Unable to start transition %u from current state %s: %s", transition_id, state_machine_.current_state->label, rcl_get_error_string_safe()) return RCL_RET_ERROR; @@ -311,7 +314,7 @@ public: state_machine_.current_state->id, initial_state); if (rcl_lifecycle_trigger_transition( - &state_machine_, cb_return_code, true) != RCL_RET_OK) + &state_machine_, cb_return_code, publish_update) != RCL_RET_OK) { RCUTILS_LOG_ERROR("Failed to finish transition %u. Current state is now: %s", transition_id, state_machine_.current_state->label) @@ -326,13 +329,17 @@ public: state_machine_.current_state->id, initial_state); if (error_resolved == lifecycle_msgs::msg::Transition::TRANSITION_CALLBACK_SUCCESS) { // We call cleanup on the error state - if (rcl_lifecycle_trigger_transition(&state_machine_, error_resolved, true) != RCL_RET_OK) { + if (rcl_lifecycle_trigger_transition( + &state_machine_, error_resolved, publish_update) != RCL_RET_OK) + { RCUTILS_LOG_ERROR("Failed to call cleanup on error state") return RCL_RET_ERROR; } } else { // We call shutdown on the error state - if (rcl_lifecycle_trigger_transition(&state_machine_, error_resolved, true) != RCL_RET_OK) { + if (rcl_lifecycle_trigger_transition( + &state_machine_, error_resolved, publish_update) != RCL_RET_OK) + { RCUTILS_LOG_ERROR("Failed to call cleanup on error state") return RCL_RET_ERROR; }