diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp index 6fe9ad4..8223a5e 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp @@ -52,4 +52,6 @@ void *create_response_type_support (const void *untyped_members, const char *typ struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header); +struct ddsi_serdata *serdata_rmw_from_serialized_message (const struct ddsi_sertopic *topiccmn, const void *raw, size_t size); + #endif diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 98ebd01..a54766f 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -100,11 +100,13 @@ struct CddsNode { struct CddsPublisher { dds_entity_t pubh; dds_instance_handle_t pubiid; + struct ddsi_sertopic *sertopic; }; struct CddsSubscription { dds_entity_t subh; dds_entity_t rdcondh; + struct ddsi_sertopic *sertopic; }; struct CddsCS { @@ -348,18 +350,54 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (cons /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; +using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; + extern "C" rmw_ret_t rmw_serialize (const void *ros_message, const rosidl_message_type_support_t *type_support, rmw_serialized_message_t *serialized_message) { - (void) ros_message; (void) type_support; (void) serialized_message; - RMW_SET_ERROR_MSG ("rmw_serialize not implemented"); - return RMW_RET_ERROR; + std::vector data; + cycser sd (data); + rmw_ret_t ret; + const rosidl_message_type_support_t *ts; + if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + auto members = static_cast (ts->data); + MessageTypeSupport_c msgts (members); + msgts.serializeROSmessage (ros_message, sd, nullptr); + } else if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + auto members = static_cast (ts->data); + MessageTypeSupport_cpp msgts (members); + msgts.serializeROSmessage (ros_message, sd, nullptr); + } else { + RMW_SET_ERROR_MSG ("rmw_serialize: type support trouble"); + return RMW_RET_ERROR; + } + /* FIXME: what about the header - should be included or not? */ + if ((ret = rmw_serialized_message_resize (serialized_message, data.size ())) != RMW_RET_OK) { + return ret; + } + memcpy (serialized_message->buffer, data.data (), data.size ()); + serialized_message->buffer_length = data.size (); + return RMW_RET_OK; } extern "C" rmw_ret_t rmw_deserialize (const rmw_serialized_message_t *serialized_message, const rosidl_message_type_support_t *type_support, void *ros_message) { - (void) ros_message; (void) type_support; (void) serialized_message; - RMW_SET_ERROR_MSG ("rmw_deserialize not implemented"); - return RMW_RET_ERROR; + cycdeser sd (serialized_message->buffer, serialized_message->buffer_length); + bool ok; + const rosidl_message_type_support_t *ts; + if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + auto members = static_cast (ts->data); + MessageTypeSupport_c msgts (members); + ok = msgts.deserializeROSmessage (sd, ros_message, nullptr); + } else if ((ts = get_message_typesupport_handle (type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + auto members = static_cast (ts->data); + MessageTypeSupport_cpp msgts (members); + ok = msgts.deserializeROSmessage (sd, ros_message, nullptr); + } else { + RMW_SET_ERROR_MSG ("rmw_serialize: type support trouble"); + return RMW_RET_ERROR; + } + return ok ? RMW_RET_OK : RMW_RET_ERROR; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -386,9 +424,12 @@ extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void * extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message) { - (void) publisher; (void) serialized_message; - RMW_SET_ERROR_MSG ("rmw_publish_serialized_message not implemented"); - return RMW_RET_ERROR; + RET_WRONG_IMPLID (publisher); + RET_NULL (serialized_message); + auto pub = static_cast (publisher->data); + struct ddsi_serdata *d = serdata_rmw_from_serialized_message (pub->sertopic, serialized_message->buffer, serialized_message->buffer_length); + const bool ok = (dds_writecdr (pub->pubh, d) >= 0); + return ok ? RMW_RET_OK : RMW_RET_ERROR; } static const rosidl_message_type_support_t *get_typesupport (const rosidl_message_type_support_t *type_supports) @@ -490,8 +531,11 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid RMW_SET_ERROR_MSG ("failed to get instance handle for writer"); goto fail_instance_handle; } + /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been + created earlier, but the two are equivalent, so this'll do */ + pub->sertopic = sertopic; dds_delete_qos (qos); - /* FIXME: leak the topic for now */ + dds_delete (topic); return pub; fail_instance_handle: @@ -501,8 +545,7 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid fail_writer: dds_delete_qos (qos); fail_qos: - /* not deleting topic -- have to sort out proper topic handling & that requires fixing a few - things in cyclone as well */ + dds_delete (topic); fail_topic: delete pub; return nullptr; @@ -574,6 +617,7 @@ extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *p if (dds_delete (pub->pubh) < 0) { RMW_SET_ERROR_MSG ("failed to delete writer"); } + ddsi_sertopic_unref (pub->sertopic); delete pub; } rmw_free (const_cast (publisher->topic_name)); @@ -619,7 +663,11 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; } + /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been + created earlier, but the two are equivalent, so this'll do */ + sub->sertopic = sertopic; dds_delete_qos (qos); + dds_delete (topic); return sub; fail_readcond: if (dds_delete (sub->subh) < 0) { @@ -628,7 +676,7 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const fail_reader: dds_delete_qos (qos); fail_qos: - /* FIXME: leak topic */ + dds_delete (topic); fail_topic: delete sub; return nullptr; @@ -683,6 +731,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription (rmw_node_t *node, rmw_subscriptio if (dds_delete (sub->subh) < 0) { RMW_SET_ERROR_MSG ("failed to delete reader"); } + ddsi_sertopic_unref (sub->sertopic); delete sub; } rmw_free (const_cast (subscription->topic_name)); @@ -715,6 +764,42 @@ static rmw_ret_t rmw_take_int (const rmw_subscription_t *subscription, void *ros return RMW_RET_OK; } +static rmw_ret_t rmw_take_ser_int (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info) +{ + RET_NULL (taken); + RET_NULL (serialized_message); + RET_WRONG_IMPLID (subscription); + CddsSubscription *sub = static_cast (subscription->data); + RET_NULL (sub); + dds_sample_info_t info; + struct ddsi_serdata *dcmn; + while (dds_takecdr (sub->subh, &dcmn, 1, &info, DDS_ANY_STATE) == 1) { + if (info.valid_data) { + if (message_info) { + message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; + memset (message_info->publisher_gid.data, 0, sizeof (message_info->publisher_gid.data)); + assert (sizeof (info.publication_handle) <= sizeof (message_info->publisher_gid.data)); + memcpy (message_info->publisher_gid.data, &info.publication_handle, sizeof (info.publication_handle)); + } + auto d = static_cast (dcmn); + /* FIXME: what about the header - should be included or not? */ + if (rmw_serialized_message_resize (serialized_message, d->data.size ()) != RMW_RET_OK) { + ddsi_serdata_unref (dcmn); + *taken = false; + return RMW_RET_ERROR; + } + memcpy (serialized_message->buffer, d->data.data (), d->data.size ()); + serialized_message->buffer_length = d->data.size (); + ddsi_serdata_unref (dcmn); + *taken = true; + return RMW_RET_OK; + } + ddsi_serdata_unref (dcmn); + } + *taken = false; + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken) { return rmw_take_int (subscription, ros_message, taken, nullptr); @@ -727,16 +812,12 @@ extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken) { - (void) subscription; (void) serialized_message; (void) taken; - RMW_SET_ERROR_MSG ("rmw_take_serialized_message not implemented"); - return RMW_RET_ERROR; + return rmw_take_ser_int (subscription, serialized_message, taken, nullptr); } extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info) { - (void) subscription; (void) serialized_message; (void) taken; (void) message_info; - RMW_SET_ERROR_MSG ("rmw_take_serialized_message_with_info not implemented"); - return RMW_RET_ERROR; + return rmw_take_ser_int (subscription, serialized_message, taken, message_info); } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1114,10 +1195,16 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s RMW_SET_ERROR_MSG ("failed to create writer"); goto fail_writer; } + /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been + created earlier, but the two are equivalent, so this'll do */ + pub->sertopic = pub_st; if ((sub->subh = dds_create_reader (gcdds.ppant, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG ("failed to create reader"); goto fail_reader; } + /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been + created earlier, but the two are equivalent, so this'll do */ + sub->sertopic = sub_st; if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; @@ -1127,6 +1214,8 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s goto fail_instance_handle; } dds_delete_qos (qos); + dds_delete (subtopic); + dds_delete (pubtopic); cs->pub = pub; cs->sub = sub; @@ -1141,15 +1230,17 @@ static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_s fail_writer: dds_delete_qos (qos); fail_qos: - /* leak subtopic */ + dds_delete (subtopic); fail_subtopic: - /* leak pubtopic */ + dds_delete (pubtopic); fail_pubtopic: return RMW_RET_ERROR; } static void rmw_fini_cs (CddsCS *cs) { + ddsi_sertopic_unref (cs->sub->sertopic); + ddsi_sertopic_unref (cs->pub->sertopic); dds_delete (cs->sub->rdcondh); dds_delete (cs->sub->subh); dds_delete (cs->pub->pubh); diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp index 19ebb83..b6381dc 100644 --- a/rmw_cyclonedds_cpp/src/serdata.cpp +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -165,6 +165,20 @@ static struct ddsi_serdata *serdata_rmw_from_sample (const struct ddsi_sertopic return d; } +struct ddsi_serdata *serdata_rmw_from_serialized_message (const struct ddsi_sertopic *topiccmn, const void *raw, size_t size) +{ + const struct sertopic_rmw *topic = static_cast (topiccmn); + struct serdata_rmw *d = rmw_serdata_new (topic, SDK_DATA); + /* FIXME: CDR padding in DDSI makes me do this to avoid reading beyond the bounds of the vector + when copying data to network. Should fix Cyclone to handle that more elegantly. */ + d->data.resize (size); + memcpy (d->data.data (), raw, size); + while (d->data.size () % 4) { + d->data.push_back (0); + } + return d; +} + static struct ddsi_serdata *serdata_rmw_to_topicless (const struct ddsi_serdata *dcmn) { const struct serdata_rmw *d = static_cast (dcmn);