diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 1e3cde5..51f4db4 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -25,8 +25,6 @@ - hash set of writer handles should be thread safe: no guarantee that no writers get added/deleted in parallel to each other or to takes - - should use cdds read conditions, triggers & waitsets rather the local thing done here - - introspection stuff not done yet (probably requires additions to cdds) - check / make sure a node remains valid while one of its subscriptions exists @@ -166,10 +164,24 @@ struct CddsGuardCondition { dds_entity_t gcondh; }; +struct CddsWaitset { + dds_entity_t waitseth; + + std::vector trigs; + + std::mutex lock; + bool inuse; + std::vector subs; + std::vector gcs; + std::vector cls; + std::vector srvs; +}; + struct Cdds { std::mutex lock; uint32_t refcount; dds_entity_t ppant; + std::unordered_set waitsets; Cdds() : refcount(0), ppant(0) {} }; @@ -187,6 +199,8 @@ using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; +static void clean_waitset_caches(); + static bool using_introspection_c_typesupport(const char *typesupport_identifier) { return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; @@ -710,6 +724,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription RET_WRONG_IMPLID(subscription); auto sub = static_cast(subscription->data); if (sub != nullptr) { + clean_waitset_caches(); if (dds_delete(sub->rdcondh) < 0) { RMW_SET_ERROR_MSG("failed to delete readcondition"); } @@ -774,10 +789,6 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -struct CddsWaitset { - dds_entity_t waitseth; -}; - extern "C" rmw_guard_condition_t *rmw_create_guard_condition() { rmw_guard_condition_t *guard_condition_handle; @@ -805,6 +816,7 @@ extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_co { RET_NULL(guard_condition_handle); auto *gcond_impl = static_cast(guard_condition_handle->data); + clean_waitset_caches(); dds_delete(gcond_impl->gcondh); unref_ppant(); delete gcond_impl; @@ -836,10 +848,15 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) RMW_SET_ERROR_MSG("failed to construct wait set info struct"); goto fail_ws; } + ws->inuse = false; if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) { RMW_SET_ERROR_MSG("failed to create waitset"); goto fail_waitset; } + { + std::lock_guard lock(gcdds.lock); + gcdds.waitsets.insert(ws); + } return wait_set; fail_waitset: @@ -860,73 +877,135 @@ extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set) auto ws = static_cast(wait_set->data); RET_NULL(ws); dds_delete(ws->waitseth); + { + std::lock_guard lock(gcdds.lock); + gcdds.waitsets.erase(ws); + } RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); rmw_free(wait_set->data); rmw_wait_set_free(wait_set); return result; } +template +static bool require_reattach(const std::vector& cached, size_t count, void **ary) +{ + if (ary == nullptr || count == 0) { + return (cached.size() != 0); + } else if (count != cached.size()) { + return true; + } else { + return (memcmp(static_cast(cached.data()), static_cast(ary), count * sizeof(void *)) != 0); + } +} + +static void waitset_detach(CddsWaitset *ws) +{ + for (auto&& x : ws->subs) dds_waitset_detach(ws->waitseth, x->rdcondh); + for (auto&& x : ws->gcs) dds_waitset_detach(ws->waitseth, x->gcondh); + for (auto&& x : ws->srvs) dds_waitset_detach(ws->waitseth, x->service.sub->rdcondh); + for (auto&& x : ws->cls) dds_waitset_detach(ws->waitseth, x->client.sub->rdcondh); + ws->subs.resize(0); + ws->gcs.resize(0); + ws->srvs.resize(0); + ws->cls.resize(0); +} + +static void clean_waitset_caches() +{ + /* Called whenever a subscriber, guard condition, service or client is deleted (as these may + have been cached in a waitset), and drops all cached entities from all waitsets (just to keep + life simple). I'm assuming one is not allowed to delete an entity while it is still being + used ... */ + std::lock_guard lock(gcdds.lock); + for (auto&& ws : gcdds.waitsets) { + std::lock_guard lock(ws->lock); + if (!ws->inuse) { + waitset_detach(ws); + } + } +} + extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) { RET_NULL(wait_set); CddsWaitset *ws = static_cast(wait_set->data); RET_NULL(ws); - size_t nelems; - dds_return_t ntrig; - nelems = 0; -#define ATTACH(type, var, name, condname) do { \ - if (var) { \ - for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - dds_waitset_attach(ws->waitseth, x->condname, nelems); \ - nelems++; \ - } \ - } \ - } while (0) - ATTACH(CddsSubscription, subs, subscriber, rdcondh); - ATTACH(CddsClient, cls, client, client.sub->rdcondh); - ATTACH(CddsService, srvs, service, service.sub->rdcondh); - ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh); -#undef ATTACH - - std::vector trigs; - dds_duration_t timeout; - if (wait_timeout == NULL) { - timeout = DDS_NEVER; - } else { - timeout = (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec; + { + std::lock_guard lock(ws->lock); + if (ws->inuse) { + RMW_SET_ERROR_MSG("concurrent calls to rmw_wait on a single waitset is not supported"); + return RMW_RET_ERROR; + } + ws->inuse = true; } - trigs.reserve(nelems + 1); - ntrig = dds_waitset_wait(ws->waitseth, &trigs[0], nelems, timeout); - std::sort(trigs.begin(), trigs.end()); - trigs[ntrig] = (dds_attach_t)-1; - - long trig_idx = 0; - bool dummy; - nelems = 0; -#define DETACH(type, var, name, condname, on_triggered) do { \ - if (var) { \ - for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - dds_waitset_detach(ws->waitseth, x->condname); \ - if (trigs[trig_idx] == (long)nelems) { \ - on_triggered \ - trig_idx++; \ - } else { \ - var->name##s[i] = nullptr; \ + + if (require_reattach(ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) || + require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || + require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || + require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) { + size_t nelems = 0; + waitset_detach(ws); +#define ATTACH(type, var, name, cond) do { \ + ws->var.resize(0); \ + if (var) { \ + ws->var.reserve(var->name##_count); \ + for (size_t i = 0; i < var->name##_count; i++) { \ + auto x = static_cast(var->name##s[i]); \ + ws->var.push_back(x); \ + dds_waitset_attach(ws->waitseth, x->cond, nelems); \ + nelems++; \ } \ - nelems++; \ } \ - } \ - } while (0) - DETACH(CddsSubscription, subs, subscriber, rdcondh, ;); - DETACH(CddsClient, cls, client, client.sub->rdcondh, ;); - DETACH(CddsService, srvs, service, service.sub->rdcondh, ;); - DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy);); -#undef DETACH + } while (0) + ATTACH(CddsSubscription, subs, subscriber, rdcondh); + ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh); + ATTACH(CddsService, srvs, service, service.sub->rdcondh); + ATTACH(CddsClient, cls, client, client.sub->rdcondh); +#undef ATTACH + ws->trigs.reserve(nelems + 1); + } + + const dds_duration_t timeout = + (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec; + const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), ws->trigs.capacity(), timeout); + ws->trigs.resize(ntrig); + std::sort(ws->trigs.begin(), ws->trigs.end()); + ws->trigs.push_back((dds_attach_t)-1); - return (ntrig == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; + { + long trig_idx = 0; + bool dummy; + size_t nelems = 0; +#define DETACH(type, var, name, cond, on_triggered) do { \ + if (var) { \ + for (size_t i = 0; i < var->name##_count; i++) { \ + auto x = static_cast(var->name##s[i]); \ + /*dds_waitset_detach(ws->waitseth, x->cond);*/ \ + if (ws->trigs[trig_idx] == (long)nelems) { \ + on_triggered; \ + trig_idx++; \ + } else { \ + var->name##s[i] = nullptr; \ + } \ + nelems++; \ + } \ + } \ + } while (0) + DETACH(CddsSubscription, subs, subscriber, rdcondh, (void)x); + DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy)); + DETACH(CddsService, srvs, service, service.sub->rdcondh, (void)x); + DETACH(CddsClient, cls, client, client.sub->rdcondh, (void)x); +#undef DETACH + } + + { + std::lock_guard lock(ws->lock); + ws->inuse = false; + } + + return (ws->trigs.size() == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1154,6 +1233,7 @@ extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t *node, rmw_client_t *client) RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(client); auto info = static_cast(client->data); + clean_waitset_caches(); rmw_fini_cs(&info->client); rmw_free(const_cast(client->service_name)); rmw_client_free(client); @@ -1187,6 +1267,7 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *servic RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(service); auto info = static_cast(service->data); + clean_waitset_caches(); rmw_fini_cs(&info->service); rmw_free(const_cast(service->service_name)); rmw_service_free(service);