diff --git a/rmw_cyclonedds_cpp/src/Serialization.cpp b/rmw_cyclonedds_cpp/src/Serialization.cpp index e1b5d08..559e1fa 100644 --- a/rmw_cyclonedds_cpp/src/Serialization.cpp +++ b/rmw_cyclonedds_cpp/src/Serialization.cpp @@ -91,14 +91,18 @@ struct DataCursor : public CDRCursor : origin(position), position(position) {} size_t offset() const final {return (const byte *)position - (const byte *)origin;} - void advance(size_t n_bytes) final {position = byte_offset(position, n_bytes);} + void advance(size_t n_bytes) final + { + std::memset(position, '\0', n_bytes); + position = byte_offset(position, n_bytes); + } void put_bytes(const void * bytes, size_t n_bytes) final { if (n_bytes == 0) { return; } std::memcpy(position, bytes, n_bytes); - advance(n_bytes); + position = byte_offset(position, n_bytes); } bool ignores_data() const final {return false;} void rebase(ptrdiff_t relative_origin) final {origin = byte_offset(origin, relative_origin);} diff --git a/rmw_cyclonedds_cpp/src/Serialization.hpp b/rmw_cyclonedds_cpp/src/Serialization.hpp index 2fcf2ed..3258b34 100644 --- a/rmw_cyclonedds_cpp/src/Serialization.hpp +++ b/rmw_cyclonedds_cpp/src/Serialization.hpp @@ -15,8 +15,8 @@ #define SERIALIZATION_HPP_ #include "TypeSupport2.hpp" -#include "rmw_cyclonedds_cpp/serdata.hpp" #include "rosidl_generator_c/service_type_support_struct.h" +#include "serdata.hpp" namespace rmw_cyclonedds_cpp { diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index e6487e9..0bdf5c0 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -54,7 +54,7 @@ #include "dds/dds.h" #include "dds/ddsi/ddsi_sertopic.h" #include "rmw_cyclonedds_cpp/serdes.hpp" -#include "rmw_cyclonedds_cpp/serdata.hpp" +#include "serdata.hpp" /* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The @@ -1651,15 +1651,15 @@ static rmw_ret_t rmw_take_ser_int( memcpy(message_info->publisher_gid.data, &info.publication_handle, sizeof(info.publication_handle)); } - auto d = static_cast(dcmn); + auto d = static_cast(dcmn); /* FIXME: what about the header - should be included or not? */ - if (rmw_serialized_message_resize(serialized_message, d->data.size()) != RMW_RET_OK) { + if (rmw_serialized_message_resize(serialized_message, d->size()) != RMW_RET_OK) { ddsi_serdata_unref(dcmn); *taken = false; return RMW_RET_ERROR; } - memcpy(serialized_message->buffer, d->data.data(), d->data.size()); - serialized_message->buffer_length = d->data.size(); + memcpy(serialized_message->buffer, d->data(), d->size()); + serialized_message->buffer_length = d->size(); ddsi_serdata_unref(dcmn); *taken = true; return RMW_RET_OK; diff --git a/rmw_cyclonedds_cpp/src/serdata.cpp b/rmw_cyclonedds_cpp/src/serdata.cpp index 52a929d..23a85a9 100644 --- a/rmw_cyclonedds_cpp/src/serdata.cpp +++ b/rmw_cyclonedds_cpp/src/serdata.cpp @@ -11,7 +11,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#include "rmw_cyclonedds_cpp/serdata.hpp" +#include "serdata.hpp" + +#include #include #include @@ -20,8 +22,9 @@ #include #include -#include "TypeSupport2.hpp" #include "Serialization.hpp" +#include "TypeSupport2.hpp" +#include "bytewise.hpp" #include "dds/ddsi/q_radmin.h" #include "rmw/error_handling.h" #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" @@ -108,47 +111,42 @@ void * create_response_type_support( static uint32_t serdata_rmw_size(const struct ddsi_serdata * dcmn) { - const struct serdata_rmw * d = static_cast(dcmn); - return static_cast(d->data.size()); + return static_cast(dcmn)->size(); } static void serdata_rmw_free(struct ddsi_serdata * dcmn) { - const struct serdata_rmw * d = static_cast(dcmn); + auto * d = static_cast(dcmn); delete d; } -static struct serdata_rmw * rmw_serdata_new( - const struct ddsi_sertopic * topic, - enum ddsi_serdata_kind kind) -{ - struct serdata_rmw * sd = new serdata_rmw; - ddsi_serdata_init(sd, topic, kind); - return sd; -} - static struct ddsi_serdata * serdata_rmw_from_ser( const struct ddsi_sertopic * topic, enum ddsi_serdata_kind kind, const struct nn_rdata * fragchain, size_t size) { - struct serdata_rmw * d = rmw_serdata_new(topic, kind); + auto d = std::make_unique(topic, kind); uint32_t off = 0; assert(fragchain->min == 0); assert(fragchain->maxp1 >= off); /* CDR header must be in first fragment */ - d->data.reserve(size); + d->resize(size); + + auto cursor = d->data(); while (fragchain) { if (fragchain->maxp1 > off) { /* only copy if this fragment adds data */ const unsigned char * payload = NN_RMSG_PAYLOADOFF(fragchain->rmsg, NN_RDATA_PAYLOAD_OFF(fragchain)); - d->data.insert( - d->data.end(), payload + off - fragchain->min, payload + fragchain->maxp1 - fragchain->min); + auto src = payload + off - fragchain->min; + auto n_bytes = fragchain->maxp1 - off; + memcpy(cursor, src, n_bytes); + cursor = byte_offset(cursor, n_bytes); off = fragchain->maxp1; + assert(off < size); } fragchain = fragchain->nextfrag; } - return d; + return d.release(); } static struct ddsi_serdata * serdata_rmw_from_keyhash( @@ -157,14 +155,7 @@ static struct ddsi_serdata * serdata_rmw_from_keyhash( { static_cast(keyhash); // unused /* there is no key field, so from_keyhash is trivial */ - return rmw_serdata_new(topic, SDK_KEY); -} - -size_t next_multiple_of_4(size_t n_bytes) -{ - /* FIXME: CDR padding in DDSI makes me do this to avoid reading beyond the bounds of the vector - when copying data to network. Should fix Cyclone to handle that more elegantly. */ - return n_bytes + (-n_bytes) % 4; + return new serdata_rmw(topic, SDK_KEY); } static struct ddsi_serdata * serdata_rmw_from_sample( @@ -174,13 +165,13 @@ static struct ddsi_serdata * serdata_rmw_from_sample( { try { const struct sertopic_rmw * topic = static_cast(topiccmn); - struct serdata_rmw * d = rmw_serdata_new(topic, kind); + auto d = std::make_unique(topic, kind); if (kind != SDK_DATA) { /* ROS2 doesn't do keys, so SDK_KEY is trivial */ } else if (!topic->is_request_header) { size_t sz = rmw_cyclonedds_cpp::get_serialized_size(sample, topic->value_type.get()); - d->data.resize(next_multiple_of_4(sz)); - rmw_cyclonedds_cpp::serialize(d->data.data(), sample, topic->value_type.get()); + d->resize(sz); + rmw_cyclonedds_cpp::serialize(d->data(), sample, topic->value_type.get()); } else { /* inject the service invocation header data into the CDR stream -- * I haven't checked how it is done in the official RMW implementations, so it is @@ -188,10 +179,10 @@ static struct ddsi_serdata * serdata_rmw_from_sample( auto wrap = *static_cast(sample); size_t sz = rmw_cyclonedds_cpp::get_serialized_size(wrap, topic->value_type.get()); - d->data.resize(next_multiple_of_4(sz)); - rmw_cyclonedds_cpp::serialize(d->data.data(), wrap, topic->value_type.get()); + d->resize(sz); + rmw_cyclonedds_cpp::serialize(d->data(), wrap, topic->value_type.get()); } - return d; + return d.release(); } catch (std::exception & e) { RMW_SET_ERROR_MSG(e.what()); return nullptr; @@ -203,32 +194,32 @@ struct ddsi_serdata * serdata_rmw_from_serialized_message( const void * raw, size_t size) { const struct sertopic_rmw * topic = static_cast(topiccmn); - struct serdata_rmw * d = rmw_serdata_new(topic, SDK_DATA); - d->data.resize(next_multiple_of_4(size)); - memcpy(d->data.data(), raw, size); + auto d = new serdata_rmw(topic, SDK_DATA); + d->resize(size); + memcpy(d->data(), raw, size); return d; } static struct ddsi_serdata * serdata_rmw_to_topicless(const struct ddsi_serdata * dcmn) { - const struct serdata_rmw * d = static_cast(dcmn); - struct serdata_rmw * d1 = rmw_serdata_new(d->topic, SDK_KEY); + auto d = static_cast(dcmn); + auto d1 = new serdata_rmw(d->topic, SDK_KEY); d1->topic = nullptr; return d1; } static void serdata_rmw_to_ser(const struct ddsi_serdata * dcmn, size_t off, size_t sz, void * buf) { - const struct serdata_rmw * d = static_cast(dcmn); - memcpy(buf, reinterpret_cast(d->data.data()) + off, sz); + auto d = static_cast(dcmn); + memcpy(buf, byte_offset(d->data(), off), sz); } static struct ddsi_serdata * serdata_rmw_to_ser_ref( const struct ddsi_serdata * dcmn, size_t off, size_t sz, ddsrt_iovec_t * ref) { - const struct serdata_rmw * d = static_cast(dcmn); - ref->iov_base = const_cast(reinterpret_cast(d->data.data())) + off; + auto d = static_cast(dcmn); + ref->iov_base = byte_offset(d->data(), off); ref->iov_len = (ddsrt_iov_len_t) sz; return ddsi_serdata_ref(d); } @@ -236,8 +227,7 @@ static struct ddsi_serdata * serdata_rmw_to_ser_ref( static void serdata_rmw_to_ser_unref(struct ddsi_serdata * dcmn, const ddsrt_iovec_t * ref) { static_cast(ref); // unused - struct serdata_rmw * d = static_cast(dcmn); - ddsi_serdata_unref(d); + ddsi_serdata_unref(static_cast(dcmn)); } static bool serdata_rmw_to_sample( @@ -247,14 +237,14 @@ static bool serdata_rmw_to_sample( try { static_cast(bufptr); // unused static_cast(buflim); // unused - const struct serdata_rmw * d = static_cast(dcmn); + auto d = static_cast(dcmn); const struct sertopic_rmw * topic = static_cast(d->topic); assert(bufptr == NULL); assert(buflim == NULL); if (d->kind != SDK_DATA) { /* ROS2 doesn't do keys in a meaningful way yet */ } else if (!topic->is_request_header) { - cycdeser sd(static_cast(d->data.data()), d->data.size()); + cycdeser sd(d->data(), d->size()); if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(topic->type_support.type_support_); @@ -270,7 +260,7 @@ static bool serdata_rmw_to_sample( probably incompatible. */ cdds_request_wrapper_t * const wrap = static_cast(sample); auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;}; - cycdeser sd(static_cast(d->data.data()), d->data.size()); + cycdeser sd(d->data(), d->size()); if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(topic->type_support.type_support_); @@ -319,13 +309,13 @@ static size_t serdata_rmw_print( const struct ddsi_sertopic * tpcmn, const struct ddsi_serdata * dcmn, char * buf, size_t bufsize) { try { - const struct serdata_rmw * d = static_cast(dcmn); + auto d = static_cast(dcmn); const struct sertopic_rmw * topic = static_cast(tpcmn); if (d->kind != SDK_DATA) { /* ROS2 doesn't do keys in a meaningful way yet */ return static_cast(snprintf(buf, bufsize, ":k:{}")); } else if (!topic->is_request_header) { - cycprint sd(buf, bufsize, static_cast(d->data.data()), d->data.size()); + cycprint sd(buf, bufsize, d->data(), d->size()); if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(topic->type_support.type_support_); @@ -343,7 +333,7 @@ static size_t serdata_rmw_print( auto prefix = [&wrap](cycprint & ser) { ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq; }; - cycprint sd(buf, bufsize, static_cast(d->data.data()), d->data.size()); + cycprint sd(buf, bufsize, d->data(), d->size()); if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { auto typed_typesupport = static_cast(topic->type_support.type_support_); @@ -500,3 +490,27 @@ struct sertopic_rmw * create_sertopic( st->value_type = std::move(message_type); return st; } + +void serdata_rmw::resize(uint32_t requested_size) +{ + if (!requested_size) { + m_size = 0; + m_data.reset(); + return; + } + + /* FIXME: CDR padding in DDSI makes me do this to avoid reading beyond the bounds + when copying data to network. Should fix Cyclone to handle that more elegantly. */ + size_t n_pad_bytes = (-requested_size) % 4; + m_data.reset(new byte[requested_size + n_pad_bytes]); + m_size = requested_size + n_pad_bytes; + + // zero the very end. The caller isn't necessarily going to overwrite it. + std::memset(byte_offset(m_data.get(), requested_size), '\0', n_pad_bytes); +} + +serdata_rmw::serdata_rmw(const ddsi_sertopic * topic, ddsi_serdata_kind kind) +: ddsi_serdata{} +{ + ddsi_serdata_init(this, topic, kind); +} diff --git a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp b/rmw_cyclonedds_cpp/src/serdata.hpp similarity index 82% rename from rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp rename to rmw_cyclonedds_cpp/src/serdata.hpp index 53ae249..22d6155 100644 --- a/rmw_cyclonedds_cpp/include/rmw_cyclonedds_cpp/serdata.hpp +++ b/rmw_cyclonedds_cpp/src/serdata.hpp @@ -11,15 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -#ifndef RMW_CYCLONEDDS_CPP__SERDATA_HPP_ -#define RMW_CYCLONEDDS_CPP__SERDATA_HPP_ +#ifndef SERDATA_HPP_ +#define SERDATA_HPP_ #include #include -#include +#include "bytewise.hpp" #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_sertopic.h" +#include "TypeSupport2.hpp" struct CddsTypeSupport { @@ -27,8 +28,6 @@ struct CddsTypeSupport const char * typesupport_identifier_; }; -namespace rmw_cyclonedds_cpp {class StructValueType;} - struct sertopic_rmw : ddsi_sertopic { CddsTypeSupport type_support; @@ -41,11 +40,19 @@ struct sertopic_rmw : ddsi_sertopic std::unique_ptr value_type; }; -struct serdata_rmw : ddsi_serdata +class serdata_rmw : public ddsi_serdata { +protected: + uint32_t m_size {0}; /* first two bytes of data is CDR encoding second two bytes are encoding options */ - std::vector data; + std::unique_ptr m_data {nullptr}; + +public: + serdata_rmw(const ddsi_sertopic * topic, ddsi_serdata_kind kind); + void resize(uint32_t requested_size); + uint32_t size() const {return m_size;} + void * data() const {return m_data.get();} }; typedef struct cdds_request_header @@ -79,4 +86,4 @@ struct ddsi_serdata * serdata_rmw_from_serialized_message( const struct ddsi_sertopic * topiccmn, const void * raw, size_t size); -#endif // RMW_CYCLONEDDS_CPP__SERDATA_HPP_ +#endif // SERDATA_HPP_