From 84cac5937b303e0dca3c9c1b8ffc8b73a2bd0197 Mon Sep 17 00:00:00 2001 From: Michael Carroll Date: Thu, 23 Apr 2020 19:13:59 -0500 Subject: [PATCH] Add support for taking a sequence of messages (#614) * Add support for taking a sequence of messages Signed-off-by: Michael Carroll * Fix uncrustify Signed-off-by: Michael Carroll * Apply suggestions from code review Co-Authored-By: Jacob Perron Signed-off-by: Michael Carroll * Address reviewer feedback. Signed-off-by: Michael Carroll * Remove vertical whitespace Signed-off-by: Michael Carroll * Address reviewer feedback Signed-off-by: Michael Carroll * Remove blank line Signed-off-by: Michael Carroll Co-authored-by: Jacob Perron --- rcl/include/rcl/subscription.h | 60 +++++++++++- rcl/src/rcl/subscription.c | 46 +++++++++ rcl/test/rcl/test_subscription.cpp | 152 +++++++++++++++++++++++++++++ 3 files changed, 255 insertions(+), 3 deletions(-) diff --git a/rcl/include/rcl/subscription.h b/rcl/include/rcl/subscription.h index af0bbe8..32ec518 100644 --- a/rcl/include/rcl/subscription.h +++ b/rcl/include/rcl/subscription.h @@ -26,6 +26,8 @@ extern "C" #include "rcl/node.h" #include "rcl/visibility_control.h" +#include "rmw/message_sequence.h" + /// Internal rcl implementation struct. struct rcl_subscription_impl_t; @@ -203,7 +205,7 @@ rcl_subscription_get_default_options(void); /// Take a ROS message from a topic using a rcl subscription. /** * It is the job of the caller to ensure that the type of the ros_message - * argument and the type associate with the subscription, via the type + * argument and the type associated with the subscription, via the type * support, match. * Passing a different type to rcl_take produces undefined behavior and cannot * be checked by this function and therefore no deliberate error will occur. @@ -227,7 +229,7 @@ rcl_subscription_get_default_options(void); * be allocated for a dynamically sized array in the target message, then the * allocator given in the subscription options is used. * - * The rmw message_info struct contains meta information about this particular + * The rmw_message_info struct contains meta information about this particular * message instance, like what the GUID of the publisher which published it * originally or whether or not the message received from within the same * process. @@ -248,7 +250,7 @@ rcl_subscription_get_default_options(void); * \param[inout] ros_message type-erased ptr to a allocated ROS message * \param[out] message_info rmw struct which contains meta-data for the message * \param[in] allocation structure pointer used for memory preallocation (may be NULL) - * \return `RCL_RET_OK` if the message was published, or + * \return `RCL_RET_OK` if the message was taken, or * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or * \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or * \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or @@ -266,6 +268,58 @@ rcl_take( rmw_subscription_allocation_t * allocation ); +/// Take a sequence of messages from a topic using a rcl subscription. +/** + * In contrast to `rcl_take`, this function can take multiple messages at + * the same time. + * It is the job of the caller to ensure that the type of the message_sequence + * argument and the type associated with the subscription, via the type + * support, match. + * + * The message_sequence pointer should point to an already allocated sequence + * of ROS messages of the correct type, into which the taken ROS messages will + * be copied if messages are available. + * The message_sequence `size` member will be set to the number of messages + * correctly taken. + * + * The rmw_message_info_sequence struct contains meta information about the + * corresponding message instance index. + * The message_info_sequence argument should be an already allocated + * rmw_message_info_sequence_t structure. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Maybe [1] + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Yes + * [1] only if storage in the serialized_message is insufficient + * + * \param[in] subscription the handle to the subscription from which to take. + * \param[in] count number of messages to attempt to take. + * \param[inout] message_sequence pointer to a (pre-allocated) message sequence. + * \param[inout] message_info_sequence pointer to a (pre-allocated) message info sequence. + * \param[in] allocation structure pointer used for memory preallocation (may be NULL) + * \return `RCL_RET_OK` if one or more messages was taken, or + * \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or + * \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or + * \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or + * \return `RCL_RET_SUBSCRIPTION_TAKE_FAILED` if take failed but no error + * occurred in the middleware, or + * \return `RCL_RET_ERROR` if an unspecified error occurs. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_take_sequence( + const rcl_subscription_t * subscription, + size_t count, + rmw_message_sequence_t * message_sequence, + rmw_message_info_sequence_t * message_info_sequence, + rmw_subscription_allocation_t * allocation +); + /// Take a serialized raw message from a topic using a rcl subscription. /** * In contrast to `rcl_take`, this function stores the taken message in diff --git a/rcl/src/rcl/subscription.c b/rcl/src/rcl/subscription.c index d52a721..2e3b37d 100644 --- a/rcl/src/rcl/subscription.c +++ b/rcl/src/rcl/subscription.c @@ -283,6 +283,52 @@ rcl_take( return RCL_RET_OK; } +rcl_ret_t +rcl_take_sequence( + const rcl_subscription_t * subscription, + size_t count, + rmw_message_sequence_t * message_sequence, + rmw_message_info_sequence_t * message_info_sequence, + rmw_subscription_allocation_t * allocation +) +{ + // Set the sizes to zero to indicate that there are no valid messages + message_sequence->size = 0u; + message_info_sequence->size = 0u; + + RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count); + if (!rcl_subscription_is_valid(subscription)) { + return RCL_RET_SUBSCRIPTION_INVALID; // error message already set + } + RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT); + + if (message_sequence->capacity < count) { + RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count"); + return RCL_RET_INVALID_ARGUMENT; + } + + if (message_info_sequence->capacity < count) { + RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count"); + return RCL_RET_INVALID_ARGUMENT; + } + + size_t taken = 0u; + rmw_ret_t ret = rmw_take_sequence( + subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken, + allocation); + if (ret != RMW_RET_OK) { + RCL_SET_ERROR_MSG(rmw_get_error_string().str); + return rcl_convert_rmw_ret_to_rcl_ret(ret); + } + RCUTILS_LOG_DEBUG_NAMED( + ROS_PACKAGE_NAME, "Subscription took %zu messages", taken); + if (0u == taken) { + return RCL_RET_SUBSCRIPTION_TAKE_FAILED; + } + return RCL_RET_OK; +} + rcl_ret_t rcl_take_serialized_message( const rcl_subscription_t * subscription, diff --git a/rcl/test/rcl/test_subscription.cpp b/rcl/test/rcl/test_subscription.cpp index 8adcc80..f13efb6 100644 --- a/rcl/test/rcl/test_subscription.cpp +++ b/rcl/test/rcl/test_subscription.cpp @@ -278,3 +278,155 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription ASSERT_EQ(std::string(test_string), std::string(msg.string_value.data, msg.string_value.size)); } } + +/* Basic nominal test of a subscription taking a sequence. + */ +TEST_F( + CLASSNAME( + TestSubscriptionFixture, + RMW_IMPLEMENTATION), test_subscription_nominal_string_sequence) { + rcl_ret_t ret; + rcl_publisher_t publisher = rcl_get_zero_initialized_publisher(); + const rosidl_message_type_support_t * ts = + ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings); + const char * topic = "rcl_test_subscription_nominal_string_sequence_chatter"; + rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options(); + ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + rcl_subscription_t subscription = rcl_get_zero_initialized_subscription(); + rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options(); + ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr); + EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + }); + // TODO(wjwwood): add logic to wait for the connection to be established + // probably using the count_subscriptions busy wait mechanism + // until then we will sleep for a short period of time + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + const char * test_string = "testing"; + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + bool success; + wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100, success); + ASSERT_TRUE(success); + auto allocator = rcutils_get_default_allocator(); + { + size_t size = 1; + rmw_message_info_sequence_t message_infos; + rmw_message_info_sequence_init(&message_infos, size, &allocator); + + rmw_message_sequence_t messages; + rmw_message_sequence_init(&messages, size, &allocator); + + auto seq = test_msgs__msg__Strings__Sequence__create(size); + + for (size_t ii = 0; ii < size; ++ii) { + messages.data[ii] = &seq->data[ii]; + } + + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rmw_message_info_sequence_fini(&message_infos); + rmw_message_sequence_fini(&messages); + test_msgs__msg__Strings__Sequence__destroy(seq); + }); + + // Attempt to take more than capacity allows. + ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr); + ASSERT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + + ASSERT_EQ(0u, messages.size); + ASSERT_EQ(0u, message_infos.size); + } + + { + size_t size = 5; + rmw_message_info_sequence_t message_infos; + rmw_message_info_sequence_init(&message_infos, size, &allocator); + + rmw_message_sequence_t messages; + rmw_message_sequence_init(&messages, size, &allocator); + + auto seq = test_msgs__msg__Strings__Sequence__create(size); + + for (size_t ii = 0; ii < size; ++ii) { + messages.data[ii] = &seq->data[ii]; + } + + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rmw_message_info_sequence_fini(&message_infos); + rmw_message_sequence_fini(&messages); + test_msgs__msg__Strings__Sequence__destroy(seq); + }); + + ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ(3u, messages.size); + ASSERT_EQ(3u, message_infos.size); + } + + { + test_msgs__msg__Strings msg; + test_msgs__msg__Strings__init(&msg); + ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string)); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + ret = rcl_publish(&publisher, &msg, nullptr); + test_msgs__msg__Strings__fini(&msg); + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + } + + // Give a brief moment for publications to go through. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + // Take fewer messages than are available in the subscription + { + size_t size = 3; + rmw_message_info_sequence_t message_infos; + rmw_message_info_sequence_init(&message_infos, size, &allocator); + + rmw_message_sequence_t messages; + rmw_message_sequence_init(&messages, size, &allocator); + + auto seq = test_msgs__msg__Strings__Sequence__create(size); + + for (size_t ii = 0; ii < size; ++ii) { + messages.data[ii] = &seq->data[ii]; + } + + OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT( + { + rmw_message_info_sequence_fini(&message_infos); + rmw_message_sequence_fini(&messages); + test_msgs__msg__Strings__Sequence__destroy(seq); + }); + + ret = rcl_take_sequence(&subscription, 3, &messages, &message_infos, nullptr); + + ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str; + ASSERT_EQ(3u, messages.size); + ASSERT_EQ(3u, message_infos.size); + + ASSERT_EQ( + std::string(test_string), + std::string(seq->data[0].string_value.data, seq->data[0].string_value.size)); + } +}