From c9a23a9b8a09842e0ccd44a719067d6559026e1b Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 29 Apr 2019 09:33:13 +0200 Subject: [PATCH] update for Cyclone DDS changes and ROS2 changes The changes in this commit make it compile with ROS2 Crystal Clemmys and current Cyclone DDS. The RMW interface of ROS2 was modified in some ways and extended in some other ways since Bouncy Bolson; and similarly, Cyclone now has a somewhat reasonable interface for custom sample representations and serialization, but the code in this commit probably contains mistakes in using it. Therefore, the expectation should be that this doesn't actually work just yet, though it probably is quite close. As the old state wouldn't build at all with any version of Cyclone DDS except the early commits, this is significant progress already. --- rmw_cyclonedds_cpp/CMakeLists.txt | 7 +- .../rmw_cyclonedds_cpp/TypeSupport.hpp | 4 +- .../rmw_cyclonedds_cpp/TypeSupport_impl.hpp | 90 +- .../include/rmw_cyclonedds_cpp/macros.hpp | 10 +- .../include/rmw_cyclonedds_cpp/serdata.hpp | 55 + .../include/rmw_cyclonedds_cpp/serdes.hpp | 386 +++-- rmw_cyclonedds_cpp/src/rmw_node.cpp | 1362 +++++++---------- rmw_cyclonedds_cpp/src/serdata.cpp | 306 ++++ rmw_cyclonedds_cpp/src/serdes.cpp | 61 +- 9 files changed, 1202 insertions(+), 1079 deletions(-) create mode 100644 rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp create mode 100644 rmw_cyclonedds_cpp/src/serdata.cpp diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index 4a9e215..369d84b 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -52,12 +52,11 @@ include_directories(include) add_library(rmw_cyclonedds_cpp src/namespace_prefix.cpp src/rmw_node.cpp + src/serdata.cpp src/serdes.cpp ) -idlc_generate(rmw_cyclonedds_topic_lib src/rmw_cyclonedds_topic.idl) -target_link_libraries(rmw_cyclonedds_cpp - rmw_cyclonedds_topic_lib CycloneDDS::ddsc) +target_link_libraries(rmw_cyclonedds_cpp CycloneDDS::ddsc) ament_target_dependencies(rmw_cyclonedds_cpp "rcutils" @@ -72,7 +71,7 @@ configure_rmw_library(rmw_cyclonedds_cpp) # Causes the visibility macros to use dllexport rather than dllimport, # which is appropriate when building the dll but not consuming it. target_compile_definitions(${PROJECT_NAME} -PRIVATE "RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY") + PRIVATE "RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY") ament_export_include_directories(include) ament_export_libraries(rmw_cyclonedds_cpp) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp index d5dd4d8..2d3f181 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp @@ -59,13 +59,13 @@ struct StringHelper if (!c_string) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", - "Failed to cast data as rosidl_generator_c__String") + "Failed to cast data as rosidl_generator_c__String"); return ""; } if (!c_string->data) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", - "rosidl_generator_c_String had invalid data") + "rosidl_generator_c_String had invalid data"); return ""; } return std::string(c_string->data); diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp index 9636449..e0ad3ed 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp @@ -30,7 +30,7 @@ #include "rosidl_typesupport_introspection_c/message_introspection.h" #include "rosidl_typesupport_introspection_c/service_introspection.h" -#include "rosidl_generator_c/primitives_array_functions.h" +#include "rosidl_generator_c/primitives_sequence_functions.h" #include "serdes.hpp" @@ -38,37 +38,37 @@ namespace rmw_cyclonedds_cpp { template -struct GenericCArray; +struct GenericCSequence; // multiple definitions of ambiguous primitive types -SPECIALIZE_GENERIC_C_ARRAY(bool, bool) -SPECIALIZE_GENERIC_C_ARRAY(byte, uint8_t) -SPECIALIZE_GENERIC_C_ARRAY(char, char) -SPECIALIZE_GENERIC_C_ARRAY(float32, float) -SPECIALIZE_GENERIC_C_ARRAY(float64, double) -SPECIALIZE_GENERIC_C_ARRAY(int8, int8_t) -SPECIALIZE_GENERIC_C_ARRAY(int16, int16_t) -SPECIALIZE_GENERIC_C_ARRAY(uint16, uint16_t) -SPECIALIZE_GENERIC_C_ARRAY(int32, int32_t) -SPECIALIZE_GENERIC_C_ARRAY(uint32, uint32_t) -SPECIALIZE_GENERIC_C_ARRAY(int64, int64_t) -SPECIALIZE_GENERIC_C_ARRAY(uint64, uint64_t) +SPECIALIZE_GENERIC_C_SEQUENCE(bool, bool) +SPECIALIZE_GENERIC_C_SEQUENCE(byte, uint8_t) +SPECIALIZE_GENERIC_C_SEQUENCE(char, char) +SPECIALIZE_GENERIC_C_SEQUENCE(float32, float) +SPECIALIZE_GENERIC_C_SEQUENCE(float64, double) +SPECIALIZE_GENERIC_C_SEQUENCE(int8, int8_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int16, int16_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint16, uint16_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int32, int32_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint32, uint32_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int64, int64_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint64, uint64_t) -typedef struct rosidl_generator_c__void__Array +typedef struct rosidl_generator_c__void__Sequence { void * data; /// The number of valid items in data size_t size; /// The number of allocated items in data size_t capacity; -} rosidl_generator_c__void__Array; +} rosidl_generator_c__void__Sequence; inline bool -rosidl_generator_c__void__Array__init( - rosidl_generator_c__void__Array * array, size_t size, size_t member_size) +rosidl_generator_c__void__Sequence__init( + rosidl_generator_c__void__Sequence * sequence, size_t size, size_t member_size) { - if (!array) { + if (!sequence) { return false; } void * data = nullptr; @@ -78,31 +78,31 @@ rosidl_generator_c__void__Array__init( return false; } } - array->data = data; - array->size = size; - array->capacity = size; + sequence->data = data; + sequence->size = size; + sequence->capacity = size; return true; } inline void -rosidl_generator_c__void__Array__fini(rosidl_generator_c__void__Array * array) +rosidl_generator_c__void__Sequence__fini(rosidl_generator_c__void__Sequence * sequence) { - if (!array) { + if (!sequence) { return; } - if (array->data) { + if (sequence->data) { // ensure that data and capacity values are consistent - assert(array->capacity > 0); - // finalize all array elements - free(array->data); - array->data = nullptr; - array->size = 0; - array->capacity = 0; + assert(sequence->capacity > 0); + // finalize all sequence elements + free(sequence->data); + sequence->data = nullptr; + sequence->size = 0; + sequence->capacity = 0; } else { // ensure that data, size, and capacity values are consistent - assert(0 == array->size); - assert(0 == array->capacity); + assert(0 == sequence->size); + assert(0 == sequence->capacity); } } @@ -231,7 +231,7 @@ void serialize_field( } else if (member->array_size_ && !member->is_upper_bound_) { ser.serializeA(static_cast(field), member->array_size_); } else { - auto & data = *reinterpret_cast::type *>(field); + auto & data = *reinterpret_cast::type *>(field); ser.serializeS(reinterpret_cast(data.data), data.size); } } @@ -264,7 +264,7 @@ void serialize_field( ser.serialize(tmpstring); } } else { - auto & string_array_field = *reinterpret_cast(field); + auto & string_array_field = *reinterpret_cast(field); std::vector cpp_string_vector; for (size_t i = 0; i < string_array_field.size; ++i) { cpp_string_vector.push_back( @@ -300,12 +300,12 @@ size_t get_array_size_and_assign_field( void * & subros_message, size_t, size_t) { - auto tmparray = static_cast(field); - if (member->is_upper_bound_ && tmparray->size > member->array_size_) { + auto tmpsequence = static_cast(field); + if (member->is_upper_bound_ && tmpsequence->size > member->array_size_) { throw std::runtime_error("vector overcomes the maximum length"); } - subros_message = reinterpret_cast(tmparray->data); - return tmparray->size; + subros_message = reinterpret_cast(tmpsequence->data); + return tmpsequence->size; } template @@ -434,10 +434,10 @@ void deserialize_field( } else if (member->array_size_ && !member->is_upper_bound_) { deser.deserializeA(static_cast(field), member->array_size_); } else { - auto & data = *reinterpret_cast::type *>(field); + auto & data = *reinterpret_cast::type *>(field); int32_t dsize = 0; deser >> dsize; - GenericCArray::init(&data, dsize); + GenericCSequence::init(&data, dsize); deser.deserializeA(reinterpret_cast(data.data), dsize); } } @@ -469,8 +469,8 @@ inline void deserialize_field( std::vector cpp_string_vector; deser >> cpp_string_vector; - auto & string_array_field = *reinterpret_cast(field); - if (!rosidl_generator_c__String__Array__init(&string_array_field, cpp_string_vector.size())) { + auto & string_array_field = *reinterpret_cast(field); + if (!rosidl_generator_c__String__Sequence__init(&string_array_field, cpp_string_vector.size())) { throw std::runtime_error("unable to initialize rosidl_generator_c__String array"); } @@ -521,8 +521,8 @@ inline size_t get_submessage_array_deserialize( // Deserialize length uint32_t vsize = 0; deser >> vsize; - auto tmparray = static_cast(field); - rosidl_generator_c__void__Array__init(tmparray, vsize, sub_members_size); + auto tmparray = static_cast(field); + rosidl_generator_c__void__Sequence__init(tmparray, vsize, sub_members_size); subros_message = reinterpret_cast(tmparray->data); return vsize; } diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp index 6c50a92..2d6b621 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp @@ -18,18 +18,18 @@ #include #include -#define SPECIALIZE_GENERIC_C_ARRAY(C_NAME, C_TYPE) \ +#define SPECIALIZE_GENERIC_C_SEQUENCE(C_NAME, C_TYPE) \ template<> \ - struct GenericCArray \ + struct GenericCSequence \ { \ - using type = rosidl_generator_c__ ## C_NAME ## __Array; \ + using type = rosidl_generator_c__ ## C_NAME ## __Sequence; \ \ static void fini(type * array) { \ - rosidl_generator_c__ ## C_NAME ## __Array__fini(array); \ + rosidl_generator_c__ ## C_NAME ## __Sequence__fini(array); \ } \ \ static bool init(type * array, size_t size) { \ - return rosidl_generator_c__ ## C_NAME ## __Array__init(array, size); \ + return rosidl_generator_c__ ## C_NAME ## __Sequence__init(array, size); \ } \ }; diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp new file mode 100644 index 0000000..6fe9ad4 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp @@ -0,0 +1,55 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef SERDATA_HPP +#define SERDATA_HPP + +#include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/ddsi_serdata.h" + +struct CddsTypeSupport { + void *type_support_; + const char *typesupport_identifier_; +}; + +struct sertopic_rmw : ddsi_sertopic { + CddsTypeSupport type_support; + bool is_request_header; + std::string cpp_name; + std::string cpp_type_name; + std::string cpp_name_type_name; +}; + +struct serdata_rmw : ddsi_serdata { + /* first two bytes of data is CDR encoding + second two bytes are encoding options */ + std::vector data; +}; + +typedef struct cdds_request_header { + uint64_t guid; + int64_t seq; +} cdds_request_header_t; + +typedef struct cdds_request_wrapper { + cdds_request_header_t header; + void *data; +} cdds_request_wrapper_t; + +void *create_message_type_support (const void *untyped_members, const char *typesupport_identifier); +void *create_request_type_support (const void *untyped_members, const char *typesupport_identifier); +void *create_response_type_support (const void *untyped_members, const char *typesupport_identifier); + +struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header); + +#endif diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp index efc2b64..0754fb0 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp @@ -20,235 +20,217 @@ #include #include -#include "ddsc/dds.h" - -extern "C" { - struct serstate; - struct serdata; - struct sertopic; - - struct serdata *ddsi_serdata_ref(struct serdata *serdata); - void ddsi_serdata_unref(struct serdata *serdata); - struct serstate *ddsi_serstate_new(const struct sertopic *topic); - struct serdata *ddsi_serstate_fix(struct serstate *st); - void ddsi_serstate_release(struct serstate *st); - void *ddsi_serstate_append(struct serstate *st, size_t n); - void *ddsi_serstate_append_align(struct serstate *st, size_t a); - void *ddsi_serstate_append_aligned(struct serstate *st, size_t n, size_t a); - void ddsi_serdata_getblob(void **raw, size_t *sz, struct serdata *serdata); -} - -template -struct simpletype : std::integral_constant< bool, std::is_arithmetic::value && !std::is_same::value > {}; - -template -struct complextype : std::integral_constant< bool, !(std::is_arithmetic::value && !std::is_same::value) > {}; - -template struct rank : rank {}; -template<> struct rank<0> {}; - class cycser { public: - cycser(struct sertopic *topic); - cycser() = delete; - ~cycser(); - cycser& fix(); - cycser& ref(); - void unref(); - struct serdata *data(); + cycser (std::vector& dst_); + cycser () = delete; - inline cycser& operator<<(bool x) { serialize(x); return *this; } - inline cycser& operator<<(char x) { serialize(x); return *this; } - inline cycser& operator<<(uint8_t x) { serialize(x); return *this; } - inline cycser& operator<<(int16_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint16_t x) { serialize(x); return *this; } - inline cycser& operator<<(int32_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint32_t x) { serialize(x); return *this; } - inline cycser& operator<<(int64_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint64_t x) { serialize(x); return *this; } - inline cycser& operator<<(float x) { serialize(x); return *this; } - inline cycser& operator<<(double x) { serialize(x); return *this; } - inline cycser& operator<<(const char *x) { serialize(x); return *this; } - inline cycser& operator<<(const std::string& x) { serialize(x); return *this; } - template inline cycser& operator<<(const std::vector& x) { serialize(x); return *this; } - template inline cycser& operator<<(const std::array& x) { serialize(x); return *this; } + inline cycser& operator<< (bool x) { serialize (x); return *this; } + inline cycser& operator<< (char x) { serialize (x); return *this; } + inline cycser& operator<< (uint8_t x) { serialize (x); return *this; } + inline cycser& operator<< (int16_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint16_t x) { serialize (x); return *this; } + inline cycser& operator<< (int32_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint32_t x) { serialize (x); return *this; } + inline cycser& operator<< (int64_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint64_t x) { serialize (x); return *this; } + inline cycser& operator<< (float x) { serialize (x); return *this; } + inline cycser& operator<< (double x) { serialize (x); return *this; } + inline cycser& operator<< (const char *x) { serialize (x); return *this; } + inline cycser& operator<< (const std::string& x) { serialize (x); return *this; } + template inline cycser& operator<< (const std::vector& x) { serialize (x); return *this; } + template inline cycser& operator<< (const std::array& x) { serialize (x); return *this; } -#define SIMPLE(T) inline void serialize(T x) { \ - T *p = (T *)ddsi_serstate_append_aligned(st, sizeof(T), sizeof(T)); \ - *p = x; \ - } - SIMPLE(char); - SIMPLE(unsigned char); - SIMPLE(int16_t); - SIMPLE(uint16_t); - SIMPLE(int32_t); - SIMPLE(uint32_t); - SIMPLE(int64_t); - SIMPLE(uint64_t); - SIMPLE(float); - SIMPLE(double); +#define SIMPLE(T) inline void serialize (T x) { \ + if ((off % sizeof (T)) != 0) { \ + off += sizeof (T) - (off % sizeof (T)); \ + } \ + reserve (off + sizeof (T)); \ + *(T *) (data () + off) = x; \ + off += sizeof (T); \ + } + SIMPLE (char); + SIMPLE (unsigned char); + SIMPLE (int16_t); + SIMPLE (uint16_t); + SIMPLE (int32_t); + SIMPLE (uint32_t); + SIMPLE (int64_t); + SIMPLE (uint64_t); + SIMPLE (float); + SIMPLE (double); #undef SIMPLE - inline void serialize(bool x) { - serialize(static_cast(x)); - } - inline void serialize(const char *x) - { - size_t sz = strlen(x) + 1; - serialize(static_cast(sz)); - char *p = (char *)ddsi_serstate_append(st, sz); - memcpy(p, x, sz); - } - inline void serialize(const std::string& x) - { - size_t sz = x.size(); - serialize(static_cast(sz+1)); - char *p = (char *)ddsi_serstate_append(st, sz+1); - memcpy(p, x.data(), sz); - p[sz] = 0; - } + inline void serialize (bool x) { + serialize (static_cast (x)); + } + inline void serialize (const char *x) + { + size_t sz = strlen (x) + 1; + serialize (static_cast (sz)); + reserve (off + sz); + memcpy (data () + off, x, sz); + off += sz; + } + inline void serialize (const std::string& x) + { + size_t sz = x.size () + 1; + serialize (static_cast (sz)); + reserve (off + sz); + memcpy (data () + off, x.data (), sz - 1); + *(data () + off + sz - 1) = 0; + } -#define SIMPLEA(T) inline void serializeA(const T *x, size_t cnt) { \ - void *p = (void *)ddsi_serstate_append_aligned(st, (cnt) * sizeof(T), sizeof(T)); \ - memcpy(p, (void *)x, (cnt) * sizeof(T)); \ - } - SIMPLEA(char); - SIMPLEA(unsigned char); - SIMPLEA(int16_t); - SIMPLEA(uint16_t); - SIMPLEA(int32_t); - SIMPLEA(uint32_t); - SIMPLEA(int64_t); - SIMPLEA(uint64_t); - SIMPLEA(float); - SIMPLEA(double); +#define SIMPLEA(T) inline void serializeA (const T *x, size_t cnt) { \ + if ((off % sizeof (T)) != 0) { \ + off += sizeof (T) - (off % sizeof (T)); \ + } \ + reserve (off + cnt * sizeof (T)); \ + memcpy (data () + off, (void *) x, cnt * sizeof (T)); \ + off += cnt * sizeof (T); \ + } + SIMPLEA (char); + SIMPLEA (unsigned char); + SIMPLEA (int16_t); + SIMPLEA (uint16_t); + SIMPLEA (int32_t); + SIMPLEA (uint32_t); + SIMPLEA (int64_t); + SIMPLEA (uint64_t); + SIMPLEA (float); + SIMPLEA (double); #undef SIMPLEA - template inline void serializeA(const T *x, size_t cnt) { - for (size_t i = 0; i < cnt; i++) serialize(x[i]); - } + template inline void serializeA (const T *x, size_t cnt) { + for (size_t i = 0; i < cnt; i++) serialize (x[i]); + } - template inline void serialize(const std::vector& x) - { - serialize(static_cast(x.size())); - if (x.size() > 0) serializeA(x.data(), x.size()); - } - inline void serialize(const std::vector& x) { - serialize(static_cast(x.size())); - for (auto&& i : x) serialize(i); - } + template inline void serialize (const std::vector& x) + { + serialize (static_cast (x.size ())); + if (x.size () > 0) serializeA (x.data (), x.size ()); + } + inline void serialize (const std::vector& x) { + serialize (static_cast (x.size ())); + for (auto&& i : x) serialize (i); + } - template inline void serializeS(const T *x, size_t cnt) - { - serialize(static_cast(cnt)); - if (cnt > 0) serializeA(x, cnt); - } + template inline void serializeS (const T *x, size_t cnt) + { + serialize (static_cast (cnt)); + if (cnt > 0) serializeA (x, cnt); + } private: - struct serstate *st; - struct serdata *sd; + inline void reserve (size_t n) { dst.reserve (n + 4); } + inline unsigned char *data () { return dst.data () + 4; } + + std::vector& dst; + size_t off; }; class cycdeser { public: - // FIXME: byteswapping - cycdeser(const void *data, size_t size); - cycdeser() = delete; - ~cycdeser(); + // FIXME: byteswapping + cycdeser (const void *data, size_t size); + cycdeser () = delete; - inline cycdeser& operator>>(bool& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(char& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint8_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int16_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint16_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int32_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint32_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int64_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint64_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(float& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(double& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(char *& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(std::string& x) { deserialize(x); return *this; } - template inline cycdeser& operator>>(std::vector& x) { deserialize(x); return *this; } - template inline cycdeser& operator>>(std::array& x) { deserialize(x); return *this; } + inline cycdeser& operator>> (bool& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (char& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint8_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int16_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint16_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int32_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint32_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int64_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint64_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (float& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (double& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (char *& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (std::string& x) { deserialize (x); return *this; } + template inline cycdeser& operator>> (std::vector& x) { deserialize (x); return *this; } + template inline cycdeser& operator>> (std::array& x) { deserialize (x); return *this; } -#define SIMPLE(T) inline void deserialize(T& x) { \ - align(sizeof(x)); \ - x = *reinterpret_cast(data); \ - data += sizeof(x); \ - } - SIMPLE(char); - SIMPLE(unsigned char); - SIMPLE(int16_t); - SIMPLE(uint16_t); - SIMPLE(int32_t); - SIMPLE(uint32_t); - SIMPLE(int64_t); - SIMPLE(uint64_t); - SIMPLE(float); - SIMPLE(double); +#define SIMPLE(T) inline void deserialize (T& x) { \ + align (sizeof (x)); \ + x = *reinterpret_cast (data + pos); \ + pos += sizeof (x); \ + } + SIMPLE (char); + SIMPLE (unsigned char); + SIMPLE (int16_t); + SIMPLE (uint16_t); + SIMPLE (int32_t); + SIMPLE (uint32_t); + SIMPLE (int64_t); + SIMPLE (uint64_t); + SIMPLE (float); + SIMPLE (double); #undef SIMPLE - inline void deserialize(bool& x) { - unsigned char z; deserialize(z); x = (z != 0); - } - inline uint32_t deserialize32() { - uint32_t sz; deserialize(sz); return sz; - } - inline void deserialize(char *& x) { - const uint32_t sz = deserialize32(); x = (char *)malloc(sz); memcpy(x, data, sz); data += sz; - } - inline void deserialize(std::string& x) { - const uint32_t sz = deserialize32(); x = std::string(data, sz-1); data += sz; - } + inline void deserialize (bool& x) { + unsigned char z; deserialize (z); x = (z != 0); + } + inline uint32_t deserialize32 () { + uint32_t sz; deserialize (sz); return sz; + } + inline void deserialize (char *& x) { + const uint32_t sz = deserialize32 (); + x = (char *) malloc (sz); + memcpy (x, data + pos, sz); + pos += sz; + } + inline void deserialize (std::string& x) { + const uint32_t sz = deserialize32 (); + x = std::string (data + pos, sz-1); + pos += sz; + } -#define SIMPLEA(T) inline void deserializeA(T *x, size_t cnt) { \ - align(sizeof(T)); \ - memcpy((void *)x, (void *)data, (cnt) * sizeof(T)); \ - data += (cnt) * sizeof(T); \ - } - SIMPLEA(char); - SIMPLEA(unsigned char); - SIMPLEA(int16_t); - SIMPLEA(uint16_t); - SIMPLEA(int32_t); - SIMPLEA(uint32_t); - SIMPLEA(int64_t); - SIMPLEA(uint64_t); - SIMPLEA(float); - SIMPLEA(double); +#define SIMPLEA(T) inline void deserializeA (T *x, size_t cnt) { \ + align (sizeof (T)); \ + memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \ + pos += (cnt) * sizeof (T); \ + } + SIMPLEA (char); + SIMPLEA (unsigned char); + SIMPLEA (int16_t); + SIMPLEA (uint16_t); + SIMPLEA (int32_t); + SIMPLEA (uint32_t); + SIMPLEA (int64_t); + SIMPLEA (uint64_t); + SIMPLEA (float); + SIMPLEA (double); #undef SIMPLEA - template inline void deserializeA(T *x, size_t cnt) { - for (size_t i = 0; i < cnt; i++) deserialize(x[i]); - } + template inline void deserializeA (T *x, size_t cnt) { + for (size_t i = 0; i < cnt; i++) deserialize (x[i]); + } - template inline void deserialize(std::vector& x) { - const uint32_t sz = deserialize32(); - x.resize(sz); - deserializeA(x.data(), sz); - } - inline void deserialize(std::vector& x) { - const uint32_t sz = deserialize32(); - x.resize(sz); - for (auto&& i : x) i = (data[i] != 0); - data += sz; - } - template inline void deserialize(std::array& x) { - deserializeA(x.data(), x.size()); - } + template inline void deserialize (std::vector& x) { + const uint32_t sz = deserialize32 (); + x.resize (sz); + deserializeA (x.data (), sz); + } + inline void deserialize (std::vector& x) { + const uint32_t sz = deserialize32 (); + x.resize (sz); + for (auto&& i : x) i = ((data + pos)[i] != 0); + pos += sz; + } + template inline void deserialize (std::array& x) { + deserializeA (x.data (), x.size ()); + } private: - inline void align(size_t a) - { - const uintptr_t x = reinterpret_cast(data); - if ((x % a) != 0) { - const uintptr_t pad = a - (x % a); - data = reinterpret_cast(x + pad); + inline void align (size_t a) + { + if ((pos % a) != 0) { + pos += a - (pos % a); + } } - } - const char *data; - size_t pos; - size_t lim; // ignored for now ... better provide correct input + const char *data; + size_t pos; + size_t lim; // ignored for now ... better provide correct input }; #endif diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 6dd8b7d..31426ac 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -22,45 +22,22 @@ - topic creation: shouldn't leak topics - - hash set of writer handles should be thread safe: no guarantee that no writers get - added/deleted in parallel to each other or to takes - - - introspection stuff not done yet (probably requires additions to cdds) + - introspection stuff not done yet - check / make sure a node remains valid while one of its subscriptions exists - - writecdr/takecdr interface abuse is beyond redemption - - service/client simply use the instance handle of its publisher as its GUID -- yikes! but it is actually only kinda wrong because the instance handles allocated by different instance of cdds are actually taken from uncorrelated (close to uncorrelated anyway) permutations of 64-bit unsigned integers - - ... and many more things ... + - ... and probably many more things ... */ -#include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include #include -#include "rcutils/allocator.h" -#include "rcutils/filesystem.h" #include "rcutils/logging_macros.h" -#include "rcutils/strdup.h" -#include "rcutils/types.h" #include "rmw/allocators.h" #include "rmw/convert_rcutils_ret_to_rmw_ret.h" @@ -71,71 +48,60 @@ #include "rmw/impl/cpp/macros.hpp" -#include "namespace_prefix.hpp" - #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" -#include "ddsc/dds.h" +#include "namespace_prefix.hpp" -#include "rmw_cyclonedds_topic.h" +#include "dds/dds.h" +#include "dds/ddsi/ddsi_sertopic.h" #include "rmw_cyclonedds_cpp/serdes.hpp" +#include "rmw_cyclonedds_cpp/serdata.hpp" -#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG(msg); code; } while(0) -#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X(#var " is null", code); } while(0) -#define RET_ALLOC_X(var, code) do { if (!var) RET_ERR_X("failed to allocate " #var, code); } while(0) +#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG (msg); code; } while (0) +#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X (#var " is null", code); } while (0) +#define RET_ALLOC_X(var, code) do { if (!var) RET_ERR_X ("failed to allocate " #var, code); } while (0) #define RET_WRONG_IMPLID_X(var, code) do { \ - RET_NULL_X(var, code); \ + RET_NULL_X (var, code); \ if ((var)->implementation_identifier != adlink_cyclonedds_identifier) { \ - RET_ERR_X(#var " not from this implementation", code); \ + RET_ERR_X (#var " not from this implementation", code); \ } \ - } while(0) + } while (0) #define RET_NULL_OR_EMPTYSTR_X(var, code) do { \ - if(!var || strlen(var) == 0) { \ - RET_ERR_X(#var " is null or empty string", code); \ + if (!var || strlen (var) == 0) { \ + RET_ERR_X (#var " is null or empty string", code); \ } \ - } while(0) -#define RET_ERR(msg) RET_ERR_X(msg, return RMW_RET_ERROR) -#define RET_NULL(var) RET_NULL_X(var, return RMW_RET_ERROR) -#define RET_ALLOC(var) RET_ALLOC_X(var, return RMW_RET_ERROR) -#define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X(var, return RMW_RET_ERROR) -#define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X(var, return RMW_RET_ERROR) + } while (0) +#define RET_ERR(msg) RET_ERR_X (msg, return RMW_RET_ERROR) +#define RET_NULL(var) RET_NULL_X (var, return RMW_RET_ERROR) +#define RET_ALLOC(var) RET_ALLOC_X (var, return RMW_RET_ERROR) +#define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X (var, return RMW_RET_ERROR) +#define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X (var, return RMW_RET_ERROR) const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp"; -struct CddsTypeSupport { - void *type_support_; - const char *typesupport_identifier_; -}; - /* instance handles are unsigned 64-bit integers carefully constructed to be as close to uniformly distributed as possible for no other reason than making them near-perfect hash keys, hence we can improve over the default hash function */ struct dds_instance_handle_hash { public: - std::size_t operator()(dds_instance_handle_t const& x) const noexcept { - return static_cast(x); + std::size_t operator () (dds_instance_handle_t const& x) const noexcept { + return static_cast (x); } }; struct CddsNode { rmw_guard_condition_t *graph_guard_condition; - std::unordered_set own_writers; }; struct CddsPublisher { dds_entity_t pubh; dds_instance_handle_t pubiid; - struct sertopic *sertopic; - CddsTypeSupport ts; }; struct CddsSubscription { dds_entity_t subh; dds_entity_t rdcondh; - CddsNode *node; - bool ignore_local_publications; - CddsTypeSupport ts; }; struct CddsCS { @@ -173,154 +139,31 @@ struct Cdds { uint32_t refcount; dds_entity_t ppant; std::unordered_set waitsets; - Cdds() : refcount(0), ppant(0) {} + Cdds () : refcount (0), ppant (0) {} }; -typedef struct cdds_request_header { - uint64_t guid; - int64_t seq; -} cdds_request_header_t; - static Cdds gcdds; -using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; -using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; -using RequestTypeSupport_c = rmw_cyclonedds_cpp::RequestTypeSupport; -using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; -using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport; -using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; +static void clean_waitset_caches (); -extern "C" { - struct dds_entity; - struct dds_domain; - int32_t dds_entity_lock(dds_entity_t hdl, dds_entity_kind_t kind, struct dds_entity **e); - void dds_entity_unlock(struct dds_entity *e); - struct sertopic *dds_topic_lookup (struct dds_domain *domain, const char *name); - dds_domain *dds__entity_domain(dds_entity* e); - void sertopic_free (struct sertopic * tp); -} +#pragma GCC visibility push (default) -static void clean_waitset_caches(); - -static struct sertopic *get_sertopic(dds_entity_t topic, const std::string& name) -{ - struct dds_entity *x; - struct sertopic *sertopic; - if (dds_entity_lock(topic, DDS_KIND_TOPIC, &x) < 0) { - abort(); - } - sertopic = dds_topic_lookup(dds__entity_domain(x), name.c_str()); - dds_entity_unlock(x); - return sertopic; -} - -static bool using_introspection_c_typesupport(const char *typesupport_identifier) -{ - return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; -} - -static bool using_introspection_cpp_typesupport(const char *typesupport_identifier) -{ - return typesupport_identifier == rosidl_typesupport_introspection_cpp::typesupport_identifier; -} - -static void *create_message_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new MessageTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new MessageTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -static void *create_request_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new RequestTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new RequestTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -static void *create_response_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new ResponseTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new ResponseTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -template const void *get_request_ptr(const void *untyped_service_members) -{ - auto service_members = static_cast(untyped_service_members); - RET_NULL_X(service_members, return nullptr); - return service_members->request_members_; -} - -template const void *get_response_ptr(const void *untyped_service_members) -{ - auto service_members = static_cast(untyped_service_members); - RET_NULL_X(service_members, return nullptr); - return service_members->response_members_; -} - -static bool sermsg(const void *ros_message, cycser& ser, std::function prefix, const CddsTypeSupport& ts) -{ - if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); - } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return false; -} - -static bool desermsg(cycdeser& deser, void *ros_message, std::function prefix, const CddsTypeSupport& ts) -{ - if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); - } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return false; -} - -#pragma GCC visibility push(default) - -extern "C" const char *rmw_get_implementation_identifier() +extern "C" const char *rmw_get_implementation_identifier () { return adlink_cyclonedds_identifier; } -extern "C" rmw_ret_t rmw_init() +extern "C" rmw_ret_t rmw_init (const rmw_init_options_t *options __attribute__ ((unused)), rmw_context_t *context __attribute__ ((unused))) { return RMW_RET_OK; } -static dds_entity_t ref_ppant() +static dds_entity_t ref_ppant () { - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); if (gcdds.refcount == 0) { - if ((gcdds.ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create participant"); + if ((gcdds.ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create participant"); return gcdds.ppant; } } @@ -328,11 +171,11 @@ static dds_entity_t ref_ppant() return gcdds.ppant; } -static void unref_ppant() +static void unref_ppant () { - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); if (--gcdds.refcount == 0) { - dds_delete(gcdds.ppant); + dds_delete (gcdds.ppant); gcdds.ppant = 0; } } @@ -343,82 +186,82 @@ static void unref_ppant() /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) +extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((unused)), const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) { - RET_NULL_X(name, return nullptr); - RET_NULL_X(namespace_, return nullptr); - (void)domain_id; - (void)security_options; + RET_NULL_X (name, return nullptr); + RET_NULL_X (namespace_, return nullptr); + (void) domain_id; + (void) security_options; - dds_entity_t pp = ref_ppant(); + dds_entity_t pp = ref_ppant (); if (pp < 0) { return nullptr; } - auto *node_impl = new CddsNode(); + auto *node_impl = new CddsNode (); rmw_node_t *node_handle = nullptr; - RET_ALLOC_X(node_impl, goto fail_node_impl); + RET_ALLOC_X (node_impl, goto fail_node_impl); rmw_guard_condition_t *graph_guard_condition; - if (!(graph_guard_condition = rmw_create_guard_condition())) { + if (!(graph_guard_condition = rmw_create_guard_condition (context))) { goto fail_ggc; } node_impl->graph_guard_condition = graph_guard_condition; - node_handle = rmw_node_allocate(); - RET_ALLOC_X(node_handle, goto fail_node_handle); + node_handle = rmw_node_allocate (); + RET_ALLOC_X (node_handle, goto fail_node_handle); node_handle->implementation_identifier = adlink_cyclonedds_identifier; node_handle->data = node_impl; - node_handle->name = static_cast(rmw_allocate(sizeof(char) * strlen(name) + 1)); - RET_ALLOC_X(node_handle->name, goto fail_node_handle_name); - memcpy(const_cast(node_handle->name), name, strlen(name) + 1); + node_handle->name = static_cast (rmw_allocate (sizeof (char) * strlen (name) + 1)); + RET_ALLOC_X (node_handle->name, goto fail_node_handle_name); + memcpy (const_cast (node_handle->name), name, strlen (name) + 1); - node_handle->namespace_ = static_cast(rmw_allocate(sizeof(char) * strlen(namespace_) + 1)); - RET_ALLOC_X(node_handle->namespace_, goto fail_node_handle_namespace); - memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); + node_handle->namespace_ = static_cast (rmw_allocate (sizeof (char) * strlen (namespace_) + 1)); + RET_ALLOC_X (node_handle->namespace_, goto fail_node_handle_namespace); + memcpy (const_cast (node_handle->namespace_), namespace_, strlen (namespace_) + 1); return node_handle; #if 0 fail_add_node: - rmw_free(const_cast(node_handle->namespace_)); + rmw_free (const_cast (node_handle->namespace_)); #endif fail_node_handle_namespace: - rmw_free(const_cast(node_handle->name)); + rmw_free (const_cast (node_handle->name)); fail_node_handle_name: - rmw_node_free(node_handle); + rmw_node_free (node_handle); fail_node_handle: - if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); + if (RMW_RET_OK != rmw_destroy_guard_condition (graph_guard_condition)) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); } fail_ggc: delete node_impl; fail_node_impl: - unref_ppant(); + unref_ppant (); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node) +extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node) { rmw_ret_t result_ret = RMW_RET_OK; - RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); - RET_NULL(node_impl); - rmw_free(const_cast(node->name)); - rmw_free(const_cast(node->namespace_)); - rmw_node_free(node); - if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { - RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); + RET_WRONG_IMPLID (node); + auto node_impl = static_cast (node->data); + RET_NULL (node_impl); + rmw_free (const_cast (node->name)); + rmw_free (const_cast (node->namespace_)); + rmw_node_free (node); + if (RMW_RET_OK != rmw_destroy_guard_condition (node_impl->graph_guard_condition)) { + RMW_SET_ERROR_MSG ("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } delete node_impl; - unref_ppant(); + unref_ppant (); return result_ret; } -extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const rmw_node_t *node) +extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (const rmw_node_t *node) { - RET_WRONG_IMPLID_X(node, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); + RET_WRONG_IMPLID_X (node, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); return node_impl->graph_guard_condition; } @@ -428,222 +271,205 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" { - int dds_writecdr(dds_entity_t writer, struct serdata *serdata); -} - -static rmw_ret_t rmw_write_ser(dds_entity_t pubh, cycser& sd) +extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message) { - if (dds_writecdr(pubh, sd.fix().ref().data()) >= 0) { + RET_WRONG_IMPLID (publisher); + RET_NULL (ros_message); + auto pub = static_cast (publisher->data); + assert (pub); + if (dds_write (pub->pubh, ros_message) >= 0) { return RMW_RET_OK; } else { /* FIXME: what is the expected behavior when it times out? */ - RMW_SET_ERROR_MSG("cannot publish data"); + RMW_SET_ERROR_MSG ("cannot publish data"); //return RMW_RET_ERROR; return RMW_RET_OK; } } -extern "C" rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message) -{ - RET_WRONG_IMPLID(publisher); - RET_NULL(ros_message); - auto pub = static_cast(publisher->data); - assert(pub); - cycser sd(pub->sertopic); - if (sermsg(ros_message, sd, nullptr, pub->ts)) { - return rmw_write_ser(pub->pubh, sd); - } else { - RMW_SET_ERROR_MSG("cannot serialize data"); - return RMW_RET_ERROR; - } -} - -static const rosidl_message_type_support_t *get_typesupport(const rosidl_message_type_support_t *type_supports) +static const rosidl_message_type_support_t *get_typesupport (const rosidl_message_type_support_t *type_supports) { const rosidl_message_type_support_t *ts; - if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + if ((ts = get_message_typesupport_handle (type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { return ts; - } else if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + } else if ((ts = get_message_typesupport_handle (type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { return ts; } else { - RMW_SET_ERROR_MSG("type support not from this implementation"); + RMW_SET_ERROR_MSG ("type support not from this implementation"); return nullptr; } } -static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, bool avoid_ros_namespace_conventions) +static std::string make_fqtopic (const char *prefix, const char *topic_name, const char *suffix, bool avoid_ros_namespace_conventions) { if (avoid_ros_namespace_conventions) { - return std::string(topic_name) + "__" + std::string(suffix); + return std::string (topic_name) + "__" + std::string (suffix); } else { - return std::string(prefix) + "/" + make_fqtopic(prefix, topic_name, suffix, true); + return std::string (prefix) + "/" + make_fqtopic (prefix, topic_name, suffix, true); } } -static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, const rmw_qos_profile_t *qos_policies) +static std::string make_fqtopic (const char *prefix, const char *topic_name, const char *suffix, const rmw_qos_profile_t *qos_policies) { - return make_fqtopic(prefix, topic_name, suffix, qos_policies->avoid_ros_namespace_conventions); + return make_fqtopic (prefix, topic_name, suffix, qos_policies->avoid_ros_namespace_conventions); } -static dds_qos_t *create_readwrite_qos(const rmw_qos_profile_t *qos_policies) +static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { - dds_qos_t *qos = dds_qos_create(); + dds_qos_t *qos = dds_create_qos (); switch (qos_policies->history) { case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_HISTORY_KEEP_LAST: if (qos_policies->depth > INT32_MAX) { - RMW_SET_ERROR_MSG("unsupported history depth"); - dds_qos_delete(qos); + RMW_SET_ERROR_MSG ("unsupported history depth"); + dds_delete_qos (qos); return nullptr; } - dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast(qos_policies->depth)); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, static_cast (qos_policies->depth)); break; case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); break; } switch (qos_policies->reliability) { case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); break; case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0); + dds_qset_reliability (qos, DDS_RELIABILITY_BEST_EFFORT, 0); break; } switch (qos_policies->durability) { case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_DURABILITY_VOLATILE: - dds_qset_durability(qos, DDS_DURABILITY_VOLATILE); + dds_qset_durability (qos, DDS_DURABILITY_VOLATILE); break; case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); break; } + if (ignore_local_publications) { + dds_qset_ignorelocal (qos, DDS_IGNORELOCAL_PARTICIPANT); + } return qos; } -static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) +static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { - RET_WRONG_IMPLID_X(node, return nullptr); - RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); - RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); - const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); - RET_NULL_X(type_support, return nullptr); - CddsPublisher *pub = new CddsPublisher(); + RET_WRONG_IMPLID_X (node, return nullptr); + RET_NULL_OR_EMPTYSTR_X (topic_name, return nullptr); + RET_NULL_X (qos_policies, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport (type_supports); + RET_NULL_X (type_support, return nullptr); + CddsPublisher *pub = new CddsPublisher (); dds_entity_t topic; dds_qos_t *qos; - pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_); - std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + std::string fqtopic_name = make_fqtopic (ros_topic_prefix, topic_name, "", qos_policies); - if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_topic; + auto sertopic = create_sertopic (topic_name, type_support->typesupport_identifier, create_message_type_support (type_support->data, type_support->typesupport_identifier), false); + if ((topic = dds_create_topic_arbitrary (gcdds.ppant, sertopic, fqtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); + goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + if ((qos = create_readwrite_qos (qos_policies, false)) == nullptr) { goto fail_qos; } - if ((pub->pubh = dds_create_writer(gcdds.ppant, topic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create writer"); + if ((pub->pubh = dds_create_writer (gcdds.ppant, topic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create writer"); goto fail_writer; } - pub->sertopic = get_sertopic(topic, fqtopic_name); - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { - RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + if (dds_get_instance_handle (pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG ("failed to get instance handle for writer"); goto fail_instance_handle; } - dds_qos_delete(qos); - node_impl->own_writers.insert(pub->pubiid); + dds_delete_qos (qos); + ddsi_sertopic_unref (sertopic); /* FIXME: leak the topic for now */ return pub; fail_instance_handle: - if (dds_delete(pub->pubh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); + if (dds_delete (pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); } fail_writer: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* not deleting topic -- have to sort out proper topic handling & that requires fixing a few things in cyclone as well */ fail_topic: delete pub; + ddsi_sertopic_unref (sertopic); return nullptr; } -extern "C" rmw_publisher_t *rmw_create_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_publisher_t *rmw_create_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { CddsPublisher *pub; rmw_publisher_t *rmw_publisher; - auto node_impl = static_cast(node->data); - if ((pub = create_cdds_publisher(node, type_supports, topic_name, qos_policies)) == nullptr) { + if ((pub = create_cdds_publisher (node, type_supports, topic_name, qos_policies)) == nullptr) { goto fail_common_init; } - rmw_publisher = rmw_publisher_allocate(); - RET_ALLOC_X(rmw_publisher, goto fail_publisher); + rmw_publisher = rmw_publisher_allocate (); + RET_ALLOC_X (rmw_publisher, goto fail_publisher); rmw_publisher->implementation_identifier = adlink_cyclonedds_identifier; rmw_publisher->data = pub; - rmw_publisher->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); - RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name); - memcpy(const_cast(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1); + rmw_publisher->topic_name = reinterpret_cast (rmw_allocate (strlen (topic_name) + 1)); + RET_ALLOC_X (rmw_publisher->topic_name, goto fail_topic_name); + memcpy (const_cast (rmw_publisher->topic_name), topic_name, strlen (topic_name) + 1); return rmw_publisher; fail_topic_name: - rmw_publisher_free(rmw_publisher); + rmw_publisher_free (rmw_publisher); fail_publisher: - node_impl->own_writers.erase(pub->pubiid); - if (dds_delete(pub->pubh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); + if (dds_delete (pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); } delete pub; fail_common_init: return nullptr; } -extern "C" rmw_ret_t rmw_get_gid_for_publisher(const rmw_publisher_t *publisher, rmw_gid_t *gid) +extern "C" rmw_ret_t rmw_get_gid_for_publisher (const rmw_publisher_t *publisher, rmw_gid_t *gid) { - RET_WRONG_IMPLID(publisher); - RET_NULL(gid); - auto pub = static_cast(publisher->data); - RET_NULL(pub); + RET_WRONG_IMPLID (publisher); + RET_NULL (gid); + auto pub = static_cast (publisher->data); + RET_NULL (pub); gid->implementation_identifier = adlink_cyclonedds_identifier; - memset(gid->data, 0, sizeof(gid->data)); - assert(sizeof(pub->pubiid) <= sizeof(gid->data)); - memcpy(gid->data, &pub->pubiid, sizeof(pub->pubiid)); + memset (gid->data, 0, sizeof (gid->data)); + assert (sizeof (pub->pubiid) <= sizeof (gid->data)); + memcpy (gid->data, &pub->pubiid, sizeof (pub->pubiid)); return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_compare_gids_equal(const rmw_gid_t *gid1, const rmw_gid_t *gid2, bool *result) +extern "C" rmw_ret_t rmw_compare_gids_equal (const rmw_gid_t *gid1, const rmw_gid_t *gid2, bool *result) { - RET_WRONG_IMPLID(gid1); - RET_WRONG_IMPLID(gid2); - RET_NULL(result); + RET_WRONG_IMPLID (gid1); + RET_WRONG_IMPLID (gid2); + RET_NULL (result); /* alignment is potentially lost because of the translation to an array of bytes, so use memcmp instead of a simple integer comparison */ - *result = memcmp(gid1->data, gid2->data, sizeof(gid1->data)) == 0; + *result = memcmp (gid1->data, gid2->data, sizeof (gid1->data)) == 0; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *publisher) +extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *publisher) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(publisher); - auto node_impl = static_cast(node->data); - auto pub = static_cast(publisher->data); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (publisher); + auto pub = static_cast (publisher->data); if (pub != nullptr) { - node_impl->own_writers.erase(pub->pubiid); - if (dds_delete(pub->pubh) < 0) { - RMW_SET_ERROR_MSG("failed to delete writer"); + if (dds_delete (pub->pubh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete writer"); } delete pub; } - rmw_free(const_cast(publisher->topic_name)); + rmw_free (const_cast (publisher->topic_name)); publisher->topic_name = nullptr; - rmw_publisher_free(publisher); + rmw_publisher_free (publisher); return RMW_RET_OK; } @@ -653,147 +479,136 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *pu /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) +static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { - RET_WRONG_IMPLID_X(node, return nullptr); - RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); - RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); - const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); - RET_NULL_X(type_support, return nullptr); - CddsSubscription *sub = new CddsSubscription(); + RET_WRONG_IMPLID_X (node, return nullptr); + RET_NULL_OR_EMPTYSTR_X (topic_name, return nullptr); + RET_NULL_X (qos_policies, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport (type_supports); + RET_NULL_X (type_support, return nullptr); + CddsSubscription *sub = new CddsSubscription (); dds_entity_t topic; dds_qos_t *qos; - sub->node = node_impl; - sub->ignore_local_publications = ignore_local_publications; - sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_); - std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + std::string fqtopic_name = make_fqtopic (ros_topic_prefix, topic_name, "", qos_policies); - if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + auto sertopic = create_sertopic (topic_name, type_support->typesupport_identifier, create_message_type_support (type_support->data, type_support->typesupport_identifier), false); + if ((topic = dds_create_topic_arbitrary (gcdds.ppant, sertopic, fqtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + if ((qos = create_readwrite_qos (qos_policies, ignore_local_publications)) == nullptr) { goto fail_qos; } - if ((sub->subh = dds_create_reader(gcdds.ppant, topic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create reader"); + if ((sub->subh = dds_create_reader (gcdds.ppant, topic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create reader"); goto fail_reader; } - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { - RMW_SET_ERROR_MSG("failed to create readcondition"); + if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; } - dds_qos_delete(qos); + dds_delete_qos (qos); + ddsi_sertopic_unref (sertopic); return sub; fail_readcond: - if (dds_delete(sub->subh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); + if (dds_delete (sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } fail_reader: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* FIXME: leak topic */ fail_topic: delete sub; + ddsi_sertopic_unref (sertopic); return nullptr; } -extern "C" rmw_subscription_t *rmw_create_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) +extern "C" rmw_subscription_t *rmw_create_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { CddsSubscription *sub; rmw_subscription_t *rmw_subscription; - if ((sub = create_cdds_subscription(node, type_supports, topic_name, qos_policies, ignore_local_publications)) == nullptr) { + if ((sub = create_cdds_subscription (node, type_supports, topic_name, qos_policies, ignore_local_publications)) == nullptr) { goto fail_common_init; } - rmw_subscription = rmw_subscription_allocate(); - RET_ALLOC_X(rmw_subscription, goto fail_subscription); + rmw_subscription = rmw_subscription_allocate (); + RET_ALLOC_X (rmw_subscription, goto fail_subscription); rmw_subscription->implementation_identifier = adlink_cyclonedds_identifier; rmw_subscription->data = sub; - rmw_subscription->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); - RET_ALLOC_X(rmw_subscription->topic_name, goto fail_topic_name); - memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); + rmw_subscription->topic_name = reinterpret_cast (rmw_allocate (strlen (topic_name) + 1)); + RET_ALLOC_X (rmw_subscription->topic_name, goto fail_topic_name); + memcpy (const_cast (rmw_subscription->topic_name), topic_name, strlen (topic_name) + 1); return rmw_subscription; fail_topic_name: - rmw_subscription_free(rmw_subscription); + rmw_subscription_free (rmw_subscription); fail_subscription: - if (dds_delete(sub->rdcondh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); + if (dds_delete (sub->rdcondh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); } - if (dds_delete(sub->subh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); + if (dds_delete (sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } delete sub; fail_common_init: return nullptr; } -extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription_t *subscription) +extern "C" rmw_ret_t rmw_destroy_subscription (rmw_node_t *node, rmw_subscription_t *subscription) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(subscription); - auto sub = static_cast(subscription->data); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (subscription); + auto sub = static_cast (subscription->data); if (sub != nullptr) { - clean_waitset_caches(); - if (dds_delete(sub->rdcondh) < 0) { - RMW_SET_ERROR_MSG("failed to delete readcondition"); + clean_waitset_caches (); + if (dds_delete (sub->rdcondh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete readcondition"); } - if (dds_delete(sub->subh) < 0) { - RMW_SET_ERROR_MSG("failed to delete reader"); + if (dds_delete (sub->subh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete reader"); } delete sub; } - rmw_free(const_cast(subscription->topic_name)); + rmw_free (const_cast (subscription->topic_name)); subscription->topic_name = nullptr; - rmw_subscription_free(subscription); + rmw_subscription_free (subscription); return RMW_RET_OK; } -static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) +static rmw_ret_t rmw_take_int (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) { - RET_NULL(taken); - RET_NULL(ros_message); - RET_WRONG_IMPLID(subscription); - CddsSubscription *sub = static_cast(subscription->data); - RET_NULL(sub); - struct serdata *sd; + RET_NULL (taken); + RET_NULL (ros_message); + RET_WRONG_IMPLID (subscription); + CddsSubscription *sub = static_cast (subscription->data); + RET_NULL (sub); dds_sample_info_t info; - while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { - if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) { - size_t sz; - void *raw; - ddsi_serdata_getblob(&raw, &sz, sd); - /* FIXME: endianness (i.e., the "+ 4") */ - cycdeser deser(static_cast(static_cast(raw) + 4), sz); - desermsg(deser, ros_message, nullptr, sub->ts); - ddsi_serdata_unref(sd); + while (dds_take (sub->subh, &ros_message, &info, 1, 1) == 1) { + if (info.valid_data) { if (message_info) { message_info->publisher_gid.implementation_identifier = adlink_cyclonedds_identifier; - memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data)); - assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data)); - memcpy(message_info->publisher_gid.data, &info.publication_handle, sizeof(info.publication_handle)); + memset (message_info->publisher_gid.data, 0, sizeof (message_info->publisher_gid.data)); + assert (sizeof (info.publication_handle) <= sizeof (message_info->publisher_gid.data)); + memcpy (message_info->publisher_gid.data, &info.publication_handle, sizeof (info.publication_handle)); } *taken = true; return RMW_RET_OK; - } else { - ddsi_serdata_unref(sd); } } *taken = false; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_take(const rmw_subscription_t *subscription, void *ros_message, bool *taken) +extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken) { - return rmw_take_int(subscription, ros_message, taken, nullptr); + return rmw_take_int (subscription, ros_message, taken, nullptr); } -extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) +extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) { - return rmw_take_int(subscription, ros_message, taken, message_info); + return rmw_take_int (subscription, ros_message, taken, message_info); } ///////////////////////////////////////////////////////////////////////////////////////// @@ -802,15 +617,15 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_guard_condition_t *rmw_create_guard_condition() +extern "C" rmw_guard_condition_t *rmw_create_guard_condition (rmw_context_t *context __attribute__ ((unused))) { rmw_guard_condition_t *guard_condition_handle; - auto *gcond_impl = new CddsGuardCondition(); - if (ref_ppant() < 0) { + auto *gcond_impl = new CddsGuardCondition (); + if (ref_ppant () < 0) { goto fail_ppant; } - if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) { - RMW_SET_ERROR_MSG("failed to create guardcondition"); + if ((gcond_impl->gcondh = dds_create_guardcondition (gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG ("failed to create guardcondition"); goto fail_guardcond; } guard_condition_handle = new rmw_guard_condition_t; @@ -819,173 +634,173 @@ extern "C" rmw_guard_condition_t *rmw_create_guard_condition() return guard_condition_handle; fail_guardcond: - unref_ppant(); + unref_ppant (); fail_ppant: - delete(gcond_impl); + delete (gcond_impl); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition_handle) +extern "C" rmw_ret_t rmw_destroy_guard_condition (rmw_guard_condition_t *guard_condition_handle) { - RET_NULL(guard_condition_handle); - auto *gcond_impl = static_cast(guard_condition_handle->data); - clean_waitset_caches(); - dds_delete(gcond_impl->gcondh); - unref_ppant(); + RET_NULL (guard_condition_handle); + auto *gcond_impl = static_cast (guard_condition_handle->data); + clean_waitset_caches (); + dds_delete (gcond_impl->gcondh); + unref_ppant (); delete gcond_impl; delete guard_condition_handle; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle) +extern "C" rmw_ret_t rmw_trigger_guard_condition (const rmw_guard_condition_t *guard_condition_handle) { - RET_WRONG_IMPLID(guard_condition_handle); - auto *gcond_impl = static_cast(guard_condition_handle->data); - dds_set_guardcondition(gcond_impl->gcondh, true); + RET_WRONG_IMPLID (guard_condition_handle); + auto *gcond_impl = static_cast (guard_condition_handle->data); + dds_set_guardcondition (gcond_impl->gcondh, true); return RMW_RET_OK; } -extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) +extern "C" rmw_wait_set_t *rmw_create_wait_set (size_t max_conditions) { - (void)max_conditions; - rmw_wait_set_t *wait_set = rmw_wait_set_allocate(); + (void) max_conditions; + rmw_wait_set_t *wait_set = rmw_wait_set_allocate (); CddsWaitset *ws = nullptr; - RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); + RET_ALLOC_X (wait_set, goto fail_alloc_wait_set); wait_set->implementation_identifier = adlink_cyclonedds_identifier; - wait_set->data = rmw_allocate(sizeof(CddsWaitset)); - RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data); + wait_set->data = rmw_allocate (sizeof (CddsWaitset)); + RET_ALLOC_X (wait_set->data, goto fail_alloc_wait_set_data); // This should default-construct the fields of CddsWaitset - ws = static_cast(wait_set->data); - RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ); + ws = static_cast (wait_set->data); + RMW_TRY_PLACEMENT_NEW (ws, ws, goto fail_placement_new, CddsWaitset, ); if (!ws) { - RMW_SET_ERROR_MSG("failed to construct wait set info struct"); + RMW_SET_ERROR_MSG ("failed to construct wait set info struct"); goto fail_ws; } ws->inuse = false; - if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) { - RMW_SET_ERROR_MSG("failed to create waitset"); + if ((ws->waitseth = dds_create_waitset (gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG ("failed to create waitset"); goto fail_waitset; } { - std::lock_guard lock(gcdds.lock); - gcdds.waitsets.insert(ws); + std::lock_guard lock (gcdds.lock); + gcdds.waitsets.insert (ws); } return wait_set; fail_waitset: fail_ws: - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE (ws->~CddsWaitset (), ws); fail_placement_new: - rmw_free(wait_set->data); + rmw_free (wait_set->data); fail_alloc_wait_set_data: - rmw_wait_set_free(wait_set); + rmw_wait_set_free (wait_set); fail_alloc_wait_set: return nullptr; } -extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set) +extern "C" rmw_ret_t rmw_destroy_wait_set (rmw_wait_set_t *wait_set) { - RET_WRONG_IMPLID(wait_set); + RET_WRONG_IMPLID (wait_set); auto result = RMW_RET_OK; - auto ws = static_cast(wait_set->data); - RET_NULL(ws); - dds_delete(ws->waitseth); + auto ws = static_cast (wait_set->data); + RET_NULL (ws); + dds_delete (ws->waitseth); { - std::lock_guard lock(gcdds.lock); - gcdds.waitsets.erase(ws); + std::lock_guard lock (gcdds.lock); + gcdds.waitsets.erase (ws); } - RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); - rmw_free(wait_set->data); - rmw_wait_set_free(wait_set); + RMW_TRY_DESTRUCTOR (ws->~CddsWaitset (), ws, result = RMW_RET_ERROR); + rmw_free (wait_set->data); + rmw_wait_set_free (wait_set); return result; } template -static bool require_reattach(const std::vector& cached, size_t count, void **ary) +static bool require_reattach (const std::vector& cached, size_t count, void **ary) { if (ary == nullptr || count == 0) { - return (cached.size() != 0); - } else if (count != cached.size()) { + return (cached.size () != 0); + } else if (count != cached.size ()) { return true; } else { - return (memcmp(static_cast(cached.data()), static_cast(ary), count * sizeof(void *)) != 0); + return (memcmp (static_cast (cached.data ()), static_cast (ary), count * sizeof (void *)) != 0); } } -static void waitset_detach(CddsWaitset *ws) +static void waitset_detach (CddsWaitset *ws) { - for (auto&& x : ws->subs) dds_waitset_detach(ws->waitseth, x->rdcondh); - for (auto&& x : ws->gcs) dds_waitset_detach(ws->waitseth, x->gcondh); - for (auto&& x : ws->srvs) dds_waitset_detach(ws->waitseth, x->service.sub->rdcondh); - for (auto&& x : ws->cls) dds_waitset_detach(ws->waitseth, x->client.sub->rdcondh); - ws->subs.resize(0); - ws->gcs.resize(0); - ws->srvs.resize(0); - ws->cls.resize(0); + for (auto&& x : ws->subs) dds_waitset_detach (ws->waitseth, x->rdcondh); + for (auto&& x : ws->gcs) dds_waitset_detach (ws->waitseth, x->gcondh); + for (auto&& x : ws->srvs) dds_waitset_detach (ws->waitseth, x->service.sub->rdcondh); + for (auto&& x : ws->cls) dds_waitset_detach (ws->waitseth, x->client.sub->rdcondh); + ws->subs.resize (0); + ws->gcs.resize (0); + ws->srvs.resize (0); + ws->cls.resize (0); } -static void clean_waitset_caches() +static void clean_waitset_caches () { /* Called whenever a subscriber, guard condition, service or client is deleted (as these may have been cached in a waitset), and drops all cached entities from all waitsets (just to keep life simple). I'm assuming one is not allowed to delete an entity while it is still being used ... */ - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); for (auto&& ws : gcdds.waitsets) { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); if (!ws->inuse) { - waitset_detach(ws); + waitset_detach (ws); } } } -extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) +extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) { - RET_NULL(wait_set); - CddsWaitset *ws = static_cast(wait_set->data); - RET_NULL(ws); + RET_NULL (wait_set); + CddsWaitset *ws = static_cast (wait_set->data); + RET_NULL (ws); { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); if (ws->inuse) { - RMW_SET_ERROR_MSG("concurrent calls to rmw_wait on a single waitset is not supported"); + RMW_SET_ERROR_MSG ("concurrent calls to rmw_wait on a single waitset is not supported"); return RMW_RET_ERROR; } ws->inuse = true; } - if (require_reattach(ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) || - require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || - require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || - require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) { + if (require_reattach (ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) || + require_reattach (ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || + require_reattach (ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || + require_reattach (ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) { size_t nelems = 0; - waitset_detach(ws); + waitset_detach (ws); #define ATTACH(type, var, name, cond) do { \ - ws->var.resize(0); \ + ws->var.resize (0); \ if (var) { \ - ws->var.reserve(var->name##_count); \ + ws->var.reserve (var->name##_count); \ for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - ws->var.push_back(x); \ - dds_waitset_attach(ws->waitseth, x->cond, nelems); \ + auto x = static_cast (var->name##s[i]); \ + ws->var.push_back (x); \ + dds_waitset_attach (ws->waitseth, x->cond, nelems); \ nelems++; \ } \ } \ } while (0) - ATTACH(CddsSubscription, subs, subscriber, rdcondh); - ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh); - ATTACH(CddsService, srvs, service, service.sub->rdcondh); - ATTACH(CddsClient, cls, client, client.sub->rdcondh); + ATTACH (CddsSubscription, subs, subscriber, rdcondh); + ATTACH (CddsGuardCondition, gcs, guard_condition, gcondh); + ATTACH (CddsService, srvs, service, service.sub->rdcondh); + ATTACH (CddsClient, cls, client, client.sub->rdcondh); #undef ATTACH - ws->trigs.reserve(nelems + 1); + ws->trigs.reserve (nelems + 1); } const dds_duration_t timeout = - (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec; - const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), ws->trigs.capacity(), timeout); - ws->trigs.resize(ntrig); - std::sort(ws->trigs.begin(), ws->trigs.end()); - ws->trigs.push_back((dds_attach_t)-1); + (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t) wait_timeout->sec * 1000000000 + wait_timeout->nsec; + const dds_return_t ntrig = dds_waitset_wait (ws->waitseth, ws->trigs.data (), ws->trigs.capacity (), timeout); + ws->trigs.resize (ntrig); + std::sort (ws->trigs.begin (), ws->trigs.end ()); + ws->trigs.push_back ((dds_attach_t) -1); { long trig_idx = 0; @@ -994,9 +809,9 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t #define DETACH(type, var, name, cond, on_triggered) do { \ if (var) { \ for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - /*dds_waitset_detach(ws->waitseth, x->cond);*/ \ - if (ws->trigs[trig_idx] == (long)nelems) { \ + auto x = static_cast (var->name##s[i]); \ + /*dds_waitset_detach (ws->waitseth, x->cond);*/ \ + if (ws->trigs[trig_idx] == (long) nelems) { \ on_triggered; \ trig_idx++; \ } else { \ @@ -1006,19 +821,19 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t } \ } \ } while (0) - DETACH(CddsSubscription, subs, subscriber, rdcondh, (void)x); - DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy)); - DETACH(CddsService, srvs, service, service.sub->rdcondh, (void)x); - DETACH(CddsClient, cls, client, client.sub->rdcondh, (void)x); + DETACH (CddsSubscription, subs, subscriber, rdcondh, (void) x); + DETACH (CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition (x->gcondh, &dummy)); + DETACH (CddsService, srvs, service, service.sub->rdcondh, (void) x); + DETACH (CddsClient, cls, client, client.sub->rdcondh, (void) x); #undef DETACH } { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); ws->inuse = false; } - return (ws->trigs.size() == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; + return (ws->trigs.size () == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1027,263 +842,260 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request_header, void *ros_data, bool *taken, dds_instance_handle_t srcfilter) +static rmw_ret_t rmw_take_response_request (CddsCS *cs, rmw_request_id_t *request_header, void *ros_data, bool *taken, dds_instance_handle_t srcfilter) { - RET_NULL(taken); - RET_NULL(ros_data); - RET_NULL(request_header); - struct serdata *sd; + RET_NULL (taken); + RET_NULL (ros_data); + RET_NULL (request_header); + cdds_request_wrapper_t wrap; dds_sample_info_t info; - while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { + wrap.data = ros_data; + void *wrap_ptr = static_cast (&wrap); + while (dds_take (cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) { if (info.valid_data) { - size_t sz; - void *raw; - ddsi_serdata_getblob(&raw, &sz, sd); - /* FIXME: endianness (i.e., the "+ 4") */ - cycdeser deser(static_cast(static_cast(raw) + 4), sz); - cdds_request_header_t header; - desermsg(deser, ros_data, [&header](cycdeser& ser) { ser >> header.guid; ser >> header.seq; }, cs->sub->ts); - ddsi_serdata_unref(sd); - memset(request_header, 0, sizeof(*request_header)); - assert(sizeof(header.guid) < sizeof(request_header->writer_guid)); - memcpy(request_header->writer_guid, &header.guid, sizeof(header.guid)); - request_header->sequence_number = header.seq; - if (srcfilter == 0 || srcfilter == header.guid) { + memset (request_header, 0, sizeof (wrap.header)); + assert (sizeof (wrap.header.guid) < sizeof (wrap.header.guid)); + memcpy (request_header->writer_guid, &wrap.header.guid, sizeof (wrap.header.guid)); + request_header->sequence_number = wrap.header.seq; + if (srcfilter == 0 || srcfilter == wrap.header.guid) { *taken = true; return RMW_RET_OK; } - } else { - ddsi_serdata_unref(sd); } } *taken = false; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_take_response(const rmw_client_t *client, rmw_request_id_t *request_header, void *ros_response, bool *taken) +extern "C" rmw_ret_t rmw_take_response (const rmw_client_t *client, rmw_request_id_t *request_header, void *ros_response, bool *taken) { - RET_WRONG_IMPLID(client); - auto info = static_cast(client->data); - return rmw_take_response_request(&info->client, request_header, ros_response, taken, info->client.pub->pubiid); + RET_WRONG_IMPLID (client); + auto info = static_cast (client->data); + return rmw_take_response_request (&info->client, request_header, ros_response, taken, info->client.pub->pubiid); } -extern "C" rmw_ret_t rmw_take_request(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_request, bool *taken) +extern "C" rmw_ret_t rmw_take_request (const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_request, bool *taken) { - RET_WRONG_IMPLID(service); - auto info = static_cast(service->data); - return rmw_take_response_request(&info->service, request_header, ros_request, taken, 0); + RET_WRONG_IMPLID (service); + auto info = static_cast (service->data); + return rmw_take_response_request (&info->service, request_header, ros_request, taken, 0); } -static rmw_ret_t rmw_send_response_request(CddsCS *cs, cdds_request_header_t *header, const void *ros_data) +static rmw_ret_t rmw_send_response_request (CddsCS *cs, const cdds_request_header_t& header, const void *ros_data) { - cycser sd(cs->pub->sertopic); - if (sermsg(ros_data, sd, [&header](cycser& ser) { ser << header->guid; ser << header->seq; }, cs->pub->ts)) { - return rmw_write_ser(cs->pub->pubh, sd); + const cdds_request_wrapper_t wrap = { header, const_cast (ros_data) }; + if (dds_write (cs->pub->pubh, static_cast (&wrap)) >= 0) { + return RMW_RET_OK; } else { - RMW_SET_ERROR_MSG("cannot serialize data"); - return RMW_RET_ERROR; + /* FIXME: what is the expected behavior when it times out? */ + RMW_SET_ERROR_MSG ("cannot publish data"); + //return RMW_RET_ERROR; + return RMW_RET_OK; } } -extern "C" rmw_ret_t rmw_send_response(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_response) +extern "C" rmw_ret_t rmw_send_response (const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_response) { - RET_WRONG_IMPLID(service); - RET_NULL(request_header); - RET_NULL(ros_response); - CddsService *info = static_cast(service->data); + RET_WRONG_IMPLID (service); + RET_NULL (request_header); + RET_NULL (ros_response); + CddsService *info = static_cast (service->data); cdds_request_header_t header; - memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid)); + memcpy (&header.guid, request_header->writer_guid, sizeof (header.guid)); header.seq = request_header->sequence_number; - return rmw_send_response_request(&info->service, &header, ros_response); + return rmw_send_response_request (&info->service, header, ros_response); } -extern "C" rmw_ret_t rmw_send_request(const rmw_client_t *client, const void *ros_request, int64_t *sequence_id) +extern "C" rmw_ret_t rmw_send_request (const rmw_client_t *client, const void *ros_request, int64_t *sequence_id) { static std::atomic_uint next_request_id; - RET_WRONG_IMPLID(client); - RET_NULL(ros_request); - RET_NULL(sequence_id); - auto info = static_cast(client->data); + RET_WRONG_IMPLID (client); + RET_NULL (ros_request); + RET_NULL (sequence_id); + auto info = static_cast (client->data); cdds_request_header_t header; header.guid = info->client.pub->pubiid; header.seq = *sequence_id = next_request_id++; - return rmw_send_response_request(&info->client, &header, ros_request); + return rmw_send_response_request (&info->client, header, ros_request); } -static const rosidl_service_type_support_t *get_service_typesupport(const rosidl_service_type_support_t *type_supports) +static const rosidl_service_type_support_t *get_service_typesupport (const rosidl_service_type_support_t *type_supports) { const rosidl_service_type_support_t *ts; - if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + if ((ts = get_service_typesupport_handle (type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { return ts; - } else if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + } else if ((ts = get_service_typesupport_handle (type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { return ts; } else { - RMW_SET_ERROR_MSG("service type support not from this implementation"); + RMW_SET_ERROR_MSG ("service type support not from this implementation"); return nullptr; } } -static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies, bool is_service) +static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies, bool is_service) { - RET_WRONG_IMPLID(node); - RET_NULL_OR_EMPTYSTR(service_name); - RET_NULL(qos_policies); - auto node_impl = static_cast(node->data); - RET_NULL(node_impl); - const rosidl_service_type_support_t *type_support = get_service_typesupport(type_supports); - RET_NULL(type_support); + RET_WRONG_IMPLID (node); + RET_NULL_OR_EMPTYSTR (service_name); + RET_NULL (qos_policies); + auto node_impl = static_cast (node->data); + RET_NULL (node_impl); + const rosidl_service_type_support_t *type_support = get_service_typesupport (type_supports); + RET_NULL (type_support); - auto pub = new CddsPublisher(); - auto sub = new CddsSubscription(); + auto pub = new CddsPublisher (); + auto sub = new CddsSubscription (); std::string subtopic_name, pubtopic_name; - pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + void *pub_type_support, *sub_type_support; if (is_service) { - sub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); - pub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); - subtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); - pubtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + sub_type_support = create_request_type_support (type_support->data, type_support->typesupport_identifier); + pub_type_support = create_response_type_support (type_support->data, type_support->typesupport_identifier); + subtopic_name = make_fqtopic (ros_service_requester_prefix, service_name, "_request", qos_policies); + pubtopic_name = make_fqtopic (ros_service_response_prefix, service_name, "_reply", qos_policies); } else { - pub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); - sub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); - pubtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); - subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + pub_type_support = create_request_type_support (type_support->data, type_support->typesupport_identifier); + sub_type_support = create_response_type_support (type_support->data, type_support->typesupport_identifier); + pubtopic_name = make_fqtopic (ros_service_requester_prefix, service_name, "_request", qos_policies); + subtopic_name = make_fqtopic (ros_service_response_prefix, service_name, "_reply", qos_policies); } - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client") - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str()) - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str()) - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********") + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client"); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str ()); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str ()); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "***********"); - dds_entity_t pubtopic, subtopic; + dds_entity_t pubtopic, subtopic; + + auto pub_st = create_sertopic (pubtopic_name.c_str (), type_support->typesupport_identifier, pub_type_support, true); + auto sub_st = create_sertopic (subtopic_name.c_str (), type_support->typesupport_identifier, sub_type_support, true); + dds_qos_t *qos; - if ((pubtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + if ((pubtopic = dds_create_topic_arbitrary (gcdds.ppant, pub_st, pubtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_pubtopic; } - if ((subtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + if ((subtopic = dds_create_topic_arbitrary (gcdds.ppant, sub_st, subtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_subtopic; } - if ((qos = dds_qos_create()) == nullptr) { + if ((qos = dds_create_qos ()) == nullptr) { goto fail_qos; } - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); - dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); - if ((pub->pubh = dds_create_writer(gcdds.ppant, pubtopic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create writer"); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + if ((pub->pubh = dds_create_writer (gcdds.ppant, pubtopic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create writer"); goto fail_writer; } - pub->sertopic = get_sertopic(pubtopic, pubtopic_name); - sub->node = node_impl; - if ((sub->subh = dds_create_reader(gcdds.ppant, subtopic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create reader"); + if ((sub->subh = dds_create_reader (gcdds.ppant, subtopic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create reader"); goto fail_reader; } - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { - RMW_SET_ERROR_MSG("failed to create readcondition"); + if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; } - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { - RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + if (dds_get_instance_handle (pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG ("failed to get instance handle for writer"); goto fail_instance_handle; } - dds_qos_delete(qos); - node_impl->own_writers.insert(pub->pubiid); + dds_delete_qos (qos); + ddsi_sertopic_unref (pub_st); + ddsi_sertopic_unref (sub_st); cs->pub = pub; cs->sub = sub; return RMW_RET_OK; fail_instance_handle: - dds_delete(sub->rdcondh); + dds_delete (sub->rdcondh); fail_readcond: - dds_delete(sub->subh); + dds_delete (sub->subh); fail_reader: - dds_delete(pub->pubh); + dds_delete (pub->pubh); fail_writer: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* leak subtopic */ fail_subtopic: /* leak pubtopic */ fail_pubtopic: + ddsi_sertopic_unref (pub_st); + ddsi_sertopic_unref (sub_st); return RMW_RET_ERROR; } -static void rmw_fini_cs(CddsCS *cs) +static void rmw_fini_cs (CddsCS *cs) { - dds_delete(cs->sub->rdcondh); - dds_delete(cs->sub->subh); - dds_delete(cs->pub->pubh); - cs->sub->node->own_writers.erase(cs->pub->pubiid); + dds_delete (cs->sub->rdcondh); + dds_delete (cs->sub->subh); + dds_delete (cs->pub->pubh); } -extern "C" rmw_client_t *rmw_create_client(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_client_t *rmw_create_client (const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) { - CddsClient *info = new CddsClient(); - if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) { - delete(info); + CddsClient *info = new CddsClient (); + if (rmw_init_cs (&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) { + delete (info); return nullptr; } - rmw_client_t *rmw_client = rmw_client_allocate(); - RET_NULL_X(rmw_client, goto fail_client); + rmw_client_t *rmw_client = rmw_client_allocate (); + RET_NULL_X (rmw_client, goto fail_client); rmw_client->implementation_identifier = adlink_cyclonedds_identifier; rmw_client->data = info; - rmw_client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_client->service_name, goto fail_service_name); - memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + rmw_client->service_name = reinterpret_cast (rmw_allocate (strlen (service_name) + 1)); + RET_NULL_X (rmw_client->service_name, goto fail_service_name); + memcpy (const_cast (rmw_client->service_name), service_name, strlen (service_name) + 1); return rmw_client; fail_service_name: - rmw_client_free(rmw_client); + rmw_client_free (rmw_client); fail_client: - rmw_fini_cs(&info->client); + rmw_fini_cs (&info->client); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t *node, rmw_client_t *client) +extern "C" rmw_ret_t rmw_destroy_client (rmw_node_t *node, rmw_client_t *client) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(client); - auto info = static_cast(client->data); - clean_waitset_caches(); - rmw_fini_cs(&info->client); - rmw_free(const_cast(client->service_name)); - rmw_client_free(client); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (client); + auto info = static_cast (client->data); + clean_waitset_caches (); + rmw_fini_cs (&info->client); + rmw_free (const_cast (client->service_name)); + rmw_client_free (client); return RMW_RET_OK; } -extern "C" rmw_service_t *rmw_create_service(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_service_t *rmw_create_service (const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) { - CddsService *info = new CddsService(); - if (rmw_init_cs(&info->service, node, type_supports, service_name, qos_policies, true) != RMW_RET_OK) { - delete(info); + CddsService *info = new CddsService (); + if (rmw_init_cs (&info->service, node, type_supports, service_name, qos_policies, true) != RMW_RET_OK) { + delete (info); return nullptr; } - rmw_service_t *rmw_service = rmw_service_allocate(); - RET_NULL_X(rmw_service, goto fail_service); + rmw_service_t *rmw_service = rmw_service_allocate (); + RET_NULL_X (rmw_service, goto fail_service); rmw_service->implementation_identifier = adlink_cyclonedds_identifier; rmw_service->data = info; - rmw_service->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_service->service_name, goto fail_service_name); - memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + rmw_service->service_name = reinterpret_cast (rmw_allocate (strlen (service_name) + 1)); + RET_NULL_X (rmw_service->service_name, goto fail_service_name); + memcpy (const_cast (rmw_service->service_name), service_name, strlen (service_name) + 1); return rmw_service; fail_service_name: - rmw_service_free(rmw_service); + rmw_service_free (rmw_service); fail_service: - rmw_fini_cs(&info->service); + rmw_fini_cs (&info->service); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *service) +extern "C" rmw_ret_t rmw_destroy_service (rmw_node_t *node, rmw_service_t *service) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(service); - auto info = static_cast(service->data); - clean_waitset_caches(); - rmw_fini_cs(&info->service); - rmw_free(const_cast(service->service_name)); - rmw_service_free(service); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (service); + auto info = static_cast (service->data); + clean_waitset_caches (); + rmw_fini_cs (&info->service); + rmw_free (const_cast (service->service_name)); + rmw_service_free (service); return RMW_RET_OK; } @@ -1293,56 +1105,56 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *servic /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_ret_t rmw_get_node_names(const rmw_node_t *node, rcutils_string_array_t *node_names) +extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces) { #if 0 // NIY - RET_WRONG_IMPLID(node); - if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK) { + RET_WRONG_IMPLID (node); + if (rmw_check_zero_rmw_string_array (node_names) != RMW_RET_OK) { return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // FIXME: sorry, can't do it with current Zenoh auto participant_names = std::vector{}; - rcutils_allocator_t allocator = rcutils_get_default_allocator(); + rcutils_allocator_t allocator = rcutils_get_default_allocator (); rcutils_ret_t rcutils_ret = - rcutils_string_array_init(node_names, participant_names.size(), &allocator); + rcutils_string_array_init (node_names, participant_names.size (), &allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } - for (size_t i = 0; i < participant_names.size(); ++i) { - node_names->data[i] = rcutils_strdup(participant_names[i].c_str(), allocator); + for (size_t i = 0; i < participant_names.size (); ++i) { + node_names->data[i] = rcutils_strdup (participant_names[i].c_str (), allocator); if (!node_names->data[i]) { - RMW_SET_ERROR_MSG("failed to allocate memory for node name") - rcutils_ret = rcutils_string_array_fini(node_names); + RMW_SET_ERROR_MSG ("failed to allocate memory for node name") + rcutils_ret = rcutils_string_array_fini (node_names); if (rcutils_ret != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string_safe()) - } + "failed to cleanup during error handling: %s", rcutils_get_error_string_safe ()); + } return RMW_RET_BAD_ALLOC; } } return RMW_RET_OK; #else - (void)node; (void)node_names; + (void) node; (void) node_names; (void) node_namespaces; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) +extern "C" rmw_ret_t rmw_get_topic_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) { #if 0 // NIY - RET_NULL(allocator); - RET_WRONG_IMPLID(node); - rmw_ret_t ret = rmw_names_and_types_check_zero(topic_names_and_types); + RET_NULL (allocator); + RET_WRONG_IMPLID (node); + rmw_ret_t ret = rmw_names_and_types_check_zero (topic_names_and_types); if (ret != RMW_RET_OK) { return ret; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // Access the slave Listeners, which are the ones that have the topicnamesandtypes member // Get info from publisher and subscriber @@ -1350,48 +1162,48 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti std::map> topics; { ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + if (!no_demangle && _get_ros_prefix_if_exists (it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; } for (auto & itt : it.second) { - topics[it.first].insert(itt); + topics[it.first].insert (itt); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } { WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + if (!no_demangle && _get_ros_prefix_if_exists (it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; } for (auto & itt : it.second) { - topics[it.first].insert(itt); + topics[it.first].insert (itt); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } // Copy data to results handle - if (topics.size() > 0) { + if (topics.size () > 0) { // Setup string array to store names - rmw_ret_t rmw_ret = rmw_names_and_types_init(topic_names_and_types, topics.size(), allocator); + rmw_ret_t rmw_ret = rmw_names_and_types_init (topic_names_and_types, topics.size (), allocator); if (rmw_ret != RMW_RET_OK) { return rmw_ret; } // Setup cleanup function, in case of failure below auto fail_cleanup = [&topic_names_and_types]() { - rmw_ret_t rmw_ret = rmw_names_and_types_fini(topic_names_and_types); + rmw_ret_t rmw_ret = rmw_names_and_types_fini (topic_names_and_types); if (rmw_ret != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "error during report of error: %s", rmw_get_error_string_safe()) - } + "error during report of error: %s", rmw_get_error_string_safe ()); + } }; // Setup demangling functions based on no_demangle option auto demangle_topic = _demangle_if_ros_topic; @@ -1407,32 +1219,32 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti size_t index = 0; for (const auto & topic_n_types : topics) { // Duplicate and store the topic_name - char * topic_name = rcutils_strdup(demangle_topic(topic_n_types.first).c_str(), *allocator); + char * topic_name = rcutils_strdup (demangle_topic (topic_n_types.first).c_str (), *allocator); if (!topic_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for topic name", *allocator); - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for topic name", *allocator); + fail_cleanup (); return RMW_RET_BAD_ALLOC; } topic_names_and_types->names.data[index] = topic_name; // Setup storage for types { - rcutils_ret_t rcutils_ret = rcutils_string_array_init( + rcutils_ret_t rcutils_ret = rcutils_string_array_init ( &topic_names_and_types->types[index], - topic_n_types.second.size(), + topic_n_types.second.size (), allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - fail_cleanup(); - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + fail_cleanup (); + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } } // Duplicate and store each type for the topic size_t type_index = 0; for (const auto & type : topic_n_types.second) { - char * type_name = rcutils_strdup(demangle_type(type).c_str(), *allocator); + char * type_name = rcutils_strdup (demangle_type (type).c_str (), *allocator); if (!type_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for type name", *allocator) + fail_cleanup (); return RMW_RET_BAD_ALLOC; } topic_names_and_types->types[index].data[type_index] = type_name; @@ -1443,34 +1255,34 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti } return RMW_RET_OK; #else - (void)node; (void)allocator; (void)no_demangle; (void)topic_names_and_types; + (void) node; (void) allocator; (void) no_demangle; (void) topic_names_and_types; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) +extern "C" rmw_ret_t rmw_get_service_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) { #if 0 // NIY if (!allocator) { - RMW_SET_ERROR_MSG("allocator is null") + RMW_SET_ERROR_MSG ("allocator is null") return RMW_RET_INVALID_ARGUMENT; } if (!node) { - RMW_SET_ERROR_MSG_ALLOC("null node handle", *allocator) + RMW_SET_ERROR_MSG_ALLOC ("null node handle", *allocator) return RMW_RET_INVALID_ARGUMENT; } - rmw_ret_t ret = rmw_names_and_types_check_zero(service_names_and_types); + rmw_ret_t ret = rmw_names_and_types_check_zero (service_names_and_types); if (ret != RMW_RET_OK) { return ret; } // Get participant pointer from node if (node->implementation_identifier != adlink_cyclonedds_identifier) { - RMW_SET_ERROR_MSG_ALLOC("node handle not from this implementation", *allocator); + RMW_SET_ERROR_MSG_ALLOC ("node handle not from this implementation", *allocator); return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // Access the slave Listeners, which are the ones that have the topicnamesandtypes member // Get info from publisher and subscriber @@ -1478,88 +1290,88 @@ extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcu std::map> services; { ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { + std::string service_name = _demangle_service_from_topic (it.first); + if (!service_name.length ()) { // not a service continue; } for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + std::string service_type = _demangle_service_type_only (itt); + if (service_type.length ()) { + services[service_name].insert (service_type); } } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } { WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { + std::string service_name = _demangle_service_from_topic (it.first); + if (!service_name.length ()) { // not a service continue; } for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + std::string service_type = _demangle_service_type_only (itt); + if (service_type.length ()) { + services[service_name].insert (service_type); } } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } // Fill out service_names_and_types - if (services.size()) { + if (services.size ()) { // Setup string array to store names rmw_ret_t rmw_ret = - rmw_names_and_types_init(service_names_and_types, services.size(), allocator); + rmw_names_and_types_init (service_names_and_types, services.size (), allocator); if (rmw_ret != RMW_RET_OK) { return rmw_ret; } // Setup cleanup function, in case of failure below auto fail_cleanup = [&service_names_and_types]() { - rmw_ret_t rmw_ret = rmw_names_and_types_fini(service_names_and_types); + rmw_ret_t rmw_ret = rmw_names_and_types_fini (service_names_and_types); if (rmw_ret != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "error during report of error: %s", rmw_get_error_string_safe()) + "error during report of error: %s", rmw_get_error_string_safe ()); } }; // For each service, store the name, initialize the string array for types, and store all types size_t index = 0; for (const auto & service_n_types : services) { // Duplicate and store the service_name - char * service_name = rcutils_strdup(service_n_types.first.c_str(), *allocator); + char * service_name = rcutils_strdup (service_n_types.first.c_str (), *allocator); if (!service_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for service name", *allocator); - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for service name", *allocator); + fail_cleanup (); return RMW_RET_BAD_ALLOC; } service_names_and_types->names.data[index] = service_name; // Setup storage for types { - rcutils_ret_t rcutils_ret = rcutils_string_array_init( + rcutils_ret_t rcutils_ret = rcutils_string_array_init ( &service_names_and_types->types[index], - service_n_types.second.size(), + service_n_types.second.size (), allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - fail_cleanup(); - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + fail_cleanup (); + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } } // Duplicate and store each type for the service size_t type_index = 0; for (const auto & type : service_n_types.second) { - char * type_name = rcutils_strdup(type.c_str(), *allocator); + char * type_name = rcutils_strdup (type.c_str (), *allocator); if (!type_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for type name", *allocator) + fail_cleanup (); return RMW_RET_BAD_ALLOC; } service_names_and_types->types[index].data[type_index] = type_name; @@ -1570,75 +1382,75 @@ extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcu } return RMW_RET_OK; #else - (void)node; (void)allocator; (void)service_names_and_types; + (void) node; (void) allocator; (void) service_names_and_types; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, const rmw_client_t * client, bool * is_available) +extern "C" rmw_ret_t rmw_service_server_is_available (const rmw_node_t * node, const rmw_client_t * client, bool * is_available) { #if 0 // NIY if (!node) { - RMW_SET_ERROR_MSG("node handle is null"); + RMW_SET_ERROR_MSG ("node handle is null"); return RMW_RET_ERROR; } - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + RMW_CHECK_TYPE_IDENTIFIERS_MATCH ( node handle, node->implementation_identifier, adlink_cyclonedds_identifier, return RMW_RET_ERROR); if (!client) { - RMW_SET_ERROR_MSG("client handle is null"); + RMW_SET_ERROR_MSG ("client handle is null"); return RMW_RET_ERROR; } if (!is_available) { - RMW_SET_ERROR_MSG("is_available is null"); + RMW_SET_ERROR_MSG ("is_available is null"); return RMW_RET_ERROR; } - auto client_info = static_cast(client->data); + auto client_info = static_cast (client->data); if (!client_info) { - RMW_SET_ERROR_MSG("client info handle is null"); + RMW_SET_ERROR_MSG ("client info handle is null"); return RMW_RET_ERROR; } auto pub_topic_name = - client_info->request_publisher_->getAttributes().topic.getTopicName(); + client_info->request_publisher_->getAttributes ().topic.getTopicName (); auto pub_partitions = - client_info->request_publisher_->getAttributes().qos.m_partition.getNames(); + client_info->request_publisher_->getAttributes ().qos.m_partition.getNames (); // every rostopic has exactly 1 partition field set - if (pub_partitions.size() != 1) { - RCUTILS_LOG_ERROR_NAMED( + if (pub_partitions.size () != 1) { + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "Topic %s is not a ros topic", pub_topic_name.c_str()) - RMW_SET_ERROR_MSG((std::string(pub_topic_name) + " is a non-ros topic\n").c_str()); + "Topic %s is not a ros topic", pub_topic_name.c_str ()) + RMW_SET_ERROR_MSG ((std::string (pub_topic_name) + " is a non-ros topic\n").c_str ()); return RMW_RET_ERROR; } auto pub_fqdn = pub_partitions[0] + "/" + pub_topic_name; - pub_fqdn = _demangle_if_ros_topic(pub_fqdn); + pub_fqdn = _demangle_if_ros_topic (pub_fqdn); auto sub_topic_name = - client_info->response_subscriber_->getAttributes().topic.getTopicName(); + client_info->response_subscriber_->getAttributes ().topic.getTopicName (); auto sub_partitions = - client_info->response_subscriber_->getAttributes().qos.m_partition.getNames(); + client_info->response_subscriber_->getAttributes ().qos.m_partition.getNames (); // every rostopic has exactly 1 partition field set - if (sub_partitions.size() != 1) { - RCUTILS_LOG_ERROR_NAMED( + if (sub_partitions.size () != 1) { + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "Topic %s is not a ros topic", pub_topic_name.c_str()) - RMW_SET_ERROR_MSG((std::string(sub_topic_name) + " is a non-ros topic\n").c_str()); + "Topic %s is not a ros topic", pub_topic_name.c_str ()) + RMW_SET_ERROR_MSG ((std::string (sub_topic_name) + " is a non-ros topic\n").c_str ()); return RMW_RET_ERROR; } auto sub_fqdn = sub_partitions[0] + "/" + sub_topic_name; - sub_fqdn = _demangle_if_ros_topic(sub_fqdn); + sub_fqdn = _demangle_if_ros_topic (sub_fqdn); *is_available = false; size_t number_of_request_subscribers = 0; - rmw_ret_t ret = rmw_count_subscribers( + rmw_ret_t ret = rmw_count_subscribers ( node, - pub_fqdn.c_str(), + pub_fqdn.c_str (), &number_of_request_subscribers); if (ret != RMW_RET_OK) { // error string already set @@ -1650,9 +1462,9 @@ extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, co } size_t number_of_response_publishers = 0; - ret = rmw_count_publishers( + ret = rmw_count_publishers ( node, - sub_fqdn.c_str(), + sub_fqdn.c_str (), &number_of_response_publishers); if (ret != RMW_RET_OK) { // error string already set @@ -1667,87 +1479,87 @@ extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, co *is_available = true; return RMW_RET_OK; #else - (void)node; (void)client; (void)is_available; + (void) node; (void) client; (void) is_available; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_count_publishers(const rmw_node_t *node, const char *topic_name, size_t *count) +extern "C" rmw_ret_t rmw_count_publishers (const rmw_node_t *node, const char *topic_name, size_t *count) { #if 0 // safechecks if (!node) { - RMW_SET_ERROR_MSG("null node handle"); + RMW_SET_ERROR_MSG ("null node handle"); return RMW_RET_ERROR; } // Get participant pointer from node if (node->implementation_identifier != eprosima_fastrtps_identifier) { - RMW_SET_ERROR_MSG("node handle not from this implementation"); + RMW_SET_ERROR_MSG ("node handle not from this implementation"); return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); *count = 0; for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic(it.first); + auto topic_fqdn = _demangle_if_ros_topic (it.first); if (topic_fqdn == topic_name) { - *count += it.second.size(); + *count += it.second.size (); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); - RCUTILS_LOG_DEBUG_NAMED( + RCUTILS_LOG_DEBUG_NAMED ( "rmw_fastrtps_cpp", "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count) + topic_name, *count); - return RMW_RET_OK; + return RMW_RET_OK; #else - (void)node; (void)topic_name; (void)count; + (void) node; (void) topic_name; (void) count; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_count_subscribers(const rmw_node_t *node, const char *topic_name, size_t *count) +extern "C" rmw_ret_t rmw_count_subscribers (const rmw_node_t *node, const char *topic_name, size_t *count) { #if 0 // safechecks if (!node) { - RMW_SET_ERROR_MSG("null node handle"); + RMW_SET_ERROR_MSG ("null node handle"); return RMW_RET_ERROR; } // Get participant pointer from node if (node->implementation_identifier != eprosima_fastrtps_identifier) { - RMW_SET_ERROR_MSG("node handle not from this implementation"); + RMW_SET_ERROR_MSG ("node handle not from this implementation"); return RMW_RET_ERROR; } - CustomParticipantInfo * impl = static_cast(node->data); + CustomParticipantInfo * impl = static_cast (node->data); ReaderInfo * slave_target = impl->secondarySubListener; *count = 0; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic(it.first); + auto topic_fqdn = _demangle_if_ros_topic (it.first); if (topic_fqdn == topic_name) { - *count += it.second.size(); + *count += it.second.size (); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); - RCUTILS_LOG_DEBUG_NAMED( + RCUTILS_LOG_DEBUG_NAMED ( "rmw_fastrtps_cpp", "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count) + topic_name, *count); - return RMW_RET_OK; + return RMW_RET_OK; #else - (void)node; (void)topic_name; (void)count; + (void) node; (void) topic_name; (void) count; return RMW_RET_TIMEOUT; #endif } diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp new file mode 100644 index 0000000..be0e8ea --- /dev/null +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -0,0 +1,306 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "rmw/error_handling.h" + +#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" +#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" + +#include "rmw_cyclonedds_cpp/serdes.hpp" +#include "rmw_cyclonedds_cpp/serdata.hpp" + +#include "dds/ddsi/ddsi_iid.h" +#include "dds/ddsi/q_radmin.h" + +using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; +using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; +using RequestTypeSupport_c = rmw_cyclonedds_cpp::RequestTypeSupport; +using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; +using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport; +using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; + +static bool using_introspection_c_typesupport (const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; +} + +static bool using_introspection_cpp_typesupport (const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_cpp::typesupport_identifier; +} + +void *create_message_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new MessageTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new MessageTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +void *create_request_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new RequestTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new RequestTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +void *create_response_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new ResponseTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new ResponseTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wc99-extensions" + +static uint32_t serdata_rmw_size (const struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + return d->data.size (); +} + +static void serdata_rmw_free (struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + delete d; +} + +static struct serdata_rmw *new_serdata_rmw (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind) +{ + struct serdata_rmw *sd = new serdata_rmw; + ddsi_serdata_init (sd, topic, kind); + return sd; +} + +static struct ddsi_serdata *serdata_rmw_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size) +{ + struct serdata_rmw *d = new_serdata_rmw (topic, kind); + uint32_t off = 0; + assert (fragchain->min == 0); + assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */ + d->data.reserve (size); + while (fragchain) { + if (fragchain->maxp1 > off) { + /* only copy if this fragment adds data */ + const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)); + d->data.insert (d->data.end (), payload + off - fragchain->min, payload + fragchain->maxp1 - fragchain->min); + off = fragchain->maxp1; + } + fragchain = fragchain->nextfrag; + } + return d; +} + +static struct ddsi_serdata *serdata_rmw_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash __attribute__ ((unused))) +{ + /* there is no key field, so from_keyhash is trivial */ + return new_serdata_rmw (topic, SDK_KEY); +} + +static struct ddsi_serdata *serdata_rmw_from_sample (const struct ddsi_sertopic *topiccmn, enum ddsi_serdata_kind kind, const void *sample) +{ + const struct sertopic_rmw *topic = static_cast (topiccmn); + struct serdata_rmw *d = new_serdata_rmw (topic, kind); + if (kind != SDK_DATA) { + /* ROS2 doesn't do keys, so SDK_KEY is trivial */ + } else if (!topic->is_request_header) { + cycser sd (d->data); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (sample, sd, nullptr); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (sample, sd, nullptr); + } + } else { + /* The "prefix" lambda is there to inject the service invocation header data into the CDR + stream -- I haven't checked how it is done in the official RMW implementations, so it is + probably incompatible. */ + const cdds_request_wrapper_t *wrap = static_cast (sample); + auto prefix = [wrap](cycser& ser) { ser << wrap->header.guid; ser << wrap->header.seq; }; + cycser sd (d->data); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (wrap->data, sd, prefix); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (wrap->data, sd, prefix); + } + } + return d; +} + +static struct ddsi_serdata *serdata_rmw_to_topicless (const struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + struct serdata_rmw *d1 = new_serdata_rmw (d->topic, SDK_KEY); + d1->topic = nullptr; + return d1; +} + +static void serdata_rmw_to_ser (const struct ddsi_serdata *dcmn, size_t off, size_t sz, void *buf) +{ + const struct serdata_rmw *d = static_cast (dcmn); + memcpy (buf, (char *) d->data.data () + off, sz); +} + +static struct ddsi_serdata *serdata_rmw_to_ser_ref (const struct ddsi_serdata *dcmn, size_t off, size_t sz, ddsrt_iovec_t *ref) +{ + const struct serdata_rmw *d = static_cast (dcmn); + ref->iov_base = (char *) d->data.data () + off; + ref->iov_len = (ddsrt_iov_len_t) sz; + return ddsi_serdata_ref (d); +} + +static void serdata_rmw_to_ser_unref (struct ddsi_serdata *dcmn, const ddsrt_iovec_t *ref __attribute__ ((unused))) +{ + struct serdata_rmw *d = static_cast (dcmn); + ddsi_serdata_unref (d); +} + +static bool serdata_rmw_to_sample (const struct ddsi_serdata *dcmn, void *sample, void **bufptr __attribute__ ((unused)), void *buflim __attribute__ ((unused))) +{ + const struct serdata_rmw *d = static_cast (dcmn); + const struct sertopic_rmw *topic = static_cast (d->topic); + assert (bufptr == NULL); + assert (buflim == NULL); + if (d->kind != SDK_DATA) { + /* ROS2 doesn't do keys in a meaningful way yet */ + } else if (!topic->is_request_header) { + cycdeser sd (static_cast (d->data.data ()), d->data.size ()); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, sample, nullptr); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, sample, nullptr); + } + } else { + /* The "prefix" lambda is there to inject the service invocation header data into the CDR + stream -- I haven't checked how it is done in the official RMW implementations, so it is + probably incompatible. */ + cdds_request_wrapper_t * const wrap = static_cast (sample); + auto prefix = [wrap](cycdeser& ser) { ser >> wrap->header.guid; ser >> wrap->header.seq; }; + cycdeser sd (static_cast (d->data.data ()), d->data.size ()); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, wrap->data, prefix); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, wrap->data, prefix); + } + } + return false; +} + +static bool serdata_rmw_topicless_to_sample (const struct ddsi_sertopic *topic __attribute__ ((unused)), const struct ddsi_serdata *dcmn __attribute__ ((unused)), void *sample __attribute__ ((unused)), void **bufptr __attribute__ ((unused)), void *buflim __attribute__ ((unused))) +{ + /* ROS2 doesn't do keys in a meaningful way yet */ + return true; +} + +static bool serdata_rmw_eqkey (const struct ddsi_serdata *a __attribute__ ((unused)), const struct ddsi_serdata *b __attribute__ ((unused))) +{ + /* ROS2 doesn't do keys in a meaningful way yet */ + return true; +} + +static const struct ddsi_serdata_ops serdata_rmw_ops = { + .eqkey = serdata_rmw_eqkey, + .get_size = serdata_rmw_size, + .from_ser = serdata_rmw_from_ser, + .from_keyhash = serdata_rmw_from_keyhash, + .from_sample = serdata_rmw_from_sample, + .to_ser = serdata_rmw_to_ser, + .to_ser_ref = serdata_rmw_to_ser_ref, + .to_ser_unref = serdata_rmw_to_ser_unref, + .to_sample = serdata_rmw_to_sample, + .to_topicless = serdata_rmw_to_topicless, + .topicless_to_sample = serdata_rmw_topicless_to_sample, + .free = serdata_rmw_free +}; + +static void sertopic_rmw_deinit (struct ddsi_sertopic *tpcmn) +{ + struct sertopic_rmw *tp = static_cast (tpcmn); + delete tp; +} + +static void sertopic_rmw_zero_samples (const struct ddsi_sertopic *d __attribute__ ((unused)), void *samples __attribute__ ((unused)), size_t count __attribute__ ((unused))) +{ + /* Not using code paths that rely on the samples getting zero'd out */ +} + +static void sertopic_rmw_realloc_samples (void **ptrs __attribute__ ((unused)), const struct ddsi_sertopic *d __attribute__ ((unused)), void *old __attribute__ ((unused)), size_t oldcount __attribute__ ((unused)), size_t count __attribute__ ((unused))) +{ + /* Not using code paths that rely on this (loans, dispose, unregister with instance handle, + content filters) */ + abort (); +} + +static void sertopic_rmw_free_samples (const struct ddsi_sertopic *d __attribute__ ((unused)), void **ptrs __attribute__ ((unused)), size_t count __attribute__ ((unused)), dds_free_op_t op) +{ + /* Not using code paths that rely on this (dispose, unregister with instance handle, content + filters) */ + assert (!(op & DDS_FREE_ALL_BIT)); +} + +static const struct ddsi_sertopic_ops sertopic_rmw_ops = { + .deinit = sertopic_rmw_deinit, + .zero_samples = sertopic_rmw_zero_samples, + .realloc_samples = sertopic_rmw_realloc_samples, + .free_samples = sertopic_rmw_free_samples +}; + +struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header) +{ + struct sertopic_rmw *st = new struct sertopic_rmw; + st->cpp_name = std::string (topicname); + st->cpp_type_name = std::string ("absent"); // FIXME: obviously a hack + st->cpp_name_type_name = st->cpp_name + std::string (";") + std::string (st->cpp_type_name); + st->ops = &sertopic_rmw_ops; + st->serdata_ops = &serdata_rmw_ops; + st->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->serdata_ops); + st->name_type_name = const_cast (st->cpp_name_type_name.c_str ()); + st->name = const_cast (st->cpp_name.c_str ()); + st->type_name = const_cast (st->cpp_type_name.c_str ()); + st->iid = ddsi_iid_gen (); + ddsrt_atomic_st32 (&st->refc, 1); + + st->type_support.typesupport_identifier_ = type_support_identifier; + st->type_support.type_support_ = type_support; + st->is_request_header = is_request_header; + return st; +} + +#pragma GCC diagnostic pop diff --git a/rmw_cyclonedds_cpp/src/serdes.cpp b/rmw_cyclonedds_cpp/src/serdes.cpp index a44e346..76acc62 100644 --- a/rmw_cyclonedds_cpp/src/serdes.cpp +++ b/rmw_cyclonedds_cpp/src/serdes.cpp @@ -1,53 +1,22 @@ #include "rmw_cyclonedds_cpp/serdes.hpp" -cycser::cycser(struct sertopic *topic) +cycser::cycser (std::vector& dst_) + : dst (dst_) + , off (0) { - st = ddsi_serstate_new(topic); - sd = nullptr; + dst.reserve (4); + unsigned char *magic = dst.data (); + /* FIXME: hard code to little endian ... and ignoring endianness in deser */ + magic[0] = 0; + magic[1] = 3; + /* options: */ + magic[2] = 0; + magic[3] = 0; } -cycser::~cycser() -{ - if (sd == nullptr) { - ddsi_serstate_release(st); - } else { - ddsi_serdata_unref(sd); - } -} - -cycser& cycser::ref() -{ - assert(sd != nullptr); - ddsi_serdata_ref(sd); - return *this; -} - -void cycser::unref() -{ - assert(sd != nullptr); - ddsi_serdata_unref(sd); -} - -cycser& cycser::fix() -{ - assert(sd == nullptr); - sd = ddsi_serstate_fix(st); - return *this; -} - -struct serdata *cycser::data() -{ - assert(sd != nullptr); - return sd; -} - -cycdeser::cycdeser(const void *data_, size_t size_) -{ - data = static_cast(data_); - lim = size_; - pos = 0; -} - -cycdeser::~cycdeser() +cycdeser::cycdeser (const void *data_, size_t size_) + : data (static_cast (data_) + 4) + , pos (0) + , lim (size_ - 4) { }