diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index ed5a639..1e3cde5 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -20,8 +20,7 @@ - should serialize straight into serdata_t, instead of into a FastBuffer that then gets copied - - topic creation: until cdds allows multiple calls to create_topic for the same topic, use - create-or-else-find and leak + - topic creation: shouldn't leak topics - hash set of writer handles should be thread safe: no guarantee that no writers get added/deleted in parallel to each other or to takes @@ -110,28 +109,14 @@ extern "C" { const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp"; -struct condition { - std::mutex internalMutex_; - std::atomic_uint triggerValue_; - std::mutex *conditionMutex_; - std::condition_variable *conditionVariable_; - condition() : triggerValue_(0), conditionMutex_(nullptr), conditionVariable_(nullptr) { } -}; - -static void condition_set_trigger(struct condition *cond, unsigned value); -static void condition_add_trigger(struct condition *cond, int delta); - struct CddsTypeSupport { void *type_support_; const char *typesupport_identifier_; }; -/* this had better be compatible with the "guid" field in the rmw_request_id_t and the data field in rmw_gid_t */ -typedef std::array cdds_guid_t; -typedef std::array cdds_gid_t; - -/* instance handles are carefully constructed to be as close to uniformly distributed as possible - for no other reason than making them near-perfect hash keys */ +/* instance handles are unsigned 64-bit integers carefully constructed to be as close to uniformly + distributed as possible for no other reason than making them near-perfect hash keys, hence we can + improve over the default hash function */ struct dds_instance_handle_hash { public: std::size_t operator()(dds_instance_handle_t const& x) const noexcept { @@ -140,7 +125,6 @@ public: }; struct CddsNode { - dds_entity_t ppant; rmw_guard_condition_t *graph_guard_condition; std::unordered_set own_writers; }; @@ -155,10 +139,10 @@ struct CddsSubscription { typedef rmw_subscription_t rmw_type; typedef rmw_subscriptions_t rmw_set_type; dds_entity_t subh; + dds_entity_t rdcondh; CddsNode *node; bool ignore_local_publications; CddsTypeSupport ts; - struct condition cond; }; struct CddsCS { @@ -179,57 +163,22 @@ struct CddsService { }; struct CddsGuardCondition { - struct condition cond; + dds_entity_t gcondh; }; -/* iterators for sets of subscriptions, clients, services and guard conditions */ -#define DEFITER1(const_, cddstype_, rmwtype_, name_) \ - static const_ Cdds##cddstype_ * const_ *begin(const_ rmw_##rmwtype_##_t& s) { \ - return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[0]); \ - } \ - static const_ Cdds##cddstype_ * const_ *end(const_ rmw_##rmwtype_##_t& s) { \ - return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[s.name_##_count]); \ - } -#define DEFITER(cddstype_, rmwtype_, name_) \ - DEFITER1(, cddstype_, rmwtype_, name_) \ - DEFITER1(const, cddstype_, rmwtype_, name_) -DEFITER(Subscription, subscriptions, subscriber) -DEFITER(Client, clients, client) -DEFITER(Service, services, service) -DEFITER(GuardCondition, guard_conditions, guard_condition) -#undef DEFITER -#undef DEFITER1 - -struct condition *get_condition(CddsSubscription *x) { return &x->cond; } -struct condition *get_condition(CddsClient *x) { return &x->client.sub->cond; } -struct condition *get_condition(CddsService *x) { return &x->service.sub->cond; } -struct condition *get_condition(CddsGuardCondition *x) { return &x->cond; } -template const condition *get_condition(const T *x) { return get_condition(const_cast(x)); } - -template void condition_set_trigger(T *x, unsigned value) { - condition_set_trigger(get_condition(x), value); -} -template void condition_add_trigger(T *x, int delta) { - condition_add_trigger(get_condition(x), delta); -} -template bool condition_read(T *x) { - return condition_read(get_condition(x)); -} -template bool condition_take(T *x) { - return condition_take(get_condition(x)); -} -template void condition_attach(T *x, std::mutex *conditionMutex, std::condition_variable *conditionVariable) { - condition_attach(get_condition(x), conditionMutex, conditionVariable); -} -template void condition_detach(T *x) { - condition_detach(get_condition(x)); -} +struct Cdds { + std::mutex lock; + uint32_t refcount; + dds_entity_t ppant; + Cdds() : refcount(0), ppant(0) {} +}; typedef struct cdds_request_header { uint64_t guid; int64_t seq; } cdds_request_header_t; +static Cdds gcdds; using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; @@ -339,6 +288,28 @@ extern "C" rmw_ret_t rmw_init() return RMW_RET_OK; } +static dds_entity_t ref_ppant() +{ + std::lock_guard lock(gcdds.lock); + if (gcdds.refcount == 0) { + if ((gcdds.ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG("failed to create participant"); + return gcdds.ppant; + } + } + gcdds.refcount++; + return gcdds.ppant; +} + +static void unref_ppant() +{ + std::lock_guard lock(gcdds.lock); + if (--gcdds.refcount == 0) { + dds_delete(gcdds.ppant); + gcdds.ppant = 0; + } +} + ///////////////////////////////////////////////////////////////////////////////////////// /////////// /////////// /////////// NODES /////////// @@ -352,6 +323,10 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_, (void)domain_id; (void)security_options; + dds_entity_t pp = ref_ppant(); + if (pp < 0) { + return nullptr; + } auto *node_impl = new CddsNode(); rmw_node_t *node_handle = nullptr; RET_ALLOC_X(node_impl, goto fail_node_impl); @@ -361,11 +336,6 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_, } node_impl->graph_guard_condition = graph_guard_condition; - if ((node_impl->ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0) { - RMW_SET_ERROR_MSG("failed to create participant"); - goto fail_ppant; - } - node_handle = rmw_node_allocate(); RET_ALLOC_X(node_handle, goto fail_node_handle); node_handle->implementation_identifier = adlink_cyclonedds_identifier; @@ -389,16 +359,13 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_, fail_node_handle_name: rmw_node_free(node_handle); fail_node_handle: - if (dds_delete(node_impl->ppant) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete participant during error handling"); - } - fail_ppant: if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); } fail_ggc: delete node_impl; fail_node_impl: + unref_ppant(); return nullptr; } @@ -408,10 +375,6 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node) RET_WRONG_IMPLID(node); auto node_impl = static_cast(node->data); RET_NULL(node_impl); - if (dds_delete(node_impl->ppant) < 0) { - RMW_SET_ERROR_MSG("failed to delete participant"); - result_ret = RMW_RET_ERROR; - } rmw_free(const_cast(node->name)); rmw_free(const_cast(node->namespace_)); rmw_node_free(node); @@ -420,6 +383,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node) result_ret = RMW_RET_ERROR; } delete node_impl; + unref_ppant(); return result_ret; } @@ -551,17 +515,14 @@ static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); - /*FIXME: fix topic creation issues in CycloneDDS */ - if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { - if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_topic; - } + if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { goto fail_qos; } - if ((pub->pubh = dds_create_writer(node_impl->ppant, topic, qos, NULL)) < 0) { + if ((pub->pubh = dds_create_writer(gcdds.ppant, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } @@ -665,13 +626,6 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *pu /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static void subhandler(dds_entity_t rd, void *vsub) -{ - CddsSubscription *sub = static_cast(vsub); - (void)rd; - condition_add_trigger(&sub->cond, 1); -} - static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { RET_WRONG_IMPLID_X(node, return nullptr); @@ -681,11 +635,9 @@ static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const RET_NULL_X(node_impl, return nullptr); const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); RET_NULL_X(type_support, return nullptr); - (void)ignore_local_publications; CddsSubscription *sub = new CddsSubscription(); dds_entity_t topic; dds_qos_t *qos; - dds_listener_t *listeners; sub->node = node_impl; sub->ignore_local_publications = ignore_local_publications; @@ -693,31 +645,28 @@ static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); - /*FIXME: fix topic creation issues in CycloneDDS */ - if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { - if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_topic; - } + if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_topic; } if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { goto fail_qos; } - if ((listeners = dds_listener_create(static_cast(sub))) == nullptr) { - goto fail_listener; - } - dds_lset_data_available(listeners, subhandler); - if ((sub->subh = dds_create_reader(node_impl->ppant, topic, qos, listeners)) < 0) { + if ((sub->subh = dds_create_reader(gcdds.ppant, topic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } + if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG("failed to create readcondition"); + goto fail_readcond; + } dds_qos_delete(qos); - dds_listener_delete(listeners); return sub; - + fail_readcond: + if (dds_delete(sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); + } fail_reader: - dds_listener_delete(listeners); - fail_listener: dds_qos_delete(qos); fail_qos: /* FIXME: leak topic */ @@ -744,8 +693,11 @@ extern "C" rmw_subscription_t *rmw_create_subscription(const rmw_node_t *node, c fail_topic_name: rmw_subscription_free(rmw_subscription); fail_subscription: + if (dds_delete(sub->rdcondh) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); + } if (dds_delete(sub->subh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } delete sub; fail_common_init: @@ -758,6 +710,9 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription RET_WRONG_IMPLID(subscription); auto sub = static_cast(subscription->data); if (sub != nullptr) { + if (dds_delete(sub->rdcondh) < 0) { + RMW_SET_ERROR_MSG("failed to delete readcondition"); + } if (dds_delete(sub->subh) < 0) { RMW_SET_ERROR_MSG("failed to delete reader"); } @@ -779,7 +734,6 @@ static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_ struct serdata *sd; dds_sample_info_t info; while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { - condition_add_trigger(&sub->cond, -1); if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) { size_t sz; void *raw; @@ -821,109 +775,55 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, ///////////////////////////////////////////////////////////////////////////////////////// struct CddsWaitset { - std::condition_variable condition; - std::mutex condition_mutex; + dds_entity_t waitseth; }; -static void condition_set_trigger(struct condition *cond, unsigned value) -{ - std::lock_guard lock(cond->internalMutex_); - if (cond->conditionMutex_ != nullptr) { - std::unique_lock clock(*cond->conditionMutex_); - // the change to triggerValue_ needs to be mutually exclusive with - // rmw_wait() which checks hasTriggered() and decides if wait() needs to - // be called - const bool notify = (value > 0 && cond->triggerValue_ == 0); - cond->triggerValue_ = value; - clock.unlock(); - if (notify) { - cond->conditionVariable_->notify_one(); - } - } else { - cond->triggerValue_ = value; - } -} - -static void condition_add_trigger(struct condition *cond, int delta) -{ - std::lock_guard lock(cond->internalMutex_); - if (cond->conditionMutex_ != nullptr) { - std::unique_lock clock(*cond->conditionMutex_); - // the change to triggerValue_ needs to be mutually exclusive with - // rmw_wait() which checks hasTriggered() and decides if wait() needs to - // be called - const bool notify = (delta > 0 && cond->triggerValue_ == 0); - assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta); - cond->triggerValue_ += delta; - clock.unlock(); - if (notify) { - cond->conditionVariable_->notify_one(); - } - } else { - assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta); - cond->triggerValue_ += delta; - } -} - -static void condition_attach(struct condition *cond, std::mutex *conditionMutex, std::condition_variable *conditionVariable) -{ - std::lock_guard lock(cond->internalMutex_); - cond->conditionMutex_ = conditionMutex; - cond->conditionVariable_ = conditionVariable; -} - -static void condition_detach(struct condition *cond) -{ - std::lock_guard lock(cond->internalMutex_); - cond->conditionMutex_ = nullptr; - cond->conditionVariable_ = nullptr; -} - -static bool condition_read(const struct condition *cond) -{ - return cond->triggerValue_ > 0; -} - -static bool condition_read(struct condition *cond) -{ - return condition_read(const_cast(cond)); -} - -static bool condition_take(struct condition *cond) -{ - std::lock_guard lock(cond->internalMutex_); - bool ret = cond->triggerValue_ > 0; - cond->triggerValue_ = 0; - return ret; -} - extern "C" rmw_guard_condition_t *rmw_create_guard_condition() { - rmw_guard_condition_t *guard_condition_handle = new rmw_guard_condition_t; + rmw_guard_condition_t *guard_condition_handle; + auto *gcond_impl = new CddsGuardCondition(); + if (ref_ppant() < 0) { + goto fail_ppant; + } + if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG("failed to create guardcondition"); + goto fail_guardcond; + } + guard_condition_handle = new rmw_guard_condition_t; guard_condition_handle->implementation_identifier = adlink_cyclonedds_identifier; - guard_condition_handle->data = new CddsGuardCondition(); + guard_condition_handle->data = gcond_impl; return guard_condition_handle; + + fail_guardcond: + unref_ppant(); + fail_ppant: + delete(gcond_impl); + return nullptr; } -extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition) +extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition_handle) { - RET_NULL(guard_condition); - delete static_cast(guard_condition->data); - delete guard_condition; + RET_NULL(guard_condition_handle); + auto *gcond_impl = static_cast(guard_condition_handle->data); + dds_delete(gcond_impl->gcondh); + unref_ppant(); + delete gcond_impl; + delete guard_condition_handle; return RMW_RET_OK; } extern "C" rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle) { RET_WRONG_IMPLID(guard_condition_handle); - condition_set_trigger(static_cast(guard_condition_handle->data), 1); + auto *gcond_impl = static_cast(guard_condition_handle->data); + dds_set_guardcondition(gcond_impl->gcondh, true); return RMW_RET_OK; } extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) { (void)max_conditions; - rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); + rmw_wait_set_t *wait_set = rmw_wait_set_allocate(); CddsWaitset *ws = nullptr; RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); wait_set->implementation_identifier = adlink_cyclonedds_identifier; @@ -931,16 +831,21 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data); // This should default-construct the fields of CddsWaitset ws = static_cast(wait_set->data); - RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ) - if (!ws) { - RMW_SET_ERROR_MSG("failed to construct wait set info struct"); - goto fail_ws; - } + RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ); + if (!ws) { + RMW_SET_ERROR_MSG("failed to construct wait set info struct"); + goto fail_ws; + } + if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG("failed to create waitset"); + goto fail_waitset; + } return wait_set; - + + fail_waitset: fail_ws: - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws) - fail_placement_new: + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); + fail_placement_new: rmw_free(wait_set->data); fail_alloc_wait_set_data: rmw_wait_set_free(wait_set); @@ -948,82 +853,80 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) return nullptr; } -extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set) +extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set) { RET_WRONG_IMPLID(wait_set); auto result = RMW_RET_OK; auto ws = static_cast(wait_set->data); RET_NULL(ws); - std::mutex *conditionMutex = &ws->condition_mutex; - RET_NULL(conditionMutex); - RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR) - rmw_free(wait_set->data); + dds_delete(ws->waitseth); + RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); + rmw_free(wait_set->data); rmw_wait_set_free(wait_set); return result; } -static bool check_wait_set_for_data(const rmw_subscriptions_t *subs, const rmw_guard_conditions_t *gcs, const rmw_services_t *srvs, const rmw_clients_t *cls) -{ - if (subs) { for (auto&& x : *subs) { if (condition_read(x)) return true; } } - if (cls) { for (auto&& x : *cls) { if (condition_read(x)) return true; } } - if (srvs) { for (auto&& x : *srvs) { if (condition_read(x)) return true; } } - if (gcs) { for (auto&& x : *gcs) { if (condition_read(x)) return true; } } - return false; -} - extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) { RET_NULL(wait_set); CddsWaitset *ws = static_cast(wait_set->data); RET_NULL(ws); - std::mutex *conditionMutex = &ws->condition_mutex; - std::condition_variable *conditionVariable = &ws->condition; - - if (subs) { for (auto&& x : *subs) condition_attach(x, conditionMutex, conditionVariable); } - if (cls) { for (auto&& x : *cls) condition_attach(x, conditionMutex, conditionVariable); } - if (srvs) { for (auto&& x : *srvs) condition_attach(x, conditionMutex, conditionVariable); } - if (gcs) { for (auto&& x : *gcs) condition_attach(x, conditionMutex, conditionVariable); } + size_t nelems; + dds_return_t ntrig; - // This mutex prevents any of the listeners to change the internal state and notify the - // condition between the call to hasData() / hasTriggered() and wait() otherwise the - // decision to wait might be incorrect - std::unique_lock lock(*conditionMutex); + nelems = 0; +#define ATTACH(type, var, name, condname) do { \ + if (var) { \ + for (size_t i = 0; i < var->name##_count; i++) { \ + auto x = static_cast(var->name##s[i]); \ + dds_waitset_attach(ws->waitseth, x->condname, nelems); \ + nelems++; \ + } \ + } \ + } while (0) + ATTACH(CddsSubscription, subs, subscriber, rdcondh); + ATTACH(CddsClient, cls, client, client.sub->rdcondh); + ATTACH(CddsService, srvs, service, service.sub->rdcondh); + ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh); +#undef ATTACH - // First check variables. - // If wait_timeout is null, wait indefinitely (so we have to wait) - // If wait_timeout is not null and either of its fields are nonzero, we have to wait - bool timeout; - if (check_wait_set_for_data(subs, gcs, srvs, cls)) { - timeout = false; - } else if (wait_timeout && wait_timeout->sec == 0 && wait_timeout->nsec == 0) { - /* timeout = 0: no waiting required */ - timeout = true; + std::vector trigs; + dds_duration_t timeout; + if (wait_timeout == NULL) { + timeout = DDS_NEVER; } else { - auto predicate = [subs, gcs, srvs, cls]() { return check_wait_set_for_data(subs, gcs, srvs, cls); }; - if (!wait_timeout) { - conditionVariable->wait(lock, predicate); - timeout = false; - } else { - auto n = std::chrono::duration_cast(std::chrono::seconds(wait_timeout->sec)); - n += std::chrono::nanoseconds(wait_timeout->nsec); - timeout = !conditionVariable->wait_for(lock, n, predicate); - } + timeout = (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec; } + trigs.reserve(nelems + 1); + ntrig = dds_waitset_wait(ws->waitseth, &trigs[0], nelems, timeout); + std::sort(trigs.begin(), trigs.end()); + trigs[ntrig] = (dds_attach_t)-1; - // Unlock the condition variable mutex to prevent deadlocks that can occur if - // a listener triggers while the condition variable is being detached. - // Listeners will no longer be prevented from changing their internal state, - // but that should not cause issues (if a listener has data / has triggered - // after we check, it will be caught on the next call to this function). - lock.unlock(); + long trig_idx = 0; + bool dummy; + nelems = 0; +#define DETACH(type, var, name, condname, on_triggered) do { \ + if (var) { \ + for (size_t i = 0; i < var->name##_count; i++) { \ + auto x = static_cast(var->name##s[i]); \ + dds_waitset_detach(ws->waitseth, x->condname); \ + if (trigs[trig_idx] == (long)nelems) { \ + on_triggered \ + trig_idx++; \ + } else { \ + var->name##s[i] = nullptr; \ + } \ + nelems++; \ + } \ + } \ + } while (0) + DETACH(CddsSubscription, subs, subscriber, rdcondh, ;); + DETACH(CddsClient, cls, client, client.sub->rdcondh, ;); + DETACH(CddsService, srvs, service, service.sub->rdcondh, ;); + DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy);); +#undef DETACH - if (subs) { for (auto&& x : *subs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } - if (cls) { for (auto&& x : *cls) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } - if (srvs) { for (auto&& x : *srvs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } - // guard conditions are auto-resetting, hence condition_take - if (gcs) { for (auto&& x : *gcs) { condition_detach(x); if (!condition_take(x)) x = nullptr; } } - - return timeout ? RMW_RET_TIMEOUT : RMW_RET_OK; + return (ntrig == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1040,7 +943,6 @@ static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request struct serdata *sd; dds_sample_info_t info; while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { - condition_add_trigger(&cs->sub->cond, -1); if (info.valid_data) { size_t sz; void *raw; @@ -1157,7 +1059,6 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); } - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client") RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str()) RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str()) @@ -1165,44 +1066,37 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se dds_entity_t pubtopic, subtopic; dds_qos_t *qos; - dds_listener_t *listeners; - if ((pubtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), NULL, NULL)) < 0) { - if ((pubtopic = dds_find_topic(node_impl->ppant, pubtopic_name.c_str())) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_pubtopic; - } + if ((pubtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_pubtopic; } - if ((subtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), NULL, NULL)) < 0) { - if ((subtopic = dds_find_topic(node_impl->ppant, subtopic_name.c_str())) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_subtopic; - } + if ((subtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG("failed to create topic"); + goto fail_subtopic; } if ((qos = dds_qos_create()) == nullptr) { goto fail_qos; } dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); - if ((listeners = dds_listener_create(static_cast(sub))) == nullptr) { - goto fail_listener; - } - dds_lset_data_available(listeners, subhandler); - if ((pub->pubh = dds_create_writer(node_impl->ppant, pubtopic, qos, NULL)) < 0) { + if ((pub->pubh = dds_create_writer(gcdds.ppant, pubtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } - if ((sub->subh = dds_create_reader(node_impl->ppant, subtopic, qos, listeners)) < 0) { + sub->node = node_impl; + if ((sub->subh = dds_create_reader(gcdds.ppant, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } - sub->node = node_impl; - dds_qos_delete(qos); - dds_listener_delete(listeners); - + if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG("failed to create readcondition"); + goto fail_readcond; + } if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } + dds_qos_delete(qos); node_impl->own_writers.insert(pub->pubiid); cs->pub = pub; @@ -1210,14 +1104,12 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se return RMW_RET_OK; fail_instance_handle: - if (dds_delete(pub->pubh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); - } + dds_delete(sub->rdcondh); + fail_readcond: + dds_delete(sub->subh); fail_reader: dds_delete(pub->pubh); fail_writer: - dds_listener_delete(listeners); - fail_listener: dds_qos_delete(qos); fail_qos: /* leak subtopic */ @@ -1229,6 +1121,7 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se static void rmw_fini_cs(CddsCS *cs) { + dds_delete(cs->sub->rdcondh); dds_delete(cs->sub->subh); dds_delete(cs->pub->pubh); cs->sub->node->own_writers.erase(cs->pub->pubiid);