Fix sertopic referencing

Use the function dds_create_topic_generic for creating topic, so that
the sertopic that is actually used is referenced in the publisher.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2020-03-03 15:57:52 +01:00 committed by eboasson
parent 49d4a51abf
commit 6162be51f9

View file

@ -165,7 +165,6 @@ struct CddsPublisher : CddsEntity
struct CddsSubscription : CddsEntity struct CddsSubscription : CddsEntity
{ {
dds_entity_t rdcondh; dds_entity_t rdcondh;
struct ddsi_sertopic * sertopic;
}; };
struct CddsCS struct CddsCS
@ -1263,10 +1262,20 @@ static CddsPublisher * create_cdds_publisher(
fqtopic_name.c_str(), type_support->typesupport_identifier, fqtopic_name.c_str(), type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
if ((topic = auto ddsi_st = static_cast<struct ddsi_sertopic *>(sertopic);
dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) #ifdef DDS_HAS_CREATE_TOPIC_GENERIC
{ /* dds_create_topic_generic takes over ownership, the sertopic actually used
is returned as an uncounted reference that remains valid for the lifetime
of the topic (and thus the writer) */
topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer
(incorrectly) retains a counted reference */
topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr);
#endif
if (topic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_st);
goto fail_topic; goto fail_topic;
} }
if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) {
@ -1280,7 +1289,7 @@ static CddsPublisher * create_cdds_publisher(
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
pub->sertopic = sertopic; pub->sertopic = ddsi_st;
dds_delete_qos(qos); dds_delete_qos(qos);
dds_delete(topic); dds_delete(topic);
return pub; return pub;
@ -1293,6 +1302,9 @@ fail_writer:
dds_delete_qos(qos); dds_delete_qos(qos);
fail_qos: fail_qos:
dds_delete(topic); dds_delete(topic);
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(ddsi_st);
#endif
fail_topic: fail_topic:
delete pub; delete pub;
return nullptr; return nullptr;
@ -1448,7 +1460,9 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t *
if (dds_delete(pub->enth) < 0) { if (dds_delete(pub->enth) < 0) {
RMW_SET_ERROR_MSG("failed to delete writer"); RMW_SET_ERROR_MSG("failed to delete writer");
} }
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(pub->sertopic); ddsi_sertopic_unref(pub->sertopic);
#endif
delete pub; delete pub;
} }
rmw_free(const_cast<char *>(publisher->topic_name)); rmw_free(const_cast<char *>(publisher->topic_name));
@ -1485,10 +1499,23 @@ static CddsSubscription * create_cdds_subscription(
fqtopic_name.c_str(), type_support->typesupport_identifier, fqtopic_name.c_str(), type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
if ((topic = auto ddsi_st = static_cast<struct ddsi_sertopic *>(sertopic);
dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0) #ifdef DDS_HAS_CREATE_TOPIC_GENERIC
{ /* dds_create_topic_generic takes over ownership, the sertopic actually used
is returned as an uncounted reference that remains valid for the lifetime
of the topic (and thus the writer) */
topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, we don't
need it in the RMW layer */
topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr);
if (topic > 0) {
ddsi_sertopic_unref(ddsi_st);
}
#endif
if (topic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_st);
goto fail_topic; goto fail_topic;
} }
if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) {
@ -1502,7 +1529,6 @@ static CddsSubscription * create_cdds_subscription(
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond; goto fail_readcond;
} }
sub->sertopic = sertopic;
dds_delete_qos(qos); dds_delete_qos(qos);
dds_delete(topic); dds_delete(topic);
return sub; return sub;
@ -1633,7 +1659,6 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio
if (dds_delete(sub->enth) < 0) { if (dds_delete(sub->enth) < 0) {
RMW_SET_ERROR_MSG("failed to delete reader"); RMW_SET_ERROR_MSG("failed to delete reader");
} }
ddsi_sertopic_unref(sub->sertopic);
delete sub; delete sub;
} }
rmw_free(const_cast<char *>(subscription->topic_name)); rmw_free(const_cast<char *>(subscription->topic_name));
@ -1666,9 +1691,7 @@ static rmw_ret_t rmw_take_int(
dds_time_t tnow = dds_time(); dds_time_t tnow = dds_time();
dds_time_t dt = tnow - info.source_timestamp; dds_time_t dt = tnow - info.source_timestamp;
if (dt >= DDS_MSECS(REPORT_LATE_MESSAGES)) { if (dt >= DDS_MSECS(REPORT_LATE_MESSAGES)) {
fprintf( fprintf(stderr, "** sample in history for %.fms\n", static_cast<double>(dt) / 1e6);
stderr, "** %s sample in history for %.fms\n", sub->sertopic->name,
static_cast<double>(dt) / 1e6);
} }
#endif #endif
*taken = true; *taken = true;
@ -2368,9 +2391,8 @@ extern "C" rmw_ret_t rmw_take_response(
dds_time_t dtreq = tnow - info->reqtime[seq]; dds_time_t dtreq = tnow - info->reqtime[seq];
if (dtreq > DDS_MSECS(REPORT_LATE_MESSAGES) || dtresp > DDS_MSECS(REPORT_LATE_MESSAGES)) { if (dtreq > DDS_MSECS(REPORT_LATE_MESSAGES) || dtresp > DDS_MSECS(REPORT_LATE_MESSAGES)) {
fprintf( fprintf(
stderr, "** %s response time %.fms; response in history for %.fms\n", stderr, "** response time %.fms; response in history for %.fms\n",
info->client.sub->sertopic->name, static_cast<double>(dtreq) / 1e6, static_cast<double>(dtreq) / 1e6, static_cast<double>(dtresp) / 1e6);
static_cast<double>(dtresp) / 1e6);
} }
info->reqtime.erase(seq); info->reqtime.erase(seq);
} }
@ -2388,9 +2410,7 @@ static void check_for_blocked_requests(CddsClient & client)
for (auto const & r : client.reqtime) { for (auto const & r : client.reqtime) {
dds_time_t dt = tnow - r.second; dds_time_t dt = tnow - r.second;
if (dt > DDS_SECS(1)) { if (dt > DDS_SECS(1)) {
fprintf( fprintf(stderr, "** already waiting for %.fms\n", static_cast<double>(dt) / 1e6);
stderr, "** %s already waiting for %.fms\n", client.client.sub->sertopic->name,
static_cast<double>(dt) / 1e6);
} }
} }
} }
@ -2536,23 +2556,44 @@ static rmw_ret_t rmw_init_cs(
auto pub_st = create_sertopic( auto pub_st = create_sertopic(
pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true, pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true,
std::move(pub_msg_ts)); std::move(pub_msg_ts));
auto ddsi_pub_st = static_cast<struct ddsi_sertopic *>(pub_st);
auto sub_st = create_sertopic( auto sub_st = create_sertopic(
subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true, subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true,
std::move(sub_msg_ts)); std::move(sub_msg_ts));
auto ddsi_sub_st = static_cast<struct ddsi_sertopic *>(sub_st);
dds_qos_t * qos; #ifdef DDS_HAS_CREATE_TOPIC_GENERIC
if ((pubtopic = /* dds_create_topic_generic takes over ownership, the sertopic actually used
dds_create_topic_arbitrary(node_impl->enth, pub_st, nullptr, nullptr, nullptr)) < 0) is returned as an uncounted reference that remains valid for the lifetime
{ of the topic (and thus the writer) */
pubtopic = dds_create_topic_generic(node_impl->enth, &ddsi_pub_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer
(incorrectly) retains a counted reference */
pubtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_pub_st, nullptr, nullptr, nullptr);
#endif
if (pubtopic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_pub_st);
goto fail_pubtopic; goto fail_pubtopic;
} }
if ((subtopic =
dds_create_topic_arbitrary(node_impl->enth, sub_st, nullptr, nullptr, nullptr)) < 0) #ifdef DDS_HAS_CREATE_TOPIC_GENERIC
{ subtopic = dds_create_topic_generic(node_impl->enth, &ddsi_sub_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, we don't
need it in the RMW layer */
subtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_sub_st, nullptr, nullptr, nullptr);
if (subtopic > 0) {
ddsi_sertopic_unref(ddsi_sub_st);
}
#endif
if (subtopic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_sub_st);
goto fail_subtopic; goto fail_subtopic;
} }
dds_qos_t * qos;
if ((qos = dds_create_qos()) == nullptr) { if ((qos = dds_create_qos()) == nullptr) {
goto fail_qos; goto fail_qos;
} }
@ -2562,12 +2603,11 @@ static rmw_ret_t rmw_init_cs(
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
pub->sertopic = pub_st; pub->sertopic = ddsi_pub_st;
if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
sub->sertopic = sub_st;
if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) { if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond; goto fail_readcond;
@ -2596,14 +2636,18 @@ fail_qos:
dds_delete(subtopic); dds_delete(subtopic);
fail_subtopic: fail_subtopic:
dds_delete(pubtopic); dds_delete(pubtopic);
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(ddsi_pub_st);
#endif
fail_pubtopic: fail_pubtopic:
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
static void rmw_fini_cs(CddsCS * cs) static void rmw_fini_cs(CddsCS * cs)
{ {
ddsi_sertopic_unref(cs->sub->sertopic); #ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(cs->pub->sertopic); ddsi_sertopic_unref(cs->pub->sertopic);
#endif
dds_delete(cs->sub->rdcondh); dds_delete(cs->sub->rdcondh);
dds_delete(cs->sub->enth); dds_delete(cs->sub->enth);
dds_delete(cs->pub->enth); dds_delete(cs->pub->enth);