Serialize into initialized memory, not vector (#75)

Make serdata a private header
Serialize into uninitialize data, not a vector.
This commit is contained in:
Dan Rose 2019-12-06 10:21:18 -06:00 committed by GitHub
parent 88e74c322f
commit b322f478de
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 91 additions and 66 deletions

View file

@ -91,14 +91,18 @@ struct DataCursor : public CDRCursor
: origin(position), position(position) {} : origin(position), position(position) {}
size_t offset() const final {return (const byte *)position - (const byte *)origin;} size_t offset() const final {return (const byte *)position - (const byte *)origin;}
void advance(size_t n_bytes) final {position = byte_offset(position, n_bytes);} void advance(size_t n_bytes) final
{
std::memset(position, '\0', n_bytes);
position = byte_offset(position, n_bytes);
}
void put_bytes(const void * bytes, size_t n_bytes) final void put_bytes(const void * bytes, size_t n_bytes) final
{ {
if (n_bytes == 0) { if (n_bytes == 0) {
return; return;
} }
std::memcpy(position, bytes, n_bytes); std::memcpy(position, bytes, n_bytes);
advance(n_bytes); position = byte_offset(position, n_bytes);
} }
bool ignores_data() const final {return false;} bool ignores_data() const final {return false;}
void rebase(ptrdiff_t relative_origin) final {origin = byte_offset(origin, relative_origin);} void rebase(ptrdiff_t relative_origin) final {origin = byte_offset(origin, relative_origin);}

View file

@ -15,8 +15,8 @@
#define SERIALIZATION_HPP_ #define SERIALIZATION_HPP_
#include "TypeSupport2.hpp" #include "TypeSupport2.hpp"
#include "rmw_cyclonedds_cpp/serdata.hpp"
#include "rosidl_generator_c/service_type_support_struct.h" #include "rosidl_generator_c/service_type_support_struct.h"
#include "serdata.hpp"
namespace rmw_cyclonedds_cpp namespace rmw_cyclonedds_cpp
{ {

View file

@ -54,7 +54,7 @@
#include "dds/dds.h" #include "dds/dds.h"
#include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_sertopic.h"
#include "rmw_cyclonedds_cpp/serdes.hpp" #include "rmw_cyclonedds_cpp/serdes.hpp"
#include "rmw_cyclonedds_cpp/serdata.hpp" #include "serdata.hpp"
/* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on /* Proper multi-domain support requires eliminating the "extra" participant, which in turn relies on
the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The the promotion of the Cyclone DDS library instance and the daomsin to full-fledged entities. The
@ -1651,15 +1651,15 @@ static rmw_ret_t rmw_take_ser_int(
memcpy(message_info->publisher_gid.data, &info.publication_handle, memcpy(message_info->publisher_gid.data, &info.publication_handle,
sizeof(info.publication_handle)); sizeof(info.publication_handle));
} }
auto d = static_cast<struct serdata_rmw *>(dcmn); auto d = static_cast<serdata_rmw *>(dcmn);
/* FIXME: what about the header - should be included or not? */ /* FIXME: what about the header - should be included or not? */
if (rmw_serialized_message_resize(serialized_message, d->data.size()) != RMW_RET_OK) { if (rmw_serialized_message_resize(serialized_message, d->size()) != RMW_RET_OK) {
ddsi_serdata_unref(dcmn); ddsi_serdata_unref(dcmn);
*taken = false; *taken = false;
return RMW_RET_ERROR; return RMW_RET_ERROR;
} }
memcpy(serialized_message->buffer, d->data.data(), d->data.size()); memcpy(serialized_message->buffer, d->data(), d->size());
serialized_message->buffer_length = d->data.size(); serialized_message->buffer_length = d->size();
ddsi_serdata_unref(dcmn); ddsi_serdata_unref(dcmn);
*taken = true; *taken = true;
return RMW_RET_OK; return RMW_RET_OK;

View file

@ -11,7 +11,9 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#include "rmw_cyclonedds_cpp/serdata.hpp" #include "serdata.hpp"
#include <rmw/allocators.h>
#include <cstring> #include <cstring>
#include <memory> #include <memory>
@ -20,8 +22,9 @@
#include <string> #include <string>
#include <utility> #include <utility>
#include "TypeSupport2.hpp"
#include "Serialization.hpp" #include "Serialization.hpp"
#include "TypeSupport2.hpp"
#include "bytewise.hpp"
#include "dds/ddsi/q_radmin.h" #include "dds/ddsi/q_radmin.h"
#include "rmw/error_handling.h" #include "rmw/error_handling.h"
#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp" #include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp"
@ -108,47 +111,42 @@ void * create_response_type_support(
static uint32_t serdata_rmw_size(const struct ddsi_serdata * dcmn) static uint32_t serdata_rmw_size(const struct ddsi_serdata * dcmn)
{ {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); return static_cast<const serdata_rmw *>(dcmn)->size();
return static_cast<uint32_t>(d->data.size());
} }
static void serdata_rmw_free(struct ddsi_serdata * dcmn) static void serdata_rmw_free(struct ddsi_serdata * dcmn)
{ {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto * d = static_cast<const serdata_rmw *>(dcmn);
delete d; delete d;
} }
static struct serdata_rmw * rmw_serdata_new(
const struct ddsi_sertopic * topic,
enum ddsi_serdata_kind kind)
{
struct serdata_rmw * sd = new serdata_rmw;
ddsi_serdata_init(sd, topic, kind);
return sd;
}
static struct ddsi_serdata * serdata_rmw_from_ser( static struct ddsi_serdata * serdata_rmw_from_ser(
const struct ddsi_sertopic * topic, const struct ddsi_sertopic * topic,
enum ddsi_serdata_kind kind, enum ddsi_serdata_kind kind,
const struct nn_rdata * fragchain, size_t size) const struct nn_rdata * fragchain, size_t size)
{ {
struct serdata_rmw * d = rmw_serdata_new(topic, kind); auto d = std::make_unique<serdata_rmw>(topic, kind);
uint32_t off = 0; uint32_t off = 0;
assert(fragchain->min == 0); assert(fragchain->min == 0);
assert(fragchain->maxp1 >= off); /* CDR header must be in first fragment */ assert(fragchain->maxp1 >= off); /* CDR header must be in first fragment */
d->data.reserve(size); d->resize(size);
auto cursor = d->data();
while (fragchain) { while (fragchain) {
if (fragchain->maxp1 > off) { if (fragchain->maxp1 > off) {
/* only copy if this fragment adds data */ /* only copy if this fragment adds data */
const unsigned char * payload = const unsigned char * payload =
NN_RMSG_PAYLOADOFF(fragchain->rmsg, NN_RDATA_PAYLOAD_OFF(fragchain)); NN_RMSG_PAYLOADOFF(fragchain->rmsg, NN_RDATA_PAYLOAD_OFF(fragchain));
d->data.insert( auto src = payload + off - fragchain->min;
d->data.end(), payload + off - fragchain->min, payload + fragchain->maxp1 - fragchain->min); auto n_bytes = fragchain->maxp1 - off;
memcpy(cursor, src, n_bytes);
cursor = byte_offset(cursor, n_bytes);
off = fragchain->maxp1; off = fragchain->maxp1;
assert(off < size);
} }
fragchain = fragchain->nextfrag; fragchain = fragchain->nextfrag;
} }
return d; return d.release();
} }
static struct ddsi_serdata * serdata_rmw_from_keyhash( static struct ddsi_serdata * serdata_rmw_from_keyhash(
@ -157,14 +155,7 @@ static struct ddsi_serdata * serdata_rmw_from_keyhash(
{ {
static_cast<void>(keyhash); // unused static_cast<void>(keyhash); // unused
/* there is no key field, so from_keyhash is trivial */ /* there is no key field, so from_keyhash is trivial */
return rmw_serdata_new(topic, SDK_KEY); return new serdata_rmw(topic, SDK_KEY);
}
size_t next_multiple_of_4(size_t n_bytes)
{
/* FIXME: CDR padding in DDSI makes me do this to avoid reading beyond the bounds of the vector
when copying data to network. Should fix Cyclone to handle that more elegantly. */
return n_bytes + (-n_bytes) % 4;
} }
static struct ddsi_serdata * serdata_rmw_from_sample( static struct ddsi_serdata * serdata_rmw_from_sample(
@ -174,13 +165,13 @@ static struct ddsi_serdata * serdata_rmw_from_sample(
{ {
try { try {
const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(topiccmn); const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(topiccmn);
struct serdata_rmw * d = rmw_serdata_new(topic, kind); auto d = std::make_unique<serdata_rmw>(topic, kind);
if (kind != SDK_DATA) { if (kind != SDK_DATA) {
/* ROS2 doesn't do keys, so SDK_KEY is trivial */ /* ROS2 doesn't do keys, so SDK_KEY is trivial */
} else if (!topic->is_request_header) { } else if (!topic->is_request_header) {
size_t sz = rmw_cyclonedds_cpp::get_serialized_size(sample, topic->value_type.get()); size_t sz = rmw_cyclonedds_cpp::get_serialized_size(sample, topic->value_type.get());
d->data.resize(next_multiple_of_4(sz)); d->resize(sz);
rmw_cyclonedds_cpp::serialize(d->data.data(), sample, topic->value_type.get()); rmw_cyclonedds_cpp::serialize(d->data(), sample, topic->value_type.get());
} else { } else {
/* inject the service invocation header data into the CDR stream -- /* inject the service invocation header data into the CDR stream --
* I haven't checked how it is done in the official RMW implementations, so it is * I haven't checked how it is done in the official RMW implementations, so it is
@ -188,10 +179,10 @@ static struct ddsi_serdata * serdata_rmw_from_sample(
auto wrap = *static_cast<const cdds_request_wrapper_t *>(sample); auto wrap = *static_cast<const cdds_request_wrapper_t *>(sample);
size_t sz = rmw_cyclonedds_cpp::get_serialized_size(wrap, topic->value_type.get()); size_t sz = rmw_cyclonedds_cpp::get_serialized_size(wrap, topic->value_type.get());
d->data.resize(next_multiple_of_4(sz)); d->resize(sz);
rmw_cyclonedds_cpp::serialize(d->data.data(), wrap, topic->value_type.get()); rmw_cyclonedds_cpp::serialize(d->data(), wrap, topic->value_type.get());
} }
return d; return d.release();
} catch (std::exception & e) { } catch (std::exception & e) {
RMW_SET_ERROR_MSG(e.what()); RMW_SET_ERROR_MSG(e.what());
return nullptr; return nullptr;
@ -203,32 +194,32 @@ struct ddsi_serdata * serdata_rmw_from_serialized_message(
const void * raw, size_t size) const void * raw, size_t size)
{ {
const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(topiccmn); const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(topiccmn);
struct serdata_rmw * d = rmw_serdata_new(topic, SDK_DATA); auto d = new serdata_rmw(topic, SDK_DATA);
d->data.resize(next_multiple_of_4(size)); d->resize(size);
memcpy(d->data.data(), raw, size); memcpy(d->data(), raw, size);
return d; return d;
} }
static struct ddsi_serdata * serdata_rmw_to_topicless(const struct ddsi_serdata * dcmn) static struct ddsi_serdata * serdata_rmw_to_topicless(const struct ddsi_serdata * dcmn)
{ {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto d = static_cast<const serdata_rmw *>(dcmn);
struct serdata_rmw * d1 = rmw_serdata_new(d->topic, SDK_KEY); auto d1 = new serdata_rmw(d->topic, SDK_KEY);
d1->topic = nullptr; d1->topic = nullptr;
return d1; return d1;
} }
static void serdata_rmw_to_ser(const struct ddsi_serdata * dcmn, size_t off, size_t sz, void * buf) static void serdata_rmw_to_ser(const struct ddsi_serdata * dcmn, size_t off, size_t sz, void * buf)
{ {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto d = static_cast<const serdata_rmw *>(dcmn);
memcpy(buf, reinterpret_cast<const char *>(d->data.data()) + off, sz); memcpy(buf, byte_offset(d->data(), off), sz);
} }
static struct ddsi_serdata * serdata_rmw_to_ser_ref( static struct ddsi_serdata * serdata_rmw_to_ser_ref(
const struct ddsi_serdata * dcmn, size_t off, const struct ddsi_serdata * dcmn, size_t off,
size_t sz, ddsrt_iovec_t * ref) size_t sz, ddsrt_iovec_t * ref)
{ {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto d = static_cast<const serdata_rmw *>(dcmn);
ref->iov_base = const_cast<char *>(reinterpret_cast<const char *>(d->data.data())) + off; ref->iov_base = byte_offset(d->data(), off);
ref->iov_len = (ddsrt_iov_len_t) sz; ref->iov_len = (ddsrt_iov_len_t) sz;
return ddsi_serdata_ref(d); return ddsi_serdata_ref(d);
} }
@ -236,8 +227,7 @@ static struct ddsi_serdata * serdata_rmw_to_ser_ref(
static void serdata_rmw_to_ser_unref(struct ddsi_serdata * dcmn, const ddsrt_iovec_t * ref) static void serdata_rmw_to_ser_unref(struct ddsi_serdata * dcmn, const ddsrt_iovec_t * ref)
{ {
static_cast<void>(ref); // unused static_cast<void>(ref); // unused
struct serdata_rmw * d = static_cast<struct serdata_rmw *>(dcmn); ddsi_serdata_unref(static_cast<serdata_rmw *>(dcmn));
ddsi_serdata_unref(d);
} }
static bool serdata_rmw_to_sample( static bool serdata_rmw_to_sample(
@ -247,14 +237,14 @@ static bool serdata_rmw_to_sample(
try { try {
static_cast<void>(bufptr); // unused static_cast<void>(bufptr); // unused
static_cast<void>(buflim); // unused static_cast<void>(buflim); // unused
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto d = static_cast<const serdata_rmw *>(dcmn);
const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(d->topic); const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(d->topic);
assert(bufptr == NULL); assert(bufptr == NULL);
assert(buflim == NULL); assert(buflim == NULL);
if (d->kind != SDK_DATA) { if (d->kind != SDK_DATA) {
/* ROS2 doesn't do keys in a meaningful way yet */ /* ROS2 doesn't do keys in a meaningful way yet */
} else if (!topic->is_request_header) { } else if (!topic->is_request_header) {
cycdeser sd(static_cast<const void *>(d->data.data()), d->data.size()); cycdeser sd(d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
auto typed_typesupport = auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_); static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_);
@ -270,7 +260,7 @@ static bool serdata_rmw_to_sample(
probably incompatible. */ probably incompatible. */
cdds_request_wrapper_t * const wrap = static_cast<cdds_request_wrapper_t *>(sample); cdds_request_wrapper_t * const wrap = static_cast<cdds_request_wrapper_t *>(sample);
auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;}; auto prefix = [wrap](cycdeser & ser) {ser >> wrap->header.guid; ser >> wrap->header.seq;};
cycdeser sd(static_cast<const void *>(d->data.data()), d->data.size()); cycdeser sd(d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
auto typed_typesupport = auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_); static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_);
@ -319,13 +309,13 @@ static size_t serdata_rmw_print(
const struct ddsi_sertopic * tpcmn, const struct ddsi_serdata * dcmn, char * buf, size_t bufsize) const struct ddsi_sertopic * tpcmn, const struct ddsi_serdata * dcmn, char * buf, size_t bufsize)
{ {
try { try {
const struct serdata_rmw * d = static_cast<const struct serdata_rmw *>(dcmn); auto d = static_cast<const serdata_rmw *>(dcmn);
const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(tpcmn); const struct sertopic_rmw * topic = static_cast<const struct sertopic_rmw *>(tpcmn);
if (d->kind != SDK_DATA) { if (d->kind != SDK_DATA) {
/* ROS2 doesn't do keys in a meaningful way yet */ /* ROS2 doesn't do keys in a meaningful way yet */
return static_cast<size_t>(snprintf(buf, bufsize, ":k:{}")); return static_cast<size_t>(snprintf(buf, bufsize, ":k:{}"));
} else if (!topic->is_request_header) { } else if (!topic->is_request_header) {
cycprint sd(buf, bufsize, static_cast<const void *>(d->data.data()), d->data.size()); cycprint sd(buf, bufsize, d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
auto typed_typesupport = auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_); static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_);
@ -343,7 +333,7 @@ static size_t serdata_rmw_print(
auto prefix = [&wrap](cycprint & ser) { auto prefix = [&wrap](cycprint & ser) {
ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq; ser >> wrap.header.guid; ser.print_constant(","); ser >> wrap.header.seq;
}; };
cycprint sd(buf, bufsize, static_cast<const void *>(d->data.data()), d->data.size()); cycprint sd(buf, bufsize, d->data(), d->size());
if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) { if (using_introspection_c_typesupport(topic->type_support.typesupport_identifier_)) {
auto typed_typesupport = auto typed_typesupport =
static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_); static_cast<MessageTypeSupport_c *>(topic->type_support.type_support_);
@ -500,3 +490,27 @@ struct sertopic_rmw * create_sertopic(
st->value_type = std::move(message_type); st->value_type = std::move(message_type);
return st; return st;
} }
void serdata_rmw::resize(uint32_t requested_size)
{
if (!requested_size) {
m_size = 0;
m_data.reset();
return;
}
/* FIXME: CDR padding in DDSI makes me do this to avoid reading beyond the bounds
when copying data to network. Should fix Cyclone to handle that more elegantly. */
size_t n_pad_bytes = (-requested_size) % 4;
m_data.reset(new byte[requested_size + n_pad_bytes]);
m_size = requested_size + n_pad_bytes;
// zero the very end. The caller isn't necessarily going to overwrite it.
std::memset(byte_offset(m_data.get(), requested_size), '\0', n_pad_bytes);
}
serdata_rmw::serdata_rmw(const ddsi_sertopic * topic, ddsi_serdata_kind kind)
: ddsi_serdata{}
{
ddsi_serdata_init(this, topic, kind);
}

View file

@ -11,15 +11,16 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
#ifndef RMW_CYCLONEDDS_CPP__SERDATA_HPP_ #ifndef SERDATA_HPP_
#define RMW_CYCLONEDDS_CPP__SERDATA_HPP_ #define SERDATA_HPP_
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector>
#include "bytewise.hpp"
#include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_sertopic.h"
#include "TypeSupport2.hpp"
struct CddsTypeSupport struct CddsTypeSupport
{ {
@ -27,8 +28,6 @@ struct CddsTypeSupport
const char * typesupport_identifier_; const char * typesupport_identifier_;
}; };
namespace rmw_cyclonedds_cpp {class StructValueType;}
struct sertopic_rmw : ddsi_sertopic struct sertopic_rmw : ddsi_sertopic
{ {
CddsTypeSupport type_support; CddsTypeSupport type_support;
@ -41,11 +40,19 @@ struct sertopic_rmw : ddsi_sertopic
std::unique_ptr<const rmw_cyclonedds_cpp::StructValueType> value_type; std::unique_ptr<const rmw_cyclonedds_cpp::StructValueType> value_type;
}; };
struct serdata_rmw : ddsi_serdata class serdata_rmw : public ddsi_serdata
{ {
protected:
uint32_t m_size {0};
/* first two bytes of data is CDR encoding /* first two bytes of data is CDR encoding
second two bytes are encoding options */ second two bytes are encoding options */
std::vector<unsigned char> data; std::unique_ptr<byte[]> m_data {nullptr};
public:
serdata_rmw(const ddsi_sertopic * topic, ddsi_serdata_kind kind);
void resize(uint32_t requested_size);
uint32_t size() const {return m_size;}
void * data() const {return m_data.get();}
}; };
typedef struct cdds_request_header typedef struct cdds_request_header
@ -79,4 +86,4 @@ struct ddsi_serdata * serdata_rmw_from_serialized_message(
const struct ddsi_sertopic * topiccmn, const struct ddsi_sertopic * topiccmn,
const void * raw, size_t size); const void * raw, size_t size);
#endif // RMW_CYCLONEDDS_CPP__SERDATA_HPP_ #endif // SERDATA_HPP_