From 24726b4685ccbcf2900a8a10112ac22a5937c916 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sun, 12 May 2019 13:52:26 +0200 Subject: [PATCH] update to match ROS2 Dashing interface passes a decent subset of the tests ... fixes: * sequences of simple types: remove accidental alignment * trigger graph guard on any built-in topic * create a participant for each node, with node name/namespace in user data It is still only a proof-of-concept, but it might now actually be usable ... --- .../MessageTypeSupport_impl.hpp | 15 +- .../ServiceTypeSupport_impl.hpp | 29 +- .../include/rmw_cyclonedds_cpp/serdes.hpp | 26 +- rmw_cyclonedds_cpp/src/rmw_node.cpp | 325 +++++++++++++++--- 4 files changed, 328 insertions(+), 67 deletions(-) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp index 5eaed34..ef312dd 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/MessageTypeSupport_impl.hpp @@ -17,6 +17,8 @@ #include #include +#include +#include #include #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" @@ -31,9 +33,16 @@ MessageTypeSupport::MessageTypeSupport(const MembersType * members) assert(members); this->members_ = members; - std::string name = std::string(members->package_name_) + "::msg::dds_::" + - members->message_name_ + "_"; - this->setName(name.c_str()); + std::ostringstream ss; + std::string message_namespace(this->members_->message_namespace_); + std::string message_name(this->members_->message_name_); + if (!message_namespace.empty()) { + // Find and replace C namespace separator with C++, in case this is using C typesupport + message_namespace = std::regex_replace(message_namespace, std::regex("__"), "::"); + ss << message_namespace << "::"; + } + ss << "dds_::" << message_name << "_"; + this->setName(ss.str().c_str()); } } // namespace rmw_cyclonedds_cpp diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp index e3b341a..3da1ca7 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/ServiceTypeSupport_impl.hpp @@ -16,6 +16,8 @@ #define RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_IMPL_HPP_ #include +#include +#include #include #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" @@ -36,9 +38,16 @@ RequestTypeSupport::RequestTypeSupport( assert(members); this->members_ = members->request_members_; - std::string name = std::string(members->package_name_) + "::srv::dds_::" + - members->service_name_ + "_Request_"; - this->setName(name.c_str()); + std::ostringstream ss; + std::string service_namespace(members->service_namespace_); + std::string service_name(members->service_name_); + if (!service_namespace.empty()) { + // Find and replace C namespace separator with C++, in case this is using C typesupport + service_namespace = std::regex_replace(service_namespace, std::regex("__"), "::"); + ss << service_namespace << "::"; + } + ss << "dds_::" << service_name << "_Request_"; + this->setName(ss.str().c_str()); } template @@ -48,9 +57,17 @@ ResponseTypeSupport::ResponseTypeSupport assert(members); this->members_ = members->response_members_; - std::string name = std::string(members->package_name_) + "::srv::dds_::" + - members->service_name_ + "_Response_"; - this->setName(name.c_str()); + + std::ostringstream ss; + std::string service_namespace(members->service_namespace_); + std::string service_name(members->service_name_); + if (!service_namespace.empty()) { + // Find and replace C namespace separator with C++, in case this is using C typesupport + service_namespace = std::regex_replace(service_namespace, std::regex("__"), "::"); + ss << service_namespace << "::"; + } + ss << "dds_::" << service_name << "_Response_"; + this->setName(ss.str().c_str()); } } // namespace rmw_cyclonedds_cpp diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp index 4820723..a036141 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp @@ -85,12 +85,14 @@ public: } #define SIMPLEA(T) inline void serializeA (const T *x, size_t cnt) { \ - if ((off % sizeof (T)) != 0) { \ - off += sizeof (T) - (off % sizeof (T)); \ + if (cnt > 0) { \ + if ((off % sizeof (T)) != 0) { \ + off += sizeof (T) - (off % sizeof (T)); \ + } \ + resize (off + cnt * sizeof (T)); \ + memcpy (data () + off, (void *) x, cnt * sizeof (T)); \ + off += cnt * sizeof (T); \ } \ - resize (off + cnt * sizeof (T)); \ - memcpy (data () + off, (void *) x, cnt * sizeof (T)); \ - off += cnt * sizeof (T); \ } SIMPLEA (char); SIMPLEA (int8_t); @@ -111,7 +113,7 @@ public: template inline void serialize (const std::vector& x) { serialize (static_cast (x.size ())); - if (x.size () > 0) serializeA (x.data (), x.size ()); + serializeA (x.data (), x.size ()); } inline void serialize (const std::vector& x) { serialize (static_cast (x.size ())); @@ -121,7 +123,7 @@ public: template inline void serializeS (const T *x, size_t cnt) { serialize (static_cast (cnt)); - if (cnt > 0) serializeA (x, cnt); + serializeA (x, cnt); } private: @@ -192,9 +194,11 @@ public: } #define SIMPLEA(T) inline void deserializeA (T *x, size_t cnt) { \ - align (sizeof (T)); \ - memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \ - pos += (cnt) * sizeof (T); \ + if (cnt > 0) { \ + align (sizeof (T)); \ + memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \ + pos += (cnt) * sizeof (T); \ + } \ } SIMPLEA (char); SIMPLEA (int8_t); @@ -215,7 +219,7 @@ public: template inline void deserialize (std::vector& x) { const uint32_t sz = deserialize32 (); x.resize (sz); - if (sz > 0) deserializeA (x.data (), sz); + deserializeA (x.data (), sz); } inline void deserialize (std::vector& x) { const uint32_t sz = deserialize32 (); diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 0f0a46f..8f82ad5 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -40,6 +40,7 @@ #include #include #include +#include #include "rcutils/logging_macros.h" #include "rcutils/strdup.h" @@ -97,6 +98,7 @@ public: }; struct CddsNode { + dds_entity_t pp; rmw_guard_condition_t *graph_guard_condition; }; @@ -166,6 +168,26 @@ extern "C" const char *rmw_get_serialization_format() return eclipse_cyclonedds_serialization_format; } +extern "C" rmw_ret_t rmw_set_log_severity (rmw_log_severity_t severity __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("unimplemented"); + return RMW_RET_ERROR; +} + +extern "C" rmw_ret_t rmw_context_fini (rmw_context_t *context) +{ + RCUTILS_CHECK_ARGUMENT_FOR_NULL (context, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH ( + context, + context->implementation_identifier, + eclipse_cyclonedds_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + // context impl is explicitly supposed to be nullptr for now, see rmw_init's code + // RCUTILS_CHECK_ARGUMENT_FOR_NULL(context->impl, RMW_RET_INVALID_ARGUMENT); + *context = rmw_get_zero_initialized_context (); + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_init_options_init (rmw_init_options_t *init_options, rcutils_allocator_t allocator) { RMW_CHECK_ARGUMENT_FOR_NULL (init_options, RMW_RET_INVALID_ARGUMENT); @@ -226,6 +248,12 @@ extern "C" rmw_ret_t rmw_init (const rmw_init_options_t *options __attribute__ ( return RMW_RET_OK; } +extern "C" rmw_ret_t rmw_node_assert_liveliness (const rmw_node_t *node) +{ + RET_WRONG_IMPLID (node); + return RMW_RET_OK; +} + extern "C" rmw_ret_t rmw_shutdown (rmw_context_t *context) { RCUTILS_CHECK_ARGUMENT_FOR_NULL (context, RMW_RET_INVALID_ARGUMENT); @@ -268,6 +296,17 @@ static void unref_ppant () /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// +static void ggcallback (dds_entity_t rd, void *varg) +{ + auto node_impl = static_cast (varg); + void *msg = 0; + dds_sample_info_t info; + while (dds_take_mask (rd, &msg, &info, 1, 1, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) > 0) { + dds_return_loan (rd, &msg, 1); + } + (void) rmw_trigger_guard_condition (node_impl->graph_guard_condition); +} + extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((unused)), const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) { RET_NULL_X (name, return nullptr); @@ -275,19 +314,40 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((u (void) domain_id; (void) security_options; - dds_entity_t pp = ref_ppant (); + dds_qos_t *qos = dds_create_qos (); + std::string user_data = (std::string ("ros2_name=") + std::string (name) + + std::string (";ros2_namespace=") + std::string (namespace_)); + dds_qset_userdata (qos, user_data.c_str (), user_data.size ()); + dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, qos, nullptr); + dds_delete_qos (qos); if (pp < 0) { return nullptr; } auto *node_impl = new CddsNode (); rmw_node_t *node_handle = nullptr; + dds_listener_t *gglistener = nullptr; RET_ALLOC_X (node_impl, goto fail_node_impl); rmw_guard_condition_t *graph_guard_condition; if (!(graph_guard_condition = rmw_create_guard_condition (context))) { goto fail_ggc; } + node_impl->pp = pp; node_impl->graph_guard_condition = graph_guard_condition; + // + static const dds_entity_t bts[] = { + DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, + DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, + DDS_BUILTIN_TOPIC_DCPSPUBLICATION + }; + gglistener = dds_create_listener (node_impl); + dds_lset_data_available (gglistener, ggcallback); + for (size_t i = 0; i < sizeof (bts) / sizeof (bts[0]); i++) { + dds_create_reader (pp, bts[i], NULL, gglistener); + } + dds_delete_listener (gglistener); + // + node_handle = rmw_node_allocate (); RET_ALLOC_X (node_handle, goto fail_node_handle); node_handle->implementation_identifier = eclipse_cyclonedds_identifier; @@ -317,7 +377,7 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((u fail_ggc: delete node_impl; fail_node_impl: - unref_ppant (); + dds_delete (pp); return nullptr; } @@ -334,8 +394,8 @@ extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node) RMW_SET_ERROR_MSG ("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } + dds_delete (node_impl->pp); delete node_impl; - unref_ppant (); return result_ret; } @@ -356,6 +416,12 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (cons using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; +extern "C" rmw_ret_t rmw_get_serialized_message_size (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), size_t *size __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("rmw_get_serialized_message_size: unimplemented"); + return RMW_RET_ERROR; +} + extern "C" rmw_ret_t rmw_serialize (const void *ros_message, const rosidl_message_type_support_t *type_support, rmw_serialized_message_t *serialized_message) { std::vector data; @@ -409,7 +475,7 @@ extern "C" rmw_ret_t rmw_deserialize (const rmw_serialized_message_t *serialized /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message) +extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message, rmw_publisher_allocation_t *allocation __attribute__ ((unused))) { RET_WRONG_IMPLID (publisher); RET_NULL (ros_message); @@ -418,14 +484,12 @@ extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void * if (dds_write (pub->pubh, ros_message) >= 0) { return RMW_RET_OK; } else { - /* FIXME: what is the expected behavior when it times out? */ - RMW_SET_ERROR_MSG ("cannot publish data"); - //return RMW_RET_ERROR; - return RMW_RET_OK; + RMW_SET_ERROR_MSG ("failed to publish data"); + return RMW_RET_ERROR; } } -extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message) +extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message, rmw_publisher_allocation_t *allocation __attribute__ ((unused))) { RET_WRONG_IMPLID (publisher); RET_NULL (serialized_message); @@ -467,6 +531,7 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b dds_qos_t *qos = dds_create_qos (); switch (qos_policies->history) { case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_HISTORY_UNKNOWN: case RMW_QOS_POLICY_HISTORY_KEEP_LAST: if (qos_policies->depth > INT32_MAX) { RMW_SET_ERROR_MSG ("unsupported history depth"); @@ -481,8 +546,9 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b } switch (qos_policies->reliability) { case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_RELIABILITY_UNKNOWN: case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); break; case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: dds_qset_reliability (qos, DDS_RELIABILITY_BEST_EFFORT, 0); @@ -490,6 +556,7 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b } switch (qos_policies->durability) { case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: + case RMW_QOS_POLICY_DURABILITY_UNKNOWN: case RMW_QOS_POLICY_DURABILITY_VOLATILE: dds_qset_durability (qos, DDS_DURABILITY_VOLATILE); break; @@ -503,6 +570,77 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b return qos; } +static bool get_readwrite_qos (dds_entity_t handle, rmw_qos_profile_t *qos_policies) +{ + dds_qos_t *qos = dds_create_qos (); + if (dds_get_qos (handle, qos) < 0) { + RMW_SET_ERROR_MSG ("get_readwrite_qos: invalid handle"); + goto error; + } + + { + dds_history_kind_t kind; + int32_t depth; + if (!dds_qget_history (qos, &kind, &depth)) { + RMW_SET_ERROR_MSG ("get_readwrite_qos: history not set"); + goto error; + } + switch (kind) { + case DDS_HISTORY_KEEP_LAST: + qos_policies->history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + qos_policies->depth = (uint32_t) depth; + break; + case DDS_HISTORY_KEEP_ALL: + qos_policies->history = RMW_QOS_POLICY_HISTORY_KEEP_ALL; + qos_policies->depth = 0; + break; + } + } + + { + dds_reliability_kind_t kind; + dds_duration_t max_blocking_time; + if (!dds_qget_reliability (qos, &kind, &max_blocking_time)) { + RMW_SET_ERROR_MSG ("get_readwrite_qos: history not set"); + goto error; + } + switch (kind) { + case DDS_RELIABILITY_BEST_EFFORT: + qos_policies->reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + break; + case DDS_RELIABILITY_RELIABLE: + qos_policies->reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + break; + } + } + + { + dds_durability_kind_t kind; + if (!dds_qget_durability (qos, &kind)){ + RMW_SET_ERROR_MSG ("get_readwrite_qos: durability not set"); + goto error; + } + switch (kind) + { + case DDS_DURABILITY_VOLATILE: + qos_policies->durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + break; + case DDS_DURABILITY_TRANSIENT_LOCAL: + qos_policies->durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + break; + case DDS_DURABILITY_TRANSIENT: + case DDS_DURABILITY_PERSISTENT: + qos_policies->durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN; + break; + } + } + dds_delete_qos (qos); + return true; + error: + dds_delete_qos (qos); + return false; +} + static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { RET_WRONG_IMPLID_X (node, return nullptr); @@ -554,6 +692,18 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid return nullptr; } +extern "C" rmw_ret_t rmw_init_publisher_allocation (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), rmw_publisher_allocation_t *allocation __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("rmw_init_publisher_allocation: unimplemented"); + return RMW_RET_ERROR; +} + +extern "C" rmw_ret_t rmw_fini_publisher_allocation (rmw_publisher_allocation_t *allocation __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("rmw_fini_publisher_allocation: unimplemented"); + return RMW_RET_ERROR; +} + extern "C" rmw_publisher_t *rmw_create_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { CddsPublisher *pub; @@ -617,6 +767,24 @@ extern "C" rmw_ret_t rmw_publisher_count_matched_subscriptions (const rmw_publis } } +rmw_ret_t rmw_publisher_assert_liveliness (const rmw_publisher_t *publisher) +{ + RET_WRONG_IMPLID (publisher); + return RMW_RET_OK; +} + +rmw_ret_t rmw_publisher_get_actual_qos (const rmw_publisher_t *publisher, rmw_qos_profile_t *qos) +{ + RET_NULL (qos); + RET_WRONG_IMPLID (publisher); + auto pub = static_cast (publisher->data); + if (get_readwrite_qos (pub->pubh, qos)) { + return RMW_RET_OK; + } else { + return RMW_RET_ERROR; + } +} + extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *publisher) { RET_WRONG_IMPLID (node); @@ -691,6 +859,18 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const return nullptr; } +extern "C" rmw_ret_t rmw_init_subscription_allocation (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), rmw_subscription_allocation_t *allocation __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("rmw_init_subscription_allocation: unimplemented"); + return RMW_RET_ERROR; +} + +extern "C" rmw_ret_t rmw_fini_subscription_allocation (rmw_subscription_allocation_t *allocation __attribute__ ((unused))) +{ + RMW_SET_ERROR_MSG ("rmw_fini_subscription_allocation: unimplemented"); + return RMW_RET_ERROR; +} + extern "C" rmw_subscription_t *rmw_create_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { CddsSubscription *sub; @@ -815,26 +995,33 @@ static rmw_ret_t rmw_take_ser_int (const rmw_subscription_t *subscription, rmw_s return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken) +extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_subscription_allocation_t *allocation __attribute__ ((unused))) { return rmw_take_int (subscription, ros_message, taken, nullptr); } -extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) +extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation __attribute__ ((unused))) { return rmw_take_int (subscription, ros_message, taken, message_info); } -extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken) +extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_subscription_allocation_t *allocation __attribute__ ((unused))) { return rmw_take_ser_int (subscription, serialized_message, taken, nullptr); } -extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info) +extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation __attribute__ ((unused))) { return rmw_take_ser_int (subscription, serialized_message, taken, message_info); } +extern "C" rmw_ret_t rmw_take_event (const rmw_events_t *event_handle __attribute__ ((unused)), void *event_info __attribute__ ((unused)), bool *taken) +{ + RET_NULL (taken); + *taken = false; + return RMW_RET_OK; +} + ///////////////////////////////////////////////////////////////////////////////////////// /////////// /////////// /////////// GUARDS AND WAITSETS /////////// @@ -884,7 +1071,7 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition (const rmw_guard_condition_t *g return RMW_RET_OK; } -extern "C" rmw_wait_set_t *rmw_create_wait_set (size_t max_conditions) +extern "C" rmw_wait_set_t *rmw_create_wait_set (rmw_context_t *context __attribute__ ((unused)), size_t max_conditions) { (void) max_conditions; rmw_wait_set_t *wait_set = rmw_wait_set_allocate (); @@ -978,7 +1165,7 @@ static void clean_waitset_caches () } } -extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) +extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_events_t *evs __attribute__ ((unused)), rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) { RET_NULL (wait_set); CddsWaitset *ws = static_cast (wait_set->data); @@ -1078,7 +1265,7 @@ static rmw_ret_t rmw_take_response_request (CddsCS *cs, rmw_request_id_t *reques while (dds_take (cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) { if (info.valid_data) { memset (request_header, 0, sizeof (wrap.header)); - assert (sizeof (wrap.header.guid) < sizeof (wrap.header.guid)); + assert (sizeof (wrap.header.guid) <= sizeof (request_header->writer_guid)); memcpy (request_header->writer_guid, &wrap.header.guid, sizeof (wrap.header.guid)); request_header->sequence_number = wrap.header.seq; if (srcfilter == 0 || srcfilter == wrap.header.guid) { @@ -1111,10 +1298,8 @@ static rmw_ret_t rmw_send_response_request (CddsCS *cs, const cdds_request_heade if (dds_write (cs->pub->pubh, static_cast (&wrap)) >= 0) { return RMW_RET_OK; } else { - /* FIXME: what is the expected behavior when it times out? */ RMW_SET_ERROR_MSG ("cannot publish data"); - //return RMW_RET_ERROR; - return RMW_RET_OK; + return RMW_RET_ERROR; } } @@ -1337,41 +1522,87 @@ extern "C" rmw_ret_t rmw_destroy_service (rmw_node_t *node, rmw_service_t *servi extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces) { -#if 0 // NIY + dds_entity_t rd; + std::set< std::pair > ns; + RET_WRONG_IMPLID (node); - if (rmw_check_zero_rmw_string_array (node_names) != RMW_RET_OK) { + if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK || + rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK) { return RMW_RET_ERROR; } - auto impl = static_cast (node->data); - - // FIXME: sorry, can't do it with current Zenoh - auto participant_names = std::vector{}; - rcutils_allocator_t allocator = rcutils_get_default_allocator (); - rcutils_ret_t rcutils_ret = - rcutils_string_array_init (node_names, participant_names.size (), &allocator); - if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) - return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); - } - for (size_t i = 0; i < participant_names.size (); ++i) { - node_names->data[i] = rcutils_strdup (participant_names[i].c_str (), allocator); - if (!node_names->data[i]) { - RMW_SET_ERROR_MSG ("failed to allocate memory for node name") - rcutils_ret = rcutils_string_array_fini (node_names); - if (rcutils_ret != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED ( - "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string_safe ()); + { + if ((rd = dds_create_reader (ref_ppant (), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) { + unref_ppant (); + RMW_SET_ERROR_MSG ("rmw_get_node_names: failed to create reader"); + return RMW_RET_ERROR; + } + dds_sample_info_t info; + void *msg = NULL; + int32_t n; + const auto re = std::regex ("^ros2_name=(.*);ros2_namespace=(.*)$"); + while ((n = dds_take (rd, &msg, &info, 1, 1)) == 1) { + if (info.valid_data && info.instance_state == DDS_IST_ALIVE) { + auto sample = static_cast (msg); + void *ud; + size_t udsz; + if (dds_qget_userdata (sample->qos, &ud, &udsz)) { + std::cmatch cm; + /* CycloneDDS guarantees a null-terminated user data, so no need to bother with the length */ + if (std::regex_match (static_cast (ud), cm, re, std::regex_constants::match_default)) { + ns.insert (std::make_pair (std::string (cm[1]), std::string (cm[2]))); + } + dds_free (ud); + } } - return RMW_RET_BAD_ALLOC; + dds_return_loan (rd, &msg, n); + } + dds_delete (rd); + unref_ppant (); + if (n < 0) { + RMW_SET_ERROR_MSG ("rmw_get_node_names: error reading participants"); + return RMW_RET_ERROR; + } + } + + { + rcutils_allocator_t allocator = rcutils_get_default_allocator (); + if (rcutils_string_array_init (node_names, ns.size (), &allocator) != RCUTILS_RET_OK || + rcutils_string_array_init (node_namespaces, ns.size(), &allocator) != RCUTILS_RET_OK) { + RMW_SET_ERROR_MSG (rcutils_get_error_string ().str); + goto fail_alloc; + } + size_t i = 0; + for (auto&& n : ns) { + node_names->data[i] = rcutils_strdup (n.first.c_str (), allocator); + node_namespaces->data[i] = rcutils_strdup (n.second.c_str (), allocator); + if (!node_names->data[i] || !node_namespaces->data[i]) { + RMW_SET_ERROR_MSG ("rmw_get_node_names for name/namespace"); + goto fail_alloc; + } + i++; } } return RMW_RET_OK; -#else - (void) node; (void) node_names; (void) node_namespaces; - return RMW_RET_TIMEOUT; -#endif + +fail_alloc: + if (node_names) { + if (rcutils_string_array_fini (node_names) != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED ( + "rmw_cyclonedds_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string ().str); + rcutils_reset_error(); + } + } + if (node_namespaces) { + if (rcutils_string_array_fini(node_namespaces) != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rmw_cyclonedds_cpp", + "failed to cleanup during error handling: %s", rcutils_get_error_string ().str); + rcutils_reset_error(); + } + } + return RMW_RET_BAD_ALLOC; } static rmw_ret_t rmw_collect_tptyp_for_kind (std::map>& tt, dds_entity_t builtin_topic)