Share built-in readers across nodes
If each node has its own set of built-in topic readers there is quite a bit of memory use for no benefit. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
3a4b10421d
commit
3e9a56a36b
1 changed files with 61 additions and 41 deletions
|
@ -91,7 +91,6 @@ static const dds_entity_t builtin_topics[] = {
|
||||||
|
|
||||||
struct CddsNode {
|
struct CddsNode {
|
||||||
dds_entity_t pp;
|
dds_entity_t pp;
|
||||||
dds_entity_t builtin_readers[sizeof (builtin_topics) / sizeof (builtin_topics[0])];
|
|
||||||
rmw_guard_condition_t *graph_guard_condition;
|
rmw_guard_condition_t *graph_guard_condition;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -141,7 +140,17 @@ struct Cdds {
|
||||||
std::mutex lock;
|
std::mutex lock;
|
||||||
uint32_t refcount;
|
uint32_t refcount;
|
||||||
dds_entity_t ppant;
|
dds_entity_t ppant;
|
||||||
|
dds_entity_t builtin_readers[sizeof (builtin_topics) / sizeof (builtin_topics[0])];
|
||||||
|
|
||||||
|
/* set of waitsets protected by lock, used to invalidate all waitsets caches when an entity is
|
||||||
|
deleted */
|
||||||
std::unordered_set<CddsWaitset *> waitsets;
|
std::unordered_set<CddsWaitset *> waitsets;
|
||||||
|
|
||||||
|
/* set of nodes is used to trigger graph guard conditions when something changes, but that
|
||||||
|
something also changes when creating the built-in readers when Cdds::lock is already held */
|
||||||
|
std::mutex nodes_lock;
|
||||||
|
std::unordered_set<CddsNode *> nodes;
|
||||||
|
|
||||||
Cdds () : refcount (0), ppant (0) {}
|
Cdds () : refcount (0), ppant (0) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -267,6 +276,25 @@ extern "C" rmw_ret_t rmw_shutdown (rmw_context_t *context)
|
||||||
return RMW_RET_OK;
|
return RMW_RET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ggcallback (dds_entity_t rd, void *varg)
|
||||||
|
{
|
||||||
|
static_cast<void>(varg);
|
||||||
|
void *msg = 0;
|
||||||
|
dds_sample_info_t info;
|
||||||
|
while (dds_take (rd, &msg, &info, 1, 1) > 0) {
|
||||||
|
dds_return_loan (rd, &msg, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock (gcdds.nodes_lock);
|
||||||
|
for (auto&& node_impl : gcdds.nodes) {
|
||||||
|
if (rmw_trigger_guard_condition (node_impl->graph_guard_condition) != RMW_RET_OK) {
|
||||||
|
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to trigger graph guard condition");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static dds_entity_t ref_ppant ()
|
static dds_entity_t ref_ppant ()
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock (gcdds.lock);
|
std::lock_guard<std::mutex> lock (gcdds.lock);
|
||||||
|
@ -275,6 +303,26 @@ static dds_entity_t ref_ppant ()
|
||||||
RMW_SET_ERROR_MSG ("failed to create participant");
|
RMW_SET_ERROR_MSG ("failed to create participant");
|
||||||
return gcdds.ppant;
|
return gcdds.ppant;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static_assert (sizeof (gcdds.builtin_readers) / sizeof (gcdds.builtin_readers[0]) ==
|
||||||
|
sizeof (builtin_topics) / sizeof (builtin_topics[0]),
|
||||||
|
"mismatch between array of built-in topics and array of built-in readers");
|
||||||
|
dds_listener_t *gglistener = dds_create_listener (nullptr);
|
||||||
|
dds_lset_data_available (gglistener, ggcallback);
|
||||||
|
for (size_t i = 0; i < sizeof (builtin_topics) / sizeof (builtin_topics[0]); i++) {
|
||||||
|
dds_entity_t rd = dds_create_reader (gcdds.ppant, builtin_topics[i], nullptr, gglistener);
|
||||||
|
if (rd < 0) {
|
||||||
|
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS built-in reader");
|
||||||
|
while (i--) {
|
||||||
|
dds_delete (gcdds.builtin_readers[i]);
|
||||||
|
}
|
||||||
|
dds_delete (gcdds.ppant);
|
||||||
|
gcdds.ppant = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
gcdds.builtin_readers[i] = rd;
|
||||||
|
}
|
||||||
|
dds_delete_listener (gglistener);
|
||||||
}
|
}
|
||||||
gcdds.refcount++;
|
gcdds.refcount++;
|
||||||
return gcdds.ppant;
|
return gcdds.ppant;
|
||||||
|
@ -295,19 +343,6 @@ static void unref_ppant ()
|
||||||
/////////// ///////////
|
/////////// ///////////
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
static void ggcallback (dds_entity_t rd, void *varg)
|
|
||||||
{
|
|
||||||
auto node_impl = static_cast<CddsNode *> (varg);
|
|
||||||
void *msg = 0;
|
|
||||||
dds_sample_info_t info;
|
|
||||||
while (dds_take (rd, &msg, &info, 1, 1) > 0) {
|
|
||||||
dds_return_loan (rd, &msg, 1);
|
|
||||||
}
|
|
||||||
if (rmw_trigger_guard_condition (node_impl->graph_guard_condition) != RMW_RET_OK) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to trigger graph guard condition");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::string get_node_user_data (const char *node_name, const char *node_namespace)
|
static std::string get_node_user_data (const char *node_name, const char *node_namespace)
|
||||||
{
|
{
|
||||||
return (std::string ("name=") + std::string (node_name) +
|
return (std::string ("name=") + std::string (node_name) +
|
||||||
|
@ -334,7 +369,6 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name
|
||||||
}
|
}
|
||||||
auto *node_impl = new CddsNode ();
|
auto *node_impl = new CddsNode ();
|
||||||
rmw_node_t *node_handle = nullptr;
|
rmw_node_t *node_handle = nullptr;
|
||||||
dds_listener_t *gglistener = nullptr;
|
|
||||||
RET_ALLOC_X (node_impl, goto fail_node_impl);
|
RET_ALLOC_X (node_impl, goto fail_node_impl);
|
||||||
rmw_guard_condition_t *graph_guard_condition;
|
rmw_guard_condition_t *graph_guard_condition;
|
||||||
if (!(graph_guard_condition = rmw_create_guard_condition (context))) {
|
if (!(graph_guard_condition = rmw_create_guard_condition (context))) {
|
||||||
|
@ -343,23 +377,12 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name
|
||||||
node_impl->pp = pp;
|
node_impl->pp = pp;
|
||||||
node_impl->graph_guard_condition = graph_guard_condition;
|
node_impl->graph_guard_condition = graph_guard_condition;
|
||||||
|
|
||||||
//
|
{
|
||||||
static_assert (sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]) == sizeof (builtin_topics) / sizeof (builtin_topics[0]), "mismatch between array of built-in topics and array of built-in readers");
|
std::lock_guard<std::mutex> lock (gcdds.nodes_lock);
|
||||||
gglistener = dds_create_listener (node_impl);
|
gcdds.nodes.insert (node_impl);
|
||||||
dds_lset_data_available (gglistener, ggcallback);
|
|
||||||
for (size_t i = 0; i < sizeof (builtin_topics) / sizeof (builtin_topics[0]); i++) {
|
|
||||||
dds_entity_t rd = dds_create_reader (pp, builtin_topics[i], NULL, gglistener);
|
|
||||||
if (rd < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS built-in reader");
|
|
||||||
while (i--) {
|
|
||||||
dds_delete (node_impl->builtin_readers[i]);
|
|
||||||
}
|
}
|
||||||
goto fail_builtin_reader;
|
|
||||||
}
|
/* FIXME: should there be a (potentially spurious) trigger of the graph guard condition here? */
|
||||||
node_impl->builtin_readers[i] = rd;
|
|
||||||
}
|
|
||||||
dds_delete_listener (gglistener);
|
|
||||||
//
|
|
||||||
|
|
||||||
node_handle = rmw_node_allocate ();
|
node_handle = rmw_node_allocate ();
|
||||||
RET_ALLOC_X (node_handle, goto fail_node_handle);
|
RET_ALLOC_X (node_handle, goto fail_node_handle);
|
||||||
|
@ -384,15 +407,13 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context, const char *name
|
||||||
fail_node_handle_name:
|
fail_node_handle_name:
|
||||||
rmw_node_free (node_handle);
|
rmw_node_free (node_handle);
|
||||||
fail_node_handle:
|
fail_node_handle:
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock (gcdds.nodes_lock);
|
||||||
|
gcdds.nodes.erase (node_impl);
|
||||||
|
}
|
||||||
if (RMW_RET_OK != rmw_destroy_guard_condition (graph_guard_condition)) {
|
if (RMW_RET_OK != rmw_destroy_guard_condition (graph_guard_condition)) {
|
||||||
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling");
|
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling");
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]); i++) {
|
|
||||||
if (dds_delete (node_impl->builtin_readers[i]) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy DDS builtin-reader");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fail_builtin_reader:
|
|
||||||
fail_ggc:
|
fail_ggc:
|
||||||
delete node_impl;
|
delete node_impl;
|
||||||
fail_node_impl:
|
fail_node_impl:
|
||||||
|
@ -409,10 +430,9 @@ extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node)
|
||||||
rmw_free (const_cast<char *> (node->name));
|
rmw_free (const_cast<char *> (node->name));
|
||||||
rmw_free (const_cast<char *> (node->namespace_));
|
rmw_free (const_cast<char *> (node->namespace_));
|
||||||
rmw_node_free (node);
|
rmw_node_free (node);
|
||||||
for (size_t i = 0; i < sizeof (node_impl->builtin_readers) / sizeof (node_impl->builtin_readers[0]); i++) {
|
{
|
||||||
if (dds_delete (node_impl->builtin_readers[i]) < 0) {
|
std::lock_guard<std::mutex> lock (gcdds.nodes_lock);
|
||||||
RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy DDS builtin-reader");
|
gcdds.nodes.erase (node_impl);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (RMW_RET_OK != rmw_destroy_guard_condition (node_impl->graph_guard_condition)) {
|
if (RMW_RET_OK != rmw_destroy_guard_condition (node_impl->graph_guard_condition)) {
|
||||||
RMW_SET_ERROR_MSG ("failed to destroy graph guard condition");
|
RMW_SET_ERROR_MSG ("failed to destroy graph guard condition");
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue