add write/take of serialized messages

This commit is contained in:
Erik Boasson 2019-05-05 12:36:58 +08:00
parent b925d5591c
commit 925b6e09b3
3 changed files with 128 additions and 21 deletions

View file

@ -100,11 +100,13 @@ struct CddsNode {
struct CddsPublisher {
dds_entity_t pubh;
dds_instance_handle_t pubiid;
struct ddsi_sertopic *sertopic;
};
struct CddsSubscription {
dds_entity_t subh;
dds_entity_t rdcondh;
struct ddsi_sertopic *sertopic;
};
struct CddsCS {
@ -348,18 +350,54 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (cons
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_c__MessageMembers>;
using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_cpp::MessageMembers>;
extern "C" rmw_ret_t rmw_serialize (const void *ros_message, const rosidl_message_type_support_t *type_support, rmw_serialized_message_t *serialized_message)
{
(void) ros_message; (void) type_support; (void) serialized_message;
RMW_SET_ERROR_MSG ("rmw_serialize not implemented");
return RMW_RET_ERROR;
std::vector<unsigned char> data;
cycser sd (data);
rmw_ret_t ret;
const rosidl_message_type_support_t *ts;
if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_c__identifier)) != nullptr) {
auto members = static_cast<const rosidl_typesupport_introspection_c__MessageMembers *> (ts->data);
MessageTypeSupport_c msgts (members);
msgts.serializeROSmessage (ros_message, sd, nullptr);
} else if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) {
auto members = static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *> (ts->data);
MessageTypeSupport_cpp msgts (members);
msgts.serializeROSmessage (ros_message, sd, nullptr);
} else {
RMW_SET_ERROR_MSG ("rmw_serialize: type support trouble");
return RMW_RET_ERROR;
}
/* FIXME: what about the header - should be included or not? */
if ((ret = rmw_serialized_message_resize (serialized_message, data.size ())) != RMW_RET_OK) {
return ret;
}
memcpy (serialized_message->buffer, data.data (), data.size ());
serialized_message->buffer_length = data.size ();
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_deserialize (const rmw_serialized_message_t *serialized_message, const rosidl_message_type_support_t *type_support, void *ros_message)
{
(void) ros_message; (void) type_support; (void) serialized_message;
RMW_SET_ERROR_MSG ("rmw_deserialize not implemented");
return RMW_RET_ERROR;
cycdeser sd (serialized_message->buffer, serialized_message->buffer_length);
bool ok;
const rosidl_message_type_support_t *ts;
if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_c__identifier)) != nullptr) {
auto members = static_cast<const rosidl_typesupport_introspection_c__MessageMembers *> (ts->data);
MessageTypeSupport_c msgts (members);
ok = msgts.deserializeROSmessage (sd, ros_message, nullptr);
} else if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) {
auto members = static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *> (ts->data);
MessageTypeSupport_cpp msgts (members);
ok = msgts.deserializeROSmessage (sd, ros_message, nullptr);
} else {
RMW_SET_ERROR_MSG ("rmw_serialize: type support trouble");
return RMW_RET_ERROR;
}
return ok ? RMW_RET_OK : RMW_RET_ERROR;
}
/////////////////////////////////////////////////////////////////////////////////////////
@ -386,9 +424,12 @@ extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *
extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message)
{
(void) publisher; (void) serialized_message;
RMW_SET_ERROR_MSG ("rmw_publish_serialized_message not implemented");
return RMW_RET_ERROR;
RET_WRONG_IMPLID (publisher);
RET_NULL (serialized_message);
auto pub = static_cast<CddsPublisher *> (publisher->data);
struct ddsi_serdata *d = serdata_rmw_from_serialized_message (pub->sertopic, serialized_message->buffer, serialized_message->buffer_length);
const bool ok = (dds_writecdr (pub->pubh, d) >= 0);
return ok ? RMW_RET_OK : RMW_RET_ERROR;
}
static const rosidl_message_type_support_t *get_typesupport (const rosidl_message_type_support_t *type_supports)
@ -490,8 +531,11 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid
RMW_SET_ERROR_MSG ("failed to get instance handle for writer");
goto fail_instance_handle;
}
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
pub->sertopic = sertopic;
dds_delete_qos (qos);
/* FIXME: leak the topic for now */
dds_delete (topic);
return pub;
fail_instance_handle:
@ -501,8 +545,7 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid
fail_writer:
dds_delete_qos (qos);
fail_qos:
/* not deleting topic -- have to sort out proper topic handling & that requires fixing a few
things in cyclone as well */
dds_delete (topic);
fail_topic:
delete pub;
return nullptr;
@ -574,6 +617,7 @@ extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *p
if (dds_delete (pub->pubh) < 0) {
RMW_SET_ERROR_MSG ("failed to delete writer");
}
ddsi_sertopic_unref (pub->sertopic);
delete pub;
}
rmw_free (const_cast<char *> (publisher->topic_name));
@ -619,7 +663,11 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const
RMW_SET_ERROR_MSG ("failed to create readcondition");
goto fail_readcond;
}
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
sub->sertopic = sertopic;
dds_delete_qos (qos);
dds_delete (topic);
return sub;
fail_readcond:
if (dds_delete (sub->subh) < 0) {
@ -628,7 +676,7 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const
fail_reader:
dds_delete_qos (qos);
fail_qos:
/* FIXME: leak topic */
dds_delete (topic);
fail_topic:
delete sub;
return nullptr;
@ -683,6 +731,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription (rmw_node_t *node, rmw_subscriptio
if (dds_delete (sub->subh) < 0) {
RMW_SET_ERROR_MSG ("failed to delete reader");
}
ddsi_sertopic_unref (sub->sertopic);
delete sub;
}
rmw_free (const_cast<char *> (subscription->topic_name));
@ -715,6 +764,42 @@ static rmw_ret_t rmw_take_int (const rmw_subscription_t *subscription, void *ros
return RMW_RET_OK;
}
static rmw_ret_t rmw_take_ser_int (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info)
{
RET_NULL (taken);
RET_NULL (serialized_message);
RET_WRONG_IMPLID (subscription);
CddsSubscription *sub = static_cast<CddsSubscription *> (subscription->data);
RET_NULL (sub);
dds_sample_info_t info;
struct ddsi_serdata *dcmn;
while (dds_takecdr (sub->subh, &dcmn, 1, &info, DDS_ANY_STATE) == 1) {
if (info.valid_data) {
if (message_info) {
message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier;
memset (message_info->publisher_gid.data, 0, sizeof (message_info->publisher_gid.data));
assert (sizeof (info.publication_handle) <= sizeof (message_info->publisher_gid.data));
memcpy (message_info->publisher_gid.data, &info.publication_handle, sizeof (info.publication_handle));
}
auto d = static_cast<struct serdata_rmw *> (dcmn);
/* FIXME: what about the header - should be included or not? */
if (rmw_serialized_message_resize (serialized_message, d->data.size ()) != RMW_RET_OK) {
ddsi_serdata_unref (dcmn);
*taken = false;
return RMW_RET_ERROR;
}
memcpy (serialized_message->buffer, d->data.data (), d->data.size ());
serialized_message->buffer_length = d->data.size ();
ddsi_serdata_unref (dcmn);
*taken = true;
return RMW_RET_OK;
}
ddsi_serdata_unref (dcmn);
}
*taken = false;
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken)
{
return rmw_take_int (subscription, ros_message, taken, nullptr);
@ -727,16 +812,12 @@ extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription,
extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken)
{
(void) subscription; (void) serialized_message; (void) taken;
RMW_SET_ERROR_MSG ("rmw_take_serialized_message not implemented");
return RMW_RET_ERROR;
return rmw_take_ser_int (subscription, serialized_message, taken, nullptr);
}
extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info)
{
(void) subscription; (void) serialized_message; (void) taken; (void) message_info;
RMW_SET_ERROR_MSG ("rmw_take_serialized_message_with_info not implemented");
return RMW_RET_ERROR;
return rmw_take_ser_int (subscription, serialized_message, taken, message_info);
}
/////////////////////////////////////////////////////////////////////////////////////////
@ -1114,10 +1195,16 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s
RMW_SET_ERROR_MSG ("failed to create writer");
goto fail_writer;
}
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
pub->sertopic = pub_st;
if ((sub->subh = dds_create_reader (gcdds.ppant, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG ("failed to create reader");
goto fail_reader;
}
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
sub->sertopic = sub_st;
if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG ("failed to create readcondition");
goto fail_readcond;
@ -1127,6 +1214,8 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s
goto fail_instance_handle;
}
dds_delete_qos (qos);
dds_delete (subtopic);
dds_delete (pubtopic);
cs->pub = pub;
cs->sub = sub;
@ -1141,15 +1230,17 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s
fail_writer:
dds_delete_qos (qos);
fail_qos:
/* leak subtopic */
dds_delete (subtopic);
fail_subtopic:
/* leak pubtopic */
dds_delete (pubtopic);
fail_pubtopic:
return RMW_RET_ERROR;
}
static void rmw_fini_cs (CddsCS *cs)
{
ddsi_sertopic_unref (cs->sub->sertopic);
ddsi_sertopic_unref (cs->pub->sertopic);
dds_delete (cs->sub->rdcondh);
dds_delete (cs->sub->subh);
dds_delete (cs->pub->pubh);