use waitsets, readconditions, guardconditions for waiting

This commit is contained in:
Erik Boasson 2018-07-13 08:52:21 +02:00
parent 9c71a0bed2
commit bae6854ff4

View file

@ -20,8 +20,7 @@
- should serialize straight into serdata_t, instead of into a FastBuffer that then gets copied - should serialize straight into serdata_t, instead of into a FastBuffer that then gets copied
- topic creation: until cdds allows multiple calls to create_topic for the same topic, use - topic creation: shouldn't leak topics
create-or-else-find and leak
- hash set of writer handles should be thread safe: no guarantee that no writers get - hash set of writer handles should be thread safe: no guarantee that no writers get
added/deleted in parallel to each other or to takes added/deleted in parallel to each other or to takes
@ -110,28 +109,14 @@ extern "C" {
const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp"; const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp";
struct condition {
std::mutex internalMutex_;
std::atomic_uint triggerValue_;
std::mutex *conditionMutex_;
std::condition_variable *conditionVariable_;
condition() : triggerValue_(0), conditionMutex_(nullptr), conditionVariable_(nullptr) { }
};
static void condition_set_trigger(struct condition *cond, unsigned value);
static void condition_add_trigger(struct condition *cond, int delta);
struct CddsTypeSupport { struct CddsTypeSupport {
void *type_support_; void *type_support_;
const char *typesupport_identifier_; const char *typesupport_identifier_;
}; };
/* this had better be compatible with the "guid" field in the rmw_request_id_t and the data field in rmw_gid_t */ /* instance handles are unsigned 64-bit integers carefully constructed to be as close to uniformly
typedef std::array<uint8_t,16> cdds_guid_t; distributed as possible for no other reason than making them near-perfect hash keys, hence we can
typedef std::array<uint8_t,RMW_GID_STORAGE_SIZE> cdds_gid_t; improve over the default hash function */
/* instance handles are carefully constructed to be as close to uniformly distributed as possible
for no other reason than making them near-perfect hash keys */
struct dds_instance_handle_hash { struct dds_instance_handle_hash {
public: public:
std::size_t operator()(dds_instance_handle_t const& x) const noexcept { std::size_t operator()(dds_instance_handle_t const& x) const noexcept {
@ -140,7 +125,6 @@ public:
}; };
struct CddsNode { struct CddsNode {
dds_entity_t ppant;
rmw_guard_condition_t *graph_guard_condition; rmw_guard_condition_t *graph_guard_condition;
std::unordered_set<dds_instance_handle_t, dds_instance_handle_hash> own_writers; std::unordered_set<dds_instance_handle_t, dds_instance_handle_hash> own_writers;
}; };
@ -155,10 +139,10 @@ struct CddsSubscription {
typedef rmw_subscription_t rmw_type; typedef rmw_subscription_t rmw_type;
typedef rmw_subscriptions_t rmw_set_type; typedef rmw_subscriptions_t rmw_set_type;
dds_entity_t subh; dds_entity_t subh;
dds_entity_t rdcondh;
CddsNode *node; CddsNode *node;
bool ignore_local_publications; bool ignore_local_publications;
CddsTypeSupport ts; CddsTypeSupport ts;
struct condition cond;
}; };
struct CddsCS { struct CddsCS {
@ -179,57 +163,22 @@ struct CddsService {
}; };
struct CddsGuardCondition { struct CddsGuardCondition {
struct condition cond; dds_entity_t gcondh;
}; };
/* iterators for sets of subscriptions, clients, services and guard conditions */ struct Cdds {
#define DEFITER1(const_, cddstype_, rmwtype_, name_) \ std::mutex lock;
static const_ Cdds##cddstype_ * const_ *begin(const_ rmw_##rmwtype_##_t& s) { \ uint32_t refcount;
return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[0]); \ dds_entity_t ppant;
} \ Cdds() : refcount(0), ppant(0) {}
static const_ Cdds##cddstype_ * const_ *end(const_ rmw_##rmwtype_##_t& s) { \ };
return (const_ Cdds##cddstype_ * const_ *)(&s.name_##s[s.name_##_count]); \
}
#define DEFITER(cddstype_, rmwtype_, name_) \
DEFITER1(, cddstype_, rmwtype_, name_) \
DEFITER1(const, cddstype_, rmwtype_, name_)
DEFITER(Subscription, subscriptions, subscriber)
DEFITER(Client, clients, client)
DEFITER(Service, services, service)
DEFITER(GuardCondition, guard_conditions, guard_condition)
#undef DEFITER
#undef DEFITER1
struct condition *get_condition(CddsSubscription *x) { return &x->cond; }
struct condition *get_condition(CddsClient *x) { return &x->client.sub->cond; }
struct condition *get_condition(CddsService *x) { return &x->service.sub->cond; }
struct condition *get_condition(CddsGuardCondition *x) { return &x->cond; }
template <typename T> const condition *get_condition(const T *x) { return get_condition(const_cast<T *>(x)); }
template<typename T> void condition_set_trigger(T *x, unsigned value) {
condition_set_trigger(get_condition(x), value);
}
template<typename T> void condition_add_trigger(T *x, int delta) {
condition_add_trigger(get_condition(x), delta);
}
template<typename T> bool condition_read(T *x) {
return condition_read(get_condition(x));
}
template<typename T> bool condition_take(T *x) {
return condition_take(get_condition(x));
}
template<typename T> void condition_attach(T *x, std::mutex *conditionMutex, std::condition_variable *conditionVariable) {
condition_attach(get_condition(x), conditionMutex, conditionVariable);
}
template<typename T> void condition_detach(T *x) {
condition_detach(get_condition(x));
}
typedef struct cdds_request_header { typedef struct cdds_request_header {
uint64_t guid; uint64_t guid;
int64_t seq; int64_t seq;
} cdds_request_header_t; } cdds_request_header_t;
static Cdds gcdds;
using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_c__MessageMembers>; using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_c__MessageMembers>;
using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_cpp::MessageMembers>; using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_cpp::MessageMembers>;
@ -339,6 +288,28 @@ extern "C" rmw_ret_t rmw_init()
return RMW_RET_OK; return RMW_RET_OK;
} }
static dds_entity_t ref_ppant()
{
std::lock_guard<std::mutex> lock(gcdds.lock);
if (gcdds.refcount == 0) {
if ((gcdds.ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create participant");
return gcdds.ppant;
}
}
gcdds.refcount++;
return gcdds.ppant;
}
static void unref_ppant()
{
std::lock_guard<std::mutex> lock(gcdds.lock);
if (--gcdds.refcount == 0) {
dds_delete(gcdds.ppant);
gcdds.ppant = 0;
}
}
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
/////////// /////////// /////////// ///////////
/////////// NODES /////////// /////////// NODES ///////////
@ -352,6 +323,10 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_,
(void)domain_id; (void)domain_id;
(void)security_options; (void)security_options;
dds_entity_t pp = ref_ppant();
if (pp < 0) {
return nullptr;
}
auto *node_impl = new CddsNode(); auto *node_impl = new CddsNode();
rmw_node_t *node_handle = nullptr; rmw_node_t *node_handle = nullptr;
RET_ALLOC_X(node_impl, goto fail_node_impl); RET_ALLOC_X(node_impl, goto fail_node_impl);
@ -361,11 +336,6 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_,
} }
node_impl->graph_guard_condition = graph_guard_condition; node_impl->graph_guard_condition = graph_guard_condition;
if ((node_impl->ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0) {
RMW_SET_ERROR_MSG("failed to create participant");
goto fail_ppant;
}
node_handle = rmw_node_allocate(); node_handle = rmw_node_allocate();
RET_ALLOC_X(node_handle, goto fail_node_handle); RET_ALLOC_X(node_handle, goto fail_node_handle);
node_handle->implementation_identifier = adlink_cyclonedds_identifier; node_handle->implementation_identifier = adlink_cyclonedds_identifier;
@ -389,16 +359,13 @@ extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_,
fail_node_handle_name: fail_node_handle_name:
rmw_node_free(node_handle); rmw_node_free(node_handle);
fail_node_handle: fail_node_handle:
if (dds_delete(node_impl->ppant) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete participant during error handling");
}
fail_ppant:
if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling");
} }
fail_ggc: fail_ggc:
delete node_impl; delete node_impl;
fail_node_impl: fail_node_impl:
unref_ppant();
return nullptr; return nullptr;
} }
@ -408,10 +375,6 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node)
RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(node);
auto node_impl = static_cast<CddsNode *>(node->data); auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL(node_impl); RET_NULL(node_impl);
if (dds_delete(node_impl->ppant) < 0) {
RMW_SET_ERROR_MSG("failed to delete participant");
result_ret = RMW_RET_ERROR;
}
rmw_free(const_cast<char *>(node->name)); rmw_free(const_cast<char *>(node->name));
rmw_free(const_cast<char *>(node->namespace_)); rmw_free(const_cast<char *>(node->namespace_));
rmw_node_free(node); rmw_node_free(node);
@ -420,6 +383,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node)
result_ret = RMW_RET_ERROR; result_ret = RMW_RET_ERROR;
} }
delete node_impl; delete node_impl;
unref_ppant();
return result_ret; return result_ret;
} }
@ -551,17 +515,14 @@ static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl
pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_); pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_);
std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies);
/*FIXME: fix topic creation issues in CycloneDDS */ if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) {
if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("failed to create topic");
if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { goto fail_topic;
RMW_SET_ERROR_MSG("failed to create topic");
goto fail_topic;
}
} }
if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies)) == nullptr) {
goto fail_qos; goto fail_qos;
} }
if ((pub->pubh = dds_create_writer(node_impl->ppant, topic, qos, NULL)) < 0) { if ((pub->pubh = dds_create_writer(gcdds.ppant, topic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
@ -665,13 +626,6 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *pu
/////////// /////////// /////////// ///////////
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
static void subhandler(dds_entity_t rd, void *vsub)
{
CddsSubscription *sub = static_cast<CddsSubscription *>(vsub);
(void)rd;
condition_add_trigger(&sub->cond, 1);
}
static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications)
{ {
RET_WRONG_IMPLID_X(node, return nullptr); RET_WRONG_IMPLID_X(node, return nullptr);
@ -681,11 +635,9 @@ static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const
RET_NULL_X(node_impl, return nullptr); RET_NULL_X(node_impl, return nullptr);
const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); const rosidl_message_type_support_t *type_support = get_typesupport(type_supports);
RET_NULL_X(type_support, return nullptr); RET_NULL_X(type_support, return nullptr);
(void)ignore_local_publications;
CddsSubscription *sub = new CddsSubscription(); CddsSubscription *sub = new CddsSubscription();
dds_entity_t topic; dds_entity_t topic;
dds_qos_t *qos; dds_qos_t *qos;
dds_listener_t *listeners;
sub->node = node_impl; sub->node = node_impl;
sub->ignore_local_publications = ignore_local_publications; sub->ignore_local_publications = ignore_local_publications;
@ -693,31 +645,28 @@ static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const
sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_); sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_);
std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies);
/*FIXME: fix topic creation issues in CycloneDDS */ if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) {
if ((topic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("failed to create topic");
if ((topic = dds_find_topic(node_impl->ppant, fqtopic_name.c_str())) < 0) { goto fail_topic;
RMW_SET_ERROR_MSG("failed to create topic");
goto fail_topic;
}
} }
if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { if ((qos = create_readwrite_qos(qos_policies)) == nullptr) {
goto fail_qos; goto fail_qos;
} }
if ((listeners = dds_listener_create(static_cast<void *>(sub))) == nullptr) { if ((sub->subh = dds_create_reader(gcdds.ppant, topic, qos, nullptr)) < 0) {
goto fail_listener;
}
dds_lset_data_available(listeners, subhandler);
if ((sub->subh = dds_create_reader(node_impl->ppant, topic, qos, listeners)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond;
}
dds_qos_delete(qos); dds_qos_delete(qos);
dds_listener_delete(listeners);
return sub; return sub;
fail_readcond:
if (dds_delete(sub->subh) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling");
}
fail_reader: fail_reader:
dds_listener_delete(listeners);
fail_listener:
dds_qos_delete(qos); dds_qos_delete(qos);
fail_qos: fail_qos:
/* FIXME: leak topic */ /* FIXME: leak topic */
@ -744,8 +693,11 @@ extern "C" rmw_subscription_t *rmw_create_subscription(const rmw_node_t *node, c
fail_topic_name: fail_topic_name:
rmw_subscription_free(rmw_subscription); rmw_subscription_free(rmw_subscription);
fail_subscription: fail_subscription:
if (dds_delete(sub->rdcondh) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling");
}
if (dds_delete(sub->subh) < 0) { if (dds_delete(sub->subh) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling");
} }
delete sub; delete sub;
fail_common_init: fail_common_init:
@ -758,6 +710,9 @@ extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription
RET_WRONG_IMPLID(subscription); RET_WRONG_IMPLID(subscription);
auto sub = static_cast<CddsSubscription *>(subscription->data); auto sub = static_cast<CddsSubscription *>(subscription->data);
if (sub != nullptr) { if (sub != nullptr) {
if (dds_delete(sub->rdcondh) < 0) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
}
if (dds_delete(sub->subh) < 0) { if (dds_delete(sub->subh) < 0) {
RMW_SET_ERROR_MSG("failed to delete reader"); RMW_SET_ERROR_MSG("failed to delete reader");
} }
@ -779,7 +734,6 @@ static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_
struct serdata *sd; struct serdata *sd;
dds_sample_info_t info; dds_sample_info_t info;
while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) {
condition_add_trigger(&sub->cond, -1);
if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) { if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) {
size_t sz; size_t sz;
void *raw; void *raw;
@ -821,109 +775,55 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription,
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
struct CddsWaitset { struct CddsWaitset {
std::condition_variable condition; dds_entity_t waitseth;
std::mutex condition_mutex;
}; };
static void condition_set_trigger(struct condition *cond, unsigned value)
{
std::lock_guard<std::mutex> lock(cond->internalMutex_);
if (cond->conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*cond->conditionMutex_);
// the change to triggerValue_ needs to be mutually exclusive with
// rmw_wait() which checks hasTriggered() and decides if wait() needs to
// be called
const bool notify = (value > 0 && cond->triggerValue_ == 0);
cond->triggerValue_ = value;
clock.unlock();
if (notify) {
cond->conditionVariable_->notify_one();
}
} else {
cond->triggerValue_ = value;
}
}
static void condition_add_trigger(struct condition *cond, int delta)
{
std::lock_guard<std::mutex> lock(cond->internalMutex_);
if (cond->conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*cond->conditionMutex_);
// the change to triggerValue_ needs to be mutually exclusive with
// rmw_wait() which checks hasTriggered() and decides if wait() needs to
// be called
const bool notify = (delta > 0 && cond->triggerValue_ == 0);
assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta);
cond->triggerValue_ += delta;
clock.unlock();
if (notify) {
cond->conditionVariable_->notify_one();
}
} else {
assert(delta >= 0 || cond->triggerValue_ >= (unsigned)-delta);
cond->triggerValue_ += delta;
}
}
static void condition_attach(struct condition *cond, std::mutex *conditionMutex, std::condition_variable *conditionVariable)
{
std::lock_guard<std::mutex> lock(cond->internalMutex_);
cond->conditionMutex_ = conditionMutex;
cond->conditionVariable_ = conditionVariable;
}
static void condition_detach(struct condition *cond)
{
std::lock_guard<std::mutex> lock(cond->internalMutex_);
cond->conditionMutex_ = nullptr;
cond->conditionVariable_ = nullptr;
}
static bool condition_read(const struct condition *cond)
{
return cond->triggerValue_ > 0;
}
static bool condition_read(struct condition *cond)
{
return condition_read(const_cast<const struct condition *>(cond));
}
static bool condition_take(struct condition *cond)
{
std::lock_guard<std::mutex> lock(cond->internalMutex_);
bool ret = cond->triggerValue_ > 0;
cond->triggerValue_ = 0;
return ret;
}
extern "C" rmw_guard_condition_t *rmw_create_guard_condition() extern "C" rmw_guard_condition_t *rmw_create_guard_condition()
{ {
rmw_guard_condition_t *guard_condition_handle = new rmw_guard_condition_t; rmw_guard_condition_t *guard_condition_handle;
auto *gcond_impl = new CddsGuardCondition();
if (ref_ppant() < 0) {
goto fail_ppant;
}
if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) {
RMW_SET_ERROR_MSG("failed to create guardcondition");
goto fail_guardcond;
}
guard_condition_handle = new rmw_guard_condition_t;
guard_condition_handle->implementation_identifier = adlink_cyclonedds_identifier; guard_condition_handle->implementation_identifier = adlink_cyclonedds_identifier;
guard_condition_handle->data = new CddsGuardCondition(); guard_condition_handle->data = gcond_impl;
return guard_condition_handle; return guard_condition_handle;
fail_guardcond:
unref_ppant();
fail_ppant:
delete(gcond_impl);
return nullptr;
} }
extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition) extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition_handle)
{ {
RET_NULL(guard_condition); RET_NULL(guard_condition_handle);
delete static_cast<CddsGuardCondition *>(guard_condition->data); auto *gcond_impl = static_cast<CddsGuardCondition *>(guard_condition_handle->data);
delete guard_condition; dds_delete(gcond_impl->gcondh);
unref_ppant();
delete gcond_impl;
delete guard_condition_handle;
return RMW_RET_OK; return RMW_RET_OK;
} }
extern "C" rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle) extern "C" rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle)
{ {
RET_WRONG_IMPLID(guard_condition_handle); RET_WRONG_IMPLID(guard_condition_handle);
condition_set_trigger(static_cast<CddsGuardCondition *>(guard_condition_handle->data), 1); auto *gcond_impl = static_cast<CddsGuardCondition *>(guard_condition_handle->data);
dds_set_guardcondition(gcond_impl->gcondh, true);
return RMW_RET_OK; return RMW_RET_OK;
} }
extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions)
{ {
(void)max_conditions; (void)max_conditions;
rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); rmw_wait_set_t *wait_set = rmw_wait_set_allocate();
CddsWaitset *ws = nullptr; CddsWaitset *ws = nullptr;
RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); RET_ALLOC_X(wait_set, goto fail_alloc_wait_set);
wait_set->implementation_identifier = adlink_cyclonedds_identifier; wait_set->implementation_identifier = adlink_cyclonedds_identifier;
@ -931,16 +831,21 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions)
RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data); RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data);
// This should default-construct the fields of CddsWaitset // This should default-construct the fields of CddsWaitset
ws = static_cast<CddsWaitset *>(wait_set->data); ws = static_cast<CddsWaitset *>(wait_set->data);
RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ) RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, );
if (!ws) { if (!ws) {
RMW_SET_ERROR_MSG("failed to construct wait set info struct"); RMW_SET_ERROR_MSG("failed to construct wait set info struct");
goto fail_ws; goto fail_ws;
} }
if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) {
RMW_SET_ERROR_MSG("failed to create waitset");
goto fail_waitset;
}
return wait_set; return wait_set;
fail_waitset:
fail_ws: fail_ws:
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws) RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws);
fail_placement_new: fail_placement_new:
rmw_free(wait_set->data); rmw_free(wait_set->data);
fail_alloc_wait_set_data: fail_alloc_wait_set_data:
rmw_wait_set_free(wait_set); rmw_wait_set_free(wait_set);
@ -948,82 +853,80 @@ extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions)
return nullptr; return nullptr;
} }
extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set) extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set)
{ {
RET_WRONG_IMPLID(wait_set); RET_WRONG_IMPLID(wait_set);
auto result = RMW_RET_OK; auto result = RMW_RET_OK;
auto ws = static_cast<CddsWaitset *>(wait_set->data); auto ws = static_cast<CddsWaitset *>(wait_set->data);
RET_NULL(ws); RET_NULL(ws);
std::mutex *conditionMutex = &ws->condition_mutex; dds_delete(ws->waitseth);
RET_NULL(conditionMutex); RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR);
RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR) rmw_free(wait_set->data);
rmw_free(wait_set->data);
rmw_wait_set_free(wait_set); rmw_wait_set_free(wait_set);
return result; return result;
} }
static bool check_wait_set_for_data(const rmw_subscriptions_t *subs, const rmw_guard_conditions_t *gcs, const rmw_services_t *srvs, const rmw_clients_t *cls)
{
if (subs) { for (auto&& x : *subs) { if (condition_read(x)) return true; } }
if (cls) { for (auto&& x : *cls) { if (condition_read(x)) return true; } }
if (srvs) { for (auto&& x : *srvs) { if (condition_read(x)) return true; } }
if (gcs) { for (auto&& x : *gcs) { if (condition_read(x)) return true; } }
return false;
}
extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout)
{ {
RET_NULL(wait_set); RET_NULL(wait_set);
CddsWaitset *ws = static_cast<CddsWaitset *>(wait_set->data); CddsWaitset *ws = static_cast<CddsWaitset *>(wait_set->data);
RET_NULL(ws); RET_NULL(ws);
std::mutex *conditionMutex = &ws->condition_mutex; size_t nelems;
std::condition_variable *conditionVariable = &ws->condition; dds_return_t ntrig;
if (subs) { for (auto&& x : *subs) condition_attach(x, conditionMutex, conditionVariable); } nelems = 0;
if (cls) { for (auto&& x : *cls) condition_attach(x, conditionMutex, conditionVariable); } #define ATTACH(type, var, name, condname) do { \
if (srvs) { for (auto&& x : *srvs) condition_attach(x, conditionMutex, conditionVariable); } if (var) { \
if (gcs) { for (auto&& x : *gcs) condition_attach(x, conditionMutex, conditionVariable); } for (size_t i = 0; i < var->name##_count; i++) { \
auto x = static_cast<type *>(var->name##s[i]); \
dds_waitset_attach(ws->waitseth, x->condname, nelems); \
nelems++; \
} \
} \
} while (0)
ATTACH(CddsSubscription, subs, subscriber, rdcondh);
ATTACH(CddsClient, cls, client, client.sub->rdcondh);
ATTACH(CddsService, srvs, service, service.sub->rdcondh);
ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh);
#undef ATTACH
// This mutex prevents any of the listeners to change the internal state and notify the std::vector<dds_attach_t> trigs;
// condition between the call to hasData() / hasTriggered() and wait() otherwise the dds_duration_t timeout;
// decision to wait might be incorrect if (wait_timeout == NULL) {
std::unique_lock<std::mutex> lock(*conditionMutex); timeout = DDS_NEVER;
// First check variables.
// If wait_timeout is null, wait indefinitely (so we have to wait)
// If wait_timeout is not null and either of its fields are nonzero, we have to wait
bool timeout;
if (check_wait_set_for_data(subs, gcs, srvs, cls)) {
timeout = false;
} else if (wait_timeout && wait_timeout->sec == 0 && wait_timeout->nsec == 0) {
/* timeout = 0: no waiting required */
timeout = true;
} else { } else {
auto predicate = [subs, gcs, srvs, cls]() { return check_wait_set_for_data(subs, gcs, srvs, cls); }; timeout = (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec;
if (!wait_timeout) {
conditionVariable->wait(lock, predicate);
timeout = false;
} else {
auto n = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::seconds(wait_timeout->sec));
n += std::chrono::nanoseconds(wait_timeout->nsec);
timeout = !conditionVariable->wait_for(lock, n, predicate);
}
} }
trigs.reserve(nelems + 1);
ntrig = dds_waitset_wait(ws->waitseth, &trigs[0], nelems, timeout);
std::sort(trigs.begin(), trigs.end());
trigs[ntrig] = (dds_attach_t)-1;
// Unlock the condition variable mutex to prevent deadlocks that can occur if long trig_idx = 0;
// a listener triggers while the condition variable is being detached. bool dummy;
// Listeners will no longer be prevented from changing their internal state, nelems = 0;
// but that should not cause issues (if a listener has data / has triggered #define DETACH(type, var, name, condname, on_triggered) do { \
// after we check, it will be caught on the next call to this function). if (var) { \
lock.unlock(); for (size_t i = 0; i < var->name##_count; i++) { \
auto x = static_cast<type *>(var->name##s[i]); \
dds_waitset_detach(ws->waitseth, x->condname); \
if (trigs[trig_idx] == (long)nelems) { \
on_triggered \
trig_idx++; \
} else { \
var->name##s[i] = nullptr; \
} \
nelems++; \
} \
} \
} while (0)
DETACH(CddsSubscription, subs, subscriber, rdcondh, ;);
DETACH(CddsClient, cls, client, client.sub->rdcondh, ;);
DETACH(CddsService, srvs, service, service.sub->rdcondh, ;);
DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy););
#undef DETACH
if (subs) { for (auto&& x : *subs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } } return (ntrig == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK;
if (cls) { for (auto&& x : *cls) { condition_detach(x); if (!condition_read(x)) x = nullptr; } }
if (srvs) { for (auto&& x : *srvs) { condition_detach(x); if (!condition_read(x)) x = nullptr; } }
// guard conditions are auto-resetting, hence condition_take
if (gcs) { for (auto&& x : *gcs) { condition_detach(x); if (!condition_take(x)) x = nullptr; } }
return timeout ? RMW_RET_TIMEOUT : RMW_RET_OK;
} }
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
@ -1040,7 +943,6 @@ static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request
struct serdata *sd; struct serdata *sd;
dds_sample_info_t info; dds_sample_info_t info;
while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) {
condition_add_trigger(&cs->sub->cond, -1);
if (info.valid_data) { if (info.valid_data) {
size_t sz; size_t sz;
void *raw; void *raw;
@ -1157,7 +1059,6 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se
subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies);
} }
RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client") RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client")
RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str()) RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str())
RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str()) RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str())
@ -1165,44 +1066,37 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se
dds_entity_t pubtopic, subtopic; dds_entity_t pubtopic, subtopic;
dds_qos_t *qos; dds_qos_t *qos;
dds_listener_t *listeners; if ((pubtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), nullptr, nullptr)) < 0) {
if ((pubtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("failed to create topic");
if ((pubtopic = dds_find_topic(node_impl->ppant, pubtopic_name.c_str())) < 0) { goto fail_pubtopic;
RMW_SET_ERROR_MSG("failed to create topic");
goto fail_pubtopic;
}
} }
if ((subtopic = dds_create_topic(node_impl->ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), NULL, NULL)) < 0) { if ((subtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), nullptr, nullptr)) < 0) {
if ((subtopic = dds_find_topic(node_impl->ppant, subtopic_name.c_str())) < 0) { RMW_SET_ERROR_MSG("failed to create topic");
RMW_SET_ERROR_MSG("failed to create topic"); goto fail_subtopic;
goto fail_subtopic;
}
} }
if ((qos = dds_qos_create()) == nullptr) { if ((qos = dds_qos_create()) == nullptr) {
goto fail_qos; goto fail_qos;
} }
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1));
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
if ((listeners = dds_listener_create(static_cast<void *>(sub))) == nullptr) { if ((pub->pubh = dds_create_writer(gcdds.ppant, pubtopic, qos, nullptr)) < 0) {
goto fail_listener;
}
dds_lset_data_available(listeners, subhandler);
if ((pub->pubh = dds_create_writer(node_impl->ppant, pubtopic, qos, NULL)) < 0) {
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
if ((sub->subh = dds_create_reader(node_impl->ppant, subtopic, qos, listeners)) < 0) { sub->node = node_impl;
if ((sub->subh = dds_create_reader(gcdds.ppant, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
sub->node = node_impl; if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) {
dds_qos_delete(qos); RMW_SET_ERROR_MSG("failed to create readcondition");
dds_listener_delete(listeners); goto fail_readcond;
}
if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
dds_qos_delete(qos);
node_impl->own_writers.insert(pub->pubiid); node_impl->own_writers.insert(pub->pubiid);
cs->pub = pub; cs->pub = pub;
@ -1210,14 +1104,12 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se
return RMW_RET_OK; return RMW_RET_OK;
fail_instance_handle: fail_instance_handle:
if (dds_delete(pub->pubh) < 0) { dds_delete(sub->rdcondh);
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); fail_readcond:
} dds_delete(sub->subh);
fail_reader: fail_reader:
dds_delete(pub->pubh); dds_delete(pub->pubh);
fail_writer: fail_writer:
dds_listener_delete(listeners);
fail_listener:
dds_qos_delete(qos); dds_qos_delete(qos);
fail_qos: fail_qos:
/* leak subtopic */ /* leak subtopic */
@ -1229,6 +1121,7 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se
static void rmw_fini_cs(CddsCS *cs) static void rmw_fini_cs(CddsCS *cs)
{ {
dds_delete(cs->sub->rdcondh);
dds_delete(cs->sub->subh); dds_delete(cs->sub->subh);
dds_delete(cs->pub->pubh); dds_delete(cs->pub->pubh);
cs->sub->node->own_writers.erase(cs->pub->pubiid); cs->sub->node->own_writers.erase(cs->pub->pubiid);