diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp index d075c7b..a70ec05 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp @@ -113,6 +113,7 @@ class TypeSupport public: bool serializeROSmessage(const void * ros_message, cycser & ser, std::function prefix = nullptr); bool deserializeROSmessage(cycdeser & deser, void * ros_message, std::function prefix = nullptr); + std::string getName (); protected: TypeSupport(); diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp index e0ad3ed..298351a 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp @@ -656,6 +656,12 @@ bool TypeSupport::deserializeROSmessage( return true; } +template +std::string TypeSupport::getName () +{ + return name; +} + } // namespace rmw_cyclonedds_cpp #endif // RMW_CYCLONEDDS_CPP__TYPESUPPORT_IMPL_HPP_ diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 8f82ad5..7b7e3e8 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -92,11 +93,15 @@ const char * const eclipse_cyclonedds_serialization_format = "cdr"; improve over the default hash function */ struct dds_instance_handle_hash { public: - std::size_t operator () (dds_instance_handle_t const& x) const noexcept { + std::size_t operator() (dds_instance_handle_t const& x) const noexcept { return static_cast (x); } }; +bool operator< (dds_builtintopic_guid_t const& a, dds_builtintopic_guid_t const& b) { + return (memcmp (&a, &b, sizeof (dds_builtintopic_guid_t)) < 0); +} + struct CddsNode { dds_entity_t pp; rmw_guard_condition_t *graph_guard_condition; @@ -307,6 +312,12 @@ static void ggcallback (dds_entity_t rd, void *varg) (void) rmw_trigger_guard_condition (node_impl->graph_guard_condition); } +static std::string get_node_user_data (const char *node_name, const char *node_namespace) +{ + return (std::string ("ros2_name=") + std::string (node_name) + + std::string (";ros2_namespace=") + std::string (node_namespace)); +} + extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((unused)), const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) { RET_NULL_X (name, return nullptr); @@ -315,8 +326,7 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((u (void) security_options; dds_qos_t *qos = dds_create_qos (); - std::string user_data = (std::string ("ros2_name=") + std::string (name) + - std::string (";ros2_namespace=") + std::string (namespace_)); + std::string user_data = get_node_user_data (name, namespace_); dds_qset_userdata (qos, user_data.c_str (), user_data.size ()); dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, qos, nullptr); dds_delete_qos (qos); @@ -1520,68 +1530,91 @@ extern "C" rmw_ret_t rmw_destroy_service (rmw_node_t *node, rmw_service_t *servi /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces) +static rmw_ret_t do_for_node (std::function oper) { dds_entity_t rd; - std::set< std::pair > ns; + if ((rd = dds_create_reader (ref_ppant (), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { + unref_ppant (); + RMW_SET_ERROR_MSG ("rmw_get_node_names: failed to create reader"); + return RMW_RET_ERROR; + } + dds_sample_info_t info; + void *msg = NULL; + int32_t n; + bool cont = true; + while (cont && (n = dds_take (rd, &msg, &info, 1, 1)) == 1) { + if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { + auto sample = static_cast (msg); + cont = oper (*sample); + } + dds_return_loan (rd, &msg, n); + } + dds_delete (rd); + unref_ppant (); + if (n < 0) { + RMW_SET_ERROR_MSG ("rmw_get_node_names: error reading participants"); + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} +static rmw_ret_t do_for_node_user_data (std::function oper) +{ + auto f = [oper](const dds_builtintopic_participant_t& sample) -> bool { + void *ud; + size_t udsz; + if (dds_qget_userdata (sample.qos, &ud, &udsz)) { + /* CycloneDDS guarantees a null-terminated user data so we pretend it's a + string */ + bool ret = oper (sample, static_cast (ud)); + dds_free (ud); + return ret; + } else { + /* If no user data present treat it as an empty string */ + return oper (sample, ""); + } + }; + return do_for_node (f); +} + +extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces) +{ RET_WRONG_IMPLID (node); if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK || rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK) { return RMW_RET_ERROR; } - { - if ((rd = dds_create_reader (ref_ppant (), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { - unref_ppant (); - RMW_SET_ERROR_MSG ("rmw_get_node_names: failed to create reader"); - return RMW_RET_ERROR; - } - dds_sample_info_t info; - void *msg = NULL; - int32_t n; - const auto re = std::regex ("^ros2_name=(.*);ros2_namespace=(.*)$"); - while ((n = dds_take (rd, &msg, &info, 1, 1)) == 1) { - if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { - auto sample = static_cast (msg); - void *ud; - size_t udsz; - if (dds_qget_userdata (sample->qos, &ud, &udsz)) { + std::set< std::pair > ns; + const auto re = std::regex ("^ros2_name=(.*);ros2_namespace=(.*)$"); + auto oper = [&ns, re](const dds_builtintopic_participant_t& sample __attribute__ ((unused)), const char *ud) -> bool { std::cmatch cm; - /* CycloneDDS guarantees a null-terminated user data, so no need to bother with the length */ - if (std::regex_match (static_cast (ud), cm, re, std::regex_constants::match_default)) { + if (std::regex_match (ud, cm, re)) { ns.insert (std::make_pair (std::string (cm[1]), std::string (cm[2]))); } - dds_free (ud); - } - } - dds_return_loan (rd, &msg, n); - } - dds_delete (rd); - unref_ppant (); - if (n < 0) { - RMW_SET_ERROR_MSG ("rmw_get_node_names: error reading participants"); - return RMW_RET_ERROR; - } + return true; + }; + rmw_ret_t ret; + if ((ret = do_for_node_user_data (oper)) != RMW_RET_OK) { + return ret; } - { - rcutils_allocator_t allocator = rcutils_get_default_allocator (); - if (rcutils_string_array_init (node_names, ns.size (), &allocator) != RCUTILS_RET_OK || - rcutils_string_array_init (node_namespaces, ns.size(), &allocator) != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG (rcutils_get_error_string ().str); + rcutils_allocator_t allocator = rcutils_get_default_allocator (); + if (rcutils_string_array_init (node_names, ns.size (), &allocator) != RCUTILS_RET_OK || + rcutils_string_array_init (node_namespaces, ns.size(), &allocator) != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG (rcutils_get_error_string ().str); + goto fail_alloc; + } + size_t i; + i = 0; + for (auto&& n : ns) { + node_names->data[i] = rcutils_strdup (n.first.c_str (), allocator); + node_namespaces->data[i] = rcutils_strdup (n.second.c_str (), allocator); + if (!node_names->data[i] || !node_namespaces->data[i]) { + RMW_SET_ERROR_MSG ("rmw_get_node_names for name/namespace"); goto fail_alloc; } - size_t i = 0; - for (auto&& n : ns) { - node_names->data[i] = rcutils_strdup (n.first.c_str (), allocator); - node_namespaces->data[i] = rcutils_strdup (n.second.c_str (), allocator); - if (!node_names->data[i] || !node_namespaces->data[i]) { - RMW_SET_ERROR_MSG ("rmw_get_node_names for name/namespace"); - goto fail_alloc; - } - i++; - } + i++; } return RMW_RET_OK; @@ -1605,7 +1638,7 @@ fail_alloc: return RMW_RET_BAD_ALLOC; } -static rmw_ret_t rmw_collect_tptyp_for_kind (std::map>& tt, dds_entity_t builtin_topic) +static rmw_ret_t rmw_collect_tptyp_for_kind (std::map>& tt, dds_entity_t builtin_topic, std::function filter_and_map) { assert (builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); dds_entity_t rd; @@ -1620,7 +1653,10 @@ static rmw_ret_t rmw_collect_tptyp_for_kind (std::map (msg); - tt[std::string (sample->topic_name)].insert (std::string (sample->type_name)); + std::string topic_name, type_name; + if (filter_and_map (*sample, topic_name, type_name)) { + tt[topic_name].insert (type_name); + } } dds_return_loan (rd, &msg, n); } @@ -1634,31 +1670,14 @@ static rmw_ret_t rmw_collect_tptyp_for_kind (std::map>& source, bool no_demangle, rcutils_allocator_t *allocator) { - RET_NULL (allocator); - RET_WRONG_IMPLID (node); - rmw_ret_t ret = rmw_names_and_types_check_zero (tptyp); - if (ret != RMW_RET_OK) { - return ret; - } - std::map> tt; - if ((ret = rmw_collect_tptyp_for_kind (tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION)) != RMW_RET_OK || - (ret = rmw_collect_tptyp_for_kind (tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION)) != RMW_RET_OK) { - return ret; - } - if (tt.size () == 0) { - return RMW_RET_OK; - } - - /* FIXME: demangling */ - (void) no_demangle; - - if ((ret = rmw_names_and_types_init (tptyp, tt.size (), allocator)) != RMW_RET_OK) { + rmw_ret_t ret; + if ((ret = rmw_names_and_types_init (tptyp, source.size (), allocator)) != RMW_RET_OK) { return ret; } size_t index = 0; - for (const auto& tp : tt) { + for (const auto& tp : source) { if ((tptyp->names.data[index] = rcutils_strdup (tp.first.c_str (), *allocator)) == NULL) { goto fail_mem; } @@ -1681,6 +1700,33 @@ fail_mem: return RMW_RET_BAD_ALLOC; } +extern "C" rmw_ret_t rmw_get_topic_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *tptyp) +{ + RET_NULL (allocator); + RET_WRONG_IMPLID (node); + rmw_ret_t ret = rmw_names_and_types_check_zero (tptyp); + if (ret != RMW_RET_OK) { + return ret; + } + const auto re = std::regex ("^" + std::string (ros_topic_prefix) + "/"); + const auto filter_and_map = + [re](const dds_builtintopic_endpoint_t& sample, std::string& topic_name, std::string& type_name) -> bool { + if (! std::regex_match (topic_name, re)) { + return false; + } else { + topic_name = std::string (sample.topic_name); + type_name = std::string (sample.type_name); + return true; + } + }; + std::map> tt; + if ((ret = rmw_collect_tptyp_for_kind (tt, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, filter_and_map)) != RMW_RET_OK || + (ret = rmw_collect_tptyp_for_kind (tt, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, filter_and_map)) != RMW_RET_OK) { + return ret; + } + return make_names_and_types (tptyp, tt, no_demangle, allocator); +} + extern "C" rmw_ret_t rmw_get_service_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) { #if 0 // NIY @@ -1825,96 +1871,99 @@ extern "C" rmw_ret_t rmw_service_server_is_available (const rmw_node_t *node, co return RMW_RET_OK; } +static rmw_ret_t rmw_count_pubs_or_subs (const rmw_node_t *node, dds_entity_t builtin_topic, const char *topic_name, size_t *count) +{ + assert (builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); + RET_NULL (topic_name); + RET_NULL (count); + RET_WRONG_IMPLID (node); + + std::string fqtopic_name = make_fqtopic (ros_topic_prefix, topic_name, "", false); + dds_entity_t rd; + if ((rd = dds_create_reader (ref_ppant (), builtin_topic, NULL, NULL)) < 0) { + unref_ppant (); + RMW_SET_ERROR_MSG ("rmw_count_pubs_or_subs failed to create reader"); + return RMW_RET_ERROR; + } + dds_sample_info_t info; + void *msg = NULL; + int32_t n; + *count = 0; + while ((n = dds_take (rd, &msg, &info, 1, 1)) == 1) { + if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { + auto sample = static_cast (msg); + if (fqtopic_name == std::string (sample->topic_name)) { + (*count)++; + } + } + dds_return_loan (rd, &msg, n); + } + dds_delete (rd); + unref_ppant (); + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_count_publishers (const rmw_node_t *node, const char *topic_name, size_t *count) { -#if 0 - // safechecks - - if (!node) { - RMW_SET_ERROR_MSG ("null node handle"); - return RMW_RET_ERROR; - } - // Get participant pointer from node - if (node->implementation_identifier != eclipse_cyclonedds_identifier) { - RMW_SET_ERROR_MSG ("node handle not from this implementation"); - return RMW_RET_ERROR; - } - - auto impl = static_cast (node->data); - - WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock (); - *count = 0; - for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic (it.first); - if (topic_fqdn == topic_name) { - *count += it.second.size (); - } - } - slave_target->mapmutex.unlock (); - - RCUTILS_LOG_DEBUG_NAMED ( - "rmw_fastrtps_cpp", - "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count); - - return RMW_RET_OK; -#else - (void) node; (void) topic_name; (void) count; - return RMW_RET_TIMEOUT; -#endif + return rmw_count_pubs_or_subs (node, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, topic_name, count); } extern "C" rmw_ret_t rmw_count_subscribers (const rmw_node_t *node, const char *topic_name, size_t *count) { -#if 0 - // safechecks - - if (!node) { - RMW_SET_ERROR_MSG ("null node handle"); - return RMW_RET_ERROR; - } - // Get participant pointer from node - if (node->implementation_identifier != eclipse_cyclonedds_identifier) { - RMW_SET_ERROR_MSG ("node handle not from this implementation"); - return RMW_RET_ERROR; - } - - CustomParticipantInfo * impl = static_cast (node->data); - - ReaderInfo * slave_target = impl->secondarySubListener; - *count = 0; - slave_target->mapmutex.lock (); - for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic (it.first); - if (topic_fqdn == topic_name) { - *count += it.second.size (); - } - } - slave_target->mapmutex.unlock (); - - RCUTILS_LOG_DEBUG_NAMED ( - "rmw_fastrtps_cpp", - "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count); - - return RMW_RET_OK; -#else - (void) node; (void) topic_name; (void) count; - return RMW_RET_TIMEOUT; -#endif + return rmw_count_pubs_or_subs (node, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, topic_name, count); } -extern "C" rmw_ret_t rmw_get_subscriber_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) +static rmw_ret_t get_node_guids (const char *node_name, const char *node_namespace, std::set& guids) { - (void) node; (void) allocator; (void) node_name; (void) node_namespace; (void) no_demangle; (void) topic_names_and_types; - return RMW_RET_TIMEOUT; + std::string needle = get_node_user_data (node_name, node_namespace); + auto oper = [&guids, needle](const dds_builtintopic_participant_t& sample, const char *ud) -> bool { + if (std::string (ud) == needle) { + guids.insert (sample.key); + } + return true; /* do keep looking - what if there are many? */ + }; + return do_for_node_user_data (oper); } -extern "C" rmw_ret_t rmw_get_publisher_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) +static rmw_ret_t rmw_get_endpoint_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *tptyp, dds_entity_t builtin_topic) { - (void) node; (void) allocator; (void) node_name; (void) node_namespace; (void) no_demangle; (void) topic_names_and_types; - return RMW_RET_TIMEOUT; + assert (builtin_topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION || builtin_topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION); + + RET_WRONG_IMPLID (node); + RET_NULL (allocator); + rmw_ret_t ret = rmw_names_and_types_check_zero (tptyp); + if (ret != RMW_RET_OK) { + return ret; + } + std::set guids; + if ((ret = get_node_guids (node_name, node_namespace, guids)) != RMW_RET_OK) { + return ret; + } + const auto filter_and_map = + [guids](const dds_builtintopic_endpoint_t& sample, std::string& topic_name, std::string& type_name) -> bool { + if (guids.count (sample.participant_key) == 0) { + return false; + } else { + topic_name = std::string (sample.topic_name); + type_name = std::string (sample.type_name); + return true; + } + }; + std::map> tt; + if ((ret = rmw_collect_tptyp_for_kind (tt, builtin_topic, filter_and_map)) != RMW_RET_OK) { + return ret; + } + return make_names_and_types (tptyp, tt, no_demangle, allocator); +} + +extern "C" rmw_ret_t rmw_get_subscriber_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *tptyp) +{ + return rmw_get_endpoint_names_and_types_by_node (node, allocator, node_name, node_namespace, no_demangle, tptyp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION); +} + +extern "C" rmw_ret_t rmw_get_publisher_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *tptyp) +{ + return rmw_get_endpoint_names_and_types_by_node (node, allocator, node_name, node_namespace, no_demangle, tptyp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION); } extern "C" rmw_ret_t rmw_get_service_names_and_types_by_node (const rmw_node_t *node, rcutils_allocator_t *allocator, const char *node_name, const char *node_namespace, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp index 7ed81bc..f6f2019 100644 --- a/rmw_cyclonedds_cpp/src/serdata.cpp +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. #include +#include +#include #include "rmw/error_handling.h" @@ -299,11 +301,46 @@ static const struct ddsi_sertopic_ops sertopic_rmw_ops = { sertopic_rmw_free_samples }; +template +ROSIDL_TYPESUPPORT_INTROSPECTION_CPP_LOCAL +inline std::string create_type_name (const void * untyped_members) +{ + auto members = static_cast (untyped_members); + if (!members) { + RMW_SET_ERROR_MSG("members handle is null"); + return ""; + } + + std::ostringstream ss; + std::string message_namespace (members->message_namespace_); + // Find and replace C namespace separator with C++, in case this is using C typesupport + message_namespace = std::regex_replace (message_namespace, std::regex("__"), "::"); + std::string message_name (members->message_name_); + if (!message_namespace.empty ()) { + ss << message_namespace << "::"; + } + ss << "dds_::" << message_name << "_"; + return ss.str (); +} + +static std::string get_type_name (const char *type_support_identifier, void *type_support) +{ + if (using_introspection_c_typesupport (type_support_identifier)) { + auto typed_typesupport = static_cast (type_support); + return typed_typesupport->getName (); + } else if (using_introspection_cpp_typesupport (type_support_identifier)) { + auto typed_typesupport = static_cast (type_support); + return typed_typesupport->getName (); + } else { + return "absent"; + } +} + struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header) { struct sertopic_rmw *st = new struct sertopic_rmw; st->cpp_name = std::string (topicname); - st->cpp_type_name = std::string ("absent"); // FIXME: obviously a hack + st->cpp_type_name = get_type_name (type_support_identifier, type_support); st->cpp_name_type_name = st->cpp_name + std::string (";") + std::string (st->cpp_type_name); st->ops = &sertopic_rmw_ops; st->serdata_ops = &serdata_rmw_ops;