diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp
index bfa6261..1d480b2 100644
--- a/rmw_cyclonedds_cpp/src/rmw_node.cpp
+++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp
@@ -151,7 +151,7 @@ static rmw_subscription_t * create_subscription(
 );
 static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);
 
-static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
+static rmw_guard_condition_t * create_guard_condition();
 static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);
 
 struct CddsDomain;
@@ -254,6 +254,10 @@ struct rmw_context_impl_t
   dds_entity_t dds_pub;
   dds_entity_t dds_sub;
 
+  /* Participant reference count */
+  size_t node_count{0};
+  std::mutex initialization_mutex;
+
   rmw_context_impl_t()
   : common(), domain_id(UINT32_MAX), ppant(0)
   {
@@ -263,33 +267,29 @@ struct rmw_context_impl_t
     common.pub = nullptr;
     common.sub = nullptr;
   }
+
+  // Initializes the participant if it wasn't done already.
+  // node_count is incremented.
+  rmw_ret_t
+  init(rmw_init_options_t * options);
+
+  // Destroys the participant when node_count reaches 0.
+  rmw_ret_t
+  fini();
+
   ~rmw_context_impl_t()
   {
-    discovery_thread_stop(common);
-    common.graph_cache.clear_on_change_callback();
-    if (common.graph_guard_condition) {
-      destroy_guard_condition(common.graph_guard_condition);
-    }
-    if (common.pub) {
-      destroy_publisher(common.pub);
-    }
-    if (common.sub) {
-      destroy_subscription(common.sub);
-    }
-    if (ppant > 0 && dds_delete(ppant) < 0) {
+    if (0u != this->node_count) {
       RCUTILS_SAFE_FWRITE_TO_STDERR(
-        "Failed to destroy domain in destructor\n");
-    }
-    if (domain_id != UINT32_MAX) {
-      std::lock_guard<std::mutex> lock(gcdds.domains_lock);
-      CddsDomain & dom = gcdds.domains[domain_id];
-      assert(dom.refcount > 0);
-      if (--dom.refcount == 0) {
-        static_cast<void>(dds_delete(dom.domain_handle));
-        gcdds.domains.erase(domain_id);
-      }
+        "Not all nodes were finished before finishing the context.\n"
+ "Ensure `rcl_node_fini` is called for all nodes before `rcl_context_fini`," + "to avoid leaking.\n"); } } + +private: + void + clean_up(); }; struct CddsNode @@ -583,7 +583,7 @@ static void discovery_thread(rmw_context_impl_t * impl) dds_entity_t ws; /* deleting ppant will delete waitset as well, so there is no real need to delete the waitset here on error, but it is more hygienic */ - if ((ws = dds_create_waitset(impl->ppant)) < 0) { + if ((ws = dds_create_waitset(DDS_CYCLONEDDS_HANDLE)) < 0) { RCUTILS_SAFE_FWRITE_TO_STDERR( "ros discovery info listener thread: failed to create waitset, will shutdown ...\n"); return; @@ -637,7 +637,7 @@ static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl) { auto common_context = &impl->common; common_context->thread_is_running.store(true); - common_context->listener_thread_gc = create_guard_condition(impl); + common_context->listener_thread_gc = create_guard_condition(); if (common_context->listener_thread_gc) { try { common_context->listener_thread = std::thread(discovery_thread, impl); @@ -745,6 +745,21 @@ static bool check_create_domain(dds_domainid_t did, rmw_localhost_only_t localho } } +static +void +check_destroy_domain(dds_domainid_t domain_id) +{ + if (domain_id != UINT32_MAX) { + std::lock_guard lock(gcdds.domains_lock); + CddsDomain & dom = gcdds.domains[domain_id]; + assert(dom.refcount > 0); + if (--dom.refcount == 0) { + static_cast(dds_delete(dom.domain_handle)); + gcdds.domains.erase(domain_id); + } + } +} + #if RMW_SUPPORT_SECURITY /* Returns the full URI of a security file properly formatted for DDS */ bool get_security_file_URI( @@ -816,6 +831,7 @@ void finalize_security_file_URIs( #endif /* RMW_SUPPORT_SECURITY */ /* Attempt to set all the qos properties needed to enable DDS security */ +static rmw_ret_t configure_qos_for_security( dds_qos_t * qos, const rmw_security_options_t * security_options) @@ -861,6 +877,194 @@ rmw_ret_t configure_qos_for_security( #endif } +rmw_ret_t +rmw_context_impl_t::init(rmw_init_options_t * options) +{ + std::lock_guard guard(initialization_mutex); + if (0u != this->node_count) { + // initialization has already been done + this->node_count++; + return RMW_RET_OK; + } + + /* Take domains_lock and hold it until after the participant creation succeeded or + failed: otherwise there is a race with rmw_destroy_node deleting the last participant + and tearing down the domain for versions of Cyclone that implement the original + version of dds_create_domain that doesn't return a handle. 
+  this->domain_id = static_cast<dds_domainid_t>(options->domain_id);
+  if (!check_create_domain(this->domain_id, options->localhost_only)) {
+    return RMW_RET_ERROR;
+  }
+
+  std::unique_ptr<dds_qos_t, std::function<void (dds_qos_t *)>>
+  ppant_qos(dds_create_qos(), &dds_delete_qos);
+  if (ppant_qos == nullptr) {
+    this->clean_up();
+    return RMW_RET_BAD_ALLOC;
+  }
+  std::string user_data = std::string("enclave=") + std::string(
+    options->enclave) + std::string(";");
+  dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size());
+  if (configure_qos_for_security(
+      ppant_qos.get(),
+      &options->security_options) != RMW_RET_OK)
+  {
+    if (RMW_SECURITY_ENFORCEMENT_ENFORCE == options->security_options.enforce_security) {
+      this->clean_up();
+      return RMW_RET_ERROR;
+    }
+  }
+
+  this->ppant = dds_create_participant(this->domain_id, ppant_qos.get(), nullptr);
+  if (this->ppant < 0) {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant");
+    return RMW_RET_ERROR;
+  }
+
+  /* Create readers for DDS built-in topics for monitoring discovery */
+  if ((this->rd_participant =
+    dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0)
+  {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader");
+    return RMW_RET_ERROR;
+  }
+  if ((this->rd_subscription =
+    dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0)
+  {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader");
+    return RMW_RET_ERROR;
+  }
+  if ((this->rd_publication =
+    dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0)
+  {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader");
+    return RMW_RET_ERROR;
+  }
+  /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers
+     to be created for RMW publishers/subscriptions. */
+  if ((this->dds_pub = dds_create_publisher(this->ppant, nullptr, nullptr)) < 0) {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
+    return RMW_RET_ERROR;
+  }
+  if ((this->dds_sub = dds_create_subscriber(this->ppant, nullptr, nullptr)) < 0) {
+    this->clean_up();
+    RCUTILS_LOG_ERROR_NAMED(
+      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
+    return RMW_RET_ERROR;
+  }
+
+  rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default;
+  pubsub_qos.avoid_ros_namespace_conventions = true;
+  pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
+  pubsub_qos.depth = 1;
+  pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
+  pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
+
+  /* Create RMW publisher/subscription/guard condition used by rmw_dds_common
+     discovery */
+  rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
+  this->common.pub = create_publisher(
+    this->ppant, this->dds_pub,
+    rosidl_typesupport_cpp::get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>(),
+    "ros_discovery_info",
+    &pubsub_qos,
+    &publisher_options);
+  if (this->common.pub == nullptr) {
+    this->clean_up();
+    return RMW_RET_ERROR;
+  }
+
+  rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
+  subscription_options.ignore_local_publications = true;
+  // FIXME: keyed topics => KEEP_LAST and depth 1.
+  pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
+  this->common.sub = create_subscription(
+    this->ppant, this->dds_sub,
+    rosidl_typesupport_cpp::get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>(),
+    "ros_discovery_info",
+    &pubsub_qos,
+    &subscription_options);
+  if (this->common.sub == nullptr) {
+    this->clean_up();
+    return RMW_RET_ERROR;
+  }
+
+  this->common.graph_guard_condition = create_guard_condition();
+  if (this->common.graph_guard_condition == nullptr) {
+    this->clean_up();
+    return RMW_RET_BAD_ALLOC;
+  }
+
+  this->common.graph_cache.set_on_change_callback(
+    [guard_condition = this->common.graph_guard_condition]() {
+      rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition);
+      if (ret != RMW_RET_OK) {
+        RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition");
+      }
+    });
+
+  get_entity_gid(this->ppant, this->common.gid);
+  this->common.graph_cache.add_participant(this->common.gid, options->enclave);
+
+  // One could also use a set of listeners instead of a thread for maintaining the graph cache:
+  // - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
+  //   caused by the graph cache's mutex already having been locked by (e.g.) rmw_create_node.
+  // - Whatever the graph cache implementation does, it shouldn't involve much more than local state
+  //   updates and triggering a guard condition, and so that should be safe.
+  // however, the graph cache updates could be expensive, and so performing those operations on
+  // the thread receiving data from the network may not be wise.
+  rmw_ret_t ret;
+  if ((ret = discovery_thread_start(this)) != RMW_RET_OK) {
+    this->clean_up();
+    return ret;
+  }
+  ++this->node_count;
+  return RMW_RET_OK;
+}
+
+void
+rmw_context_impl_t::clean_up()
+{
+  discovery_thread_stop(common);
+  common.graph_cache.clear_on_change_callback();
+  if (common.graph_guard_condition) {
+    destroy_guard_condition(common.graph_guard_condition);
+  }
+  if (common.pub) {
+    destroy_publisher(common.pub);
+  }
+  if (common.sub) {
+    destroy_subscription(common.sub);
+  }
+  if (ppant > 0 && dds_delete(ppant) < 0) {
+    RCUTILS_SAFE_FWRITE_TO_STDERR(
+      "Failed to destroy participant in clean_up\n");
+  }
+  check_destroy_domain(domain_id);
+}
+
+rmw_ret_t
+rmw_context_impl_t::fini()
+{
+  std::lock_guard<std::mutex> guard(initialization_mutex);
+  if (0u != --this->node_count) {
+    // destruction shouldn't happen yet
+    return RMW_RET_OK;
+  }
+  this->clean_up();
+  return RMW_RET_OK;
+}
+
 extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
 {
   rmw_ret_t ret;
@@ -881,7 +1085,6 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
       "rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range");
     return RMW_RET_INVALID_ARGUMENT;
   }
-  const dds_domainid_t domain_id = static_cast<dds_domainid_t>(options->domain_id);
 
   context->instance_id = options->instance_id;
   context->implementation_identifier = eclipse_cyclonedds_identifier;
@@ -891,144 +1094,12 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
     return ret;
   }
 
-  std::unique_ptr<rmw_context_impl_t> impl(new(std::nothrow) rmw_context_impl_t());
-  if (impl == nullptr) {
+  rmw_context_impl_t * impl = new(std::nothrow) rmw_context_impl_t();
+  if (nullptr == impl) {
     return RMW_RET_BAD_ALLOC;
   }
 
-  /* Take domains_lock and hold it until after the participant creation succeeded or
-     failed: otherwise there is a race with rmw_destroy_node deleting the last participant
-     and tearing down the domain for versions of Cyclone that implement the original
-     version of dds_create_domain that doesn't return a handle. */
-  if (!check_create_domain(domain_id, options->localhost_only)) {
-    return RMW_RET_ERROR;
-  }
-
-  /* Once the domain id is set in impl, impl's destructor will take care of unref'ing
-     the domain */
-  impl->domain_id = domain_id;
-
-  std::unique_ptr<dds_qos_t, std::function<void (dds_qos_t *)>>
-  ppant_qos(dds_create_qos(), &dds_delete_qos);
-  if (ppant_qos == nullptr) {
-    return RMW_RET_BAD_ALLOC;
-  }
-  std::string user_data = std::string("enclave=") + std::string(
-    context->options.enclave) + std::string(";");
-  dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size());
-  if (configure_qos_for_security(
-      ppant_qos.get(),
-      &context->options.security_options) != RMW_RET_OK)
-  {
-    if (context->options.security_options.enforce_security == RMW_SECURITY_ENFORCEMENT_ENFORCE) {
-      return RMW_RET_ERROR;
-    }
-  }
-  impl->ppant = dds_create_participant(domain_id, ppant_qos.get(), nullptr);
-  if (impl->ppant < 0) {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant");
-    return RMW_RET_ERROR;
-  }
-
-  /* Create readers for DDS built-in topics for monitoring discovery */
-  if ((impl->rd_participant =
-    dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0)
-  {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader");
-    return RMW_RET_ERROR;
-  }
-  if ((impl->rd_subscription =
-    dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0)
-  {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader");
-    return RMW_RET_ERROR;
-  }
-  if ((impl->rd_publication =
-    dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0)
-  {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader");
-    return RMW_RET_ERROR;
-  }
-
-  /* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers
-     to be created for RMW publishers/subscriptions. */
-  if ((impl->dds_pub = dds_create_publisher(impl->ppant, nullptr, nullptr)) < 0) {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
-    return RMW_RET_ERROR;
-  }
-  if ((impl->dds_sub = dds_create_subscriber(impl->ppant, nullptr, nullptr)) < 0) {
-    RCUTILS_LOG_ERROR_NAMED(
-      "rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
-    return RMW_RET_ERROR;
-  }
-
-  rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default;
-  pubsub_qos.avoid_ros_namespace_conventions = true;
-  pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
-  pubsub_qos.depth = 1;
-  pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
-  pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
-
-  /* Create RMW publisher/subscription/guard condition used by rmw_dds_common
-     discovery */
-  rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
-  impl->common.pub = create_publisher(
-    impl->ppant, impl->dds_pub,
-    rosidl_typesupport_cpp::get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>(),
-    "ros_discovery_info",
-    &pubsub_qos,
-    &publisher_options);
-  if (impl->common.pub == nullptr) {
-    return RMW_RET_ERROR;
-  }
-
-  rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
-  subscription_options.ignore_local_publications = true;
-  // FIXME: keyed topics => KEEP_LAST and depth 1.
-  pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
-  impl->common.sub = create_subscription(
-    impl->ppant, impl->dds_sub,
-    rosidl_typesupport_cpp::get_message_type_support_handle<rmw_dds_common::msg::ParticipantEntitiesInfo>(),
-    "ros_discovery_info",
-    &pubsub_qos,
-    &subscription_options);
-  if (impl->common.sub == nullptr) {
-    return RMW_RET_ERROR;
-  }
-
-  impl->common.graph_guard_condition = create_guard_condition(impl.get());
-  if (impl->common.graph_guard_condition == nullptr) {
-    return RMW_RET_BAD_ALLOC;
-  }
-
-  impl->common.graph_cache.set_on_change_callback(
-    [guard_condition = impl->common.graph_guard_condition]() {
-      rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition);
-      if (ret != RMW_RET_OK) {
-        RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition");
-      }
-    });
-
-  get_entity_gid(impl->ppant, impl->common.gid);
-  impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave);
-
-  // One could also use a set of listeners instead of a thread for maintaining the graph cache:
-  // - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
-  //   caused by the graph cache's mutex already having been locked by (e.g.) rmw_create_node.
-  // - Whatever the graph cache implementation does, it shouldn't involve much more than local state
-  //   updates and triggering a guard condition, and so that should be safe.
-  // however, the graph cache updates could be expensive, and so performing those operations on
-  // the thread receiving data from the network may not be wise.
-  if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) {
-    return ret;
-  }
-
-  context->impl = impl.release();
+  context->impl = impl;
 
   return RMW_RET_OK;
 }
@@ -1091,6 +1162,11 @@ extern "C" rmw_node_t * rmw_create_node(
     return nullptr;
   }
 
+  ret = context->impl->init(&context->options);
+  if (RMW_RET_OK != ret) {
+    return nullptr;
+  }
+
   auto * node_impl = new CddsNode();
   rmw_node_t * node_handle = nullptr;
   RET_ALLOC_X(node_impl, goto fail_node_impl);
@@ -1142,6 +1218,7 @@ fail_node_handle_name:
 fail_node_handle:
   delete node_impl;
 fail_node_impl:
+  context->impl->fini();
   return nullptr;
 }
 
@@ -1168,6 +1245,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
 
   rmw_free(const_cast<char *>(node->name));
   rmw_free(const_cast<char *>(node->namespace_));
+  node->context->impl->fini();
   rmw_node_free(node);
   delete node_impl;
   return result_ret;
@@ -2562,11 +2640,11 @@ extern "C" rmw_ret_t rmw_take_event(
 ///////////                                                                   ///////////
 /////////////////////////////////////////////////////////////////////////////////////////
 
-static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl)
+static rmw_guard_condition_t * create_guard_condition()
 {
   rmw_guard_condition_t * guard_condition_handle;
   auto * gcond_impl = new CddsGuardCondition();
-  if ((gcond_impl->gcondh = dds_create_guardcondition(impl->ppant)) < 0) {
+  if ((gcond_impl->gcondh = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) {
     RMW_SET_ERROR_MSG("failed to create guardcondition");
     goto fail_guardcond;
   }
@@ -2582,7 +2660,8 @@ fail_guardcond:
 
 extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context)
 {
-  return create_guard_condition(context->impl);
+  (void)context;
+  return create_guard_condition();
 }
 
 static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle)
@@ -2612,7 +2691,8 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition(
 
 extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions)
 {
-  (void) max_conditions;
+  (void)context;
+  (void)max_conditions;
   rmw_wait_set_t * wait_set = rmw_wait_set_allocate();
   CddsWaitset * ws = nullptr;
   RET_ALLOC_X(wait_set, goto fail_alloc_wait_set);
@@ -2639,7 +2719,7 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions)
     std::lock_guard<std::mutex> lock(gcdds.lock);
     // Lazily create dummy guard condition
    if (gcdds.waitsets.size() == 0) {
-      if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(context->impl->ppant)) < 0) {
+      if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) {
        RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets");
        goto fail_create_dummy;
      }
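
The patch above replaces eager participant creation in rmw_init with a reference-counted
rmw_context_impl_t::init()/fini() pair called from rmw_create_node and rmw_destroy_node.
The stand-alone sketch below is not part of the diff: it uses a hypothetical ContextImpl
type, no DDS calls and no error handling, and only illustrates the counting contract the
patch introduces. The first init() creates the participant, later calls only bump
node_count, and teardown happens when the last fini() brings the count back to zero.

// Illustrative sketch, not the real rmw_cyclonedds types.
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <mutex>

struct ContextImpl {              // hypothetical stand-in for rmw_context_impl_t
  std::size_t node_count{0};
  bool participant_created{false};
  std::mutex initialization_mutex;

  void init() {                   // what rmw_create_node triggers in the patch
    std::lock_guard<std::mutex> guard(initialization_mutex);
    if (node_count++ == 0) {
      participant_created = true;   // stands in for dds_create_participant() etc.
      std::printf("participant created\n");
    }
  }

  void fini() {                   // what rmw_destroy_node triggers in the patch
    std::lock_guard<std::mutex> guard(initialization_mutex);
    if (--node_count == 0) {
      participant_created = false;  // stands in for clean_up()
      std::printf("participant destroyed\n");
    }
  }
};

int main() {
  ContextImpl ctx;
  ctx.init();                     // first node: participant is created
  ctx.init();                     // second node: only the count goes up
  ctx.fini();                     // one node left: participant stays alive
  assert(ctx.participant_created);
  ctx.fini();                     // last node gone: participant is destroyed
  assert(!ctx.participant_created);
  return 0;
}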