Fix leak in client/service topic error handling

And refactor to reduce the amount of code duplication.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-03-05 13:56:17 +01:00 committed by eboasson
parent 6162be51f9
commit b4c0620b5b

View file

@ -928,6 +928,63 @@ extern "C" rmw_ret_t rmw_deserialize(
return ok ? RMW_RET_OK : RMW_RET_ERROR; return ok ? RMW_RET_OK : RMW_RET_ERROR;
} }
/////////////////////////////////////////////////////////////////////////////////////////
/////////// ///////////
/////////// TOPIC CREATION ///////////
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
/* Publications need the sertopic that DDSI uses for the topic when publishing a
serialized message. With the old ("arbitrary") interface of Cyclone, one doesn't know
the sertopic that is actually used because that may be the one that was provided in the
call to dds_create_topic_arbitrary(), but it may also be one that was introduced by a
preceding call to create the same topic.
There is no way of discovering which case it is, and there is no way of getting access
to the correct sertopic. The best one can do is to keep using one provided when
creating the topic -- and fortunately using the wrong sertopic has surprisingly few
nasty side-effects, but it still wrong.
Because the caller retains ownership, so this is easy, but it does require dropping the
reference when cleaning up.
The new ("generic") interface instead takes over the ownership of the reference iff it
succeeds and it returns a non-counted reference to the sertopic actually used. The
lifetime of the reference is at least as long as the lifetime of the DDS topic exists;
and the topic's lifetime is at least that of the readers/writers using it. This
reference can therefore safely be used. */
static dds_entity_t create_topic(
dds_entity_t pp, struct ddsi_sertopic * sertopic,
struct ddsi_sertopic ** stact)
{
dds_entity_t tp;
#ifdef DDS_HAS_CREATE_TOPIC_GENERIC
tp = dds_create_topic_generic(pp, &sertopic, nullptr, nullptr, nullptr);
#else
tp = dds_create_topic_arbitrary(pp, sertopic, nullptr, nullptr, nullptr);
#endif
if (tp < 0) {
ddsi_sertopic_unref(sertopic);
} else {
if (stact) {
*stact = sertopic;
}
}
return tp;
}
static dds_entity_t create_topic(dds_entity_t pp, struct ddsi_sertopic * sertopic)
{
dds_entity_t tp = create_topic(pp, sertopic, nullptr);
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC
if (tp > 0) {
ddsi_sertopic_unref(sertopic);
}
#endif
return tp;
}
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
/////////// /////////// /////////// ///////////
/////////// PUBLICATIONS /////////// /////////// PUBLICATIONS ///////////
@ -1262,20 +1319,10 @@ static CddsPublisher * create_cdds_publisher(
fqtopic_name.c_str(), type_support->typesupport_identifier, fqtopic_name.c_str(), type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
auto ddsi_st = static_cast<struct ddsi_sertopic *>(sertopic); struct ddsi_sertopic * stact;
#ifdef DDS_HAS_CREATE_TOPIC_GENERIC topic = create_topic(node_impl->enth, sertopic, &stact);
/* dds_create_topic_generic takes over ownership, the sertopic actually used
is returned as an uncounted reference that remains valid for the lifetime
of the topic (and thus the writer) */
topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer
(incorrectly) retains a counted reference */
topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr);
#endif
if (topic < 0) { if (topic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_st);
goto fail_topic; goto fail_topic;
} }
if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) {
@ -1289,7 +1336,7 @@ static CddsPublisher * create_cdds_publisher(
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
pub->sertopic = ddsi_st; pub->sertopic = stact;
dds_delete_qos(qos); dds_delete_qos(qos);
dds_delete(topic); dds_delete(topic);
return pub; return pub;
@ -1303,7 +1350,7 @@ fail_writer:
fail_qos: fail_qos:
dds_delete(topic); dds_delete(topic);
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC #ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(ddsi_st); ddsi_sertopic_unref(stact);
#endif #endif
fail_topic: fail_topic:
delete pub; delete pub;
@ -1499,23 +1546,9 @@ static CddsSubscription * create_cdds_subscription(
fqtopic_name.c_str(), type_support->typesupport_identifier, fqtopic_name.c_str(), type_support->typesupport_identifier,
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
auto ddsi_st = static_cast<struct ddsi_sertopic *>(sertopic); topic = create_topic(node_impl->enth, sertopic);
#ifdef DDS_HAS_CREATE_TOPIC_GENERIC
/* dds_create_topic_generic takes over ownership, the sertopic actually used
is returned as an uncounted reference that remains valid for the lifetime
of the topic (and thus the writer) */
topic = dds_create_topic_generic(node_impl->enth, &ddsi_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, we don't
need it in the RMW layer */
topic = dds_create_topic_arbitrary(node_impl->enth, ddsi_st, nullptr, nullptr, nullptr);
if (topic > 0) {
ddsi_sertopic_unref(ddsi_st);
}
#endif
if (topic < 0) { if (topic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_st);
goto fail_topic; goto fail_topic;
} }
if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) {
@ -2552,45 +2585,24 @@ static rmw_ret_t rmw_init_cs(
RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********"); RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********");
dds_entity_t pubtopic, subtopic; dds_entity_t pubtopic, subtopic;
struct sertopic_rmw * pub_st, * sub_st;
auto pub_st = create_sertopic( pub_st = create_sertopic(
pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true, pubtopic_name.c_str(), type_support->typesupport_identifier, pub_type_support, true,
std::move(pub_msg_ts)); std::move(pub_msg_ts));
auto ddsi_pub_st = static_cast<struct ddsi_sertopic *>(pub_st); struct ddsi_sertopic * pub_stact;
auto sub_st = create_sertopic( pubtopic = create_topic(node_impl->enth, pub_st, &pub_stact);
subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true,
std::move(sub_msg_ts));
auto ddsi_sub_st = static_cast<struct ddsi_sertopic *>(sub_st);
#ifdef DDS_HAS_CREATE_TOPIC_GENERIC
/* dds_create_topic_generic takes over ownership, the sertopic actually used
is returned as an uncounted reference that remains valid for the lifetime
of the topic (and thus the writer) */
pubtopic = dds_create_topic_generic(node_impl->enth, &ddsi_pub_st, nullptr, nullptr, nullptr);
#else
/* dds_create_topic_arbitrary never takes ownership of sertopic, the RMW layer
(incorrectly) retains a counted reference */
pubtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_pub_st, nullptr, nullptr, nullptr);
#endif
if (pubtopic < 0) { if (pubtopic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_pub_st);
goto fail_pubtopic; goto fail_pubtopic;
} }
#ifdef DDS_HAS_CREATE_TOPIC_GENERIC sub_st = create_sertopic(
subtopic = dds_create_topic_generic(node_impl->enth, &ddsi_sub_st, nullptr, nullptr, nullptr); subtopic_name.c_str(), type_support->typesupport_identifier, sub_type_support, true,
#else std::move(sub_msg_ts));
/* dds_create_topic_arbitrary never takes ownership of sertopic, we don't subtopic = create_topic(node_impl->enth, sub_st);
need it in the RMW layer */
subtopic = dds_create_topic_arbitrary(node_impl->enth, ddsi_sub_st, nullptr, nullptr, nullptr);
if (subtopic > 0) {
ddsi_sertopic_unref(ddsi_sub_st);
}
#endif
if (subtopic < 0) { if (subtopic < 0) {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
ddsi_sertopic_unref(ddsi_sub_st);
goto fail_subtopic; goto fail_subtopic;
} }
dds_qos_t * qos; dds_qos_t * qos;
@ -2603,7 +2615,7 @@ static rmw_ret_t rmw_init_cs(
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
pub->sertopic = ddsi_pub_st; pub->sertopic = pub_stact;
if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
@ -2637,7 +2649,7 @@ fail_qos:
fail_subtopic: fail_subtopic:
dds_delete(pubtopic); dds_delete(pubtopic);
#ifndef DDS_HAS_CREATE_TOPIC_GENERIC #ifndef DDS_HAS_CREATE_TOPIC_GENERIC
ddsi_sertopic_unref(ddsi_pub_st); ddsi_sertopic_unref(pub_stact);
#endif #endif
fail_pubtopic: fail_pubtopic:
return RMW_RET_ERROR; return RMW_RET_ERROR;