Initialize participant on first use. Destroy participant after last node is destroyed. (#176)
* Initialize participant on first use. Destroy participant after last node is destroyed Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Please linters Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Solve problems with guard conditions Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Address peer review comments Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Address peer review comments Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Used DDS_CYCLONEDDS_HANDLE to create all guard conditions Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com> * Increase ref count always Signed-off-by: Ivan Santiago Paunovic <ivanpauno@ekumenlabs.com>
This commit is contained in:
parent
8e14104246
commit
f820994aab
1 changed files with 246 additions and 166 deletions
|
@ -151,7 +151,7 @@ static rmw_subscription_t * create_subscription(
|
||||||
);
|
);
|
||||||
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);
|
static rmw_ret_t destroy_subscription(rmw_subscription_t * subscription);
|
||||||
|
|
||||||
static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl);
|
static rmw_guard_condition_t * create_guard_condition();
|
||||||
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);
|
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * gc);
|
||||||
|
|
||||||
struct CddsDomain;
|
struct CddsDomain;
|
||||||
|
@ -254,6 +254,10 @@ struct rmw_context_impl_t
|
||||||
dds_entity_t dds_pub;
|
dds_entity_t dds_pub;
|
||||||
dds_entity_t dds_sub;
|
dds_entity_t dds_sub;
|
||||||
|
|
||||||
|
/* Participant reference count*/
|
||||||
|
size_t node_count{0};
|
||||||
|
std::mutex initialization_mutex;
|
||||||
|
|
||||||
rmw_context_impl_t()
|
rmw_context_impl_t()
|
||||||
: common(), domain_id(UINT32_MAX), ppant(0)
|
: common(), domain_id(UINT32_MAX), ppant(0)
|
||||||
{
|
{
|
||||||
|
@ -263,33 +267,29 @@ struct rmw_context_impl_t
|
||||||
common.pub = nullptr;
|
common.pub = nullptr;
|
||||||
common.sub = nullptr;
|
common.sub = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initializes the participant, if it wasn't done already.
|
||||||
|
// node_count is increased
|
||||||
|
rmw_ret_t
|
||||||
|
init(rmw_init_options_t * options);
|
||||||
|
|
||||||
|
// Destroys the participant, when node_count reaches 0.
|
||||||
|
rmw_ret_t
|
||||||
|
fini();
|
||||||
|
|
||||||
~rmw_context_impl_t()
|
~rmw_context_impl_t()
|
||||||
{
|
{
|
||||||
discovery_thread_stop(common);
|
if (0u != this->node_count) {
|
||||||
common.graph_cache.clear_on_change_callback();
|
|
||||||
if (common.graph_guard_condition) {
|
|
||||||
destroy_guard_condition(common.graph_guard_condition);
|
|
||||||
}
|
|
||||||
if (common.pub) {
|
|
||||||
destroy_publisher(common.pub);
|
|
||||||
}
|
|
||||||
if (common.sub) {
|
|
||||||
destroy_subscription(common.sub);
|
|
||||||
}
|
|
||||||
if (ppant > 0 && dds_delete(ppant) < 0) {
|
|
||||||
RCUTILS_SAFE_FWRITE_TO_STDERR(
|
RCUTILS_SAFE_FWRITE_TO_STDERR(
|
||||||
"Failed to destroy domain in destructor\n");
|
"Not all nodes were finished before finishing the context\n."
|
||||||
}
|
"Ensure `rcl_node_fini` is called for all nodes before `rcl_context_fini`,"
|
||||||
if (domain_id != UINT32_MAX) {
|
"to avoid leaking.\n");
|
||||||
std::lock_guard<std::mutex> lock(gcdds.domains_lock);
|
|
||||||
CddsDomain & dom = gcdds.domains[domain_id];
|
|
||||||
assert(dom.refcount > 0);
|
|
||||||
if (--dom.refcount == 0) {
|
|
||||||
static_cast<void>(dds_delete(dom.domain_handle));
|
|
||||||
gcdds.domains.erase(domain_id);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
void
|
||||||
|
clean_up();
|
||||||
};
|
};
|
||||||
|
|
||||||
struct CddsNode
|
struct CddsNode
|
||||||
|
@ -583,7 +583,7 @@ static void discovery_thread(rmw_context_impl_t * impl)
|
||||||
dds_entity_t ws;
|
dds_entity_t ws;
|
||||||
/* deleting ppant will delete waitset as well, so there is no real need to delete
|
/* deleting ppant will delete waitset as well, so there is no real need to delete
|
||||||
the waitset here on error, but it is more hygienic */
|
the waitset here on error, but it is more hygienic */
|
||||||
if ((ws = dds_create_waitset(impl->ppant)) < 0) {
|
if ((ws = dds_create_waitset(DDS_CYCLONEDDS_HANDLE)) < 0) {
|
||||||
RCUTILS_SAFE_FWRITE_TO_STDERR(
|
RCUTILS_SAFE_FWRITE_TO_STDERR(
|
||||||
"ros discovery info listener thread: failed to create waitset, will shutdown ...\n");
|
"ros discovery info listener thread: failed to create waitset, will shutdown ...\n");
|
||||||
return;
|
return;
|
||||||
|
@ -637,7 +637,7 @@ static rmw_ret_t discovery_thread_start(rmw_context_impl_t * impl)
|
||||||
{
|
{
|
||||||
auto common_context = &impl->common;
|
auto common_context = &impl->common;
|
||||||
common_context->thread_is_running.store(true);
|
common_context->thread_is_running.store(true);
|
||||||
common_context->listener_thread_gc = create_guard_condition(impl);
|
common_context->listener_thread_gc = create_guard_condition();
|
||||||
if (common_context->listener_thread_gc) {
|
if (common_context->listener_thread_gc) {
|
||||||
try {
|
try {
|
||||||
common_context->listener_thread = std::thread(discovery_thread, impl);
|
common_context->listener_thread = std::thread(discovery_thread, impl);
|
||||||
|
@ -745,6 +745,21 @@ static bool check_create_domain(dds_domainid_t did, rmw_localhost_only_t localho
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static
|
||||||
|
void
|
||||||
|
check_destroy_domain(dds_domainid_t domain_id)
|
||||||
|
{
|
||||||
|
if (domain_id != UINT32_MAX) {
|
||||||
|
std::lock_guard<std::mutex> lock(gcdds.domains_lock);
|
||||||
|
CddsDomain & dom = gcdds.domains[domain_id];
|
||||||
|
assert(dom.refcount > 0);
|
||||||
|
if (--dom.refcount == 0) {
|
||||||
|
static_cast<void>(dds_delete(dom.domain_handle));
|
||||||
|
gcdds.domains.erase(domain_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#if RMW_SUPPORT_SECURITY
|
#if RMW_SUPPORT_SECURITY
|
||||||
/* Returns the full URI of a security file properly formatted for DDS */
|
/* Returns the full URI of a security file properly formatted for DDS */
|
||||||
bool get_security_file_URI(
|
bool get_security_file_URI(
|
||||||
|
@ -816,6 +831,7 @@ void finalize_security_file_URIs(
|
||||||
#endif /* RMW_SUPPORT_SECURITY */
|
#endif /* RMW_SUPPORT_SECURITY */
|
||||||
|
|
||||||
/* Attempt to set all the qos properties needed to enable DDS security */
|
/* Attempt to set all the qos properties needed to enable DDS security */
|
||||||
|
static
|
||||||
rmw_ret_t configure_qos_for_security(
|
rmw_ret_t configure_qos_for_security(
|
||||||
dds_qos_t * qos,
|
dds_qos_t * qos,
|
||||||
const rmw_security_options_t * security_options)
|
const rmw_security_options_t * security_options)
|
||||||
|
@ -861,6 +877,194 @@ rmw_ret_t configure_qos_for_security(
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rmw_ret_t
|
||||||
|
rmw_context_impl_t::init(rmw_init_options_t * options)
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(initialization_mutex);
|
||||||
|
if (0u != this->node_count) {
|
||||||
|
// initialization has already been done
|
||||||
|
this->node_count++;
|
||||||
|
return RMW_RET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Take domains_lock and hold it until after the participant creation succeeded or
|
||||||
|
failed: otherwise there is a race with rmw_destroy_node deleting the last participant
|
||||||
|
and tearing down the domain for versions of Cyclone that implement the original
|
||||||
|
version of dds_create_domain that doesn't return a handle. */
|
||||||
|
this->domain_id = static_cast<dds_domainid_t>(options->domain_id);
|
||||||
|
if (!check_create_domain(this->domain_id, options->localhost_only)) {
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<dds_qos_t, std::function<void(dds_qos_t *)>>
|
||||||
|
ppant_qos(dds_create_qos(), &dds_delete_qos);
|
||||||
|
if (ppant_qos == nullptr) {
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_BAD_ALLOC;
|
||||||
|
}
|
||||||
|
std::string user_data = std::string("enclave=") + std::string(
|
||||||
|
options->enclave) + std::string(";");
|
||||||
|
dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size());
|
||||||
|
if (configure_qos_for_security(
|
||||||
|
ppant_qos.get(),
|
||||||
|
&options->security_options) != RMW_RET_OK)
|
||||||
|
{
|
||||||
|
if (RMW_SECURITY_ENFORCEMENT_ENFORCE == options->security_options.enforce_security) {
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this->ppant = dds_create_participant(this->domain_id, ppant_qos.get(), nullptr);
|
||||||
|
if (this->ppant < 0) {
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Create readers for DDS built-in topics for monitoring discovery */
|
||||||
|
if ((this->rd_participant =
|
||||||
|
dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0)
|
||||||
|
{
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
if ((this->rd_subscription =
|
||||||
|
dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0)
|
||||||
|
{
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
if ((this->rd_publication =
|
||||||
|
dds_create_reader(this->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0)
|
||||||
|
{
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
/* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers
|
||||||
|
to be created for RMW publishers/subscriptions. */
|
||||||
|
if ((this->dds_pub = dds_create_publisher(this->ppant, nullptr, nullptr)) < 0) {
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
if ((this->dds_sub = dds_create_subscriber(this->ppant, nullptr, nullptr)) < 0) {
|
||||||
|
this->clean_up();
|
||||||
|
RCUTILS_LOG_ERROR_NAMED(
|
||||||
|
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default;
|
||||||
|
pubsub_qos.avoid_ros_namespace_conventions = true;
|
||||||
|
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
|
||||||
|
pubsub_qos.depth = 1;
|
||||||
|
pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
|
||||||
|
pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
|
||||||
|
|
||||||
|
/* Create RMW publisher/subscription/guard condition used by rmw_dds_common
|
||||||
|
discovery */
|
||||||
|
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
|
||||||
|
this->common.pub = create_publisher(
|
||||||
|
this->ppant, this->dds_pub,
|
||||||
|
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
|
||||||
|
"ros_discovery_info",
|
||||||
|
&pubsub_qos,
|
||||||
|
&publisher_options);
|
||||||
|
if (this->common.pub == nullptr) {
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
|
||||||
|
subscription_options.ignore_local_publications = true;
|
||||||
|
// FIXME: keyed topics => KEEP_LAST and depth 1.
|
||||||
|
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
|
||||||
|
this->common.sub = create_subscription(
|
||||||
|
this->ppant, this->dds_sub,
|
||||||
|
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
|
||||||
|
"ros_discovery_info",
|
||||||
|
&pubsub_qos,
|
||||||
|
&subscription_options);
|
||||||
|
if (this->common.sub == nullptr) {
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
this->common.graph_guard_condition = create_guard_condition();
|
||||||
|
if (this->common.graph_guard_condition == nullptr) {
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_BAD_ALLOC;
|
||||||
|
}
|
||||||
|
|
||||||
|
this->common.graph_cache.set_on_change_callback(
|
||||||
|
[guard_condition = this->common.graph_guard_condition]() {
|
||||||
|
rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition);
|
||||||
|
if (ret != RMW_RET_OK) {
|
||||||
|
RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
get_entity_gid(this->ppant, this->common.gid);
|
||||||
|
this->common.graph_cache.add_participant(this->common.gid, options->enclave);
|
||||||
|
|
||||||
|
// One could also use a set of listeners instead of a thread for maintaining the graph cache:
|
||||||
|
// - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
|
||||||
|
// caused by the graph cache's mutex already having been locked by (e.g.) rmw_create_node.
|
||||||
|
// - Whatever the graph cache implementation does, it shouldn't involve much more than local state
|
||||||
|
// updates and triggering a guard condition, and so that should be safe.
|
||||||
|
// however, the graph cache updates could be expensive, and so performing those operations on
|
||||||
|
// the thread receiving data from the network may not be wise.
|
||||||
|
rmw_ret_t ret;
|
||||||
|
if ((ret = discovery_thread_start(this)) != RMW_RET_OK) {
|
||||||
|
this->clean_up();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
++this->node_count;
|
||||||
|
return RMW_RET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
rmw_context_impl_t::clean_up()
|
||||||
|
{
|
||||||
|
discovery_thread_stop(common);
|
||||||
|
common.graph_cache.clear_on_change_callback();
|
||||||
|
if (common.graph_guard_condition) {
|
||||||
|
destroy_guard_condition(common.graph_guard_condition);
|
||||||
|
}
|
||||||
|
if (common.pub) {
|
||||||
|
destroy_publisher(common.pub);
|
||||||
|
}
|
||||||
|
if (common.sub) {
|
||||||
|
destroy_subscription(common.sub);
|
||||||
|
}
|
||||||
|
if (ppant > 0 && dds_delete(ppant) < 0) {
|
||||||
|
RCUTILS_SAFE_FWRITE_TO_STDERR(
|
||||||
|
"Failed to destroy domain in destructor\n");
|
||||||
|
}
|
||||||
|
check_destroy_domain(domain_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
rmw_ret_t
|
||||||
|
rmw_context_impl_t::fini()
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> guard(initialization_mutex);
|
||||||
|
if (0u != --this->node_count) {
|
||||||
|
// destruction shouldn't happen yet
|
||||||
|
return RMW_RET_OK;
|
||||||
|
}
|
||||||
|
this->clean_up();
|
||||||
|
return RMW_RET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
|
extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t * context)
|
||||||
{
|
{
|
||||||
rmw_ret_t ret;
|
rmw_ret_t ret;
|
||||||
|
@ -881,7 +1085,6 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range");
|
"rmw_cyclonedds_cpp", "rmw_create_node: domain id out of range");
|
||||||
return RMW_RET_INVALID_ARGUMENT;
|
return RMW_RET_INVALID_ARGUMENT;
|
||||||
}
|
}
|
||||||
const dds_domainid_t domain_id = static_cast<dds_domainid_t>(options->domain_id);
|
|
||||||
|
|
||||||
context->instance_id = options->instance_id;
|
context->instance_id = options->instance_id;
|
||||||
context->implementation_identifier = eclipse_cyclonedds_identifier;
|
context->implementation_identifier = eclipse_cyclonedds_identifier;
|
||||||
|
@ -891,144 +1094,12 @@ extern "C" rmw_ret_t rmw_init(const rmw_init_options_t * options, rmw_context_t
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<rmw_context_impl_t> impl(new(std::nothrow) rmw_context_impl_t());
|
rmw_context_impl_t * impl = new(std::nothrow) rmw_context_impl_t();
|
||||||
if (impl == nullptr) {
|
if (nullptr == impl) {
|
||||||
return RMW_RET_BAD_ALLOC;
|
return RMW_RET_BAD_ALLOC;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Take domains_lock and hold it until after the participant creation succeeded or
|
context->impl = impl;
|
||||||
failed: otherwise there is a race with rmw_destroy_node deleting the last participant
|
|
||||||
and tearing down the domain for versions of Cyclone that implement the original
|
|
||||||
version of dds_create_domain that doesn't return a handle. */
|
|
||||||
if (!check_create_domain(domain_id, options->localhost_only)) {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Once the domain id is set in impl, impl's destructor will take care of unref'ing
|
|
||||||
the domain */
|
|
||||||
impl->domain_id = domain_id;
|
|
||||||
|
|
||||||
std::unique_ptr<dds_qos_t, std::function<void(dds_qos_t *)>>
|
|
||||||
ppant_qos(dds_create_qos(), &dds_delete_qos);
|
|
||||||
if (ppant_qos == nullptr) {
|
|
||||||
return RMW_RET_BAD_ALLOC;
|
|
||||||
}
|
|
||||||
std::string user_data = std::string("enclave=") + std::string(
|
|
||||||
context->options.enclave) + std::string(";");
|
|
||||||
dds_qset_userdata(ppant_qos.get(), user_data.c_str(), user_data.size());
|
|
||||||
if (configure_qos_for_security(
|
|
||||||
ppant_qos.get(),
|
|
||||||
&context->options.security_options) != RMW_RET_OK)
|
|
||||||
{
|
|
||||||
if (context->options.security_options.enforce_security == RMW_SECURITY_ENFORCEMENT_ENFORCE) {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
impl->ppant = dds_create_participant(domain_id, ppant_qos.get(), nullptr);
|
|
||||||
if (impl->ppant < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS participant");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create readers for DDS built-in topics for monitoring discovery */
|
|
||||||
if ((impl->rd_participant =
|
|
||||||
dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, nullptr, nullptr)) < 0)
|
|
||||||
{
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSParticipant reader");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
if ((impl->rd_subscription =
|
|
||||||
dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, nullptr, nullptr)) < 0)
|
|
||||||
{
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSSubscription reader");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
if ((impl->rd_publication =
|
|
||||||
dds_create_reader(impl->ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, nullptr, nullptr)) < 0)
|
|
||||||
{
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DCPSPublication reader");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create DDS publisher/subscriber objects that will be used for all DDS writers/readers
|
|
||||||
to be created for RMW publishers/subscriptions. */
|
|
||||||
if ((impl->dds_pub = dds_create_publisher(impl->ppant, nullptr, nullptr)) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS publisher");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
if ((impl->dds_sub = dds_create_subscriber(impl->ppant, nullptr, nullptr)) < 0) {
|
|
||||||
RCUTILS_LOG_ERROR_NAMED(
|
|
||||||
"rmw_cyclonedds_cpp", "rmw_create_node: failed to create DDS subscriber");
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
rmw_qos_profile_t pubsub_qos = rmw_qos_profile_default;
|
|
||||||
pubsub_qos.avoid_ros_namespace_conventions = true;
|
|
||||||
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
|
|
||||||
pubsub_qos.depth = 1;
|
|
||||||
pubsub_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
|
|
||||||
pubsub_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
|
|
||||||
|
|
||||||
/* Create RMW publisher/subscription/guard condition used by rmw_dds_common
|
|
||||||
discovery */
|
|
||||||
rmw_publisher_options_t publisher_options = rmw_get_default_publisher_options();
|
|
||||||
impl->common.pub = create_publisher(
|
|
||||||
impl->ppant, impl->dds_pub,
|
|
||||||
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
|
|
||||||
"ros_discovery_info",
|
|
||||||
&pubsub_qos,
|
|
||||||
&publisher_options);
|
|
||||||
if (impl->common.pub == nullptr) {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
rmw_subscription_options_t subscription_options = rmw_get_default_subscription_options();
|
|
||||||
subscription_options.ignore_local_publications = true;
|
|
||||||
// FIXME: keyed topics => KEEP_LAST and depth 1.
|
|
||||||
pubsub_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
|
|
||||||
impl->common.sub = create_subscription(
|
|
||||||
impl->ppant, impl->dds_sub,
|
|
||||||
rosidl_typesupport_cpp::get_message_type_support_handle<ParticipantEntitiesInfo>(),
|
|
||||||
"ros_discovery_info",
|
|
||||||
&pubsub_qos,
|
|
||||||
&subscription_options);
|
|
||||||
if (impl->common.sub == nullptr) {
|
|
||||||
return RMW_RET_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl->common.graph_guard_condition = create_guard_condition(impl.get());
|
|
||||||
if (impl->common.graph_guard_condition == nullptr) {
|
|
||||||
return RMW_RET_BAD_ALLOC;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl->common.graph_cache.set_on_change_callback(
|
|
||||||
[guard_condition = impl->common.graph_guard_condition]() {
|
|
||||||
rmw_ret_t ret = rmw_trigger_guard_condition(guard_condition);
|
|
||||||
if (ret != RMW_RET_OK) {
|
|
||||||
RMW_SET_ERROR_MSG("graph cache on_change_callback failed to trigger guard condition");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
get_entity_gid(impl->ppant, impl->common.gid);
|
|
||||||
impl->common.graph_cache.add_participant(impl->common.gid, context->options.enclave);
|
|
||||||
|
|
||||||
// One could also use a set of listeners instead of a thread for maintaining the graph cache:
|
|
||||||
// - Locally published samples shouldn't make it to the reader, so there shouldn't be a deadlock
|
|
||||||
// caused by the graph cache's mutex already having been locked by (e.g.) rmw_create_node.
|
|
||||||
// - Whatever the graph cache implementation does, it shouldn't involve much more than local state
|
|
||||||
// updates and triggering a guard condition, and so that should be safe.
|
|
||||||
// however, the graph cache updates could be expensive, and so performing those operations on
|
|
||||||
// the thread receiving data from the network may not be wise.
|
|
||||||
if ((ret = discovery_thread_start(impl.get())) != RMW_RET_OK) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
context->impl = impl.release();
|
|
||||||
return RMW_RET_OK;
|
return RMW_RET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1091,6 +1162,11 @@ extern "C" rmw_node_t * rmw_create_node(
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = context->impl->init(&context->options);
|
||||||
|
if (RMW_RET_OK != ret) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
auto * node_impl = new CddsNode();
|
auto * node_impl = new CddsNode();
|
||||||
rmw_node_t * node_handle = nullptr;
|
rmw_node_t * node_handle = nullptr;
|
||||||
RET_ALLOC_X(node_impl, goto fail_node_impl);
|
RET_ALLOC_X(node_impl, goto fail_node_impl);
|
||||||
|
@ -1142,6 +1218,7 @@ fail_node_handle_name:
|
||||||
fail_node_handle:
|
fail_node_handle:
|
||||||
delete node_impl;
|
delete node_impl;
|
||||||
fail_node_impl:
|
fail_node_impl:
|
||||||
|
context->impl->fini();
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1168,6 +1245,7 @@ extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t * node)
|
||||||
|
|
||||||
rmw_free(const_cast<char *>(node->name));
|
rmw_free(const_cast<char *>(node->name));
|
||||||
rmw_free(const_cast<char *>(node->namespace_));
|
rmw_free(const_cast<char *>(node->namespace_));
|
||||||
|
node->context->impl->fini();
|
||||||
rmw_node_free(node);
|
rmw_node_free(node);
|
||||||
delete node_impl;
|
delete node_impl;
|
||||||
return result_ret;
|
return result_ret;
|
||||||
|
@ -2562,11 +2640,11 @@ extern "C" rmw_ret_t rmw_take_event(
|
||||||
/////////// ///////////
|
/////////// ///////////
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
static rmw_guard_condition_t * create_guard_condition(rmw_context_impl_t * impl)
|
static rmw_guard_condition_t * create_guard_condition()
|
||||||
{
|
{
|
||||||
rmw_guard_condition_t * guard_condition_handle;
|
rmw_guard_condition_t * guard_condition_handle;
|
||||||
auto * gcond_impl = new CddsGuardCondition();
|
auto * gcond_impl = new CddsGuardCondition();
|
||||||
if ((gcond_impl->gcondh = dds_create_guardcondition(impl->ppant)) < 0) {
|
if ((gcond_impl->gcondh = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) {
|
||||||
RMW_SET_ERROR_MSG("failed to create guardcondition");
|
RMW_SET_ERROR_MSG("failed to create guardcondition");
|
||||||
goto fail_guardcond;
|
goto fail_guardcond;
|
||||||
}
|
}
|
||||||
|
@ -2582,7 +2660,8 @@ fail_guardcond:
|
||||||
|
|
||||||
extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context)
|
extern "C" rmw_guard_condition_t * rmw_create_guard_condition(rmw_context_t * context)
|
||||||
{
|
{
|
||||||
return create_guard_condition(context->impl);
|
(void)context;
|
||||||
|
return create_guard_condition();
|
||||||
}
|
}
|
||||||
|
|
||||||
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle)
|
static rmw_ret_t destroy_guard_condition(rmw_guard_condition_t * guard_condition_handle)
|
||||||
|
@ -2612,7 +2691,8 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition(
|
||||||
|
|
||||||
extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions)
|
extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t max_conditions)
|
||||||
{
|
{
|
||||||
(void) max_conditions;
|
(void)context;
|
||||||
|
(void)max_conditions;
|
||||||
rmw_wait_set_t * wait_set = rmw_wait_set_allocate();
|
rmw_wait_set_t * wait_set = rmw_wait_set_allocate();
|
||||||
CddsWaitset * ws = nullptr;
|
CddsWaitset * ws = nullptr;
|
||||||
RET_ALLOC_X(wait_set, goto fail_alloc_wait_set);
|
RET_ALLOC_X(wait_set, goto fail_alloc_wait_set);
|
||||||
|
@ -2639,7 +2719,7 @@ extern "C" rmw_wait_set_t * rmw_create_wait_set(rmw_context_t * context, size_t
|
||||||
std::lock_guard<std::mutex> lock(gcdds.lock);
|
std::lock_guard<std::mutex> lock(gcdds.lock);
|
||||||
// Lazily create dummy guard condition
|
// Lazily create dummy guard condition
|
||||||
if (gcdds.waitsets.size() == 0) {
|
if (gcdds.waitsets.size() == 0) {
|
||||||
if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(context->impl->ppant)) < 0) {
|
if ((gcdds.gc_for_empty_waitset = dds_create_guardcondition(DDS_CYCLONEDDS_HANDLE)) < 0) {
|
||||||
RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets");
|
RMW_SET_ERROR_MSG("failed to create guardcondition for handling empty waitsets");
|
||||||
goto fail_create_dummy;
|
goto fail_create_dummy;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue