Use serialized message (#1081)

* use serialized message in callback

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce resize method for serialized message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* introduce release for serialized message

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* address review comments

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* correct typo

Signed-off-by: Karsten Knese <karsten@openrobotics.org>

* fix interface traits test

Signed-off-by: Karsten Knese <karsten@openrobotics.org>
This commit is contained in:
Karsten Knese 2020-04-22 19:30:56 -07:00 committed by GitHub
parent bb91b6c2ef
commit 46cfe84b14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 89 additions and 46 deletions

View file

@ -46,10 +46,10 @@ public:
using MessageAlloc = typename MessageAllocTraits::allocator_type;
using MessageDeleter = allocator::Deleter<MessageAlloc, MessageT>;
using SerializedMessageAllocTraits = allocator::AllocRebind<rcl_serialized_message_t, Alloc>;
using SerializedMessageAllocTraits = allocator::AllocRebind<rclcpp::SerializedMessage, Alloc>;
using SerializedMessageAlloc = typename SerializedMessageAllocTraits::allocator_type;
using SerializedMessageDeleter =
allocator::Deleter<SerializedMessageAlloc, rcl_serialized_message_t>;
allocator::Deleter<SerializedMessageAlloc, rclcpp::SerializedMessage>;
using BufferAllocTraits = allocator::AllocRebind<char, Alloc>;
using BufferAlloc = typename BufferAllocTraits::allocator_type;
@ -86,31 +86,12 @@ public:
return std::allocate_shared<MessageT, MessageAlloc>(*message_allocator_.get());
}
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message(size_t capacity)
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message(size_t capacity)
{
auto msg = new rcl_serialized_message_t;
*msg = rmw_get_zero_initialized_serialized_message();
auto ret = rmw_serialized_message_init(msg, capacity, &rcutils_allocator_);
if (ret != RCL_RET_OK) {
rclcpp::exceptions::throw_from_rcl_error(ret);
return std::make_shared<rclcpp::SerializedMessage>(capacity);
}
auto serialized_msg = std::shared_ptr<rcl_serialized_message_t>(
msg,
[](rmw_serialized_message_t * msg) {
auto fini_ret = rmw_serialized_message_fini(msg);
delete msg;
if (fini_ret != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy serialized message: %s", rcl_get_error_string().str);
}
});
return serialized_msg;
}
virtual std::shared_ptr<rcl_serialized_message_t> borrow_serialized_message()
virtual std::shared_ptr<rclcpp::SerializedMessage> borrow_serialized_message()
{
return borrow_serialized_message(default_buffer_capacity_);
}
@ -127,7 +108,8 @@ public:
msg.reset();
}
virtual void return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & serialized_msg)
virtual void return_serialized_message(
std::shared_ptr<rclcpp::SerializedMessage> & serialized_msg)
{
serialized_msg.reset();
}

View file

@ -217,6 +217,12 @@ public:
return this->do_serialized_publish(&serialized_msg);
}
void
publish(const SerializedMessage & serialized_msg)
{
return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
}
/// Publish an instance of a LoanedMessage.
/**
* When publishing a loaned message, the memory for this ROS message will be deallocated

View file

@ -98,6 +98,20 @@ public:
*/
size_t capacity() const;
/// Allocate memory in the data buffer
/**
* The data buffer of the underlying rcl_serialized_message_t will be resized.
* This might change the data layout and invalidates all pointers to the data.
*/
void reserve(size_t capacity);
/// Release the underlying rcl_serialized_message_t
/**
* The memory (i.e. the data buffer) of the serialized message will no longer
* be managed by this instance and the memory won't be deallocated on destruction.
*/
rcl_serialized_message_t release_rcl_serialized_message();
private:
rcl_serialized_message_t serialized_message_;
};

View file

@ -250,7 +250,7 @@ public:
return message_memory_strategy_->borrow_message();
}
std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() override
{
return message_memory_strategy_->borrow_serialized_message();
@ -299,7 +299,7 @@ public:
}
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) override
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override
{
message_memory_strategy_->return_serialized_message(message);
}

View file

@ -33,6 +33,7 @@
#include "rclcpp/message_info.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"
@ -151,7 +152,7 @@ public:
*/
RCLCPP_PUBLIC
bool
take_serialized(rcl_serialized_message_t & message_out, rclcpp::MessageInfo & message_info_out);
take_serialized(rclcpp::SerializedMessage & message_out, rclcpp::MessageInfo & message_info_out);
/// Borrow a new message.
/** \return Shared pointer to the fresh message. */
@ -164,7 +165,7 @@ public:
/** \return Shared pointer to a rcl_message_serialized_t. */
RCLCPP_PUBLIC
virtual
std::shared_ptr<rcl_serialized_message_t>
std::shared_ptr<rclcpp::SerializedMessage>
create_serialized_message() = 0;
/// Check if we need to handle the message, and execute the callback if we do.
@ -194,7 +195,7 @@ public:
RCLCPP_PUBLIC
virtual
void
return_serialized_message(std::shared_ptr<rcl_serialized_message_t> & message) = 0;
return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) = 0;
RCLCPP_PUBLIC
const rosidl_message_type_support_t &

View file

@ -350,8 +350,7 @@ Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
if (subscription->is_serialized()) {
// This is the case where a copy of the serialized message is taken from
// the middleware via inter-process communication.
std::shared_ptr<rcl_serialized_message_t> serialized_msg =
subscription->create_serialized_message();
std::shared_ptr<SerializedMessage> serialized_msg = subscription->create_serialized_message();
take_and_do_error_handling(
"taking a serialized message from topic",
subscription->get_topic_name(),

View file

@ -144,4 +144,20 @@ size_t SerializedMessage::capacity() const
{
return serialized_message_.buffer_capacity;
}
void SerializedMessage::reserve(size_t capacity)
{
auto ret = rmw_serialized_message_resize(&serialized_message_, capacity);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret);
}
}
rcl_serialized_message_t SerializedMessage::release_rcl_serialized_message()
{
auto ret = serialized_message_;
serialized_message_ = rmw_get_zero_initialized_serialized_message();
return ret;
}
} // namespace rclcpp

View file

@ -159,12 +159,12 @@ SubscriptionBase::take_type_erased(void * message_out, rclcpp::MessageInfo & mes
bool
SubscriptionBase::take_serialized(
rcl_serialized_message_t & message_out,
rclcpp::SerializedMessage & message_out,
rclcpp::MessageInfo & message_info_out)
{
rcl_ret_t ret = rcl_take_serialized_message(
this->get_subscription_handle().get(),
&message_out,
&message_out.get_rcl_serialized_message(),
&message_info_out.get_rmw_message_info(),
nullptr);
if (RCL_RET_SUBSCRIPTION_TAKE_FAILED == ret) {

View file

@ -114,6 +114,31 @@ TEST(TestSerializedMessage, various_constructors_from_rcl) {
EXPECT_TRUE(nullptr != rcl_handle.buffer);
}
TEST(TestSerializedMessage, release) {
const std::string content = "Hello World";
const auto content_size = content.size() + 1; // accounting for null terminator
rcl_serialized_message_t released_handle = rmw_get_zero_initialized_serialized_message();
{
rclcpp::SerializedMessage serialized_msg(13);
// manually copy some content
auto & rcl_serialized_msg = serialized_msg.get_rcl_serialized_message();
std::memcpy(rcl_serialized_msg.buffer, content.c_str(), content.size());
rcl_serialized_msg.buffer[content.size()] = '\0';
rcl_serialized_msg.buffer_length = content_size;
EXPECT_EQ(13u, serialized_msg.capacity());
released_handle = serialized_msg.release_rcl_serialized_message();
// scope exit of serialized_msg
}
EXPECT_TRUE(nullptr != released_handle.buffer);
EXPECT_EQ(13u, released_handle.buffer_capacity);
EXPECT_EQ(content_size, released_handle.buffer_length);
// cleanup memory manually
EXPECT_EQ(RCL_RET_OK, rmw_serialized_message_fini(&released_handle));
}
TEST(TestSerializedMessage, serialization) {
using MessageT = test_msgs::msg::BasicTypes;

View file

@ -29,25 +29,25 @@ TEST(TestSerializedMessageAllocator, default_allocator) {
rclcpp::message_memory_strategy::MessageMemoryStrategy<DummyMessageT>::create_default();
auto msg0 = mem_strategy->borrow_serialized_message();
ASSERT_EQ(msg0->buffer_capacity, 0u);
ASSERT_EQ(msg0->capacity(), 0u);
mem_strategy->return_serialized_message(msg0);
auto msg100 = mem_strategy->borrow_serialized_message(100);
ASSERT_EQ(msg100->buffer_capacity, 100u);
ASSERT_EQ(msg100->capacity(), 100u);
mem_strategy->return_serialized_message(msg100);
auto msg200 = mem_strategy->borrow_serialized_message();
auto ret = rmw_serialized_message_resize(msg200.get(), 200);
auto ret = rmw_serialized_message_resize(&msg200->get_rcl_serialized_message(), 200);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(0u, msg200->buffer_length);
EXPECT_EQ(200u, msg200->buffer_capacity);
EXPECT_EQ(0u, msg200->size());
EXPECT_EQ(200u, msg200->capacity());
mem_strategy->return_serialized_message(msg200);
auto msg1000 = mem_strategy->borrow_serialized_message(1000);
ASSERT_EQ(msg1000->buffer_capacity, 1000u);
ret = rmw_serialized_message_resize(msg1000.get(), 2000);
ASSERT_EQ(msg1000->capacity(), 1000u);
ret = rmw_serialized_message_resize(&msg1000->get_rcl_serialized_message(), 2000);
ASSERT_EQ(RCL_RET_OK, ret);
EXPECT_EQ(2000u, msg1000->buffer_capacity);
EXPECT_EQ(2000u, msg1000->capacity());
mem_strategy->return_serialized_message(msg1000);
}
@ -61,7 +61,7 @@ TEST(TestSerializedMessageAllocator, borrow_from_subscription) {
[](std::shared_ptr<test_msgs::msg::Empty> test_msg) {(void) test_msg;});
auto msg0 = sub->create_serialized_message();
EXPECT_EQ(0u, msg0->buffer_capacity);
EXPECT_EQ(0u, msg0->capacity());
sub->return_serialized_message(msg0);
rclcpp::shutdown();

View file

@ -299,10 +299,10 @@ TEST_F(TestSubscription, take) {
TEST_F(TestSubscription, take_serialized) {
initialize();
using test_msgs::msg::Empty;
auto do_nothing = [](std::shared_ptr<const rcl_serialized_message_t>) {FAIL();};
auto do_nothing = [](std::shared_ptr<const rclcpp::SerializedMessage>) {FAIL();};
{
auto sub = node->create_subscription<test_msgs::msg::Empty>("~/test_take", 1, do_nothing);
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
EXPECT_FALSE(sub->take_serialized(*msg, msg_info));
}
@ -317,7 +317,7 @@ TEST_F(TestSubscription, take_serialized) {
test_msgs::msg::Empty msg;
pub->publish(msg);
}
std::shared_ptr<rcl_serialized_message_t> msg = sub->create_serialized_message();
std::shared_ptr<rclcpp::SerializedMessage> msg = sub->create_serialized_message();
rclcpp::MessageInfo msg_info;
bool message_recieved = false;
auto start = std::chrono::steady_clock::now();