From 6162be51f9c61da84812ee160ead78dbcb4a0e80 Mon Sep 17 00:00:00 2001 From: Dennis Potman Date: Tue, 3 Mar 2020 15:57:52 +0100 Subject: [PATCH] Fix sertopic referencing Use the function dds_create_topic_generic for creating topic, so that the sertopic that is actually used is referenced in the publisher. Signed-off-by: Dennis Potman --- rmw_cyclonedds_cpp/src/rmw_node.cpp | 102 ++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 29 deletions(-) diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 1f8797e..3dc1be2 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -165,7 +165,6 @@ struct CddsPublisher : CddsEntity struct CddsSubscription : CddsEntity { dds_entity_t rdcondh; - struct ddsi_sertopic * sertopic; }; struct CddsCS @@ -1263,10 +1262,20 @@ static CddsPublisher * create_cdds_publisher( fqtopic_name.c_str(), type_support->typesupport_identifier, create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); - if ((topic = - dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) - { + auto ddsi_st = static_cast(sertopic); +#ifdef DDS_HAS_CREATE_TOPIC_GENERIC + /* dds_create_topic_generic takes over ownership, the sertopic actually used + is returned as an uncounted reference that remains valid for the lifetime + of the topic (and thus the writer) */ + topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr); +#else + /* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer + (incorrectly) retains a counted reference */ + topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr); +#endif + if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); + ddsi_sertopic_unref(ddsi_st); goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { @@ -1280,7 +1289,7 @@ static CddsPublisher * create_cdds_publisher( RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } - pub->sertopic = sertopic; + pub->sertopic = ddsi_st; dds_delete_qos(qos); dds_delete(topic); return pub; @@ -1293,6 +1302,9 @@ fail_writer: dds_delete_qos(qos); fail_qos: dds_delete(topic); +#ifndef DDS_HAS_CREATE_TOPIC_GENERIC + ddsi_sertopic_unref(ddsi_st); +#endif fail_topic: delete pub; return nullptr; @@ -1448,7 +1460,9 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * if (dds_delete(pub->enth) < 0) { RMW_SET_ERROR_MSG("failed to delete writer"); } +#ifndef DDS_HAS_CREATE_TOPIC_GENERIC ddsi_sertopic_unref(pub->sertopic); +#endif delete pub; } rmw_free(const_cast(publisher->topic_name)); @@ -1485,10 +1499,23 @@ static CddsSubscription * create_cdds_subscription( fqtopic_name.c_str(), type_support->typesupport_identifier, create_message_type_support(type_support->data, type_support->typesupport_identifier), false, rmw_cyclonedds_cpp::make_message_value_type(type_supports)); - if ((topic = - dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) - { + auto ddsi_st = static_cast(sertopic); +#ifdef DDS_HAS_CREATE_TOPIC_GENERIC + /* dds_create_topic_generic takes over ownership, the sertopic actually used + is returned as an uncounted reference that remains valid for the lifetime + of the topic (and thus the writer) */ + topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr); +#else + /* dds_create_topic_arbitrary never takes ownership of sertopic, we don't + need it in the RMW layer */ + topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr); + if (topic > 0) { + ddsi_sertopic_unref(ddsi_st); + } +#endif + if (topic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); + ddsi_sertopic_unref(ddsi_st); goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { @@ -1502,7 +1529,6 @@ static CddsSubscription * create_cdds_subscription( RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; } - sub->sertopic = sertopic; dds_delete_qos(qos); dds_delete(topic); return sub; @@ -1633,7 +1659,6 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio if (dds_delete(sub->enth) < 0) { RMW_SET_ERROR_MSG("failed to delete reader"); } - ddsi_sertopic_unref(sub->sertopic); delete sub; } rmw_free(const_cast(subscription->topic_name)); @@ -1666,9 +1691,7 @@ static rmw_ret_t rmw_take_int( dds_time_t tnow = dds_time(); dds_time_t dt = tnow - info.source_timestamp; if (dt >= DDS_MSECS(REPORT_LATE_MESSAGES)) { - fprintf( - stderr, "** %s sample in history for %.fms\n", sub->sertopic->name, - static_cast(dt) / 1e6); + fprintf(stderr, "** sample in history for %.fms\n", static_cast(dt) / 1e6); } #endif *taken = true; @@ -2368,9 +2391,8 @@ extern "C" rmw_ret_t rmw_take_response( dds_time_t dtreq = tnow - info->reqtime[seq]; if (dtreq > DDS_MSECS(REPORT_LATE_MESSAGES) || dtresp > DDS_MSECS(REPORT_LATE_MESSAGES)) { fprintf( - stderr, "** %s response time %.fms; response in history for %.fms\n", - info->client.sub->sertopic->name, static_cast(dtreq) / 1e6, - static_cast(dtresp) / 1e6); + stderr, "** response time %.fms; response in history for %.fms\n", + static_cast(dtreq) / 1e6, static_cast(dtresp) / 1e6); } info->reqtime.erase(seq); } @@ -2388,9 +2410,7 @@ static void check_for_blocked_requests(CddsClient & client) for (auto const & r : client.reqtime) { dds_time_t dt = tnow - r.second; if (dt > DDS_SECS(1)) { - fprintf( - stderr, "** %s already waiting for %.fms\n", client.client.sub->sertopic->name, - static_cast(dt) / 1e6); + fprintf(stderr, "** already waiting for %.fms\n", static_cast(dt) / 1e6); } } } @@ -2536,23 +2556,44 @@ static rmw_ret_t rmw_init_cs( auto pub_st = create_sertopic( pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true, std::move(pub_msg_ts)); + auto ddsi_pub_st = static_cast(pub_st); auto sub_st = create_sertopic( subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true, std::move(sub_msg_ts)); + auto ddsi_sub_st = static_cast(sub_st); - dds_qos_t * qos; - if ((pubtopic = - dds_create_topic_arbitrary(node_impl->enth, pub_st, nullptr, nullptr, nullptr)) < 0) - { +#ifdef DDS_HAS_CREATE_TOPIC_GENERIC + /* dds_create_topic_generic takes over ownership, the sertopic actually used + is returned as an uncounted reference that remains valid for the lifetime + of the topic (and thus the writer) */ + pubtopic = dds_create_topic_generic(node_impl->enth, &ddsi_pub_st, nullptr, nullptr, nullptr); +#else + /* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer + (incorrectly) retains a counted reference */ + pubtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_pub_st, nullptr, nullptr, nullptr); +#endif + if (pubtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); + ddsi_sertopic_unref(ddsi_pub_st); goto fail_pubtopic; } - if ((subtopic = - dds_create_topic_arbitrary(node_impl->enth, sub_st, nullptr, nullptr, nullptr)) < 0) - { + +#ifdef DDS_HAS_CREATE_TOPIC_GENERIC + subtopic = dds_create_topic_generic(node_impl->enth, &ddsi_sub_st, nullptr, nullptr, nullptr); +#else + /* dds_create_topic_arbitrary never takes ownership of sertopic, we don't + need it in the RMW layer */ + subtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_sub_st, nullptr, nullptr, nullptr); + if (subtopic > 0) { + ddsi_sertopic_unref(ddsi_sub_st); + } +#endif + if (subtopic < 0) { RMW_SET_ERROR_MSG("failed to create topic"); + ddsi_sertopic_unref(ddsi_sub_st); goto fail_subtopic; } + dds_qos_t * qos; if ((qos = dds_create_qos()) == nullptr) { goto fail_qos; } @@ -2562,12 +2603,11 @@ static rmw_ret_t rmw_init_cs( RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } - pub->sertopic = pub_st; + pub->sertopic = ddsi_pub_st; if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } - sub->sertopic = sub_st; if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; @@ -2596,14 +2636,18 @@ fail_qos: dds_delete(subtopic); fail_subtopic: dds_delete(pubtopic); +#ifndef DDS_HAS_CREATE_TOPIC_GENERIC + ddsi_sertopic_unref(ddsi_pub_st); +#endif fail_pubtopic: return RMW_RET_ERROR; } static void rmw_fini_cs(CddsCS * cs) { - ddsi_sertopic_unref(cs->sub->sertopic); +#ifndef DDS_HAS_CREATE_TOPIC_GENERIC ddsi_sertopic_unref(cs->pub->sertopic); +#endif dds_delete(cs->sub->rdcondh); dds_delete(cs->sub->enth); dds_delete(cs->pub->enth);