Support for deadline, lifespan and liveliness qos (#88)

* Events and liveliness/lifespan/deadline qos support

This commit adds support for liveliness, lifespan and deadline missed
qos in rmw, and it adds event support in rmw_wait that is required for
these qos policies to work correctly.

* Removed redundant empty check in rmw_wait, fix duration 0 vs infinity for qos getter and setter

* Disabled auto-dispose so that deadline-missed on reader is still triggered when a writer becomes not-alive and unregisters itself
This commit is contained in:
Dennis Potman 2020-01-31 14:53:20 +01:00 committed by GitHub
parent ee35a6c41a
commit a771f917f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -13,6 +13,7 @@
// limitations under the License. // limitations under the License.
#include <mutex> #include <mutex>
#include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <algorithm> #include <algorithm>
#include <map> #include <map>
@ -132,9 +133,13 @@ struct builtin_readers
dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])];
}; };
struct CddsNode struct CddsEntity
{
dds_entity_t enth;
};
struct CddsNode : CddsEntity
{ {
dds_entity_t pp;
dds_entity_t pub; dds_entity_t pub;
dds_entity_t sub; dds_entity_t sub;
rmw_guard_condition_t * graph_guard_condition; rmw_guard_condition_t * graph_guard_condition;
@ -146,16 +151,14 @@ struct CddsNode
dds_domainid_t domain_id; dds_domainid_t domain_id;
}; };
struct CddsPublisher struct CddsPublisher : CddsEntity
{ {
dds_entity_t pubh;
dds_instance_handle_t pubiid; dds_instance_handle_t pubiid;
struct ddsi_sertopic * sertopic; struct ddsi_sertopic * sertopic;
}; };
struct CddsSubscription struct CddsSubscription : CddsEntity
{ {
dds_entity_t subh;
dds_entity_t rdcondh; dds_entity_t rdcondh;
struct ddsi_sertopic * sertopic; struct ddsi_sertopic * sertopic;
}; };
@ -187,6 +190,11 @@ struct CddsGuardCondition
dds_entity_t gcondh; dds_entity_t gcondh;
}; };
struct CddsEvent : CddsEntity
{
rmw_event_type_t event_type;
};
struct CddsWaitset struct CddsWaitset
{ {
dds_entity_t waitseth; dds_entity_t waitseth;
@ -200,6 +208,7 @@ struct CddsWaitset
std::vector<CddsGuardCondition *> gcs; std::vector<CddsGuardCondition *> gcs;
std::vector<CddsClient *> cls; std::vector<CddsClient *> cls;
std::vector<CddsService *> srvs; std::vector<CddsService *> srvs;
std::vector<CddsEvent> evs;
}; };
#if SUPPORT_LOCALHOST #if SUPPORT_LOCALHOST
@ -723,7 +732,7 @@ extern "C" rmw_node_t * rmw_create_node(
if (!(graph_guard_condition = rmw_create_guard_condition(context))) { if (!(graph_guard_condition = rmw_create_guard_condition(context))) {
goto fail_ggc; goto fail_ggc;
} }
node_impl->pp = pp; node_impl->enth = pp;
node_impl->pub = pub; node_impl->pub = pub;
node_impl->sub = sub; node_impl->sub = sub;
node_impl->graph_guard_condition = graph_guard_condition; node_impl->graph_guard_condition = graph_guard_condition;
@ -809,7 +818,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
/* prevent race with rmw_create_node (see there) */ /* prevent race with rmw_create_node (see there) */
std::lock_guard<std::mutex> lock(gcdds.domains_lock); std::lock_guard<std::mutex> lock(gcdds.domains_lock);
#endif #endif
if (dds_delete(node_impl->pp) < 0) { if (dds_delete(node_impl->enth) < 0) {
RMW_SET_ERROR_MSG("failed to destroy DDS participant"); RMW_SET_ERROR_MSG("failed to destroy DDS participant");
result_ret = RMW_RET_ERROR; result_ret = RMW_RET_ERROR;
} }
@ -927,7 +936,7 @@ extern "C" rmw_ret_t rmw_publish(
RET_NULL(ros_message); RET_NULL(ros_message);
auto pub = static_cast<CddsPublisher *>(publisher->data); auto pub = static_cast<CddsPublisher *>(publisher->data);
assert(pub); assert(pub);
if (dds_write(pub->pubh, ros_message) >= 0) { if (dds_write(pub->enth, ros_message) >= 0) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
RMW_SET_ERROR_MSG("failed to publish data"); RMW_SET_ERROR_MSG("failed to publish data");
@ -946,7 +955,7 @@ extern "C" rmw_ret_t rmw_publish_serialized_message(
struct ddsi_serdata * d = serdata_rmw_from_serialized_message(pub->sertopic, struct ddsi_serdata * d = serdata_rmw_from_serialized_message(pub->sertopic,
serialized_message->buffer, serialized_message->buffer,
serialized_message->buffer_length); serialized_message->buffer_length);
const bool ok = (dds_writecdr(pub->pubh, d) >= 0); const bool ok = (dds_writecdr(pub->enth, d) >= 0);
return ok ? RMW_RET_OK : RMW_RET_ERROR; return ok ? RMW_RET_OK : RMW_RET_ERROR;
} }
@ -1007,7 +1016,9 @@ static dds_qos_t * create_readwrite_qos(
const rmw_qos_profile_t * qos_policies, const rmw_qos_profile_t * qos_policies,
bool ignore_local_publications) bool ignore_local_publications)
{ {
dds_duration_t ldur;
dds_qos_t * qos = dds_create_qos(); dds_qos_t * qos = dds_create_qos();
dds_qset_writer_data_lifecycle (qos, false); /* disable autodispose */
switch (qos_policies->history) { switch (qos_policies->history) {
case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_HISTORY_UNKNOWN: case RMW_QOS_POLICY_HISTORY_UNKNOWN:
@ -1062,7 +1073,34 @@ static dds_qos_t * create_readwrite_qos(
default: default:
rmw_cyclonedds_cpp::unreachable(); rmw_cyclonedds_cpp::unreachable();
} }
/* deadline, lifespan, liveliness are not yet supported */ if (qos_policies->lifespan.sec > 0 || qos_policies->lifespan.nsec > 0) {
dds_qset_lifespan(qos, DDS_SECS(qos_policies->lifespan.sec) + qos_policies->lifespan.nsec);
}
if (qos_policies->deadline.sec > 0 || qos_policies->deadline.nsec > 0) {
dds_qset_deadline(qos, DDS_SECS(qos_policies->deadline.sec) + qos_policies->deadline.nsec);
}
if (qos_policies->liveliness_lease_duration.sec == 0 &&
qos_policies->liveliness_lease_duration.nsec == 0)
{
ldur = DDS_INFINITY;
} else {
ldur = DDS_SECS(qos_policies->liveliness_lease_duration.sec) +
qos_policies->liveliness_lease_duration.nsec;
}
switch (qos_policies->liveliness) {
case RMW_QOS_POLICY_LIVELINESS_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC:
case RMW_QOS_POLICY_LIVELINESS_UNKNOWN:
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, ldur);
break;
case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE:
dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur);
break;
case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC:
dds_qset_liveliness(qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur);
break;
}
if (ignore_local_publications) { if (ignore_local_publications) {
dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT); dds_qset_ignorelocal(qos, DDS_IGNORELOCAL_PARTICIPANT);
} }
@ -1145,26 +1183,18 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set");
goto error; goto error;
} }
if (deadline == DDS_INFINITY) {
qos_policies->deadline.sec = qos_policies->deadline.nsec = 0;
} else {
qos_policies->deadline.sec = (uint64_t) deadline / 1000000000; qos_policies->deadline.sec = (uint64_t) deadline / 1000000000;
qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000; qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000;
} }
}
{ {
dds_duration_t lifespan; dds_duration_t lifespan;
if (!dds_qget_lifespan(qos, &lifespan)) { if (!dds_qget_lifespan(qos, &lifespan)) {
lifespan = DDS_INFINITY; lifespan = DDS_INFINITY;
} }
if (lifespan == DDS_INFINITY) {
qos_policies->lifespan.sec = qos_policies->lifespan.nsec = 0;
} else {
qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000; qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000;
qos_policies->lifespan.nsec = (uint64_t) lifespan % 1000000000; qos_policies->lifespan.nsec = (uint64_t) lifespan % 1000000000;
} }
}
{ {
dds_liveliness_kind_t kind; dds_liveliness_kind_t kind;
@ -1186,14 +1216,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
default: default:
rmw_cyclonedds_cpp::unreachable(); rmw_cyclonedds_cpp::unreachable();
} }
if (lease_duration == DDS_INFINITY) {
qos_policies->liveliness_lease_duration.sec = qos_policies->liveliness_lease_duration.nsec =
0;
} else {
qos_policies->liveliness_lease_duration.sec = (uint64_t) lease_duration / 1000000000; qos_policies->liveliness_lease_duration.sec = (uint64_t) lease_duration / 1000000000;
qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000; qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000;
} }
}
dds_delete_qos(qos); dds_delete_qos(qos);
return true; return true;
@ -1226,7 +1251,7 @@ static CddsPublisher * create_cdds_publisher(
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
if ((topic = if ((topic =
dds_create_topic_arbitrary(node_impl->pp, sertopic, nullptr, nullptr, nullptr)) < 0) dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0)
{ {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
goto fail_topic; goto fail_topic;
@ -1234,11 +1259,11 @@ static CddsPublisher * create_cdds_publisher(
if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, false)) == nullptr) {
goto fail_qos; goto fail_qos;
} }
if ((pub->pubh = dds_create_writer(node_impl->pub, topic, qos, nullptr)) < 0) { if ((pub->enth = dds_create_writer(node_impl->pub, topic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
@ -1248,7 +1273,7 @@ static CddsPublisher * create_cdds_publisher(
return pub; return pub;
fail_instance_handle: fail_instance_handle:
if (dds_delete(pub->pubh) < 0) { if (dds_delete(pub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling");
} }
fail_writer: fail_writer:
@ -1309,7 +1334,7 @@ extern "C" rmw_publisher_t * rmw_create_publisher(
fail_topic_name: fail_topic_name:
rmw_publisher_free(rmw_publisher); rmw_publisher_free(rmw_publisher);
fail_publisher: fail_publisher:
if (dds_delete(pub->pubh) < 0) { if (dds_delete(pub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling");
} }
delete pub; delete pub;
@ -1350,7 +1375,7 @@ extern "C" rmw_ret_t rmw_publisher_count_matched_subscriptions(
RET_WRONG_IMPLID(publisher); RET_WRONG_IMPLID(publisher);
auto pub = static_cast<CddsPublisher *>(publisher->data); auto pub = static_cast<CddsPublisher *>(publisher->data);
dds_publication_matched_status_t status; dds_publication_matched_status_t status;
if (dds_get_publication_matched_status(pub->pubh, &status) < 0) { if (dds_get_publication_matched_status(pub->enth, &status) < 0) {
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
*subscription_count = status.current_count; *subscription_count = status.current_count;
@ -1369,7 +1394,7 @@ rmw_ret_t rmw_publisher_get_actual_qos(const rmw_publisher_t * publisher, rmw_qo
RET_NULL(qos); RET_NULL(qos);
RET_WRONG_IMPLID(publisher); RET_WRONG_IMPLID(publisher);
auto pub = static_cast<CddsPublisher *>(publisher->data); auto pub = static_cast<CddsPublisher *>(publisher->data);
if (get_readwrite_qos(pub->pubh, qos)) { if (get_readwrite_qos(pub->enth, qos)) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
return RMW_RET_ERROR; return RMW_RET_ERROR;
@ -1407,7 +1432,7 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t *
RET_WRONG_IMPLID(publisher); RET_WRONG_IMPLID(publisher);
auto pub = static_cast<CddsPublisher *>(publisher->data); auto pub = static_cast<CddsPublisher *>(publisher->data);
if (pub != nullptr) { if (pub != nullptr) {
if (dds_delete(pub->pubh) < 0) { if (dds_delete(pub->enth) < 0) {
RMW_SET_ERROR_MSG("failed to delete writer"); RMW_SET_ERROR_MSG("failed to delete writer");
} }
ddsi_sertopic_unref(pub->sertopic); ddsi_sertopic_unref(pub->sertopic);
@ -1448,7 +1473,7 @@ static CddsSubscription * create_cdds_subscription(
create_message_type_support(type_support->data, type_support->typesupport_identifier), false, create_message_type_support(type_support->data, type_support->typesupport_identifier), false,
rmw_cyclonedds_cpp::make_message_value_type(type_supports)); rmw_cyclonedds_cpp::make_message_value_type(type_supports));
if ((topic = if ((topic =
dds_create_topic_arbitrary(node_impl->pp, sertopic, nullptr, nullptr, nullptr)) < 0) dds_create_topic_arbitrary(node_impl->enth, sertopic, nullptr, nullptr, nullptr)) < 0)
{ {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
goto fail_topic; goto fail_topic;
@ -1456,11 +1481,11 @@ static CddsSubscription * create_cdds_subscription(
if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies, ignore_local_publications)) == nullptr) {
goto fail_qos; goto fail_qos;
} }
if ((sub->subh = dds_create_reader(node_impl->sub, topic, qos, nullptr)) < 0) { if ((sub->enth = dds_create_reader(node_impl->sub, topic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond; goto fail_readcond;
} }
@ -1469,7 +1494,7 @@ static CddsSubscription * create_cdds_subscription(
dds_delete(topic); dds_delete(topic);
return sub; return sub;
fail_readcond: fail_readcond:
if (dds_delete(sub->subh) < 0) { if (dds_delete(sub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling");
} }
fail_reader: fail_reader:
@ -1546,7 +1571,7 @@ fail_subscription:
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"failed to delete readcondition during error handling"); "failed to delete readcondition during error handling");
} }
if (dds_delete(sub->subh) < 0) { if (dds_delete(sub->enth) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling");
} }
delete sub; delete sub;
@ -1560,7 +1585,7 @@ extern "C" rmw_ret_t rmw_subscription_count_matched_publishers(
RET_WRONG_IMPLID(subscription); RET_WRONG_IMPLID(subscription);
auto sub = static_cast<CddsSubscription *>(subscription->data); auto sub = static_cast<CddsSubscription *>(subscription->data);
dds_subscription_matched_status_t status; dds_subscription_matched_status_t status;
if (dds_get_subscription_matched_status(sub->subh, &status) < 0) { if (dds_get_subscription_matched_status(sub->enth, &status) < 0) {
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
*publisher_count = status.current_count; *publisher_count = status.current_count;
@ -1575,7 +1600,7 @@ extern "C" rmw_ret_t rmw_subscription_get_actual_qos(
RET_NULL(qos); RET_NULL(qos);
RET_WRONG_IMPLID(subscription); RET_WRONG_IMPLID(subscription);
auto sub = static_cast<CddsSubscription *>(subscription->data); auto sub = static_cast<CddsSubscription *>(subscription->data);
if (get_readwrite_qos(sub->subh, qos)) { if (get_readwrite_qos(sub->enth, qos)) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
return RMW_RET_ERROR; return RMW_RET_ERROR;
@ -1592,7 +1617,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t * node, rmw_subscriptio
if (dds_delete(sub->rdcondh) < 0) { if (dds_delete(sub->rdcondh) < 0) {
RMW_SET_ERROR_MSG("failed to delete readcondition"); RMW_SET_ERROR_MSG("failed to delete readcondition");
} }
if (dds_delete(sub->subh) < 0) { if (dds_delete(sub->enth) < 0) {
RMW_SET_ERROR_MSG("failed to delete reader"); RMW_SET_ERROR_MSG("failed to delete reader");
} }
ddsi_sertopic_unref(sub->sertopic); ddsi_sertopic_unref(sub->sertopic);
@ -1614,7 +1639,7 @@ static rmw_ret_t rmw_take_int(
CddsSubscription * sub = static_cast<CddsSubscription *>(subscription->data); CddsSubscription * sub = static_cast<CddsSubscription *>(subscription->data);
RET_NULL(sub); RET_NULL(sub);
dds_sample_info_t info; dds_sample_info_t info;
while (dds_take(sub->subh, &ros_message, &info, 1, 1) == 1) { while (dds_take(sub->enth, &ros_message, &info, 1, 1) == 1) {
if (info.valid_data) { if (info.valid_data) {
if (message_info) { if (message_info) {
message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier;
@ -1651,7 +1676,7 @@ static rmw_ret_t rmw_take_ser_int(
RET_NULL(sub); RET_NULL(sub);
dds_sample_info_t info; dds_sample_info_t info;
struct ddsi_serdata * dcmn; struct ddsi_serdata * dcmn;
while (dds_takecdr(sub->subh, &dcmn, 1, &info, DDS_ANY_STATE) == 1) { while (dds_takecdr(sub->enth, &dcmn, 1, &info, DDS_ANY_STATE) == 1) {
if (info.valid_data) { if (info.valid_data) {
if (message_info) { if (message_info) {
message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier; message_info->publisher_gid.implementation_identifier = eclipse_cyclonedds_identifier;
@ -1771,7 +1796,7 @@ extern "C" rmw_ret_t rmw_take_event(
auto ei = static_cast<rmw_liveliness_changed_status_t *>(event_info); auto ei = static_cast<rmw_liveliness_changed_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data); auto sub = static_cast<CddsSubscription *>(event_handle->data);
dds_liveliness_changed_status_t st; dds_liveliness_changed_status_t st;
if (dds_get_liveliness_changed_status(sub->subh, &st) < 0) { if (dds_get_liveliness_changed_status(sub->enth, &st) < 0) {
*taken = false; *taken = false;
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
@ -1788,7 +1813,7 @@ extern "C" rmw_ret_t rmw_take_event(
auto ei = static_cast<rmw_requested_deadline_missed_status_t *>(event_info); auto ei = static_cast<rmw_requested_deadline_missed_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data); auto sub = static_cast<CddsSubscription *>(event_handle->data);
dds_requested_deadline_missed_status_t st; dds_requested_deadline_missed_status_t st;
if (dds_get_requested_deadline_missed_status(sub->subh, &st) < 0) { if (dds_get_requested_deadline_missed_status(sub->enth, &st) < 0) {
*taken = false; *taken = false;
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
@ -1803,7 +1828,7 @@ extern "C" rmw_ret_t rmw_take_event(
auto ei = static_cast<rmw_liveliness_lost_status_t *>(event_info); auto ei = static_cast<rmw_liveliness_lost_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data); auto pub = static_cast<CddsPublisher *>(event_handle->data);
dds_liveliness_lost_status_t st; dds_liveliness_lost_status_t st;
if (dds_get_liveliness_lost_status(pub->pubh, &st) < 0) { if (dds_get_liveliness_lost_status(pub->enth, &st) < 0) {
*taken = false; *taken = false;
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
@ -1818,7 +1843,7 @@ extern "C" rmw_ret_t rmw_take_event(
auto ei = static_cast<rmw_offered_deadline_missed_status_t *>(event_info); auto ei = static_cast<rmw_offered_deadline_missed_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data); auto pub = static_cast<CddsPublisher *>(event_handle->data);
dds_offered_deadline_missed_status_t st; dds_offered_deadline_missed_status_t st;
if (dds_get_offered_deadline_missed_status(pub->pubh, &st) < 0) { if (dds_get_offered_deadline_missed_status(pub->enth, &st) < 0) {
*taken = false; *taken = false;
return RMW_RET_ERROR; return RMW_RET_ERROR;
} else { } else {
@ -2011,6 +2036,27 @@ static bool require_reattach(const std::vector<T *> & cached, size_t count, void
} }
} }
static bool require_reattach(
const std::vector<CddsEvent> & cached, rmw_events_t * events)
{
if (events == nullptr || events->event_count == 0) {
return cached.size() != 0;
} else if (events->event_count != cached.size()) {
return true;
} else {
for (size_t i = 0; i < events->event_count; ++i) {
rmw_event_t * current_event = static_cast<rmw_event_t *>(events->events[i]);
CddsEvent c = cached.at(i);
if (c.enth != static_cast<CddsEntity *>(current_event->data)->enth ||
c.event_type != current_event->event_type)
{
return true;
}
}
return false;
}
}
static void waitset_detach(CddsWaitset * ws) static void waitset_detach(CddsWaitset * ws)
{ {
for (auto && x : ws->subs) { for (auto && x : ws->subs) {
@ -2047,12 +2093,84 @@ static void clean_waitset_caches()
} }
} }
/// mapping of RMW_EVENT to the corresponding DDS status
static const std::unordered_map<rmw_event_type_t, uint32_t> mask_map{
{RMW_EVENT_LIVELINESS_CHANGED, DDS_LIVELINESS_CHANGED_STATUS},
{RMW_EVENT_REQUESTED_DEADLINE_MISSED, DDS_REQUESTED_DEADLINE_MISSED_STATUS},
{RMW_EVENT_LIVELINESS_LOST, DDS_LIVELINESS_LOST_STATUS},
{RMW_EVENT_OFFERED_DEADLINE_MISSED, DDS_OFFERED_DEADLINE_MISSED_STATUS},
};
static uint32_t get_status_kind_from_rmw(const rmw_event_type_t event_t)
{
return mask_map.at(event_t);
}
static bool is_event_supported(const rmw_event_type_t event_t)
{
return mask_map.count(event_t) > 0;
}
static rmw_ret_t gather_event_entities(
const rmw_events_t * events,
std::unordered_set<dds_entity_t> & entities)
{
RMW_CHECK_ARGUMENT_FOR_NULL(events, RMW_RET_INVALID_ARGUMENT);
std::unordered_map<dds_entity_t, uint32_t> status_mask_map;
for (size_t i = 0; i < events->event_count; ++i) {
rmw_event_t * current_event = static_cast<rmw_event_t *>(events->events[i]);
dds_entity_t dds_entity = static_cast<CddsEntity *>(current_event->data)->enth;
if (dds_entity <= 0) {
RMW_SET_ERROR_MSG("Event entity handle is invalid");
return RMW_RET_ERROR;
}
if (is_event_supported(current_event->event_type)) {
if (status_mask_map.find(dds_entity) == status_mask_map.end()) {
status_mask_map[dds_entity] = 0;
}
status_mask_map[dds_entity] |= get_status_kind_from_rmw(current_event->event_type);
}
}
for (auto & pair : status_mask_map) {
// set the status condition's mask with the supported type
dds_set_status_mask(pair.first, pair.second);
entities.insert(pair.first);
}
return RMW_RET_OK;
}
static rmw_ret_t handle_active_events(rmw_events_t * events)
{
if (events) {
for (size_t i = 0; i < events->event_count; ++i) {
rmw_event_t * current_event = static_cast<rmw_event_t *>(events->events[i]);
dds_entity_t dds_entity = static_cast<CddsEntity *>(current_event->data)->enth;
if (dds_entity <= 0) {
RMW_SET_ERROR_MSG("Event entity handle is invalid");
return RMW_RET_ERROR;
}
uint32_t status_mask;
dds_get_status_changes(dds_entity, &status_mask);
if (!is_event_supported(current_event->event_type) ||
!static_cast<bool>(status_mask & get_status_kind_from_rmw(current_event->event_type)))
{
events->events[i] = nullptr;
}
}
}
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_wait( extern "C" rmw_ret_t rmw_wait(
rmw_subscriptions_t * subs, rmw_guard_conditions_t * gcs, rmw_subscriptions_t * subs, rmw_guard_conditions_t * gcs,
rmw_services_t * srvs, rmw_clients_t * cls, rmw_events_t * evs, rmw_services_t * srvs, rmw_clients_t * cls, rmw_events_t * evs,
rmw_wait_set_t * wait_set, const rmw_time_t * wait_timeout) rmw_wait_set_t * wait_set, const rmw_time_t * wait_timeout)
{ {
static_cast<void>(evs);
RET_NULL(wait_set); RET_NULL(wait_set);
CddsWaitset * ws = static_cast<CddsWaitset *>(wait_set->data); CddsWaitset * ws = static_cast<CddsWaitset *>(wait_set->data);
RET_NULL(ws); RET_NULL(ws);
@ -2071,7 +2189,8 @@ extern "C" rmw_ret_t rmw_wait(
require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0,
gcs ? gcs->guard_conditions : nullptr) || gcs ? gcs->guard_conditions : nullptr) ||
require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) ||
require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr) ||
require_reattach(ws->evs, evs))
{ {
size_t nelems = 0; size_t nelems = 0;
waitset_detach(ws); waitset_detach(ws);
@ -2093,6 +2212,28 @@ extern "C" rmw_ret_t rmw_wait(
ATTACH(CddsService, srvs, service, service.sub->rdcondh); ATTACH(CddsService, srvs, service, service.sub->rdcondh);
ATTACH(CddsClient, cls, client, client.sub->rdcondh); ATTACH(CddsClient, cls, client, client.sub->rdcondh);
#undef ATTACH #undef ATTACH
ws->evs.resize(0);
if (evs) {
std::unordered_set<dds_entity_t> event_entities;
rmw_ret_t ret_code = gather_event_entities(evs, event_entities);
if (ret_code != RMW_RET_OK) {
return ret_code;
}
for (auto e : event_entities) {
dds_waitset_attach(ws->waitseth, e, nelems);
nelems++;
}
ws->evs.reserve(evs->event_count);
for (size_t i = 0; i < evs->event_count; i++) {
auto current_event = static_cast<rmw_event_t *>(evs->events[i]);
CddsEvent ev;
ev.enth = static_cast<CddsEntity *>(current_event->data)->enth;
ev.event_type = current_event->event_type;
ws->evs.push_back(ev);
}
}
ws->nelems = nelems; ws->nelems = nelems;
} }
@ -2116,7 +2257,6 @@ extern "C" rmw_ret_t rmw_wait(
if (var) { \ if (var) { \
for (size_t i = 0; i < var->name ## _count; i++) { \ for (size_t i = 0; i < var->name ## _count; i++) { \
auto x = static_cast<type *>(var->name ## s[i]); \ auto x = static_cast<type *>(var->name ## s[i]); \
/*dds_waitset_detach (ws->waitseth, x->cond);*/ \
if (ws->trigs[trig_idx] == static_cast<dds_attach_t>(nelems)) { \ if (ws->trigs[trig_idx] == static_cast<dds_attach_t>(nelems)) { \
on_triggered; \ on_triggered; \
trig_idx++; \ trig_idx++; \
@ -2133,6 +2273,7 @@ extern "C" rmw_ret_t rmw_wait(
DETACH(CddsService, srvs, service, service.sub->rdcondh, (void) x); DETACH(CddsService, srvs, service, service.sub->rdcondh, (void) x);
DETACH(CddsClient, cls, client, client.sub->rdcondh, (void) x); DETACH(CddsClient, cls, client, client.sub->rdcondh, (void) x);
#undef DETACH #undef DETACH
handle_active_events(evs);
} }
#if REPORT_BLOCKED_REQUESTS #if REPORT_BLOCKED_REQUESTS
@ -2167,7 +2308,7 @@ static rmw_ret_t rmw_take_response_request(
dds_sample_info_t info; dds_sample_info_t info;
wrap.data = ros_data; wrap.data = ros_data;
void * wrap_ptr = static_cast<void *>(&wrap); void * wrap_ptr = static_cast<void *>(&wrap);
while (dds_take(cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) { while (dds_take(cs->sub->enth, &wrap_ptr, &info, 1, 1) == 1) {
if (info.valid_data) { if (info.valid_data) {
memset(request_header, 0, sizeof(wrap.header)); memset(request_header, 0, sizeof(wrap.header));
assert(sizeof(wrap.header.guid) <= sizeof(request_header->writer_guid)); assert(sizeof(wrap.header.guid) <= sizeof(request_header->writer_guid));
@ -2248,7 +2389,7 @@ static rmw_ret_t rmw_send_response_request(
const void * ros_data) const void * ros_data)
{ {
const cdds_request_wrapper_t wrap = {header, const_cast<void *>(ros_data)}; const cdds_request_wrapper_t wrap = {header, const_cast<void *>(ros_data)};
if (dds_write(cs->pub->pubh, static_cast<const void *>(&wrap)) >= 0) { if (dds_write(cs->pub->enth, static_cast<const void *>(&wrap)) >= 0) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
RMW_SET_ERROR_MSG("cannot publish data"); RMW_SET_ERROR_MSG("cannot publish data");
@ -2377,13 +2518,13 @@ static rmw_ret_t rmw_init_cs(
dds_qos_t * qos; dds_qos_t * qos;
if ((pubtopic = if ((pubtopic =
dds_create_topic_arbitrary(node_impl->pp, pub_st, nullptr, nullptr, nullptr)) < 0) dds_create_topic_arbitrary(node_impl->enth, pub_st, nullptr, nullptr, nullptr)) < 0)
{ {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
goto fail_pubtopic; goto fail_pubtopic;
} }
if ((subtopic = if ((subtopic =
dds_create_topic_arbitrary(node_impl->pp, sub_st, nullptr, nullptr, nullptr)) < 0) dds_create_topic_arbitrary(node_impl->enth, sub_st, nullptr, nullptr, nullptr)) < 0)
{ {
RMW_SET_ERROR_MSG("failed to create topic"); RMW_SET_ERROR_MSG("failed to create topic");
goto fail_subtopic; goto fail_subtopic;
@ -2393,21 +2534,21 @@ static rmw_ret_t rmw_init_cs(
} }
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
if ((pub->pubh = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) { if ((pub->enth = dds_create_writer(node_impl->pub, pubtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
pub->sertopic = pub_st; pub->sertopic = pub_st;
if ((sub->subh = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { if ((sub->enth = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
sub->sertopic = sub_st; sub->sertopic = sub_st;
if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { if ((sub->rdcondh = dds_create_readcondition(sub->enth, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond; goto fail_readcond;
} }
if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { if (dds_get_instance_handle(pub->enth, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
@ -2422,9 +2563,9 @@ static rmw_ret_t rmw_init_cs(
fail_instance_handle: fail_instance_handle:
dds_delete(sub->rdcondh); dds_delete(sub->rdcondh);
fail_readcond: fail_readcond:
dds_delete(sub->subh); dds_delete(sub->enth);
fail_reader: fail_reader:
dds_delete(pub->pubh); dds_delete(pub->enth);
fail_writer: fail_writer:
dds_delete_qos(qos); dds_delete_qos(qos);
fail_qos: fail_qos:
@ -2440,8 +2581,8 @@ static void rmw_fini_cs(CddsCS * cs)
ddsi_sertopic_unref(cs->sub->sertopic); ddsi_sertopic_unref(cs->sub->sertopic);
ddsi_sertopic_unref(cs->pub->sertopic); ddsi_sertopic_unref(cs->pub->sertopic);
dds_delete(cs->sub->rdcondh); dds_delete(cs->sub->rdcondh);
dds_delete(cs->sub->subh); dds_delete(cs->sub->enth);
dds_delete(cs->pub->pubh); dds_delete(cs->pub->enth);
} }
extern "C" rmw_client_t * rmw_create_client( extern "C" rmw_client_t * rmw_create_client(
@ -2539,7 +2680,9 @@ static rmw_ret_t do_for_node(
std::function<bool(const dds_builtintopic_participant_t & sample)> oper) std::function<bool(const dds_builtintopic_participant_t & sample)> oper)
{ {
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(node_impl->pp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT,
NULL, NULL)) < 0)
{
RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader"); RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
@ -2663,7 +2806,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind(
builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION ||
builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION);
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) {
RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
@ -2956,8 +3099,8 @@ extern "C" rmw_ret_t rmw_service_server_is_available(
auto info = static_cast<CddsClient *>(client->data); auto info = static_cast<CddsClient *>(client->data);
dds_publication_matched_status_t ps; dds_publication_matched_status_t ps;
dds_subscription_matched_status_t cs; dds_subscription_matched_status_t cs;
if (dds_get_publication_matched_status(info->client.pub->pubh, &ps) < 0 || if (dds_get_publication_matched_status(info->client.pub->enth, &ps) < 0 ||
dds_get_subscription_matched_status(info->client.sub->subh, &cs) < 0) dds_get_subscription_matched_status(info->client.sub->enth, &cs) < 0)
{ {
RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed"); RMW_SET_ERROR_MSG("rmw_service_server_is_available: get_..._matched_status failed");
return RMW_RET_ERROR; return RMW_RET_ERROR;
@ -2980,7 +3123,7 @@ static rmw_ret_t rmw_count_pubs_or_subs(
std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false); std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", false);
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) {
RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader"); RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }