various interconnected changes for ddsi_serdata

- topic-erased key-only serdata for use in tkmap
- restoration of including key values in invalid samples
- special handling of keyless topics
- keyhash generation via streams
- elimination of dynamically allocated buffers in keyhash
- removal of the last vestiges of "serstate"

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-10-28 13:28:23 +08:00
parent 8e20ae547e
commit 9cab5e769c
19 changed files with 375 additions and 383 deletions

View file

@ -90,7 +90,9 @@ DDS_EXPORT void dds_stream_write_uint64 (dds_stream_t * os, uint64_t val);
DDS_EXPORT void dds_stream_write_float (dds_stream_t * os, float val); DDS_EXPORT void dds_stream_write_float (dds_stream_t * os, float val);
DDS_EXPORT void dds_stream_write_double (dds_stream_t * os, double val); DDS_EXPORT void dds_stream_write_double (dds_stream_t * os, double val);
DDS_EXPORT void dds_stream_write_string (dds_stream_t * os, const char * val); DDS_EXPORT void dds_stream_write_string (dds_stream_t * os, const char * val);
DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, uint8_t * buffer); DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const uint8_t * buffer);
DDS_EXPORT void *dds_stream_address (dds_stream_t * s);
DDS_EXPORT void *dds_stream_alignto (dds_stream_t * s, uint32_t a);
#define dds_stream_write_char(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) #define dds_stream_write_char(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v)))
#define dds_stream_write_int8(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) #define dds_stream_write_int8(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v)))

View file

@ -37,6 +37,7 @@ void dds_stream_from_serdata_default (dds_stream_t * s, const struct ddsi_serdat
void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d); void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d);
void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic); void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic);
void dds_stream_read_sample_write_key (dds_stream_t *os, dds_stream_t *is, const struct ddsi_sertopic_default *topic);
void dds_stream_read_key void dds_stream_read_key
( (
dds_stream_t * is, dds_stream_t * is,

View file

@ -36,7 +36,7 @@ struct tkmap * dds_tkmap_new (void);
void dds_tkmap_free (_Inout_ _Post_invalid_ struct tkmap *tkmap); void dds_tkmap_free (_Inout_ _Post_invalid_ struct tkmap *tkmap);
void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk); void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk);
uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct ddsi_serdata *serdata); uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct ddsi_serdata *serdata);
_Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample); _Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, const struct ddsi_sertopic *topic, _In_ uint64_t iid, _Out_ void * sample);
_Check_return_ struct tkmap_instance * dds_tkmap_find( _Check_return_ struct tkmap_instance * dds_tkmap_find(
_In_ struct ddsi_serdata * sd, _In_ struct ddsi_serdata * sd,
_In_ const bool rd, _In_ const bool rd,

View file

@ -285,7 +285,7 @@ dds_unregister_instance_ih_ts(
map = gv.m_tkmap; map = gv.m_tkmap;
topic = dds_instance_info((dds_entity*)wr); topic = dds_instance_info((dds_entity*)wr);
sample = dds_alloc (topic->m_descriptor->m_size); sample = dds_alloc (topic->m_descriptor->m_size);
if (dds_tkmap_get_key (map, handle, sample)) { if (dds_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action); ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action);
} else{ } else{
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found"); ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found");
@ -383,7 +383,7 @@ dds_dispose_ih_ts(
struct tkmap *map = gv.m_tkmap; struct tkmap *map = gv.m_tkmap;
const dds_topic *topic = dds_instance_info((dds_entity*)wr); const dds_topic *topic = dds_instance_info((dds_entity*)wr);
void *sample = dds_alloc (topic->m_descriptor->m_size); void *sample = dds_alloc (topic->m_descriptor->m_size);
if (dds_tkmap_get_key (map, handle, sample)) { if (dds_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
ret = dds_dispose_impl(wr, sample, handle, timestamp); ret = dds_dispose_impl(wr, sample, handle, timestamp);
} else { } else {
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found"); ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found");
@ -455,7 +455,7 @@ dds_instance_get_key(
} }
memset (data, 0, topic->m_descriptor->m_size); memset (data, 0, topic->m_descriptor->m_size);
if (dds_tkmap_get_key (map, inst, data)) { if (dds_tkmap_get_key (map, topic->m_stopic, inst, data)) {
ret = DDS_RETCODE_OK; ret = DDS_RETCODE_OK;
} else{ } else{
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided entity is found"); ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided entity is found");

View file

@ -20,94 +20,21 @@
#ifndef NDEBUG #ifndef NDEBUG
static bool keyhash_is_reset(const dds_key_hash_t *kh) static bool keyhash_is_reset(const dds_key_hash_t *kh)
{ {
static const char nullhash[sizeof(kh->m_hash)] = { 0 }; return !kh->m_set;
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
} }
#endif #endif
void dds_key_md5 (dds_key_hash_t * kh)
{
md5_state_t md5st;
md5_init (&md5st);
md5_append (&md5st, (md5_byte_t*) kh->m_key_buff, kh->m_key_len);
md5_finish (&md5st, (unsigned char *) kh->m_hash);
}
/* /*
dds_key_gen: Generates key and keyhash for a sample. dds_key_gen: Generates key and keyhash for a sample.
See section 9.6.3.3 of DDSI spec. See section 9.6.3.3 of DDSI spec.
*/ */
void dds_key_gen static void dds_key_gen_stream (const dds_topic_descriptor_t * const desc, dds_stream_t *os, const char *sample)
(
const dds_topic_descriptor_t * const desc,
dds_key_hash_t * kh,
const char * sample
)
{ {
const char * src; const char * src;
const uint32_t * op; const uint32_t * op;
uint32_t i; uint32_t i;
uint32_t len = 0; uint32_t len = 0;
char * dst;
assert(keyhash_is_reset(kh));
if (desc->m_nkeys == 0)
{
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
kh->m_key_len = sizeof (kh->m_hash);
return;
}
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;
/* Select key buffer to use */
if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{
kh->m_flags |= DDS_KEY_IS_HASH;
kh->m_key_len = sizeof (kh->m_hash);
dst = kh->m_hash;
}
else
{
/* Calculate key length */
for (i = 0; i < desc->m_nkeys; i++)
{
op = desc->m_ops + desc->m_keys[i].m_index;
src = sample + op[1];
switch (DDS_OP_TYPE (*op))
{
case DDS_OP_VAL_1BY: len += 1; break;
case DDS_OP_VAL_2BY: len += 2; break;
case DDS_OP_VAL_4BY: len += 4; break;
case DDS_OP_VAL_8BY: len += 8; break;
case DDS_OP_VAL_STR:
src = *((char**) src);
/* FALLS THROUGH */
case DDS_OP_VAL_BST:
len += (uint32_t) (5 + strlen (src));
break;
case DDS_OP_VAL_ARR:
len += op[2] * dds_op_size[DDS_OP_SUBTYPE (*op)];
break;
default: assert (0);
}
}
kh->m_key_len = len;
if (len > kh->m_key_buff_size)
{
kh->m_key_buff = dds_realloc_zero (kh->m_key_buff, len);
kh->m_key_buff_size = len;
}
dst = kh->m_key_buff;
}
/* Write keys to buffer (Big Endian CDR encoded with no padding) */
for (i = 0; i < desc->m_nkeys; i++) for (i = 0; i < desc->m_nkeys; i++)
{ {
@ -119,29 +46,22 @@ void dds_key_gen
{ {
case DDS_OP_VAL_1BY: case DDS_OP_VAL_1BY:
{ {
*dst = *src; dds_stream_write_uint8 (os, *((const uint8_t *) src));
dst++;
break; break;
} }
case DDS_OP_VAL_2BY: case DDS_OP_VAL_2BY:
{ {
uint16_t u16 = toBE2u (*((const uint16_t*) src)); dds_stream_write_uint16 (os, *((const uint16_t *) src));
memcpy (dst, &u16, sizeof (u16));
dst += sizeof (u16);
break; break;
} }
case DDS_OP_VAL_4BY: case DDS_OP_VAL_4BY:
{ {
uint32_t u32 = toBE4u (*((const uint32_t*) src)); dds_stream_write_uint32 (os, *((const uint32_t *) src));
memcpy (dst, &u32, sizeof (u32));
dst += sizeof (u32);
break; break;
} }
case DDS_OP_VAL_8BY: case DDS_OP_VAL_8BY:
{ {
uint64_t u64 = toBE8u (*((const uint64_t*) src)); dds_stream_write_uint64 (os, *((const uint64_t *) src));
memcpy (dst, &u64, sizeof (u64));
dst += sizeof (u64);
break; break;
} }
case DDS_OP_VAL_STR: case DDS_OP_VAL_STR:
@ -151,35 +71,55 @@ void dds_key_gen
/* FALLS THROUGH */ /* FALLS THROUGH */
case DDS_OP_VAL_BST: case DDS_OP_VAL_BST:
{ {
uint32_t u32;
len = (uint32_t) (strlen (src) + 1); len = (uint32_t) (strlen (src) + 1);
u32 = toBE4u (len); dds_stream_write_uint32 (os, len);
memcpy (dst, &u32, sizeof (u32)); dds_stream_write_buffer (os, len, (const uint8_t *) src);
dst += sizeof (u32);
memcpy (dst, src, len);
dst += len;
break; break;
} }
case DDS_OP_VAL_ARR: case DDS_OP_VAL_ARR:
{ {
uint32_t size = dds_op_size[DDS_OP_SUBTYPE (*op)]; uint32_t size = dds_op_size[DDS_OP_SUBTYPE (*op)];
char *dst;
len = size * op[2]; len = size * op[2];
memcpy (dst, src, len); dst = dds_stream_alignto (os, op[2]);
dds_stream_write_buffer (os, len, (const uint8_t *) src);
if (dds_stream_endian () && (size != 1u)) if (dds_stream_endian () && (size != 1u))
{
dds_stream_swap (dst, size, op[2]); dds_stream_swap (dst, size, op[2]);
}
dst += len;
break; break;
} }
default: assert (0); default: assert (0);
} }
} }
}
/* Hash is md5 of key */ void dds_key_gen (const dds_topic_descriptor_t * const desc, dds_key_hash_t * kh, const char * sample)
{
assert(keyhash_is_reset(kh));
if ((kh->m_flags & DDS_KEY_IS_HASH) == 0) kh->m_set = 1;
if (desc->m_nkeys == 0)
kh->m_iskey = 1;
else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{ {
dds_key_md5 (kh); dds_stream_t os;
kh->m_iskey = 1;
dds_stream_init(&os, 0);
os.m_endian = 0;
os.m_buffer.pv = kh->m_hash;
os.m_size = 16;
dds_key_gen_stream (desc, &os, sample);
}
else
{
dds_stream_t os;
md5_state_t md5st;
kh->m_iskey = 0;
dds_stream_init(&os, 64);
os.m_endian = 0;
dds_key_gen_stream (desc, &os, sample);
md5_init (&md5st);
md5_append (&md5st, os.m_buffer.p8, os.m_index);
md5_finish (&md5st, (unsigned char *) kh->m_hash);
dds_stream_fini (&os);
} }
} }

View file

@ -149,11 +149,11 @@
static const status_cb_data_t dds_rhc_data_avail_cb_data = { DDS_DATA_AVAILABLE_STATUS, 0, 0, true }; static const status_cb_data_t dds_rhc_data_avail_cb_data = { DDS_DATA_AVAILABLE_STATUS, 0, 0, true };
/* FIXME: populate tkmap with key-only derived serdata, with timestamp /* FIXME: tkmap should perhaps retain data with timestamp set to invalid
set to invalid. An invalid timestamp is (logically) unordered with An invalid timestamp is (logically) unordered with respect to valid
respect to valid timestamps, and that would mean BY_SOURCE order timestamps, and that would mean BY_SOURCE order could be respected
would be respected even when generating an invalid sample for an even when generating an invalid sample for an unregister message using
unregister message using the tkmap data. */ the tkmap data. */
/****************************** /******************************
****** LIVE WRITERS ****** ****** LIVE WRITERS ******
@ -1632,12 +1632,7 @@ static int dds_rhc_read_w_qminv
{ {
bool trigger_waitsets = false; bool trigger_waitsets = false;
uint32_t n = 0; uint32_t n = 0;
#if 0 const struct dds_topic_descriptor * desc = rhc->topic->status_cb_entity->m_descriptor;
const struct dds_topic_descriptor * desc = (const struct dds_topic_descriptor *) rhc->topic->type;
#else /* FIXME: hack hack -- deserialize_into */
const struct ddsi_sertopic_default *sertopic_def = (const struct ddsi_sertopic_default *)rhc->topic;
const struct dds_topic_descriptor * desc = sertopic_def->type;
#endif
if (lock) if (lock)
{ {
@ -1707,7 +1702,7 @@ static int dds_rhc_read_w_qminv
if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0)
{ {
set_sample_info_invsample (info_seq + n, inst); set_sample_info_invsample (info_seq + n, inst);
ddsi_serdata_to_sample (inst->tk->m_sample, values[n], 0, 0); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
if (!inst->inv_isread) if (!inst->inv_isread)
{ {
inst->inv_isread = 1; inst->inv_isread = 1;
@ -1765,12 +1760,7 @@ static int dds_rhc_take_w_qminv
bool trigger_waitsets = false; bool trigger_waitsets = false;
uint64_t iid; uint64_t iid;
uint32_t n = 0; uint32_t n = 0;
#if 0 const struct dds_topic_descriptor * desc = rhc->topic->status_cb_entity->m_descriptor;
const struct dds_topic_descriptor * desc = (const struct dds_topic_descriptor *) rhc->topic->type;
#else /* FIXME: hack hack -- deserialize_into */
const struct ddsi_sertopic_default *sertopic_def = (const struct ddsi_sertopic_default *)rhc->topic;
const struct dds_topic_descriptor * desc = sertopic_def->type;
#endif
if (lock) if (lock)
{ {
@ -1859,7 +1849,7 @@ static int dds_rhc_take_w_qminv
if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0)
{ {
set_sample_info_invsample (info_seq + n, inst); set_sample_info_invsample (info_seq + n, inst);
ddsi_serdata_to_sample (inst->tk->m_sample, values[n], 0, 0); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
inst_clear_invsample (rhc, inst); inst_clear_invsample (rhc, inst);
++n; ++n;
} }

View file

@ -17,12 +17,12 @@
#include "dds__key.h" #include "dds__key.h"
#include "dds__alloc.h" #include "dds__alloc.h"
#include "os/os.h" #include "os/os.h"
#include "ddsi/q_md5.h"
//#define OP_DEBUG_READ 1
//#define OP_DEBUG_WRITE 1
//#define OP_DEBUG_KEY 1
/*
#define OP_DEBUG_READ 1
#define OP_DEBUG_WRITE 1
#define OP_DEBUG_KEY 1
*/
#if defined OP_DEBUG_WRITE || defined OP_DEBUG_READ || defined OP_DEBUG_KEY #if defined OP_DEBUG_WRITE || defined OP_DEBUG_READ || defined OP_DEBUG_KEY
static const char * stream_op_type[11] = static const char * stream_op_type[11] =
@ -452,11 +452,22 @@ void dds_stream_write_string (dds_stream_t * os, const char * val)
} }
} }
void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, uint8_t * buffer) void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const uint8_t * buffer)
{ {
DDS_OS_PUT_BYTES (os, buffer, len); DDS_OS_PUT_BYTES (os, buffer, len);
} }
void *dds_stream_address (dds_stream_t * s)
{
return DDS_CDR_ADDRESS(s, void);
}
void *dds_stream_alignto (dds_stream_t * s, uint32_t a)
{
DDS_CDR_ALIGNTO (s, a);
return DDS_CDR_ADDRESS (s, void);
}
static void dds_stream_write static void dds_stream_write
( (
dds_stream_t * os, dds_stream_t * os,
@ -1178,7 +1189,8 @@ void dds_stream_from_serdata_default (_Out_ dds_stream_t * s, _In_ const struct
s->m_buffer.p8 = (uint8_t*) d; s->m_buffer.p8 = (uint8_t*) d;
s->m_index = (uint32_t) offsetof (struct ddsi_serdata_default, data); s->m_index = (uint32_t) offsetof (struct ddsi_serdata_default, data);
s->m_size = d->size + s->m_index; s->m_size = d->size + s->m_index;
s->m_endian = (d->bswap) ? (! DDS_ENDIAN) : DDS_ENDIAN; assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE);
s->m_endian = (d->hdr.identifier == CDR_LE);
} }
void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d) void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d)
@ -1191,7 +1203,7 @@ void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_de
(*d) = s->m_buffer.pv; (*d) = s->m_buffer.pv;
(*d)->pos = (s->m_index - (uint32_t)offsetof (struct ddsi_serdata_default, data)); (*d)->pos = (s->m_index - (uint32_t)offsetof (struct ddsi_serdata_default, data));
(*d)->size = (s->m_size - (uint32_t)offsetof(struct ddsi_serdata_default, data)); (*d)->size = (s->m_size - (uint32_t)offsetof (struct ddsi_serdata_default, data));
} }
void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic) void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic)
@ -1250,7 +1262,7 @@ void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct
static uint32_t dds_stream_get_keyhash static uint32_t dds_stream_get_keyhash
( (
dds_stream_t * is, dds_stream_t * is,
char * dst, dds_stream_t * os,
const uint32_t * ops, const uint32_t * ops,
const bool just_key const bool just_key
) )
@ -1261,9 +1273,9 @@ static uint32_t dds_stream_get_keyhash
uint32_t subtype; uint32_t subtype;
uint32_t num; uint32_t num;
uint32_t len; uint32_t len;
const uint32_t origin = os->m_index;
bool is_key; bool is_key;
bool have_data; bool have_data;
const char * origin = dst;
while ((op = *ops) != DDS_OP_RTS) while ((op = *ops) != DDS_OP_RTS)
{ {
@ -1272,7 +1284,7 @@ static uint32_t dds_stream_get_keyhash
case DDS_OP_ADR: case DDS_OP_ADR:
{ {
type = DDS_OP_TYPE (op); type = DDS_OP_TYPE (op);
is_key = (op & DDS_OP_FLAG_KEY) && (dst != NULL); is_key = (op & DDS_OP_FLAG_KEY) && (os != NULL);
have_data = is_key || !just_key; have_data = is_key || !just_key;
ops += 2; ops += 2;
if (type <= DDS_OP_VAL_8BY) if (type <= DDS_OP_VAL_8BY)
@ -1305,43 +1317,29 @@ static uint32_t dds_stream_get_keyhash
{ {
case DDS_OP_VAL_1BY: case DDS_OP_VAL_1BY:
{ {
*dst++ = (char) DDS_IS_GET1 (is); uint8_t v = DDS_IS_GET1 (is);
DDS_OS_PUT1 (os, v);
break; break;
} }
case DDS_OP_VAL_2BY: case DDS_OP_VAL_2BY:
{ {
uint16_t u16 = *DDS_CDR_ADDRESS (is, uint16_t); uint16_t v;
if (is->m_endian) DDS_IS_GET2 (is, v);
{ DDS_OS_PUT2 (os, v);
u16 = DDS_SWAP16 (u16);
}
memcpy (dst, &u16, sizeof (u16));
is->m_index += 2;
dst += 2;
break; break;
} }
case DDS_OP_VAL_4BY: case DDS_OP_VAL_4BY:
{ {
uint32_t u32 = *DDS_CDR_ADDRESS (is, uint32_t); uint32_t v;
if (is->m_endian) DDS_IS_GET4 (is, v, uint32_t);
{ DDS_OS_PUT4 (os, v, uint32_t);
u32 = DDS_SWAP32 (u32);
}
memcpy (dst, &u32, sizeof (u32));
is->m_index += 4;
dst += 4;
break; break;
} }
case DDS_OP_VAL_8BY: case DDS_OP_VAL_8BY:
{ {
uint64_t u64 = *DDS_CDR_ADDRESS (is, uint64_t); uint64_t v;
if (is->m_endian) DDS_IS_GET8 (is, v, uint64_t);
{ DDS_OS_PUT8 (os, v, uint64_t);
u64 = DDS_SWAP64 (u64);
}
memcpy (dst, &u64, sizeof (u64));
is->m_index += 8;
dst += 8;
break; break;
} }
case DDS_OP_VAL_STR: case DDS_OP_VAL_STR:
@ -1352,11 +1350,8 @@ static uint32_t dds_stream_get_keyhash
len = dds_stream_read_uint32 (is); len = dds_stream_read_uint32 (is);
if (is_key) if (is_key)
{ {
uint32_t be32 = toBE4u (len); DDS_OS_PUT4 (os, len, uint32_t);
memcpy (dst, &be32, 4); DDS_OS_PUT_BYTES(os, DDS_CDR_ADDRESS (is, void), len);
dst += 4;
memcpy (dst, DDS_CDR_ADDRESS (is, void), len);
dst += len;
#ifdef OP_DEBUG_KEY #ifdef OP_DEBUG_KEY
TRACE (("K-ADR: String/BString (%d)\n", len)); TRACE (("K-ADR: String/BString (%d)\n", len));
#endif #endif
@ -1443,8 +1438,11 @@ static uint32_t dds_stream_get_keyhash
align = dds_op_size[subtype]; align = dds_op_size[subtype];
if (is_key) if (is_key)
{ {
char *dst;
DDS_CDR_ALIGNTO (os, align);
dst = DDS_CDR_ADDRESS(os, char);
dds_stream_read_fixed_buffer (is, dst, num, align, is->m_endian); dds_stream_read_fixed_buffer (is, dst, num, align, is->m_endian);
dst += num * align; os->m_index += num * align;
} }
is->m_index += num * align; is->m_index += num * align;
} }
@ -1563,21 +1561,40 @@ static uint32_t dds_stream_get_keyhash
} }
case DDS_OP_JSR: /* Implies nested type */ case DDS_OP_JSR: /* Implies nested type */
{ {
dst += dds_stream_get_keyhash (is, dst, ops + DDS_OP_JUMP (op), just_key); dds_stream_get_keyhash (is, os, ops + DDS_OP_JUMP (op), just_key);
ops++; ops++;
break; break;
} }
default: assert (0); default: assert (0);
} }
} }
return (uint32_t) (dst - origin); return os->m_index - origin;
}
void dds_stream_read_sample_write_key (dds_stream_t *os, dds_stream_t *is, const struct ddsi_sertopic_default *topic)
{
const struct dds_topic_descriptor *desc = (const struct dds_topic_descriptor *) topic->type;
uint32_t nbytes;
os->m_endian = 0;
if (os->m_size < is->m_size)
{
os->m_buffer.p8 = dds_realloc (os->m_buffer.p8, is->m_size);
os->m_size = is->m_size;
}
nbytes = dds_stream_get_keyhash (is, os, desc->m_ops, false);
os->m_index += nbytes;
if (os->m_index < os->m_size)
{
os->m_buffer.p8 = dds_realloc (os->m_buffer.p8, os->m_index);
os->m_size = os->m_index;
}
} }
#ifndef NDEBUG #ifndef NDEBUG
static bool keyhash_is_reset(const dds_key_hash_t *kh) static bool keyhash_is_reset(const dds_key_hash_t *kh)
{ {
static const char nullhash[sizeof(kh->m_hash)] = { 0 }; static const char nullhash[sizeof(kh->m_hash)] = { 0 };
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; return !kh->m_set && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
} }
#endif #endif
@ -1589,44 +1606,35 @@ void dds_stream_read_keyhash
const bool just_key const bool just_key
) )
{ {
char * dst;
assert (keyhash_is_reset(kh)); assert (keyhash_is_reset(kh));
kh->m_set = 1;
if (desc->m_nkeys == 0) if (desc->m_nkeys == 0)
kh->m_iskey = 1;
else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{ {
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; dds_stream_t os;
return; uint32_t ncheck;
} kh->m_iskey = 1;
dds_stream_init(&os, 0);
/* Select key buffer to use */ os.m_buffer.pv = kh->m_hash;
os.m_size = 16;
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; os.m_endian = 0;
if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) ncheck = dds_stream_get_keyhash (is, &os, desc->m_ops, just_key);
{ assert(ncheck <= 16);
kh->m_flags |= DDS_KEY_IS_HASH; (void)ncheck;
dst = kh->m_hash;
} }
else else
{ {
if (is->m_size > kh->m_key_buff_size) dds_stream_t os;
{ md5_state_t md5st;
kh->m_key_buff = dds_realloc (kh->m_key_buff, is->m_size); kh->m_iskey = 0;
kh->m_key_buff_size = (uint32_t) is->m_size; dds_stream_init (&os, 0);
} os.m_endian = 0;
dst = kh->m_key_buff; dds_stream_get_keyhash (is, &os, desc->m_ops, just_key);
} md5_init (&md5st);
kh->m_key_len = dds_stream_get_keyhash (is, dst, desc->m_ops, just_key); md5_append (&md5st, os.m_buffer.p8, os.m_index);
md5_finish (&md5st, (unsigned char *) kh->m_hash);
if (kh->m_flags & DDS_KEY_IS_HASH) dds_stream_fini (&os);
{
assert (kh->m_key_len <= 16);
kh->m_key_len = 16;
}
else
{
/* Hash is md5 of key */
dds_key_md5 (kh);
} }
} }

View file

@ -118,6 +118,7 @@ uint64_t dds_tkmap_lookup (_In_ struct tkmap * map, _In_ const struct ddsi_serda
typedef struct typedef struct
{ {
const struct ddsi_sertopic *topic;
uint64_t m_iid; uint64_t m_iid;
void * m_sample; void * m_sample;
bool m_ret; bool m_ret;
@ -130,15 +131,15 @@ static void dds_tkmap_get_key_fn (void * vtk, void * varg)
tkmap_get_key_arg * arg = (tkmap_get_key_arg*) varg; tkmap_get_key_arg * arg = (tkmap_get_key_arg*) varg;
if (tk->m_iid == arg->m_iid) if (tk->m_iid == arg->m_iid)
{ {
ddsi_serdata_to_sample (tk->m_sample, arg->m_sample, 0, 0); ddsi_serdata_topicless_to_sample (arg->topic, tk->m_sample, arg->m_sample, 0, 0);
arg->m_ret = true; arg->m_ret = true;
} }
} }
_Check_return_ _Check_return_
bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample) bool dds_tkmap_get_key (_In_ struct tkmap * map, const struct ddsi_sertopic *topic, _In_ uint64_t iid, _Out_ void * sample)
{ {
tkmap_get_key_arg arg = { iid, sample, false }; tkmap_get_key_arg arg = { topic, iid, sample, false };
os_mutexLock (&map->m_lock); os_mutexLock (&map->m_lock);
ut_chhEnumUnsafe (map->m_hh, dds_tkmap_get_key_fn, &arg); ut_chhEnumUnsafe (map->m_hh, dds_tkmap_get_key_fn, &arg);
os_mutexUnlock (&map->m_lock); os_mutexUnlock (&map->m_lock);
@ -192,12 +193,7 @@ struct tkmap_instance * dds_tkmap_find(
struct tkmap_instance * tk; struct tkmap_instance * tk;
struct tkmap * map = gv.m_tkmap; struct tkmap * map = gv.m_tkmap;
/* FIXME: check this */
#if 0
assert(sd->v.keyhash.m_flags & DDS_KEY_HASH_SET);
#endif
dummy.m_sample = sd; dummy.m_sample = sd;
retry: retry:
if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL) if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL)
{ {
@ -223,7 +219,7 @@ retry:
if ((tk = dds_alloc (sizeof (*tk))) == NULL) if ((tk = dds_alloc (sizeof (*tk))) == NULL)
return NULL; return NULL;
tk->m_sample = ddsi_serdata_ref (sd); tk->m_sample = ddsi_serdata_to_topicless (sd);
tk->m_map = map; tk->m_map = map;
os_atomic_st32 (&tk->m_refc, 1); os_atomic_st32 (&tk->m_refc, 1);
tk->m_iid = dds_iid_gen (); tk->m_iid = dds_iid_gen ();
@ -238,7 +234,7 @@ retry:
if (tk && rd) if (tk && rd)
{ {
TRACE (("tk=%p iid=%"PRIx64"", &tk, tk->m_iid)); TRACE (("tk=%p iid=%"PRIx64" ", &tk, tk->m_iid));
} }
return tk; return tk;
} }
@ -247,13 +243,6 @@ _Check_return_
struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct ddsi_serdata * sd) struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct ddsi_serdata * sd)
{ {
assert (vtime_awake_p (lookup_thread_state ()->vtime)); assert (vtime_awake_p (lookup_thread_state ()->vtime));
#if 0
/* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */
if (sd->v.st->topic == NULL)
{
return NULL;
}
#endif
return dds_tkmap_find (sd, true, true); return dds_tkmap_find (sd, true, true);
} }

View file

@ -424,7 +424,8 @@ dds_create_topic(
st->c.typename = dds_alloc (strlen (typename) + 1); st->c.typename = dds_alloc (strlen (typename) + 1);
strcpy (st->c.typename, typename); strcpy (st->c.typename, typename);
st->c.ops = &ddsi_sertopic_ops_default; st->c.ops = &ddsi_sertopic_ops_default;
st->c.serdata_ops = &ddsi_serdata_ops_cdr; st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey;
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
st->native_encoding_identifier = (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE); st->native_encoding_identifier = (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
st->type = (void*) desc; st->type = (void*) desc;

View file

@ -55,6 +55,9 @@ typedef struct ddsi_serdata * (*ddsi_serdata_from_keyhash_t) (const struct ddsi_
/* Construct a serdata from an application sample */ /* Construct a serdata from an application sample */
typedef struct ddsi_serdata * (*ddsi_serdata_from_sample_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample); typedef struct ddsi_serdata * (*ddsi_serdata_from_sample_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample);
/* Construct a topic-less serdata with a keyvalue given a normal serdata (either key or data) - used for tkmap */
typedef struct ddsi_serdata * (*ddsi_serdata_to_topicless_t) (const struct ddsi_serdata *d);
/* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= /* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <=
alignup4(size(d)) */ alignup4(size(d)) */
typedef void (*ddsi_serdata_to_ser_t) (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf); typedef void (*ddsi_serdata_to_ser_t) (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf);
@ -79,16 +82,14 @@ typedef void (*ddsi_serdata_to_ser_unref_t) (struct ddsi_serdata *d, const ddsi_
otherwise malloc() is to be used for those. (This allows read/take to be given a block of memory otherwise malloc() is to be used for those. (This allows read/take to be given a block of memory
by the caller.) */ by the caller.) */
typedef bool (*ddsi_serdata_to_sample_t) (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); typedef bool (*ddsi_serdata_to_sample_t) (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
typedef bool (*ddsi_serdata_topicless_to_sample_t) (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
/* Compare key values of two serdatas (with the same ddsi_serdata_ops, but not necessarily of the
same topic) (FIXME: not sure I need this one) */
typedef int (*ddsi_serdata_cmpkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
/* Test key values of two serdatas for equality (with the same ddsi_serdata_ops, but not necessarily /* Test key values of two serdatas for equality (with the same ddsi_serdata_ops, but not necessarily
of the same topic) */ of the same topic) */
typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b); typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
struct ddsi_serdata_ops { struct ddsi_serdata_ops {
ddsi_serdata_eqkey_t eqkey;
ddsi_serdata_size_t get_size; ddsi_serdata_size_t get_size;
ddsi_serdata_from_ser_t from_ser; ddsi_serdata_from_ser_t from_ser;
ddsi_serdata_from_keyhash_t from_keyhash; ddsi_serdata_from_keyhash_t from_keyhash;
@ -97,8 +98,8 @@ struct ddsi_serdata_ops {
ddsi_serdata_to_ser_ref_t to_ser_ref; ddsi_serdata_to_ser_ref_t to_ser_ref;
ddsi_serdata_to_ser_unref_t to_ser_unref; ddsi_serdata_to_ser_unref_t to_ser_unref;
ddsi_serdata_to_sample_t to_sample; ddsi_serdata_to_sample_t to_sample;
ddsi_serdata_cmpkey_t cmpkey; ddsi_serdata_to_topicless_t to_topicless;
ddsi_serdata_eqkey_t eqkey; ddsi_serdata_topicless_to_sample_t topicless_to_sample;
ddsi_serdata_free_t free; ddsi_serdata_free_t free;
}; };
@ -131,6 +132,10 @@ inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic
return topic->serdata_ops->from_sample (topic, kind, sample); return topic->serdata_ops->from_sample (topic, kind, sample);
} }
inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d) {
return d->ops->to_topicless (d);
}
inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf) { inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf) {
d->ops->to_ser (d, off, sz, buf); d->ops->to_ser (d, off, sz, buf);
} }
@ -147,8 +152,8 @@ inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample,
return d->ops->to_sample (d, sample, bufptr, buflim); return d->ops->to_sample (d, sample, bufptr, buflim);
} }
inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) { inline bool ddsi_serdata_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim) {
return a->ops->cmpkey (a, b); return d->ops->topicless_to_sample (topic, d, sample, bufptr, buflim);
} }
inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) { inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) {

View file

@ -47,26 +47,16 @@ struct CDRHeader
unsigned short options; unsigned short options;
}; };
struct serstatepool /* FIXME: now a serdatapool */ struct serdatapool /* FIXME: now a serdatapool */
{ {
struct nn_freelist freelist; struct nn_freelist freelist;
}; };
struct serstate {
struct ddsi_serdata_default *data;
};
#define DDS_KEY_SET 0x0001
#define DDS_KEY_HASH_SET 0x0002
#define DDS_KEY_IS_HASH 0x0004
typedef struct dds_key_hash typedef struct dds_key_hash
{ {
char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */ char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */
uint32_t m_key_len; /* Length of key (may be in m_hash or m_key_buff) */ unsigned m_set : 1; /* has it been initialised? */
uint32_t m_key_buff_size; /* Size of allocated key buffer (m_key_buff) */ unsigned m_iskey : 1; /* m_hash is key value */
char * m_key_buff; /* Key buffer */
uint32_t m_flags; /* State of key/hash (see DDS_KEY_XXX) */
} }
dds_key_hash_t; dds_key_hash_t;
@ -75,13 +65,12 @@ struct ddsi_serdata_default
struct ddsi_serdata c; struct ddsi_serdata c;
uint32_t pos; uint32_t pos;
uint32_t size; uint32_t size;
bool bswap;
#ifndef NDEBUG #ifndef NDEBUG
bool fixed; bool fixed;
#endif #endif
dds_key_hash_t keyhash; dds_key_hash_t keyhash;
struct serstatepool *pool; struct serdatapool *pool;
struct ddsi_serdata_default *next; /* in pool->freelist */ struct ddsi_serdata_default *next; /* in pool->freelist */
/* padding to ensure CDRHeader is at an offset 4 mod 8 from the /* padding to ensure CDRHeader is at an offset 4 mod 8 from the
@ -144,14 +133,11 @@ struct ddsi_rawcdr_sample {
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_default; extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_default;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr; extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_plist; extern const struct ddsi_serdata_ops ddsi_serdata_ops_plist;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr; extern const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr;
struct serstatepool * ddsi_serstatepool_new (void); struct serdatapool * ddsi_serdatapool_new (void);
void ddsi_serstatepool_free (struct serstatepool * pool); void ddsi_serdatapool_free (struct serdatapool * pool);
OSAPI_EXPORT void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data);
void * ddsi_serstate_append (struct serstate * st, size_t n);
void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a);
#endif #endif

View file

@ -26,6 +26,7 @@ struct ddsi_sertopic {
ut_avlNode_t avlnode; /* index on name_typename */ ut_avlNode_t avlnode; /* index on name_typename */
const struct ddsi_sertopic_ops *ops; const struct ddsi_sertopic_ops *ops;
const struct ddsi_serdata_ops *serdata_ops; const struct ddsi_serdata_ops *serdata_ops;
uint32_t serdata_basehash;
char *name_typename; char *name_typename;
char *name; char *name;
char *typename; char *typename;
@ -44,5 +45,6 @@ struct ddsi_sertopic_ops {
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp); struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
void ddsi_sertopic_unref (struct ddsi_sertopic *tp); void ddsi_sertopic_unref (struct ddsi_sertopic *tp);
uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops);
#endif #endif

View file

@ -33,7 +33,7 @@ extern "C" {
#endif #endif
struct nn_xmsgpool; struct nn_xmsgpool;
struct serstatepool; struct serdatapool;
struct nn_dqueue; struct nn_dqueue;
struct nn_reorder; struct nn_reorder;
struct nn_defrag; struct nn_defrag;
@ -275,7 +275,7 @@ struct q_globals {
/* Transmit side: pools for the serializer & transmit messages and a /* Transmit side: pools for the serializer & transmit messages and a
transmit queue*/ transmit queue*/
struct serstatepool *serpool; struct serdatapool *serpool;
struct nn_xmsgpool *xmsgpool; struct nn_xmsgpool *xmsgpool;
struct ddsi_sertopic *plist_topic; /* used for all discovery data */ struct ddsi_sertopic *plist_topic; /* used for all discovery data */
struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */ struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */

View file

@ -40,9 +40,10 @@ extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d);
extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size); extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size);
extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash); extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash);
extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample); extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample);
extern inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d);
extern inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf); extern inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf);
extern inline struct ddsi_serdata *ddsi_serdata_to_ser_ref (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref); extern inline struct ddsi_serdata *ddsi_serdata_to_ser_ref (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref);
extern inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsi_iovec_t *ref); extern inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsi_iovec_t *ref);
extern inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); extern inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
extern inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b); extern inline bool ddsi_serdata_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b); extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b);

View file

@ -41,15 +41,15 @@ static int ispowerof2_size (size_t x)
static size_t alignup_size (size_t x, size_t a); static size_t alignup_size (size_t x, size_t a);
struct serstatepool * ddsi_serstatepool_new (void) struct serdatapool * ddsi_serdatapool_new (void)
{ {
struct serstatepool * pool; struct serdatapool * pool;
pool = os_malloc (sizeof (*pool)); pool = os_malloc (sizeof (*pool));
nn_freelist_init (&pool->freelist, MAX_POOL_SIZE, offsetof (struct ddsi_serdata_default, next)); nn_freelist_init (&pool->freelist, MAX_POOL_SIZE, offsetof (struct ddsi_serdata_default, next));
return pool; return pool;
} }
static void serstate_free_wrap (void *elem) static void serdata_free_wrap (void *elem)
{ {
#ifndef NDEBUG #ifndef NDEBUG
struct ddsi_serdata_default *d = elem; struct ddsi_serdata_default *d = elem;
@ -58,19 +58,13 @@ static void serstate_free_wrap (void *elem)
ddsi_serdata_unref(elem); ddsi_serdata_unref(elem);
} }
void ddsi_serstatepool_free (struct serstatepool * pool) void ddsi_serdatapool_free (struct serdatapool * pool)
{ {
TRACE (("ddsi_serstatepool_free(%p)\n", pool)); TRACE (("ddsi_serdatapool_free(%p)\n", pool));
nn_freelist_fini (&pool->freelist, serstate_free_wrap); nn_freelist_fini (&pool->freelist, serdata_free_wrap);
os_free (pool); os_free (pool);
} }
void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data)
{
char *p = ddsi_serstate_append_aligned (st, sz, align);
memcpy (p, data, sz);
}
static size_t alignup_size (size_t x, size_t a) static size_t alignup_size (size_t x, size_t a)
{ {
size_t m = a-1; size_t m = a-1;
@ -78,42 +72,43 @@ static size_t alignup_size (size_t x, size_t a)
return (x+m) & ~m; return (x+m) & ~m;
} }
void * ddsi_serstate_append (struct serstate * st, size_t n) static void *serdata_default_append (struct ddsi_serdata_default **d, size_t n)
{ {
char *p; char *p;
if (st->data->pos + n > st->data->size) if ((*d)->pos + n > (*d)->size)
{ {
size_t size1 = alignup_size (st->data->pos + n, 128); size_t size1 = alignup_size ((*d)->pos + n, 128);
struct ddsi_serdata_default * data1 = os_realloc (st->data, offsetof (struct ddsi_serdata_default, data) + size1); *d = os_realloc (*d, offsetof (struct ddsi_serdata_default, data) + size1);
st->data = data1; (*d)->size = (uint32_t)size1;
st->data->size = (uint32_t)size1;
} }
assert (st->data->pos + n <= st->data->size); assert ((*d)->pos + n <= (*d)->size);
p = st->data->data + st->data->pos; p = (*d)->data + (*d)->pos;
st->data->pos += (uint32_t)n; (*d)->pos += (uint32_t)n;
return p; return p;
} }
void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a) static void *serdata_default_append_aligned (struct ddsi_serdata_default **d, size_t n, size_t a)
{ {
/* Simply align st->pos, without verifying it fits in the allocated
buffer: ddsi_serstate_append() is called immediately afterward and will
grow the buffer as soon as the end of the requested space no
longer fits. */
#if CLEAR_PADDING #if CLEAR_PADDING
size_t pos0 = st->pos; size_t pos0 = st->pos;
#endif #endif
char *p; char *p;
assert (ispowerof2_size (a)); assert (ispowerof2_size (a));
st->data->pos = (uint32_t) alignup_size (st->data->pos, a); (*d)->pos = (uint32_t) alignup_size ((*d)->pos, a);
p = ddsi_serstate_append (st, n); p = serdata_default_append (d, n);
#if CLEAR_PADDING #if CLEAR_PADDING
if (p && st->pos > pos0) if (p && (*d)->pos > pos0)
memset (st->data->data + pos0, 0, st->pos - pos0); memset ((*d)->data + pos0, 0, (*d)->pos - pos0);
#endif #endif
return p; return p;
} }
static void serdata_default_append_blob (struct ddsi_serdata_default **d, size_t align, size_t sz, const void *data)
{
char *p = serdata_default_append_aligned (d, sz, align);
memcpy (p, data, sz);
}
/* Fixed seed and length */ /* Fixed seed and length */
#define DDS_MH3_LEN 16 #define DDS_MH3_LEN 16
@ -163,12 +158,12 @@ static uint32_t dds_mh3 (const void * key)
return h1; return h1;
} }
static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint64_t tp_iid) static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint32_t basehash)
{ {
if (d->keyhash.m_flags & DDS_KEY_IS_HASH) if (d->keyhash.m_iskey)
d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ (uint32_t)tp_iid; d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ basehash;
else else
d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ (uint32_t)tp_iid; d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ basehash;
return &d->c; return &d->c;
} }
@ -182,23 +177,29 @@ static bool serdata_default_eqkey(const struct ddsi_serdata *acmn, const struct
{ {
const struct ddsi_serdata_default *a = (const struct ddsi_serdata_default *)acmn; const struct ddsi_serdata_default *a = (const struct ddsi_serdata_default *)acmn;
const struct ddsi_serdata_default *b = (const struct ddsi_serdata_default *)bcmn; const struct ddsi_serdata_default *b = (const struct ddsi_serdata_default *)bcmn;
const struct ddsi_sertopic_default *tp; assert (a->keyhash.m_set);
#if 0
assert(a->c.ops == b->c.ops); char astr[50], bstr[50];
tp = (struct ddsi_sertopic_default *)a->c.topic; for (int i = 0; i < 16; i++) {
if (tp->nkeys == 0) sprintf (astr + 3*i, ":%02x", (unsigned char)a->keyhash.m_hash[i]);
return true;
else
{
assert (a->keyhash.m_flags & DDS_KEY_HASH_SET);
return memcmp (a->keyhash.m_hash, b->keyhash.m_hash, 16) == 0;
} }
for (int i = 0; i < 16; i++) {
sprintf (bstr + 3*i, ":%02x", (unsigned char)b->keyhash.m_hash[i]);
}
printf("serdata_default_eqkey: %s %s\n", astr+1, bstr+1);
#endif
return memcmp (a->keyhash.m_hash, b->keyhash.m_hash, 16) == 0;
}
static bool serdata_default_eqkey_nokey (const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{
(void)acmn; (void)bcmn;
return true;
} }
static void serdata_default_free(struct ddsi_serdata *dcmn) static void serdata_default_free(struct ddsi_serdata *dcmn)
{ {
struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn; struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn;
dds_free (d->keyhash.m_key_buff);
dds_free (d); dds_free (d);
} }
@ -211,15 +212,12 @@ static void serdata_default_init(struct ddsi_serdata_default *d, const struct dd
#endif #endif
d->hdr.identifier = tp->native_encoding_identifier; d->hdr.identifier = tp->native_encoding_identifier;
d->hdr.options = 0; d->hdr.options = 0;
d->bswap = false;
memset (d->keyhash.m_hash, 0, sizeof (d->keyhash.m_hash)); memset (d->keyhash.m_hash, 0, sizeof (d->keyhash.m_hash));
d->keyhash.m_key_len = 0; d->keyhash.m_set = 0;
d->keyhash.m_flags = 0; d->keyhash.m_iskey = 0;
d->keyhash.m_key_buff = NULL;
d->keyhash.m_key_buff_size = 0;
} }
static struct ddsi_serdata_default *serdata_default_allocnew(struct serstatepool *pool) static struct ddsi_serdata_default *serdata_default_allocnew(struct serdatapool *pool)
{ {
const uint32_t init_size = 128; const uint32_t init_size = 128;
struct ddsi_serdata_default *d = os_malloc(offsetof (struct ddsi_serdata_default, data) + init_size); struct ddsi_serdata_default *d = os_malloc(offsetof (struct ddsi_serdata_default, data) + init_size);
@ -243,26 +241,13 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind); struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
uint32_t off = 4; /* must skip the CDR header */ uint32_t off = 4; /* must skip the CDR header */
struct serstate st = { .data = d };
assert (fragchain->min == 0); assert (fragchain->min == 0);
assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */ assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */
(void)size; (void)size;
memcpy (&d->hdr, NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)), sizeof (d->hdr)); memcpy (&d->hdr, NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)), sizeof (d->hdr));
switch (d->hdr.identifier) { assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE);
case CDR_LE:
case PL_CDR_LE:
d->bswap = ! PLATFORM_IS_LITTLE_ENDIAN;
break;
case CDR_BE:
case PL_CDR_BE:
d->bswap = PLATFORM_IS_LITTLE_ENDIAN;
break;
default:
/* must not ever try to use a serdata format for an unsupported encoding */
abort ();
}
while (fragchain) while (fragchain)
{ {
@ -272,31 +257,48 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic
{ {
/* only copy if this fragment adds data */ /* only copy if this fragment adds data */
const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)); const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain));
ddsi_serstate_append_blob (&st, 1, fragchain->maxp1 - off, payload + off - fragchain->min); serdata_default_append_blob (&d, 1, fragchain->maxp1 - off, payload + off - fragchain->min);
off = fragchain->maxp1; off = fragchain->maxp1;
} }
fragchain = fragchain->nextfrag; fragchain = fragchain->nextfrag;
} }
/* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */
d = st.data;
dds_stream_t is; dds_stream_t is;
dds_stream_from_serdata_default (&is, d); dds_stream_from_serdata_default (&is, d);
dds_stream_read_keyhash (&is, &d->keyhash, (const dds_topic_descriptor_t *)tp->type, kind == SDK_KEY); dds_stream_read_keyhash (&is, &d->keyhash, (const dds_topic_descriptor_t *)tp->type, kind == SDK_KEY);
return fix_serdata_default (d, tp->c.iid); return fix_serdata_default (d, tp->c.serdata_basehash);
} }
struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{
/* FIXME: not quite sure this is correct, though a check against a specially hacked OpenSplice suggests it is */
if (!(tpcmn->status_cb_entity->m_descriptor->m_flagset & DDS_TOPIC_FIXED_KEY))
{
/* keyhash is MD5 of a key value, so impossible to turn into a key value */
return NULL;
}
else
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY);
d->hdr.identifier = CDR_BE;
serdata_default_append_blob (&d, 1, sizeof (keyhash->value), keyhash->value);
memcpy (d->keyhash.m_hash, keyhash->value, sizeof (d->keyhash.m_hash));
d->keyhash.m_set = 1;
d->keyhash.m_iskey = 1;
return fix_serdata_default(d, tp->c.serdata_basehash);
}
}
struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr_nokey (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{ {
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY); struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY);
struct serstate st = { .data = d }; (void)keyhash;
/* FIXME: not quite sure this is correct */ d->keyhash.m_set = 1;
ddsi_serstate_append_blob (&st, 1, sizeof (keyhash->value), keyhash->value); d->keyhash.m_iskey = 1;
/* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */ d->c.hash = tp->c.serdata_basehash;
d = st.data; return (struct ddsi_serdata *)d;
return fix_serdata_default(d, tp->c.iid);
} }
static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample) static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
@ -318,7 +320,7 @@ static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_s
break; break;
} }
dds_stream_add_to_serdata_default (&os, &d); dds_stream_add_to_serdata_default (&os, &d);
return fix_serdata_default (d, tp->c.iid); return fix_serdata_default (d, tp->c.serdata_basehash);
} }
static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample) static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample)
@ -327,9 +329,7 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_plist_sample *sample = vsample; const struct ddsi_plist_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind); struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
struct serstate st = { .data = d }; serdata_default_append_blob (&d, 1, sample->size, sample->blob);
ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob);
d = st.data;
const unsigned char *rawkey = nn_plist_findparam_native_unchecked (sample->blob, sample->keyparam); const unsigned char *rawkey = nn_plist_findparam_native_unchecked (sample->blob, sample->keyparam);
#ifndef NDEBUG #ifndef NDEBUG
size_t keysize; size_t keysize;
@ -339,11 +339,11 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
case PID_PARTICIPANT_GUID: case PID_PARTICIPANT_GUID:
case PID_ENDPOINT_GUID: case PID_ENDPOINT_GUID:
case PID_GROUP_GUID: case PID_GROUP_GUID:
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; d->keyhash.m_set = 1;
d->keyhash.m_key_len = 16; d->keyhash.m_iskey = 1;
memcpy (&d->keyhash.m_hash, rawkey, d->keyhash.m_key_len); memcpy (&d->keyhash.m_hash, rawkey, 16);
#ifndef NDEBUG #ifndef NDEBUG
keysize = d->keyhash.m_key_len; keysize = 16;
#endif #endif
break; break;
@ -354,13 +354,13 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
md5_state_t md5st; md5_state_t md5st;
md5_byte_t digest[16]; md5_byte_t digest[16];
topic_name_sz = (uint32_t) strlen (topic_name) + 1; topic_name_sz = (uint32_t) strlen (topic_name) + 1;
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; d->keyhash.m_set = 1;
d->keyhash.m_key_len = 16; d->keyhash.m_iskey = 0;
md5_init (&md5st); md5_init (&md5st);
md5_append (&md5st, (const md5_byte_t *) &topic_name_sz_BE, sizeof (topic_name_sz_BE)); md5_append (&md5st, (const md5_byte_t *) &topic_name_sz_BE, sizeof (topic_name_sz_BE));
md5_append (&md5st, (const md5_byte_t *) topic_name, topic_name_sz); md5_append (&md5st, (const md5_byte_t *) topic_name, topic_name_sz);
md5_finish (&md5st, digest); md5_finish (&md5st, digest);
memcpy (&d->keyhash.m_hash, digest, d->keyhash.m_key_len); memcpy (&d->keyhash.m_hash, digest, 16);
#ifndef NDEBUG #ifndef NDEBUG
keysize = sizeof (uint32_t) + topic_name_sz; keysize = sizeof (uint32_t) + topic_name_sz;
#endif #endif
@ -371,10 +371,10 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
abort(); abort();
} }
/* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */ /* if it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */
assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t)); assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t));
assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup_size (keysize, 4) + sizeof (nn_parameter_t)); assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup_size (keysize, 4) + sizeof (nn_parameter_t));
return fix_serdata_default (d, tp->c.iid); return fix_serdata_default (d, tp->c.serdata_basehash);
} }
static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample) static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample)
@ -383,15 +383,46 @@ static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct dds
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_rawcdr_sample *sample = vsample; const struct ddsi_rawcdr_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind); struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
struct serstate st = { .data = d };
assert (sample->keysize <= 16); assert (sample->keysize <= 16);
ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob); serdata_default_append_blob (&d, 1, sample->size, sample->blob);
d = st.data; d->keyhash.m_set = 1;
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; d->keyhash.m_iskey = 1;
d->keyhash.m_key_len = (uint32_t) sample->keysize;
if (sample->keysize > 0) if (sample->keysize > 0)
memcpy (&d->keyhash.m_hash, sample->key, sample->keysize); memcpy (&d->keyhash.m_hash, sample->key, sample->keysize);
return fix_serdata_default (d, tp->c.iid); return fix_serdata_default (d, tp->c.serdata_basehash);
}
static struct ddsi_serdata *serdata_default_to_topicless (const struct ddsi_serdata *serdata_common)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)d->c.topic;
struct ddsi_serdata_default *d_tl = serdata_default_new(tp, SDK_KEY);
d_tl->c.topic = NULL;
d_tl->c.hash = d->c.hash;
d_tl->c.timestamp.v = INT64_MIN;
d_tl->keyhash = d->keyhash;
/* These things are used for the key-to-instance map and only subject to eq, free and conversion to an invalid
sample of some topic for topics that can end up in a RHC, so, of the four kinds we have, only for CDR-with-key
the payload is of interest. */
if (d->c.ops == &ddsi_serdata_ops_cdr)
{
if (d->c.kind == SDK_KEY)
{
d_tl->hdr.identifier = d->hdr.identifier;
serdata_default_append_blob (&d_tl, 1, d->pos, d->data);
}
else
{
/* One big hack ... read_sample_write_key goes via keyhash generation ... */
dds_stream_t is, os;
dds_stream_from_serdata_default (&is, d);
dds_stream_from_serdata_default (&os, d_tl);
dds_stream_read_sample_write_key (&os, &is, tp);
dds_stream_add_to_serdata_default (&os, &d_tl);
d_tl->hdr.identifier = os.m_endian ? CDR_LE : CDR_BE;
}
}
return (struct ddsi_serdata *)d_tl;
} }
/* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= alignup4(size(d)) */ /* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= alignup4(size(d)) */
@ -400,7 +431,6 @@ static void serdata_default_to_ser (const struct ddsi_serdata *serdata_common, s
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
assert (off < d->pos + sizeof(struct CDRHeader)); assert (off < d->pos + sizeof(struct CDRHeader));
assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off);
/* FIXME: maybe I should pull the header out ... */
memcpy (buf, (char *)&d->hdr + off, sz); memcpy (buf, (char *)&d->hdr + off, sz);
} }
@ -410,7 +440,7 @@ static struct ddsi_serdata *serdata_default_to_ser_ref (const struct ddsi_serdat
assert (off < d->pos + sizeof(struct CDRHeader)); assert (off < d->pos + sizeof(struct CDRHeader));
assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off);
ref->iov_base = (char *)&d->hdr + off; ref->iov_base = (char *)&d->hdr + off;
ref->iov_len = sz; ref->iov_len = (ddsi_iov_len_t)sz;
return ddsi_serdata_ref(serdata_common); return ddsi_serdata_ref(serdata_common);
} }
@ -433,38 +463,29 @@ static bool serdata_default_to_sample_cdr (const struct ddsi_serdata *serdata_co
return true; /* FIXME: can't conversion to sample fail? */ return true; /* FIXME: can't conversion to sample fail? */
} }
static bool serdata_default_to_sample_plist (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim) static bool serdata_default_topicless_to_sample_cdr (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{ {
#if 0
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
struct ddsi_plist_sample *sample = vsample; dds_stream_t is;
/* output of to_sample for normal samples is a copy, and so it should be for this one; only for native format (like the inverse) */ assert (d->c.topic == NULL);
assert (d->c.kind == SDK_KEY);
assert (d->c.ops == topic->serdata_ops);
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
assert (d->hdr.identifier == PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE); dds_stream_from_serdata_default(&is, d);
sample->size = d->pos; dds_stream_read_key (&is, sample, (const dds_topic_descriptor_t*) ((struct ddsi_sertopic_default *)topic)->type);
sample->blob = os_malloc (sample->size); return true; /* FIXME: can't conversion to sample fail? */
memcpy (sample->blob, (char *)&d->hdr + sizeof(struct CDRHeader), sample->size);
sample->keyparam = PID_PAD;
return true;
#else
/* I don't think I need this */
(void)serdata_common; (void)vsample; (void)bufptr; (void)buflim;
abort();
return false;
#endif
} }
static bool serdata_default_to_sample_rawcdr (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim) static bool serdata_default_topicless_to_sample_cdr_nokey (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{ {
/* I don't think I need this */ (void)topic; (void)sample; (void)bufptr; (void)buflim;
(void)serdata_common; (void)vsample; (void)bufptr; (void)buflim; assert (serdata_common->topic == NULL);
abort(); assert (serdata_common->kind == SDK_KEY);
return false; return true;
} }
const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = { const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.get_size = serdata_default_get_size, .get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey, .eqkey = serdata_default_eqkey,
.free = serdata_default_free, .free = serdata_default_free,
.from_ser = serdata_default_from_ser, .from_ser = serdata_default_from_ser,
@ -473,33 +494,52 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.to_ser = serdata_default_to_ser, .to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_cdr, .to_sample = serdata_default_to_sample_cdr,
.to_ser_ref = serdata_default_to_ser_ref, .to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref .to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = serdata_default_topicless_to_sample_cdr
};
const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = {
.get_size = serdata_default_get_size,
.eqkey = serdata_default_eqkey_nokey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_keyhash = ddsi_serdata_from_keyhash_cdr_nokey,
.from_sample = serdata_default_from_sample_cdr,
.to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_cdr,
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = serdata_default_topicless_to_sample_cdr_nokey
}; };
const struct ddsi_serdata_ops ddsi_serdata_ops_plist = { const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
.get_size = serdata_default_get_size, .get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey, .eqkey = serdata_default_eqkey,
.free = serdata_default_free, .free = serdata_default_free,
.from_ser = serdata_default_from_ser, .from_ser = serdata_default_from_ser,
.from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */ .from_keyhash = 0,
.from_sample = serdata_default_from_sample_plist, .from_sample = serdata_default_from_sample_plist,
.to_ser = serdata_default_to_ser, .to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_plist, .to_sample = 0,
.to_ser_ref = serdata_default_to_ser_ref, .to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref .to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = 0
}; };
const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = { const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = {
.get_size = serdata_default_get_size, .get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey, .eqkey = serdata_default_eqkey,
.free = serdata_default_free, .free = serdata_default_free,
.from_ser = serdata_default_from_ser, .from_ser = serdata_default_from_ser,
.from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */ .from_keyhash = 0,
.from_sample = serdata_default_from_sample_rawcdr, .from_sample = serdata_default_from_sample_rawcdr,
.to_ser = serdata_default_to_ser, .to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_rawcdr, .to_sample = 0,
.to_ser_ref = serdata_default_to_ser_ref, .to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref .to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = 0
}; };

View file

@ -21,6 +21,8 @@
#include "ddsi/q_config.h" #include "ddsi/q_config.h"
#include "ddsi/q_freelist.h" #include "ddsi/q_freelist.h"
#include "ddsi/ddsi_sertopic.h" #include "ddsi/ddsi_sertopic.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/q_md5.h"
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const) struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const)
{ {
@ -44,3 +46,16 @@ void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic)
} }
} }
} }
uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops)
{
md5_state_t md5st;
md5_byte_t digest[16];
uint32_t res;
md5_init (&md5st);
md5_append (&md5st, (const md5_byte_t *) &ops, sizeof (ops));
md5_append (&md5st, (const md5_byte_t *) ops, sizeof (*ops));
md5_finish (&md5st, digest);
memcpy (&res, digest, sizeof (res));
return res;
}

View file

@ -767,8 +767,9 @@ static struct ddsi_sertopic *make_special_topic (uint16_t enc_id, const struct d
os_atomic_st32 (&st->c.refc, 1); os_atomic_st32 (&st->c.refc, 1);
st->c.ops = &ddsi_sertopic_ops_default; st->c.ops = &ddsi_sertopic_ops_default;
st->c.serdata_ops = ops; st->c.serdata_ops = ops;
st->native_encoding_identifier = enc_id; st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
st->c.iid = ddsi_plugin.iidgen_fn(); st->c.iid = ddsi_plugin.iidgen_fn();
st->native_encoding_identifier = enc_id;
st->nkeys = 1; st->nkeys = 1;
return (struct ddsi_sertopic *)st; return (struct ddsi_sertopic *)st;
} }
@ -1009,7 +1010,7 @@ int rtps_init (void)
(ddsi_plugin.init_fn) (); (ddsi_plugin.init_fn) ();
gv.xmsgpool = nn_xmsgpool_new (); gv.xmsgpool = nn_xmsgpool_new ();
gv.serpool = ddsi_serstatepool_new (); gv.serpool = ddsi_serdatapool_new ();
#ifdef DDSI_INCLUDE_ENCRYPTION #ifdef DDSI_INCLUDE_ENCRYPTION
if (q_security_plugin.new_decoder) if (q_security_plugin.new_decoder)
@ -1359,7 +1360,7 @@ err_unicast_sockets:
nn_xqos_fini (&gv.default_xqos_wr); nn_xqos_fini (&gv.default_xqos_wr);
nn_xqos_fini (&gv.default_xqos_rd); nn_xqos_fini (&gv.default_xqos_rd);
nn_plist_fini (&gv.default_plist_pp); nn_plist_fini (&gv.default_plist_pp);
ddsi_serstatepool_free (gv.serpool); ddsi_serdatapool_free (gv.serpool);
nn_xmsgpool_free (gv.xmsgpool); nn_xmsgpool_free (gv.xmsgpool);
(ddsi_plugin.fini_fn) (); (ddsi_plugin.fini_fn) ();
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
@ -1654,7 +1655,7 @@ OS_WARNING_MSVC_ON(6001);
os_free (gv.interfaces[i].name); os_free (gv.interfaces[i].name);
} }
ddsi_serstatepool_free (gv.serpool); ddsi_serdatapool_free (gv.serpool);
nn_xmsgpool_free (gv.xmsgpool); nn_xmsgpool_free (gv.xmsgpool);
(ddsi_plugin.fini_fn) (); (ddsi_plugin.fini_fn) ();
nn_log (LC_CONFIG, "Finis.\n"); nn_log (LC_CONFIG, "Finis.\n");

View file

@ -1842,9 +1842,10 @@ static struct ddsi_serdata *extract_sample_from_data
failmsg = "no content"; failmsg = "no content";
else if (!(qos->present & PP_KEYHASH)) else if (!(qos->present & PP_KEYHASH))
failmsg = "qos present but without keyhash"; failmsg = "qos present but without keyhash";
else if ((sample = ddsi_serdata_from_keyhash (topic, &qos->keyhash)) == NULL)
failmsg = "keyhash is MD5 and can't be converted to key value";
else else
{ {
sample = ddsi_serdata_from_keyhash (topic, &qos->keyhash);
sample->statusinfo = statusinfo; sample->statusinfo = statusinfo;
sample->timestamp = tstamp; sample->timestamp = tstamp;
} }

View file

@ -397,6 +397,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg) static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg)
{ {
#define TEST_KEYHASH 0
const size_t expected_inline_qos_size = 4+8+20+4 + 32; const size_t expected_inline_qos_size = 4+8+20+4 + 32;
struct nn_xmsg_marker sm_marker; struct nn_xmsg_marker sm_marker;
unsigned char contentflag; unsigned char contentflag;
@ -408,7 +409,11 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
contentflag = 0; contentflag = 0;
break; break;
case SDK_KEY: case SDK_KEY:
#if TEST_KEYHASH
contentflag = wr->include_keyhash ? 0 : DATA_FLAG_KEYFLAG;
#else
contentflag = DATA_FLAG_KEYFLAG; contentflag = DATA_FLAG_KEYFLAG;
#endif
break; break;
case SDK_DATA: case SDK_DATA:
contentflag = DATA_FLAG_DATAFLAG; contentflag = DATA_FLAG_DATAFLAG;
@ -452,7 +457,12 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
data->x.smhdr.flags |= DATAFRAG_FLAG_INLINE_QOS; data->x.smhdr.flags |= DATAFRAG_FLAG_INLINE_QOS;
} }
#if TEST_KEYHASH
if (serdata->kind != SDK_KEY || !wr->include_keyhash)
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata));
#else
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata)); nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata));
#endif
nn_xmsg_submsg_setnext (*pmsg, sm_marker); nn_xmsg_submsg_setnext (*pmsg, sm_marker);
return 0; return 0;
} }