From ed0bd16e318f8736227e2121119bc418c58d56ba Mon Sep 17 00:00:00 2001 From: Karsten Knese Date: Fri, 18 Oct 2019 14:51:24 -0700 Subject: [PATCH] Zero copy api (#864) * loaned message Signed-off-by: Karsten Knese wip Signed-off-by: Karsten Knese loaned message Signed-off-by: Karsten Knese * use publisher allocator Signed-off-by: Karsten Knese * use unique_ptr inside LoanedMessage Signed-off-by: Karsten Knese * can_loan_messages for subscription_base Signed-off-by: Karsten Knese * use correct rcl_publish function * default move constructor not allowed in gcc Signed-off-by: Knese Karsten (CR/RTC-HMI4) * remove get_instance and make LoanedMessage constructor public Signed-off-by: Karsten Knese * some more api doc Signed-off-by: Karsten Knese * rebase ontop of master Signed-off-by: Karsten Knese * warn when not being able to loan message Signed-off-by: Karsten Knese * first draft loaned_message_sequence Signed-off-by: Karsten Knese * rmw_take_loaned_message Signed-off-by: Karsten Knese * address review comments Signed-off-by: Karsten Knese * introduce rmw_publish_loaned_message Signed-off-by: Karsten Knese * const correct publish Signed-off-by: Karsten Knese * placement new Signed-off-by: Knese Karsten * disable deleter for callback Signed-off-by: Knese Karsten * print info once Signed-off-by: Knese Karsten * uncrustify Signed-off-by: Karsten Knese --- rclcpp/CMakeLists.txt | 7 + rclcpp/include/rclcpp/loaned_message.hpp | 206 ++++++++++++++++++++ rclcpp/include/rclcpp/publisher.hpp | 80 ++++++++ rclcpp/include/rclcpp/publisher_base.hpp | 9 + rclcpp/include/rclcpp/subscription.hpp | 28 ++- rclcpp/include/rclcpp/subscription_base.hpp | 17 ++ rclcpp/src/rclcpp/executor.cpp | 26 +++ rclcpp/src/rclcpp/publisher_base.cpp | 6 + rclcpp/src/rclcpp/subscription_base.cpp | 6 + rclcpp/test/test_loaned_message.cpp | 83 ++++++++ 10 files changed, 460 insertions(+), 8 deletions(-) create mode 100644 rclcpp/include/rclcpp/loaned_message.hpp create mode 100644 rclcpp/test/test_loaned_message.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 0fa5b97..15f9c31 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -216,6 +216,13 @@ if(BUILD_TESTING) "rosidl_typesupport_cpp" ) endif() + + ament_add_gtest(test_loaned_message test/test_loaned_message.cpp) + ament_target_dependencies(test_loaned_message + "test_msgs" + ) + target_link_libraries(test_loaned_message ${PROJECT_NAME}) + ament_add_gtest(test_node test/test_node.cpp) if(TARGET test_node) ament_target_dependencies(test_node diff --git a/rclcpp/include/rclcpp/loaned_message.hpp b/rclcpp/include/rclcpp/loaned_message.hpp new file mode 100644 index 0000000..6f3226a --- /dev/null +++ b/rclcpp/include/rclcpp/loaned_message.hpp @@ -0,0 +1,206 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__LOANED_MESSAGE_HPP_ +#define RCLCPP__LOANED_MESSAGE_HPP_ + +#include +#include + +#include "rclcpp/logging.hpp" +#include "rclcpp/publisher_base.hpp" + +#include "rcl/allocator.h" +#include "rcl/publisher.h" + +namespace rclcpp +{ + +template> +class LoanedMessage +{ + using MessageAllocatorTraits = allocator::AllocRebind; + using MessageAllocator = typename MessageAllocatorTraits::allocator_type; + +public: + /// Constructor of the LoanedMessage class. + /** + * The constructor of this class allocates memory for a given message type + * and associates this with a given publisher. + * + * Given the publisher instance, a case differentiation is being performaned + * which decides whether the underlying middleware is able to allocate the appropriate + * memory for this message type or not. + * In the case that the middleware can not loan messages, the passed in allocator instance + * is being used to allocate the message within the scope of this class. + * Otherwise, the allocator is being ignored and the allocation is solely performaned + * in the underlying middleware with its appropriate allocation strategy. + * The need for this arises as the user code can be written explicitly targeting a middleware + * capable of loaning messages. + * However, this user code is ought to be usable even when dynamically linked against + * a middleware which doesn't support message loaning in which case the allocator will be used. + * + * \param pub rclcpp::Publisher instance to which the memory belongs + * \param allocator Allocator instance in case middleware can not allocate messages + */ + LoanedMessage( + const rclcpp::PublisherBase & pub, + std::allocator allocator) + : pub_(pub), + message_(nullptr), + message_allocator_(std::move(allocator)) + { + if (pub_.can_loan_messages()) { + void * message_ptr = nullptr; + auto ret = rcl_borrow_loaned_message( + pub_.get_publisher_handle(), + rosidl_typesupport_cpp::get_message_type_support_handle(), + &message_ptr); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret); + } + message_ = static_cast(message_ptr); + } else { + RCLCPP_INFO_ONCE( + rclcpp::get_logger("rclcpp"), + "Currently used middleware can't loan messages. Local allocator will be used."); + message_ = message_allocator_.allocate(1); + new (message_) MessageT(); + } + } + + /// Constructor of the LoanedMessage class. + /** + * The constructor of this class allocates memory for a given message type + * and associates this with a given publisher. + * + * Given the publisher instance, a case differentiation is being performaned + * which decides whether the underlying middleware is able to allocate the appropriate + * memory for this message type or not. + * In the case that the middleware can not loan messages, the passed in allocator instance + * is being used to allocate the message within the scope of this class. + * Otherwise, the allocator is being ignored and the allocation is solely performaned + * in the underlying middleware with its appropriate allocation strategy. + * The need for this arises as the user code can be written explicitly targeting a middleware + * capable of loaning messages. + * However, this user code is ought to be usable even when dynamically linked against + * a middleware which doesn't support message loaning in which case the allocator will be used. + * + * \param pub rclcpp::Publisher instance to which the memory belongs + * \param allocator Allocator instance in case middleware can not allocate messages + */ + LoanedMessage( + const rclcpp::PublisherBase * pub, + std::shared_ptr> allocator) + : LoanedMessage(*pub, *allocator) + {} + + /// Move semantic for RVO + LoanedMessage(LoanedMessage && other) + : pub_(std::move(other.pub_)), + message_(std::move(other.message_)), + message_allocator_(std::move(other.message_allocator_)) + {} + + /// Destructor of the LoanedMessage class. + /** + * The destructor has the explicit task to return the allocated memory for its message + * instance. + * If the message was previously allocated via the middleware, the message is getting + * returned to the middleware to cleanly destroy the allocation. + * In the case that the local allocator instance was used, the same instance is then + * being used to destroy the allocated memory. + * + * The contract here is that the memory for this message is valid as long as this instance + * of the LoanedMessage class is alive. + */ + virtual ~LoanedMessage() + { + auto error_logger = rclcpp::get_logger("LoanedMessage"); + + if (message_ == nullptr) { + return; + } + + if (pub_.can_loan_messages()) { + // return allocated memory to the middleware + auto ret = + rcl_return_loaned_message(pub_.get_publisher_handle(), message_); + if (ret != RCL_RET_OK) { + RCLCPP_ERROR( + error_logger, "rcl_deallocate_loaned_message failed: %s", rcl_get_error_string().str); + rcl_reset_error(); + } + } else { + // call destructor before deallocating + message_->~MessageT(); + message_allocator_.deallocate(message_, 1); + } + message_ = nullptr; + } + + /// Validate if the message was correctly allocated. + /** + * The allocated memory might not be always consistent and valid. + * Reasons why this could fail is that an allocation step was failing, + * e.g. just like malloc could fail or a maximum amount of previously allocated + * messages is exceeded in which case the loaned messages have to be returned + * to the middleware prior to be able to allocate a new one. + */ + bool is_valid() const + { + return message_ != nullptr; + } + + /// Access the ROS message instance. + /** + * A call to `get()` will return a mutable reference to the underlying ROS message instance. + * This allows a user to modify the content of the message prior to publishing it. + * + * If this reference is copied, the memory for this copy is no longer managed + * by the LoanedMessage instance and has to be cleanup individually. + */ + MessageT & get() const + { + return *message_; + } + + /// Release ownership of the ROS message instance. + /** + * A call to `release()` will unmanage the memory for the ROS message. + * That means that the destructor of this class will not free the memory on scope exit. + * + * \return Raw pointer to the message instance. + */ + MessageT * release() + { + auto msg = message_; + message_ = nullptr; + return msg; + } + +protected: + const rclcpp::PublisherBase & pub_; + + MessageT * message_; + + MessageAllocator message_allocator_; + + /// Deleted copy constructor to preserve memory integrity. + LoanedMessage(const LoanedMessage & other) = delete; +}; + +} // namespace rclcpp + +#endif // RCLCPP__LOANED_MESSAGE_HPP_ diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index e9df937..9bde46e 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -34,6 +34,7 @@ #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/detail/resolve_use_intra_process.hpp" #include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/loaned_message.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/publisher_base.hpp" @@ -44,6 +45,9 @@ namespace rclcpp { +template +class LoanedMessage; + /// A publisher publishes messages of any type to a topic. template> class Publisher : public PublisherBase @@ -132,6 +136,26 @@ public: >::make_shared(size, this->get_allocator()); } + /// Loan memory for a ROS message from the middleware + /** + * If the middleware is capable of loaning memory for a ROS message instance, + * the loaned message will be directly allocated in the middleware. + * If not, the message allocator of this rclcpp::Publisher instance is being used. + * + * With a call to \sa `publish` the LoanedMessage instance is being returned to the middleware + * or free'd accordingly to the allocator. + * If the message is not being published but processed differently, the message has to be + * returned by calling \sa `return_loaned_message`. + * \sa rclcpp::LoanedMessage for details of the LoanedMessage class. + * + * \return LoanedMessage containing memory for a ROS message of type MessageT + */ + rclcpp::LoanedMessage + loan_message() + { + return rclcpp::LoanedMessage(this, this->get_allocator()); + } + /// Send a message to the topic for this publisher. /** * This function is templated on the input message type, MessageT. @@ -191,6 +215,41 @@ public: return this->do_serialized_publish(&serialized_msg); } + /// Publish an instance of a LoanedMessage + /** + * When publishing a loaned message, the memory for this ROS instance will be deallocated + * after being published. + * The instance of the loaned message is no longer valid after this call. + * + * \param loaned_msg The LoanedMessage instance to be published. + */ + void + publish(rclcpp::LoanedMessage && loaned_msg) + { + if (!loaned_msg.is_valid()) { + throw std::runtime_error("loaned message is not valid"); + } + if (intra_process_is_enabled_) { + // TODO(Karsten1987): support loaned message passed by intraprocess + throw std::runtime_error("storing loaned messages in intra process is not supported yet"); + } + + // verify that publisher supports loaned messages + // TODO(Karsten1987): This case separation has to be done in rclcpp + // otherwise we have to ensure that every middleware implements + // `rmw_publish_loaned_message` explicitly the same way as `rmw_publish` + // by taking a copy of the ros message. + if (this->can_loan_messages()) { + // we release the ownership from the rclpp::LoanedMessage instance + // and let the middleware clean up the memory. + this->do_loaned_message_publish(loaned_msg.release()); + } else { + // we don't release the ownership, let the middleware copy the ros message + // and thus the destructor of rclcpp::LoanedMessage cleans up the memory. + this->do_inter_process_publish(&loaned_msg.get()); + } + } + std::shared_ptr get_allocator() const { @@ -202,6 +261,7 @@ protected: do_inter_process_publish(const MessageT * msg) { auto status = rcl_publish(&publisher_handle_, msg, nullptr); + if (RCL_RET_PUBLISHER_INVALID == status) { rcl_reset_error(); // next call will reset error message if not context if (rcl_publisher_is_valid_except_context(&publisher_handle_)) { @@ -252,6 +312,26 @@ protected: } } + void + do_loaned_message_publish(MessageT * msg) + { + auto status = rcl_publish_loaned_message(&publisher_handle_, msg, nullptr); + + if (RCL_RET_PUBLISHER_INVALID == status) { + rcl_reset_error(); // next call will reset error message if not context + if (rcl_publisher_is_valid_except_context(&publisher_handle_)) { + rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_); + if (nullptr != context && !rcl_context_is_valid(context)) { + // publisher is invalid due to context being shutdown + return; + } + } + } + if (RCL_RET_OK != status) { + rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message"); + } + } + uint64_t store_intra_process_message( uint64_t publisher_id, diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index a3ac1d7..c5837f6 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -161,6 +161,15 @@ public: rclcpp::QoS get_actual_qos() const; + /// Check if publisher instance can loan messages. + /** + * Depending on the middleware and the message type, this will return true if the middleware + * can allocate a ROS message instance. + */ + RCLCPP_PUBLIC + bool + can_loan_messages() const; + /// Compare this publisher to a gid. /** * Note that this function calls the next function. diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 4be5284..5272b32 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -151,7 +151,7 @@ public: message_memory_strategy_ = message_memory_strategy; } - std::shared_ptr create_message() + std::shared_ptr create_message() override { /* The default message memory strategy provides a dynamically allocated message on each call to * create_message, though alternative memory strategies that re-use a preallocated message may be @@ -160,12 +160,13 @@ public: return message_memory_strategy_->borrow_message(); } - std::shared_ptr create_serialized_message() + std::shared_ptr create_serialized_message() override { return message_memory_strategy_->borrow_serialized_message(); } - void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) + void handle_message( + std::shared_ptr & message, const rmw_message_info_t & message_info) override { if (matches_any_intra_process_publishers(&message_info.publisher_gid)) { // In this case, the message will be delivered via intra process and @@ -176,22 +177,33 @@ public: any_callback_.dispatch(typed_message, message_info); } - /// Return the loaned message. + void + handle_loaned_message( + void * loaned_message, const rmw_message_info_t & message_info) override + { + auto typed_message = static_cast(loaned_message); + // message is loaned, so we have to make sure that the deleter does not deallocate the message + auto sptr = std::shared_ptr( + typed_message, [](CallbackMessageT * msg) {(void) msg;}); + any_callback_.dispatch(sptr, message_info); + } + + /// Return the borrowed message. /** \param message message to be returned */ - void return_message(std::shared_ptr & message) + void return_message(std::shared_ptr & message) override { auto typed_message = std::static_pointer_cast(message); message_memory_strategy_->return_message(typed_message); } - void return_serialized_message(std::shared_ptr & message) + void return_serialized_message(std::shared_ptr & message) override { message_memory_strategy_->return_serialized_message(message); } void handle_intra_process_message( rcl_interfaces::msg::IntraProcessMessage & ipm, - const rmw_message_info_t & message_info) + const rmw_message_info_t & message_info) override { if (!use_intra_process_) { // throw std::runtime_error( @@ -244,7 +256,7 @@ public: /// Implemenation detail. const std::shared_ptr - get_intra_process_subscription_handle() const + get_intra_process_subscription_handle() const override { if (!use_intra_process_) { return nullptr; diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 8cad2b7..f3df997 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -138,6 +138,12 @@ public: void handle_message(std::shared_ptr & message, const rmw_message_info_t & message_info) = 0; + // TODO(karsten1987): Does it make sense to pass in a weak_ptr? + RCLCPP_PUBLIC + virtual + void + handle_loaned_message(void * loaned_message, const rmw_message_info_t & message_info) = 0; + /// Return the message borrowed in create_message. /** \param[in] message Shared pointer to the returned message. */ RCLCPP_PUBLIC @@ -173,6 +179,17 @@ public: size_t get_publisher_count() const; + /// Check if subscription instance can loan messages. + /** + * Depending on the middleware and the message type, this will return true if the middleware + * can allocate a ROS message instance. + * + * \return boolean flag indicating if middleware can loan messages. + */ + RCLCPP_PUBLIC + bool + can_loan_messages() const; + using IntraProcessManagerWeakPtr = std::weak_ptr; diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index a00d8f3..48f8405 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -331,6 +331,32 @@ Executor::execute_subscription( rcl_reset_error(); } subscription->return_serialized_message(serialized_msg); + } else if (subscription->can_loan_messages()) { + void * loaned_msg = nullptr; + auto ret = rcl_take_loaned_message( + subscription->get_subscription_handle().get(), + &loaned_msg, + &message_info, + nullptr); + if (RCL_RET_OK == ret) { + subscription->handle_loaned_message(loaned_msg, message_info); + } else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "take_loaned failed for subscription on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string().str); + rcl_reset_error(); + } + ret = rcl_release_loaned_message( + subscription->get_subscription_handle().get(), + loaned_msg); + if (RCL_RET_OK != ret) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "release_loaned failed for subscription on topic '%s': %s", + subscription->get_topic_name(), rcl_get_error_string().str); + } + loaned_msg = nullptr; } else { std::shared_ptr message = subscription->create_message(); auto ret = rcl_take( diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index a5d142f..38d3995 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -224,6 +224,12 @@ PublisherBase::assert_liveliness() const return RCL_RET_OK == rcl_publisher_assert_liveliness(&publisher_handle_); } +bool +PublisherBase::can_loan_messages() const +{ + return rcl_publisher_can_loan_messages(&publisher_handle_); +} + bool PublisherBase::operator==(const rmw_gid_t & gid) const { diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 0cdd1f5..99558d2 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -200,3 +200,9 @@ SubscriptionBase::setup_intra_process( weak_ipm_ = weak_ipm; use_intra_process_ = true; } + +bool +SubscriptionBase::can_loan_messages() const +{ + return rcl_subscription_can_loan_messages(subscription_handle_.get()); +} diff --git a/rclcpp/test/test_loaned_message.cpp b/rclcpp/test/test_loaned_message.cpp new file mode 100644 index 0000000..726209a --- /dev/null +++ b/rclcpp/test/test_loaned_message.cpp @@ -0,0 +1,83 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rclcpp/loaned_message.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "test_msgs/msg/basic_types.hpp" + +using MessageT = test_msgs::msg::BasicTypes; +using LoanedMessageT = rclcpp::LoanedMessage; + +class TestLoanedMessage : public ::testing::Test +{ +protected: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } +}; + +TEST_F(TestLoanedMessage, initialize) { + auto node = std::make_shared("loaned_message_test_node"); + auto pub = node->create_publisher("loaned_message_test_topic", 1); + + auto loaned_msg = rclcpp::LoanedMessage(pub.get(), pub->get_allocator()); + ASSERT_TRUE(loaned_msg.is_valid()); + loaned_msg.get().float32_value = 42.0f; + ASSERT_EQ(42.0f, loaned_msg.get().float32_value); + + SUCCEED(); +} + +TEST_F(TestLoanedMessage, loan_from_pub) { + auto node = std::make_shared("loaned_message_test_node"); + auto pub = node->create_publisher("loaned_message_test_topic", 1); + + auto loaned_msg = pub->loan_message(); + ASSERT_TRUE(loaned_msg.is_valid()); + loaned_msg.get().float64_value = 42.0f; + ASSERT_EQ(42.0f, loaned_msg.get().float64_value); + + SUCCEED(); +} + +TEST_F(TestLoanedMessage, release) { + auto node = std::make_shared("loaned_message_test_node"); + auto pub = node->create_publisher("loaned_message_test_topic", 1); + + MessageT * msg = nullptr; + { + auto loaned_msg = pub->loan_message(); + ASSERT_TRUE(loaned_msg.is_valid()); + loaned_msg.get().float64_value = 42.0f; + ASSERT_EQ(42.0f, loaned_msg.get().float64_value); + msg = loaned_msg.release(); + // call destructor implicitly. + // destructor not allowed to free memory because of not having ownership + // of the data after a call to release. + } + + ASSERT_EQ(42.0f, msg->float64_value); + + SUCCEED(); +}