update to match ROS2 Dashing interface

passes a decent subset of the tests ...

fixes:
* sequences of simple types: remove accidental alignment
* trigger graph guard on any built-in topic
* create a participant for each node, with node name/namespace in user data

It is still only a proof-of-concept, but it might now actually be usable ...
This commit is contained in:
Erik Boasson 2019-05-12 13:52:26 +02:00
parent e520cb4d63
commit 24726b4685
4 changed files with 328 additions and 67 deletions

View file

@ -17,6 +17,8 @@
#include <cassert>
#include <memory>
#include <regex>
#include <sstream>
#include <string>
#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp"
@ -31,9 +33,16 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(const MembersType * members)
assert(members);
this->members_ = members;
std::string name = std::string(members->package_name_) + "::msg::dds_::" +
members->message_name_ + "_";
this->setName(name.c_str());
std::ostringstream ss;
std::string message_namespace(this->members_->message_namespace_);
std::string message_name(this->members_->message_name_);
if (!message_namespace.empty()) {
// Find and replace C namespace separator with C++, in case this is using C typesupport
message_namespace = std::regex_replace(message_namespace, std::regex("__"), "::");
ss << message_namespace << "::";
}
ss << "dds_::" << message_name << "_";
this->setName(ss.str().c_str());
}
} // namespace rmw_cyclonedds_cpp

View file

@ -16,6 +16,8 @@
#define RMW_CYCLONEDDS_CPP__SERVICETYPESUPPORT_IMPL_HPP_
#include <cassert>
#include <regex>
#include <sstream>
#include <string>
#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp"
@ -36,9 +38,16 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
assert(members);
this->members_ = members->request_members_;
std::string name = std::string(members->package_name_) + "::srv::dds_::" +
members->service_name_ + "_Request_";
this->setName(name.c_str());
std::ostringstream ss;
std::string service_namespace(members->service_namespace_);
std::string service_name(members->service_name_);
if (!service_namespace.empty()) {
// Find and replace C namespace separator with C++, in case this is using C typesupport
service_namespace = std::regex_replace(service_namespace, std::regex("__"), "::");
ss << service_namespace << "::";
}
ss << "dds_::" << service_name << "_Request_";
this->setName(ss.str().c_str());
}
template<typename ServiceMembersType, typename MessageMembersType>
@ -48,9 +57,17 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
assert(members);
this->members_ = members->response_members_;
std::string name = std::string(members->package_name_) + "::srv::dds_::" +
members->service_name_ + "_Response_";
this->setName(name.c_str());
std::ostringstream ss;
std::string service_namespace(members->service_namespace_);
std::string service_name(members->service_name_);
if (!service_namespace.empty()) {
// Find and replace C namespace separator with C++, in case this is using C typesupport
service_namespace = std::regex_replace(service_namespace, std::regex("__"), "::");
ss << service_namespace << "::";
}
ss << "dds_::" << service_name << "_Response_";
this->setName(ss.str().c_str());
}
} // namespace rmw_cyclonedds_cpp

View file

@ -85,12 +85,14 @@ public:
}
#define SIMPLEA(T) inline void serializeA (const T *x, size_t cnt) { \
if ((off % sizeof (T)) != 0) { \
off += sizeof (T) - (off % sizeof (T)); \
if (cnt > 0) { \
if ((off % sizeof (T)) != 0) { \
off += sizeof (T) - (off % sizeof (T)); \
} \
resize (off + cnt * sizeof (T)); \
memcpy (data () + off, (void *) x, cnt * sizeof (T)); \
off += cnt * sizeof (T); \
} \
resize (off + cnt * sizeof (T)); \
memcpy (data () + off, (void *) x, cnt * sizeof (T)); \
off += cnt * sizeof (T); \
}
SIMPLEA (char);
SIMPLEA (int8_t);
@ -111,7 +113,7 @@ public:
template<class T> inline void serialize (const std::vector<T>& x)
{
serialize (static_cast<uint32_t> (x.size ()));
if (x.size () > 0) serializeA (x.data (), x.size ());
serializeA (x.data (), x.size ());
}
inline void serialize (const std::vector<bool>& x) {
serialize (static_cast<uint32_t> (x.size ()));
@ -121,7 +123,7 @@ public:
template<class T> inline void serializeS (const T *x, size_t cnt)
{
serialize (static_cast<uint32_t> (cnt));
if (cnt > 0) serializeA (x, cnt);
serializeA (x, cnt);
}
private:
@ -192,9 +194,11 @@ public:
}
#define SIMPLEA(T) inline void deserializeA (T *x, size_t cnt) { \
align (sizeof (T)); \
memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \
pos += (cnt) * sizeof (T); \
if (cnt > 0) { \
align (sizeof (T)); \
memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \
pos += (cnt) * sizeof (T); \
} \
}
SIMPLEA (char);
SIMPLEA (int8_t);
@ -215,7 +219,7 @@ public:
template<class T> inline void deserialize (std::vector<T>& x) {
const uint32_t sz = deserialize32 ();
x.resize (sz);
if (sz > 0) deserializeA (x.data (), sz);
deserializeA (x.data (), sz);
}
inline void deserialize (std::vector<bool>& x) {
const uint32_t sz = deserialize32 ();

View file

@ -40,6 +40,7 @@
#include <map>
#include <set>
#include <atomic>
#include <regex>
#include "rcutils/logging_macros.h"
#include "rcutils/strdup.h"
@ -97,6 +98,7 @@ public:
};
struct CddsNode {
dds_entity_t pp;
rmw_guard_condition_t *graph_guard_condition;
};
@ -166,6 +168,26 @@ extern "C" const char *rmw_get_serialization_format()
return eclipse_cyclonedds_serialization_format;
}
extern "C" rmw_ret_t rmw_set_log_severity (rmw_log_severity_t severity __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_ret_t rmw_context_fini (rmw_context_t *context)
{
RCUTILS_CHECK_ARGUMENT_FOR_NULL (context, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH (
context,
context->implementation_identifier,
eclipse_cyclonedds_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
// context impl is explicitly supposed to be nullptr for now, see rmw_init's code
// RCUTILS_CHECK_ARGUMENT_FOR_NULL(context->impl, RMW_RET_INVALID_ARGUMENT);
*context = rmw_get_zero_initialized_context ();
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_init_options_init (rmw_init_options_t *init_options, rcutils_allocator_t allocator)
{
RMW_CHECK_ARGUMENT_FOR_NULL (init_options, RMW_RET_INVALID_ARGUMENT);
@ -226,6 +248,12 @@ extern "C" rmw_ret_t rmw_init (const rmw_init_options_t *options __attribute__ (
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_node_assert_liveliness (const rmw_node_t *node)
{
RET_WRONG_IMPLID (node);
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_shutdown (rmw_context_t *context)
{
RCUTILS_CHECK_ARGUMENT_FOR_NULL (context, RMW_RET_INVALID_ARGUMENT);
@ -268,6 +296,17 @@ static void unref_ppant ()
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
static void ggcallback (dds_entity_t rd, void *varg)
{
auto node_impl = static_cast<CddsNode *> (varg);
void *msg = 0;
dds_sample_info_t info;
while (dds_take_mask (rd, &msg, &info, 1, 1, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) > 0) {
dds_return_loan (rd, &msg, 1);
}
(void) rmw_trigger_guard_condition (node_impl->graph_guard_condition);
}
extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((unused)), const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options)
{
RET_NULL_X (name, return nullptr);
@ -275,19 +314,40 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((u
(void) domain_id;
(void) security_options;
dds_entity_t pp = ref_ppant ();
dds_qos_t *qos = dds_create_qos ();
std::string user_data = (std::string ("ros2_name=") + std::string (name) +
std::string (";ros2_namespace=") + std::string (namespace_));
dds_qset_userdata (qos, user_data.c_str (), user_data.size ());
dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, qos, nullptr);
dds_delete_qos (qos);
if (pp < 0) {
return nullptr;
}
auto *node_impl = new CddsNode ();
rmw_node_t *node_handle = nullptr;
dds_listener_t *gglistener = nullptr;
RET_ALLOC_X (node_impl, goto fail_node_impl);
rmw_guard_condition_t *graph_guard_condition;
if (!(graph_guard_condition = rmw_create_guard_condition (context))) {
goto fail_ggc;
}
node_impl->pp = pp;
node_impl->graph_guard_condition = graph_guard_condition;
//
static const dds_entity_t bts[] = {
DDS_BUILTIN_TOPIC_DCPSPARTICIPANT,
DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION,
DDS_BUILTIN_TOPIC_DCPSPUBLICATION
};
gglistener = dds_create_listener (node_impl);
dds_lset_data_available (gglistener, ggcallback);
for (size_t i = 0; i < sizeof (bts) / sizeof (bts[0]); i++) {
dds_create_reader (pp, bts[i], NULL, gglistener);
}
dds_delete_listener (gglistener);
//
node_handle = rmw_node_allocate ();
RET_ALLOC_X (node_handle, goto fail_node_handle);
node_handle->implementation_identifier = eclipse_cyclonedds_identifier;
@ -317,7 +377,7 @@ extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((u
fail_ggc:
delete node_impl;
fail_node_impl:
unref_ppant ();
dds_delete (pp);
return nullptr;
}
@ -334,8 +394,8 @@ extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node)
RMW_SET_ERROR_MSG ("failed to destroy graph guard condition");
result_ret = RMW_RET_ERROR;
}
dds_delete (node_impl->pp);
delete node_impl;
unref_ppant ();
return result_ret;
}
@ -356,6 +416,12 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (cons
using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_c__MessageMembers>;
using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport<rosidl_typesupport_introspection_cpp::MessageMembers>;
extern "C" rmw_ret_t rmw_get_serialized_message_size (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), size_t *size __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("rmw_get_serialized_message_size: unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_ret_t rmw_serialize (const void *ros_message, const rosidl_message_type_support_t *type_support, rmw_serialized_message_t *serialized_message)
{
std::vector<unsigned char> data;
@ -409,7 +475,7 @@ extern "C" rmw_ret_t rmw_deserialize (const rmw_serialized_message_t *serialized
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message)
extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message, rmw_publisher_allocation_t *allocation __attribute__ ((unused)))
{
RET_WRONG_IMPLID (publisher);
RET_NULL (ros_message);
@ -418,14 +484,12 @@ extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *
if (dds_write (pub->pubh, ros_message) >= 0) {
return RMW_RET_OK;
} else {
/* FIXME: what is the expected behavior when it times out? */
RMW_SET_ERROR_MSG ("cannot publish data");
//return RMW_RET_ERROR;
return RMW_RET_OK;
RMW_SET_ERROR_MSG ("failed to publish data");
return RMW_RET_ERROR;
}
}
extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message)
extern "C" rmw_ret_t rmw_publish_serialized_message (const rmw_publisher_t *publisher, const rmw_serialized_message_t *serialized_message, rmw_publisher_allocation_t *allocation __attribute__ ((unused)))
{
RET_WRONG_IMPLID (publisher);
RET_NULL (serialized_message);
@ -467,6 +531,7 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b
dds_qos_t *qos = dds_create_qos ();
switch (qos_policies->history) {
case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_HISTORY_UNKNOWN:
case RMW_QOS_POLICY_HISTORY_KEEP_LAST:
if (qos_policies->depth > INT32_MAX) {
RMW_SET_ERROR_MSG ("unsupported history depth");
@ -481,8 +546,9 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b
}
switch (qos_policies->reliability) {
case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_RELIABILITY_UNKNOWN:
case RMW_QOS_POLICY_RELIABILITY_RELIABLE:
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1));
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
break;
case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT:
dds_qset_reliability (qos, DDS_RELIABILITY_BEST_EFFORT, 0);
@ -490,6 +556,7 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b
}
switch (qos_policies->durability) {
case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT:
case RMW_QOS_POLICY_DURABILITY_UNKNOWN:
case RMW_QOS_POLICY_DURABILITY_VOLATILE:
dds_qset_durability (qos, DDS_DURABILITY_VOLATILE);
break;
@ -503,6 +570,77 @@ static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, b
return qos;
}
static bool get_readwrite_qos (dds_entity_t handle, rmw_qos_profile_t *qos_policies)
{
dds_qos_t *qos = dds_create_qos ();
if (dds_get_qos (handle, qos) < 0) {
RMW_SET_ERROR_MSG ("get_readwrite_qos: invalid handle");
goto error;
}
{
dds_history_kind_t kind;
int32_t depth;
if (!dds_qget_history (qos, &kind, &depth)) {
RMW_SET_ERROR_MSG ("get_readwrite_qos: history not set");
goto error;
}
switch (kind) {
case DDS_HISTORY_KEEP_LAST:
qos_policies->history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
qos_policies->depth = (uint32_t) depth;
break;
case DDS_HISTORY_KEEP_ALL:
qos_policies->history = RMW_QOS_POLICY_HISTORY_KEEP_ALL;
qos_policies->depth = 0;
break;
}
}
{
dds_reliability_kind_t kind;
dds_duration_t max_blocking_time;
if (!dds_qget_reliability (qos, &kind, &max_blocking_time)) {
RMW_SET_ERROR_MSG ("get_readwrite_qos: history not set");
goto error;
}
switch (kind) {
case DDS_RELIABILITY_BEST_EFFORT:
qos_policies->reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
break;
case DDS_RELIABILITY_RELIABLE:
qos_policies->reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
break;
}
}
{
dds_durability_kind_t kind;
if (!dds_qget_durability (qos, &kind)){
RMW_SET_ERROR_MSG ("get_readwrite_qos: durability not set");
goto error;
}
switch (kind)
{
case DDS_DURABILITY_VOLATILE:
qos_policies->durability = RMW_QOS_POLICY_DURABILITY_VOLATILE;
break;
case DDS_DURABILITY_TRANSIENT_LOCAL:
qos_policies->durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
break;
case DDS_DURABILITY_TRANSIENT:
case DDS_DURABILITY_PERSISTENT:
qos_policies->durability = RMW_QOS_POLICY_DURABILITY_UNKNOWN;
break;
}
}
dds_delete_qos (qos);
return true;
error:
dds_delete_qos (qos);
return false;
}
static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies)
{
RET_WRONG_IMPLID_X (node, return nullptr);
@ -554,6 +692,18 @@ static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosid
return nullptr;
}
extern "C" rmw_ret_t rmw_init_publisher_allocation (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), rmw_publisher_allocation_t *allocation __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("rmw_init_publisher_allocation: unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_ret_t rmw_fini_publisher_allocation (rmw_publisher_allocation_t *allocation __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("rmw_fini_publisher_allocation: unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_publisher_t *rmw_create_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies)
{
CddsPublisher *pub;
@ -617,6 +767,24 @@ extern "C" rmw_ret_t rmw_publisher_count_matched_subscriptions (const rmw_publis
}
}
rmw_ret_t rmw_publisher_assert_liveliness (const rmw_publisher_t *publisher)
{
RET_WRONG_IMPLID (publisher);
return RMW_RET_OK;
}
rmw_ret_t rmw_publisher_get_actual_qos (const rmw_publisher_t *publisher, rmw_qos_profile_t *qos)
{
RET_NULL (qos);
RET_WRONG_IMPLID (publisher);
auto pub = static_cast<CddsPublisher *> (publisher->data);
if (get_readwrite_qos (pub->pubh, qos)) {
return RMW_RET_OK;
} else {
return RMW_RET_ERROR;
}
}
extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *publisher)
{
RET_WRONG_IMPLID (node);
@ -691,6 +859,18 @@ static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const
return nullptr;
}
extern "C" rmw_ret_t rmw_init_subscription_allocation (const rosidl_message_type_support_t *type_support __attribute__ ((unused)), const rosidl_message_bounds_t *message_bounds __attribute__ ((unused)), rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("rmw_init_subscription_allocation: unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_ret_t rmw_fini_subscription_allocation (rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
RMW_SET_ERROR_MSG ("rmw_fini_subscription_allocation: unimplemented");
return RMW_RET_ERROR;
}
extern "C" rmw_subscription_t *rmw_create_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications)
{
CddsSubscription *sub;
@ -815,26 +995,33 @@ static rmw_ret_t rmw_take_ser_int (const rmw_subscription_t *subscription, rmw_s
return RMW_RET_OK;
}
extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken)
extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
return rmw_take_int (subscription, ros_message, taken, nullptr);
}
extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info)
extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
return rmw_take_int (subscription, ros_message, taken, message_info);
}
extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken)
extern "C" rmw_ret_t rmw_take_serialized_message (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
return rmw_take_ser_int (subscription, serialized_message, taken, nullptr);
}
extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info)
extern "C" rmw_ret_t rmw_take_serialized_message_with_info (const rmw_subscription_t *subscription, rmw_serialized_message_t *serialized_message, bool *taken, rmw_message_info_t *message_info, rmw_subscription_allocation_t *allocation __attribute__ ((unused)))
{
return rmw_take_ser_int (subscription, serialized_message, taken, message_info);
}
extern "C" rmw_ret_t rmw_take_event (const rmw_events_t *event_handle __attribute__ ((unused)), void *event_info __attribute__ ((unused)), bool *taken)
{
RET_NULL (taken);
*taken = false;
return RMW_RET_OK;
}
/////////////////////////////////////////////////////////////////////////////////////////
/////////// ///////////
/////////// GUARDS AND WAITSETS ///////////
@ -884,7 +1071,7 @@ extern "C" rmw_ret_t rmw_trigger_guard_condition (const rmw_guard_condition_t *g
return RMW_RET_OK;
}
extern "C" rmw_wait_set_t *rmw_create_wait_set (size_t max_conditions)
extern "C" rmw_wait_set_t *rmw_create_wait_set (rmw_context_t *context __attribute__ ((unused)), size_t max_conditions)
{
(void) max_conditions;
rmw_wait_set_t *wait_set = rmw_wait_set_allocate ();
@ -978,7 +1165,7 @@ static void clean_waitset_caches ()
}
}
extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout)
extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_events_t *evs __attribute__ ((unused)), rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout)
{
RET_NULL (wait_set);
CddsWaitset *ws = static_cast<CddsWaitset *> (wait_set->data);
@ -1078,7 +1265,7 @@ static rmw_ret_t rmw_take_response_request (CddsCS *cs, rmw_request_id_t *reques
while (dds_take (cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) {
if (info.valid_data) {
memset (request_header, 0, sizeof (wrap.header));
assert (sizeof (wrap.header.guid) < sizeof (wrap.header.guid));
assert (sizeof (wrap.header.guid) <= sizeof (request_header->writer_guid));
memcpy (request_header->writer_guid, &wrap.header.guid, sizeof (wrap.header.guid));
request_header->sequence_number = wrap.header.seq;
if (srcfilter == 0 || srcfilter == wrap.header.guid) {
@ -1111,10 +1298,8 @@ static rmw_ret_t rmw_send_response_request (CddsCS *cs, const cdds_request_heade
if (dds_write (cs->pub->pubh, static_cast<const void *> (&wrap)) >= 0) {
return RMW_RET_OK;
} else {
/* FIXME: what is the expected behavior when it times out? */
RMW_SET_ERROR_MSG ("cannot publish data");
//return RMW_RET_ERROR;
return RMW_RET_OK;
return RMW_RET_ERROR;
}
}
@ -1337,41 +1522,87 @@ extern "C" rmw_ret_t rmw_destroy_service (rmw_node_t *node, rmw_service_t *servi
extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces)
{
#if 0 // NIY
dds_entity_t rd;
std::set< std::pair<std::string, std::string> > ns;
RET_WRONG_IMPLID (node);
if (rmw_check_zero_rmw_string_array (node_names) != RMW_RET_OK) {
if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK ||
rmw_check_zero_rmw_string_array(node_namespaces) != RMW_RET_OK) {
return RMW_RET_ERROR;
}
auto impl = static_cast<CddsNode *> (node->data);
// FIXME: sorry, can't do it with current Zenoh
auto participant_names = std::vector<std::string>{};
rcutils_allocator_t allocator = rcutils_get_default_allocator ();
rcutils_ret_t rcutils_ret =
rcutils_string_array_init (node_names, participant_names.size (), &allocator);
if (rcutils_ret != RCUTILS_RET_OK) {
RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ())
return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret);
}
for (size_t i = 0; i < participant_names.size (); ++i) {
node_names->data[i] = rcutils_strdup (participant_names[i].c_str (), allocator);
if (!node_names->data[i]) {
RMW_SET_ERROR_MSG ("failed to allocate memory for node name")
rcutils_ret = rcutils_string_array_fini (node_names);
if (rcutils_ret != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED (
"rmw_cyclonedds_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string_safe ());
{
if ((rd = dds_create_reader (ref_ppant (), DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) {
unref_ppant ();
RMW_SET_ERROR_MSG ("rmw_get_node_names: failed to create reader");
return RMW_RET_ERROR;
}
dds_sample_info_t info;
void *msg = NULL;
int32_t n;
const auto re = std::regex ("^ros2_name=(.*);ros2_namespace=(.*)$");
while ((n = dds_take (rd, &msg, &info, 1, 1)) == 1) {
if (info.valid_data && info.instance_state == DDS_IST_ALIVE) {
auto sample = static_cast<const dds_builtintopic_participant_t *> (msg);
void *ud;
size_t udsz;
if (dds_qget_userdata (sample->qos, &ud, &udsz)) {
std::cmatch cm;
/* CycloneDDS guarantees a null-terminated user data, so no need to bother with the length */
if (std::regex_match (static_cast<char *> (ud), cm, re, std::regex_constants::match_default)) {
ns.insert (std::make_pair (std::string (cm[1]), std::string (cm[2])));
}
dds_free (ud);
}
}
return RMW_RET_BAD_ALLOC;
dds_return_loan (rd, &msg, n);
}
dds_delete (rd);
unref_ppant ();
if (n < 0) {
RMW_SET_ERROR_MSG ("rmw_get_node_names: error reading participants");
return RMW_RET_ERROR;
}
}
{
rcutils_allocator_t allocator = rcutils_get_default_allocator ();
if (rcutils_string_array_init (node_names, ns.size (), &allocator) != RCUTILS_RET_OK ||
rcutils_string_array_init (node_namespaces, ns.size(), &allocator) != RCUTILS_RET_OK) {
RMW_SET_ERROR_MSG (rcutils_get_error_string ().str);
goto fail_alloc;
}
size_t i = 0;
for (auto&& n : ns) {
node_names->data[i] = rcutils_strdup (n.first.c_str (), allocator);
node_namespaces->data[i] = rcutils_strdup (n.second.c_str (), allocator);
if (!node_names->data[i] || !node_namespaces->data[i]) {
RMW_SET_ERROR_MSG ("rmw_get_node_names for name/namespace");
goto fail_alloc;
}
i++;
}
}
return RMW_RET_OK;
#else
(void) node; (void) node_names; (void) node_namespaces;
return RMW_RET_TIMEOUT;
#endif
fail_alloc:
if (node_names) {
if (rcutils_string_array_fini (node_names) != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED (
"rmw_cyclonedds_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string ().str);
rcutils_reset_error();
}
}
if (node_namespaces) {
if (rcutils_string_array_fini(node_namespaces) != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rmw_cyclonedds_cpp",
"failed to cleanup during error handling: %s", rcutils_get_error_string ().str);
rcutils_reset_error();
}
}
return RMW_RET_BAD_ALLOC;
}
static rmw_ret_t rmw_collect_tptyp_for_kind (std::map<std::string, std::set<std::string>>& tt, dds_entity_t builtin_topic)