Zero copy api (#864)

* loaned message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

wip

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

loaned message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use publisher allocator

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use unique_ptr inside LoanedMessage

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* can_loan_messages for subscription_base

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* use correct rcl_publish function

* default move constructor not allowed in gcc

Signed-off-by: Knese Karsten (CR/RTC-HMI4) <karsten.knese@us.bosch.com>

*  remove get_instance and make LoanedMessage constructor public

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* some more api doc

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* rebase ontop of master

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* warn when not being able to loan message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* first draft loaned_message_sequence

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* rmw_take_loaned_message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* address review comments

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce rmw_publish_loaned_message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* const correct publish

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* placement new

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* disable deleter for callback

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* print info once

Signed-off-by: Knese Karsten <karsten@openrobotics.org>

* uncrustify

Signed-off-by: Karsten Knese <karsten@openrobotics.org>
This commit is contained in:
Karsten Knese 2019-10-18 14:51:24 -07:00 committed by GitHub
parent 658f207dd8
commit ed0bd16e31
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 460 additions and 8 deletions

View file

@ -216,6 +216,13 @@ if(BUILD_TESTING)
"rosidl_typesupport_cpp"
)
endif()
ament_add_gtest(test_loaned_message test/test_loaned_message.cpp)
ament_target_dependencies(test_loaned_message
"test_msgs"
)
target_link_libraries(test_loaned_message ${PROJECT_NAME})
ament_add_gtest(test_node test/test_node.cpp)
if(TARGET test_node)
ament_target_dependencies(test_node

View file

@ -0,0 +1,206 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RCLCPP__LOANED_MESSAGE_HPP_
#define RCLCPP__LOANED_MESSAGE_HPP_
#include <memory>
#include <utility>
#include "rclcpp/logging.hpp"
#include "rclcpp/publisher_base.hpp"
#include "rcl/allocator.h"
#include "rcl/publisher.h"
namespace rclcpp
{
template<typename MessageT, typename AllocatorT = std::allocator<void>>
class LoanedMessage
{
using MessageAllocatorTraits = allocator::AllocRebind<MessageT, AllocatorT>;
using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
public:
/// Constructor of the LoanedMessage class.
/**
* The constructor of this class allocates memory for a given message type
* and associates this with a given publisher.
*
* Given the publisher instance, a case differentiation is being performaned
* which decides whether the underlying middleware is able to allocate the appropriate
* memory for this message type or not.
* In the case that the middleware can not loan messages, the passed in allocator instance
* is being used to allocate the message within the scope of this class.
* Otherwise, the allocator is being ignored and the allocation is solely performaned
* in the underlying middleware with its appropriate allocation strategy.
* The need for this arises as the user code can be written explicitly targeting a middleware
* capable of loaning messages.
* However, this user code is ought to be usable even when dynamically linked against
* a middleware which doesn't support message loaning in which case the allocator will be used.
*
* \param pub rclcpp::Publisher instance to which the memory belongs
* \param allocator Allocator instance in case middleware can not allocate messages
*/
LoanedMessage(
const rclcpp::PublisherBase & pub,
std::allocator<MessageT> allocator)
: pub_(pub),
message_(nullptr),
message_allocator_(std::move(allocator))
{
if (pub_.can_loan_messages()) {
void * message_ptr = nullptr;
auto ret = rcl_borrow_loaned_message(
pub_.get_publisher_handle(),
rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
&message_ptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
message_ = static_cast<MessageT *>(message_ptr);
} else {
RCLCPP_INFO_ONCE(
rclcpp::get_logger("rclcpp"),
"Currently used middleware can't loan messages. Local allocator will be used.");
message_ = message_allocator_.allocate(1);
new (message_) MessageT();
}
}
/// Constructor of the LoanedMessage class.
/**
* The constructor of this class allocates memory for a given message type
* and associates this with a given publisher.
*
* Given the publisher instance, a case differentiation is being performaned
* which decides whether the underlying middleware is able to allocate the appropriate
* memory for this message type or not.
* In the case that the middleware can not loan messages, the passed in allocator instance
* is being used to allocate the message within the scope of this class.
* Otherwise, the allocator is being ignored and the allocation is solely performaned
* in the underlying middleware with its appropriate allocation strategy.
* The need for this arises as the user code can be written explicitly targeting a middleware
* capable of loaning messages.
* However, this user code is ought to be usable even when dynamically linked against
* a middleware which doesn't support message loaning in which case the allocator will be used.
*
* \param pub rclcpp::Publisher instance to which the memory belongs
* \param allocator Allocator instance in case middleware can not allocate messages
*/
LoanedMessage(
const rclcpp::PublisherBase * pub,
std::shared_ptr<std::allocator<MessageT>> allocator)
: LoanedMessage(*pub, *allocator)
{}
/// Move semantic for RVO
LoanedMessage(LoanedMessage<MessageT> && other)
: pub_(std::move(other.pub_)),
message_(std::move(other.message_)),
message_allocator_(std::move(other.message_allocator_))
{}
/// Destructor of the LoanedMessage class.
/**
* The destructor has the explicit task to return the allocated memory for its message
* instance.
* If the message was previously allocated via the middleware, the message is getting
* returned to the middleware to cleanly destroy the allocation.
* In the case that the local allocator instance was used, the same instance is then
* being used to destroy the allocated memory.
*
* The contract here is that the memory for this message is valid as long as this instance
* of the LoanedMessage class is alive.
*/
virtual ~LoanedMessage()
{
auto error_logger = rclcpp::get_logger("LoanedMessage");
if (message_ == nullptr) {
return;
}
if (pub_.can_loan_messages()) {
// return allocated memory to the middleware
auto ret =
rcl_return_loaned_message(pub_.get_publisher_handle(), message_);
if (ret != RCL_RET_OK) {
RCLCPP_ERROR(
error_logger, "rcl_deallocate_loaned_message failed: %s", rcl_get_error_string().str);
rcl_reset_error();
}
} else {
// call destructor before deallocating
message_->~MessageT();
message_allocator_.deallocate(message_, 1);
}
message_ = nullptr;
}
/// Validate if the message was correctly allocated.
/**
* The allocated memory might not be always consistent and valid.
* Reasons why this could fail is that an allocation step was failing,
* e.g. just like malloc could fail or a maximum amount of previously allocated
* messages is exceeded in which case the loaned messages have to be returned
* to the middleware prior to be able to allocate a new one.
*/
bool is_valid() const
{
return message_ != nullptr;
}
/// Access the ROS message instance.
/**
* A call to `get()` will return a mutable reference to the underlying ROS message instance.
* This allows a user to modify the content of the message prior to publishing it.
*
* If this reference is copied, the memory for this copy is no longer managed
* by the LoanedMessage instance and has to be cleanup individually.
*/
MessageT & get() const
{
return *message_;
}
/// Release ownership of the ROS message instance.
/**
* A call to `release()` will unmanage the memory for the ROS message.
* That means that the destructor of this class will not free the memory on scope exit.
*
* \return Raw pointer to the message instance.
*/
MessageT * release()
{
auto msg = message_;
message_ = nullptr;
return msg;
}
protected:
const rclcpp::PublisherBase & pub_;
MessageT * message_;
MessageAllocator message_allocator_;
/// Deleted copy constructor to preserve memory integrity.
LoanedMessage(const LoanedMessage<MessageT> & other) = delete;
};
} // namespace rclcpp
#endif // RCLCPP__LOANED_MESSAGE_HPP_

View file

@ -34,6 +34,7 @@
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/detail/resolve_use_intra_process.hpp"
#include "rclcpp/intra_process_manager.hpp"
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/publisher_base.hpp"
@ -44,6 +45,9 @@
namespace rclcpp
{
template<typename MessageT, typename AllocatorT>
class LoanedMessage;
/// A publisher publishes messages of any type to a topic.
template<typename MessageT, typename AllocatorT = std::allocator<void>>
class Publisher : public PublisherBase
@ -132,6 +136,26 @@ public:
>::make_shared(size, this->get_allocator());
}
/// Loan memory for a ROS message from the middleware
/**
* If the middleware is capable of loaning memory for a ROS message instance,
* the loaned message will be directly allocated in the middleware.
* If not, the message allocator of this rclcpp::Publisher instance is being used.
*
* With a call to \sa `publish` the LoanedMessage instance is being returned to the middleware
* or free'd accordingly to the allocator.
* If the message is not being published but processed differently, the message has to be
* returned by calling \sa `return_loaned_message`.
* \sa rclcpp::LoanedMessage for details of the LoanedMessage class.
*
* \return LoanedMessage containing memory for a ROS message of type MessageT
*/
rclcpp::LoanedMessage<MessageT, AllocatorT>
loan_message()
{
return rclcpp::LoanedMessage<MessageT, AllocatorT>(this, this->get_allocator());
}
/// Send a message to the topic for this publisher.
/**
* This function is templated on the input message type, MessageT.
@ -191,6 +215,41 @@ public:
return this->do_serialized_publish(&serialized_msg);
}
/// Publish an instance of a LoanedMessage
/**
* When publishing a loaned message, the memory for this ROS instance will be deallocated
* after being published.
* The instance of the loaned message is no longer valid after this call.
*
* \param loaned_msg The LoanedMessage instance to be published.
*/
void
publish(rclcpp::LoanedMessage<MessageT, AllocatorT> && loaned_msg)
{
if (!loaned_msg.is_valid()) {
throw std::runtime_error("loaned message is not valid");
}
if (intra_process_is_enabled_) {
// TODO(Karsten1987): support loaned message passed by intraprocess
throw std::runtime_error("storing loaned messages in intra process is not supported yet");
}
// verify that publisher supports loaned messages
// TODO(Karsten1987): This case separation has to be done in rclcpp
// otherwise we have to ensure that every middleware implements
// `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
// by taking a copy of the ros message.
if (this->can_loan_messages()) {
// we release the ownership from the rclpp::LoanedMessage instance
// and let the middleware clean up the memory.
this->do_loaned_message_publish(loaned_msg.release());
} else {
// we don't release the ownership, let the middleware copy the ros message
// and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
this->do_inter_process_publish(&loaned_msg.get());
}
}
std::shared_ptr<MessageAllocator>
get_allocator() const
{
@ -202,6 +261,7 @@ protected:
do_inter_process_publish(const MessageT * msg)
{
auto status = rcl_publish(&publisher_handle_, msg, nullptr);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
@ -252,6 +312,26 @@ protected:
}
}
void
do_loaned_message_publish(MessageT * msg)
{
auto status = rcl_publish_loaned_message(&publisher_handle_, msg, nullptr);
if (RCL_RET_PUBLISHER_INVALID == status) {
rcl_reset_error(); // next call will reset error message if not context
if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
if (nullptr != context && !rcl_context_is_valid(context)) {
// publisher is invalid due to context being shutdown
return;
}
}
}
if (RCL_RET_OK != status) {
rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
}
}
uint64_t
store_intra_process_message(
uint64_t publisher_id,

View file

@ -161,6 +161,15 @@ public:
rclcpp::QoS
get_actual_qos() const;
/// Check if publisher instance can loan messages.
/**
* Depending on the middleware and the message type, this will return true if the middleware
* can allocate a ROS message instance.
*/
RCLCPP_PUBLIC
bool
can_loan_messages() const;
/// Compare this publisher to a gid.
/**
* Note that this function calls the next function.

View file

@ -151,7 +151,7 @@ public:
message_memory_strategy_ = message_memory_strategy;
}
std::shared_ptr<void> create_message()
std::shared_ptr<void> create_message() override
{
/* The default message memory strategy provides a dynamically allocated message on each call to
* create_message, though alternative memory strategies that re-use a preallocated message may be
@ -160,12 +160,13 @@ public:
return message_memory_strategy_->borrow_message();
}
std::shared_ptr<rcl_serialized_message_t> create_serialized_message()
std::shared_ptr<rcl_serialized_message_t> create_serialized_message() override
{
return message_memory_strategy_->borrow_serialized_message();
}
void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
void handle_message(
std::shared_ptr<void> & message, const rmw_message_info_t & message_info) override
{
if (matches_any_intra_process_publishers(&message_info.publisher_gid)) {
// In this case, the message will be delivered via intra process and
@ -176,22 +177,33 @@ public:
any_callback_.dispatch(typed_message, message_info);
}
/// Return the loaned message.
void
handle_loaned_message(
void * loaned_message, const rmw_message_info_t & message_info) override
{
auto typed_message = static_cast<CallbackMessageT *>(loaned_message);
// message is loaned, so we have to make sure that the deleter does not deallocate the message
auto sptr = std::shared_ptr<CallbackMessageT>(
typed_message, [](CallbackMessageT * msg) {(void) msg;});
any_callback_.dispatch(sptr, message_info);
}
/// Return the borrowed message.
/** \param message message to be returned */
void return_message(std::shared_ptr<void> & message)
void return_message(std::shared_ptr<void> & message) override
{
auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
message_memory_strategy_->return_message(typed_message);
}
void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message)
void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
{
message_memory_strategy_->return_serialized_message(message);
}
void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info)
const rmw_message_info_t & message_info) override
{
if (!use_intra_process_) {
// throw std::runtime_error(
@ -244,7 +256,7 @@ public:
/// Implemenation detail.
const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const
get_intra_process_subscription_handle() const override
{
if (!use_intra_process_) {
return nullptr;

View file

@ -138,6 +138,12 @@ public:
void
handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
// TODO(karsten1987): Does it make sense to pass in a weak_ptr?
RCLCPP_PUBLIC
virtual
void
handle_loaned_message(void * loaned_message, const rmw_message_info_t & message_info) = 0;
/// Return the message borrowed in create_message.
/** \param[in] message Shared pointer to the returned message. */
RCLCPP_PUBLIC
@ -173,6 +179,17 @@ public:
size_t
get_publisher_count() const;
/// Check if subscription instance can loan messages.
/**
* Depending on the middleware and the message type, this will return true if the middleware
* can allocate a ROS message instance.
*
* \return boolean flag indicating if middleware can loan messages.
*/
RCLCPP_PUBLIC
bool
can_loan_messages() const;
using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;

View file

@ -331,6 +331,32 @@ Executor::execute_subscription(
rcl_reset_error();
}
subscription->return_serialized_message(serialized_msg);
} else if (subscription->can_loan_messages()) {
void * loaned_msg = nullptr;
auto ret = rcl_take_loaned_message(
subscription->get_subscription_handle().get(),
&loaned_msg,
&message_info,
nullptr);
if (RCL_RET_OK == ret) {
subscription->handle_loaned_message(loaned_msg, message_info);
} else if (RCL_RET_SUBSCRIPTION_TAKE_FAILED != ret) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"take_loaned failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
rcl_reset_error();
}
ret = rcl_release_loaned_message(
subscription->get_subscription_handle().get(),
loaned_msg);
if (RCL_RET_OK != ret) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"release_loaned failed for subscription on topic '%s': %s",
subscription->get_topic_name(), rcl_get_error_string().str);
}
loaned_msg = nullptr;
} else {
std::shared_ptr<void> message = subscription->create_message();
auto ret = rcl_take(

View file

@ -224,6 +224,12 @@ PublisherBase::assert_liveliness() const
return RCL_RET_OK == rcl_publisher_assert_liveliness(&publisher_handle_);
}
bool
PublisherBase::can_loan_messages() const
{
return rcl_publisher_can_loan_messages(&publisher_handle_);
}
bool
PublisherBase::operator==(const rmw_gid_t & gid) const
{

View file

@ -200,3 +200,9 @@ SubscriptionBase::setup_intra_process(
weak_ipm_ = weak_ipm;
use_intra_process_ = true;
}
bool
SubscriptionBase::can_loan_messages() const
{
return rcl_subscription_can_loan_messages(subscription_handle_.get());
}

View file

@ -0,0 +1,83 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <memory>
#include "rclcpp/loaned_message.hpp"
#include "rclcpp/rclcpp.hpp"
#include "test_msgs/msg/basic_types.hpp"
using MessageT = test_msgs::msg::BasicTypes;
using LoanedMessageT = rclcpp::LoanedMessage<MessageT>;
class TestLoanedMessage : public ::testing::Test
{
protected:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}
static void TearDownTestCase()
{
rclcpp::shutdown();
}
};
TEST_F(TestLoanedMessage, initialize) {
auto node = std::make_shared<rclcpp::Node>("loaned_message_test_node");
auto pub = node->create_publisher<MessageT>("loaned_message_test_topic", 1);
auto loaned_msg = rclcpp::LoanedMessage<MessageT>(pub.get(), pub->get_allocator());
ASSERT_TRUE(loaned_msg.is_valid());
loaned_msg.get().float32_value = 42.0f;
ASSERT_EQ(42.0f, loaned_msg.get().float32_value);
SUCCEED();
}
TEST_F(TestLoanedMessage, loan_from_pub) {
auto node = std::make_shared<rclcpp::Node>("loaned_message_test_node");
auto pub = node->create_publisher<MessageT>("loaned_message_test_topic", 1);
auto loaned_msg = pub->loan_message();
ASSERT_TRUE(loaned_msg.is_valid());
loaned_msg.get().float64_value = 42.0f;
ASSERT_EQ(42.0f, loaned_msg.get().float64_value);
SUCCEED();
}
TEST_F(TestLoanedMessage, release) {
auto node = std::make_shared<rclcpp::Node>("loaned_message_test_node");
auto pub = node->create_publisher<MessageT>("loaned_message_test_topic", 1);
MessageT * msg = nullptr;
{
auto loaned_msg = pub->loan_message();
ASSERT_TRUE(loaned_msg.is_valid());
loaned_msg.get().float64_value = 42.0f;
ASSERT_EQ(42.0f, loaned_msg.get().float64_value);
msg = loaned_msg.release();
// call destructor implicitly.
// destructor not allowed to free memory because of not having ownership
// of the data after a call to release.
}
ASSERT_EQ(42.0f, msg->float64_value);
SUCCEED();
}