diff --git a/src/core/ddsc/include/ddsc/dds_public_stream.h b/src/core/ddsc/include/ddsc/dds_public_stream.h index f729ff6..ef125c8 100644 --- a/src/core/ddsc/include/ddsc/dds_public_stream.h +++ b/src/core/ddsc/include/ddsc/dds_public_stream.h @@ -90,7 +90,9 @@ DDS_EXPORT void dds_stream_write_uint64 (dds_stream_t * os, uint64_t val); DDS_EXPORT void dds_stream_write_float (dds_stream_t * os, float val); DDS_EXPORT void dds_stream_write_double (dds_stream_t * os, double val); DDS_EXPORT void dds_stream_write_string (dds_stream_t * os, const char * val); -DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, uint8_t * buffer); +DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const uint8_t * buffer); +DDS_EXPORT void *dds_stream_address (dds_stream_t * s); +DDS_EXPORT void *dds_stream_alignto (dds_stream_t * s, uint32_t a); #define dds_stream_write_char(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) #define dds_stream_write_int8(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) diff --git a/src/core/ddsc/src/dds__stream.h b/src/core/ddsc/src/dds__stream.h index 47da8c4..56350b9 100644 --- a/src/core/ddsc/src/dds__stream.h +++ b/src/core/ddsc/src/dds__stream.h @@ -37,6 +37,7 @@ void dds_stream_from_serdata_default (dds_stream_t * s, const struct ddsi_serdat void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d); void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic); +void dds_stream_read_sample_write_key (dds_stream_t *os, dds_stream_t *is, const struct ddsi_sertopic_default *topic); void dds_stream_read_key ( dds_stream_t * is, diff --git a/src/core/ddsc/src/dds__tkmap.h b/src/core/ddsc/src/dds__tkmap.h index 613fb51..c25be3e 100644 --- a/src/core/ddsc/src/dds__tkmap.h +++ b/src/core/ddsc/src/dds__tkmap.h @@ -36,7 +36,7 @@ struct tkmap * dds_tkmap_new (void); void dds_tkmap_free (_Inout_ _Post_invalid_ struct tkmap *tkmap); void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk); uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct ddsi_serdata *serdata); -_Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample); +_Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, const struct ddsi_sertopic *topic, _In_ uint64_t iid, _Out_ void * sample); _Check_return_ struct tkmap_instance * dds_tkmap_find( _In_ struct ddsi_serdata * sd, _In_ const bool rd, diff --git a/src/core/ddsc/src/dds_instance.c b/src/core/ddsc/src/dds_instance.c index f0736ae..8d43026 100644 --- a/src/core/ddsc/src/dds_instance.c +++ b/src/core/ddsc/src/dds_instance.c @@ -285,7 +285,7 @@ dds_unregister_instance_ih_ts( map = gv.m_tkmap; topic = dds_instance_info((dds_entity*)wr); sample = dds_alloc (topic->m_descriptor->m_size); - if (dds_tkmap_get_key (map, handle, sample)) { + if (dds_tkmap_get_key (map, topic->m_stopic, handle, sample)) { ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action); } else{ ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found"); @@ -383,7 +383,7 @@ dds_dispose_ih_ts( struct tkmap *map = gv.m_tkmap; const dds_topic *topic = dds_instance_info((dds_entity*)wr); void *sample = dds_alloc (topic->m_descriptor->m_size); - if (dds_tkmap_get_key (map, handle, sample)) { + if (dds_tkmap_get_key (map, topic->m_stopic, handle, sample)) { ret = dds_dispose_impl(wr, sample, handle, timestamp); } else { ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided handle is found"); @@ -455,7 +455,7 @@ dds_instance_get_key( } memset (data, 0, topic->m_descriptor->m_size); - if (dds_tkmap_get_key (map, inst, data)) { + if (dds_tkmap_get_key (map, topic->m_stopic, inst, data)) { ret = DDS_RETCODE_OK; } else{ ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET, "No instance related with the provided entity is found"); diff --git a/src/core/ddsc/src/dds_key.c b/src/core/ddsc/src/dds_key.c index 2bd804b..a7652c2 100644 --- a/src/core/ddsc/src/dds_key.c +++ b/src/core/ddsc/src/dds_key.c @@ -20,94 +20,21 @@ #ifndef NDEBUG static bool keyhash_is_reset(const dds_key_hash_t *kh) { - static const char nullhash[sizeof(kh->m_hash)] = { 0 }; - return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; + return !kh->m_set; } #endif -void dds_key_md5 (dds_key_hash_t * kh) -{ - md5_state_t md5st; - md5_init (&md5st); - md5_append (&md5st, (md5_byte_t*) kh->m_key_buff, kh->m_key_len); - md5_finish (&md5st, (unsigned char *) kh->m_hash); -} - /* dds_key_gen: Generates key and keyhash for a sample. See section 9.6.3.3 of DDSI spec. */ -void dds_key_gen -( - const dds_topic_descriptor_t * const desc, - dds_key_hash_t * kh, - const char * sample -) +static void dds_key_gen_stream (const dds_topic_descriptor_t * const desc, dds_stream_t *os, const char *sample) { const char * src; const uint32_t * op; uint32_t i; uint32_t len = 0; - char * dst; - - assert(keyhash_is_reset(kh)); - - if (desc->m_nkeys == 0) - { - kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; - kh->m_key_len = sizeof (kh->m_hash); - return; - } - - kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; - - /* Select key buffer to use */ - - if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) - { - kh->m_flags |= DDS_KEY_IS_HASH; - kh->m_key_len = sizeof (kh->m_hash); - dst = kh->m_hash; - } - else - { - /* Calculate key length */ - - for (i = 0; i < desc->m_nkeys; i++) - { - op = desc->m_ops + desc->m_keys[i].m_index; - src = sample + op[1]; - - switch (DDS_OP_TYPE (*op)) - { - case DDS_OP_VAL_1BY: len += 1; break; - case DDS_OP_VAL_2BY: len += 2; break; - case DDS_OP_VAL_4BY: len += 4; break; - case DDS_OP_VAL_8BY: len += 8; break; - case DDS_OP_VAL_STR: - src = *((char**) src); - /* FALLS THROUGH */ - case DDS_OP_VAL_BST: - len += (uint32_t) (5 + strlen (src)); - break; - case DDS_OP_VAL_ARR: - len += op[2] * dds_op_size[DDS_OP_SUBTYPE (*op)]; - break; - default: assert (0); - } - } - - kh->m_key_len = len; - if (len > kh->m_key_buff_size) - { - kh->m_key_buff = dds_realloc_zero (kh->m_key_buff, len); - kh->m_key_buff_size = len; - } - dst = kh->m_key_buff; - } - - /* Write keys to buffer (Big Endian CDR encoded with no padding) */ for (i = 0; i < desc->m_nkeys; i++) { @@ -119,29 +46,22 @@ void dds_key_gen { case DDS_OP_VAL_1BY: { - *dst = *src; - dst++; + dds_stream_write_uint8 (os, *((const uint8_t *) src)); break; } case DDS_OP_VAL_2BY: { - uint16_t u16 = toBE2u (*((const uint16_t*) src)); - memcpy (dst, &u16, sizeof (u16)); - dst += sizeof (u16); + dds_stream_write_uint16 (os, *((const uint16_t *) src)); break; } case DDS_OP_VAL_4BY: { - uint32_t u32 = toBE4u (*((const uint32_t*) src)); - memcpy (dst, &u32, sizeof (u32)); - dst += sizeof (u32); + dds_stream_write_uint32 (os, *((const uint32_t *) src)); break; } case DDS_OP_VAL_8BY: { - uint64_t u64 = toBE8u (*((const uint64_t*) src)); - memcpy (dst, &u64, sizeof (u64)); - dst += sizeof (u64); + dds_stream_write_uint64 (os, *((const uint64_t *) src)); break; } case DDS_OP_VAL_STR: @@ -151,35 +71,55 @@ void dds_key_gen /* FALLS THROUGH */ case DDS_OP_VAL_BST: { - uint32_t u32; len = (uint32_t) (strlen (src) + 1); - u32 = toBE4u (len); - memcpy (dst, &u32, sizeof (u32)); - dst += sizeof (u32); - memcpy (dst, src, len); - dst += len; + dds_stream_write_uint32 (os, len); + dds_stream_write_buffer (os, len, (const uint8_t *) src); break; } case DDS_OP_VAL_ARR: { uint32_t size = dds_op_size[DDS_OP_SUBTYPE (*op)]; + char *dst; len = size * op[2]; - memcpy (dst, src, len); + dst = dds_stream_alignto (os, op[2]); + dds_stream_write_buffer (os, len, (const uint8_t *) src); if (dds_stream_endian () && (size != 1u)) - { dds_stream_swap (dst, size, op[2]); - } - dst += len; break; } default: assert (0); } } +} - /* Hash is md5 of key */ +void dds_key_gen (const dds_topic_descriptor_t * const desc, dds_key_hash_t * kh, const char * sample) +{ + assert(keyhash_is_reset(kh)); - if ((kh->m_flags & DDS_KEY_IS_HASH) == 0) + kh->m_set = 1; + if (desc->m_nkeys == 0) + kh->m_iskey = 1; + else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) { - dds_key_md5 (kh); + dds_stream_t os; + kh->m_iskey = 1; + dds_stream_init(&os, 0); + os.m_endian = 0; + os.m_buffer.pv = kh->m_hash; + os.m_size = 16; + dds_key_gen_stream (desc, &os, sample); + } + else + { + dds_stream_t os; + md5_state_t md5st; + kh->m_iskey = 0; + dds_stream_init(&os, 64); + os.m_endian = 0; + dds_key_gen_stream (desc, &os, sample); + md5_init (&md5st); + md5_append (&md5st, os.m_buffer.p8, os.m_index); + md5_finish (&md5st, (unsigned char *) kh->m_hash); + dds_stream_fini (&os); } } diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index 28d769b..b9d9cb5 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -149,11 +149,11 @@ static const status_cb_data_t dds_rhc_data_avail_cb_data = { DDS_DATA_AVAILABLE_STATUS, 0, 0, true }; -/* FIXME: populate tkmap with key-only derived serdata, with timestamp - set to invalid. An invalid timestamp is (logically) unordered with - respect to valid timestamps, and that would mean BY_SOURCE order - would be respected even when generating an invalid sample for an - unregister message using the tkmap data. */ +/* FIXME: tkmap should perhaps retain data with timestamp set to invalid + An invalid timestamp is (logically) unordered with respect to valid + timestamps, and that would mean BY_SOURCE order could be respected + even when generating an invalid sample for an unregister message using + the tkmap data. */ /****************************** ****** LIVE WRITERS ****** @@ -1632,12 +1632,7 @@ static int dds_rhc_read_w_qminv { bool trigger_waitsets = false; uint32_t n = 0; -#if 0 - const struct dds_topic_descriptor * desc = (const struct dds_topic_descriptor *) rhc->topic->type; -#else /* FIXME: hack hack -- deserialize_into */ - const struct ddsi_sertopic_default *sertopic_def = (const struct ddsi_sertopic_default *)rhc->topic; - const struct dds_topic_descriptor * desc = sertopic_def->type; -#endif + const struct dds_topic_descriptor * desc = rhc->topic->status_cb_entity->m_descriptor; if (lock) { @@ -1707,7 +1702,7 @@ static int dds_rhc_read_w_qminv if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) { set_sample_info_invsample (info_seq + n, inst); - ddsi_serdata_to_sample (inst->tk->m_sample, values[n], 0, 0); + ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); if (!inst->inv_isread) { inst->inv_isread = 1; @@ -1765,12 +1760,7 @@ static int dds_rhc_take_w_qminv bool trigger_waitsets = false; uint64_t iid; uint32_t n = 0; -#if 0 - const struct dds_topic_descriptor * desc = (const struct dds_topic_descriptor *) rhc->topic->type; -#else /* FIXME: hack hack -- deserialize_into */ - const struct ddsi_sertopic_default *sertopic_def = (const struct ddsi_sertopic_default *)rhc->topic; - const struct dds_topic_descriptor * desc = sertopic_def->type; -#endif + const struct dds_topic_descriptor * desc = rhc->topic->status_cb_entity->m_descriptor; if (lock) { @@ -1859,7 +1849,7 @@ static int dds_rhc_take_w_qminv if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) { set_sample_info_invsample (info_seq + n, inst); - ddsi_serdata_to_sample (inst->tk->m_sample, values[n], 0, 0); + ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); inst_clear_invsample (rhc, inst); ++n; } diff --git a/src/core/ddsc/src/dds_stream.c b/src/core/ddsc/src/dds_stream.c index cd429a3..e7371d2 100644 --- a/src/core/ddsc/src/dds_stream.c +++ b/src/core/ddsc/src/dds_stream.c @@ -17,12 +17,12 @@ #include "dds__key.h" #include "dds__alloc.h" #include "os/os.h" +#include "ddsi/q_md5.h" + +//#define OP_DEBUG_READ 1 +//#define OP_DEBUG_WRITE 1 +//#define OP_DEBUG_KEY 1 -/* -#define OP_DEBUG_READ 1 -#define OP_DEBUG_WRITE 1 -#define OP_DEBUG_KEY 1 -*/ #if defined OP_DEBUG_WRITE || defined OP_DEBUG_READ || defined OP_DEBUG_KEY static const char * stream_op_type[11] = @@ -452,11 +452,22 @@ void dds_stream_write_string (dds_stream_t * os, const char * val) } } -void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, uint8_t * buffer) +void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const uint8_t * buffer) { DDS_OS_PUT_BYTES (os, buffer, len); } +void *dds_stream_address (dds_stream_t * s) +{ + return DDS_CDR_ADDRESS(s, void); +} + +void *dds_stream_alignto (dds_stream_t * s, uint32_t a) +{ + DDS_CDR_ALIGNTO (s, a); + return DDS_CDR_ADDRESS (s, void); +} + static void dds_stream_write ( dds_stream_t * os, @@ -1178,7 +1189,8 @@ void dds_stream_from_serdata_default (_Out_ dds_stream_t * s, _In_ const struct s->m_buffer.p8 = (uint8_t*) d; s->m_index = (uint32_t) offsetof (struct ddsi_serdata_default, data); s->m_size = d->size + s->m_index; - s->m_endian = (d->bswap) ? (! DDS_ENDIAN) : DDS_ENDIAN; + assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE); + s->m_endian = (d->hdr.identifier == CDR_LE); } void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d) @@ -1191,7 +1203,7 @@ void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_de (*d) = s->m_buffer.pv; (*d)->pos = (s->m_index - (uint32_t)offsetof (struct ddsi_serdata_default, data)); - (*d)->size = (s->m_size - (uint32_t)offsetof(struct ddsi_serdata_default, data)); + (*d)->size = (s->m_size - (uint32_t)offsetof (struct ddsi_serdata_default, data)); } void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic) @@ -1250,7 +1262,7 @@ void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct static uint32_t dds_stream_get_keyhash ( dds_stream_t * is, - char * dst, + dds_stream_t * os, const uint32_t * ops, const bool just_key ) @@ -1261,9 +1273,9 @@ static uint32_t dds_stream_get_keyhash uint32_t subtype; uint32_t num; uint32_t len; + const uint32_t origin = os->m_index; bool is_key; bool have_data; - const char * origin = dst; while ((op = *ops) != DDS_OP_RTS) { @@ -1272,7 +1284,7 @@ static uint32_t dds_stream_get_keyhash case DDS_OP_ADR: { type = DDS_OP_TYPE (op); - is_key = (op & DDS_OP_FLAG_KEY) && (dst != NULL); + is_key = (op & DDS_OP_FLAG_KEY) && (os != NULL); have_data = is_key || !just_key; ops += 2; if (type <= DDS_OP_VAL_8BY) @@ -1305,43 +1317,29 @@ static uint32_t dds_stream_get_keyhash { case DDS_OP_VAL_1BY: { - *dst++ = (char) DDS_IS_GET1 (is); + uint8_t v = DDS_IS_GET1 (is); + DDS_OS_PUT1 (os, v); break; } case DDS_OP_VAL_2BY: { - uint16_t u16 = *DDS_CDR_ADDRESS (is, uint16_t); - if (is->m_endian) - { - u16 = DDS_SWAP16 (u16); - } - memcpy (dst, &u16, sizeof (u16)); - is->m_index += 2; - dst += 2; + uint16_t v; + DDS_IS_GET2 (is, v); + DDS_OS_PUT2 (os, v); break; } case DDS_OP_VAL_4BY: { - uint32_t u32 = *DDS_CDR_ADDRESS (is, uint32_t); - if (is->m_endian) - { - u32 = DDS_SWAP32 (u32); - } - memcpy (dst, &u32, sizeof (u32)); - is->m_index += 4; - dst += 4; + uint32_t v; + DDS_IS_GET4 (is, v, uint32_t); + DDS_OS_PUT4 (os, v, uint32_t); break; } case DDS_OP_VAL_8BY: { - uint64_t u64 = *DDS_CDR_ADDRESS (is, uint64_t); - if (is->m_endian) - { - u64 = DDS_SWAP64 (u64); - } - memcpy (dst, &u64, sizeof (u64)); - is->m_index += 8; - dst += 8; + uint64_t v; + DDS_IS_GET8 (is, v, uint64_t); + DDS_OS_PUT8 (os, v, uint64_t); break; } case DDS_OP_VAL_STR: @@ -1352,11 +1350,8 @@ static uint32_t dds_stream_get_keyhash len = dds_stream_read_uint32 (is); if (is_key) { - uint32_t be32 = toBE4u (len); - memcpy (dst, &be32, 4); - dst += 4; - memcpy (dst, DDS_CDR_ADDRESS (is, void), len); - dst += len; + DDS_OS_PUT4 (os, len, uint32_t); + DDS_OS_PUT_BYTES(os, DDS_CDR_ADDRESS (is, void), len); #ifdef OP_DEBUG_KEY TRACE (("K-ADR: String/BString (%d)\n", len)); #endif @@ -1443,8 +1438,11 @@ static uint32_t dds_stream_get_keyhash align = dds_op_size[subtype]; if (is_key) { + char *dst; + DDS_CDR_ALIGNTO (os, align); + dst = DDS_CDR_ADDRESS(os, char); dds_stream_read_fixed_buffer (is, dst, num, align, is->m_endian); - dst += num * align; + os->m_index += num * align; } is->m_index += num * align; } @@ -1563,21 +1561,40 @@ static uint32_t dds_stream_get_keyhash } case DDS_OP_JSR: /* Implies nested type */ { - dst += dds_stream_get_keyhash (is, dst, ops + DDS_OP_JUMP (op), just_key); + dds_stream_get_keyhash (is, os, ops + DDS_OP_JUMP (op), just_key); ops++; break; } default: assert (0); } } - return (uint32_t) (dst - origin); + return os->m_index - origin; +} + +void dds_stream_read_sample_write_key (dds_stream_t *os, dds_stream_t *is, const struct ddsi_sertopic_default *topic) +{ + const struct dds_topic_descriptor *desc = (const struct dds_topic_descriptor *) topic->type; + uint32_t nbytes; + os->m_endian = 0; + if (os->m_size < is->m_size) + { + os->m_buffer.p8 = dds_realloc (os->m_buffer.p8, is->m_size); + os->m_size = is->m_size; + } + nbytes = dds_stream_get_keyhash (is, os, desc->m_ops, false); + os->m_index += nbytes; + if (os->m_index < os->m_size) + { + os->m_buffer.p8 = dds_realloc (os->m_buffer.p8, os->m_index); + os->m_size = os->m_index; + } } #ifndef NDEBUG static bool keyhash_is_reset(const dds_key_hash_t *kh) { static const char nullhash[sizeof(kh->m_hash)] = { 0 }; - return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; + return !kh->m_set && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; } #endif @@ -1589,44 +1606,35 @@ void dds_stream_read_keyhash const bool just_key ) { - char * dst; - assert (keyhash_is_reset(kh)); - + kh->m_set = 1; if (desc->m_nkeys == 0) + kh->m_iskey = 1; + else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) { - kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; - return; - } - - /* Select key buffer to use */ - - kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; - if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) - { - kh->m_flags |= DDS_KEY_IS_HASH; - dst = kh->m_hash; + dds_stream_t os; + uint32_t ncheck; + kh->m_iskey = 1; + dds_stream_init(&os, 0); + os.m_buffer.pv = kh->m_hash; + os.m_size = 16; + os.m_endian = 0; + ncheck = dds_stream_get_keyhash (is, &os, desc->m_ops, just_key); + assert(ncheck <= 16); + (void)ncheck; } else { - if (is->m_size > kh->m_key_buff_size) - { - kh->m_key_buff = dds_realloc (kh->m_key_buff, is->m_size); - kh->m_key_buff_size = (uint32_t) is->m_size; - } - dst = kh->m_key_buff; - } - kh->m_key_len = dds_stream_get_keyhash (is, dst, desc->m_ops, just_key); - - if (kh->m_flags & DDS_KEY_IS_HASH) - { - assert (kh->m_key_len <= 16); - kh->m_key_len = 16; - } - else - { - /* Hash is md5 of key */ - dds_key_md5 (kh); + dds_stream_t os; + md5_state_t md5st; + kh->m_iskey = 0; + dds_stream_init (&os, 0); + os.m_endian = 0; + dds_stream_get_keyhash (is, &os, desc->m_ops, just_key); + md5_init (&md5st); + md5_append (&md5st, os.m_buffer.p8, os.m_index); + md5_finish (&md5st, (unsigned char *) kh->m_hash); + dds_stream_fini (&os); } } diff --git a/src/core/ddsc/src/dds_tkmap.c b/src/core/ddsc/src/dds_tkmap.c index ad13f98..6348226 100644 --- a/src/core/ddsc/src/dds_tkmap.c +++ b/src/core/ddsc/src/dds_tkmap.c @@ -118,6 +118,7 @@ uint64_t dds_tkmap_lookup (_In_ struct tkmap * map, _In_ const struct ddsi_serda typedef struct { + const struct ddsi_sertopic *topic; uint64_t m_iid; void * m_sample; bool m_ret; @@ -130,15 +131,15 @@ static void dds_tkmap_get_key_fn (void * vtk, void * varg) tkmap_get_key_arg * arg = (tkmap_get_key_arg*) varg; if (tk->m_iid == arg->m_iid) { - ddsi_serdata_to_sample (tk->m_sample, arg->m_sample, 0, 0); + ddsi_serdata_topicless_to_sample (arg->topic, tk->m_sample, arg->m_sample, 0, 0); arg->m_ret = true; } } _Check_return_ -bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample) +bool dds_tkmap_get_key (_In_ struct tkmap * map, const struct ddsi_sertopic *topic, _In_ uint64_t iid, _Out_ void * sample) { - tkmap_get_key_arg arg = { iid, sample, false }; + tkmap_get_key_arg arg = { topic, iid, sample, false }; os_mutexLock (&map->m_lock); ut_chhEnumUnsafe (map->m_hh, dds_tkmap_get_key_fn, &arg); os_mutexUnlock (&map->m_lock); @@ -192,12 +193,7 @@ struct tkmap_instance * dds_tkmap_find( struct tkmap_instance * tk; struct tkmap * map = gv.m_tkmap; - /* FIXME: check this */ -#if 0 - assert(sd->v.keyhash.m_flags & DDS_KEY_HASH_SET); -#endif dummy.m_sample = sd; - retry: if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL) { @@ -223,7 +219,7 @@ retry: if ((tk = dds_alloc (sizeof (*tk))) == NULL) return NULL; - tk->m_sample = ddsi_serdata_ref (sd); + tk->m_sample = ddsi_serdata_to_topicless (sd); tk->m_map = map; os_atomic_st32 (&tk->m_refc, 1); tk->m_iid = dds_iid_gen (); @@ -238,7 +234,7 @@ retry: if (tk && rd) { - TRACE (("tk=%p iid=%"PRIx64"", &tk, tk->m_iid)); + TRACE (("tk=%p iid=%"PRIx64" ", &tk, tk->m_iid)); } return tk; } @@ -247,13 +243,6 @@ _Check_return_ struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct ddsi_serdata * sd) { assert (vtime_awake_p (lookup_thread_state ()->vtime)); -#if 0 - /* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */ - if (sd->v.st->topic == NULL) - { - return NULL; - } -#endif return dds_tkmap_find (sd, true, true); } diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index a1684f5..442499e 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -424,7 +424,8 @@ dds_create_topic( st->c.typename = dds_alloc (strlen (typename) + 1); strcpy (st->c.typename, typename); st->c.ops = &ddsi_sertopic_ops_default; - st->c.serdata_ops = &ddsi_serdata_ops_cdr; + st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey; + st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops); st->native_encoding_identifier = (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE); st->type = (void*) desc; diff --git a/src/core/ddsi/include/ddsi/ddsi_serdata.h b/src/core/ddsi/include/ddsi/ddsi_serdata.h index 8511dff..4b2c8ba 100644 --- a/src/core/ddsi/include/ddsi/ddsi_serdata.h +++ b/src/core/ddsi/include/ddsi/ddsi_serdata.h @@ -55,6 +55,9 @@ typedef struct ddsi_serdata * (*ddsi_serdata_from_keyhash_t) (const struct ddsi_ /* Construct a serdata from an application sample */ typedef struct ddsi_serdata * (*ddsi_serdata_from_sample_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample); +/* Construct a topic-less serdata with a keyvalue given a normal serdata (either key or data) - used for tkmap */ +typedef struct ddsi_serdata * (*ddsi_serdata_to_topicless_t) (const struct ddsi_serdata *d); + /* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= alignup4(size(d)) */ typedef void (*ddsi_serdata_to_ser_t) (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf); @@ -79,16 +82,14 @@ typedef void (*ddsi_serdata_to_ser_unref_t) (struct ddsi_serdata *d, const ddsi_ otherwise malloc() is to be used for those. (This allows read/take to be given a block of memory by the caller.) */ typedef bool (*ddsi_serdata_to_sample_t) (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); - -/* Compare key values of two serdatas (with the same ddsi_serdata_ops, but not necessarily of the - same topic) (FIXME: not sure I need this one) */ -typedef int (*ddsi_serdata_cmpkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b); +typedef bool (*ddsi_serdata_topicless_to_sample_t) (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); /* Test key values of two serdatas for equality (with the same ddsi_serdata_ops, but not necessarily of the same topic) */ typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b); struct ddsi_serdata_ops { + ddsi_serdata_eqkey_t eqkey; ddsi_serdata_size_t get_size; ddsi_serdata_from_ser_t from_ser; ddsi_serdata_from_keyhash_t from_keyhash; @@ -97,8 +98,8 @@ struct ddsi_serdata_ops { ddsi_serdata_to_ser_ref_t to_ser_ref; ddsi_serdata_to_ser_unref_t to_ser_unref; ddsi_serdata_to_sample_t to_sample; - ddsi_serdata_cmpkey_t cmpkey; - ddsi_serdata_eqkey_t eqkey; + ddsi_serdata_to_topicless_t to_topicless; + ddsi_serdata_topicless_to_sample_t topicless_to_sample; ddsi_serdata_free_t free; }; @@ -131,6 +132,10 @@ inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic return topic->serdata_ops->from_sample (topic, kind, sample); } +inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d) { + return d->ops->to_topicless (d); +} + inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf) { d->ops->to_ser (d, off, sz, buf); } @@ -147,8 +152,8 @@ inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, return d->ops->to_sample (d, sample, bufptr, buflim); } -inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) { - return a->ops->cmpkey (a, b); +inline bool ddsi_serdata_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim) { + return d->ops->topicless_to_sample (topic, d, sample, bufptr, buflim); } inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) { diff --git a/src/core/ddsi/include/ddsi/ddsi_serdata_default.h b/src/core/ddsi/include/ddsi/ddsi_serdata_default.h index f08d8d7..61b0712 100644 --- a/src/core/ddsi/include/ddsi/ddsi_serdata_default.h +++ b/src/core/ddsi/include/ddsi/ddsi_serdata_default.h @@ -47,26 +47,16 @@ struct CDRHeader unsigned short options; }; -struct serstatepool /* FIXME: now a serdatapool */ +struct serdatapool /* FIXME: now a serdatapool */ { struct nn_freelist freelist; }; -struct serstate { - struct ddsi_serdata_default *data; -}; - -#define DDS_KEY_SET 0x0001 -#define DDS_KEY_HASH_SET 0x0002 -#define DDS_KEY_IS_HASH 0x0004 - typedef struct dds_key_hash { char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */ - uint32_t m_key_len; /* Length of key (may be in m_hash or m_key_buff) */ - uint32_t m_key_buff_size; /* Size of allocated key buffer (m_key_buff) */ - char * m_key_buff; /* Key buffer */ - uint32_t m_flags; /* State of key/hash (see DDS_KEY_XXX) */ + unsigned m_set : 1; /* has it been initialised? */ + unsigned m_iskey : 1; /* m_hash is key value */ } dds_key_hash_t; @@ -75,13 +65,12 @@ struct ddsi_serdata_default struct ddsi_serdata c; uint32_t pos; uint32_t size; - bool bswap; #ifndef NDEBUG bool fixed; #endif dds_key_hash_t keyhash; - struct serstatepool *pool; + struct serdatapool *pool; struct ddsi_serdata_default *next; /* in pool->freelist */ /* padding to ensure CDRHeader is at an offset 4 mod 8 from the @@ -144,14 +133,11 @@ struct ddsi_rawcdr_sample { extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_default; extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr; +extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey; extern const struct ddsi_serdata_ops ddsi_serdata_ops_plist; extern const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr; -struct serstatepool * ddsi_serstatepool_new (void); -void ddsi_serstatepool_free (struct serstatepool * pool); - -OSAPI_EXPORT void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data); -void * ddsi_serstate_append (struct serstate * st, size_t n); -void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a); +struct serdatapool * ddsi_serdatapool_new (void); +void ddsi_serdatapool_free (struct serdatapool * pool); #endif diff --git a/src/core/ddsi/include/ddsi/ddsi_sertopic.h b/src/core/ddsi/include/ddsi/ddsi_sertopic.h index 03a21cb..09b2048 100644 --- a/src/core/ddsi/include/ddsi/ddsi_sertopic.h +++ b/src/core/ddsi/include/ddsi/ddsi_sertopic.h @@ -26,6 +26,7 @@ struct ddsi_sertopic { ut_avlNode_t avlnode; /* index on name_typename */ const struct ddsi_sertopic_ops *ops; const struct ddsi_serdata_ops *serdata_ops; + uint32_t serdata_basehash; char *name_typename; char *name; char *typename; @@ -44,5 +45,6 @@ struct ddsi_sertopic_ops { struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp); void ddsi_sertopic_unref (struct ddsi_sertopic *tp); +uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops); #endif diff --git a/src/core/ddsi/include/ddsi/q_globals.h b/src/core/ddsi/include/ddsi/q_globals.h index 3ae72d2..f8b6087 100644 --- a/src/core/ddsi/include/ddsi/q_globals.h +++ b/src/core/ddsi/include/ddsi/q_globals.h @@ -33,7 +33,7 @@ extern "C" { #endif struct nn_xmsgpool; -struct serstatepool; +struct serdatapool; struct nn_dqueue; struct nn_reorder; struct nn_defrag; @@ -275,7 +275,7 @@ struct q_globals { /* Transmit side: pools for the serializer & transmit messages and a transmit queue*/ - struct serstatepool *serpool; + struct serdatapool *serpool; struct nn_xmsgpool *xmsgpool; struct ddsi_sertopic *plist_topic; /* used for all discovery data */ struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */ diff --git a/src/core/ddsi/src/ddsi_serdata.c b/src/core/ddsi/src/ddsi_serdata.c index 6cbd605..382d524 100644 --- a/src/core/ddsi/src/ddsi_serdata.c +++ b/src/core/ddsi/src/ddsi_serdata.c @@ -40,9 +40,10 @@ extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d); extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size); extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash); extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample); +extern inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d); extern inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf); extern inline struct ddsi_serdata *ddsi_serdata_to_ser_ref (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref); extern inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsi_iovec_t *ref); extern inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); -extern inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b); +extern inline bool ddsi_serdata_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim); extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b); diff --git a/src/core/ddsi/src/ddsi_serdata_default.c b/src/core/ddsi/src/ddsi_serdata_default.c index e050c8a..a933626 100644 --- a/src/core/ddsi/src/ddsi_serdata_default.c +++ b/src/core/ddsi/src/ddsi_serdata_default.c @@ -41,15 +41,15 @@ static int ispowerof2_size (size_t x) static size_t alignup_size (size_t x, size_t a); -struct serstatepool * ddsi_serstatepool_new (void) +struct serdatapool * ddsi_serdatapool_new (void) { - struct serstatepool * pool; + struct serdatapool * pool; pool = os_malloc (sizeof (*pool)); nn_freelist_init (&pool->freelist, MAX_POOL_SIZE, offsetof (struct ddsi_serdata_default, next)); return pool; } -static void serstate_free_wrap (void *elem) +static void serdata_free_wrap (void *elem) { #ifndef NDEBUG struct ddsi_serdata_default *d = elem; @@ -58,19 +58,13 @@ static void serstate_free_wrap (void *elem) ddsi_serdata_unref(elem); } -void ddsi_serstatepool_free (struct serstatepool * pool) +void ddsi_serdatapool_free (struct serdatapool * pool) { - TRACE (("ddsi_serstatepool_free(%p)\n", pool)); - nn_freelist_fini (&pool->freelist, serstate_free_wrap); + TRACE (("ddsi_serdatapool_free(%p)\n", pool)); + nn_freelist_fini (&pool->freelist, serdata_free_wrap); os_free (pool); } -void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data) -{ - char *p = ddsi_serstate_append_aligned (st, sz, align); - memcpy (p, data, sz); -} - static size_t alignup_size (size_t x, size_t a) { size_t m = a-1; @@ -78,42 +72,43 @@ static size_t alignup_size (size_t x, size_t a) return (x+m) & ~m; } -void * ddsi_serstate_append (struct serstate * st, size_t n) +static void *serdata_default_append (struct ddsi_serdata_default **d, size_t n) { char *p; - if (st->data->pos + n > st->data->size) + if ((*d)->pos + n > (*d)->size) { - size_t size1 = alignup_size (st->data->pos + n, 128); - struct ddsi_serdata_default * data1 = os_realloc (st->data, offsetof (struct ddsi_serdata_default, data) + size1); - st->data = data1; - st->data->size = (uint32_t)size1; + size_t size1 = alignup_size ((*d)->pos + n, 128); + *d = os_realloc (*d, offsetof (struct ddsi_serdata_default, data) + size1); + (*d)->size = (uint32_t)size1; } - assert (st->data->pos + n <= st->data->size); - p = st->data->data + st->data->pos; - st->data->pos += (uint32_t)n; + assert ((*d)->pos + n <= (*d)->size); + p = (*d)->data + (*d)->pos; + (*d)->pos += (uint32_t)n; return p; } -void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a) +static void *serdata_default_append_aligned (struct ddsi_serdata_default **d, size_t n, size_t a) { - /* Simply align st->pos, without verifying it fits in the allocated - buffer: ddsi_serstate_append() is called immediately afterward and will - grow the buffer as soon as the end of the requested space no - longer fits. */ #if CLEAR_PADDING size_t pos0 = st->pos; #endif char *p; assert (ispowerof2_size (a)); - st->data->pos = (uint32_t) alignup_size (st->data->pos, a); - p = ddsi_serstate_append (st, n); + (*d)->pos = (uint32_t) alignup_size ((*d)->pos, a); + p = serdata_default_append (d, n); #if CLEAR_PADDING - if (p && st->pos > pos0) - memset (st->data->data + pos0, 0, st->pos - pos0); + if (p && (*d)->pos > pos0) + memset ((*d)->data + pos0, 0, (*d)->pos - pos0); #endif return p; } +static void serdata_default_append_blob (struct ddsi_serdata_default **d, size_t align, size_t sz, const void *data) +{ + char *p = serdata_default_append_aligned (d, sz, align); + memcpy (p, data, sz); +} + /* Fixed seed and length */ #define DDS_MH3_LEN 16 @@ -163,12 +158,12 @@ static uint32_t dds_mh3 (const void * key) return h1; } -static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint64_t tp_iid) +static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint32_t basehash) { - if (d->keyhash.m_flags & DDS_KEY_IS_HASH) - d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ (uint32_t)tp_iid; + if (d->keyhash.m_iskey) + d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ basehash; else - d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ (uint32_t)tp_iid; + d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ basehash; return &d->c; } @@ -182,23 +177,29 @@ static bool serdata_default_eqkey(const struct ddsi_serdata *acmn, const struct { const struct ddsi_serdata_default *a = (const struct ddsi_serdata_default *)acmn; const struct ddsi_serdata_default *b = (const struct ddsi_serdata_default *)bcmn; - const struct ddsi_sertopic_default *tp; - - assert(a->c.ops == b->c.ops); - tp = (struct ddsi_sertopic_default *)a->c.topic; - if (tp->nkeys == 0) - return true; - else - { - assert (a->keyhash.m_flags & DDS_KEY_HASH_SET); - return memcmp (a->keyhash.m_hash, b->keyhash.m_hash, 16) == 0; + assert (a->keyhash.m_set); +#if 0 + char astr[50], bstr[50]; + for (int i = 0; i < 16; i++) { + sprintf (astr + 3*i, ":%02x", (unsigned char)a->keyhash.m_hash[i]); } + for (int i = 0; i < 16; i++) { + sprintf (bstr + 3*i, ":%02x", (unsigned char)b->keyhash.m_hash[i]); + } + printf("serdata_default_eqkey: %s %s\n", astr+1, bstr+1); +#endif + return memcmp (a->keyhash.m_hash, b->keyhash.m_hash, 16) == 0; +} + +static bool serdata_default_eqkey_nokey (const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn) +{ + (void)acmn; (void)bcmn; + return true; } static void serdata_default_free(struct ddsi_serdata *dcmn) { struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn; - dds_free (d->keyhash.m_key_buff); dds_free (d); } @@ -211,15 +212,12 @@ static void serdata_default_init(struct ddsi_serdata_default *d, const struct dd #endif d->hdr.identifier = tp->native_encoding_identifier; d->hdr.options = 0; - d->bswap = false; memset (d->keyhash.m_hash, 0, sizeof (d->keyhash.m_hash)); - d->keyhash.m_key_len = 0; - d->keyhash.m_flags = 0; - d->keyhash.m_key_buff = NULL; - d->keyhash.m_key_buff_size = 0; + d->keyhash.m_set = 0; + d->keyhash.m_iskey = 0; } -static struct ddsi_serdata_default *serdata_default_allocnew(struct serstatepool *pool) +static struct ddsi_serdata_default *serdata_default_allocnew(struct serdatapool *pool) { const uint32_t init_size = 128; struct ddsi_serdata_default *d = os_malloc(offsetof (struct ddsi_serdata_default, data) + init_size); @@ -243,26 +241,13 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; struct ddsi_serdata_default *d = serdata_default_new(tp, kind); uint32_t off = 4; /* must skip the CDR header */ - struct serstate st = { .data = d }; assert (fragchain->min == 0); assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */ (void)size; memcpy (&d->hdr, NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)), sizeof (d->hdr)); - switch (d->hdr.identifier) { - case CDR_LE: - case PL_CDR_LE: - d->bswap = ! PLATFORM_IS_LITTLE_ENDIAN; - break; - case CDR_BE: - case PL_CDR_BE: - d->bswap = PLATFORM_IS_LITTLE_ENDIAN; - break; - default: - /* must not ever try to use a serdata format for an unsupported encoding */ - abort (); - } + assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE); while (fragchain) { @@ -272,31 +257,48 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic { /* only copy if this fragment adds data */ const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)); - ddsi_serstate_append_blob (&st, 1, fragchain->maxp1 - off, payload + off - fragchain->min); + serdata_default_append_blob (&d, 1, fragchain->maxp1 - off, payload + off - fragchain->min); off = fragchain->maxp1; } fragchain = fragchain->nextfrag; } - /* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */ - d = st.data; - dds_stream_t is; dds_stream_from_serdata_default (&is, d); dds_stream_read_keyhash (&is, &d->keyhash, (const dds_topic_descriptor_t *)tp->type, kind == SDK_KEY); - return fix_serdata_default (d, tp->c.iid); + return fix_serdata_default (d, tp->c.serdata_basehash); } struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) +{ + /* FIXME: not quite sure this is correct, though a check against a specially hacked OpenSplice suggests it is */ + if (!(tpcmn->status_cb_entity->m_descriptor->m_flagset & DDS_TOPIC_FIXED_KEY)) + { + /* keyhash is MD5 of a key value, so impossible to turn into a key value */ + return NULL; + } + else + { + const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; + struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY); + d->hdr.identifier = CDR_BE; + serdata_default_append_blob (&d, 1, sizeof (keyhash->value), keyhash->value); + memcpy (d->keyhash.m_hash, keyhash->value, sizeof (d->keyhash.m_hash)); + d->keyhash.m_set = 1; + d->keyhash.m_iskey = 1; + return fix_serdata_default(d, tp->c.serdata_basehash); + } +} + +struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr_nokey (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) { const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY); - struct serstate st = { .data = d }; - /* FIXME: not quite sure this is correct */ - ddsi_serstate_append_blob (&st, 1, sizeof (keyhash->value), keyhash->value); - /* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */ - d = st.data; - return fix_serdata_default(d, tp->c.iid); + (void)keyhash; + d->keyhash.m_set = 1; + d->keyhash.m_iskey = 1; + d->c.hash = tp->c.serdata_basehash; + return (struct ddsi_serdata *)d; } static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample) @@ -318,7 +320,7 @@ static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_s break; } dds_stream_add_to_serdata_default (&os, &d); - return fix_serdata_default (d, tp->c.iid); + return fix_serdata_default (d, tp->c.serdata_basehash); } static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample) @@ -327,9 +329,7 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_plist_sample *sample = vsample; struct ddsi_serdata_default *d = serdata_default_new(tp, kind); - struct serstate st = { .data = d }; - ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob); - d = st.data; + serdata_default_append_blob (&d, 1, sample->size, sample->blob); const unsigned char *rawkey = nn_plist_findparam_native_unchecked (sample->blob, sample->keyparam); #ifndef NDEBUG size_t keysize; @@ -339,11 +339,11 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi case PID_PARTICIPANT_GUID: case PID_ENDPOINT_GUID: case PID_GROUP_GUID: - d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; - d->keyhash.m_key_len = 16; - memcpy (&d->keyhash.m_hash, rawkey, d->keyhash.m_key_len); + d->keyhash.m_set = 1; + d->keyhash.m_iskey = 1; + memcpy (&d->keyhash.m_hash, rawkey, 16); #ifndef NDEBUG - keysize = d->keyhash.m_key_len; + keysize = 16; #endif break; @@ -354,13 +354,13 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi md5_state_t md5st; md5_byte_t digest[16]; topic_name_sz = (uint32_t) strlen (topic_name) + 1; - d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; - d->keyhash.m_key_len = 16; + d->keyhash.m_set = 1; + d->keyhash.m_iskey = 0; md5_init (&md5st); md5_append (&md5st, (const md5_byte_t *) &topic_name_sz_BE, sizeof (topic_name_sz_BE)); md5_append (&md5st, (const md5_byte_t *) topic_name, topic_name_sz); md5_finish (&md5st, digest); - memcpy (&d->keyhash.m_hash, digest, d->keyhash.m_key_len); + memcpy (&d->keyhash.m_hash, digest, 16); #ifndef NDEBUG keysize = sizeof (uint32_t) + topic_name_sz; #endif @@ -371,10 +371,10 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi abort(); } - /* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */ + /* if it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */ assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t)); assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup_size (keysize, 4) + sizeof (nn_parameter_t)); - return fix_serdata_default (d, tp->c.iid); + return fix_serdata_default (d, tp->c.serdata_basehash); } static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample) @@ -383,15 +383,46 @@ static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct dds const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; const struct ddsi_rawcdr_sample *sample = vsample; struct ddsi_serdata_default *d = serdata_default_new(tp, kind); - struct serstate st = { .data = d }; assert (sample->keysize <= 16); - ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob); - d = st.data; - d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; - d->keyhash.m_key_len = (uint32_t) sample->keysize; + serdata_default_append_blob (&d, 1, sample->size, sample->blob); + d->keyhash.m_set = 1; + d->keyhash.m_iskey = 1; if (sample->keysize > 0) memcpy (&d->keyhash.m_hash, sample->key, sample->keysize); - return fix_serdata_default (d, tp->c.iid); + return fix_serdata_default (d, tp->c.serdata_basehash); +} + +static struct ddsi_serdata *serdata_default_to_topicless (const struct ddsi_serdata *serdata_common) +{ + const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; + const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)d->c.topic; + struct ddsi_serdata_default *d_tl = serdata_default_new(tp, SDK_KEY); + d_tl->c.topic = NULL; + d_tl->c.hash = d->c.hash; + d_tl->c.timestamp.v = INT64_MIN; + d_tl->keyhash = d->keyhash; + /* These things are used for the key-to-instance map and only subject to eq, free and conversion to an invalid + sample of some topic for topics that can end up in a RHC, so, of the four kinds we have, only for CDR-with-key + the payload is of interest. */ + if (d->c.ops == &ddsi_serdata_ops_cdr) + { + if (d->c.kind == SDK_KEY) + { + d_tl->hdr.identifier = d->hdr.identifier; + serdata_default_append_blob (&d_tl, 1, d->pos, d->data); + } + else + { + /* One big hack ... read_sample_write_key goes via keyhash generation ... */ + dds_stream_t is, os; + dds_stream_from_serdata_default (&is, d); + dds_stream_from_serdata_default (&os, d_tl); + dds_stream_read_sample_write_key (&os, &is, tp); + dds_stream_add_to_serdata_default (&os, &d_tl); + d_tl->hdr.identifier = os.m_endian ? CDR_LE : CDR_BE; + } + } + return (struct ddsi_serdata *)d_tl; } /* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= alignup4(size(d)) */ @@ -400,7 +431,6 @@ static void serdata_default_to_ser (const struct ddsi_serdata *serdata_common, s const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; assert (off < d->pos + sizeof(struct CDRHeader)); assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); - /* FIXME: maybe I should pull the header out ... */ memcpy (buf, (char *)&d->hdr + off, sz); } @@ -410,7 +440,7 @@ static struct ddsi_serdata *serdata_default_to_ser_ref (const struct ddsi_serdat assert (off < d->pos + sizeof(struct CDRHeader)); assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); ref->iov_base = (char *)&d->hdr + off; - ref->iov_len = sz; + ref->iov_len = (ddsi_iov_len_t)sz; return ddsi_serdata_ref(serdata_common); } @@ -433,38 +463,29 @@ static bool serdata_default_to_sample_cdr (const struct ddsi_serdata *serdata_co return true; /* FIXME: can't conversion to sample fail? */ } -static bool serdata_default_to_sample_plist (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim) +static bool serdata_default_topicless_to_sample_cdr (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim) { -#if 0 const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; - struct ddsi_plist_sample *sample = vsample; - /* output of to_sample for normal samples is a copy, and so it should be for this one; only for native format (like the inverse) */ + dds_stream_t is; + assert (d->c.topic == NULL); + assert (d->c.kind == SDK_KEY); + assert (d->c.ops == topic->serdata_ops); if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ - assert (d->hdr.identifier == PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE); - sample->size = d->pos; - sample->blob = os_malloc (sample->size); - memcpy (sample->blob, (char *)&d->hdr + sizeof(struct CDRHeader), sample->size); - sample->keyparam = PID_PAD; - return true; -#else - /* I don't think I need this */ - (void)serdata_common; (void)vsample; (void)bufptr; (void)buflim; - abort(); - return false; -#endif + dds_stream_from_serdata_default(&is, d); + dds_stream_read_key (&is, sample, (const dds_topic_descriptor_t*) ((struct ddsi_sertopic_default *)topic)->type); + return true; /* FIXME: can't conversion to sample fail? */ } -static bool serdata_default_to_sample_rawcdr (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim) +static bool serdata_default_topicless_to_sample_cdr_nokey (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim) { - /* I don't think I need this */ - (void)serdata_common; (void)vsample; (void)bufptr; (void)buflim; - abort(); - return false; + (void)topic; (void)sample; (void)bufptr; (void)buflim; + assert (serdata_common->topic == NULL); + assert (serdata_common->kind == SDK_KEY); + return true; } const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = { .get_size = serdata_default_get_size, - .cmpkey = 0, .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, @@ -473,33 +494,52 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = { .to_ser = serdata_default_to_ser, .to_sample = serdata_default_to_sample_cdr, .to_ser_ref = serdata_default_to_ser_ref, - .to_ser_unref = serdata_default_to_ser_unref + .to_ser_unref = serdata_default_to_ser_unref, + .to_topicless = serdata_default_to_topicless, + .topicless_to_sample = serdata_default_topicless_to_sample_cdr +}; + +const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = { + .get_size = serdata_default_get_size, + .eqkey = serdata_default_eqkey_nokey, + .free = serdata_default_free, + .from_ser = serdata_default_from_ser, + .from_keyhash = ddsi_serdata_from_keyhash_cdr_nokey, + .from_sample = serdata_default_from_sample_cdr, + .to_ser = serdata_default_to_ser, + .to_sample = serdata_default_to_sample_cdr, + .to_ser_ref = serdata_default_to_ser_ref, + .to_ser_unref = serdata_default_to_ser_unref, + .to_topicless = serdata_default_to_topicless, + .topicless_to_sample = serdata_default_topicless_to_sample_cdr_nokey }; const struct ddsi_serdata_ops ddsi_serdata_ops_plist = { .get_size = serdata_default_get_size, - .cmpkey = 0, .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, - .from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */ + .from_keyhash = 0, .from_sample = serdata_default_from_sample_plist, .to_ser = serdata_default_to_ser, - .to_sample = serdata_default_to_sample_plist, + .to_sample = 0, .to_ser_ref = serdata_default_to_ser_ref, - .to_ser_unref = serdata_default_to_ser_unref + .to_ser_unref = serdata_default_to_ser_unref, + .to_topicless = serdata_default_to_topicless, + .topicless_to_sample = 0 }; const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = { .get_size = serdata_default_get_size, - .cmpkey = 0, .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, - .from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */ + .from_keyhash = 0, .from_sample = serdata_default_from_sample_rawcdr, .to_ser = serdata_default_to_ser, - .to_sample = serdata_default_to_sample_rawcdr, + .to_sample = 0, .to_ser_ref = serdata_default_to_ser_ref, - .to_ser_unref = serdata_default_to_ser_unref + .to_ser_unref = serdata_default_to_ser_unref, + .to_topicless = serdata_default_to_topicless, + .topicless_to_sample = 0 }; diff --git a/src/core/ddsi/src/ddsi_sertopic.c b/src/core/ddsi/src/ddsi_sertopic.c index 0dc2d10..2c415a6 100644 --- a/src/core/ddsi/src/ddsi_sertopic.c +++ b/src/core/ddsi/src/ddsi_sertopic.c @@ -21,6 +21,8 @@ #include "ddsi/q_config.h" #include "ddsi/q_freelist.h" #include "ddsi/ddsi_sertopic.h" +#include "ddsi/ddsi_serdata.h" +#include "ddsi/q_md5.h" struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const) { @@ -44,3 +46,16 @@ void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic) } } } + +uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops) +{ + md5_state_t md5st; + md5_byte_t digest[16]; + uint32_t res; + md5_init (&md5st); + md5_append (&md5st, (const md5_byte_t *) &ops, sizeof (ops)); + md5_append (&md5st, (const md5_byte_t *) ops, sizeof (*ops)); + md5_finish (&md5st, digest); + memcpy (&res, digest, sizeof (res)); + return res; +} diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 3f653f0..688e6d7 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -767,8 +767,9 @@ static struct ddsi_sertopic *make_special_topic (uint16_t enc_id, const struct d os_atomic_st32 (&st->c.refc, 1); st->c.ops = &ddsi_sertopic_ops_default; st->c.serdata_ops = ops; - st->native_encoding_identifier = enc_id; + st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops); st->c.iid = ddsi_plugin.iidgen_fn(); + st->native_encoding_identifier = enc_id; st->nkeys = 1; return (struct ddsi_sertopic *)st; } @@ -1009,7 +1010,7 @@ int rtps_init (void) (ddsi_plugin.init_fn) (); gv.xmsgpool = nn_xmsgpool_new (); - gv.serpool = ddsi_serstatepool_new (); + gv.serpool = ddsi_serdatapool_new (); #ifdef DDSI_INCLUDE_ENCRYPTION if (q_security_plugin.new_decoder) @@ -1359,7 +1360,7 @@ err_unicast_sockets: nn_xqos_fini (&gv.default_xqos_wr); nn_xqos_fini (&gv.default_xqos_rd); nn_plist_fini (&gv.default_plist_pp); - ddsi_serstatepool_free (gv.serpool); + ddsi_serdatapool_free (gv.serpool); nn_xmsgpool_free (gv.xmsgpool); (ddsi_plugin.fini_fn) (); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS @@ -1654,7 +1655,7 @@ OS_WARNING_MSVC_ON(6001); os_free (gv.interfaces[i].name); } - ddsi_serstatepool_free (gv.serpool); + ddsi_serdatapool_free (gv.serpool); nn_xmsgpool_free (gv.xmsgpool); (ddsi_plugin.fini_fn) (); nn_log (LC_CONFIG, "Finis.\n"); diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 5c3b69b..1fe232a 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -1842,9 +1842,10 @@ static struct ddsi_serdata *extract_sample_from_data failmsg = "no content"; else if (!(qos->present & PP_KEYHASH)) failmsg = "qos present but without keyhash"; + else if ((sample = ddsi_serdata_from_keyhash (topic, &qos->keyhash)) == NULL) + failmsg = "keyhash is MD5 and can't be converted to key value"; else { - sample = ddsi_serdata_from_keyhash (topic, &qos->keyhash); sample->statusinfo = statusinfo; sample->timestamp = tstamp; } diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index cbd8e39..3b87032 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -397,6 +397,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg) { +#define TEST_KEYHASH 0 const size_t expected_inline_qos_size = 4+8+20+4 + 32; struct nn_xmsg_marker sm_marker; unsigned char contentflag; @@ -408,7 +409,11 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc contentflag = 0; break; case SDK_KEY: +#if TEST_KEYHASH + contentflag = wr->include_keyhash ? 0 : DATA_FLAG_KEYFLAG; +#else contentflag = DATA_FLAG_KEYFLAG; +#endif break; case SDK_DATA: contentflag = DATA_FLAG_DATAFLAG; @@ -452,7 +457,12 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc data->x.smhdr.flags |= DATAFRAG_FLAG_INLINE_QOS; } +#if TEST_KEYHASH + if (serdata->kind != SDK_KEY || !wr->include_keyhash) + nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata)); +#else nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata)); +#endif nn_xmsg_submsg_setnext (*pmsg, sm_marker); return 0; }