From 3e9a56a36b1749aff5d0174c8eb8105815018759 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 24 Jun 2019 09:52:41 +0200 Subject: [PATCH] Share built-in readers across nodes If each node has its own set of built-in topic readers there is quite a bit of memory use for no benefit. Signed-off-by: Erik Boasson --- rmw_cyclonedds_cpp/src/rmw_node.cpp | 102 +++++++++++++++++----------- 1 file changed, 61 insertions(+), 41 deletions(-) diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 0276584..1596f44 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -91,7 +91,6 @@ static const dds_entity_t builtin_topics[] = { struct CddsNode { dds_entity_t pp; - dds_entity_t builtin_readers[sizeof (builtin_topics) / sizeof (builtin_topics[0])]; rmw_guard_condition_t *graph_guard_condition; }; @@ -141,7 +140,17 @@ struct Cdds { std::mutex lock; uint32_t refcount; dds_entity_t ppant; + dds_entity_t builtin_readers[sizeof (builtin_topics) / sizeof (builtin_topics[0])]; + + /* set of waitsets protected by lock, used to invalidate all waitsets caches when an entity is + deleted */ std::unordered_set waitsets; + + /* set of nodes is used to trigger graph guard conditions when something changes, but that + something also changes when creating the built-in readers when Cdds::lock is already held */ + std::mutex nodes_lock; + std::unordered_set nodes; + Cdds () : refcount (0), ppant (0) {} }; @@ -267,6 +276,25 @@ extern "C" rmw_ret_t rmw_shutdown (rmw_context_t *context) return RMW_RET_OK; } +static void ggcallback (dds_entity_t rd, void *varg) +{ + static_cast(varg); + void *msg = 0; + dds_sample_info_t info; + while (dds_take (rd, &msg, &info, 1, 1) > 0) { + dds_return_loan (rd, &msg, 1); + } + + { + std::lock_guard lock (gcdds.nodes_lock); + for (auto&& node_impl : gcdds.nodes) { + if (rmw_trigger_guard_condition (node_impl->graph_guard_condition) != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to trigger graph guard condition"); + } + } + } +} + static dds_entity_t ref_ppant () { std::lock_guard lock (gcdds.lock); @@ -275,6 +303,26 @@ static dds_entity_t ref_ppant () RMW_SET_ERROR_MSG ("failed to create participant"); return gcdds.ppant; } + + static_assert (sizeof (gcdds.builtin_readers) / sizeof (gcdds.builtin_readers[0]) == + sizeof (builtin_topics) / sizeof (builtin_topics[0]), + "mismatch between array of built-in topics and array of built-in readers"); + dds_listener_t *gglistener = dds_create_listener (nullptr); + dds_lset_data_available (gglistener, ggcallback); + for (size_t i = 0; i < sizeof (builtin_topics) / sizeof (builtin_topics[0]); i++) { + dds_entity_t rd = dds_create_reader (gcdds.ppant, builtin_topics[i], nullptr, gglistener); + if (rd < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS built-in reader"); + while (i--) { + dds_delete (gcdds.builtin_readers[i]); + } + dds_delete (gcdds.ppant); + gcdds.ppant = 0; + return 0; + } + gcdds.builtin_readers[i] = rd; + } + dds_delete_listener (gglistener); } gcdds.refcount++; return gcdds.ppant; @@ -295,19 +343,6 @@ static void unref_ppant () /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static void ggcallback (dds_entity_t rd, void *varg) -{ - auto node_impl = static_cast (varg); - void *msg = 0; - dds_sample_info_t info; - while (dds_take (rd, &msg, &info, 1, 1) > 0) { - dds_return_loan (rd, &msg, 1); - } - if (rmw_trigger_guard_condition (node_impl->graph_guard_condition) != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to trigger graph guard condition"); - } -} - static std::string get_node_user_data (const char *node_name, const char *node_namespace) { return (std::string ("name=") + std::string (node_name) + @@ -334,7 +369,6 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name } auto *node_impl = new CddsNode (); rmw_node_t *node_handle = nullptr; - dds_listener_t *gglistener = nullptr; RET_ALLOC_X (node_impl, goto fail_node_impl); rmw_guard_condition_t *graph_guard_condition; if (!(graph_guard_condition = rmw_create_guard_condition (context))) { @@ -343,23 +377,12 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name node_impl->pp = pp; node_impl->graph_guard_condition = graph_guard_condition; - // - static_assert (sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]) == sizeof (builtin_topics) / sizeof (builtin_topics[0]), "mismatch between array of built-in topics and array of built-in readers"); - gglistener = dds_create_listener (node_impl); - dds_lset_data_available (gglistener, ggcallback); - for (size_t i = 0; i < sizeof (builtin_topics) / sizeof (builtin_topics[0]); i++) { - dds_entity_t rd = dds_create_reader (pp, builtin_topics[i], NULL, gglistener); - if (rd < 0) { - RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS built-in reader"); - while (i--) { - dds_delete (node_impl->builtin_readers[i]); - } - goto fail_builtin_reader; - } - node_impl->builtin_readers[i] = rd; + { + std::lock_guard lock (gcdds.nodes_lock); + gcdds.nodes.insert (node_impl); } - dds_delete_listener (gglistener); - // + + /* FIXME: should there be a (potentially spurious) trigger of the graph guard condition here? */ node_handle = rmw_node_allocate (); RET_ALLOC_X (node_handle, goto fail_node_handle); @@ -384,15 +407,13 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name fail_node_handle_name: rmw_node_free (node_handle); fail_node_handle: + { + std::lock_guard lock (gcdds.nodes_lock); + gcdds.nodes.erase (node_impl); + } if (RMW_RET_OK != rmw_destroy_guard_condition (graph_guard_condition)) { RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); } - for (size_t i = 0; i < sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]); i++) { - if (dds_delete (node_impl->builtin_readers[i]) < 0) { - RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy DDS builtin-reader"); - } - } - fail_builtin_reader: fail_ggc: delete node_impl; fail_node_impl: @@ -409,10 +430,9 @@ extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node) rmw_free (const_cast (node->name)); rmw_free (const_cast (node->namespace_)); rmw_node_free (node); - for (size_t i = 0; i < sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]); i++) { - if (dds_delete (node_impl->builtin_readers[i]) < 0) { - RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy DDS builtin-reader"); - } + { + std::lock_guard lock (gcdds.nodes_lock); + gcdds.nodes.erase (node_impl); } if (RMW_RET_OK != rmw_destroy_guard_condition (node_impl->graph_guard_condition)) { RMW_SET_ERROR_MSG ("failed to destroy graph guard condition");