diff --git a/rmw_cyclonedds_cpp/CMakeLists.txt b/rmw_cyclonedds_cpp/CMakeLists.txt index 4a9e215..369d84b 100644 --- a/rmw_cyclonedds_cpp/CMakeLists.txt +++ b/rmw_cyclonedds_cpp/CMakeLists.txt @@ -52,12 +52,11 @@ include_directories(include) add_library(rmw_cyclonedds_cpp src/namespace_prefix.cpp src/rmw_node.cpp + src/serdata.cpp src/serdes.cpp ) -idlc_generate(rmw_cyclonedds_topic_lib src/rmw_cyclonedds_topic.idl) -target_link_libraries(rmw_cyclonedds_cpp - rmw_cyclonedds_topic_lib CycloneDDS::ddsc) +target_link_libraries(rmw_cyclonedds_cpp CycloneDDS::ddsc) ament_target_dependencies(rmw_cyclonedds_cpp "rcutils" @@ -72,7 +71,7 @@ configure_rmw_library(rmw_cyclonedds_cpp) # Causes the visibility macros to use dllexport rather than dllimport, # which is appropriate when building the dll but not consuming it. target_compile_definitions(${PROJECT_NAME} -PRIVATE "RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY") + PRIVATE "RMW_CYCLONEDDS_CPP_BUILDING_LIBRARY") ament_export_include_directories(include) ament_export_libraries(rmw_cyclonedds_cpp) diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp index d5dd4d8..2d3f181 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport.hpp @@ -59,13 +59,13 @@ struct StringHelper if (!c_string) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", - "Failed to cast data as rosidl_generator_c__String") + "Failed to cast data as rosidl_generator_c__String"); return ""; } if (!c_string->data) { RCUTILS_LOG_ERROR_NAMED( "rmw_cyclonedds_cpp", - "rosidl_generator_c_String had invalid data") + "rosidl_generator_c_String had invalid data"); return ""; } return std::string(c_string->data); diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp index 9636449..e0ad3ed 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/TypeSupport_impl.hpp @@ -30,7 +30,7 @@ #include "rosidl_typesupport_introspection_c/message_introspection.h" #include "rosidl_typesupport_introspection_c/service_introspection.h" -#include "rosidl_generator_c/primitives_array_functions.h" +#include "rosidl_generator_c/primitives_sequence_functions.h" #include "serdes.hpp" @@ -38,37 +38,37 @@ namespace rmw_cyclonedds_cpp { template -struct GenericCArray; +struct GenericCSequence; // multiple definitions of ambiguous primitive types -SPECIALIZE_GENERIC_C_ARRAY(bool, bool) -SPECIALIZE_GENERIC_C_ARRAY(byte, uint8_t) -SPECIALIZE_GENERIC_C_ARRAY(char, char) -SPECIALIZE_GENERIC_C_ARRAY(float32, float) -SPECIALIZE_GENERIC_C_ARRAY(float64, double) -SPECIALIZE_GENERIC_C_ARRAY(int8, int8_t) -SPECIALIZE_GENERIC_C_ARRAY(int16, int16_t) -SPECIALIZE_GENERIC_C_ARRAY(uint16, uint16_t) -SPECIALIZE_GENERIC_C_ARRAY(int32, int32_t) -SPECIALIZE_GENERIC_C_ARRAY(uint32, uint32_t) -SPECIALIZE_GENERIC_C_ARRAY(int64, int64_t) -SPECIALIZE_GENERIC_C_ARRAY(uint64, uint64_t) +SPECIALIZE_GENERIC_C_SEQUENCE(bool, bool) +SPECIALIZE_GENERIC_C_SEQUENCE(byte, uint8_t) +SPECIALIZE_GENERIC_C_SEQUENCE(char, char) +SPECIALIZE_GENERIC_C_SEQUENCE(float32, float) +SPECIALIZE_GENERIC_C_SEQUENCE(float64, double) +SPECIALIZE_GENERIC_C_SEQUENCE(int8, int8_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int16, int16_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint16, uint16_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int32, int32_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint32, uint32_t) +SPECIALIZE_GENERIC_C_SEQUENCE(int64, int64_t) +SPECIALIZE_GENERIC_C_SEQUENCE(uint64, uint64_t) -typedef struct rosidl_generator_c__void__Array +typedef struct rosidl_generator_c__void__Sequence { void * data; /// The number of valid items in data size_t size; /// The number of allocated items in data size_t capacity; -} rosidl_generator_c__void__Array; +} rosidl_generator_c__void__Sequence; inline bool -rosidl_generator_c__void__Array__init( - rosidl_generator_c__void__Array * array, size_t size, size_t member_size) +rosidl_generator_c__void__Sequence__init( + rosidl_generator_c__void__Sequence * sequence, size_t size, size_t member_size) { - if (!array) { + if (!sequence) { return false; } void * data = nullptr; @@ -78,31 +78,31 @@ rosidl_generator_c__void__Array__init( return false; } } - array->data = data; - array->size = size; - array->capacity = size; + sequence->data = data; + sequence->size = size; + sequence->capacity = size; return true; } inline void -rosidl_generator_c__void__Array__fini(rosidl_generator_c__void__Array * array) +rosidl_generator_c__void__Sequence__fini(rosidl_generator_c__void__Sequence * sequence) { - if (!array) { + if (!sequence) { return; } - if (array->data) { + if (sequence->data) { // ensure that data and capacity values are consistent - assert(array->capacity > 0); - // finalize all array elements - free(array->data); - array->data = nullptr; - array->size = 0; - array->capacity = 0; + assert(sequence->capacity > 0); + // finalize all sequence elements + free(sequence->data); + sequence->data = nullptr; + sequence->size = 0; + sequence->capacity = 0; } else { // ensure that data, size, and capacity values are consistent - assert(0 == array->size); - assert(0 == array->capacity); + assert(0 == sequence->size); + assert(0 == sequence->capacity); } } @@ -231,7 +231,7 @@ void serialize_field( } else if (member->array_size_ && !member->is_upper_bound_) { ser.serializeA(static_cast(field), member->array_size_); } else { - auto & data = *reinterpret_cast::type *>(field); + auto & data = *reinterpret_cast::type *>(field); ser.serializeS(reinterpret_cast(data.data), data.size); } } @@ -264,7 +264,7 @@ void serialize_field( ser.serialize(tmpstring); } } else { - auto & string_array_field = *reinterpret_cast(field); + auto & string_array_field = *reinterpret_cast(field); std::vector cpp_string_vector; for (size_t i = 0; i < string_array_field.size; ++i) { cpp_string_vector.push_back( @@ -300,12 +300,12 @@ size_t get_array_size_and_assign_field( void * & subros_message, size_t, size_t) { - auto tmparray = static_cast(field); - if (member->is_upper_bound_ && tmparray->size > member->array_size_) { + auto tmpsequence = static_cast(field); + if (member->is_upper_bound_ && tmpsequence->size > member->array_size_) { throw std::runtime_error("vector overcomes the maximum length"); } - subros_message = reinterpret_cast(tmparray->data); - return tmparray->size; + subros_message = reinterpret_cast(tmpsequence->data); + return tmpsequence->size; } template @@ -434,10 +434,10 @@ void deserialize_field( } else if (member->array_size_ && !member->is_upper_bound_) { deser.deserializeA(static_cast(field), member->array_size_); } else { - auto & data = *reinterpret_cast::type *>(field); + auto & data = *reinterpret_cast::type *>(field); int32_t dsize = 0; deser >> dsize; - GenericCArray::init(&data, dsize); + GenericCSequence::init(&data, dsize); deser.deserializeA(reinterpret_cast(data.data), dsize); } } @@ -469,8 +469,8 @@ inline void deserialize_field( std::vector cpp_string_vector; deser >> cpp_string_vector; - auto & string_array_field = *reinterpret_cast(field); - if (!rosidl_generator_c__String__Array__init(&string_array_field, cpp_string_vector.size())) { + auto & string_array_field = *reinterpret_cast(field); + if (!rosidl_generator_c__String__Sequence__init(&string_array_field, cpp_string_vector.size())) { throw std::runtime_error("unable to initialize rosidl_generator_c__String array"); } @@ -521,8 +521,8 @@ inline size_t get_submessage_array_deserialize( // Deserialize length uint32_t vsize = 0; deser >> vsize; - auto tmparray = static_cast(field); - rosidl_generator_c__void__Array__init(tmparray, vsize, sub_members_size); + auto tmparray = static_cast(field); + rosidl_generator_c__void__Sequence__init(tmparray, vsize, sub_members_size); subros_message = reinterpret_cast(tmparray->data); return vsize; } diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp index 6c50a92..2d6b621 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/macros.hpp @@ -18,18 +18,18 @@ #include #include -#define SPECIALIZE_GENERIC_C_ARRAY(C_NAME, C_TYPE) \ +#define SPECIALIZE_GENERIC_C_SEQUENCE(C_NAME, C_TYPE) \ template<> \ - struct GenericCArray \ + struct GenericCSequence \ { \ - using type = rosidl_generator_c__ ## C_NAME ## __Array; \ + using type = rosidl_generator_c__ ## C_NAME ## __Sequence; \ \ static void fini(type * array) { \ - rosidl_generator_c__ ## C_NAME ## __Array__fini(array); \ + rosidl_generator_c__ ## C_NAME ## __Sequence__fini(array); \ } \ \ static bool init(type * array, size_t size) { \ - return rosidl_generator_c__ ## C_NAME ## __Array__init(array, size); \ + return rosidl_generator_c__ ## C_NAME ## __Sequence__init(array, size); \ } \ }; diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp new file mode 100644 index 0000000..6fe9ad4 --- /dev/null +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp @@ -0,0 +1,55 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#ifndef SERDATA_HPP +#define SERDATA_HPP + +#include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/ddsi_serdata.h" + +struct CddsTypeSupport { + void *type_support_; + const char *typesupport_identifier_; +}; + +struct sertopic_rmw : ddsi_sertopic { + CddsTypeSupport type_support; + bool is_request_header; + std::string cpp_name; + std::string cpp_type_name; + std::string cpp_name_type_name; +}; + +struct serdata_rmw : ddsi_serdata { + /* first two bytes of data is CDR encoding + second two bytes are encoding options */ + std::vector data; +}; + +typedef struct cdds_request_header { + uint64_t guid; + int64_t seq; +} cdds_request_header_t; + +typedef struct cdds_request_wrapper { + cdds_request_header_t header; + void *data; +} cdds_request_wrapper_t; + +void *create_message_type_support (const void *untyped_members, const char *typesupport_identifier); +void *create_request_type_support (const void *untyped_members, const char *typesupport_identifier); +void *create_response_type_support (const void *untyped_members, const char *typesupport_identifier); + +struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header); + +#endif diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp index efc2b64..0754fb0 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp +++ b/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdes.hpp @@ -20,235 +20,217 @@ #include #include -#include "ddsc/dds.h" - -extern "C" { - struct serstate; - struct serdata; - struct sertopic; - - struct serdata *ddsi_serdata_ref(struct serdata *serdata); - void ddsi_serdata_unref(struct serdata *serdata); - struct serstate *ddsi_serstate_new(const struct sertopic *topic); - struct serdata *ddsi_serstate_fix(struct serstate *st); - void ddsi_serstate_release(struct serstate *st); - void *ddsi_serstate_append(struct serstate *st, size_t n); - void *ddsi_serstate_append_align(struct serstate *st, size_t a); - void *ddsi_serstate_append_aligned(struct serstate *st, size_t n, size_t a); - void ddsi_serdata_getblob(void **raw, size_t *sz, struct serdata *serdata); -} - -template -struct simpletype : std::integral_constant< bool, std::is_arithmetic::value && !std::is_same::value > {}; - -template -struct complextype : std::integral_constant< bool, !(std::is_arithmetic::value && !std::is_same::value) > {}; - -template struct rank : rank {}; -template<> struct rank<0> {}; - class cycser { public: - cycser(struct sertopic *topic); - cycser() = delete; - ~cycser(); - cycser& fix(); - cycser& ref(); - void unref(); - struct serdata *data(); + cycser (std::vector& dst_); + cycser () = delete; - inline cycser& operator<<(bool x) { serialize(x); return *this; } - inline cycser& operator<<(char x) { serialize(x); return *this; } - inline cycser& operator<<(uint8_t x) { serialize(x); return *this; } - inline cycser& operator<<(int16_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint16_t x) { serialize(x); return *this; } - inline cycser& operator<<(int32_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint32_t x) { serialize(x); return *this; } - inline cycser& operator<<(int64_t x) { serialize(x); return *this; } - inline cycser& operator<<(uint64_t x) { serialize(x); return *this; } - inline cycser& operator<<(float x) { serialize(x); return *this; } - inline cycser& operator<<(double x) { serialize(x); return *this; } - inline cycser& operator<<(const char *x) { serialize(x); return *this; } - inline cycser& operator<<(const std::string& x) { serialize(x); return *this; } - template inline cycser& operator<<(const std::vector& x) { serialize(x); return *this; } - template inline cycser& operator<<(const std::array& x) { serialize(x); return *this; } + inline cycser& operator<< (bool x) { serialize (x); return *this; } + inline cycser& operator<< (char x) { serialize (x); return *this; } + inline cycser& operator<< (uint8_t x) { serialize (x); return *this; } + inline cycser& operator<< (int16_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint16_t x) { serialize (x); return *this; } + inline cycser& operator<< (int32_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint32_t x) { serialize (x); return *this; } + inline cycser& operator<< (int64_t x) { serialize (x); return *this; } + inline cycser& operator<< (uint64_t x) { serialize (x); return *this; } + inline cycser& operator<< (float x) { serialize (x); return *this; } + inline cycser& operator<< (double x) { serialize (x); return *this; } + inline cycser& operator<< (const char *x) { serialize (x); return *this; } + inline cycser& operator<< (const std::string& x) { serialize (x); return *this; } + template inline cycser& operator<< (const std::vector& x) { serialize (x); return *this; } + template inline cycser& operator<< (const std::array& x) { serialize (x); return *this; } -#define SIMPLE(T) inline void serialize(T x) { \ - T *p = (T *)ddsi_serstate_append_aligned(st, sizeof(T), sizeof(T)); \ - *p = x; \ - } - SIMPLE(char); - SIMPLE(unsigned char); - SIMPLE(int16_t); - SIMPLE(uint16_t); - SIMPLE(int32_t); - SIMPLE(uint32_t); - SIMPLE(int64_t); - SIMPLE(uint64_t); - SIMPLE(float); - SIMPLE(double); +#define SIMPLE(T) inline void serialize (T x) { \ + if ((off % sizeof (T)) != 0) { \ + off += sizeof (T) - (off % sizeof (T)); \ + } \ + reserve (off + sizeof (T)); \ + *(T *) (data () + off) = x; \ + off += sizeof (T); \ + } + SIMPLE (char); + SIMPLE (unsigned char); + SIMPLE (int16_t); + SIMPLE (uint16_t); + SIMPLE (int32_t); + SIMPLE (uint32_t); + SIMPLE (int64_t); + SIMPLE (uint64_t); + SIMPLE (float); + SIMPLE (double); #undef SIMPLE - inline void serialize(bool x) { - serialize(static_cast(x)); - } - inline void serialize(const char *x) - { - size_t sz = strlen(x) + 1; - serialize(static_cast(sz)); - char *p = (char *)ddsi_serstate_append(st, sz); - memcpy(p, x, sz); - } - inline void serialize(const std::string& x) - { - size_t sz = x.size(); - serialize(static_cast(sz+1)); - char *p = (char *)ddsi_serstate_append(st, sz+1); - memcpy(p, x.data(), sz); - p[sz] = 0; - } + inline void serialize (bool x) { + serialize (static_cast (x)); + } + inline void serialize (const char *x) + { + size_t sz = strlen (x) + 1; + serialize (static_cast (sz)); + reserve (off + sz); + memcpy (data () + off, x, sz); + off += sz; + } + inline void serialize (const std::string& x) + { + size_t sz = x.size () + 1; + serialize (static_cast (sz)); + reserve (off + sz); + memcpy (data () + off, x.data (), sz - 1); + *(data () + off + sz - 1) = 0; + } -#define SIMPLEA(T) inline void serializeA(const T *x, size_t cnt) { \ - void *p = (void *)ddsi_serstate_append_aligned(st, (cnt) * sizeof(T), sizeof(T)); \ - memcpy(p, (void *)x, (cnt) * sizeof(T)); \ - } - SIMPLEA(char); - SIMPLEA(unsigned char); - SIMPLEA(int16_t); - SIMPLEA(uint16_t); - SIMPLEA(int32_t); - SIMPLEA(uint32_t); - SIMPLEA(int64_t); - SIMPLEA(uint64_t); - SIMPLEA(float); - SIMPLEA(double); +#define SIMPLEA(T) inline void serializeA (const T *x, size_t cnt) { \ + if ((off % sizeof (T)) != 0) { \ + off += sizeof (T) - (off % sizeof (T)); \ + } \ + reserve (off + cnt * sizeof (T)); \ + memcpy (data () + off, (void *) x, cnt * sizeof (T)); \ + off += cnt * sizeof (T); \ + } + SIMPLEA (char); + SIMPLEA (unsigned char); + SIMPLEA (int16_t); + SIMPLEA (uint16_t); + SIMPLEA (int32_t); + SIMPLEA (uint32_t); + SIMPLEA (int64_t); + SIMPLEA (uint64_t); + SIMPLEA (float); + SIMPLEA (double); #undef SIMPLEA - template inline void serializeA(const T *x, size_t cnt) { - for (size_t i = 0; i < cnt; i++) serialize(x[i]); - } + template inline void serializeA (const T *x, size_t cnt) { + for (size_t i = 0; i < cnt; i++) serialize (x[i]); + } - template inline void serialize(const std::vector& x) - { - serialize(static_cast(x.size())); - if (x.size() > 0) serializeA(x.data(), x.size()); - } - inline void serialize(const std::vector& x) { - serialize(static_cast(x.size())); - for (auto&& i : x) serialize(i); - } + template inline void serialize (const std::vector& x) + { + serialize (static_cast (x.size ())); + if (x.size () > 0) serializeA (x.data (), x.size ()); + } + inline void serialize (const std::vector& x) { + serialize (static_cast (x.size ())); + for (auto&& i : x) serialize (i); + } - template inline void serializeS(const T *x, size_t cnt) - { - serialize(static_cast(cnt)); - if (cnt > 0) serializeA(x, cnt); - } + template inline void serializeS (const T *x, size_t cnt) + { + serialize (static_cast (cnt)); + if (cnt > 0) serializeA (x, cnt); + } private: - struct serstate *st; - struct serdata *sd; + inline void reserve (size_t n) { dst.reserve (n + 4); } + inline unsigned char *data () { return dst.data () + 4; } + + std::vector& dst; + size_t off; }; class cycdeser { public: - // FIXME: byteswapping - cycdeser(const void *data, size_t size); - cycdeser() = delete; - ~cycdeser(); + // FIXME: byteswapping + cycdeser (const void *data, size_t size); + cycdeser () = delete; - inline cycdeser& operator>>(bool& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(char& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint8_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int16_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint16_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int32_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint32_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(int64_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(uint64_t& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(float& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(double& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(char *& x) { deserialize(x); return *this; } - inline cycdeser& operator>>(std::string& x) { deserialize(x); return *this; } - template inline cycdeser& operator>>(std::vector& x) { deserialize(x); return *this; } - template inline cycdeser& operator>>(std::array& x) { deserialize(x); return *this; } + inline cycdeser& operator>> (bool& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (char& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint8_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int16_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint16_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int32_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint32_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (int64_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (uint64_t& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (float& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (double& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (char *& x) { deserialize (x); return *this; } + inline cycdeser& operator>> (std::string& x) { deserialize (x); return *this; } + template inline cycdeser& operator>> (std::vector& x) { deserialize (x); return *this; } + template inline cycdeser& operator>> (std::array& x) { deserialize (x); return *this; } -#define SIMPLE(T) inline void deserialize(T& x) { \ - align(sizeof(x)); \ - x = *reinterpret_cast(data); \ - data += sizeof(x); \ - } - SIMPLE(char); - SIMPLE(unsigned char); - SIMPLE(int16_t); - SIMPLE(uint16_t); - SIMPLE(int32_t); - SIMPLE(uint32_t); - SIMPLE(int64_t); - SIMPLE(uint64_t); - SIMPLE(float); - SIMPLE(double); +#define SIMPLE(T) inline void deserialize (T& x) { \ + align (sizeof (x)); \ + x = *reinterpret_cast (data + pos); \ + pos += sizeof (x); \ + } + SIMPLE (char); + SIMPLE (unsigned char); + SIMPLE (int16_t); + SIMPLE (uint16_t); + SIMPLE (int32_t); + SIMPLE (uint32_t); + SIMPLE (int64_t); + SIMPLE (uint64_t); + SIMPLE (float); + SIMPLE (double); #undef SIMPLE - inline void deserialize(bool& x) { - unsigned char z; deserialize(z); x = (z != 0); - } - inline uint32_t deserialize32() { - uint32_t sz; deserialize(sz); return sz; - } - inline void deserialize(char *& x) { - const uint32_t sz = deserialize32(); x = (char *)malloc(sz); memcpy(x, data, sz); data += sz; - } - inline void deserialize(std::string& x) { - const uint32_t sz = deserialize32(); x = std::string(data, sz-1); data += sz; - } + inline void deserialize (bool& x) { + unsigned char z; deserialize (z); x = (z != 0); + } + inline uint32_t deserialize32 () { + uint32_t sz; deserialize (sz); return sz; + } + inline void deserialize (char *& x) { + const uint32_t sz = deserialize32 (); + x = (char *) malloc (sz); + memcpy (x, data + pos, sz); + pos += sz; + } + inline void deserialize (std::string& x) { + const uint32_t sz = deserialize32 (); + x = std::string (data + pos, sz-1); + pos += sz; + } -#define SIMPLEA(T) inline void deserializeA(T *x, size_t cnt) { \ - align(sizeof(T)); \ - memcpy((void *)x, (void *)data, (cnt) * sizeof(T)); \ - data += (cnt) * sizeof(T); \ - } - SIMPLEA(char); - SIMPLEA(unsigned char); - SIMPLEA(int16_t); - SIMPLEA(uint16_t); - SIMPLEA(int32_t); - SIMPLEA(uint32_t); - SIMPLEA(int64_t); - SIMPLEA(uint64_t); - SIMPLEA(float); - SIMPLEA(double); +#define SIMPLEA(T) inline void deserializeA (T *x, size_t cnt) { \ + align (sizeof (T)); \ + memcpy ((void *) x, (void *) (data + pos), (cnt) * sizeof (T)); \ + pos += (cnt) * sizeof (T); \ + } + SIMPLEA (char); + SIMPLEA (unsigned char); + SIMPLEA (int16_t); + SIMPLEA (uint16_t); + SIMPLEA (int32_t); + SIMPLEA (uint32_t); + SIMPLEA (int64_t); + SIMPLEA (uint64_t); + SIMPLEA (float); + SIMPLEA (double); #undef SIMPLEA - template inline void deserializeA(T *x, size_t cnt) { - for (size_t i = 0; i < cnt; i++) deserialize(x[i]); - } + template inline void deserializeA (T *x, size_t cnt) { + for (size_t i = 0; i < cnt; i++) deserialize (x[i]); + } - template inline void deserialize(std::vector& x) { - const uint32_t sz = deserialize32(); - x.resize(sz); - deserializeA(x.data(), sz); - } - inline void deserialize(std::vector& x) { - const uint32_t sz = deserialize32(); - x.resize(sz); - for (auto&& i : x) i = (data[i] != 0); - data += sz; - } - template inline void deserialize(std::array& x) { - deserializeA(x.data(), x.size()); - } + template inline void deserialize (std::vector& x) { + const uint32_t sz = deserialize32 (); + x.resize (sz); + deserializeA (x.data (), sz); + } + inline void deserialize (std::vector& x) { + const uint32_t sz = deserialize32 (); + x.resize (sz); + for (auto&& i : x) i = ((data + pos)[i] != 0); + pos += sz; + } + template inline void deserialize (std::array& x) { + deserializeA (x.data (), x.size ()); + } private: - inline void align(size_t a) - { - const uintptr_t x = reinterpret_cast(data); - if ((x % a) != 0) { - const uintptr_t pad = a - (x % a); - data = reinterpret_cast(x + pad); + inline void align (size_t a) + { + if ((pos % a) != 0) { + pos += a - (pos % a); + } } - } - const char *data; - size_t pos; - size_t lim; // ignored for now ... better provide correct input + const char *data; + size_t pos; + size_t lim; // ignored for now ... better provide correct input }; #endif diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index 6dd8b7d..31426ac 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -22,45 +22,22 @@ - topic creation: shouldn't leak topics - - hash set of writer handles should be thread safe: no guarantee that no writers get - added/deleted in parallel to each other or to takes - - - introspection stuff not done yet (probably requires additions to cdds) + - introspection stuff not done yet - check / make sure a node remains valid while one of its subscriptions exists - - writecdr/takecdr interface abuse is beyond redemption - - service/client simply use the instance handle of its publisher as its GUID -- yikes! but it is actually only kinda wrong because the instance handles allocated by different instance of cdds are actually taken from uncorrelated (close to uncorrelated anyway) permutations of 64-bit unsigned integers - - ... and many more things ... + - ... and probably many more things ... */ -#include -#include -#include -#include -#include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include #include -#include "rcutils/allocator.h" -#include "rcutils/filesystem.h" #include "rcutils/logging_macros.h" -#include "rcutils/strdup.h" -#include "rcutils/types.h" #include "rmw/allocators.h" #include "rmw/convert_rcutils_ret_to_rmw_ret.h" @@ -71,71 +48,60 @@ #include "rmw/impl/cpp/macros.hpp" -#include "namespace_prefix.hpp" - #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" -#include "ddsc/dds.h" +#include "namespace_prefix.hpp" -#include "rmw_cyclonedds_topic.h" +#include "dds/dds.h" +#include "dds/ddsi/ddsi_sertopic.h" #include "rmw_cyclonedds_cpp/serdes.hpp" +#include "rmw_cyclonedds_cpp/serdata.hpp" -#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG(msg); code; } while(0) -#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X(#var " is null", code); } while(0) -#define RET_ALLOC_X(var, code) do { if (!var) RET_ERR_X("failed to allocate " #var, code); } while(0) +#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG (msg); code; } while (0) +#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X (#var " is null", code); } while (0) +#define RET_ALLOC_X(var, code) do { if (!var) RET_ERR_X ("failed to allocate " #var, code); } while (0) #define RET_WRONG_IMPLID_X(var, code) do { \ - RET_NULL_X(var, code); \ + RET_NULL_X (var, code); \ if ((var)->implementation_identifier != adlink_cyclonedds_identifier) { \ - RET_ERR_X(#var " not from this implementation", code); \ + RET_ERR_X (#var " not from this implementation", code); \ } \ - } while(0) + } while (0) #define RET_NULL_OR_EMPTYSTR_X(var, code) do { \ - if(!var || strlen(var) == 0) { \ - RET_ERR_X(#var " is null or empty string", code); \ + if (!var || strlen (var) == 0) { \ + RET_ERR_X (#var " is null or empty string", code); \ } \ - } while(0) -#define RET_ERR(msg) RET_ERR_X(msg, return RMW_RET_ERROR) -#define RET_NULL(var) RET_NULL_X(var, return RMW_RET_ERROR) -#define RET_ALLOC(var) RET_ALLOC_X(var, return RMW_RET_ERROR) -#define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X(var, return RMW_RET_ERROR) -#define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X(var, return RMW_RET_ERROR) + } while (0) +#define RET_ERR(msg) RET_ERR_X (msg, return RMW_RET_ERROR) +#define RET_NULL(var) RET_NULL_X (var, return RMW_RET_ERROR) +#define RET_ALLOC(var) RET_ALLOC_X (var, return RMW_RET_ERROR) +#define RET_WRONG_IMPLID(var) RET_WRONG_IMPLID_X (var, return RMW_RET_ERROR) +#define RET_NULL_OR_EMPTYSTR(var) RET_NULL_OR_EMPTYSTR_X (var, return RMW_RET_ERROR) const char *const adlink_cyclonedds_identifier = "rmw_cyclonedds_cpp"; -struct CddsTypeSupport { - void *type_support_; - const char *typesupport_identifier_; -}; - /* instance handles are unsigned 64-bit integers carefully constructed to be as close to uniformly distributed as possible for no other reason than making them near-perfect hash keys, hence we can improve over the default hash function */ struct dds_instance_handle_hash { public: - std::size_t operator()(dds_instance_handle_t const& x) const noexcept { - return static_cast(x); + std::size_t operator () (dds_instance_handle_t const& x) const noexcept { + return static_cast (x); } }; struct CddsNode { rmw_guard_condition_t *graph_guard_condition; - std::unordered_set own_writers; }; struct CddsPublisher { dds_entity_t pubh; dds_instance_handle_t pubiid; - struct sertopic *sertopic; - CddsTypeSupport ts; }; struct CddsSubscription { dds_entity_t subh; dds_entity_t rdcondh; - CddsNode *node; - bool ignore_local_publications; - CddsTypeSupport ts; }; struct CddsCS { @@ -173,154 +139,31 @@ struct Cdds { uint32_t refcount; dds_entity_t ppant; std::unordered_set waitsets; - Cdds() : refcount(0), ppant(0) {} + Cdds () : refcount (0), ppant (0) {} }; -typedef struct cdds_request_header { - uint64_t guid; - int64_t seq; -} cdds_request_header_t; - static Cdds gcdds; -using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; -using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; -using RequestTypeSupport_c = rmw_cyclonedds_cpp::RequestTypeSupport; -using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; -using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport; -using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; +static void clean_waitset_caches (); -extern "C" { - struct dds_entity; - struct dds_domain; - int32_t dds_entity_lock(dds_entity_t hdl, dds_entity_kind_t kind, struct dds_entity **e); - void dds_entity_unlock(struct dds_entity *e); - struct sertopic *dds_topic_lookup (struct dds_domain *domain, const char *name); - dds_domain *dds__entity_domain(dds_entity* e); - void sertopic_free (struct sertopic * tp); -} +#pragma GCC visibility push (default) -static void clean_waitset_caches(); - -static struct sertopic *get_sertopic(dds_entity_t topic, const std::string& name) -{ - struct dds_entity *x; - struct sertopic *sertopic; - if (dds_entity_lock(topic, DDS_KIND_TOPIC, &x) < 0) { - abort(); - } - sertopic = dds_topic_lookup(dds__entity_domain(x), name.c_str()); - dds_entity_unlock(x); - return sertopic; -} - -static bool using_introspection_c_typesupport(const char *typesupport_identifier) -{ - return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; -} - -static bool using_introspection_cpp_typesupport(const char *typesupport_identifier) -{ - return typesupport_identifier == rosidl_typesupport_introspection_cpp::typesupport_identifier; -} - -static void *create_message_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new MessageTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new MessageTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -static void *create_request_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new RequestTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new RequestTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -static void *create_response_type_support(const void *untyped_members, const char *typesupport_identifier) -{ - if (using_introspection_c_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new ResponseTypeSupport_c(members); - } else if (using_introspection_cpp_typesupport(typesupport_identifier)) { - auto members = static_cast(untyped_members); - return new ResponseTypeSupport_cpp(members); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return nullptr; -} - -template const void *get_request_ptr(const void *untyped_service_members) -{ - auto service_members = static_cast(untyped_service_members); - RET_NULL_X(service_members, return nullptr); - return service_members->request_members_; -} - -template const void *get_response_ptr(const void *untyped_service_members) -{ - auto service_members = static_cast(untyped_service_members); - RET_NULL_X(service_members, return nullptr); - return service_members->response_members_; -} - -static bool sermsg(const void *ros_message, cycser& ser, std::function prefix, const CddsTypeSupport& ts) -{ - if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); - } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->serializeROSmessage(ros_message, ser, prefix); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return false; -} - -static bool desermsg(cycdeser& deser, void *ros_message, std::function prefix, const CddsTypeSupport& ts) -{ - if (using_introspection_c_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); - } else if (using_introspection_cpp_typesupport(ts.typesupport_identifier_)) { - auto typed_typesupport = static_cast(ts.type_support_); - return typed_typesupport->deserializeROSmessage(deser, ros_message, prefix); - } - RMW_SET_ERROR_MSG("Unknown typesupport identifier"); - return false; -} - -#pragma GCC visibility push(default) - -extern "C" const char *rmw_get_implementation_identifier() +extern "C" const char *rmw_get_implementation_identifier () { return adlink_cyclonedds_identifier; } -extern "C" rmw_ret_t rmw_init() +extern "C" rmw_ret_t rmw_init (const rmw_init_options_t *options __attribute__ ((unused)), rmw_context_t *context __attribute__ ((unused))) { return RMW_RET_OK; } -static dds_entity_t ref_ppant() +static dds_entity_t ref_ppant () { - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); if (gcdds.refcount == 0) { - if ((gcdds.ppant = dds_create_participant(DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create participant"); + if ((gcdds.ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create participant"); return gcdds.ppant; } } @@ -328,11 +171,11 @@ static dds_entity_t ref_ppant() return gcdds.ppant; } -static void unref_ppant() +static void unref_ppant () { - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); if (--gcdds.refcount == 0) { - dds_delete(gcdds.ppant); + dds_delete (gcdds.ppant); gcdds.ppant = 0; } } @@ -343,82 +186,82 @@ static void unref_ppant() /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_node_t *rmw_create_node(const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) +extern "C" rmw_node_t *rmw_create_node (rmw_context_t *context __attribute__ ((unused)), const char *name, const char *namespace_, size_t domain_id, const rmw_node_security_options_t *security_options) { - RET_NULL_X(name, return nullptr); - RET_NULL_X(namespace_, return nullptr); - (void)domain_id; - (void)security_options; + RET_NULL_X (name, return nullptr); + RET_NULL_X (namespace_, return nullptr); + (void) domain_id; + (void) security_options; - dds_entity_t pp = ref_ppant(); + dds_entity_t pp = ref_ppant (); if (pp < 0) { return nullptr; } - auto *node_impl = new CddsNode(); + auto *node_impl = new CddsNode (); rmw_node_t *node_handle = nullptr; - RET_ALLOC_X(node_impl, goto fail_node_impl); + RET_ALLOC_X (node_impl, goto fail_node_impl); rmw_guard_condition_t *graph_guard_condition; - if (!(graph_guard_condition = rmw_create_guard_condition())) { + if (!(graph_guard_condition = rmw_create_guard_condition (context))) { goto fail_ggc; } node_impl->graph_guard_condition = graph_guard_condition; - node_handle = rmw_node_allocate(); - RET_ALLOC_X(node_handle, goto fail_node_handle); + node_handle = rmw_node_allocate (); + RET_ALLOC_X (node_handle, goto fail_node_handle); node_handle->implementation_identifier = adlink_cyclonedds_identifier; node_handle->data = node_impl; - node_handle->name = static_cast(rmw_allocate(sizeof(char) * strlen(name) + 1)); - RET_ALLOC_X(node_handle->name, goto fail_node_handle_name); - memcpy(const_cast(node_handle->name), name, strlen(name) + 1); + node_handle->name = static_cast (rmw_allocate (sizeof (char) * strlen (name) + 1)); + RET_ALLOC_X (node_handle->name, goto fail_node_handle_name); + memcpy (const_cast (node_handle->name), name, strlen (name) + 1); - node_handle->namespace_ = static_cast(rmw_allocate(sizeof(char) * strlen(namespace_) + 1)); - RET_ALLOC_X(node_handle->namespace_, goto fail_node_handle_namespace); - memcpy(const_cast(node_handle->namespace_), namespace_, strlen(namespace_) + 1); + node_handle->namespace_ = static_cast (rmw_allocate (sizeof (char) * strlen (namespace_) + 1)); + RET_ALLOC_X (node_handle->namespace_, goto fail_node_handle_namespace); + memcpy (const_cast (node_handle->namespace_), namespace_, strlen (namespace_) + 1); return node_handle; #if 0 fail_add_node: - rmw_free(const_cast(node_handle->namespace_)); + rmw_free (const_cast (node_handle->namespace_)); #endif fail_node_handle_namespace: - rmw_free(const_cast(node_handle->name)); + rmw_free (const_cast (node_handle->name)); fail_node_handle_name: - rmw_node_free(node_handle); + rmw_node_free (node_handle); fail_node_handle: - if (RMW_RET_OK != rmw_destroy_guard_condition(graph_guard_condition)) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); + if (RMW_RET_OK != rmw_destroy_guard_condition (graph_guard_condition)) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy guard condition during error handling"); } fail_ggc: delete node_impl; fail_node_impl: - unref_ppant(); + unref_ppant (); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_node(rmw_node_t *node) +extern "C" rmw_ret_t rmw_destroy_node (rmw_node_t *node) { rmw_ret_t result_ret = RMW_RET_OK; - RET_WRONG_IMPLID(node); - auto node_impl = static_cast(node->data); - RET_NULL(node_impl); - rmw_free(const_cast(node->name)); - rmw_free(const_cast(node->namespace_)); - rmw_node_free(node); - if (RMW_RET_OK != rmw_destroy_guard_condition(node_impl->graph_guard_condition)) { - RMW_SET_ERROR_MSG("failed to destroy graph guard condition"); + RET_WRONG_IMPLID (node); + auto node_impl = static_cast (node->data); + RET_NULL (node_impl); + rmw_free (const_cast (node->name)); + rmw_free (const_cast (node->namespace_)); + rmw_node_free (node); + if (RMW_RET_OK != rmw_destroy_guard_condition (node_impl->graph_guard_condition)) { + RMW_SET_ERROR_MSG ("failed to destroy graph guard condition"); result_ret = RMW_RET_ERROR; } delete node_impl; - unref_ppant(); + unref_ppant (); return result_ret; } -extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const rmw_node_t *node) +extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition (const rmw_node_t *node) { - RET_WRONG_IMPLID_X(node, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); + RET_WRONG_IMPLID_X (node, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); return node_impl->graph_guard_condition; } @@ -428,222 +271,205 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" { - int dds_writecdr(dds_entity_t writer, struct serdata *serdata); -} - -static rmw_ret_t rmw_write_ser(dds_entity_t pubh, cycser& sd) +extern "C" rmw_ret_t rmw_publish (const rmw_publisher_t *publisher, const void *ros_message) { - if (dds_writecdr(pubh, sd.fix().ref().data()) >= 0) { + RET_WRONG_IMPLID (publisher); + RET_NULL (ros_message); + auto pub = static_cast (publisher->data); + assert (pub); + if (dds_write (pub->pubh, ros_message) >= 0) { return RMW_RET_OK; } else { /* FIXME: what is the expected behavior when it times out? */ - RMW_SET_ERROR_MSG("cannot publish data"); + RMW_SET_ERROR_MSG ("cannot publish data"); //return RMW_RET_ERROR; return RMW_RET_OK; } } -extern "C" rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message) -{ - RET_WRONG_IMPLID(publisher); - RET_NULL(ros_message); - auto pub = static_cast(publisher->data); - assert(pub); - cycser sd(pub->sertopic); - if (sermsg(ros_message, sd, nullptr, pub->ts)) { - return rmw_write_ser(pub->pubh, sd); - } else { - RMW_SET_ERROR_MSG("cannot serialize data"); - return RMW_RET_ERROR; - } -} - -static const rosidl_message_type_support_t *get_typesupport(const rosidl_message_type_support_t *type_supports) +static const rosidl_message_type_support_t *get_typesupport (const rosidl_message_type_support_t *type_supports) { const rosidl_message_type_support_t *ts; - if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + if ((ts = get_message_typesupport_handle (type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { return ts; - } else if ((ts = get_message_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + } else if ((ts = get_message_typesupport_handle (type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { return ts; } else { - RMW_SET_ERROR_MSG("type support not from this implementation"); + RMW_SET_ERROR_MSG ("type support not from this implementation"); return nullptr; } } -static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, bool avoid_ros_namespace_conventions) +static std::string make_fqtopic (const char *prefix, const char *topic_name, const char *suffix, bool avoid_ros_namespace_conventions) { if (avoid_ros_namespace_conventions) { - return std::string(topic_name) + "__" + std::string(suffix); + return std::string (topic_name) + "__" + std::string (suffix); } else { - return std::string(prefix) + "/" + make_fqtopic(prefix, topic_name, suffix, true); + return std::string (prefix) + "/" + make_fqtopic (prefix, topic_name, suffix, true); } } -static std::string make_fqtopic(const char *prefix, const char *topic_name, const char *suffix, const rmw_qos_profile_t *qos_policies) +static std::string make_fqtopic (const char *prefix, const char *topic_name, const char *suffix, const rmw_qos_profile_t *qos_policies) { - return make_fqtopic(prefix, topic_name, suffix, qos_policies->avoid_ros_namespace_conventions); + return make_fqtopic (prefix, topic_name, suffix, qos_policies->avoid_ros_namespace_conventions); } -static dds_qos_t *create_readwrite_qos(const rmw_qos_profile_t *qos_policies) +static dds_qos_t *create_readwrite_qos (const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { - dds_qos_t *qos = dds_qos_create(); + dds_qos_t *qos = dds_create_qos (); switch (qos_policies->history) { case RMW_QOS_POLICY_HISTORY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_HISTORY_KEEP_LAST: if (qos_policies->depth > INT32_MAX) { - RMW_SET_ERROR_MSG("unsupported history depth"); - dds_qos_delete(qos); + RMW_SET_ERROR_MSG ("unsupported history depth"); + dds_delete_qos (qos); return nullptr; } - dds_qset_history(qos, DDS_HISTORY_KEEP_LAST, static_cast(qos_policies->depth)); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, static_cast (qos_policies->depth)); break; case RMW_QOS_POLICY_HISTORY_KEEP_ALL: - dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); break; } switch (qos_policies->reliability) { case RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_RELIABILITY_RELIABLE: - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); break; case RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT: - dds_qset_reliability(qos, DDS_RELIABILITY_BEST_EFFORT, 0); + dds_qset_reliability (qos, DDS_RELIABILITY_BEST_EFFORT, 0); break; } switch (qos_policies->durability) { case RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT: case RMW_QOS_POLICY_DURABILITY_VOLATILE: - dds_qset_durability(qos, DDS_DURABILITY_VOLATILE); + dds_qset_durability (qos, DDS_DURABILITY_VOLATILE); break; case RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL: - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); break; } + if (ignore_local_publications) { + dds_qset_ignorelocal (qos, DDS_IGNORELOCAL_PARTICIPANT); + } return qos; } -static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) +static CddsPublisher *create_cdds_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { - RET_WRONG_IMPLID_X(node, return nullptr); - RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); - RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); - const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); - RET_NULL_X(type_support, return nullptr); - CddsPublisher *pub = new CddsPublisher(); + RET_WRONG_IMPLID_X (node, return nullptr); + RET_NULL_OR_EMPTYSTR_X (topic_name, return nullptr); + RET_NULL_X (qos_policies, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport (type_supports); + RET_NULL_X (type_support, return nullptr); + CddsPublisher *pub = new CddsPublisher (); dds_entity_t topic; dds_qos_t *qos; - pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - pub->ts.type_support_ = create_message_type_support(type_support->data, pub->ts.typesupport_identifier_); - std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + std::string fqtopic_name = make_fqtopic (ros_topic_prefix, topic_name, "", qos_policies); - if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); - goto fail_topic; + auto sertopic = create_sertopic (topic_name, type_support->typesupport_identifier, create_message_type_support (type_support->data, type_support->typesupport_identifier), false); + if ((topic = dds_create_topic_arbitrary (gcdds.ppant, sertopic, fqtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); + goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + if ((qos = create_readwrite_qos (qos_policies, false)) == nullptr) { goto fail_qos; } - if ((pub->pubh = dds_create_writer(gcdds.ppant, topic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create writer"); + if ((pub->pubh = dds_create_writer (gcdds.ppant, topic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create writer"); goto fail_writer; } - pub->sertopic = get_sertopic(topic, fqtopic_name); - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { - RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + if (dds_get_instance_handle (pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG ("failed to get instance handle for writer"); goto fail_instance_handle; } - dds_qos_delete(qos); - node_impl->own_writers.insert(pub->pubiid); + dds_delete_qos (qos); + ddsi_sertopic_unref (sertopic); /* FIXME: leak the topic for now */ return pub; fail_instance_handle: - if (dds_delete(pub->pubh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); + if (dds_delete (pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to destroy writer during error handling"); } fail_writer: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* not deleting topic -- have to sort out proper topic handling & that requires fixing a few things in cyclone as well */ fail_topic: delete pub; + ddsi_sertopic_unref (sertopic); return nullptr; } -extern "C" rmw_publisher_t *rmw_create_publisher(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_publisher_t *rmw_create_publisher (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies) { CddsPublisher *pub; rmw_publisher_t *rmw_publisher; - auto node_impl = static_cast(node->data); - if ((pub = create_cdds_publisher(node, type_supports, topic_name, qos_policies)) == nullptr) { + if ((pub = create_cdds_publisher (node, type_supports, topic_name, qos_policies)) == nullptr) { goto fail_common_init; } - rmw_publisher = rmw_publisher_allocate(); - RET_ALLOC_X(rmw_publisher, goto fail_publisher); + rmw_publisher = rmw_publisher_allocate (); + RET_ALLOC_X (rmw_publisher, goto fail_publisher); rmw_publisher->implementation_identifier = adlink_cyclonedds_identifier; rmw_publisher->data = pub; - rmw_publisher->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); - RET_ALLOC_X(rmw_publisher->topic_name, goto fail_topic_name); - memcpy(const_cast(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1); + rmw_publisher->topic_name = reinterpret_cast (rmw_allocate (strlen (topic_name) + 1)); + RET_ALLOC_X (rmw_publisher->topic_name, goto fail_topic_name); + memcpy (const_cast (rmw_publisher->topic_name), topic_name, strlen (topic_name) + 1); return rmw_publisher; fail_topic_name: - rmw_publisher_free(rmw_publisher); + rmw_publisher_free (rmw_publisher); fail_publisher: - node_impl->own_writers.erase(pub->pubiid); - if (dds_delete(pub->pubh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); + if (dds_delete (pub->pubh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete writer during error handling"); } delete pub; fail_common_init: return nullptr; } -extern "C" rmw_ret_t rmw_get_gid_for_publisher(const rmw_publisher_t *publisher, rmw_gid_t *gid) +extern "C" rmw_ret_t rmw_get_gid_for_publisher (const rmw_publisher_t *publisher, rmw_gid_t *gid) { - RET_WRONG_IMPLID(publisher); - RET_NULL(gid); - auto pub = static_cast(publisher->data); - RET_NULL(pub); + RET_WRONG_IMPLID (publisher); + RET_NULL (gid); + auto pub = static_cast (publisher->data); + RET_NULL (pub); gid->implementation_identifier = adlink_cyclonedds_identifier; - memset(gid->data, 0, sizeof(gid->data)); - assert(sizeof(pub->pubiid) <= sizeof(gid->data)); - memcpy(gid->data, &pub->pubiid, sizeof(pub->pubiid)); + memset (gid->data, 0, sizeof (gid->data)); + assert (sizeof (pub->pubiid) <= sizeof (gid->data)); + memcpy (gid->data, &pub->pubiid, sizeof (pub->pubiid)); return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_compare_gids_equal(const rmw_gid_t *gid1, const rmw_gid_t *gid2, bool *result) +extern "C" rmw_ret_t rmw_compare_gids_equal (const rmw_gid_t *gid1, const rmw_gid_t *gid2, bool *result) { - RET_WRONG_IMPLID(gid1); - RET_WRONG_IMPLID(gid2); - RET_NULL(result); + RET_WRONG_IMPLID (gid1); + RET_WRONG_IMPLID (gid2); + RET_NULL (result); /* alignment is potentially lost because of the translation to an array of bytes, so use memcmp instead of a simple integer comparison */ - *result = memcmp(gid1->data, gid2->data, sizeof(gid1->data)) == 0; + *result = memcmp (gid1->data, gid2->data, sizeof (gid1->data)) == 0; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *publisher) +extern "C" rmw_ret_t rmw_destroy_publisher (rmw_node_t *node, rmw_publisher_t *publisher) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(publisher); - auto node_impl = static_cast(node->data); - auto pub = static_cast(publisher->data); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (publisher); + auto pub = static_cast (publisher->data); if (pub != nullptr) { - node_impl->own_writers.erase(pub->pubiid); - if (dds_delete(pub->pubh) < 0) { - RMW_SET_ERROR_MSG("failed to delete writer"); + if (dds_delete (pub->pubh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete writer"); } delete pub; } - rmw_free(const_cast(publisher->topic_name)); + rmw_free (const_cast (publisher->topic_name)); publisher->topic_name = nullptr; - rmw_publisher_free(publisher); + rmw_publisher_free (publisher); return RMW_RET_OK; } @@ -653,147 +479,136 @@ extern "C" rmw_ret_t rmw_destroy_publisher(rmw_node_t *node, rmw_publisher_t *pu /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static CddsSubscription *create_cdds_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) +static CddsSubscription *create_cdds_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { - RET_WRONG_IMPLID_X(node, return nullptr); - RET_NULL_OR_EMPTYSTR_X(topic_name, return nullptr); - RET_NULL_X(qos_policies, return nullptr); - auto node_impl = static_cast(node->data); - RET_NULL_X(node_impl, return nullptr); - const rosidl_message_type_support_t *type_support = get_typesupport(type_supports); - RET_NULL_X(type_support, return nullptr); - CddsSubscription *sub = new CddsSubscription(); + RET_WRONG_IMPLID_X (node, return nullptr); + RET_NULL_OR_EMPTYSTR_X (topic_name, return nullptr); + RET_NULL_X (qos_policies, return nullptr); + auto node_impl = static_cast (node->data); + RET_NULL_X (node_impl, return nullptr); + const rosidl_message_type_support_t *type_support = get_typesupport (type_supports); + RET_NULL_X (type_support, return nullptr); + CddsSubscription *sub = new CddsSubscription (); dds_entity_t topic; dds_qos_t *qos; - sub->node = node_impl; - sub->ignore_local_publications = ignore_local_publications; - sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - sub->ts.type_support_ = create_message_type_support(type_support->data, sub->ts.typesupport_identifier_); - std::string fqtopic_name = make_fqtopic(ros_topic_prefix, topic_name, "", qos_policies); + std::string fqtopic_name = make_fqtopic (ros_topic_prefix, topic_name, "", qos_policies); - if ((topic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, fqtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + auto sertopic = create_sertopic (topic_name, type_support->typesupport_identifier, create_message_type_support (type_support->data, type_support->typesupport_identifier), false); + if ((topic = dds_create_topic_arbitrary (gcdds.ppant, sertopic, fqtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_topic; } - if ((qos = create_readwrite_qos(qos_policies)) == nullptr) { + if ((qos = create_readwrite_qos (qos_policies, ignore_local_publications)) == nullptr) { goto fail_qos; } - if ((sub->subh = dds_create_reader(gcdds.ppant, topic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create reader"); + if ((sub->subh = dds_create_reader (gcdds.ppant, topic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create reader"); goto fail_reader; } - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { - RMW_SET_ERROR_MSG("failed to create readcondition"); + if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; } - dds_qos_delete(qos); + dds_delete_qos (qos); + ddsi_sertopic_unref (sertopic); return sub; fail_readcond: - if (dds_delete(sub->subh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); + if (dds_delete (sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } fail_reader: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* FIXME: leak topic */ fail_topic: delete sub; + ddsi_sertopic_unref (sertopic); return nullptr; } -extern "C" rmw_subscription_t *rmw_create_subscription(const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) +extern "C" rmw_subscription_t *rmw_create_subscription (const rmw_node_t *node, const rosidl_message_type_support_t *type_supports, const char *topic_name, const rmw_qos_profile_t *qos_policies, bool ignore_local_publications) { CddsSubscription *sub; rmw_subscription_t *rmw_subscription; - if ((sub = create_cdds_subscription(node, type_supports, topic_name, qos_policies, ignore_local_publications)) == nullptr) { + if ((sub = create_cdds_subscription (node, type_supports, topic_name, qos_policies, ignore_local_publications)) == nullptr) { goto fail_common_init; } - rmw_subscription = rmw_subscription_allocate(); - RET_ALLOC_X(rmw_subscription, goto fail_subscription); + rmw_subscription = rmw_subscription_allocate (); + RET_ALLOC_X (rmw_subscription, goto fail_subscription); rmw_subscription->implementation_identifier = adlink_cyclonedds_identifier; rmw_subscription->data = sub; - rmw_subscription->topic_name = reinterpret_cast(rmw_allocate(strlen(topic_name) + 1)); - RET_ALLOC_X(rmw_subscription->topic_name, goto fail_topic_name); - memcpy(const_cast(rmw_subscription->topic_name), topic_name, strlen(topic_name) + 1); + rmw_subscription->topic_name = reinterpret_cast (rmw_allocate (strlen (topic_name) + 1)); + RET_ALLOC_X (rmw_subscription->topic_name, goto fail_topic_name); + memcpy (const_cast (rmw_subscription->topic_name), topic_name, strlen (topic_name) + 1); return rmw_subscription; fail_topic_name: - rmw_subscription_free(rmw_subscription); + rmw_subscription_free (rmw_subscription); fail_subscription: - if (dds_delete(sub->rdcondh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); + if (dds_delete (sub->rdcondh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete readcondition during error handling"); } - if (dds_delete(sub->subh) < 0) { - RCUTILS_LOG_ERROR_NAMED("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); + if (dds_delete (sub->subh) < 0) { + RCUTILS_LOG_ERROR_NAMED ("rmw_cyclonedds_cpp", "failed to delete reader during error handling"); } delete sub; fail_common_init: return nullptr; } -extern "C" rmw_ret_t rmw_destroy_subscription(rmw_node_t *node, rmw_subscription_t *subscription) +extern "C" rmw_ret_t rmw_destroy_subscription (rmw_node_t *node, rmw_subscription_t *subscription) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(subscription); - auto sub = static_cast(subscription->data); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (subscription); + auto sub = static_cast (subscription->data); if (sub != nullptr) { - clean_waitset_caches(); - if (dds_delete(sub->rdcondh) < 0) { - RMW_SET_ERROR_MSG("failed to delete readcondition"); + clean_waitset_caches (); + if (dds_delete (sub->rdcondh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete readcondition"); } - if (dds_delete(sub->subh) < 0) { - RMW_SET_ERROR_MSG("failed to delete reader"); + if (dds_delete (sub->subh) < 0) { + RMW_SET_ERROR_MSG ("failed to delete reader"); } delete sub; } - rmw_free(const_cast(subscription->topic_name)); + rmw_free (const_cast (subscription->topic_name)); subscription->topic_name = nullptr; - rmw_subscription_free(subscription); + rmw_subscription_free (subscription); return RMW_RET_OK; } -static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) +static rmw_ret_t rmw_take_int (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) { - RET_NULL(taken); - RET_NULL(ros_message); - RET_WRONG_IMPLID(subscription); - CddsSubscription *sub = static_cast(subscription->data); - RET_NULL(sub); - struct serdata *sd; + RET_NULL (taken); + RET_NULL (ros_message); + RET_WRONG_IMPLID (subscription); + CddsSubscription *sub = static_cast (subscription->data); + RET_NULL (sub); dds_sample_info_t info; - while (dds_takecdr(sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { - if (info.valid_data && !(sub->ignore_local_publications && sub->node->own_writers.count(info.publication_handle))) { - size_t sz; - void *raw; - ddsi_serdata_getblob(&raw, &sz, sd); - /* FIXME: endianness (i.e., the "+ 4") */ - cycdeser deser(static_cast(static_cast(raw) + 4), sz); - desermsg(deser, ros_message, nullptr, sub->ts); - ddsi_serdata_unref(sd); + while (dds_take (sub->subh, &ros_message, &info, 1, 1) == 1) { + if (info.valid_data) { if (message_info) { message_info->publisher_gid.implementation_identifier = adlink_cyclonedds_identifier; - memset(message_info->publisher_gid.data, 0, sizeof(message_info->publisher_gid.data)); - assert(sizeof(info.publication_handle) <= sizeof(message_info->publisher_gid.data)); - memcpy(message_info->publisher_gid.data, &info.publication_handle, sizeof(info.publication_handle)); + memset (message_info->publisher_gid.data, 0, sizeof (message_info->publisher_gid.data)); + assert (sizeof (info.publication_handle) <= sizeof (message_info->publisher_gid.data)); + memcpy (message_info->publisher_gid.data, &info.publication_handle, sizeof (info.publication_handle)); } *taken = true; return RMW_RET_OK; - } else { - ddsi_serdata_unref(sd); } } *taken = false; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_take(const rmw_subscription_t *subscription, void *ros_message, bool *taken) +extern "C" rmw_ret_t rmw_take (const rmw_subscription_t *subscription, void *ros_message, bool *taken) { - return rmw_take_int(subscription, ros_message, taken, nullptr); + return rmw_take_int (subscription, ros_message, taken, nullptr); } -extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) +extern "C" rmw_ret_t rmw_take_with_info (const rmw_subscription_t *subscription, void *ros_message, bool *taken, rmw_message_info_t *message_info) { - return rmw_take_int(subscription, ros_message, taken, message_info); + return rmw_take_int (subscription, ros_message, taken, message_info); } ///////////////////////////////////////////////////////////////////////////////////////// @@ -802,15 +617,15 @@ extern "C" rmw_ret_t rmw_take_with_info(const rmw_subscription_t *subscription, /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_guard_condition_t *rmw_create_guard_condition() +extern "C" rmw_guard_condition_t *rmw_create_guard_condition (rmw_context_t *context __attribute__ ((unused))) { rmw_guard_condition_t *guard_condition_handle; - auto *gcond_impl = new CddsGuardCondition(); - if (ref_ppant() < 0) { + auto *gcond_impl = new CddsGuardCondition (); + if (ref_ppant () < 0) { goto fail_ppant; } - if ((gcond_impl->gcondh = dds_create_guardcondition(gcdds.ppant)) < 0) { - RMW_SET_ERROR_MSG("failed to create guardcondition"); + if ((gcond_impl->gcondh = dds_create_guardcondition (gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG ("failed to create guardcondition"); goto fail_guardcond; } guard_condition_handle = new rmw_guard_condition_t; @@ -819,173 +634,173 @@ extern "C" rmw_guard_condition_t *rmw_create_guard_condition() return guard_condition_handle; fail_guardcond: - unref_ppant(); + unref_ppant (); fail_ppant: - delete(gcond_impl); + delete (gcond_impl); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_guard_condition(rmw_guard_condition_t *guard_condition_handle) +extern "C" rmw_ret_t rmw_destroy_guard_condition (rmw_guard_condition_t *guard_condition_handle) { - RET_NULL(guard_condition_handle); - auto *gcond_impl = static_cast(guard_condition_handle->data); - clean_waitset_caches(); - dds_delete(gcond_impl->gcondh); - unref_ppant(); + RET_NULL (guard_condition_handle); + auto *gcond_impl = static_cast (guard_condition_handle->data); + clean_waitset_caches (); + dds_delete (gcond_impl->gcondh); + unref_ppant (); delete gcond_impl; delete guard_condition_handle; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_trigger_guard_condition(const rmw_guard_condition_t *guard_condition_handle) +extern "C" rmw_ret_t rmw_trigger_guard_condition (const rmw_guard_condition_t *guard_condition_handle) { - RET_WRONG_IMPLID(guard_condition_handle); - auto *gcond_impl = static_cast(guard_condition_handle->data); - dds_set_guardcondition(gcond_impl->gcondh, true); + RET_WRONG_IMPLID (guard_condition_handle); + auto *gcond_impl = static_cast (guard_condition_handle->data); + dds_set_guardcondition (gcond_impl->gcondh, true); return RMW_RET_OK; } -extern "C" rmw_wait_set_t *rmw_create_wait_set(size_t max_conditions) +extern "C" rmw_wait_set_t *rmw_create_wait_set (size_t max_conditions) { - (void)max_conditions; - rmw_wait_set_t *wait_set = rmw_wait_set_allocate(); + (void) max_conditions; + rmw_wait_set_t *wait_set = rmw_wait_set_allocate (); CddsWaitset *ws = nullptr; - RET_ALLOC_X(wait_set, goto fail_alloc_wait_set); + RET_ALLOC_X (wait_set, goto fail_alloc_wait_set); wait_set->implementation_identifier = adlink_cyclonedds_identifier; - wait_set->data = rmw_allocate(sizeof(CddsWaitset)); - RET_ALLOC_X(wait_set->data, goto fail_alloc_wait_set_data); + wait_set->data = rmw_allocate (sizeof (CddsWaitset)); + RET_ALLOC_X (wait_set->data, goto fail_alloc_wait_set_data); // This should default-construct the fields of CddsWaitset - ws = static_cast(wait_set->data); - RMW_TRY_PLACEMENT_NEW(ws, ws, goto fail_placement_new, CddsWaitset, ); + ws = static_cast (wait_set->data); + RMW_TRY_PLACEMENT_NEW (ws, ws, goto fail_placement_new, CddsWaitset, ); if (!ws) { - RMW_SET_ERROR_MSG("failed to construct wait set info struct"); + RMW_SET_ERROR_MSG ("failed to construct wait set info struct"); goto fail_ws; } ws->inuse = false; - if ((ws->waitseth = dds_create_waitset(gcdds.ppant)) < 0) { - RMW_SET_ERROR_MSG("failed to create waitset"); + if ((ws->waitseth = dds_create_waitset (gcdds.ppant)) < 0) { + RMW_SET_ERROR_MSG ("failed to create waitset"); goto fail_waitset; } { - std::lock_guard lock(gcdds.lock); - gcdds.waitsets.insert(ws); + std::lock_guard lock (gcdds.lock); + gcdds.waitsets.insert (ws); } return wait_set; fail_waitset: fail_ws: - RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(ws->~CddsWaitset(), ws); + RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE (ws->~CddsWaitset (), ws); fail_placement_new: - rmw_free(wait_set->data); + rmw_free (wait_set->data); fail_alloc_wait_set_data: - rmw_wait_set_free(wait_set); + rmw_wait_set_free (wait_set); fail_alloc_wait_set: return nullptr; } -extern "C" rmw_ret_t rmw_destroy_wait_set(rmw_wait_set_t *wait_set) +extern "C" rmw_ret_t rmw_destroy_wait_set (rmw_wait_set_t *wait_set) { - RET_WRONG_IMPLID(wait_set); + RET_WRONG_IMPLID (wait_set); auto result = RMW_RET_OK; - auto ws = static_cast(wait_set->data); - RET_NULL(ws); - dds_delete(ws->waitseth); + auto ws = static_cast (wait_set->data); + RET_NULL (ws); + dds_delete (ws->waitseth); { - std::lock_guard lock(gcdds.lock); - gcdds.waitsets.erase(ws); + std::lock_guard lock (gcdds.lock); + gcdds.waitsets.erase (ws); } - RMW_TRY_DESTRUCTOR(ws->~CddsWaitset(), ws, result = RMW_RET_ERROR); - rmw_free(wait_set->data); - rmw_wait_set_free(wait_set); + RMW_TRY_DESTRUCTOR (ws->~CddsWaitset (), ws, result = RMW_RET_ERROR); + rmw_free (wait_set->data); + rmw_wait_set_free (wait_set); return result; } template -static bool require_reattach(const std::vector& cached, size_t count, void **ary) +static bool require_reattach (const std::vector& cached, size_t count, void **ary) { if (ary == nullptr || count == 0) { - return (cached.size() != 0); - } else if (count != cached.size()) { + return (cached.size () != 0); + } else if (count != cached.size ()) { return true; } else { - return (memcmp(static_cast(cached.data()), static_cast(ary), count * sizeof(void *)) != 0); + return (memcmp (static_cast (cached.data ()), static_cast (ary), count * sizeof (void *)) != 0); } } -static void waitset_detach(CddsWaitset *ws) +static void waitset_detach (CddsWaitset *ws) { - for (auto&& x : ws->subs) dds_waitset_detach(ws->waitseth, x->rdcondh); - for (auto&& x : ws->gcs) dds_waitset_detach(ws->waitseth, x->gcondh); - for (auto&& x : ws->srvs) dds_waitset_detach(ws->waitseth, x->service.sub->rdcondh); - for (auto&& x : ws->cls) dds_waitset_detach(ws->waitseth, x->client.sub->rdcondh); - ws->subs.resize(0); - ws->gcs.resize(0); - ws->srvs.resize(0); - ws->cls.resize(0); + for (auto&& x : ws->subs) dds_waitset_detach (ws->waitseth, x->rdcondh); + for (auto&& x : ws->gcs) dds_waitset_detach (ws->waitseth, x->gcondh); + for (auto&& x : ws->srvs) dds_waitset_detach (ws->waitseth, x->service.sub->rdcondh); + for (auto&& x : ws->cls) dds_waitset_detach (ws->waitseth, x->client.sub->rdcondh); + ws->subs.resize (0); + ws->gcs.resize (0); + ws->srvs.resize (0); + ws->cls.resize (0); } -static void clean_waitset_caches() +static void clean_waitset_caches () { /* Called whenever a subscriber, guard condition, service or client is deleted (as these may have been cached in a waitset), and drops all cached entities from all waitsets (just to keep life simple). I'm assuming one is not allowed to delete an entity while it is still being used ... */ - std::lock_guard lock(gcdds.lock); + std::lock_guard lock (gcdds.lock); for (auto&& ws : gcdds.waitsets) { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); if (!ws->inuse) { - waitset_detach(ws); + waitset_detach (ws); } } } -extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) +extern "C" rmw_ret_t rmw_wait (rmw_subscriptions_t *subs, rmw_guard_conditions_t *gcs, rmw_services_t *srvs, rmw_clients_t *cls, rmw_wait_set_t *wait_set, const rmw_time_t *wait_timeout) { - RET_NULL(wait_set); - CddsWaitset *ws = static_cast(wait_set->data); - RET_NULL(ws); + RET_NULL (wait_set); + CddsWaitset *ws = static_cast (wait_set->data); + RET_NULL (ws); { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); if (ws->inuse) { - RMW_SET_ERROR_MSG("concurrent calls to rmw_wait on a single waitset is not supported"); + RMW_SET_ERROR_MSG ("concurrent calls to rmw_wait on a single waitset is not supported"); return RMW_RET_ERROR; } ws->inuse = true; } - if (require_reattach(ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) || - require_reattach(ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || - require_reattach(ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || - require_reattach(ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) { + if (require_reattach (ws->subs, subs ? subs->subscriber_count : 0, subs ? subs->subscribers : nullptr) || + require_reattach (ws->gcs, gcs ? gcs->guard_condition_count : 0, gcs ? gcs->guard_conditions : nullptr) || + require_reattach (ws->srvs, srvs ? srvs->service_count : 0, srvs ? srvs->services : nullptr) || + require_reattach (ws->cls, cls ? cls->client_count : 0, cls ? cls->clients : nullptr)) { size_t nelems = 0; - waitset_detach(ws); + waitset_detach (ws); #define ATTACH(type, var, name, cond) do { \ - ws->var.resize(0); \ + ws->var.resize (0); \ if (var) { \ - ws->var.reserve(var->name##_count); \ + ws->var.reserve (var->name##_count); \ for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - ws->var.push_back(x); \ - dds_waitset_attach(ws->waitseth, x->cond, nelems); \ + auto x = static_cast (var->name##s[i]); \ + ws->var.push_back (x); \ + dds_waitset_attach (ws->waitseth, x->cond, nelems); \ nelems++; \ } \ } \ } while (0) - ATTACH(CddsSubscription, subs, subscriber, rdcondh); - ATTACH(CddsGuardCondition, gcs, guard_condition, gcondh); - ATTACH(CddsService, srvs, service, service.sub->rdcondh); - ATTACH(CddsClient, cls, client, client.sub->rdcondh); + ATTACH (CddsSubscription, subs, subscriber, rdcondh); + ATTACH (CddsGuardCondition, gcs, guard_condition, gcondh); + ATTACH (CddsService, srvs, service, service.sub->rdcondh); + ATTACH (CddsClient, cls, client, client.sub->rdcondh); #undef ATTACH - ws->trigs.reserve(nelems + 1); + ws->trigs.reserve (nelems + 1); } const dds_duration_t timeout = - (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t)wait_timeout->sec * 1000000000 + wait_timeout->nsec; - const dds_return_t ntrig = dds_waitset_wait(ws->waitseth, ws->trigs.data(), ws->trigs.capacity(), timeout); - ws->trigs.resize(ntrig); - std::sort(ws->trigs.begin(), ws->trigs.end()); - ws->trigs.push_back((dds_attach_t)-1); + (wait_timeout == NULL) ? DDS_NEVER : (dds_duration_t) wait_timeout->sec * 1000000000 + wait_timeout->nsec; + const dds_return_t ntrig = dds_waitset_wait (ws->waitseth, ws->trigs.data (), ws->trigs.capacity (), timeout); + ws->trigs.resize (ntrig); + std::sort (ws->trigs.begin (), ws->trigs.end ()); + ws->trigs.push_back ((dds_attach_t) -1); { long trig_idx = 0; @@ -994,9 +809,9 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t #define DETACH(type, var, name, cond, on_triggered) do { \ if (var) { \ for (size_t i = 0; i < var->name##_count; i++) { \ - auto x = static_cast(var->name##s[i]); \ - /*dds_waitset_detach(ws->waitseth, x->cond);*/ \ - if (ws->trigs[trig_idx] == (long)nelems) { \ + auto x = static_cast (var->name##s[i]); \ + /*dds_waitset_detach (ws->waitseth, x->cond);*/ \ + if (ws->trigs[trig_idx] == (long) nelems) { \ on_triggered; \ trig_idx++; \ } else { \ @@ -1006,19 +821,19 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t } \ } \ } while (0) - DETACH(CddsSubscription, subs, subscriber, rdcondh, (void)x); - DETACH(CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition(x->gcondh, &dummy)); - DETACH(CddsService, srvs, service, service.sub->rdcondh, (void)x); - DETACH(CddsClient, cls, client, client.sub->rdcondh, (void)x); + DETACH (CddsSubscription, subs, subscriber, rdcondh, (void) x); + DETACH (CddsGuardCondition, gcs, guard_condition, gcondh, dds_take_guardcondition (x->gcondh, &dummy)); + DETACH (CddsService, srvs, service, service.sub->rdcondh, (void) x); + DETACH (CddsClient, cls, client, client.sub->rdcondh, (void) x); #undef DETACH } { - std::lock_guard lock(ws->lock); + std::lock_guard lock (ws->lock); ws->inuse = false; } - return (ws->trigs.size() == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; + return (ws->trigs.size () == 0) ? RMW_RET_TIMEOUT : RMW_RET_OK; } ///////////////////////////////////////////////////////////////////////////////////////// @@ -1027,263 +842,260 @@ extern "C" rmw_ret_t rmw_wait(rmw_subscriptions_t *subs, rmw_guard_conditions_t /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request_header, void *ros_data, bool *taken, dds_instance_handle_t srcfilter) +static rmw_ret_t rmw_take_response_request (CddsCS *cs, rmw_request_id_t *request_header, void *ros_data, bool *taken, dds_instance_handle_t srcfilter) { - RET_NULL(taken); - RET_NULL(ros_data); - RET_NULL(request_header); - struct serdata *sd; + RET_NULL (taken); + RET_NULL (ros_data); + RET_NULL (request_header); + cdds_request_wrapper_t wrap; dds_sample_info_t info; - while (dds_takecdr(cs->sub->subh, &sd, 1, &info, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE) == 1) { + wrap.data = ros_data; + void *wrap_ptr = static_cast (&wrap); + while (dds_take (cs->sub->subh, &wrap_ptr, &info, 1, 1) == 1) { if (info.valid_data) { - size_t sz; - void *raw; - ddsi_serdata_getblob(&raw, &sz, sd); - /* FIXME: endianness (i.e., the "+ 4") */ - cycdeser deser(static_cast(static_cast(raw) + 4), sz); - cdds_request_header_t header; - desermsg(deser, ros_data, [&header](cycdeser& ser) { ser >> header.guid; ser >> header.seq; }, cs->sub->ts); - ddsi_serdata_unref(sd); - memset(request_header, 0, sizeof(*request_header)); - assert(sizeof(header.guid) < sizeof(request_header->writer_guid)); - memcpy(request_header->writer_guid, &header.guid, sizeof(header.guid)); - request_header->sequence_number = header.seq; - if (srcfilter == 0 || srcfilter == header.guid) { + memset (request_header, 0, sizeof (wrap.header)); + assert (sizeof (wrap.header.guid) < sizeof (wrap.header.guid)); + memcpy (request_header->writer_guid, &wrap.header.guid, sizeof (wrap.header.guid)); + request_header->sequence_number = wrap.header.seq; + if (srcfilter == 0 || srcfilter == wrap.header.guid) { *taken = true; return RMW_RET_OK; } - } else { - ddsi_serdata_unref(sd); } } *taken = false; return RMW_RET_OK; } -extern "C" rmw_ret_t rmw_take_response(const rmw_client_t *client, rmw_request_id_t *request_header, void *ros_response, bool *taken) +extern "C" rmw_ret_t rmw_take_response (const rmw_client_t *client, rmw_request_id_t *request_header, void *ros_response, bool *taken) { - RET_WRONG_IMPLID(client); - auto info = static_cast(client->data); - return rmw_take_response_request(&info->client, request_header, ros_response, taken, info->client.pub->pubiid); + RET_WRONG_IMPLID (client); + auto info = static_cast (client->data); + return rmw_take_response_request (&info->client, request_header, ros_response, taken, info->client.pub->pubiid); } -extern "C" rmw_ret_t rmw_take_request(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_request, bool *taken) +extern "C" rmw_ret_t rmw_take_request (const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_request, bool *taken) { - RET_WRONG_IMPLID(service); - auto info = static_cast(service->data); - return rmw_take_response_request(&info->service, request_header, ros_request, taken, 0); + RET_WRONG_IMPLID (service); + auto info = static_cast (service->data); + return rmw_take_response_request (&info->service, request_header, ros_request, taken, 0); } -static rmw_ret_t rmw_send_response_request(CddsCS *cs, cdds_request_header_t *header, const void *ros_data) +static rmw_ret_t rmw_send_response_request (CddsCS *cs, const cdds_request_header_t& header, const void *ros_data) { - cycser sd(cs->pub->sertopic); - if (sermsg(ros_data, sd, [&header](cycser& ser) { ser << header->guid; ser << header->seq; }, cs->pub->ts)) { - return rmw_write_ser(cs->pub->pubh, sd); + const cdds_request_wrapper_t wrap = { header, const_cast (ros_data) }; + if (dds_write (cs->pub->pubh, static_cast (&wrap)) >= 0) { + return RMW_RET_OK; } else { - RMW_SET_ERROR_MSG("cannot serialize data"); - return RMW_RET_ERROR; + /* FIXME: what is the expected behavior when it times out? */ + RMW_SET_ERROR_MSG ("cannot publish data"); + //return RMW_RET_ERROR; + return RMW_RET_OK; } } -extern "C" rmw_ret_t rmw_send_response(const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_response) +extern "C" rmw_ret_t rmw_send_response (const rmw_service_t *service, rmw_request_id_t *request_header, void *ros_response) { - RET_WRONG_IMPLID(service); - RET_NULL(request_header); - RET_NULL(ros_response); - CddsService *info = static_cast(service->data); + RET_WRONG_IMPLID (service); + RET_NULL (request_header); + RET_NULL (ros_response); + CddsService *info = static_cast (service->data); cdds_request_header_t header; - memcpy(&header.guid, request_header->writer_guid, sizeof(header.guid)); + memcpy (&header.guid, request_header->writer_guid, sizeof (header.guid)); header.seq = request_header->sequence_number; - return rmw_send_response_request(&info->service, &header, ros_response); + return rmw_send_response_request (&info->service, header, ros_response); } -extern "C" rmw_ret_t rmw_send_request(const rmw_client_t *client, const void *ros_request, int64_t *sequence_id) +extern "C" rmw_ret_t rmw_send_request (const rmw_client_t *client, const void *ros_request, int64_t *sequence_id) { static std::atomic_uint next_request_id; - RET_WRONG_IMPLID(client); - RET_NULL(ros_request); - RET_NULL(sequence_id); - auto info = static_cast(client->data); + RET_WRONG_IMPLID (client); + RET_NULL (ros_request); + RET_NULL (sequence_id); + auto info = static_cast (client->data); cdds_request_header_t header; header.guid = info->client.pub->pubiid; header.seq = *sequence_id = next_request_id++; - return rmw_send_response_request(&info->client, &header, ros_request); + return rmw_send_response_request (&info->client, header, ros_request); } -static const rosidl_service_type_support_t *get_service_typesupport(const rosidl_service_type_support_t *type_supports) +static const rosidl_service_type_support_t *get_service_typesupport (const rosidl_service_type_support_t *type_supports) { const rosidl_service_type_support_t *ts; - if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { + if ((ts = get_service_typesupport_handle (type_supports, rosidl_typesupport_introspection_c__identifier)) != nullptr) { return ts; - } else if ((ts = get_service_typesupport_handle(type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { + } else if ((ts = get_service_typesupport_handle (type_supports, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr) { return ts; } else { - RMW_SET_ERROR_MSG("service type support not from this implementation"); + RMW_SET_ERROR_MSG ("service type support not from this implementation"); return nullptr; } } -static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies, bool is_service) +static rmw_ret_t rmw_init_cs (CddsCS *cs, const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies, bool is_service) { - RET_WRONG_IMPLID(node); - RET_NULL_OR_EMPTYSTR(service_name); - RET_NULL(qos_policies); - auto node_impl = static_cast(node->data); - RET_NULL(node_impl); - const rosidl_service_type_support_t *type_support = get_service_typesupport(type_supports); - RET_NULL(type_support); + RET_WRONG_IMPLID (node); + RET_NULL_OR_EMPTYSTR (service_name); + RET_NULL (qos_policies); + auto node_impl = static_cast (node->data); + RET_NULL (node_impl); + const rosidl_service_type_support_t *type_support = get_service_typesupport (type_supports); + RET_NULL (type_support); - auto pub = new CddsPublisher(); - auto sub = new CddsSubscription(); + auto pub = new CddsPublisher (); + auto sub = new CddsSubscription (); std::string subtopic_name, pubtopic_name; - pub->ts.typesupport_identifier_ = type_support->typesupport_identifier; - sub->ts.typesupport_identifier_ = type_support->typesupport_identifier; + void *pub_type_support, *sub_type_support; if (is_service) { - sub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); - pub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); - subtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); - pubtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + sub_type_support = create_request_type_support (type_support->data, type_support->typesupport_identifier); + pub_type_support = create_response_type_support (type_support->data, type_support->typesupport_identifier); + subtopic_name = make_fqtopic (ros_service_requester_prefix, service_name, "_request", qos_policies); + pubtopic_name = make_fqtopic (ros_service_response_prefix, service_name, "_reply", qos_policies); } else { - pub->ts.type_support_ = create_request_type_support(type_support->data, type_support->typesupport_identifier); - sub->ts.type_support_ = create_response_type_support(type_support->data, type_support->typesupport_identifier); - pubtopic_name = make_fqtopic(ros_service_requester_prefix, service_name, "_request", qos_policies); - subtopic_name = make_fqtopic(ros_service_response_prefix, service_name, "_reply", qos_policies); + pub_type_support = create_request_type_support (type_support->data, type_support->typesupport_identifier); + sub_type_support = create_response_type_support (type_support->data, type_support->typesupport_identifier); + pubtopic_name = make_fqtopic (ros_service_requester_prefix, service_name, "_request", qos_policies); + subtopic_name = make_fqtopic (ros_service_response_prefix, service_name, "_reply", qos_policies); } - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client") - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str()) - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str()) - RCUTILS_LOG_DEBUG_NAMED("rmw_cyclonedds_cpp", "***********") + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "************ %s Details *********", is_service ? "Service" : "Client"); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "Sub Topic %s", subtopic_name.c_str ()); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "Pub Topic %s", pubtopic_name.c_str ()); + RCUTILS_LOG_DEBUG_NAMED ("rmw_cyclonedds_cpp", "***********"); - dds_entity_t pubtopic, subtopic; + dds_entity_t pubtopic, subtopic; + + auto pub_st = create_sertopic (pubtopic_name.c_str (), type_support->typesupport_identifier, pub_type_support, true); + auto sub_st = create_sertopic (subtopic_name.c_str (), type_support->typesupport_identifier, sub_type_support, true); + dds_qos_t *qos; - if ((pubtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, pubtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + if ((pubtopic = dds_create_topic_arbitrary (gcdds.ppant, pub_st, pubtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_pubtopic; } - if ((subtopic = dds_create_topic(gcdds.ppant, &rmw_cyclonedds_topic_desc, subtopic_name.c_str(), nullptr, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create topic"); + if ((subtopic = dds_create_topic_arbitrary (gcdds.ppant, sub_st, subtopic_name.c_str (), nullptr, nullptr, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create topic"); goto fail_subtopic; } - if ((qos = dds_qos_create()) == nullptr) { + if ((qos = dds_create_qos ()) == nullptr) { goto fail_qos; } - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); - dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); - if ((pub->pubh = dds_create_writer(gcdds.ppant, pubtopic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create writer"); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + if ((pub->pubh = dds_create_writer (gcdds.ppant, pubtopic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create writer"); goto fail_writer; } - pub->sertopic = get_sertopic(pubtopic, pubtopic_name); - sub->node = node_impl; - if ((sub->subh = dds_create_reader(gcdds.ppant, subtopic, qos, nullptr)) < 0) { - RMW_SET_ERROR_MSG("failed to create reader"); + if ((sub->subh = dds_create_reader (gcdds.ppant, subtopic, qos, nullptr)) < 0) { + RMW_SET_ERROR_MSG ("failed to create reader"); goto fail_reader; } - if ((sub->rdcondh = dds_create_readcondition(sub->subh, DDS_ANY_STATE)) < 0) { - RMW_SET_ERROR_MSG("failed to create readcondition"); + if ((sub->rdcondh = dds_create_readcondition (sub->subh, DDS_ANY_STATE)) < 0) { + RMW_SET_ERROR_MSG ("failed to create readcondition"); goto fail_readcond; } - if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) { - RMW_SET_ERROR_MSG("failed to get instance handle for writer"); + if (dds_get_instance_handle (pub->pubh, &pub->pubiid) < 0) { + RMW_SET_ERROR_MSG ("failed to get instance handle for writer"); goto fail_instance_handle; } - dds_qos_delete(qos); - node_impl->own_writers.insert(pub->pubiid); + dds_delete_qos (qos); + ddsi_sertopic_unref (pub_st); + ddsi_sertopic_unref (sub_st); cs->pub = pub; cs->sub = sub; return RMW_RET_OK; fail_instance_handle: - dds_delete(sub->rdcondh); + dds_delete (sub->rdcondh); fail_readcond: - dds_delete(sub->subh); + dds_delete (sub->subh); fail_reader: - dds_delete(pub->pubh); + dds_delete (pub->pubh); fail_writer: - dds_qos_delete(qos); + dds_delete_qos (qos); fail_qos: /* leak subtopic */ fail_subtopic: /* leak pubtopic */ fail_pubtopic: + ddsi_sertopic_unref (pub_st); + ddsi_sertopic_unref (sub_st); return RMW_RET_ERROR; } -static void rmw_fini_cs(CddsCS *cs) +static void rmw_fini_cs (CddsCS *cs) { - dds_delete(cs->sub->rdcondh); - dds_delete(cs->sub->subh); - dds_delete(cs->pub->pubh); - cs->sub->node->own_writers.erase(cs->pub->pubiid); + dds_delete (cs->sub->rdcondh); + dds_delete (cs->sub->subh); + dds_delete (cs->pub->pubh); } -extern "C" rmw_client_t *rmw_create_client(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_client_t *rmw_create_client (const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) { - CddsClient *info = new CddsClient(); - if (rmw_init_cs(&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) { - delete(info); + CddsClient *info = new CddsClient (); + if (rmw_init_cs (&info->client, node, type_supports, service_name, qos_policies, false) != RMW_RET_OK) { + delete (info); return nullptr; } - rmw_client_t *rmw_client = rmw_client_allocate(); - RET_NULL_X(rmw_client, goto fail_client); + rmw_client_t *rmw_client = rmw_client_allocate (); + RET_NULL_X (rmw_client, goto fail_client); rmw_client->implementation_identifier = adlink_cyclonedds_identifier; rmw_client->data = info; - rmw_client->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_client->service_name, goto fail_service_name); - memcpy(const_cast(rmw_client->service_name), service_name, strlen(service_name) + 1); + rmw_client->service_name = reinterpret_cast (rmw_allocate (strlen (service_name) + 1)); + RET_NULL_X (rmw_client->service_name, goto fail_service_name); + memcpy (const_cast (rmw_client->service_name), service_name, strlen (service_name) + 1); return rmw_client; fail_service_name: - rmw_client_free(rmw_client); + rmw_client_free (rmw_client); fail_client: - rmw_fini_cs(&info->client); + rmw_fini_cs (&info->client); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_client(rmw_node_t *node, rmw_client_t *client) +extern "C" rmw_ret_t rmw_destroy_client (rmw_node_t *node, rmw_client_t *client) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(client); - auto info = static_cast(client->data); - clean_waitset_caches(); - rmw_fini_cs(&info->client); - rmw_free(const_cast(client->service_name)); - rmw_client_free(client); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (client); + auto info = static_cast (client->data); + clean_waitset_caches (); + rmw_fini_cs (&info->client); + rmw_free (const_cast (client->service_name)); + rmw_client_free (client); return RMW_RET_OK; } -extern "C" rmw_service_t *rmw_create_service(const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) +extern "C" rmw_service_t *rmw_create_service (const rmw_node_t *node, const rosidl_service_type_support_t *type_supports, const char *service_name, const rmw_qos_profile_t *qos_policies) { - CddsService *info = new CddsService(); - if (rmw_init_cs(&info->service, node, type_supports, service_name, qos_policies, true) != RMW_RET_OK) { - delete(info); + CddsService *info = new CddsService (); + if (rmw_init_cs (&info->service, node, type_supports, service_name, qos_policies, true) != RMW_RET_OK) { + delete (info); return nullptr; } - rmw_service_t *rmw_service = rmw_service_allocate(); - RET_NULL_X(rmw_service, goto fail_service); + rmw_service_t *rmw_service = rmw_service_allocate (); + RET_NULL_X (rmw_service, goto fail_service); rmw_service->implementation_identifier = adlink_cyclonedds_identifier; rmw_service->data = info; - rmw_service->service_name = reinterpret_cast(rmw_allocate(strlen(service_name) + 1)); - RET_NULL_X(rmw_service->service_name, goto fail_service_name); - memcpy(const_cast(rmw_service->service_name), service_name, strlen(service_name) + 1); + rmw_service->service_name = reinterpret_cast (rmw_allocate (strlen (service_name) + 1)); + RET_NULL_X (rmw_service->service_name, goto fail_service_name); + memcpy (const_cast (rmw_service->service_name), service_name, strlen (service_name) + 1); return rmw_service; fail_service_name: - rmw_service_free(rmw_service); + rmw_service_free (rmw_service); fail_service: - rmw_fini_cs(&info->service); + rmw_fini_cs (&info->service); return nullptr; } -extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *service) +extern "C" rmw_ret_t rmw_destroy_service (rmw_node_t *node, rmw_service_t *service) { - RET_WRONG_IMPLID(node); - RET_WRONG_IMPLID(service); - auto info = static_cast(service->data); - clean_waitset_caches(); - rmw_fini_cs(&info->service); - rmw_free(const_cast(service->service_name)); - rmw_service_free(service); + RET_WRONG_IMPLID (node); + RET_WRONG_IMPLID (service); + auto info = static_cast (service->data); + clean_waitset_caches (); + rmw_fini_cs (&info->service); + rmw_free (const_cast (service->service_name)); + rmw_service_free (service); return RMW_RET_OK; } @@ -1293,56 +1105,56 @@ extern "C" rmw_ret_t rmw_destroy_service(rmw_node_t *node, rmw_service_t *servic /////////// /////////// ///////////////////////////////////////////////////////////////////////////////////////// -extern "C" rmw_ret_t rmw_get_node_names(const rmw_node_t *node, rcutils_string_array_t *node_names) +extern "C" rmw_ret_t rmw_get_node_names (const rmw_node_t *node, rcutils_string_array_t *node_names, rcutils_string_array_t *node_namespaces) { #if 0 // NIY - RET_WRONG_IMPLID(node); - if (rmw_check_zero_rmw_string_array(node_names) != RMW_RET_OK) { + RET_WRONG_IMPLID (node); + if (rmw_check_zero_rmw_string_array (node_names) != RMW_RET_OK) { return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // FIXME: sorry, can't do it with current Zenoh auto participant_names = std::vector{}; - rcutils_allocator_t allocator = rcutils_get_default_allocator(); + rcutils_allocator_t allocator = rcutils_get_default_allocator (); rcutils_ret_t rcutils_ret = - rcutils_string_array_init(node_names, participant_names.size(), &allocator); + rcutils_string_array_init (node_names, participant_names.size (), &allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } - for (size_t i = 0; i < participant_names.size(); ++i) { - node_names->data[i] = rcutils_strdup(participant_names[i].c_str(), allocator); + for (size_t i = 0; i < participant_names.size (); ++i) { + node_names->data[i] = rcutils_strdup (participant_names[i].c_str (), allocator); if (!node_names->data[i]) { - RMW_SET_ERROR_MSG("failed to allocate memory for node name") - rcutils_ret = rcutils_string_array_fini(node_names); + RMW_SET_ERROR_MSG ("failed to allocate memory for node name") + rcutils_ret = rcutils_string_array_fini (node_names); if (rcutils_ret != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "failed to cleanup during error handling: %s", rcutils_get_error_string_safe()) - } + "failed to cleanup during error handling: %s", rcutils_get_error_string_safe ()); + } return RMW_RET_BAD_ALLOC; } } return RMW_RET_OK; #else - (void)node; (void)node_names; + (void) node; (void) node_names; (void) node_namespaces; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) +extern "C" rmw_ret_t rmw_get_topic_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, bool no_demangle, rmw_names_and_types_t *topic_names_and_types) { #if 0 // NIY - RET_NULL(allocator); - RET_WRONG_IMPLID(node); - rmw_ret_t ret = rmw_names_and_types_check_zero(topic_names_and_types); + RET_NULL (allocator); + RET_WRONG_IMPLID (node); + rmw_ret_t ret = rmw_names_and_types_check_zero (topic_names_and_types); if (ret != RMW_RET_OK) { return ret; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // Access the slave Listeners, which are the ones that have the topicnamesandtypes member // Get info from publisher and subscriber @@ -1350,48 +1162,48 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti std::map> topics; { ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + if (!no_demangle && _get_ros_prefix_if_exists (it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; } for (auto & itt : it.second) { - topics[it.first].insert(itt); + topics[it.first].insert (itt); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } { WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - if (!no_demangle && _get_ros_prefix_if_exists(it.first) != ros_topic_prefix) { + if (!no_demangle && _get_ros_prefix_if_exists (it.first) != ros_topic_prefix) { // if we are demangling and this is not prefixed with rt/, skip it continue; } for (auto & itt : it.second) { - topics[it.first].insert(itt); + topics[it.first].insert (itt); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } // Copy data to results handle - if (topics.size() > 0) { + if (topics.size () > 0) { // Setup string array to store names - rmw_ret_t rmw_ret = rmw_names_and_types_init(topic_names_and_types, topics.size(), allocator); + rmw_ret_t rmw_ret = rmw_names_and_types_init (topic_names_and_types, topics.size (), allocator); if (rmw_ret != RMW_RET_OK) { return rmw_ret; } // Setup cleanup function, in case of failure below auto fail_cleanup = [&topic_names_and_types]() { - rmw_ret_t rmw_ret = rmw_names_and_types_fini(topic_names_and_types); + rmw_ret_t rmw_ret = rmw_names_and_types_fini (topic_names_and_types); if (rmw_ret != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "error during report of error: %s", rmw_get_error_string_safe()) - } + "error during report of error: %s", rmw_get_error_string_safe ()); + } }; // Setup demangling functions based on no_demangle option auto demangle_topic = _demangle_if_ros_topic; @@ -1407,32 +1219,32 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti size_t index = 0; for (const auto & topic_n_types : topics) { // Duplicate and store the topic_name - char * topic_name = rcutils_strdup(demangle_topic(topic_n_types.first).c_str(), *allocator); + char * topic_name = rcutils_strdup (demangle_topic (topic_n_types.first).c_str (), *allocator); if (!topic_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for topic name", *allocator); - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for topic name", *allocator); + fail_cleanup (); return RMW_RET_BAD_ALLOC; } topic_names_and_types->names.data[index] = topic_name; // Setup storage for types { - rcutils_ret_t rcutils_ret = rcutils_string_array_init( + rcutils_ret_t rcutils_ret = rcutils_string_array_init ( &topic_names_and_types->types[index], - topic_n_types.second.size(), + topic_n_types.second.size (), allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - fail_cleanup(); - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + fail_cleanup (); + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } } // Duplicate and store each type for the topic size_t type_index = 0; for (const auto & type : topic_n_types.second) { - char * type_name = rcutils_strdup(demangle_type(type).c_str(), *allocator); + char * type_name = rcutils_strdup (demangle_type (type).c_str (), *allocator); if (!type_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for type name", *allocator) + fail_cleanup (); return RMW_RET_BAD_ALLOC; } topic_names_and_types->types[index].data[type_index] = type_name; @@ -1443,34 +1255,34 @@ extern "C" rmw_ret_t rmw_get_topic_names_and_types(const rmw_node_t *node, rcuti } return RMW_RET_OK; #else - (void)node; (void)allocator; (void)no_demangle; (void)topic_names_and_types; + (void) node; (void) allocator; (void) no_demangle; (void) topic_names_and_types; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) +extern "C" rmw_ret_t rmw_get_service_names_and_types (const rmw_node_t *node, rcutils_allocator_t *allocator, rmw_names_and_types_t *service_names_and_types) { #if 0 // NIY if (!allocator) { - RMW_SET_ERROR_MSG("allocator is null") + RMW_SET_ERROR_MSG ("allocator is null") return RMW_RET_INVALID_ARGUMENT; } if (!node) { - RMW_SET_ERROR_MSG_ALLOC("null node handle", *allocator) + RMW_SET_ERROR_MSG_ALLOC ("null node handle", *allocator) return RMW_RET_INVALID_ARGUMENT; } - rmw_ret_t ret = rmw_names_and_types_check_zero(service_names_and_types); + rmw_ret_t ret = rmw_names_and_types_check_zero (service_names_and_types); if (ret != RMW_RET_OK) { return ret; } // Get participant pointer from node if (node->implementation_identifier != adlink_cyclonedds_identifier) { - RMW_SET_ERROR_MSG_ALLOC("node handle not from this implementation", *allocator); + RMW_SET_ERROR_MSG_ALLOC ("node handle not from this implementation", *allocator); return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); // Access the slave Listeners, which are the ones that have the topicnamesandtypes member // Get info from publisher and subscriber @@ -1478,88 +1290,88 @@ extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcu std::map> services; { ReaderInfo * slave_target = impl->secondarySubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { + std::string service_name = _demangle_service_from_topic (it.first); + if (!service_name.length ()) { // not a service continue; } for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + std::string service_type = _demangle_service_type_only (itt); + if (service_type.length ()) { + services[service_name].insert (service_type); } } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } { WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - std::string service_name = _demangle_service_from_topic(it.first); - if (!service_name.length()) { + std::string service_name = _demangle_service_from_topic (it.first); + if (!service_name.length ()) { // not a service continue; } for (auto & itt : it.second) { - std::string service_type = _demangle_service_type_only(itt); - if (service_type.length()) { - services[service_name].insert(service_type); + std::string service_type = _demangle_service_type_only (itt); + if (service_type.length ()) { + services[service_name].insert (service_type); } } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); } // Fill out service_names_and_types - if (services.size()) { + if (services.size ()) { // Setup string array to store names rmw_ret_t rmw_ret = - rmw_names_and_types_init(service_names_and_types, services.size(), allocator); + rmw_names_and_types_init (service_names_and_types, services.size (), allocator); if (rmw_ret != RMW_RET_OK) { return rmw_ret; } // Setup cleanup function, in case of failure below auto fail_cleanup = [&service_names_and_types]() { - rmw_ret_t rmw_ret = rmw_names_and_types_fini(service_names_and_types); + rmw_ret_t rmw_ret = rmw_names_and_types_fini (service_names_and_types); if (rmw_ret != RMW_RET_OK) { - RCUTILS_LOG_ERROR_NAMED( + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "error during report of error: %s", rmw_get_error_string_safe()) + "error during report of error: %s", rmw_get_error_string_safe ()); } }; // For each service, store the name, initialize the string array for types, and store all types size_t index = 0; for (const auto & service_n_types : services) { // Duplicate and store the service_name - char * service_name = rcutils_strdup(service_n_types.first.c_str(), *allocator); + char * service_name = rcutils_strdup (service_n_types.first.c_str (), *allocator); if (!service_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for service name", *allocator); - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for service name", *allocator); + fail_cleanup (); return RMW_RET_BAD_ALLOC; } service_names_and_types->names.data[index] = service_name; // Setup storage for types { - rcutils_ret_t rcutils_ret = rcutils_string_array_init( + rcutils_ret_t rcutils_ret = rcutils_string_array_init ( &service_names_and_types->types[index], - service_n_types.second.size(), + service_n_types.second.size (), allocator); if (rcutils_ret != RCUTILS_RET_OK) { - RMW_SET_ERROR_MSG(rcutils_get_error_string_safe()) - fail_cleanup(); - return rmw_convert_rcutils_ret_to_rmw_ret(rcutils_ret); + RMW_SET_ERROR_MSG (rcutils_get_error_string_safe ()) + fail_cleanup (); + return rmw_convert_rcutils_ret_to_rmw_ret (rcutils_ret); } } // Duplicate and store each type for the service size_t type_index = 0; for (const auto & type : service_n_types.second) { - char * type_name = rcutils_strdup(type.c_str(), *allocator); + char * type_name = rcutils_strdup (type.c_str (), *allocator); if (!type_name) { - RMW_SET_ERROR_MSG_ALLOC("failed to allocate memory for type name", *allocator) - fail_cleanup(); + RMW_SET_ERROR_MSG_ALLOC ("failed to allocate memory for type name", *allocator) + fail_cleanup (); return RMW_RET_BAD_ALLOC; } service_names_and_types->types[index].data[type_index] = type_name; @@ -1570,75 +1382,75 @@ extern "C" rmw_ret_t rmw_get_service_names_and_types(const rmw_node_t *node, rcu } return RMW_RET_OK; #else - (void)node; (void)allocator; (void)service_names_and_types; + (void) node; (void) allocator; (void) service_names_and_types; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, const rmw_client_t * client, bool * is_available) +extern "C" rmw_ret_t rmw_service_server_is_available (const rmw_node_t * node, const rmw_client_t * client, bool * is_available) { #if 0 // NIY if (!node) { - RMW_SET_ERROR_MSG("node handle is null"); + RMW_SET_ERROR_MSG ("node handle is null"); return RMW_RET_ERROR; } - RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + RMW_CHECK_TYPE_IDENTIFIERS_MATCH ( node handle, node->implementation_identifier, adlink_cyclonedds_identifier, return RMW_RET_ERROR); if (!client) { - RMW_SET_ERROR_MSG("client handle is null"); + RMW_SET_ERROR_MSG ("client handle is null"); return RMW_RET_ERROR; } if (!is_available) { - RMW_SET_ERROR_MSG("is_available is null"); + RMW_SET_ERROR_MSG ("is_available is null"); return RMW_RET_ERROR; } - auto client_info = static_cast(client->data); + auto client_info = static_cast (client->data); if (!client_info) { - RMW_SET_ERROR_MSG("client info handle is null"); + RMW_SET_ERROR_MSG ("client info handle is null"); return RMW_RET_ERROR; } auto pub_topic_name = - client_info->request_publisher_->getAttributes().topic.getTopicName(); + client_info->request_publisher_->getAttributes ().topic.getTopicName (); auto pub_partitions = - client_info->request_publisher_->getAttributes().qos.m_partition.getNames(); + client_info->request_publisher_->getAttributes ().qos.m_partition.getNames (); // every rostopic has exactly 1 partition field set - if (pub_partitions.size() != 1) { - RCUTILS_LOG_ERROR_NAMED( + if (pub_partitions.size () != 1) { + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "Topic %s is not a ros topic", pub_topic_name.c_str()) - RMW_SET_ERROR_MSG((std::string(pub_topic_name) + " is a non-ros topic\n").c_str()); + "Topic %s is not a ros topic", pub_topic_name.c_str ()) + RMW_SET_ERROR_MSG ((std::string (pub_topic_name) + " is a non-ros topic\n").c_str ()); return RMW_RET_ERROR; } auto pub_fqdn = pub_partitions[0] + "/" + pub_topic_name; - pub_fqdn = _demangle_if_ros_topic(pub_fqdn); + pub_fqdn = _demangle_if_ros_topic (pub_fqdn); auto sub_topic_name = - client_info->response_subscriber_->getAttributes().topic.getTopicName(); + client_info->response_subscriber_->getAttributes ().topic.getTopicName (); auto sub_partitions = - client_info->response_subscriber_->getAttributes().qos.m_partition.getNames(); + client_info->response_subscriber_->getAttributes ().qos.m_partition.getNames (); // every rostopic has exactly 1 partition field set - if (sub_partitions.size() != 1) { - RCUTILS_LOG_ERROR_NAMED( + if (sub_partitions.size () != 1) { + RCUTILS_LOG_ERROR_NAMED ( "rmw_cyclonedds_cpp", - "Topic %s is not a ros topic", pub_topic_name.c_str()) - RMW_SET_ERROR_MSG((std::string(sub_topic_name) + " is a non-ros topic\n").c_str()); + "Topic %s is not a ros topic", pub_topic_name.c_str ()) + RMW_SET_ERROR_MSG ((std::string (sub_topic_name) + " is a non-ros topic\n").c_str ()); return RMW_RET_ERROR; } auto sub_fqdn = sub_partitions[0] + "/" + sub_topic_name; - sub_fqdn = _demangle_if_ros_topic(sub_fqdn); + sub_fqdn = _demangle_if_ros_topic (sub_fqdn); *is_available = false; size_t number_of_request_subscribers = 0; - rmw_ret_t ret = rmw_count_subscribers( + rmw_ret_t ret = rmw_count_subscribers ( node, - pub_fqdn.c_str(), + pub_fqdn.c_str (), &number_of_request_subscribers); if (ret != RMW_RET_OK) { // error string already set @@ -1650,9 +1462,9 @@ extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, co } size_t number_of_response_publishers = 0; - ret = rmw_count_publishers( + ret = rmw_count_publishers ( node, - sub_fqdn.c_str(), + sub_fqdn.c_str (), &number_of_response_publishers); if (ret != RMW_RET_OK) { // error string already set @@ -1667,87 +1479,87 @@ extern "C" rmw_ret_t rmw_service_server_is_available(const rmw_node_t * node, co *is_available = true; return RMW_RET_OK; #else - (void)node; (void)client; (void)is_available; + (void) node; (void) client; (void) is_available; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_count_publishers(const rmw_node_t *node, const char *topic_name, size_t *count) +extern "C" rmw_ret_t rmw_count_publishers (const rmw_node_t *node, const char *topic_name, size_t *count) { #if 0 // safechecks if (!node) { - RMW_SET_ERROR_MSG("null node handle"); + RMW_SET_ERROR_MSG ("null node handle"); return RMW_RET_ERROR; } // Get participant pointer from node if (node->implementation_identifier != eprosima_fastrtps_identifier) { - RMW_SET_ERROR_MSG("node handle not from this implementation"); + RMW_SET_ERROR_MSG ("node handle not from this implementation"); return RMW_RET_ERROR; } - auto impl = static_cast(node->data); + auto impl = static_cast (node->data); WriterInfo * slave_target = impl->secondaryPubListener; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); *count = 0; for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic(it.first); + auto topic_fqdn = _demangle_if_ros_topic (it.first); if (topic_fqdn == topic_name) { - *count += it.second.size(); + *count += it.second.size (); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); - RCUTILS_LOG_DEBUG_NAMED( + RCUTILS_LOG_DEBUG_NAMED ( "rmw_fastrtps_cpp", "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count) + topic_name, *count); - return RMW_RET_OK; + return RMW_RET_OK; #else - (void)node; (void)topic_name; (void)count; + (void) node; (void) topic_name; (void) count; return RMW_RET_TIMEOUT; #endif } -extern "C" rmw_ret_t rmw_count_subscribers(const rmw_node_t *node, const char *topic_name, size_t *count) +extern "C" rmw_ret_t rmw_count_subscribers (const rmw_node_t *node, const char *topic_name, size_t *count) { #if 0 // safechecks if (!node) { - RMW_SET_ERROR_MSG("null node handle"); + RMW_SET_ERROR_MSG ("null node handle"); return RMW_RET_ERROR; } // Get participant pointer from node if (node->implementation_identifier != eprosima_fastrtps_identifier) { - RMW_SET_ERROR_MSG("node handle not from this implementation"); + RMW_SET_ERROR_MSG ("node handle not from this implementation"); return RMW_RET_ERROR; } - CustomParticipantInfo * impl = static_cast(node->data); + CustomParticipantInfo * impl = static_cast (node->data); ReaderInfo * slave_target = impl->secondarySubListener; *count = 0; - slave_target->mapmutex.lock(); + slave_target->mapmutex.lock (); for (auto it : slave_target->topicNtypes) { - auto topic_fqdn = _demangle_if_ros_topic(it.first); + auto topic_fqdn = _demangle_if_ros_topic (it.first); if (topic_fqdn == topic_name) { - *count += it.second.size(); + *count += it.second.size (); } } - slave_target->mapmutex.unlock(); + slave_target->mapmutex.unlock (); - RCUTILS_LOG_DEBUG_NAMED( + RCUTILS_LOG_DEBUG_NAMED ( "rmw_fastrtps_cpp", "looking for subscriber topic: %s, number of matches: %zu", - topic_name, *count) + topic_name, *count); - return RMW_RET_OK; + return RMW_RET_OK; #else - (void)node; (void)topic_name; (void)count; + (void) node; (void) topic_name; (void) count; return RMW_RET_TIMEOUT; #endif } diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp new file mode 100644 index 0000000..be0e8ea --- /dev/null +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -0,0 +1,306 @@ +// Copyright 2019 ADLINK Technology +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#include + +#include "rmw/error_handling.h" + +#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" +#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp" + +#include "rmw_cyclonedds_cpp/serdes.hpp" +#include "rmw_cyclonedds_cpp/serdata.hpp" + +#include "dds/ddsi/ddsi_iid.h" +#include "dds/ddsi/q_radmin.h" + +using MessageTypeSupport_c = rmw_cyclonedds_cpp::MessageTypeSupport; +using MessageTypeSupport_cpp = rmw_cyclonedds_cpp::MessageTypeSupport; +using RequestTypeSupport_c = rmw_cyclonedds_cpp::RequestTypeSupport; +using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport; +using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport; +using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport; + +static bool using_introspection_c_typesupport (const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_c__identifier; +} + +static bool using_introspection_cpp_typesupport (const char *typesupport_identifier) +{ + return typesupport_identifier == rosidl_typesupport_introspection_cpp::typesupport_identifier; +} + +void *create_message_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new MessageTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new MessageTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +void *create_request_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new RequestTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new RequestTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +void *create_response_type_support (const void *untyped_members, const char *typesupport_identifier) +{ + if (using_introspection_c_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new ResponseTypeSupport_c (members); + } else if (using_introspection_cpp_typesupport (typesupport_identifier)) { + auto members = static_cast (untyped_members); + return new ResponseTypeSupport_cpp (members); + } + RMW_SET_ERROR_MSG ("Unknown typesupport identifier"); + return nullptr; +} + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wc99-extensions" + +static uint32_t serdata_rmw_size (const struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + return d->data.size (); +} + +static void serdata_rmw_free (struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + delete d; +} + +static struct serdata_rmw *new_serdata_rmw (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind) +{ + struct serdata_rmw *sd = new serdata_rmw; + ddsi_serdata_init (sd, topic, kind); + return sd; +} + +static struct ddsi_serdata *serdata_rmw_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size) +{ + struct serdata_rmw *d = new_serdata_rmw (topic, kind); + uint32_t off = 0; + assert (fragchain->min == 0); + assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */ + d->data.reserve (size); + while (fragchain) { + if (fragchain->maxp1 > off) { + /* only copy if this fragment adds data */ + const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)); + d->data.insert (d->data.end (), payload + off - fragchain->min, payload + fragchain->maxp1 - fragchain->min); + off = fragchain->maxp1; + } + fragchain = fragchain->nextfrag; + } + return d; +} + +static struct ddsi_serdata *serdata_rmw_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash __attribute__ ((unused))) +{ + /* there is no key field, so from_keyhash is trivial */ + return new_serdata_rmw (topic, SDK_KEY); +} + +static struct ddsi_serdata *serdata_rmw_from_sample (const struct ddsi_sertopic *topiccmn, enum ddsi_serdata_kind kind, const void *sample) +{ + const struct sertopic_rmw *topic = static_cast (topiccmn); + struct serdata_rmw *d = new_serdata_rmw (topic, kind); + if (kind != SDK_DATA) { + /* ROS2 doesn't do keys, so SDK_KEY is trivial */ + } else if (!topic->is_request_header) { + cycser sd (d->data); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (sample, sd, nullptr); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (sample, sd, nullptr); + } + } else { + /* The "prefix" lambda is there to inject the service invocation header data into the CDR + stream -- I haven't checked how it is done in the official RMW implementations, so it is + probably incompatible. */ + const cdds_request_wrapper_t *wrap = static_cast (sample); + auto prefix = [wrap](cycser& ser) { ser << wrap->header.guid; ser << wrap->header.seq; }; + cycser sd (d->data); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (wrap->data, sd, prefix); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + (void) typed_typesupport->serializeROSmessage (wrap->data, sd, prefix); + } + } + return d; +} + +static struct ddsi_serdata *serdata_rmw_to_topicless (const struct ddsi_serdata *dcmn) +{ + const struct serdata_rmw *d = static_cast (dcmn); + struct serdata_rmw *d1 = new_serdata_rmw (d->topic, SDK_KEY); + d1->topic = nullptr; + return d1; +} + +static void serdata_rmw_to_ser (const struct ddsi_serdata *dcmn, size_t off, size_t sz, void *buf) +{ + const struct serdata_rmw *d = static_cast (dcmn); + memcpy (buf, (char *) d->data.data () + off, sz); +} + +static struct ddsi_serdata *serdata_rmw_to_ser_ref (const struct ddsi_serdata *dcmn, size_t off, size_t sz, ddsrt_iovec_t *ref) +{ + const struct serdata_rmw *d = static_cast (dcmn); + ref->iov_base = (char *) d->data.data () + off; + ref->iov_len = (ddsrt_iov_len_t) sz; + return ddsi_serdata_ref (d); +} + +static void serdata_rmw_to_ser_unref (struct ddsi_serdata *dcmn, const ddsrt_iovec_t *ref __attribute__ ((unused))) +{ + struct serdata_rmw *d = static_cast (dcmn); + ddsi_serdata_unref (d); +} + +static bool serdata_rmw_to_sample (const struct ddsi_serdata *dcmn, void *sample, void **bufptr __attribute__ ((unused)), void *buflim __attribute__ ((unused))) +{ + const struct serdata_rmw *d = static_cast (dcmn); + const struct sertopic_rmw *topic = static_cast (d->topic); + assert (bufptr == NULL); + assert (buflim == NULL); + if (d->kind != SDK_DATA) { + /* ROS2 doesn't do keys in a meaningful way yet */ + } else if (!topic->is_request_header) { + cycdeser sd (static_cast (d->data.data ()), d->data.size ()); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, sample, nullptr); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, sample, nullptr); + } + } else { + /* The "prefix" lambda is there to inject the service invocation header data into the CDR + stream -- I haven't checked how it is done in the official RMW implementations, so it is + probably incompatible. */ + cdds_request_wrapper_t * const wrap = static_cast (sample); + auto prefix = [wrap](cycdeser& ser) { ser >> wrap->header.guid; ser >> wrap->header.seq; }; + cycdeser sd (static_cast (d->data.data ()), d->data.size ()); + if (using_introspection_c_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, wrap->data, prefix); + } else if (using_introspection_cpp_typesupport (topic->type_support.typesupport_identifier_)) { + auto typed_typesupport = static_cast (topic->type_support.type_support_); + return typed_typesupport->deserializeROSmessage (sd, wrap->data, prefix); + } + } + return false; +} + +static bool serdata_rmw_topicless_to_sample (const struct ddsi_sertopic *topic __attribute__ ((unused)), const struct ddsi_serdata *dcmn __attribute__ ((unused)), void *sample __attribute__ ((unused)), void **bufptr __attribute__ ((unused)), void *buflim __attribute__ ((unused))) +{ + /* ROS2 doesn't do keys in a meaningful way yet */ + return true; +} + +static bool serdata_rmw_eqkey (const struct ddsi_serdata *a __attribute__ ((unused)), const struct ddsi_serdata *b __attribute__ ((unused))) +{ + /* ROS2 doesn't do keys in a meaningful way yet */ + return true; +} + +static const struct ddsi_serdata_ops serdata_rmw_ops = { + .eqkey = serdata_rmw_eqkey, + .get_size = serdata_rmw_size, + .from_ser = serdata_rmw_from_ser, + .from_keyhash = serdata_rmw_from_keyhash, + .from_sample = serdata_rmw_from_sample, + .to_ser = serdata_rmw_to_ser, + .to_ser_ref = serdata_rmw_to_ser_ref, + .to_ser_unref = serdata_rmw_to_ser_unref, + .to_sample = serdata_rmw_to_sample, + .to_topicless = serdata_rmw_to_topicless, + .topicless_to_sample = serdata_rmw_topicless_to_sample, + .free = serdata_rmw_free +}; + +static void sertopic_rmw_deinit (struct ddsi_sertopic *tpcmn) +{ + struct sertopic_rmw *tp = static_cast (tpcmn); + delete tp; +} + +static void sertopic_rmw_zero_samples (const struct ddsi_sertopic *d __attribute__ ((unused)), void *samples __attribute__ ((unused)), size_t count __attribute__ ((unused))) +{ + /* Not using code paths that rely on the samples getting zero'd out */ +} + +static void sertopic_rmw_realloc_samples (void **ptrs __attribute__ ((unused)), const struct ddsi_sertopic *d __attribute__ ((unused)), void *old __attribute__ ((unused)), size_t oldcount __attribute__ ((unused)), size_t count __attribute__ ((unused))) +{ + /* Not using code paths that rely on this (loans, dispose, unregister with instance handle, + content filters) */ + abort (); +} + +static void sertopic_rmw_free_samples (const struct ddsi_sertopic *d __attribute__ ((unused)), void **ptrs __attribute__ ((unused)), size_t count __attribute__ ((unused)), dds_free_op_t op) +{ + /* Not using code paths that rely on this (dispose, unregister with instance handle, content + filters) */ + assert (!(op & DDS_FREE_ALL_BIT)); +} + +static const struct ddsi_sertopic_ops sertopic_rmw_ops = { + .deinit = sertopic_rmw_deinit, + .zero_samples = sertopic_rmw_zero_samples, + .realloc_samples = sertopic_rmw_realloc_samples, + .free_samples = sertopic_rmw_free_samples +}; + +struct sertopic_rmw *create_sertopic (const char *topicname, const char *type_support_identifier, void *type_support, bool is_request_header) +{ + struct sertopic_rmw *st = new struct sertopic_rmw; + st->cpp_name = std::string (topicname); + st->cpp_type_name = std::string ("absent"); // FIXME: obviously a hack + st->cpp_name_type_name = st->cpp_name + std::string (";") + std::string (st->cpp_type_name); + st->ops = &sertopic_rmw_ops; + st->serdata_ops = &serdata_rmw_ops; + st->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->serdata_ops); + st->name_type_name = const_cast (st->cpp_name_type_name.c_str ()); + st->name = const_cast (st->cpp_name.c_str ()); + st->type_name = const_cast (st->cpp_type_name.c_str ()); + st->iid = ddsi_iid_gen (); + ddsrt_atomic_st32 (&st->refc, 1); + + st->type_support.typesupport_identifier_ = type_support_identifier; + st->type_support.type_support_ = type_support; + st->is_request_header = is_request_header; + return st; +} + +#pragma GCC diagnostic pop diff --git a/rmw_cyclonedds_cpp/src/serdes.cpp b/rmw_cyclonedds_cpp/src/serdes.cpp index a44e346..76acc62 100644 --- a/rmw_cyclonedds_cpp/src/serdes.cpp +++ b/rmw_cyclonedds_cpp/src/serdes.cpp @@ -1,53 +1,22 @@ #include "rmw_cyclonedds_cpp/serdes.hpp" -cycser::cycser(struct sertopic *topic) +cycser::cycser (std::vector& dst_) + : dst (dst_) + , off (0) { - st = ddsi_serstate_new(topic); - sd = nullptr; + dst.reserve (4); + unsigned char *magic = dst.data (); + /* FIXME: hard code to little endian ... and ignoring endianness in deser */ + magic[0] = 0; + magic[1] = 3; + /* options: */ + magic[2] = 0; + magic[3] = 0; } -cycser::~cycser() -{ - if (sd == nullptr) { - ddsi_serstate_release(st); - } else { - ddsi_serdata_unref(sd); - } -} - -cycser& cycser::ref() -{ - assert(sd != nullptr); - ddsi_serdata_ref(sd); - return *this; -} - -void cycser::unref() -{ - assert(sd != nullptr); - ddsi_serdata_unref(sd); -} - -cycser& cycser::fix() -{ - assert(sd == nullptr); - sd = ddsi_serstate_fix(st); - return *this; -} - -struct serdata *cycser::data() -{ - assert(sd != nullptr); - return sd; -} - -cycdeser::cycdeser(const void *data_, size_t size_) -{ - data = static_cast(data_); - lim = size_; - pos = 0; -} - -cycdeser::~cycdeser() +cycdeser::cycdeser (const void *data_, size_t size_) + : data (static_cast (data_) + 4) + , pos (0) + , lim (size_ - 4) { }