Multi-domain support

Depends on some improvements to Cyclone, source compatible with versions
that lack those improvements.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-09-01 15:30:19 +02:00 committed by eboasson
parent fb8c08bd7f
commit a63cc8b84d
5 changed files with 429 additions and 91 deletions

View file

@ -38,29 +38,8 @@ There are a number of known limitations:
* Cyclone DDS does not yet implement DDS Security. Consequently, there is no support for security * Cyclone DDS does not yet implement DDS Security. Consequently, there is no support for security
in this RMW implementation either. in this RMW implementation either.
* Cyclone DDS does not allow creating a waitset or a guard condition outside a participant, and this
forces the creation of an additional participant. It can be fixed in the RMW layer, or it can be
dealt with in Cyclone DDS, but the trouble with the latter is that there are solid reasons for not
allowing it, even if it is easy to support it today. (E.g., a remote procedure call interface
...)
* Cyclone DDS does not currently support multiple domains simultaneously (waiting in a PR for the
final polish), and so this RMW implementation ignores the domain\_id parameter in create\_node,
instead creating all nodes/participants (including the special participant mentioned above) in the
default domain, which can be controlled via CYCLONEDDS\_URI.
* Deserialization only handles native format (it doesn't do any byte swapping). This is pure * Deserialization only handles native format (it doesn't do any byte swapping). This is pure
laziness, adding it is trivial. laziness, adding it is trivial.
* Deserialization assumes the input is valid and will do terrible things if it isn't. Again, pure * Deserialization assumes the input is valid and will do terrible things if it isn't. Again, pure
laziness, it's just adding some bounds checks and other validation code. laziness, it's just adding some bounds checks and other validation code.
* There are some "oddities" with the way service requests and replies are serialized and what it
uses as a "GUID". (It actually uses an almost-certainly-unique 64-bit number, the Cyclone DDS
instance id, instead of a real GUID.) I'm pretty sure the format is wildly different from that in
other RMW implementations, and so services presumably will not function cross-implementation.
* The name mangling seems to be compatibl-ish with the FastRTPS implementation and in some cases
using the ros2 CLI for querying the system works cross-implementation, but not always. The one in
this implementation is reverse-engineered, so trouble may be lurking somewhere. As a related
point: the "no_demangle" option is currently ignored ... it causes a compiler warning.

View file

@ -54,6 +54,7 @@ add_library(rmw_cyclonedds_cpp
src/rmw_node.cpp src/rmw_node.cpp
src/serdata.cpp src/serdata.cpp
src/serdes.cpp src/serdes.cpp
src/graphrhc.cpp
src/u16string.cpp src/u16string.cpp
) )

View file

@ -0,0 +1,27 @@
// Copyright 2019 ADLINK Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_
#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_
#include "dds/dds.h"
/* Introduction of custom RHC coincides with promoting the library instance & domains to entities,
and so with the introduction of DDS_CYCLONEDDS_HANDLE. */
#ifdef DDS_CYCLONEDDS_HANDLE
#include "dds/ddsc/dds_rhc.h"
struct dds_rhc * graphrhc_new();
#endif // DDS_CYCLONEDDS_HANDLE
#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_

View file

@ -0,0 +1,184 @@
// Copyright 2019 ADLINK Technology
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_
#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_
#include "dds/dds.h"
/* Introduction of custom RHC coincides with promoting the library instance & domains to entities,
and so with the introduction of DDS_CYCLONEDDS_HANDLE. */
#ifdef DDS_CYCLONEDDS_HANDLE
#include "dds/ddsc/dds_rhc.h"
struct graphrhc : dds_rhc
{
struct dds_reader * reader;
};
static dds_return_t graphrhc_associate(
struct dds_rhc * rhc_cmn, struct dds_reader * reader,
const struct ddsi_sertopic * topic,
struct ddsi_tkmap * tkmap)
{
// C++ doesn't grok the fake inheritance in C, so static_cast won't work
struct graphrhc * rhc = reinterpret_cast<struct graphrhc *>(rhc_cmn);
rhc->reader = reader;
static_cast<void>(topic);
static_cast<void>(tkmap);
return DDS_RETCODE_OK;
}
static void graphrhc_free(struct ddsi_rhc * rhc_cmn)
{
// C++ doesn't grok the fake inheritance in C, so static_cast won't work
struct graphrhc * rhc = reinterpret_cast<struct graphrhc *>(rhc_cmn);
delete rhc;
}
static bool graphrhc_store(
struct ddsi_rhc * __restrict rhc_cmn,
const struct ddsi_writer_info * __restrict wrinfo,
struct ddsi_serdata * __restrict sample,
struct ddsi_tkmap_instance * __restrict tk)
{
// C++ doesn't grok the fake inheritance in C, so static_cast won't work
struct graphrhc * rhc = reinterpret_cast<struct graphrhc *>(rhc_cmn);
dds_reader_data_available_cb(rhc->reader);
static_cast<void>(wrinfo);
static_cast<void>(sample);
static_cast<void>(tk);
return true;
}
static void graphrhc_unregister_wr(
struct ddsi_rhc * __restrict rhc_cmn,
const struct ddsi_writer_info * __restrict wrinfo)
{
// C++ doesn't grok the fake inheritance in C, so static_cast won't work
struct graphrhc * rhc = reinterpret_cast<struct graphrhc *>(rhc_cmn);
dds_reader_data_available_cb(rhc->reader);
static_cast<void>(wrinfo);
}
static void graphrhc_relinquish_ownership(
struct ddsi_rhc * __restrict rhc_cmn,
const uint64_t wr_iid)
{
static_cast<void>(rhc_cmn);
static_cast<void>(wr_iid);
}
static void graphrhc_set_qos(struct ddsi_rhc * rhc_cmn, const struct dds_qos * qos)
{
static_cast<void>(rhc_cmn);
static_cast<void>(qos);
}
static int graphrhc_read(
struct dds_rhc * rhc_cmn, bool lock, void ** values,
dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask,
dds_instance_handle_t handle, struct dds_readcond * cond)
{
static_cast<void>(rhc_cmn);
static_cast<void>(lock);
static_cast<void>(values);
static_cast<void>(info_seq);
static_cast<void>(max_samples);
static_cast<void>(mask);
static_cast<void>(handle);
static_cast<void>(cond);
return 0;
}
static int graphrhc_take(
struct dds_rhc * rhc_cmn, bool lock, void ** values,
dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask,
dds_instance_handle_t handle, struct dds_readcond * cond)
{
static_cast<void>(rhc_cmn);
static_cast<void>(lock);
static_cast<void>(values);
static_cast<void>(info_seq);
static_cast<void>(max_samples);
static_cast<void>(mask);
static_cast<void>(handle);
static_cast<void>(cond);
return 0;
}
static int graphrhc_takecdr(
struct dds_rhc * rhc_cmn, bool lock, struct ddsi_serdata ** values,
dds_sample_info_t * info_seq, uint32_t max_samples,
uint32_t sample_states, uint32_t view_states, uint32_t instance_states,
dds_instance_handle_t handle)
{
static_cast<void>(rhc_cmn);
static_cast<void>(lock);
static_cast<void>(values);
static_cast<void>(info_seq);
static_cast<void>(max_samples);
static_cast<void>(sample_states);
static_cast<void>(view_states);
static_cast<void>(instance_states);
static_cast<void>(handle);
return 0;
}
static bool graphrhc_add_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond)
{
static_cast<void>(rhc_cmn);
static_cast<void>(cond);
return true;
}
static void graphrhc_remove_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond)
{
static_cast<void>(rhc_cmn);
static_cast<void>(cond);
}
static uint32_t graphrhc_lock_samples(struct dds_rhc * rhc_cmn)
{
static_cast<void>(rhc_cmn);
return 0;
}
static const struct dds_rhc_ops graphrhc_ops = {
{
graphrhc_store,
graphrhc_unregister_wr,
graphrhc_relinquish_ownership,
graphrhc_set_qos,
graphrhc_free
},
graphrhc_read,
graphrhc_take,
graphrhc_takecdr,
graphrhc_add_readcondition,
graphrhc_remove_readcondition,
graphrhc_lock_samples,
graphrhc_associate
};
struct dds_rhc * graphrhc_new()
{
auto rhc = new graphrhc;
rhc->common.ops = &graphrhc_ops;
return static_cast<struct dds_rhc *>(rhc);
}
#endif // DDS_CYCLONEDDS_HANDLE
#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_

View file

@ -51,6 +51,16 @@
#include "rmw_cyclonedds_cpp/serdes.hpp" #include "rmw_cyclonedds_cpp/serdes.hpp"
#include "rmw_cyclonedds_cpp/serdata.hpp" #include "rmw_cyclonedds_cpp/serdata.hpp"
/* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on
the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The
custom RHC was introduced at essentially the same time */
#ifdef DDS_CYCLONEDDS_HANDLE
#define MULTIDOMAIN 1
#include "rmw_cyclonedds_cpp/graphrhc.hpp"
#else
#define MULTIDOMAIN 0
#endif
#define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0) #define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0)
#define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0) #define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0)
#define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \ #define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \
@ -98,12 +108,20 @@ static const dds_entity_t builtin_topics[] = {
DDS_BUILTIN_TOPIC_DCPSPUBLICATION DDS_BUILTIN_TOPIC_DCPSPUBLICATION
}; };
struct builtin_readers
{
dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])];
};
struct CddsNode struct CddsNode
{ {
dds_entity_t pp; dds_entity_t pp;
dds_entity_t pub; dds_entity_t pub;
dds_entity_t sub; dds_entity_t sub;
rmw_guard_condition_t * graph_guard_condition; rmw_guard_condition_t * graph_guard_condition;
#if MULTIDOMAIN
builtin_readers brd;
#endif
}; };
struct CddsPublisher struct CddsPublisher
@ -146,6 +164,7 @@ struct CddsWaitset
dds_entity_t waitseth; dds_entity_t waitseth;
std::vector<dds_attach_t> trigs; std::vector<dds_attach_t> trigs;
size_t nelems;
std::mutex lock; std::mutex lock;
bool inuse; bool inuse;
@ -158,9 +177,12 @@ struct CddsWaitset
struct Cdds struct Cdds
{ {
std::mutex lock; std::mutex lock;
#if !MULTIDOMAIN
uint32_t refcount; uint32_t refcount;
dds_entity_t ppant; dds_entity_t ppant;
dds_entity_t builtin_readers[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; builtin_readers brd;
#endif
/* special guard condition that gets attached to every waitset but that is never triggered: /* special guard condition that gets attached to every waitset but that is never triggered:
this way, we can avoid Cyclone's behaviour of always returning immediately when no this way, we can avoid Cyclone's behaviour of always returning immediately when no
@ -171,13 +193,22 @@ struct Cdds
deleted */ deleted */
std::unordered_set<CddsWaitset *> waitsets; std::unordered_set<CddsWaitset *> waitsets;
#if !MULTIDOMAIN
/* set of nodes is used to trigger graph guard conditions when something changes, but that /* set of nodes is used to trigger graph guard conditions when something changes, but that
something also changes when creating the built-in readers when Cdds::lock is already held */ something also changes when creating the built-in readers when Cdds::lock is already held */
std::mutex nodes_lock; std::mutex nodes_lock;
std::unordered_set<CddsNode *> nodes; std::unordered_set<CddsNode *> nodes;
#endif
#if MULTIDOMAIN
Cdds() Cdds()
: refcount(0), ppant(0) {} : gc_for_empty_waitset(0)
{}
#else
Cdds()
: refcount(0), ppant(0), gc_for_empty_waitset(0)
{}
#endif
}; };
static Cdds gcdds; static Cdds gcdds;
@ -304,8 +335,25 @@ extern "C" rmw_ret_t rmw_shutdown(rmw_context_t * context)
return RMW_RET_OK; return RMW_RET_OK;
} }
/////////////////////////////////////////////////////////////////////////////////////////
/////////// ///////////
/////////// GRAPH GUARD ///////////
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
static void ggcallback(dds_entity_t rd, void * varg) static void ggcallback(dds_entity_t rd, void * varg)
{ {
#if MULTIDOMAIN
auto gg = static_cast<rmw_guard_condition_t *>(varg);
void * msg = 0;
dds_sample_info_t info;
while (dds_take(rd, &msg, &info, 1, 1) > 0) {
dds_return_loan(rd, &msg, 1);
}
if (rmw_trigger_guard_condition(gg) != RMW_RET_OK) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to trigger graph guard condition");
}
#else
static_cast<void>(varg); static_cast<void>(varg);
void * msg = 0; void * msg = 0;
dds_sample_info_t info; dds_sample_info_t info;
@ -321,8 +369,59 @@ static void ggcallback(dds_entity_t rd, void * varg)
} }
} }
} }
#endif
} }
static void builtin_readers_fini(builtin_readers & brd)
{
for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) {
if (brd.rds[i] > 0) {
dds_delete(brd.rds[i]);
}
}
}
static bool builtin_readers_init(builtin_readers & brd, dds_entity_t pp, rmw_guard_condition_t * gg)
{
/* Built-in topics readers: have to be per-node or the graph guard condition support becomes a
real mess. */
#if MULTIDOMAIN
assert(gg != nullptr);
#else
assert(gg == nullptr);
#endif
dds_listener_t * gglistener = dds_create_listener(static_cast<void *>(gg));
dds_lset_data_available(gglistener, ggcallback);
for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) {
brd.rds[i] = 0;
}
for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) {
#if MULTIDOMAIN
struct dds_rhc * rhc = graphrhc_new();
dds_entity_t rd = dds_create_reader_rhc(pp, builtin_topics[i], nullptr, gglistener, rhc);
#else
dds_entity_t rd = dds_create_reader(pp, builtin_topics[i], nullptr, gglistener);
#endif
if (rd < 0) {
#if MULTIDOMAIN
dds_rhc_free(rhc);
#endif
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"rmw_create_node: failed to create DDS built-in reader");
goto fail;
}
brd.rds[i] = rd;
}
dds_delete_listener(gglistener);
return true;
fail:
builtin_readers_fini(brd);
dds_delete_listener(gglistener);
return false;
}
#if !MULTIDOMAIN
static dds_entity_t ref_ppant() static dds_entity_t ref_ppant()
{ {
std::lock_guard<std::mutex> lock(gcdds.lock); std::lock_guard<std::mutex> lock(gcdds.lock);
@ -336,29 +435,14 @@ static dds_entity_t ref_ppant()
RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets");
dds_delete(gcdds.ppant); dds_delete(gcdds.ppant);
gcdds.ppant = 0; gcdds.ppant = 0;
return 0; return DDS_RETCODE_ERROR;
} }
static_assert(sizeof(gcdds.builtin_readers) / sizeof(gcdds.builtin_readers[0]) == if (!builtin_readers_init(gcdds.brd, gcdds.ppant, nullptr)) {
sizeof(builtin_topics) / sizeof(builtin_topics[0]),
"mismatch between array of built-in topics and array of built-in readers");
dds_listener_t * gglistener = dds_create_listener(nullptr);
dds_lset_data_available(gglistener, ggcallback);
for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) {
dds_entity_t rd = dds_create_reader(gcdds.ppant, builtin_topics[i], nullptr, gglistener);
if (rd < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"rmw_create_node: failed to create DDS built-in reader");
while (i--) {
dds_delete(gcdds.builtin_readers[i]);
}
dds_delete(gcdds.ppant); dds_delete(gcdds.ppant);
gcdds.ppant = 0; gcdds.ppant = 0;
return 0; return DDS_RETCODE_ERROR;
} }
gcdds.builtin_readers[i] = rd;
}
dds_delete_listener(gglistener);
} }
gcdds.refcount++; gcdds.refcount++;
return gcdds.ppant; return gcdds.ppant;
@ -368,10 +452,12 @@ static void unref_ppant()
{ {
std::lock_guard<std::mutex> lock(gcdds.lock); std::lock_guard<std::mutex> lock(gcdds.lock);
if (--gcdds.refcount == 0) { if (--gcdds.refcount == 0) {
builtin_readers_fini(gcdds.brd);
dds_delete(gcdds.ppant); dds_delete(gcdds.ppant);
gcdds.ppant = 0; gcdds.ppant = 0;
} }
} }
#endif
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
/////////// /////////// /////////// ///////////
@ -394,7 +480,18 @@ extern "C" rmw_node_t * rmw_create_node(
static_cast<void>(context); static_cast<void>(context);
RET_NULL_X(name, return nullptr); RET_NULL_X(name, return nullptr);
RET_NULL_X(namespace_, return nullptr); RET_NULL_X(namespace_, return nullptr);
(void) domain_id; #if MULTIDOMAIN
/* domain_id = UINT32_MAX = Cyclone DDS' "default domain id".*/
if (domain_id >= UINT32_MAX) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"rmw_create_node: domain id out of range");
return nullptr;
}
const dds_domainid_t did = static_cast<dds_domainid_t>(domain_id);
#else
static_cast<void>(domain_id);
const dds_domainid_t did = DDS_DOMAIN_DEFAULT;
#endif
(void) security_options; (void) security_options;
rmw_ret_t ret; rmw_ret_t ret;
int dummy_validation_result; int dummy_validation_result;
@ -407,13 +504,16 @@ extern "C" rmw_node_t * rmw_create_node(
dds_qos_t * qos = dds_create_qos(); dds_qos_t * qos = dds_create_qos();
std::string user_data = get_node_user_data(name, namespace_); std::string user_data = get_node_user_data(name, namespace_);
dds_qset_userdata(qos, user_data.c_str(), user_data.size()); dds_qset_userdata(qos, user_data.c_str(), user_data.size());
dds_entity_t pp = dds_create_participant(DDS_DOMAIN_DEFAULT, qos, nullptr); dds_entity_t pp = dds_create_participant(did, qos, nullptr);
dds_delete_qos(qos); dds_delete_qos(qos);
if (pp < 0) { if (pp < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"rmw_create_node: failed to create DDS participant"); "rmw_create_node: failed to create DDS participant");
return nullptr; return nullptr;
} }
/* Since ROS2 doesn't require anything fancy from DDS Subscribers or Publishers, create a single
pair & reuse that */
dds_entity_t pub, sub; dds_entity_t pub, sub;
if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) { if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
@ -427,6 +527,7 @@ extern "C" rmw_node_t * rmw_create_node(
dds_delete(pp); dds_delete(pp);
return nullptr; return nullptr;
} }
auto * node_impl = new CddsNode(); auto * node_impl = new CddsNode();
rmw_node_t * node_handle = nullptr; rmw_node_t * node_handle = nullptr;
RET_ALLOC_X(node_impl, goto fail_node_impl); RET_ALLOC_X(node_impl, goto fail_node_impl);
@ -439,12 +540,16 @@ extern "C" rmw_node_t * rmw_create_node(
node_impl->sub = sub; node_impl->sub = sub;
node_impl->graph_guard_condition = graph_guard_condition; node_impl->graph_guard_condition = graph_guard_condition;
#if MULTIDOMAIN
if (!builtin_readers_init(node_impl->brd, pp, graph_guard_condition)) {
goto fail_builtin_reader;
}
#else
{ {
std::lock_guard<std::mutex> lock(gcdds.nodes_lock); std::lock_guard<std::mutex> lock(gcdds.nodes_lock);
gcdds.nodes.insert(node_impl); gcdds.nodes.insert(node_impl);
} }
#endif
/* FIXME: should there be a (potentially spurious) trigger of the graph guard condition here? */
node_handle = rmw_node_allocate(); node_handle = rmw_node_allocate();
RET_ALLOC_X(node_handle, goto fail_node_handle); RET_ALLOC_X(node_handle, goto fail_node_handle);
@ -461,19 +566,20 @@ extern "C" rmw_node_t * rmw_create_node(
memcpy(const_cast<char *>(node_handle->namespace_), namespace_, strlen(namespace_) + 1); memcpy(const_cast<char *>(node_handle->namespace_), namespace_, strlen(namespace_) + 1);
return node_handle; return node_handle;
#if 0
fail_add_node:
rmw_free(const_cast<char *>(node_handle->namespace_));
#endif
fail_node_handle_namespace: fail_node_handle_namespace:
rmw_free(const_cast<char *>(node_handle->name)); rmw_free(const_cast<char *>(node_handle->name));
fail_node_handle_name: fail_node_handle_name:
rmw_node_free(node_handle); rmw_node_free(node_handle);
fail_node_handle: fail_node_handle:
#if !MULTIDOMAIN
{ {
std::lock_guard<std::mutex> lock(gcdds.nodes_lock); std::lock_guard<std::mutex> lock(gcdds.nodes_lock);
gcdds.nodes.erase(node_impl); gcdds.nodes.erase(node_impl);
} }
#else
builtin_readers_fini(node_impl->brd);
fail_builtin_reader:
#endif
if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) {
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
"failed to destroy guard condition during error handling"); "failed to destroy guard condition during error handling");
@ -494,10 +600,16 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
rmw_free(const_cast<char *>(node->name)); rmw_free(const_cast<char *>(node->name));
rmw_free(const_cast<char *>(node->namespace_)); rmw_free(const_cast<char *>(node->namespace_));
rmw_node_free(node); rmw_node_free(node);
#if MULTIDOMAIN
/* Must prevent the built-in topic listener from triggering before deleting the graph guard
condition. Deleting them first is the easiest. */
builtin_readers_fini(node_impl->brd);
#else
{ {
std::lock_guard<std::mutex> lock(gcdds.nodes_lock); std::lock_guard<std::mutex> lock(gcdds.nodes_lock);
gcdds.nodes.erase(node_impl); gcdds.nodes.erase(node_impl);
} }
#endif
if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) {
RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); RMW_SET_ERROR_MSG("failed to destroy graph guard condition");
result_ret = RMW_RET_ERROR; result_ret = RMW_RET_ERROR;
@ -571,7 +683,7 @@ extern "C" rmw_ret_t rmw_serialize(
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
} }
/* FIXME: what about the header - should be included or not? */
if ((ret = rmw_serialized_message_resize(serialized_message, data.size())) != RMW_RET_OK) { if ((ret = rmw_serialized_message_resize(serialized_message, data.size())) != RMW_RET_OK) {
return ret; return ret;
} }
@ -906,8 +1018,6 @@ static CddsPublisher * create_cdds_publisher(
RMW_SET_ERROR_MSG("failed to get instance handle for writer"); RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle; goto fail_instance_handle;
} }
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
pub->sertopic = sertopic; pub->sertopic = sertopic;
dds_delete_qos(qos); dds_delete_qos(qos);
dds_delete(topic); dds_delete(topic);
@ -1094,8 +1204,6 @@ static CddsSubscription * create_cdds_subscription(
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
goto fail_readcond; goto fail_readcond;
} }
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
sub->sertopic = sertopic; sub->sertopic = sertopic;
dds_delete_qos(qos); dds_delete_qos(qos);
dds_delete(topic); dds_delete(topic);
@ -1410,10 +1518,15 @@ extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * co
static_cast<void>(context); static_cast<void>(context);
rmw_guard_condition_t * guard_condition_handle; rmw_guard_condition_t * guard_condition_handle;
auto * gcond_impl = new CddsGuardCondition(); auto * gcond_impl = new CddsGuardCondition();
if (ref_ppant() < 0) { #if MULTIDOMAIN
const dds_entity_t owner = DDS_CYCLONEDDS_HANDLE;
#else
const dds_entity_t owner = ref_ppant();
if (owner < 0) {
goto fail_ppant; goto fail_ppant;
} }
if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) { #endif
if ((gcond_impl->gcondh = dds_create_guardcondition(owner)) < 0) {
RMW_SET_ERROR_MSG("failed to create guardcondition"); RMW_SET_ERROR_MSG("failed to create guardcondition");
goto fail_guardcond; goto fail_guardcond;
} }
@ -1423,8 +1536,10 @@ extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * co
return guard_condition_handle; return guard_condition_handle;
fail_guardcond: fail_guardcond:
#if !MULTIDOMAIN
unref_ppant(); unref_ppant();
fail_ppant: fail_ppant:
#endif
delete (gcond_impl); delete (gcond_impl);
return nullptr; return nullptr;
} }
@ -1435,7 +1550,6 @@ extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t * guard_c
auto * gcond_impl = static_cast<CddsGuardCondition *>(guard_condition_handle->data); auto * gcond_impl = static_cast<CddsGuardCondition *>(guard_condition_handle->data);
clean_waitset_caches(); clean_waitset_caches();
dds_delete(gcond_impl->gcondh); dds_delete(gcond_impl->gcondh);
unref_ppant();
delete gcond_impl; delete gcond_impl;
delete guard_condition_handle; delete guard_condition_handle;
return RMW_RET_OK; return RMW_RET_OK;
@ -1456,6 +1570,7 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t
(void) max_conditions; (void) max_conditions;
rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); rmw_wait_set_t * wait_set = rmw_wait_set_allocate();
CddsWaitset * ws = nullptr; CddsWaitset * ws = nullptr;
dds_entity_t owner = 0;
RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); RET_ALLOC_X(wait_set, goto fail_alloc_wait_set);
wait_set->implementation_identifier = eclipse_cyclonedds_identifier; wait_set->implementation_identifier = eclipse_cyclonedds_identifier;
wait_set->data = rmw_allocate(sizeof(CddsWaitset)); wait_set->data = rmw_allocate(sizeof(CddsWaitset));
@ -1468,26 +1583,53 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t
goto fail_ws; goto fail_ws;
} }
ws->inuse = false; ws->inuse = false;
if ((ws->waitseth = dds_create_waitset(ref_ppant())) < 0) { ws->nelems = 0;
#if MULTIDOMAIN
owner = DDS_CYCLONEDDS_HANDLE;
#else
owner = ref_ppant();
if (owner < 0) {
goto fail_ppant;
}
#endif
if ((ws->waitseth = dds_create_waitset(owner)) < 0) {
RMW_SET_ERROR_MSG("failed to create waitset"); RMW_SET_ERROR_MSG("failed to create waitset");
goto fail_waitset; goto fail_waitset;
} }
{
std::lock_guard<std::mutex> lock(gcdds.lock);
#if MULTIDOMAIN
// Lazily create dummy guard condition
if (gcdds.waitsets.size() == 0) {
if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(owner)) < 0) {
RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets");
goto fail_create_dummy;
}
}
#endif
// Attach never-triggered guard condition. As it will never be triggered, it will never be // Attach never-triggered guard condition. As it will never be triggered, it will never be
// included in the result of dds_waitset_wait // included in the result of dds_waitset_wait
if (dds_waitset_attach(ws->waitseth, gcdds.gc_for_empty_waitset, INTPTR_MAX) < 0) { if (dds_waitset_attach(ws->waitseth, gcdds.gc_for_empty_waitset, INTPTR_MAX) < 0) {
RMW_SET_ERROR_MSG("failed to attach dummy guard condition for blocking on empty waitset"); RMW_SET_ERROR_MSG("failed to attach dummy guard condition for blocking on empty waitset");
goto fail_attach_dummy; goto fail_attach_dummy;
} }
{
std::lock_guard<std::mutex> lock(gcdds.lock);
gcdds.waitsets.insert(ws); gcdds.waitsets.insert(ws);
} }
return wait_set; return wait_set;
fail_attach_dummy: fail_attach_dummy:
#if MULTIDOMAIN
fail_create_dummy:
#endif
dds_delete(ws->waitseth); dds_delete(ws->waitseth);
fail_waitset: fail_waitset:
#if !MULTIDOMAIN
unref_ppant(); unref_ppant();
fail_ppant:
#endif
fail_ws: fail_ws:
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws);
fail_placement_new: fail_placement_new:
@ -1508,8 +1650,13 @@ extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set)
{ {
std::lock_guard<std::mutex> lock(gcdds.lock); std::lock_guard<std::mutex> lock(gcdds.lock);
gcdds.waitsets.erase(ws); gcdds.waitsets.erase(ws);
#if MULTIDOMAIN
if (gcdds.waitsets.size() == 0) {
dds_delete(gcdds.gc_for_empty_waitset);
gcdds.gc_for_empty_waitset = 0;
}
#endif
} }
unref_ppant();
RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR);
rmw_free(wait_set->data); rmw_free(wait_set->data);
rmw_wait_set_free(wait_set); rmw_wait_set_free(wait_set);
@ -1548,6 +1695,7 @@ static void waitset_detach(CddsWaitset * ws)
ws->gcs.resize(0); ws->gcs.resize(0);
ws->srvs.resize(0); ws->srvs.resize(0);
ws->cls.resize(0); ws->cls.resize(0);
ws->nelems = 0;
} }
static void clean_waitset_caches() static void clean_waitset_caches()
@ -1611,9 +1759,10 @@ extern "C" rmw_ret_t rmw_wait(
ATTACH(CddsService, srvs, service, service.sub->rdcondh); ATTACH(CddsService, srvs, service, service.sub->rdcondh);
ATTACH(CddsClient, cls, client, client.sub->rdcondh); ATTACH(CddsClient, cls, client, client.sub->rdcondh);
#undef ATTACH #undef ATTACH
ws->trigs.resize(nelems + 1); ws->nelems = nelems;
} }
ws->trigs.resize(ws->nelems + 1);
const dds_duration_t timeout = const dds_duration_t timeout =
(wait_timeout == NULL) ? (wait_timeout == NULL) ?
DDS_NEVER : DDS_NEVER :
@ -1849,15 +1998,11 @@ static rmw_ret_t rmw_init_cs(
RMW_SET_ERROR_MSG("failed to create writer"); RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer; goto fail_writer;
} }
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
pub->sertopic = pub_st; pub->sertopic = pub_st;
if ((sub->subh = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { if ((sub->subh = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader"); RMW_SET_ERROR_MSG("failed to create reader");
goto fail_reader; goto fail_reader;
} }
/* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been
created earlier, but the two are equivalent, so this'll do */
sub->sertopic = sub_st; sub->sertopic = sub_st;
if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) {
RMW_SET_ERROR_MSG("failed to create readcondition"); RMW_SET_ERROR_MSG("failed to create readcondition");
@ -1988,11 +2133,11 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * serv
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
static rmw_ret_t do_for_node( static rmw_ret_t do_for_node(
CddsNode * node_impl,
std::function<bool(const dds_builtintopic_participant_t & sample)> oper) std::function<bool(const dds_builtintopic_participant_t & sample)> oper)
{ {
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(ref_ppant(), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->pp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) {
unref_ppant();
RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader"); RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
@ -2008,7 +2153,6 @@ static rmw_ret_t do_for_node(
dds_return_loan(rd, &msg, n); dds_return_loan(rd, &msg, n);
} }
dds_delete(rd); dds_delete(rd);
unref_ppant();
if (n < 0) { if (n < 0) {
RMW_SET_ERROR_MSG("rmw_get_node_names: error reading participants"); RMW_SET_ERROR_MSG("rmw_get_node_names: error reading participants");
return RMW_RET_ERROR; return RMW_RET_ERROR;
@ -2017,6 +2161,7 @@ static rmw_ret_t do_for_node(
} }
static rmw_ret_t do_for_node_user_data( static rmw_ret_t do_for_node_user_data(
CddsNode * node_impl,
std::function<bool(const dds_builtintopic_participant_t & sample, const char * user_data)> oper) std::function<bool(const dds_builtintopic_participant_t & sample, const char * user_data)> oper)
{ {
auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool { auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool {
@ -2033,7 +2178,7 @@ static rmw_ret_t do_for_node_user_data(
return oper(sample, ""); return oper(sample, "");
} }
}; };
return do_for_node(f); return do_for_node(node_impl, f);
} }
extern "C" rmw_ret_t rmw_get_node_names( extern "C" rmw_ret_t rmw_get_node_names(
@ -2042,6 +2187,7 @@ extern "C" rmw_ret_t rmw_get_node_names(
rcutils_string_array_t * node_namespaces) rcutils_string_array_t * node_namespaces)
{ {
RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(node);
auto node_impl = static_cast<CddsNode *>(node->data);
if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK || if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK ||
rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK) rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK)
{ {
@ -2060,7 +2206,7 @@ extern "C" rmw_ret_t rmw_get_node_names(
return true; return true;
}; };
rmw_ret_t ret; rmw_ret_t ret;
if ((ret = do_for_node_user_data(oper)) != RMW_RET_OK) { if ((ret = do_for_node_user_data(node_impl, oper)) != RMW_RET_OK) {
return ret; return ret;
} }
@ -2106,6 +2252,7 @@ fail_alloc:
static rmw_ret_t rmw_collect_tptyp_for_kind( static rmw_ret_t rmw_collect_tptyp_for_kind(
std::map<std::string, std::set<std::string>> & tt, std::map<std::string, std::set<std::string>> & tt,
CddsNode * node_impl,
dds_entity_t builtin_topic, dds_entity_t builtin_topic,
std::function<bool(const dds_builtintopic_endpoint_t & sample, std::string & topic_name, std::function<bool(const dds_builtintopic_endpoint_t & sample, std::string & topic_name,
std::string & type_name)> filter_and_map) std::string & type_name)> filter_and_map)
@ -2114,8 +2261,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind(
builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION ||
builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION);
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(ref_ppant(), builtin_topic, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) {
unref_ppant();
RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
@ -2133,7 +2279,6 @@ static rmw_ret_t rmw_collect_tptyp_for_kind(
dds_return_loan(rd, &msg, n); dds_return_loan(rd, &msg, n);
} }
dds_delete(rd); dds_delete(rd);
unref_ppant();
if (n == 0) { if (n == 0) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
@ -2185,6 +2330,7 @@ fail_mem:
} }
static rmw_ret_t get_node_guids( static rmw_ret_t get_node_guids(
CddsNode * node_impl,
const char * node_name, const char * node_namespace, const char * node_name, const char * node_namespace,
std::set<dds_builtintopic_guid_t> & guids) std::set<dds_builtintopic_guid_t> & guids)
{ {
@ -2196,7 +2342,7 @@ static rmw_ret_t get_node_guids(
} }
return true; /* do keep looking - what if there are many? */ return true; /* do keep looking - what if there are many? */
}; };
rmw_ret_t ret = do_for_node_user_data(oper); rmw_ret_t ret = do_for_node_user_data(node_impl, oper);
if (ret != RMW_RET_OK) { if (ret != RMW_RET_OK) {
return ret; return ret;
} else if (guids.size() == 0) { } else if (guids.size() == 0) {
@ -2217,6 +2363,7 @@ static rmw_ret_t get_endpoint_names_and_types_by_node(
bool pubs) bool pubs)
{ {
RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(node);
auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL(allocator); RET_NULL(allocator);
rmw_ret_t ret = rmw_names_and_types_check_zero(tptyp); rmw_ret_t ret = rmw_names_and_types_check_zero(tptyp);
if (ret != RMW_RET_OK) { if (ret != RMW_RET_OK) {
@ -2234,7 +2381,7 @@ static rmw_ret_t get_endpoint_names_and_types_by_node(
} }
std::set<dds_builtintopic_guid_t> guids; std::set<dds_builtintopic_guid_t> guids;
if (node_name != nullptr && if (node_name != nullptr &&
(ret = get_node_guids(node_name, node_namespace, guids)) != RMW_RET_OK) (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
@ -2268,14 +2415,14 @@ static rmw_ret_t get_endpoint_names_and_types_by_node(
std::map<std::string, std::set<std::string>> tt; std::map<std::string, std::set<std::string>> tt;
if (subs && if (subs &&
(ret = (ret =
rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION,
filter_and_map)) != RMW_RET_OK) filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
if (pubs && if (pubs &&
(ret = (ret =
rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
filter_and_map)) != RMW_RET_OK) filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
@ -2306,6 +2453,7 @@ static rmw_ret_t get_cs_names_and_types_by_node(
bool looking_for_services) bool looking_for_services)
{ {
RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(node);
auto node_impl = static_cast<CddsNode *>(node->data);
RET_NULL(allocator); RET_NULL(allocator);
rmw_ret_t ret = rmw_names_and_types_check_zero(sntyp); rmw_ret_t ret = rmw_names_and_types_check_zero(sntyp);
if (ret != RMW_RET_OK) { if (ret != RMW_RET_OK) {
@ -2323,7 +2471,7 @@ static rmw_ret_t get_cs_names_and_types_by_node(
} }
std::set<dds_builtintopic_guid_t> guids; std::set<dds_builtintopic_guid_t> guids;
if (node_name != nullptr && if (node_name != nullptr &&
(ret = get_node_guids(node_name, node_namespace, guids)) != RMW_RET_OK) (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
@ -2359,10 +2507,10 @@ static rmw_ret_t get_cs_names_and_types_by_node(
}; };
std::map<std::string, std::set<std::string>> tt; std::map<std::string, std::set<std::string>> tt;
if ((ret = if ((ret =
rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION,
filter_and_map)) != RMW_RET_OK || filter_and_map)) != RMW_RET_OK ||
(ret = (ret =
rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION,
filter_and_map)) != RMW_RET_OK) filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
@ -2418,11 +2566,11 @@ static rmw_ret_t rmw_count_pubs_or_subs(
RET_NULL(topic_name); RET_NULL(topic_name);
RET_NULL(count); RET_NULL(count);
RET_WRONG_IMPLID(node); RET_WRONG_IMPLID(node);
auto node_impl = static_cast<CddsNode *>(node->data);
std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", false); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", false);
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(ref_ppant(), builtin_topic, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) {
unref_ppant();
RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader"); RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
@ -2440,7 +2588,6 @@ static rmw_ret_t rmw_count_pubs_or_subs(
dds_return_loan(rd, &msg, n); dds_return_loan(rd, &msg, n);
} }
dds_delete(rd); dds_delete(rd);
unref_ppant();
return RMW_RET_OK; return RMW_RET_OK;
} }