diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 3dc1be2..dd09641 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -928,6 +928,63 @@ extern "C" rmw_ret_t rmw_deserialize( return ok ? RMW_RET_OK : RMW_RET_ERROR; } +///////////////////////////////////////////////////////////////////////////////////////// +/////////// /////////// +/////////// TOPIC CREATION /////////// +/////////// /////////// +///////////////////////////////////////////////////////////////////////////////////////// + +/* Publications need the sertopic that DDSI uses for the topic when publishing a + serialized message. With the old ("arbitrary") interface of Cyclone, one doesn't know + the sertopic that is actually used because that may be the one that was provided in the + call to dds_create_topic_arbitrary(), but it may also be one that was introduced by a + preceding call to create the same topic. + + There is no way of discovering which case it is, and there is no way of getting access + to the correct sertopic. The best one can do is to keep using one provided when + creating the topic -- and fortunately using the wrong sertopic has surprisingly few + nasty side-effects, but it still wrong. + + Because the caller retains ownership, so this is easy, but it does require dropping the + reference when cleaning up. + + The new ("generic") interface instead takes over the ownership of the reference iff it + succeeds and it returns a non-counted reference to the sertopic actually used. The + lifetime of the reference is at least as long as the lifetime of the DDS topic exists; + and the topic's lifetime is at least that of the readers/writers using it. This + reference can therefore safely be used. */ + +static dds_entity_t create_topic( + dds_entity_t pp, struct ddsi_sertopic * sertopic, + struct ddsi_sertopic ** stact) +{ + dds_entity_t tp; +#ifdef DDS_HAS_CREATE_TOPIC_GENERIC + tp = dds_create_topic_generic(pp, &sertopic, nullptr, nullptr, nullptr); +#else + tp = dds_create_topic_arbitrary(pp, sertopic, nullptr, nullptr, nullptr); +#endif + if (tp < 0) { + ddsi_sertopic_unref(sertopic); + } else { + if (stact) { + *stact = sertopic; + } + } + return tp; +} + +static dds_entity_t create_topic(dds_entity_t pp, struct ddsi_sertopic * sertopic) +{ + dds_entity_t tp = create_topic(pp, sertopic, nullptr); +#ifndef DDS_HAS_CREATE_TOPIC_GENERIC + if (tp > 0) { + ddsi_sertopic_unref(sertopic); + } +#endif + return tp; +} + ///////////////////////////////////////////////////////////////////////////////////////// /////////// /////////// /////////// PUBLICATIONS /////////// @@ -1262,20 +1319,10 @@ static CddsPublisher * create_cdds_publisher( fqtopic_name.c_str(), type_support->typesupport_identifier, create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); - auto ddsi_st = static_cast(sertopic); -#ifdef DDS_HAS_CREATE_TOPIC_GENERIC - /* dds_create_topic_generic takes over ownership, the sertopic actually used - is returned as an uncounted reference that remains valid for the lifetime - of the topic (and thus the writer) */ - topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr); -#else - /* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer - (incorrectly) retains a counted reference */ - topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr); -#endif + struct ddsi_sertopic * stact; + topic = create_topic(node_impl->enth, sertopic, &stact); if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); - ddsi_sertopic_unref(ddsi_st); goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { @@ -1289,7 +1336,7 @@ static CddsPublisher * create_cdds_publisher( RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } - pub->sertopic = ddsi_st; + pub->sertopic = stact; dds_delete_qos(qos); dds_delete(topic); return pub; @@ -1303,7 +1350,7 @@ fail_writer: fail_qos: dds_delete(topic); #ifndef DDS_HAS_CREATE_TOPIC_GENERIC - ddsi_sertopic_unref(ddsi_st); + ddsi_sertopic_unref(stact); #endif fail_topic: delete pub; @@ -1499,23 +1546,9 @@ static CddsSubscription * create_cdds_subscription( fqtopic_name.c_str(), type_support->typesupport_identifier, create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); - auto ddsi_st = static_cast(sertopic); -#ifdef DDS_HAS_CREATE_TOPIC_GENERIC - /* dds_create_topic_generic takes over ownership, the sertopic actually used - is returned as an uncounted reference that remains valid for the lifetime - of the topic (and thus the writer) */ - topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr); -#else - /* dds_create_topic_arbitrary never takes ownership of sertopic, we don't - need it in the RMW layer */ - topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr); - if (topic > 0) { - ddsi_sertopic_unref(ddsi_st); - } -#endif + topic = create_topic(node_impl->enth, sertopic); if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); - ddsi_sertopic_unref(ddsi_st); goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { @@ -2552,45 +2585,24 @@ static rmw_ret_t rmw_init_cs( RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********"); dds_entity_t pubtopic, subtopic; + struct sertopic_rmw * pub_st, * sub_st; - auto pub_st = create_sertopic( + pub_st = create_sertopic( pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true, std::move(pub_msg_ts)); - auto ddsi_pub_st = static_cast(pub_st); - auto sub_st = create_sertopic( - subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true, - std::move(sub_msg_ts)); - auto ddsi_sub_st = static_cast(sub_st); - -#ifdef DDS_HAS_CREATE_TOPIC_GENERIC - /* dds_create_topic_generic takes over ownership, the sertopic actually used - is returned as an uncounted reference that remains valid for the lifetime - of the topic (and thus the writer) */ - pubtopic = dds_create_topic_generic(node_impl->enth, &ddsi_pub_st, nullptr, nullptr, nullptr); -#else - /* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer - (incorrectly) retains a counted reference */ - pubtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_pub_st, nullptr, nullptr, nullptr); -#endif + struct ddsi_sertopic * pub_stact; + pubtopic = create_topic(node_impl->enth, pub_st, &pub_stact); if (pubtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); - ddsi_sertopic_unref(ddsi_pub_st); goto fail_pubtopic; } -#ifdef DDS_HAS_CREATE_TOPIC_GENERIC - subtopic = dds_create_topic_generic(node_impl->enth, &ddsi_sub_st, nullptr, nullptr, nullptr); -#else - /* dds_create_topic_arbitrary never takes ownership of sertopic, we don't - need it in the RMW layer */ - subtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_sub_st, nullptr, nullptr, nullptr); - if (subtopic > 0) { - ddsi_sertopic_unref(ddsi_sub_st); - } -#endif + sub_st = create_sertopic( + subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true, + std::move(sub_msg_ts)); + subtopic = create_topic(node_impl->enth, sub_st); if (subtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); - ddsi_sertopic_unref(ddsi_sub_st); goto fail_subtopic; } dds_qos_t * qos; @@ -2603,7 +2615,7 @@ static rmw_ret_t rmw_init_cs( RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } - pub->sertopic = ddsi_pub_st; + pub->sertopic = pub_stact; if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; @@ -2637,7 +2649,7 @@ fail_qos: fail_subtopic: dds_delete(pubtopic); #ifndef DDS_HAS_CREATE_TOPIC_GENERIC - ddsi_sertopic_unref(ddsi_pub_st); + ddsi_sertopic_unref(pub_stact); #endif fail_pubtopic: return RMW_RET_ERROR;