Support localhost-only communications (#60)
* Support localhost-only communications Signed-off-by: Erik Boasson <eb@ilities.com> * Future-proof localhost-only for upcoming Cyclone fix Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
0a1770e1e2
commit
c168334087
1 changed files with 158 additions and 3 deletions
|
@ -68,6 +68,12 @@
|
|||
minor < RMW_VERSION_MINOR || ( \
|
||||
minor == RMW_VERSION_MINOR && patch <= RMW_VERSION_PATCH))))
|
||||
|
||||
#if RMW_VERSION_GTE(0, 8, 1) && MULTIDOMAIN
|
||||
#define SUPPORT_LOCALHOST 1
|
||||
#else
|
||||
#define SUPPORT_LOCALHOST 0
|
||||
#endif
|
||||
|
||||
/* Set to > 0 for printing warnings to stderr for each messages that was taken more than this many
|
||||
ms after writing */
|
||||
#define REPORT_LATE_MESSAGES 0
|
||||
|
@ -136,6 +142,9 @@ struct CddsNode
|
|||
#if MULTIDOMAIN
|
||||
builtin_readers brd;
|
||||
#endif
|
||||
/* domain id can be retrieved from the entities, but that requires assuming the call
|
||||
succeeds, storing it here means we are sure to have it */
|
||||
dds_domainid_t domain_id;
|
||||
};
|
||||
|
||||
struct CddsPublisher
|
||||
|
@ -194,10 +203,58 @@ struct CddsWaitset
|
|||
std::vector<CddsService *> srvs;
|
||||
};
|
||||
|
||||
#if SUPPORT_LOCALHOST
|
||||
struct CddsDomain
|
||||
{
|
||||
/* This RMW implementation currently implements localhost-only by explicitly creating
|
||||
domains with a configuration that consists of: (1) a hard-coded selection of
|
||||
"localhost" as the network interface address; (2) followed by the contents of the
|
||||
CYCLONEDDS_URI environment variable:
|
||||
|
||||
- the "localhost" hostname should resolve to 127.0.0.1 (or equivalent) for IPv4 and
|
||||
to ::1 for IPv6, so we don't have to worry about which of IPv4 or IPv6 is used (as
|
||||
would be the case with a numerical IP address), nor do we have to worry about the
|
||||
name of the loopback interface;
|
||||
|
||||
- if the machine's configuration doesn't properly resolve "localhost", you can still
|
||||
override via $CYCLONEDDS_URI.
|
||||
|
||||
The CddsDomain type is used to track which domains exist and how many nodes are in
|
||||
it. Because the domain is instantiated with the first nodes created in that domain,
|
||||
the other nodes must have the same localhost-only setting. (It bugs out if not.)
|
||||
Everything resets automatically when the last node in the domain is deleted.
|
||||
|
||||
(It might be better still to for Cyclone to provide "loopback" or something similar
|
||||
as a generic alias for a loopback interface ...)
|
||||
|
||||
There are a few issues with the current support for creating domains explicitly in
|
||||
Cyclone, fixing those might relax alter or relax some of the above. */
|
||||
|
||||
bool localhost_only;
|
||||
uint32_t n_nodes;
|
||||
|
||||
/* handle of the domain entity, for versions of Cyclone that have an updated version of
|
||||
dds_create_domain that properly returns a handle; the original version has a value of
|
||||
0 in here (DDS_RETCODE_OK) which is never a valid handle */
|
||||
dds_entity_t domain_handle;
|
||||
|
||||
/* Default constructor so operator[] can be safely be used to look one up */
|
||||
CddsDomain()
|
||||
: localhost_only(false), n_nodes(0), domain_handle(0)
|
||||
{}
|
||||
};
|
||||
#endif
|
||||
|
||||
struct Cdds
|
||||
{
|
||||
std::mutex lock;
|
||||
|
||||
#if SUPPORT_LOCALHOST
|
||||
/* Map of domain id to per-domain state, used by create/destroy node */
|
||||
std::mutex domains_lock;
|
||||
std::map<dds_domainid_t, CddsDomain> domains;
|
||||
#endif
|
||||
|
||||
#if !MULTIDOMAIN
|
||||
uint32_t refcount;
|
||||
dds_entity_t ppant;
|
||||
|
@ -488,6 +545,84 @@ static void unref_ppant()
|
|||
/////////// ///////////
|
||||
/////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
#if SUPPORT_LOCALHOST
|
||||
static void node_gone_from_domain_locked(dds_domainid_t did)
|
||||
{
|
||||
/* The initial support for dds_create_domain in Cyclone results in domains that get
|
||||
automatically deleted when the last participant in it disappears. Later versions
|
||||
return a handle and leave it in existence. */
|
||||
CddsDomain & dom = gcdds.domains[did];
|
||||
assert(dom.n_nodes > 0);
|
||||
if (--dom.n_nodes == 0) {
|
||||
if (dom.domain_handle > 0) {
|
||||
dds_delete(dom.domain_handle);
|
||||
}
|
||||
gcdds.domains.erase(did);
|
||||
}
|
||||
}
|
||||
|
||||
static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only)
|
||||
{
|
||||
/* return true: n_nodes incremented, localhost_only set correctly, domain exists
|
||||
" false: n_nodes unchanged, domain left intact if it already existed */
|
||||
CddsDomain & dom = gcdds.domains[did];
|
||||
if (dom.n_nodes != 0) {
|
||||
/* Localhost setting must match */
|
||||
if (localhost_only == dom.localhost_only) {
|
||||
dom.n_nodes++;
|
||||
return true;
|
||||
} else {
|
||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
|
||||
"rmw_create_node: attempt at creating localhost-only and non-localhost-only nodes "
|
||||
"in the same domain");
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
dom.n_nodes = 1;
|
||||
dom.localhost_only = localhost_only;
|
||||
|
||||
/* Localhost-only: set network interface address (shortened form of config would be
|
||||
possible, too, but I think it is clearer to spell it out completely). Empty
|
||||
configuration fragments are ignored, so it is safe to unconditionally append a
|
||||
comma. */
|
||||
std::string config =
|
||||
localhost_only ?
|
||||
"<CycloneDDS><Domain><General><NetworkInterfaceAddress>localhost</NetworkInterfaceAddress>"
|
||||
"</General></Domain></CycloneDDS>,"
|
||||
:
|
||||
"";
|
||||
|
||||
/* Emulate default behaviour of Cyclone of reading CYCLONEDDS_URI */
|
||||
char * config_from_env = getenv("CYCLONEDDS_URI");
|
||||
if (config_from_env != nullptr) {
|
||||
config += std::string(config_from_env);
|
||||
}
|
||||
|
||||
if ((dom.domain_handle = dds_create_domain(did, config.c_str())) < 0) {
|
||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
|
||||
"rmw_create_node: failed to create domain, error %s", dds_strretcode(dom.domain_handle));
|
||||
node_gone_from_domain_locked(did);
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
/* Dummy implementation to keep the rest of the code simpler */
|
||||
static void node_gone_from_domain_locked(dds_domainid_t did)
|
||||
{
|
||||
static_cast<void>(did);
|
||||
}
|
||||
|
||||
static bool check_create_domain_locked(dds_domainid_t did, bool localhost_only)
|
||||
{
|
||||
static_cast<void>(did);
|
||||
static_cast<void>(localhost_only);
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
static std::string get_node_user_data(const char * node_name, const char * node_namespace)
|
||||
{
|
||||
return std::string("name=") + std::string(node_name) +
|
||||
|
@ -505,9 +640,6 @@ extern "C" rmw_node_t * rmw_create_node(
|
|||
)
|
||||
{
|
||||
static_cast<void>(context);
|
||||
#if RMW_VERSION_GTE(0, 8, 1)
|
||||
static_cast<void>(localhost_only);
|
||||
#endif
|
||||
RET_NULL_X(name, return nullptr);
|
||||
RET_NULL_X(namespace_, return nullptr);
|
||||
#if MULTIDOMAIN
|
||||
|
@ -531,12 +663,25 @@ extern "C" rmw_node_t * rmw_create_node(
|
|||
{
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
#if SUPPORT_LOCALHOST
|
||||
/* Take domains_lock and hold it until after the participant creation succeeded or
|
||||
failed: otherwise there is a race with rmw_destroy_node deleting the last participant
|
||||
and tearing down the domain for versions of Cyclone that implement the original
|
||||
version of dds_create_domain that doesn't return a handle. */
|
||||
std::lock_guard<std::mutex> lock(gcdds.domains_lock);
|
||||
#endif
|
||||
if (!check_create_domain_locked(did, localhost_only)) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
dds_qos_t * qos = dds_create_qos();
|
||||
std::string user_data = get_node_user_data(name, namespace_);
|
||||
dds_qset_userdata(qos, user_data.c_str(), user_data.size());
|
||||
dds_entity_t pp = dds_create_participant(did, qos, nullptr);
|
||||
dds_delete_qos(qos);
|
||||
if (pp < 0) {
|
||||
node_gone_from_domain_locked(did);
|
||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
|
||||
"rmw_create_node: failed to create DDS participant");
|
||||
return nullptr;
|
||||
|
@ -546,12 +691,14 @@ extern "C" rmw_node_t * rmw_create_node(
|
|||
pair & reuse that */
|
||||
dds_entity_t pub, sub;
|
||||
if ((pub = dds_create_publisher(pp, nullptr, nullptr)) < 0) {
|
||||
node_gone_from_domain_locked(did);
|
||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
|
||||
"rmw_create_node: failed to create DDS publisher");
|
||||
dds_delete(pp);
|
||||
return nullptr;
|
||||
}
|
||||
if ((sub = dds_create_subscriber(pp, nullptr, nullptr)) < 0) {
|
||||
node_gone_from_domain_locked(did);
|
||||
RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp",
|
||||
"rmw_create_node: failed to create DDS subscriber");
|
||||
dds_delete(pp);
|
||||
|
@ -569,6 +716,7 @@ extern "C" rmw_node_t * rmw_create_node(
|
|||
node_impl->pub = pub;
|
||||
node_impl->sub = sub;
|
||||
node_impl->graph_guard_condition = graph_guard_condition;
|
||||
node_impl->domain_id = did;
|
||||
|
||||
#if MULTIDOMAIN
|
||||
if (!builtin_readers_init(node_impl->brd, pp, graph_guard_condition)) {
|
||||
|
@ -618,6 +766,7 @@ fail_ggc:
|
|||
delete node_impl;
|
||||
fail_node_impl:
|
||||
dds_delete(pp);
|
||||
node_gone_from_domain_locked(did);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
|
@ -630,6 +779,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
|
|||
rmw_free(const_cast<char *>(node->name));
|
||||
rmw_free(const_cast<char *>(node->namespace_));
|
||||
rmw_node_free(node);
|
||||
|
||||
#if MULTIDOMAIN
|
||||
/* Must prevent the built-in topic listener from triggering before deleting the graph guard
|
||||
condition. Deleting them first is the easiest. */
|
||||
|
@ -644,10 +794,15 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
|
|||
RMW_SET_ERROR_MSG("failed to destroy graph guard condition");
|
||||
result_ret = RMW_RET_ERROR;
|
||||
}
|
||||
#if SUPPORT_LOCALHOST
|
||||
/* prevent race with rmw_create_node (see there) */
|
||||
std::lock_guard<std::mutex> lock(gcdds.domains_lock);
|
||||
#endif
|
||||
if (dds_delete(node_impl->pp) < 0) {
|
||||
RMW_SET_ERROR_MSG("failed to destroy DDS participant");
|
||||
result_ret = RMW_RET_ERROR;
|
||||
}
|
||||
node_gone_from_domain_locked(node_impl->domain_id);
|
||||
delete node_impl;
|
||||
return result_ret;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue