Add support for taking a sequence of messages (#614)
* Add support for taking a sequence of messages Signed-off-by: Michael Carroll <michael@openrobotics.org> * Fix uncrustify Signed-off-by: Michael Carroll <michael@openrobotics.org> * Apply suggestions from code review Co-Authored-By: Jacob Perron <jacob@openrobotics.org> Signed-off-by: Michael Carroll <michael@openrobotics.org> * Address reviewer feedback. Signed-off-by: Michael Carroll <michael@openrobotics.org> * Remove vertical whitespace Signed-off-by: Michael Carroll <michael@openrobotics.org> * Address reviewer feedback Signed-off-by: Michael Carroll <michael@openrobotics.org> * Remove blank line Signed-off-by: Michael Carroll <michael@openrobotics.org> Co-authored-by: Jacob Perron <jacob@openrobotics.org>
This commit is contained in:
parent
643f6ec530
commit
84cac5937b
3 changed files with 255 additions and 3 deletions
|
@ -26,6 +26,8 @@ extern "C"
|
||||||
#include "rcl/node.h"
|
#include "rcl/node.h"
|
||||||
#include "rcl/visibility_control.h"
|
#include "rcl/visibility_control.h"
|
||||||
|
|
||||||
|
#include "rmw/message_sequence.h"
|
||||||
|
|
||||||
/// Internal rcl implementation struct.
|
/// Internal rcl implementation struct.
|
||||||
struct rcl_subscription_impl_t;
|
struct rcl_subscription_impl_t;
|
||||||
|
|
||||||
|
@ -203,7 +205,7 @@ rcl_subscription_get_default_options(void);
|
||||||
/// Take a ROS message from a topic using a rcl subscription.
|
/// Take a ROS message from a topic using a rcl subscription.
|
||||||
/**
|
/**
|
||||||
* It is the job of the caller to ensure that the type of the ros_message
|
* It is the job of the caller to ensure that the type of the ros_message
|
||||||
* argument and the type associate with the subscription, via the type
|
* argument and the type associated with the subscription, via the type
|
||||||
* support, match.
|
* support, match.
|
||||||
* Passing a different type to rcl_take produces undefined behavior and cannot
|
* Passing a different type to rcl_take produces undefined behavior and cannot
|
||||||
* be checked by this function and therefore no deliberate error will occur.
|
* be checked by this function and therefore no deliberate error will occur.
|
||||||
|
@ -227,7 +229,7 @@ rcl_subscription_get_default_options(void);
|
||||||
* be allocated for a dynamically sized array in the target message, then the
|
* be allocated for a dynamically sized array in the target message, then the
|
||||||
* allocator given in the subscription options is used.
|
* allocator given in the subscription options is used.
|
||||||
*
|
*
|
||||||
* The rmw message_info struct contains meta information about this particular
|
* The rmw_message_info struct contains meta information about this particular
|
||||||
* message instance, like what the GUID of the publisher which published it
|
* message instance, like what the GUID of the publisher which published it
|
||||||
* originally or whether or not the message received from within the same
|
* originally or whether or not the message received from within the same
|
||||||
* process.
|
* process.
|
||||||
|
@ -248,7 +250,7 @@ rcl_subscription_get_default_options(void);
|
||||||
* \param[inout] ros_message type-erased ptr to a allocated ROS message
|
* \param[inout] ros_message type-erased ptr to a allocated ROS message
|
||||||
* \param[out] message_info rmw struct which contains meta-data for the message
|
* \param[out] message_info rmw struct which contains meta-data for the message
|
||||||
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
|
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
|
||||||
* \return `RCL_RET_OK` if the message was published, or
|
* \return `RCL_RET_OK` if the message was taken, or
|
||||||
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
|
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
|
||||||
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
|
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
|
||||||
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
|
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
|
||||||
|
@ -266,6 +268,58 @@ rcl_take(
|
||||||
rmw_subscription_allocation_t * allocation
|
rmw_subscription_allocation_t * allocation
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/// Take a sequence of messages from a topic using a rcl subscription.
|
||||||
|
/**
|
||||||
|
* In contrast to `rcl_take`, this function can take multiple messages at
|
||||||
|
* the same time.
|
||||||
|
* It is the job of the caller to ensure that the type of the message_sequence
|
||||||
|
* argument and the type associated with the subscription, via the type
|
||||||
|
* support, match.
|
||||||
|
*
|
||||||
|
* The message_sequence pointer should point to an already allocated sequence
|
||||||
|
* of ROS messages of the correct type, into which the taken ROS messages will
|
||||||
|
* be copied if messages are available.
|
||||||
|
* The message_sequence `size` member will be set to the number of messages
|
||||||
|
* correctly taken.
|
||||||
|
*
|
||||||
|
* The rmw_message_info_sequence struct contains meta information about the
|
||||||
|
* corresponding message instance index.
|
||||||
|
* The message_info_sequence argument should be an already allocated
|
||||||
|
* rmw_message_info_sequence_t structure.
|
||||||
|
*
|
||||||
|
* <hr>
|
||||||
|
* Attribute | Adherence
|
||||||
|
* ------------------ | -------------
|
||||||
|
* Allocates Memory | Maybe [1]
|
||||||
|
* Thread-Safe | No
|
||||||
|
* Uses Atomics | No
|
||||||
|
* Lock-Free | Yes
|
||||||
|
* <i>[1] only if storage in the serialized_message is insufficient</i>
|
||||||
|
*
|
||||||
|
* \param[in] subscription the handle to the subscription from which to take.
|
||||||
|
* \param[in] count number of messages to attempt to take.
|
||||||
|
* \param[inout] message_sequence pointer to a (pre-allocated) message sequence.
|
||||||
|
* \param[inout] message_info_sequence pointer to a (pre-allocated) message info sequence.
|
||||||
|
* \param[in] allocation structure pointer used for memory preallocation (may be NULL)
|
||||||
|
* \return `RCL_RET_OK` if one or more messages was taken, or
|
||||||
|
* \return `RCL_RET_INVALID_ARGUMENT` if any arguments are invalid, or
|
||||||
|
* \return `RCL_RET_SUBSCRIPTION_INVALID` if the subscription is invalid, or
|
||||||
|
* \return `RCL_RET_BAD_ALLOC` if allocating memory failed, or
|
||||||
|
* \return `RCL_RET_SUBSCRIPTION_TAKE_FAILED` if take failed but no error
|
||||||
|
* occurred in the middleware, or
|
||||||
|
* \return `RCL_RET_ERROR` if an unspecified error occurs.
|
||||||
|
*/
|
||||||
|
RCL_PUBLIC
|
||||||
|
RCL_WARN_UNUSED
|
||||||
|
rcl_ret_t
|
||||||
|
rcl_take_sequence(
|
||||||
|
const rcl_subscription_t * subscription,
|
||||||
|
size_t count,
|
||||||
|
rmw_message_sequence_t * message_sequence,
|
||||||
|
rmw_message_info_sequence_t * message_info_sequence,
|
||||||
|
rmw_subscription_allocation_t * allocation
|
||||||
|
);
|
||||||
|
|
||||||
/// Take a serialized raw message from a topic using a rcl subscription.
|
/// Take a serialized raw message from a topic using a rcl subscription.
|
||||||
/**
|
/**
|
||||||
* In contrast to `rcl_take`, this function stores the taken message in
|
* In contrast to `rcl_take`, this function stores the taken message in
|
||||||
|
|
|
@ -283,6 +283,52 @@ rcl_take(
|
||||||
return RCL_RET_OK;
|
return RCL_RET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rcl_ret_t
|
||||||
|
rcl_take_sequence(
|
||||||
|
const rcl_subscription_t * subscription,
|
||||||
|
size_t count,
|
||||||
|
rmw_message_sequence_t * message_sequence,
|
||||||
|
rmw_message_info_sequence_t * message_info_sequence,
|
||||||
|
rmw_subscription_allocation_t * allocation
|
||||||
|
)
|
||||||
|
{
|
||||||
|
// Set the sizes to zero to indicate that there are no valid messages
|
||||||
|
message_sequence->size = 0u;
|
||||||
|
message_info_sequence->size = 0u;
|
||||||
|
|
||||||
|
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription taking %zu messages", count);
|
||||||
|
if (!rcl_subscription_is_valid(subscription)) {
|
||||||
|
return RCL_RET_SUBSCRIPTION_INVALID; // error message already set
|
||||||
|
}
|
||||||
|
RCL_CHECK_ARGUMENT_FOR_NULL(message_sequence, RCL_RET_INVALID_ARGUMENT);
|
||||||
|
RCL_CHECK_ARGUMENT_FOR_NULL(message_info_sequence, RCL_RET_INVALID_ARGUMENT);
|
||||||
|
|
||||||
|
if (message_sequence->capacity < count) {
|
||||||
|
RCL_SET_ERROR_MSG("Insufficient message sequence capacity for requested count");
|
||||||
|
return RCL_RET_INVALID_ARGUMENT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message_info_sequence->capacity < count) {
|
||||||
|
RCL_SET_ERROR_MSG("Insufficient message info sequence capacity for requested count");
|
||||||
|
return RCL_RET_INVALID_ARGUMENT;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t taken = 0u;
|
||||||
|
rmw_ret_t ret = rmw_take_sequence(
|
||||||
|
subscription->impl->rmw_handle, count, message_sequence, message_info_sequence, &taken,
|
||||||
|
allocation);
|
||||||
|
if (ret != RMW_RET_OK) {
|
||||||
|
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
|
||||||
|
return rcl_convert_rmw_ret_to_rcl_ret(ret);
|
||||||
|
}
|
||||||
|
RCUTILS_LOG_DEBUG_NAMED(
|
||||||
|
ROS_PACKAGE_NAME, "Subscription took %zu messages", taken);
|
||||||
|
if (0u == taken) {
|
||||||
|
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
|
||||||
|
}
|
||||||
|
return RCL_RET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
rcl_ret_t
|
rcl_ret_t
|
||||||
rcl_take_serialized_message(
|
rcl_take_serialized_message(
|
||||||
const rcl_subscription_t * subscription,
|
const rcl_subscription_t * subscription,
|
||||||
|
|
|
@ -278,3 +278,155 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription
|
||||||
ASSERT_EQ(std::string(test_string), std::string(msg.string_value.data, msg.string_value.size));
|
ASSERT_EQ(std::string(test_string), std::string(msg.string_value.data, msg.string_value.size));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Basic nominal test of a subscription taking a sequence.
|
||||||
|
*/
|
||||||
|
TEST_F(
|
||||||
|
CLASSNAME(
|
||||||
|
TestSubscriptionFixture,
|
||||||
|
RMW_IMPLEMENTATION), test_subscription_nominal_string_sequence) {
|
||||||
|
rcl_ret_t ret;
|
||||||
|
rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
|
||||||
|
const rosidl_message_type_support_t * ts =
|
||||||
|
ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
|
||||||
|
const char * topic = "rcl_test_subscription_nominal_string_sequence_chatter";
|
||||||
|
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
|
||||||
|
ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
|
||||||
|
{
|
||||||
|
rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
|
||||||
|
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
});
|
||||||
|
rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
|
||||||
|
rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
|
||||||
|
ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
|
||||||
|
{
|
||||||
|
rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
|
||||||
|
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
});
|
||||||
|
// TODO(wjwwood): add logic to wait for the connection to be established
|
||||||
|
// probably using the count_subscriptions busy wait mechanism
|
||||||
|
// until then we will sleep for a short period of time
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
||||||
|
const char * test_string = "testing";
|
||||||
|
{
|
||||||
|
test_msgs__msg__Strings msg;
|
||||||
|
test_msgs__msg__Strings__init(&msg);
|
||||||
|
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
test_msgs__msg__Strings__fini(&msg);
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
}
|
||||||
|
bool success;
|
||||||
|
wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100, success);
|
||||||
|
ASSERT_TRUE(success);
|
||||||
|
auto allocator = rcutils_get_default_allocator();
|
||||||
|
{
|
||||||
|
size_t size = 1;
|
||||||
|
rmw_message_info_sequence_t message_infos;
|
||||||
|
rmw_message_info_sequence_init(&message_infos, size, &allocator);
|
||||||
|
|
||||||
|
rmw_message_sequence_t messages;
|
||||||
|
rmw_message_sequence_init(&messages, size, &allocator);
|
||||||
|
|
||||||
|
auto seq = test_msgs__msg__Strings__Sequence__create(size);
|
||||||
|
|
||||||
|
for (size_t ii = 0; ii < size; ++ii) {
|
||||||
|
messages.data[ii] = &seq->data[ii];
|
||||||
|
}
|
||||||
|
|
||||||
|
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
|
||||||
|
{
|
||||||
|
rmw_message_info_sequence_fini(&message_infos);
|
||||||
|
rmw_message_sequence_fini(&messages);
|
||||||
|
test_msgs__msg__Strings__Sequence__destroy(seq);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Attempt to take more than capacity allows.
|
||||||
|
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
|
||||||
|
ASSERT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;
|
||||||
|
|
||||||
|
ASSERT_EQ(0u, messages.size);
|
||||||
|
ASSERT_EQ(0u, message_infos.size);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
size_t size = 5;
|
||||||
|
rmw_message_info_sequence_t message_infos;
|
||||||
|
rmw_message_info_sequence_init(&message_infos, size, &allocator);
|
||||||
|
|
||||||
|
rmw_message_sequence_t messages;
|
||||||
|
rmw_message_sequence_init(&messages, size, &allocator);
|
||||||
|
|
||||||
|
auto seq = test_msgs__msg__Strings__Sequence__create(size);
|
||||||
|
|
||||||
|
for (size_t ii = 0; ii < size; ++ii) {
|
||||||
|
messages.data[ii] = &seq->data[ii];
|
||||||
|
}
|
||||||
|
|
||||||
|
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
|
||||||
|
{
|
||||||
|
rmw_message_info_sequence_fini(&message_infos);
|
||||||
|
rmw_message_sequence_fini(&messages);
|
||||||
|
test_msgs__msg__Strings__Sequence__destroy(seq);
|
||||||
|
});
|
||||||
|
|
||||||
|
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
ASSERT_EQ(3u, messages.size);
|
||||||
|
ASSERT_EQ(3u, message_infos.size);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
test_msgs__msg__Strings msg;
|
||||||
|
test_msgs__msg__Strings__init(&msg);
|
||||||
|
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
ret = rcl_publish(&publisher, &msg, nullptr);
|
||||||
|
test_msgs__msg__Strings__fini(&msg);
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give a brief moment for publications to go through.
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||||
|
// Take fewer messages than are available in the subscription
|
||||||
|
{
|
||||||
|
size_t size = 3;
|
||||||
|
rmw_message_info_sequence_t message_infos;
|
||||||
|
rmw_message_info_sequence_init(&message_infos, size, &allocator);
|
||||||
|
|
||||||
|
rmw_message_sequence_t messages;
|
||||||
|
rmw_message_sequence_init(&messages, size, &allocator);
|
||||||
|
|
||||||
|
auto seq = test_msgs__msg__Strings__Sequence__create(size);
|
||||||
|
|
||||||
|
for (size_t ii = 0; ii < size; ++ii) {
|
||||||
|
messages.data[ii] = &seq->data[ii];
|
||||||
|
}
|
||||||
|
|
||||||
|
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
|
||||||
|
{
|
||||||
|
rmw_message_info_sequence_fini(&message_infos);
|
||||||
|
rmw_message_sequence_fini(&messages);
|
||||||
|
test_msgs__msg__Strings__Sequence__destroy(seq);
|
||||||
|
});
|
||||||
|
|
||||||
|
ret = rcl_take_sequence(&subscription, 3, &messages, &message_infos, nullptr);
|
||||||
|
|
||||||
|
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
|
||||||
|
ASSERT_EQ(3u, messages.size);
|
||||||
|
ASSERT_EQ(3u, message_infos.size);
|
||||||
|
|
||||||
|
ASSERT_EQ(
|
||||||
|
std::string(test_string),
|
||||||
|
std::string(seq->data[0].string_value.data, seq->data[0].string_value.size));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue