Implementation for rmw_get_pub/sub_info_by_topic (#97)

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2020-02-10 18:59:04 +01:00 committed by GitHub
parent 20007b4dc2
commit d70fad6c46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 394 additions and 147 deletions

View file

@ -50,7 +50,6 @@ ament_export_dependencies(rosidl_typesupport_introspection_cpp)
add_library(rmw_cyclonedds_cpp add_library(rmw_cyclonedds_cpp
src/rmw_node.cpp src/rmw_node.cpp
src/rmw_get_topic_endpoint_info.cpp
src/serdata.cpp src/serdata.cpp
src/serdes.cpp src/serdes.cpp
src/graphrhc.cpp src/graphrhc.cpp

View file

@ -1,48 +0,0 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "rmw_cyclonedds_cpp/rmw_version_test.hpp"
#if RMW_VERSION_GTE(0, 8, 2)
#include "rmw/error_handling.h"
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/topic_endpoint_info_array.h"
extern "C"
{
rmw_ret_t
rmw_get_publishers_info_by_topic(
const rmw_node_t * /* unused_param */,
rcutils_allocator_t * /* unused_param */,
const char * /* unused_param */,
bool /* unused_param */,
rmw_topic_endpoint_info_array_t * /* unused_param */)
{
return RMW_RET_UNSUPPORTED;
}
rmw_ret_t
rmw_get_subscriptions_info_by_topic(
const rmw_node_t * /* unused_param */,
rcutils_allocator_t * /* unused_param */,
const char * /* unused_param */,
bool /* unused_param */,
rmw_topic_endpoint_info_array_t * /* unused_param */)
{
return RMW_RET_UNSUPPORTED;
}
} // extern "C"
#endif

View file

@ -51,6 +51,11 @@
#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp"
#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp"
#if RMW_VERSION_GTE(0, 8, 2)
#include "rmw/get_topic_endpoint_info.h"
#include "rmw/topic_endpoint_info_array.h"
#endif
#include "namespace_prefix.hpp" #include "namespace_prefix.hpp"
#include "dds/dds.h" #include "dds/dds.h"
@ -1110,20 +1115,16 @@ static dds_qos_t * create_readwrite_qos(
return qos; return qos;
} }
static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_policies) static bool dds_qos_to_rmw_qos(const dds_qos_t * dds_qos, rmw_qos_profile_t * qos_policies)
{ {
dds_qos_t * qos = dds_create_qos(); assert(dds_qos);
if (dds_get_qos(handle, qos) < 0) { assert(qos_policies);
RMW_SET_ERROR_MSG("get_readwrite_qos: invalid handle");
goto error;
}
{ {
dds_history_kind_t kind; dds_history_kind_t kind;
int32_t depth; int32_t depth;
if (!dds_qget_history(qos, &kind, &depth)) { if (!dds_qget_history(dds_qos, &kind, &depth)) {
RMW_SET_ERROR_MSG("get_readwrite_qos: history not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: history not set");
goto error; return false;
} }
switch (kind) { switch (kind) {
case DDS_HISTORY_KEEP_LAST: case DDS_HISTORY_KEEP_LAST:
@ -1142,9 +1143,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
{ {
dds_reliability_kind_t kind; dds_reliability_kind_t kind;
dds_duration_t max_blocking_time; dds_duration_t max_blocking_time;
if (!dds_qget_reliability(qos, &kind, &max_blocking_time)) { if (!dds_qget_reliability(dds_qos, &kind, &max_blocking_time)) {
RMW_SET_ERROR_MSG("get_readwrite_qos: history not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: history not set");
goto error; return false;
} }
switch (kind) { switch (kind) {
case DDS_RELIABILITY_BEST_EFFORT: case DDS_RELIABILITY_BEST_EFFORT:
@ -1160,9 +1161,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
{ {
dds_durability_kind_t kind; dds_durability_kind_t kind;
if (!dds_qget_durability(qos, &kind)) { if (!dds_qget_durability(dds_qos, &kind)) {
RMW_SET_ERROR_MSG("get_readwrite_qos: durability not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: durability not set");
goto error; return false;
} }
switch (kind) { switch (kind) {
case DDS_DURABILITY_VOLATILE: case DDS_DURABILITY_VOLATILE:
@ -1182,9 +1183,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
{ {
dds_duration_t deadline; dds_duration_t deadline;
if (!dds_qget_deadline(qos, &deadline)) { if (!dds_qget_deadline(dds_qos, &deadline)) {
RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: deadline not set");
goto error; return false;
} }
qos_policies->deadline.sec = (uint64_t) deadline / 1000000000; qos_policies->deadline.sec = (uint64_t) deadline / 1000000000;
qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000; qos_policies->deadline.nsec = (uint64_t) deadline % 1000000000;
@ -1192,7 +1193,7 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
{ {
dds_duration_t lifespan; dds_duration_t lifespan;
if (!dds_qget_lifespan(qos, &lifespan)) { if (!dds_qget_lifespan(dds_qos, &lifespan)) {
lifespan = DDS_INFINITY; lifespan = DDS_INFINITY;
} }
qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000; qos_policies->lifespan.sec = (uint64_t) lifespan / 1000000000;
@ -1202,9 +1203,9 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
{ {
dds_liveliness_kind_t kind; dds_liveliness_kind_t kind;
dds_duration_t lease_duration; dds_duration_t lease_duration;
if (!dds_qget_liveliness(qos, &kind, &lease_duration)) { if (!dds_qget_liveliness(dds_qos, &kind, &lease_duration)) {
RMW_SET_ERROR_MSG("get_readwrite_qos: liveliness not set"); RMW_SET_ERROR_MSG("get_readwrite_qos: liveliness not set");
goto error; return false;
} }
switch (kind) { switch (kind) {
case DDS_LIVELINESS_AUTOMATIC: case DDS_LIVELINESS_AUTOMATIC:
@ -1223,11 +1224,20 @@ static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * qos_polic
qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000; qos_policies->liveliness_lease_duration.nsec = (uint64_t) lease_duration % 1000000000;
} }
dds_delete_qos(qos);
return true; return true;
error: }
static bool get_readwrite_qos(dds_entity_t handle, rmw_qos_profile_t * rmw_qos_policies)
{
dds_qos_t * qos = dds_create_qos();
dds_return_t ret = false;
if (dds_get_qos(handle, qos) < 0) {
RMW_SET_ERROR_MSG("get_readwrite_qos: invalid handle");
} else {
ret = dds_qos_to_rmw_qos(qos, rmw_qos_policies);
}
dds_delete_qos(qos); dds_delete_qos(qos);
return false; return ret;
} }
static CddsPublisher * create_cdds_publisher( static CddsPublisher * create_cdds_publisher(
@ -2691,8 +2701,69 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t * node, rmw_service_t * serv
/////////// /////////// /////////// ///////////
///////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////
enum topic_kind
{
DEFAULT,
REQUEST_RESPONSE
};
static bool demangle_topic_name(
const char * topic_name,
topic_kind topic_kind,
std::string & demangled_topic_name,
bool * is_request)
{
std::string re;
bool is_def_kind = topic_kind == topic_kind::DEFAULT;
if (is_def_kind) {
re = "^" + std::string(ROS_TOPIC_PREFIX) + "(/.*)";
} else {
re = "^(" + std::string(ROS_SERVICE_REQUESTER_PREFIX) + "|" +
std::string(ROS_SERVICE_RESPONSE_PREFIX) + ")(/.*)(Request|Reply)$";
}
const auto re_tp = std::regex(re, std::regex::extended);
std::cmatch cm_tp;
if (std::regex_search(topic_name, cm_tp, re_tp)) {
demangled_topic_name = std::string(cm_tp[is_def_kind ? 1 : 2]);
if (is_request) {
assert(!is_def_kind);
*is_request = (std::string(cm_tp[3]) == "Request");
}
return true;
}
return false;
}
static bool demangle_topic_name(
const char * topic_name,
topic_kind topic_kind,
std::string & demangled_topic_name)
{
return demangle_topic_name(topic_name, topic_kind, demangled_topic_name, NULL);
}
static bool demangle_topic_type(
const char * type_name,
topic_kind topic_kind,
std::string & demangled_type_name)
{
bool is_def_kind = topic_kind == topic_kind::DEFAULT;
const auto re_typ = std::regex("^(.*::)dds_::(.*)_" +
(is_def_kind ? std::string() : std::string("(Response|Request)_")) +
std::string("$"),
std::regex::extended);
std::cmatch cm_typ;
if (std::regex_search(type_name, cm_typ, re_typ)) {
demangled_type_name =
std::regex_replace(std::string(cm_typ[1]), std::regex("::"), "/") +
std::string(cm_typ[2]);
return true;
}
return false;
}
static rmw_ret_t do_for_node( static rmw_ret_t do_for_node(
CddsNode * node_impl, const CddsNode * node_impl,
std::function<bool(const dds_builtintopic_participant_t & sample)> oper) std::function<bool(const dds_builtintopic_participant_t & sample)> oper)
{ {
dds_entity_t rd; dds_entity_t rd;
@ -2723,7 +2794,7 @@ static rmw_ret_t do_for_node(
} }
static rmw_ret_t do_for_node_user_data( static rmw_ret_t do_for_node_user_data(
CddsNode * node_impl, const CddsNode * node_impl,
std::function<bool(const dds_builtintopic_participant_t & sample, const char * user_data)> oper) std::function<bool(const dds_builtintopic_participant_t & sample, const char * user_data)> oper)
{ {
auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool { auto f = [oper](const dds_builtintopic_participant_t & sample) -> bool {
@ -2812,19 +2883,17 @@ fail_alloc:
return RMW_RET_BAD_ALLOC; return RMW_RET_BAD_ALLOC;
} }
static rmw_ret_t rmw_collect_tptyp_for_kind( static rmw_ret_t rmw_collect_data_for_endpoint(
std::map<std::string, std::set<std::string>> & tt,
CddsNode * node_impl, CddsNode * node_impl,
dds_entity_t builtin_topic, dds_entity_t builtin_topic,
std::function<bool(const dds_builtintopic_endpoint_t & sample, std::string & topic_name, std::function<void(const dds_builtintopic_endpoint_t & sample)> filter_and_map)
std::string & type_name)> filter_and_map)
{ {
assert( assert(
builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION ||
builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION);
dds_entity_t rd; dds_entity_t rd;
if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) { if ((rd = dds_create_reader(node_impl->enth, builtin_topic, NULL, NULL)) < 0) {
RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind failed to create reader"); RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint failed to create reader");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
dds_sample_info_t info; dds_sample_info_t info;
@ -2833,10 +2902,7 @@ static rmw_ret_t rmw_collect_tptyp_for_kind(
while ((n = dds_take(rd, &msg, &info, 1, 1)) == 1) { while ((n = dds_take(rd, &msg, &info, 1, 1)) == 1) {
if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { if (info.valid_data && info.instance_state == DDS_IST_ALIVE) {
auto sample = static_cast<const dds_builtintopic_endpoint_t *>(msg); auto sample = static_cast<const dds_builtintopic_endpoint_t *>(msg);
std::string topic_name, type_name; filter_and_map(*sample);
if (filter_and_map(*sample, topic_name, type_name)) {
tt[topic_name].insert(type_name);
}
} }
dds_return_loan(rd, &msg, n); dds_return_loan(rd, &msg, n);
} }
@ -2844,11 +2910,237 @@ static rmw_ret_t rmw_collect_tptyp_for_kind(
if (n == 0) { if (n == 0) {
return RMW_RET_OK; return RMW_RET_OK;
} else { } else {
RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind dds_take failed"); RMW_SET_ERROR_MSG("rmw_collect_data_for_endpoint dds_take failed");
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
} }
struct endpoint_info_t
{
std::string topic_type;
dds_builtintopic_guid_t endpoint_key;
dds_builtintopic_guid_t node_key;
dds_instance_handle_t node_insth;
rmw_qos_profile_t qos_profile;
};
typedef std::map<dds_builtintopic_guid_t, std::pair<std::string, std::string>> node_cache_t;
static void get_node_name(
dds_entity_t ppant_rd,
const dds_builtintopic_guid_t & pp_guid,
const dds_instance_handle_t pp_insth,
std::string & node_name,
std::string & node_ns)
{
static_cast<void>(pp_guid); // only used in assert()
bool node_found = false;
const auto re_ud = std::regex("^name=(.*);namespace=(.*);$", std::regex::extended);
size_t udsz;
dds_sample_info_t info;
void * msg = NULL;
int32_t n;
if ((n = dds_read_instance(ppant_rd, &msg, &info, 1, 1, pp_insth)) == 1) {
void * ud;
auto sample = static_cast<const dds_builtintopic_participant_t *>(msg);
assert(!memcmp(&sample->key, &pp_guid, sizeof(dds_builtintopic_guid_t)));
if (dds_qget_userdata(sample->qos, &ud, &udsz) && ud != nullptr) {
std::cmatch cm;
if (std::regex_search(reinterpret_cast<const char *>(ud), cm, re_ud)) {
node_found = true;
node_name = std::string(cm[1]);
node_ns = std::string(cm[2]);
}
}
dds_return_loan(ppant_rd, &msg, n);
}
if (!node_found) {
node_name = "_NODE_NAME_UNKNOWN_";
node_ns = "_NODE_NAMESPACE_UNKNOWN_";
}
}
static rmw_ret_t
set_rmw_topic_endpoint_info(
node_cache_t & node_cache,
dds_entity_t ppant_rd,
rcutils_allocator_t * allocator,
const endpoint_info_t & epinfo,
bool is_publisher,
rmw_topic_endpoint_info_t & topic_endpoint_info)
{
static_assert(
sizeof(dds_builtintopic_guid_t) <= RMW_GID_STORAGE_SIZE,
"RMW_GID_STORAGE_SIZE insufficient to store the rmw_cyclonedds_cpp GID implementation."
);
rmw_ret_t ret;
std::string node_name;
std::string node_ns;
const auto it = node_cache.find(epinfo.node_key);
if (it != node_cache.end()) {
node_name = it->second.first;
node_ns = it->second.second;
} else {
get_node_name(ppant_rd, epinfo.node_key, epinfo.node_insth, node_name, node_ns);
node_cache[epinfo.node_key] = std::make_pair(node_name, node_ns);
}
if ((ret = rmw_topic_endpoint_info_set_node_name(
&topic_endpoint_info,
node_name.c_str(),
allocator)) != RMW_RET_OK)
{
return ret;
}
if ((ret = rmw_topic_endpoint_info_set_node_namespace(
&topic_endpoint_info,
node_ns.c_str(),
allocator)) != RMW_RET_OK)
{
return ret;
}
if ((ret = rmw_topic_endpoint_info_set_topic_type(
&topic_endpoint_info,
epinfo.topic_type.c_str(),
allocator)) != RMW_RET_OK)
{
return ret;
}
if ((ret = rmw_topic_endpoint_info_set_endpoint_type(
&topic_endpoint_info,
is_publisher ? RMW_ENDPOINT_PUBLISHER : RMW_ENDPOINT_SUBSCRIPTION)) != RMW_RET_OK)
{
return ret;
}
if ((ret = rmw_topic_endpoint_info_set_gid(
&topic_endpoint_info,
epinfo.endpoint_key.v,
sizeof(dds_builtintopic_guid_t))) != RMW_RET_OK)
{
return ret;
}
if ((ret = rmw_topic_endpoint_info_set_qos_profile(
&topic_endpoint_info,
&epinfo.qos_profile)) != RMW_RET_OK)
{
return ret;
}
return RMW_RET_OK;
}
static void handle_topic_endpoint_info_array_fini(
rmw_topic_endpoint_info_array_t * topic_endpoint_info_array,
rcutils_allocator_t * allocator)
{
std::string err_msg;
rmw_ret_t ret;
bool err_set = rmw_error_is_set();
if (err_set) {
err_msg = rmw_get_error_string().str;
}
rmw_reset_error();
if ((ret =
rmw_topic_endpoint_info_array_fini(topic_endpoint_info_array, allocator)) != RMW_RET_OK)
{
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp",
"handle_topic_endpoint_info_array_fini failed: %s",
rmw_get_error_string().str);
rmw_reset_error();
}
if (err_set) {
RMW_SET_ERROR_MSG(err_msg.c_str());
}
}
static rmw_ret_t
get_endpoint_info_by_topic(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_mangle,
bool is_publisher,
rmw_topic_endpoint_info_array_t * endpoint_info)
{
auto node_impl = static_cast<CddsNode *>(node->data);
const std::string fqtopic_name = make_fqtopic(ROS_TOPIC_PREFIX, topic_name, "", no_mangle);
std::vector<endpoint_info_t> endpoint_info_vector;
rmw_ret_t ret;
const auto filter_and_map =
[fqtopic_name, no_mangle, &endpoint_info_vector](
const dds_builtintopic_endpoint_t & sample) -> void
{
endpoint_info_t info;
if (std::string(sample.topic_name) == fqtopic_name) {
info.endpoint_key = sample.key;
dds_qos_to_rmw_qos(sample.qos, &info.qos_profile);
if (no_mangle) {
info.topic_type = std::string(sample.type_name);
} else if (!demangle_topic_type(sample.type_name, topic_kind::DEFAULT, info.topic_type)) {
// skip a non-ROS2 endpoint
return;
}
info.node_key = sample.participant_key;
info.node_insth = sample.participant_instance_handle;
endpoint_info_vector.push_back(info);
}
};
if ((ret = rmw_collect_data_for_endpoint(
node_impl,
is_publisher ? DDS_BUILTIN_TOPIC_DCPSPUBLICATION : DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION,
filter_and_map)) != RMW_RET_OK)
{
return ret;
}
size_t epi_count = endpoint_info_vector.size();
if ((ret = rmw_topic_endpoint_info_array_init_with_size(
endpoint_info,
epi_count,
allocator)) != RMW_RET_OK)
{
rmw_error_string_t err_msg = rmw_get_error_string();
rmw_reset_error();
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"rmw_topic_endpoint_info_array_init_with_size failed to allocate memory: %s",
err_msg.str);
return ret;
}
endpoint_info->count = 0;
node_cache_t node_cache;
dds_entity_t ppant_rd;
if (
(ppant_rd = dds_create_reader(
node_impl->enth, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0)
{
RMW_SET_ERROR_MSG("get_endpoint_info_by_topic: failed to create reader");
handle_topic_endpoint_info_array_fini(endpoint_info, allocator);
return RMW_RET_ERROR;
}
for (const auto & epi : endpoint_info_vector) {
endpoint_info->info_array[endpoint_info->count] =
rmw_get_zero_initialized_topic_endpoint_info();
if ((ret = set_rmw_topic_endpoint_info(
node_cache,
ppant_rd,
allocator,
epi,
is_publisher,
endpoint_info->info_array[endpoint_info->count])) != RMW_RET_OK)
{
endpoint_info->count++;
handle_topic_endpoint_info_array_fini(endpoint_info, allocator);
dds_delete(ppant_rd);
return ret;
}
endpoint_info->count++;
}
assert(endpoint_info->count == epi_count);
dds_delete(ppant_rd);
return RMW_RET_OK;
}
static rmw_ret_t make_names_and_types( static rmw_ret_t make_names_and_types(
rmw_names_and_types_t * tptyp, const std::map<std::string, rmw_names_and_types_t * tptyp, const std::map<std::string,
std::set<std::string>> & source, std::set<std::string>> & source,
@ -2887,7 +3179,7 @@ static rmw_ret_t make_names_and_types(
fail_mem: fail_mem:
if (rmw_names_and_types_fini(tptyp) != RMW_RET_OK) { if (rmw_names_and_types_fini(tptyp) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("rmw_collect_tptyp_for_kind: rmw_names_and_types_fini failed"); RMW_SET_ERROR_MSG("make_names_and_types: rmw_names_and_types_fini failed");
} }
return RMW_RET_BAD_ALLOC; return RMW_RET_BAD_ALLOC;
} }
@ -2953,46 +3245,32 @@ static rmw_ret_t get_endpoint_names_and_types_by_node(
{ {
return ret; return ret;
} }
const auto re_tp = std::map<std::string, std::set<std::string>> tt;
std::regex("^" + std::string(ROS_TOPIC_PREFIX) + "(/.*)", std::regex::extended);
const auto re_typ = std::regex("^(.*::)dds_::(.*)_$", std::regex::extended);
const auto filter_and_map = const auto filter_and_map =
[re_tp, re_typ, guids, node_name, no_demangle](const dds_builtintopic_endpoint_t & sample, [&tt, guids, node_name, no_demangle](
std::string & topic_name, std::string & type_name) -> bool { const dds_builtintopic_endpoint_t & sample) -> void
std::cmatch cm_tp, cm_typ; {
if (node_name != nullptr && guids.count(sample.participant_key) == 0) { std::string demangled_topic_name, demangled_type_name;
return false; if (
} else { (node_name == nullptr || guids.count(sample.participant_key) != 0) &&
if ( demangle_topic_name(sample.topic_name, topic_kind::DEFAULT, demangled_topic_name) &&
!std::regex_search(sample.topic_name, cm_tp, re_tp) || demangle_topic_type(sample.type_name, topic_kind::DEFAULT, demangled_type_name))
!std::regex_search(sample.type_name, cm_typ, re_typ)) {
{ tt[demangled_topic_name].insert(
return false; no_demangle ? std::string(sample.type_name) : demangled_type_name);
} else {
topic_name = std::string(cm_tp[1]);
if (no_demangle) {
type_name = std::string(type_name);
} else {
std::string demangled_type = std::regex_replace(
std::string(cm_typ[1]), std::regex("::"), "/");
type_name = std::string(demangled_type) + std::string(cm_typ[2]);
}
return true;
}
} }
}; };
std::map<std::string, std::set<std::string>> tt;
if (subs && if (subs &&
(ret = (ret =
rmw_collect_tptyp_for_kind( rmw_collect_data_for_endpoint(
tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK) node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
if (pubs && if (pubs &&
(ret = (ret =
rmw_collect_tptyp_for_kind( rmw_collect_data_for_endpoint(
tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
@ -3045,45 +3323,26 @@ static rmw_ret_t get_cs_names_and_types_by_node(
{ {
return ret; return ret;
} }
const auto re_tp = std::regex( std::map<std::string, std::set<std::string>> tt;
"^(" + std::string(ROS_SERVICE_REQUESTER_PREFIX) + "|" + const auto filter_and_map = [&tt, guids, node_name, looking_for_services](
std::string(ROS_SERVICE_RESPONSE_PREFIX) + ")(/.*)(Request|Reply)$", const dds_builtintopic_endpoint_t & sample) -> void {
std::regex::extended); std::string topic_name, type_name;
const auto re_typ = std::regex("^(.*::)dds_::(.*)_(Response|Request)_$", std::regex::extended); bool is_request;
const auto filter_and_map = [re_tp, re_typ, guids, node_name, looking_for_services]( if ((node_name == nullptr || guids.count(sample.participant_key) != 0) &&
const dds_builtintopic_endpoint_t & sample, demangle_topic_name(
std::string & topic_name, std::string & type_name) -> bool { sample.topic_name, topic_kind::REQUEST_RESPONSE, topic_name, &is_request) &&
std::cmatch cm_tp, cm_typ; demangle_topic_type(sample.type_name, topic_kind::REQUEST_RESPONSE, type_name) &&
if (node_name != nullptr && guids.count(sample.participant_key) == 0) { looking_for_services == endpoint_is_from_service(is_request, sample.key))
return false;
}
if (
!std::regex_search(sample.topic_name, cm_tp, re_tp) ||
!std::regex_search(sample.type_name, cm_typ, re_typ))
{ {
return false; tt[topic_name].insert(type_name);
} else {
if (
looking_for_services !=
endpoint_is_from_service(std::string(cm_tp[3]) == "Request", sample.key))
{
return false;
} else {
std::string demangled_type =
std::regex_replace(std::string(cm_typ[1]), std::regex("::"), "/");
topic_name = std::string(cm_tp[2]);
type_name = std::string(demangled_type) + std::string(cm_typ[2]);
return true;
}
} }
}; };
std::map<std::string, std::set<std::string>> tt;
if ((ret = if ((ret =
rmw_collect_tptyp_for_kind( rmw_collect_data_for_endpoint(
tt, node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || node_impl, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK ||
(ret = (ret =
rmw_collect_tptyp_for_kind( rmw_collect_data_for_endpoint(
tt, node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) node_impl, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK)
{ {
return ret; return ret;
} }
@ -3220,3 +3479,40 @@ extern "C" rmw_ret_t rmw_get_client_names_and_types_by_node(
{ {
return get_cs_names_and_types_by_node(node, allocator, node_name, node_namespace, sntyp, false); return get_cs_names_and_types_by_node(node, allocator, node_name, node_namespace, sntyp, false);
} }
#if RMW_VERSION_GTE(0, 8, 2)
extern "C" rmw_ret_t rmw_get_publishers_info_by_topic(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_mangle,
rmw_topic_endpoint_info_array_t * publishers_info)
{
return get_endpoint_info_by_topic(
node,
allocator,
topic_name,
no_mangle,
true,
publishers_info);
}
extern "C" rmw_ret_t rmw_get_subscriptions_info_by_topic(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_mangle,
rmw_topic_endpoint_info_array_t * subscriptions_info)
{
return get_endpoint_info_by_topic(
node,
allocator,
topic_name,
no_mangle,
false,
subscriptions_info);
}
#endif