diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 4bc8e08..ff10451 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -68,6 +68,12 @@ minor < RMW_VERSION_MINOR || ( \ minor == RMW_VERSION_MINOR && patch <= RMW_VERSION_PATCH)))) +#if RMW_VERSION_GTE(0, 8, 1) && MULTIDOMAIN +#define SUPPORT_LOCALHOST 1 +#else +#define SUPPORT_LOCALHOST 0 +#endif + /* Set to > 0 for printing warnings to stderr for each messages that was taken more than this many ms after writing */ #define REPORT_LATE_MESSAGES 0 @@ -136,6 +142,9 @@ struct CddsNode #if MULTIDOMAIN builtin_readers brd; #endif + /* domain id can be retrieved from the entities, but that requires assuming the call + succeeds, storing it here means we are sure to have it */ + dds_domainid_t domain_id; }; struct CddsPublisher @@ -194,10 +203,58 @@ struct CddsWaitset std::vector srvs; }; +#if SUPPORT_LOCALHOST +struct CddsDomain +{ + /* This RMW implementation currently implements localhost-only by explicitly creating + domains with a configuration that consists of: (1) a hard-coded selection of + "localhost" as the network interface address; (2) followed by the contents of the + CYCLONEDDS_URI environment variable: + + - the "localhost" hostname should resolve to 127.0.0.1 (or equivalent) for IPv4 and + to ::1 for IPv6, so we don't have to worry about which of IPv4 or IPv6 is used (as + would be the case with a numerical IP address), nor do we have to worry about the + name of the loopback interface; + + - if the machine's configuration doesn't properly resolve "localhost", you can still + override via $CYCLONEDDS_URI. + + The CddsDomain type is used to track which domains exist and how many nodes are in + it. Because the domain is instantiated with the first nodes created in that domain, + the other nodes must have the same localhost-only setting. (It bugs out if not.) 
+ Everything resets automatically when the last node in the domain is deleted. + + (It might be better still for Cyclone to provide "loopback" or something similar + as a generic alias for a loopback interface ...) + + There are a few issues with the current support for creating domains explicitly in + Cyclone; fixing those might alter or relax some of the above. */ + + bool localhost_only; + uint32_t n_nodes; + + /* handle of the domain entity, for versions of Cyclone that have an updated version of + dds_create_domain that properly returns a handle; the original version has a value of + 0 in here (DDS_RETCODE_OK) which is never a valid handle */ + dds_entity_t domain_handle; + + /* Default constructor so operator[] can safely be used to look one up */ + CddsDomain() + : localhost_only(false), n_nodes(0), domain_handle(0) + {} +}; +#endif + struct Cdds { std::mutex lock; +#if SUPPORT_LOCALHOST + /* Map of domain id to per-domain state, used by create/destroy node */ + std::mutex domains_lock; + std::map domains; +#endif + #if !MULTIDOMAIN uint32_t refcount; dds_entity_t ppant; @@ -488,6 +545,84 @@ static void unref_ppant() /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +#if SUPPORT_LOCALHOST +static void node_gone_from_domain_locked(dds_domainid_t did) +{ + /* The initial support for dds_create_domain in Cyclone results in domains that get + automatically deleted when the last participant in it disappears. Later versions + return a handle and leave it in existence. 
*/ + CddsDomain & dom = gcdds.domains[did]; + assert(dom.n_nodes > 0); + if (--dom.n_nodes == 0) { + if (dom.domain_handle > 0) { + dds_delete(dom.domain_handle); + } + gcdds.domains.erase(did); + } +} + +static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) +{ + /* return true: n_nodes incremented, localhost_only set correctly, domain exists + " false: n_nodes unchanged, domain left intact if it already existed */ + CddsDomain & dom = gcdds.domains[did]; + if (dom.n_nodes != 0) { + /* Localhost setting must match */ + if (localhost_only == dom.localhost_only) { + dom.n_nodes++; + return true; + } else { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", + "rmw_create_node: attempt at creating localhost-only and non-localhost-only nodes " + "in the same domain"); + return false; + } + } else { + dom.n_nodes = 1; + dom.localhost_only = localhost_only; + + /* Localhost-only: set network interface address (shortened form of config would be + possible, too, but I think it is clearer to spell it out completely). Empty + configuration fragments are ignored, so it is safe to unconditionally append a + comma. */ + std::string config = + localhost_only ? 
+ "localhost" + "," + : + ""; + + /* Emulate default behaviour of Cyclone of reading CYCLONEDDS_URI */ + char * config_from_env = getenv("CYCLONEDDS_URI"); + if (config_from_env != nullptr) { + config += std::string(config_from_env); + } + + if ((dom.domain_handle = dds_create_domain(did, config.c_str())) < 0) { + RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", + "rmw_create_node: failed to create domain, error %s", dds_strretcode(dom.domain_handle)); + node_gone_from_domain_locked(did); + return false; + } else { + return true; + } + } +} +#else +/* Dummy implementation to keep the rest of the code simpler */ +static void node_gone_from_domain_locked(dds_domainid_t did) +{ + static_cast(did); +} + +static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only) +{ + static_cast(did); + static_cast(localhost_only); + return true; +} +#endif + static std::string get_node_user_data(const char * node_name, const char * node_namespace) { return std::string("name=") + std::string(node_name) + @@ -505,9 +640,6 @@ extern "C" rmw_node_t * rmw_create_node( ) { static_cast(context); -#if RMW_VERSION_GTE(0, 8, 1) - static_cast(localhost_only); -#endif RET_NULL_X(name, return nullptr); RET_NULL_X(namespace_, return nullptr); #if MULTIDOMAIN @@ -531,12 +663,25 @@ extern "C" rmw_node_t * rmw_create_node( { return nullptr; } + +#if SUPPORT_LOCALHOST + /* Take domains_lock and hold it until after the participant creation succeeded or + failed: otherwise there is a race with rmw_destroy_node deleting the last participant + and tearing down the domain for versions of Cyclone that implement the original + version of dds_create_domain that doesn't return a handle. 
*/ + std::lock_guard lock(gcdds.domains_lock); +#endif + if (!check_create_domain_locked(did, localhost_only)) { + return nullptr; + } + dds_qos_t * qos = dds_create_qos(); std::string user_data = get_node_user_data(name, namespace_); dds_qset_userdata(qos, user_data.c_str(), user_data.size()); dds_entity_t pp = dds_create_participant(did, qos, nullptr); dds_delete_qos(qos); if (pp < 0) { + node_gone_from_domain_locked(did); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant"); return nullptr; @@ -546,12 +691,14 @@ extern "C" rmw_node_t * rmw_create_node( pair & reuse that */ dds_entity_t pub, sub; if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) { + node_gone_from_domain_locked(did); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher"); dds_delete(pp); return nullptr; } if ((sub = dds_create_subscriber(pp, nullptr, nullptr)) < 0) { + node_gone_from_domain_locked(did); RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber"); dds_delete(pp); @@ -569,6 +716,7 @@ extern "C" rmw_node_t * rmw_create_node( node_impl->pub = pub; node_impl->sub = sub; node_impl->graph_guard_condition = graph_guard_condition; + node_impl->domain_id = did; #if MULTIDOMAIN if (!builtin_readers_init(node_impl->brd, pp, graph_guard_condition)) { @@ -618,6 +766,7 @@ fail_ggc: delete node_impl; fail_node_impl: dds_delete(pp); + node_gone_from_domain_locked(did); return nullptr; } @@ -630,6 +779,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) rmw_free(const_cast(node->name)); rmw_free(const_cast(node->namespace_)); rmw_node_free(node); + #if MULTIDOMAIN /* Must prevent the built-in topic listener from triggering before deleting the graph guard condition. Deleting them first is the easiest. 
*/ @@ -644,10 +794,15 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node) RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } +#if SUPPORT_LOCALHOST + /* prevent race with rmw_create_node (see there) */ + std::lock_guard lock(gcdds.domains_lock); +#endif if (dds_delete(node_impl->pp) < 0) { RMW_SET_ERROR_MSG("failed to destroy DDS participant"); result_ret = RMW_RET_ERROR; } + node_gone_from_domain_locked(node_impl->domain_id); delete node_impl; return result_ret; }