use dds conditions and waitsets

This commit is contained in:
Erik Boasson 2018-07-19 16:35:46 +02:00
parent bae6854ff4
commit e6b6ede709

View file

@ -25,8 +25,6 @@
- hash set of writer handles should be thread safe: no guarantee that no writers get
added/deleted in parallel to each other or to takes
- should use cdds read conditions, triggers & waitsets rather the local thing done here
- introspection stuff not done yet (probably requires additions to cdds)
- check / make sure a node remains valid while one of its subscriptions exists
@ -166,10 +164,24 @@ struct CddsGuardCondition {
dds_entity_t gcondh;
};
struct CddsWaitset {
dds_entity_t waitseth;
std::vector<dds_attach_t> trigs;
std::mutex lock;
bool inuse;
std::vector<CddsSubscription *> subs;
std::vector<CddsGuardCondition *> gcs;
std::vector<CddsClient *> cls;
std::vector<CddsService *> srvs;
};
struct Cdds {
std::mutex lock;
uint32_t refcount;
dds_entity_t ppant;
std::unordered_set<CddsWaitset *> waitsets;
Cdds() : refcount(0), ppant(0) {}
};
@ -187,6 +199,8 @@ using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport<rosidl_typ
using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport<rosidl_typesupport_introspection_c__ServiceMembers, rosidl_typesupport_introspection_c__MessageMembers>;
using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport<rosidl_typesupport_introspection_cpp::ServiceMembers, rosidl_typesupport_introspection_cpp::MessageMembers>;
static void clean_waitset_caches();
static bool using_introspection_c_typesupport(const char *typesupport_identifier)
{
return typesupport_identifier == rosidl_typesupport_introspection_c__identifier;
@ -710,6 +724,7 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription
RET_WRONG_IMPLID(subscription);
auto sub = static_cast<CddsSubscription *>(subscription->data);
if (sub != nullptr) {
clean_waitset_caches();
if (dds_delete(sub->rdcondh) < 0) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
}
@ -774,10 +789,6 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription,
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
struct CddsWaitset {
dds_entity_t waitseth;
};
extern "C" rmw_guard_condition_t *rmw_create_guard_condition()
{
rmw_guard_condition_t *guard_condition_handle;
@ -805,6 +816,7 @@ extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_co
{
RET_NULL(guard_condition_handle);
auto *gcond_impl = static_cast<CddsGuardCondition *>(guard_condition_handle->data);
clean_waitset_caches();
dds_delete(gcond_impl->gcondh);
unref_ppant();
delete gcond_impl;
@ -836,10 +848,15 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions)
RMW_SET_ERROR_MSG("failed to construct wait set info struct");
goto fail_ws;
}
ws->inuse = false;
if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) {
RMW_SET_ERROR_MSG("failed to create waitset");
goto fail_waitset;
}
{
std::lock_guard<std::mutex> lock(gcdds.lock);
gcdds.waitsets.insert(ws);
}
return wait_set;
fail_waitset:
@ -860,58 +877,114 @@ extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set)
auto ws = static_cast<CddsWaitset *>(wait_set->data);
RET_NULL(ws);
dds_delete(ws->waitseth);
{
std::lock_guard<std::mutex> lock(gcdds.lock);
gcdds.waitsets.erase(ws);
}
RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR);
rmw_free(wait_set->data);
rmw_wait_set_free(wait_set);
return result;
}
template<typename T>
static bool require_reattach(const std::vector<T *>& cached, size_t count, void **ary)
{
if (ary == nullptr || count == 0) {
return (cached.size() != 0);
} else if (count != cached.size()) {
return true;
} else {
return (memcmp(static_cast<const void *>(cached.data()), static_cast<void *>(ary), count * sizeof(void *)) != 0);
}
}
static void waitset_detach(CddsWaitset *ws)
{
for (auto&& x : ws->subs) dds_waitset_detach(ws->waitseth, x->rdcondh);
for (auto&& x : ws->gcs) dds_waitset_detach(ws->waitseth, x->gcondh);
for (auto&& x : ws->srvs) dds_waitset_detach(ws->waitseth, x->service.sub->rdcondh);
for (auto&& x : ws->cls) dds_waitset_detach(ws->waitseth, x->client.sub->rdcondh);
ws->subs.resize(0);
ws->gcs.resize(0);
ws->srvs.resize(0);
ws->cls.resize(0);
}
static void clean_waitset_caches()
{
/* Called whenever a subscriber, guard condition, service or client is deleted (as these may
have been cached in a waitset), and drops all cached entities from all waitsets (just to keep
life simple). I'm assuming one is not allowed to delete an entity while it is still being
used ... */
std::lock_guard<std::mutex> lock(gcdds.lock);
for (auto&& ws : gcdds.waitsets) {
std::lock_guard<std::mutex> lock(ws->lock);
if (!ws->inuse) {
waitset_detach(ws);
}
}
}
extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout)
{
RET_NULL(wait_set);
CddsWaitset *ws = static_cast<CddsWaitset *>(wait_set->data);
RET_NULL(ws);
size_t nelems;
dds_return_t ntrig;
nelems = 0;
#define ATTACH(type, var, name, condname) do { \
{
std::lock_guard<std::mutex> lock(ws->lock);
if (ws->inuse) {
RMW_SET_ERROR_MSG("concurrent calls to rmw_wait on a single waitset is not supported");
return RMW_RET_ERROR;
}
ws->inuse = true;
}
if (require_reattach(ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) ||
require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) ||
require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) ||
require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) {
size_t nelems = 0;
waitset_detach(ws);
#define ATTACH(type, var, name, cond) do { \
ws->var.resize(0); \
if (var) { \
ws->var.reserve(var->name##_count); \
for (size_t i = 0; i < var->name##_count; i++) { \
auto x = static_cast<type *>(var->name##s[i]); \
dds_waitset_attach(ws->waitseth, x->condname, nelems); \
ws->var.push_back(x); \
dds_waitset_attach(ws->waitseth, x->cond, nelems); \
nelems++; \
} \
} \
} while (0)
ATTACH(CddsSubscription, subs, subscriber, rdcondh);
ATTACH(CddsClient, cls, client, client.sub->rdcondh);
ATTACH(CddsService, srvs, service, service.sub->rdcondh);
ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh);
ATTACH(CddsService, srvs, service, service.sub->rdcondh);
ATTACH(CddsClient, cls, client, client.sub->rdcondh);
#undef ATTACH
std::vector<dds_attach_t> trigs;
dds_duration_t timeout;
if (wait_timeout == NULL) {
timeout = DDS_NEVER;
} else {
timeout = (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec;
ws->trigs.reserve(nelems + 1);
}
trigs.reserve(nelems + 1);
ntrig = dds_waitset_wait(ws->waitseth, &trigs[0], nelems, timeout);
std::sort(trigs.begin(), trigs.end());
trigs[ntrig] = (dds_attach_t)-1;
const dds_duration_t timeout =
(wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec;
const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), ws->trigs.capacity(), timeout);
ws->trigs.resize(ntrig);
std::sort(ws->trigs.begin(), ws->trigs.end());
ws->trigs.push_back((dds_attach_t)-1);
{
long trig_idx = 0;
bool dummy;
nelems = 0;
#define DETACH(type, var, name, condname, on_triggered) do { \
size_t nelems = 0;
#define DETACH(type, var, name, cond, on_triggered) do { \
if (var) { \
for (size_t i = 0; i < var->name##_count; i++) { \
auto x = static_cast<type *>(var->name##s[i]); \
dds_waitset_detach(ws->waitseth, x->condname); \
if (trigs[trig_idx] == (long)nelems) { \
on_triggered \
/*dds_waitset_detach(ws->waitseth, x->cond);*/ \
if (ws->trigs[trig_idx] == (long)nelems) { \
on_triggered; \
trig_idx++; \
} else { \
var->name##s[i] = nullptr; \
@ -920,13 +993,19 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t
} \
} \
} while (0)
DETACH(CddsSubscription, subs, subscriber, rdcondh, ;);
DETACH(CddsClient, cls, client, client.sub->rdcondh, ;);
DETACH(CddsService, srvs, service, service.sub->rdcondh, ;);
DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy););
DETACH(CddsSubscription, subs, subscriber, rdcondh, (void)x);
DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy));
DETACH(CddsService, srvs, service, service.sub->rdcondh, (void)x);
DETACH(CddsClient, cls, client, client.sub->rdcondh, (void)x);
#undef DETACH
}
return (ntrig == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK;
{
std::lock_guard<std::mutex> lock(ws->lock);
ws->inuse = false;
}
return (ws->trigs.size() == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK;
}
/////////////////////////////////////////////////////////////////////////////////////////
@ -1154,6 +1233,7 @@ extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t *node, rmw_client_t *client)
RET_WRONG_IMPLID(node);
RET_WRONG_IMPLID(client);
auto info = static_cast<CddsClient *>(client->data);
clean_waitset_caches();
rmw_fini_cs(&info->client);
rmw_free(const_cast<char *>(client->service_name));
rmw_client_free(client);
@ -1187,6 +1267,7 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *servic
RET_WRONG_IMPLID(node);
RET_WRONG_IMPLID(service);
auto info = static_cast<CddsService *>(service->data);
clean_waitset_caches();
rmw_fini_cs(&info->service);
rmw_free(const_cast<char *>(service->service_name));
rmw_service_free(service);