replace FastCDR and serialise straight into a serdata to avoid an extra copy

This commit is contained in:
Erik Boasson 2018-07-19 16:37:12 +02:00
parent e6b6ede709
commit 40a042c6dc
12 changed files with 391 additions and 210 deletions

View file

@ -18,7 +18,7 @@
- type names (make a copy of the generic type descriptor, modify the name and pass that)
- should serialize straight into serdata_t, instead of into a FastBuffer that then gets copied
- need to handle endianness differences in deserialization
- topic creation: shouldn't leak topics
@ -72,18 +72,14 @@
#include "rmw/impl/cpp/macros.hpp"
#include "namespace_prefix.hpp"
#include "fastcdr/FastBuffer.h"
#include "rmw_cyclonedds_cpp/MessageTypeSupport.hpp"
#include "rmw_cyclonedds_cpp/ServiceTypeSupport.hpp"
#include "ddsc/dds.h"
extern "C" {
extern void ddsi_serdata_getblob (void **raw, size_t *sz, struct serdata *serdata);
extern void ddsi_serdata_unref (struct serdata *serdata);
}
#include "rmw_cyclonedds_topic.h"
#include "rmw_cyclonedds_cpp/serdes.hpp"
#define RET_ERR_X(msg, code) do { RMW_SET_ERROR_MSG(msg); code; } while(0)
#define RET_NULL_X(var, code) do { if (!var) RET_ERR_X(#var " is null", code); } while(0)
@ -130,12 +126,11 @@ struct CddsNode {
struct CddsPublisher {
dds_entity_t pubh;
dds_instance_handle_t pubiid;
struct sertopic *sertopic;
CddsTypeSupport ts;
};
struct CddsSubscription {
typedef rmw_subscription_t rmw_type;
typedef rmw_subscriptions_t rmw_set_type;
dds_entity_t subh;
dds_entity_t rdcondh;
CddsNode *node;
@ -149,14 +144,10 @@ struct CddsCS {
};
struct CddsClient {
typedef rmw_client_t rmw_type;
typedef rmw_clients_t rmw_set_type;
CddsCS client;
};
struct CddsService {
typedef rmw_service_t rmw_type;
typedef rmw_services_t rmw_set_type;
CddsCS service;
};
@ -199,8 +190,30 @@ using RequestTypeSupport_cpp = rmw_cyclonedds_cpp::RequestTypeSupport<rosidl_typ
using ResponseTypeSupport_c = rmw_cyclonedds_cpp::ResponseTypeSupport<rosidl_typesupport_introspection_c__ServiceMembers, rosidl_typesupport_introspection_c__MessageMembers>;
using ResponseTypeSupport_cpp = rmw_cyclonedds_cpp::ResponseTypeSupport<rosidl_typesupport_introspection_cpp::ServiceMembers, rosidl_typesupport_introspection_cpp::MessageMembers>;
extern "C" {
struct dds_entity;
struct dds_domain;
int32_t dds_entity_lock(dds_entity_t hdl, dds_entity_kind_t kind, struct dds_entity **e);
void dds_entity_unlock(struct dds_entity *e);
struct sertopic *dds_topic_lookup (struct dds_domain *domain, const char *name);
dds_domain *dds__entity_domain(dds_entity* e);
void sertopic_free (struct sertopic * tp);
}
static void clean_waitset_caches();
static struct sertopic *get_sertopic(dds_entity_t topic, const std::string& name)
{
struct dds_entity *x;
struct sertopic *sertopic;
if (dds_entity_lock(topic, DDS_KIND_TOPIC, &x) < 0) {
abort();
}
sertopic = dds_topic_lookup(dds__entity_domain(x), name.c_str());
dds_entity_unlock(x);
return sertopic;
}
static bool using_introspection_c_typesupport(const char *typesupport_identifier)
{
return typesupport_identifier == rosidl_typesupport_introspection_c__identifier;
@ -264,7 +277,7 @@ template<typename ServiceType> const void *get_response_ptr(const void *untyped_
return service_members->response_members_;
}
static bool sermsg(const void *ros_message, eprosima::fastcdr::Cdr& ser, std::function<void(eprosima::fastcdr::Cdr&)> prefix, const CddsTypeSupport& ts)
static bool sermsg(const void *ros_message, cycser& ser, std::function<void(cycser&)> prefix, const CddsTypeSupport& ts)
{
if (using_introspection_c_typesupport(ts.typesupport_identifier_)) {
auto typed_typesupport = static_cast<MessageTypeSupport_c *>(ts.type_support_);
@ -277,7 +290,7 @@ static bool sermsg(const void *ros_message, eprosima::fastcdr::Cdr& ser, std::fu
return false;
}
static bool desermsg(eprosima::fastcdr::Cdr& deser, void *ros_message, std::function<void(eprosima::fastcdr::Cdr&)> prefix, const CddsTypeSupport& ts)
static bool desermsg(cycdeser& deser, void *ros_message, std::function<void(cycdeser&)> prefix, const CddsTypeSupport& ts)
{
if (using_introspection_c_typesupport(ts.typesupport_identifier_)) {
auto typed_typesupport = static_cast<MessageTypeSupport_c *>(ts.type_support_);
@ -415,13 +428,13 @@ extern "C" const rmw_guard_condition_t *rmw_node_get_graph_guard_condition(const
/////////// ///////////
/////////////////////////////////////////////////////////////////////////////////////////
static rmw_ret_t rmw_write_ser(dds_entity_t pubh, eprosima::fastcdr::Cdr& ser)
extern "C" {
int dds_writecdr(dds_entity_t writer, struct serdata *serdata);
}
static rmw_ret_t rmw_write_ser(dds_entity_t pubh, cycser& sd)
{
const size_t sz = ser.getSerializedDataLength();
const void *raw = static_cast<void *>(ser.getBufferPointer());
/* shifting by 4 bytes skips the CDR header -- it should be identical and the entire
writecdr is a hack at the moment anyway */
if (dds_writecdr(pubh, (char *)raw + 4, sz) >= 0) {
if (dds_writecdr(pubh, sd.fix().ref().data()) >= 0) {
return RMW_RET_OK;
} else {
/* FIXME: what is the expected behavior when it times out? */
@ -430,17 +443,16 @@ static rmw_ret_t rmw_write_ser(dds_entity_t pubh, eprosima::fastcdr::Cdr& ser)
return RMW_RET_OK;
}
}
extern "C" rmw_ret_t rmw_publish(const rmw_publisher_t *publisher, const void *ros_message)
{
RET_WRONG_IMPLID(publisher);
RET_NULL(ros_message);
auto pub = static_cast<CddsPublisher *>(publisher->data);
assert(pub);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
if (sermsg(ros_message, ser, nullptr, pub->ts)) {
return rmw_write_ser(pub->pubh, ser);
cycser sd(pub->sertopic);
if (sermsg(ros_message, sd, nullptr, pub->ts)) {
return rmw_write_ser(pub->pubh, sd);
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
@ -540,11 +552,12 @@ static CddsPublisher *create_cdds_publisher(const rmw_node_t *node, const rosidl
RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer;
}
dds_qos_delete(qos);
pub->sertopic = get_sertopic(topic, fqtopic_name);
if (dds_get_instance_handle(pub->pubh, &pub->pubiid) < 0) {
RMW_SET_ERROR_MSG("failed to get instance handle for writer");
goto fail_instance_handle;
}
dds_qos_delete(qos);
node_impl->own_writers.insert(pub->pubiid);
/* FIXME: leak the topic for now */
return pub;
@ -753,8 +766,8 @@ static rmw_ret_t rmw_take_int(const rmw_subscription_t *subscription, void *ros_
size_t sz;
void *raw;
ddsi_serdata_getblob(&raw, &sz, sd);
eprosima::fastcdr::FastBuffer buffer(static_cast<char *>(raw), sz);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
/* FIXME: endianness (i.e., the "+ 4") */
cycdeser deser(static_cast<void *>(static_cast<char *>(raw) + 4), sz);
desermsg(deser, ros_message, nullptr, sub->ts);
ddsi_serdata_unref(sd);
if (message_info) {
@ -1026,10 +1039,10 @@ static rmw_ret_t rmw_take_response_request(CddsCS *cs, rmw_request_id_t *request
size_t sz;
void *raw;
ddsi_serdata_getblob(&raw, &sz, sd);
eprosima::fastcdr::FastBuffer buffer(static_cast<char *>(raw), sz);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
/* FIXME: endianness (i.e., the "+ 4") */
cycdeser deser(static_cast<void *>(static_cast<char *>(raw) + 4), sz);
cdds_request_header_t header;
desermsg(deser, ros_data, [&header](eprosima::fastcdr::Cdr& ser) { ser >> header.guid; ser >> header.seq; }, cs->sub->ts);
desermsg(deser, ros_data, [&header](cycdeser& ser) { ser >> header.guid; ser >> header.seq; }, cs->sub->ts);
ddsi_serdata_unref(sd);
memset(request_header, 0, sizeof(*request_header));
assert(sizeof(header.guid) < sizeof(request_header->writer_guid));
@ -1063,10 +1076,9 @@ extern "C" rmw_ret_t rmw_take_request(const rmw_service_t *service, rmw_request_
static rmw_ret_t rmw_send_response_request(CddsCS *cs, cdds_request_header_t *header, const void *ros_data)
{
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
if (sermsg(ros_data, ser, [&header](eprosima::fastcdr::Cdr& ser) { ser << header->guid; ser << header->seq; }, cs->pub->ts)) {
return rmw_write_ser(cs->pub->pubh, ser);
cycser sd(cs->pub->sertopic);
if (sermsg(ros_data, sd, [&header](cycser& ser) { ser << header->guid; ser << header->seq; }, cs->pub->ts)) {
return rmw_write_ser(cs->pub->pubh, sd);
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
@ -1162,6 +1174,7 @@ static rmw_ret_t rmw_init_cs(CddsCS *cs, const rmw_node_t *node, const rosidl_se
RMW_SET_ERROR_MSG("failed to create writer");
goto fail_writer;
}
pub->sertopic = get_sertopic(pubtopic, pubtopic_name);
sub->node = node_impl;
if ((sub->subh = dds_create_reader(gcdds.ppant, subtopic, qos, nullptr)) < 0) {
RMW_SET_ERROR_MSG("failed to create reader");