Add support for taking a sequence of messages (#148)

* Add support for taking a sequence of messages

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Reorder valid messages to front of sequence

Signed-off-by: Michael Carroll <michael@openrobotics.org>

* Initialize taken value

Signed-off-by: Michael Carroll <michael@openrobotics.org>
This commit is contained in:
Michael Carroll 2020-04-24 11:49:15 -05:00 committed by GitHub
parent 2638f4abe8
commit 224d761eca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2163,6 +2163,80 @@ static rmw_ret_t rmw_take_int(
return RMW_RET_OK; return RMW_RET_OK;
} }
static rmw_ret_t rmw_take_seq(
const rmw_subscription_t * subscription,
size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken)
{
RET_NULL(taken);
RET_NULL(message_sequence);
RET_NULL(message_info_sequence);
RET_WRONG_IMPLID(subscription);
if (count > message_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_sequence");
return RMW_RET_ERROR;
}
if (count > message_info_sequence->capacity) {
RMW_SET_ERROR_MSG("Insuffient capacity in message_info_sequence");
return RMW_RET_ERROR;
}
CddsSubscription * sub = static_cast<CddsSubscription *>(subscription->data);
RET_NULL(sub);
std::vector<dds_sample_info_t> infos(count);
auto ret = dds_take(sub->enth, message_sequence->data, infos.data(), count, count);
// Returning 0 should not be an error, as it just indicates that no messages were available.
if (ret < 0) {
return RMW_RET_ERROR;
}
// Keep track of taken/not taken to reorder sequence with valid messages at the front
std::vector<void *> taken_msg;
std::vector<void *> not_taken_msg;
*taken = 0u;
for (int ii = 0; ii < ret; ++ii) {
const dds_sample_info_t & info = infos[ii];
void * message = &message_sequence->data[ii];
rmw_message_info_t * message_info = &message_info_sequence->data[*taken];
if (info.valid_data) {
taken_msg.push_back(message);
(*taken)++;
if (message_info) {
message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier;
memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data));
assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data));
memcpy(
message_info->publisher_gid.data, &info.publication_handle,
sizeof(info.publication_handle));
}
} else {
not_taken_msg.push_back(message);
}
}
for (size_t ii = 0; ii < taken_msg.size(); ++ii) {
message_sequence->data[ii] = taken_msg[ii];
}
for (size_t ii = 0; ii < not_taken_msg.size(); ++ii) {
message_sequence->data[ii + taken_msg.size()] = not_taken_msg[ii];
}
message_sequence->size = *taken;
message_info_sequence->size = *taken;
return RMW_RET_OK;
}
static rmw_ret_t rmw_take_ser_int( static rmw_ret_t rmw_take_ser_int(
const rmw_subscription_t * subscription, const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message, bool * taken, rmw_serialized_message_t * serialized_message, bool * taken,
@ -2221,6 +2295,16 @@ extern "C" rmw_ret_t rmw_take_with_info(
return rmw_take_int(subscription, ros_message, taken, message_info); return rmw_take_int(subscription, ros_message, taken, message_info);
} }
extern "C" rmw_ret_t rmw_take_sequence(
const rmw_subscription_t * subscription, size_t count,
rmw_message_sequence_t * message_sequence,
rmw_message_info_sequence_t * message_info_sequence,
size_t * taken, rmw_subscription_allocation_t * allocation)
{
static_cast<void>(allocation);
return rmw_take_seq(subscription, count, message_sequence, message_info_sequence, taken);
}
extern "C" rmw_ret_t rmw_take_serialized_message( extern "C" rmw_ret_t rmw_take_serialized_message(
const rmw_subscription_t * subscription, const rmw_subscription_t * subscription,
rmw_serialized_message_t * serialized_message, rmw_serialized_message_t * serialized_message,