diff --git a/README.md b/README.md index 2fc09dd..b93a444 100644 --- a/README.md +++ b/README.md @@ -38,29 +38,8 @@ There are a number of known limitations: * Cyclone DDS does not yet implement DDS Security. Consequently, there is no support for security in this RMW implementation either. -* Cyclone DDS does not allow creating a waitset or a guard condition outside a participant, and this - forces the creation of an additional participant. It can be fixed in the RMW layer, or it can be - dealt with in Cyclone DDS, but the trouble with the latter is that there are solid reasons for not - allowing it, even if it is easy to support it today. (E.g., a remote procedure call interface - ...) - -* Cyclone DDS does not currently support multiple domains simultaneously (waiting in a PR for the - final polish), and so this RMW implementation ignores the domain\_id parameter in create\_node, - instead creating all nodes/participants (including the special participant mentioned above) in the - default domain, which can be controlled via CYCLONEDDS\_URI. - * Deserialization only handles native format (it doesn't do any byte swapping). This is pure laziness, adding it is trivial. * Deserialization assumes the input is valid and will do terrible things if it isn't. Again, pure laziness, it's just adding some bounds checks and other validation code. - -* There are some "oddities" with the way service requests and replies are serialized and what it - uses as a "GUID". (It actually uses an almost-certainly-unique 64-bit number, the Cyclone DDS - instance id, instead of a real GUID.) I'm pretty sure the format is wildly different from that in - other RMW implementations, and so services presumably will not function cross-implementation. - -* The name mangling seems to be compatibl-ish with the FastRTPS implementation and in some cases - using the ros2 CLI for querying the system works cross-implementation, but not always. The one in - this implementation is reverse-engineered, so trouble may be lurking somewhere. As a related - point: the "no_demangle" option is currently ignored ... it causes a compiler warning. diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index a3d5f80..9e823ad 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -54,6 +54,7 @@ add_library(rmw_cyclonedds_cpp src/rmw_node.cpp src/serdata.cpp src/serdes.cpp + src/graphrhc.cpp src/u16string.cpp ) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp new file mode 100644 index 0000000..a4541ac --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/graphrhc.hpp @@ -0,0 +1,27 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ +#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ + +#include "dds/dds.h" + +/* Introduction of custom RHC coincides with promoting the library instance & domains to entities, + and so with the introduction of DDS_CYCLONEDDS_HANDLE. */ +#ifdef DDS_CYCLONEDDS_HANDLE +#include "dds/ddsc/dds_rhc.h" + +struct dds_rhc * graphrhc_new(); +#endif // DDS_CYCLONEDDS_HANDLE + +#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ diff --git a/rmw_cyclonedds_cpp/src/graphrhc.cpp b/rmw_cyclonedds_cpp/src/graphrhc.cpp new file mode 100644 index 0000000..0a31039 --- /dev/null +++ b/rmw_cyclonedds_cpp/src/graphrhc.cpp @@ -0,0 +1,184 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ +#define RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ + +#include "dds/dds.h" + +/* Introduction of custom RHC coincides with promoting the library instance & domains to entities, + and so with the introduction of DDS_CYCLONEDDS_HANDLE. */ +#ifdef DDS_CYCLONEDDS_HANDLE + +#include "dds/ddsc/dds_rhc.h" + +struct graphrhc : dds_rhc +{ + struct dds_reader * reader; +}; + +static dds_return_t graphrhc_associate( + struct dds_rhc * rhc_cmn, struct dds_reader * reader, + const struct ddsi_sertopic * topic, + struct ddsi_tkmap * tkmap) +{ + // C++ doesn't grok the fake inheritance in C, so static_cast won't work + struct graphrhc * rhc = reinterpret_cast(rhc_cmn); + rhc->reader = reader; + static_cast(topic); + static_cast(tkmap); + return DDS_RETCODE_OK; +} + +static void graphrhc_free(struct ddsi_rhc * rhc_cmn) +{ + // C++ doesn't grok the fake inheritance in C, so static_cast won't work + struct graphrhc * rhc = reinterpret_cast(rhc_cmn); + delete rhc; +} + +static bool graphrhc_store( + struct ddsi_rhc * __restrict rhc_cmn, + const struct ddsi_writer_info * __restrict wrinfo, + struct ddsi_serdata * __restrict sample, + struct ddsi_tkmap_instance * __restrict tk) +{ + // C++ doesn't grok the fake inheritance in C, so static_cast won't work + struct graphrhc * rhc = reinterpret_cast(rhc_cmn); + dds_reader_data_available_cb(rhc->reader); + static_cast(wrinfo); + static_cast(sample); + static_cast(tk); + return true; +} + +static void graphrhc_unregister_wr( + struct ddsi_rhc * __restrict rhc_cmn, + const struct ddsi_writer_info * __restrict wrinfo) +{ + // C++ doesn't grok the fake inheritance in C, so static_cast won't work + struct graphrhc * rhc = reinterpret_cast(rhc_cmn); + dds_reader_data_available_cb(rhc->reader); + static_cast(wrinfo); +} + +static void graphrhc_relinquish_ownership( + struct ddsi_rhc * __restrict rhc_cmn, + const uint64_t wr_iid) +{ + static_cast(rhc_cmn); + static_cast(wr_iid); +} + +static void graphrhc_set_qos(struct ddsi_rhc * rhc_cmn, const struct dds_qos * qos) +{ + static_cast(rhc_cmn); + static_cast(qos); +} + +static int graphrhc_read( + struct dds_rhc * rhc_cmn, bool lock, void ** values, + dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask, + dds_instance_handle_t handle, struct dds_readcond * cond) +{ + static_cast(rhc_cmn); + static_cast(lock); + static_cast(values); + static_cast(info_seq); + static_cast(max_samples); + static_cast(mask); + static_cast(handle); + static_cast(cond); + return 0; +} + +static int graphrhc_take( + struct dds_rhc * rhc_cmn, bool lock, void ** values, + dds_sample_info_t * info_seq, uint32_t max_samples, uint32_t mask, + dds_instance_handle_t handle, struct dds_readcond * cond) +{ + static_cast(rhc_cmn); + static_cast(lock); + static_cast(values); + static_cast(info_seq); + static_cast(max_samples); + static_cast(mask); + static_cast(handle); + static_cast(cond); + return 0; +} + +static int graphrhc_takecdr( + struct dds_rhc * rhc_cmn, bool lock, struct ddsi_serdata ** values, + dds_sample_info_t * info_seq, uint32_t max_samples, + uint32_t sample_states, uint32_t view_states, uint32_t instance_states, + dds_instance_handle_t handle) +{ + static_cast(rhc_cmn); + static_cast(lock); + static_cast(values); + static_cast(info_seq); + static_cast(max_samples); + static_cast(sample_states); + static_cast(view_states); + static_cast(instance_states); + static_cast(handle); + return 0; +} + +static bool graphrhc_add_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond) +{ + static_cast(rhc_cmn); + static_cast(cond); + return true; +} + +static void graphrhc_remove_readcondition(struct dds_rhc * rhc_cmn, struct dds_readcond * cond) +{ + static_cast(rhc_cmn); + static_cast(cond); +} + +static uint32_t graphrhc_lock_samples(struct dds_rhc * rhc_cmn) +{ + static_cast(rhc_cmn); + return 0; +} + +static const struct dds_rhc_ops graphrhc_ops = { + { + graphrhc_store, + graphrhc_unregister_wr, + graphrhc_relinquish_ownership, + graphrhc_set_qos, + graphrhc_free + }, + graphrhc_read, + graphrhc_take, + graphrhc_takecdr, + graphrhc_add_readcondition, + graphrhc_remove_readcondition, + graphrhc_lock_samples, + graphrhc_associate +}; + +struct dds_rhc * graphrhc_new() +{ + auto rhc = new graphrhc; + rhc->common.ops = &graphrhc_ops; + return static_cast(rhc); +} + +#endif // DDS_CYCLONEDDS_HANDLE + +#endif // RMW_CYCLONEDDS_CPP__GRAPHRHC_HPP_ diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index e2a8866..d451770 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -51,6 +51,16 @@ #include "rmw_cyclonedds_cpp/serdes.hpp" #include "rmw_cyclonedds_cpp/serdata.hpp" +/* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on + the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The + custom RHC was introduced at essentially the same time */ +#ifdef DDS_CYCLONEDDS_HANDLE +#define MULTIDOMAIN 1 +#include "rmw_cyclonedds_cpp/graphrhc.hpp" +#else +#define MULTIDOMAIN 0 +#endif + #define RET_ERR_X(msg, code) do {RMW_SET_ERROR_MSG(msg); code;} while (0) #define RET_NULL_X(var, code) do {if (!var) RET_ERR_X (#var " is null", code);} while (0) #define RET_ALLOC_X(var, code) do {if (!var) RET_ERR_X ("failed to allocate " #var, code); \ @@ -98,12 +108,20 @@ static const dds_entity_t builtin_topics[] = { DDS_BUILTIN_TOPIC_DCPSPUBLICATION }; +struct builtin_readers +{ + dds_entity_t rds[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; +}; + struct CddsNode { dds_entity_t pp; dds_entity_t pub; dds_entity_t sub; rmw_guard_condition_t * graph_guard_condition; +#if MULTIDOMAIN + builtin_readers brd; +#endif }; struct CddsPublisher @@ -146,6 +164,7 @@ struct CddsWaitset dds_entity_t waitseth; std::vector trigs; + size_t nelems; std::mutex lock; bool inuse; @@ -158,9 +177,12 @@ struct CddsWaitset struct Cdds { std::mutex lock; + +#if !MULTIDOMAIN uint32_t refcount; dds_entity_t ppant; - dds_entity_t builtin_readers[sizeof(builtin_topics) / sizeof(builtin_topics[0])]; + builtin_readers brd; +#endif /* special guard condition that gets attached to every waitset but that is never triggered: this way, we can avoid Cyclone's behaviour of always returning immediately when no @@ -171,13 +193,22 @@ struct Cdds deleted */ std::unordered_set waitsets; +#if !MULTIDOMAIN /* set of nodes is used to trigger graph guard conditions when something changes, but that - something also changes when creating the built-in readers when Cdds::lock is already held */ + something also changes when creating the built-in readers when Cdds::lock is already held */ std::mutex nodes_lock; std::unordered_set nodes; +#endif +#if MULTIDOMAIN Cdds() - : refcount(0), ppant(0) {} + : gc_for_empty_waitset(0) + {} +#else + Cdds() + : refcount(0), ppant(0), gc_for_empty_waitset(0) + {} +#endif }; static Cdds gcdds; @@ -304,8 +335,25 @@ extern "C" rmw_ret_t rmw_shutdown(rmw_context_t * context) return RMW_RET_OK; } +///////////////////////////////////////////////////////////////////////////////////////// +/////////// /////////// +/////////// GRAPH GUARD /////////// +/////////// /////////// +///////////////////////////////////////////////////////////////////////////////////////// + static void ggcallback(dds_entity_t rd, void * varg) { +#if MULTIDOMAIN + auto gg = static_cast(varg); + void * msg = 0; + dds_sample_info_t info; + while (dds_take(rd, &msg, &info, 1, 1) > 0) { + dds_return_loan(rd, &msg, 1); + } + if (rmw_trigger_guard_condition(gg) != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to trigger graph guard condition"); + } +#else static_cast(varg); void * msg = 0; dds_sample_info_t info; @@ -321,8 +369,59 @@ static void ggcallback(dds_entity_t rd, void * varg) } } } +#endif } +static void builtin_readers_fini(builtin_readers & brd) +{ + for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { + if (brd.rds[i] > 0) { + dds_delete(brd.rds[i]); + } + } +} + +static bool builtin_readers_init(builtin_readers & brd, dds_entity_t pp, rmw_guard_condition_t * gg) +{ + /* Built-in topics readers: have to be per-node or the graph guard condition support becomes a + real mess. */ +#if MULTIDOMAIN + assert(gg != nullptr); +#else + assert(gg == nullptr); +#endif + dds_listener_t * gglistener = dds_create_listener(static_cast(gg)); + dds_lset_data_available(gglistener, ggcallback); + for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { + brd.rds[i] = 0; + } + for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { +#if MULTIDOMAIN + struct dds_rhc * rhc = graphrhc_new(); + dds_entity_t rd = dds_create_reader_rhc(pp, builtin_topics[i], nullptr, gglistener, rhc); +#else + dds_entity_t rd = dds_create_reader(pp, builtin_topics[i], nullptr, gglistener); +#endif + if (rd < 0) { +#if MULTIDOMAIN + dds_rhc_free(rhc); +#endif + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", + "rmw_create_node: failed to create DDS built-in reader"); + goto fail; + } + brd.rds[i] = rd; + } + dds_delete_listener(gglistener); + return true; + +fail: + builtin_readers_fini(brd); + dds_delete_listener(gglistener); + return false; +} + +#if !MULTIDOMAIN static dds_entity_t ref_ppant() { std::lock_guard lock(gcdds.lock); @@ -336,29 +435,14 @@ static dds_entity_t ref_ppant() RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); dds_delete(gcdds.ppant); gcdds.ppant = 0; - return 0; + return DDS_RETCODE_ERROR; } - static_assert(sizeof(gcdds.builtin_readers) / sizeof(gcdds.builtin_readers[0]) == - sizeof(builtin_topics) / sizeof(builtin_topics[0]), - "mismatch between array of built-in topics and array of built-in readers"); - dds_listener_t * gglistener = dds_create_listener(nullptr); - dds_lset_data_available(gglistener, ggcallback); - for (size_t i = 0; i < sizeof(builtin_topics) / sizeof(builtin_topics[0]); i++) { - dds_entity_t rd = dds_create_reader(gcdds.ppant, builtin_topics[i], nullptr, gglistener); - if (rd < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", - "rmw_create_node: failed to create DDS built-in reader"); - while (i--) { - dds_delete(gcdds.builtin_readers[i]); - } - dds_delete(gcdds.ppant); - gcdds.ppant = 0; - return 0; - } - gcdds.builtin_readers[i] = rd; + if (!builtin_readers_init(gcdds.brd, gcdds.ppant, nullptr)) { + dds_delete(gcdds.ppant); + gcdds.ppant = 0; + return DDS_RETCODE_ERROR; } - dds_delete_listener(gglistener); } gcdds.refcount++; return gcdds.ppant; @@ -368,10 +452,12 @@ static void unref_ppant() { std::lock_guard lock(gcdds.lock); if (--gcdds.refcount == 0) { + builtin_readers_fini(gcdds.brd); dds_delete(gcdds.ppant); gcdds.ppant = 0; } } +#endif ///////////////////////////////////////////////////////////////////////////////////////// /////////// /////////// @@ -394,7 +480,18 @@ extern "C" rmw_node_t * rmw_create_node( static_cast(context); RET_NULL_X(name, return nullptr); RET_NULL_X(namespace_, return nullptr); - (void) domain_id; +#if MULTIDOMAIN + /* domain_id = UINT32_MAX = Cyclone DDS' "default domain id".*/ + if (domain_id >= UINT32_MAX) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", + "rmw_create_node: domain id out of range"); + return nullptr; + } + const dds_domainid_t did = static_cast(domain_id); +#else + static_cast(domain_id); + const dds_domainid_t did = DDS_DOMAIN_DEFAULT; +#endif (void) security_options; rmw_ret_t ret; int dummy_validation_result; @@ -407,13 +504,16 @@ extern "C" rmw_node_t * rmw_create_node( dds_qos_t * qos = dds_create_qos(); std::string user_data = get_node_user_data(name, namespace_); dds_qset_userdata(qos, user_data.c_str(), user_data.size()); - dds_entity_t pp = dds_create_participant(DDS_DOMAIN_DEFAULT, qos, nullptr); + dds_entity_t pp = dds_create_participant(did, qos, nullptr); dds_delete_qos(qos); if (pp < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); return nullptr; } + + /* Since ROS2 doesn't require anything fancy from DDS Subscribers or Publishers, create a single + pair & reuse that */ dds_entity_t pub, sub; if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", @@ -427,6 +527,7 @@ extern "C" rmw_node_t * rmw_create_node( dds_delete(pp); return nullptr; } + auto * node_impl = new CddsNode(); rmw_node_t * node_handle = nullptr; RET_ALLOC_X(node_impl, goto fail_node_impl); @@ -439,12 +540,16 @@ extern "C" rmw_node_t * rmw_create_node( node_impl->sub = sub; node_impl->graph_guard_condition = graph_guard_condition; +#if MULTIDOMAIN + if (!builtin_readers_init(node_impl->brd, pp, graph_guard_condition)) { + goto fail_builtin_reader; + } +#else { std::lock_guard lock(gcdds.nodes_lock); gcdds.nodes.insert(node_impl); } - - /* FIXME: should there be a (potentially spurious) trigger of the graph guard condition here? */ +#endif node_handle = rmw_node_allocate(); RET_ALLOC_X(node_handle, goto fail_node_handle); @@ -461,19 +566,20 @@ extern "C" rmw_node_t * rmw_create_node( memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); return node_handle; -#if 0 -fail_add_node: - rmw_free(const_cast(node_handle->namespace_)); -#endif fail_node_handle_namespace: rmw_free(const_cast(node_handle->name)); fail_node_handle_name: rmw_node_free(node_handle); fail_node_handle: +#if !MULTIDOMAIN { std::lock_guard lock(gcdds.nodes_lock); gcdds.nodes.erase(node_impl); } +#else + builtin_readers_fini(node_impl->brd); +fail_builtin_reader: +#endif if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); @@ -494,10 +600,16 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) rmw_free(const_cast(node->name)); rmw_free(const_cast(node->namespace_)); rmw_node_free(node); +#if MULTIDOMAIN + /* Must prevent the built-in topic listener from triggering before deleting the graph guard + condition. Deleting them first is the easiest. */ + builtin_readers_fini(node_impl->brd); +#else { std::lock_guard lock(gcdds.nodes_lock); gcdds.nodes.erase(node_impl); } +#endif if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; @@ -571,7 +683,7 @@ extern "C" rmw_ret_t rmw_serialize( return RMW_RET_ERROR; } } - /* FIXME: what about the header - should be included or not? */ + if ((ret = rmw_serialized_message_resize(serialized_message, data.size())) != RMW_RET_OK) { return ret; } @@ -906,8 +1018,6 @@ static CddsPublisher * create_cdds_publisher( RMW_SET_ERROR_MSG("failed to get instance handle for writer"); goto fail_instance_handle; } - /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been - created earlier, but the two are equivalent, so this'll do */ pub->sertopic = sertopic; dds_delete_qos(qos); dds_delete(topic); @@ -1094,8 +1204,6 @@ static CddsSubscription * create_cdds_subscription( RMW_SET_ERROR_MSG("failed to create readcondition"); goto fail_readcond; } - /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been - created earlier, but the two are equivalent, so this'll do */ sub->sertopic = sertopic; dds_delete_qos(qos); dds_delete(topic); @@ -1410,10 +1518,15 @@ extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * co static_cast(context); rmw_guard_condition_t * guard_condition_handle; auto * gcond_impl = new CddsGuardCondition(); - if (ref_ppant() < 0) { +#if MULTIDOMAIN + const dds_entity_t owner = DDS_CYCLONEDDS_HANDLE; +#else + const dds_entity_t owner = ref_ppant(); + if (owner < 0) { goto fail_ppant; } - if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) { +#endif + if ((gcond_impl->gcondh = dds_create_guardcondition(owner)) < 0) { RMW_SET_ERROR_MSG("failed to create guardcondition"); goto fail_guardcond; } @@ -1423,8 +1536,10 @@ extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * co return guard_condition_handle; fail_guardcond: +#if !MULTIDOMAIN unref_ppant(); fail_ppant: +#endif delete (gcond_impl); return nullptr; } @@ -1435,7 +1550,6 @@ extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t * guard_c auto * gcond_impl = static_cast(guard_condition_handle->data); clean_waitset_caches(); dds_delete(gcond_impl->gcondh); - unref_ppant(); delete gcond_impl; delete guard_condition_handle; return RMW_RET_OK; @@ -1456,6 +1570,7 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t (void) max_conditions; rmw_wait_set_t * wait_set = rmw_wait_set_allocate(); CddsWaitset * ws = nullptr; + dds_entity_t owner = 0; RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); wait_set->implementation_identifier = eclipse_cyclonedds_identifier; wait_set->data = rmw_allocate(sizeof(CddsWaitset)); @@ -1468,26 +1583,53 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t goto fail_ws; } ws->inuse = false; - if ((ws->waitseth = dds_create_waitset(ref_ppant())) < 0) { + ws->nelems = 0; +#if MULTIDOMAIN + owner = DDS_CYCLONEDDS_HANDLE; +#else + owner = ref_ppant(); + if (owner < 0) { + goto fail_ppant; + } +#endif + + if ((ws->waitseth = dds_create_waitset(owner)) < 0) { RMW_SET_ERROR_MSG("failed to create waitset"); goto fail_waitset; } - // Attach never-triggered guard condition. As it will never be triggered, it will never be - // included in the result of dds_waitset_wait - if (dds_waitset_attach(ws->waitseth, gcdds.gc_for_empty_waitset, INTPTR_MAX) < 0) { - RMW_SET_ERROR_MSG("failed to attach dummy guard condition for blocking on empty waitset"); - goto fail_attach_dummy; - } + { std::lock_guard lock(gcdds.lock); +#if MULTIDOMAIN + // Lazily create dummy guard condition + if (gcdds.waitsets.size() == 0) { + if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(owner)) < 0) { + RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets"); + goto fail_create_dummy; + } + } +#endif + // Attach never-triggered guard condition. As it will never be triggered, it will never be + // included in the result of dds_waitset_wait + if (dds_waitset_attach(ws->waitseth, gcdds.gc_for_empty_waitset, INTPTR_MAX) < 0) { + RMW_SET_ERROR_MSG("failed to attach dummy guard condition for blocking on empty waitset"); + goto fail_attach_dummy; + } gcdds.waitsets.insert(ws); } + return wait_set; fail_attach_dummy: +#if MULTIDOMAIN +fail_create_dummy: +#endif dds_delete(ws->waitseth); fail_waitset: +#if !MULTIDOMAIN unref_ppant(); +fail_ppant: +#endif fail_ws: RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); fail_placement_new: @@ -1508,8 +1650,13 @@ extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t * wait_set) { std::lock_guard lock(gcdds.lock); gcdds.waitsets.erase(ws); +#if MULTIDOMAIN + if (gcdds.waitsets.size() == 0) { + dds_delete(gcdds.gc_for_empty_waitset); + gcdds.gc_for_empty_waitset = 0; + } +#endif } - unref_ppant(); RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); rmw_free(wait_set->data); rmw_wait_set_free(wait_set); @@ -1548,6 +1695,7 @@ static void waitset_detach(CddsWaitset * ws) ws->gcs.resize(0); ws->srvs.resize(0); ws->cls.resize(0); + ws->nelems = 0; } static void clean_waitset_caches() @@ -1611,9 +1759,10 @@ extern "C" rmw_ret_t rmw_wait( ATTACH(CddsService, srvs, service, service.sub->rdcondh); ATTACH(CddsClient, cls, client, client.sub->rdcondh); #undef ATTACH - ws->trigs.resize(nelems + 1); + ws->nelems = nelems; } + ws->trigs.resize(ws->nelems + 1); const dds_duration_t timeout = (wait_timeout == NULL) ? DDS_NEVER : @@ -1849,15 +1998,11 @@ static rmw_ret_t rmw_init_cs( RMW_SET_ERROR_MSG("failed to create writer"); goto fail_writer; } - /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been - created earlier, but the two are equivalent, so this'll do */ pub->sertopic = pub_st; if ((sub->subh = dds_create_reader(node_impl->sub, subtopic, qos, nullptr)) < 0) { RMW_SET_ERROR_MSG("failed to create reader"); goto fail_reader; } - /* FIXME: not guaranteed that "topic" will refer to "sertopic" because topic might have been - created earlier, but the two are equivalent, so this'll do */ sub->sertopic = sub_st; if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { RMW_SET_ERROR_MSG("failed to create readcondition"); @@ -1988,11 +2133,11 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * serv ///////////////////////////////////////////////////////////////////////////////////////// static rmw_ret_t do_for_node( + CddsNode * node_impl, std::function oper) { dds_entity_t rd; - if ((rd = dds_create_reader(ref_ppant(), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { - unref_ppant(); + if ((rd = dds_create_reader(node_impl->pp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("rmw_get_node_names: failed to create reader"); return RMW_RET_ERROR; } @@ -2008,7 +2153,6 @@ static rmw_ret_t do_for_node( dds_return_loan(rd, &msg, n); } dds_delete(rd); - unref_ppant(); if (n < 0) { RMW_SET_ERROR_MSG("rmw_get_node_names: error reading participants"); return RMW_RET_ERROR; @@ -2017,6 +2161,7 @@ static rmw_ret_t do_for_node( } static rmw_ret_t do_for_node_user_data( + CddsNode * node_impl, std::function oper) { auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool { @@ -2033,7 +2178,7 @@ static rmw_ret_t do_for_node_user_data( return oper(sample, ""); } }; - return do_for_node(f); + return do_for_node(node_impl, f); } extern "C" rmw_ret_t rmw_get_node_names( @@ -2042,6 +2187,7 @@ extern "C" rmw_ret_t rmw_get_node_names( rcutils_string_array_t * node_namespaces) { RET_WRONG_IMPLID(node); + auto node_impl = static_cast(node->data); if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK || rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK) { @@ -2060,7 +2206,7 @@ extern "C" rmw_ret_t rmw_get_node_names( return true; }; rmw_ret_t ret; - if ((ret = do_for_node_user_data(oper)) != RMW_RET_OK) { + if ((ret = do_for_node_user_data(node_impl, oper)) != RMW_RET_OK) { return ret; } @@ -2106,6 +2252,7 @@ fail_alloc: static rmw_ret_t rmw_collect_tptyp_for_kind( std::map> & tt, + CddsNode * node_impl, dds_entity_t builtin_topic, std::function filter_and_map) @@ -2114,8 +2261,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind( builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); dds_entity_t rd; - if ((rd = dds_create_reader(ref_ppant(), builtin_topic, NULL, NULL)) < 0) { - unref_ppant(); + if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); return RMW_RET_ERROR; } @@ -2133,7 +2279,6 @@ static rmw_ret_t rmw_collect_tptyp_for_kind( dds_return_loan(rd, &msg, n); } dds_delete(rd); - unref_ppant(); if (n == 0) { return RMW_RET_OK; } else { @@ -2185,6 +2330,7 @@ fail_mem: } static rmw_ret_t get_node_guids( + CddsNode * node_impl, const char * node_name, const char * node_namespace, std::set & guids) { @@ -2196,7 +2342,7 @@ static rmw_ret_t get_node_guids( } return true; /* do keep looking - what if there are many? */ }; - rmw_ret_t ret = do_for_node_user_data(oper); + rmw_ret_t ret = do_for_node_user_data(node_impl, oper); if (ret != RMW_RET_OK) { return ret; } else if (guids.size() == 0) { @@ -2217,6 +2363,7 @@ static rmw_ret_t get_endpoint_names_and_types_by_node( bool pubs) { RET_WRONG_IMPLID(node); + auto node_impl = static_cast(node->data); RET_NULL(allocator); rmw_ret_t ret = rmw_names_and_types_check_zero(tptyp); if (ret != RMW_RET_OK) { @@ -2234,7 +2381,7 @@ static rmw_ret_t get_endpoint_names_and_types_by_node( } std::set guids; if (node_name != nullptr && - (ret = get_node_guids(node_name, node_namespace, guids)) != RMW_RET_OK) + (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK) { return ret; } @@ -2268,14 +2415,14 @@ static rmw_ret_t get_endpoint_names_and_types_by_node( std::map> tt; if (subs && (ret = - rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, + rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK) { return ret; } if (pubs && (ret = - rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, + rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) { return ret; @@ -2306,6 +2453,7 @@ static rmw_ret_t get_cs_names_and_types_by_node( bool looking_for_services) { RET_WRONG_IMPLID(node); + auto node_impl = static_cast(node->data); RET_NULL(allocator); rmw_ret_t ret = rmw_names_and_types_check_zero(sntyp); if (ret != RMW_RET_OK) { @@ -2323,7 +2471,7 @@ static rmw_ret_t get_cs_names_and_types_by_node( } std::set guids; if (node_name != nullptr && - (ret = get_node_guids(node_name, node_namespace, guids)) != RMW_RET_OK) + (ret = get_node_guids(node_impl, node_name, node_namespace, guids)) != RMW_RET_OK) { return ret; } @@ -2359,10 +2507,10 @@ static rmw_ret_t get_cs_names_and_types_by_node( }; std::map> tt; if ((ret = - rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, + rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || (ret = - rmw_collect_tptyp_for_kind(tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, + rmw_collect_tptyp_for_kind(tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) { return ret; @@ -2418,11 +2566,11 @@ static rmw_ret_t rmw_count_pubs_or_subs( RET_NULL(topic_name); RET_NULL(count); RET_WRONG_IMPLID(node); + auto node_impl = static_cast(node->data); std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", false); dds_entity_t rd; - if ((rd = dds_create_reader(ref_ppant(), builtin_topic, NULL, NULL)) < 0) { - unref_ppant(); + if ((rd = dds_create_reader(node_impl->pp, builtin_topic, NULL, NULL)) < 0) { RMW_SET_ERROR_MSG("rmw_count_pubs_or_subs failed to create reader"); return RMW_RET_ERROR; } @@ -2440,7 +2588,6 @@ static rmw_ret_t rmw_count_pubs_or_subs( dds_return_loan(rd, &msg, n); } dds_delete(rd); - unref_ppant(); return RMW_RET_OK; }