From daa17704db5101cd75e57edc3aacd19b785cac7d Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 6 Jul 2018 21:19:23 +0200 Subject: [PATCH] Fix dds_sample_info.publication_handle incorrectly set to 1 as well as some corner cases where it ended up at 0 and some related assertion failures (#8) Signed-off-by: Erik Boasson --- src/core/ddsc/include/ddsc/dds.h | 2 - src/core/ddsc/src/dds__rhc.h | 4 +- src/core/ddsc/src/dds__tkmap.h | 1 - src/core/ddsc/src/dds_init.c | 16 +- src/core/ddsc/src/dds_instance.c | 2 +- src/core/ddsc/src/dds_key.c | 18 +- src/core/ddsc/src/dds_rhc.c | 236 ++++++++++--------- src/core/ddsc/src/dds_stream.c | 18 +- src/core/ddsc/src/dds_tkmap.c | 48 +--- src/core/ddsc/src/dds_write.c | 77 ++---- src/core/ddsc/src/q_osplser.c | 29 +-- src/core/ddsc/tests/reader_iterator.c | 29 ++- src/core/ddsi/CMakeLists.txt | 2 + src/core/ddsi/include/ddsi/ddsi_rhc_plugin.h | 48 ++++ src/core/ddsi/include/ddsi/ddsi_ser.h | 2 +- src/core/ddsi/include/ddsi/q_config.h | 25 +- src/core/ddsi/include/ddsi/q_radmin.h | 11 - src/core/ddsi/src/ddsi_rhc_plugin.c | 22 ++ src/core/ddsi/src/q_entity.c | 50 +--- src/core/ddsi/src/q_plist.c | 10 - src/core/ddsi/src/q_receive.c | 54 ++--- src/core/ddsi/src/q_transmit.c | 8 +- src/core/ddsi/src/q_xevent.c | 4 +- src/tools/pubsub/pubsub.c | 12 +- 24 files changed, 343 insertions(+), 385 deletions(-) create mode 100644 src/core/ddsi/include/ddsi/ddsi_rhc_plugin.h create mode 100644 src/core/ddsi/src/ddsi_rhc_plugin.c diff --git a/src/core/ddsc/include/ddsc/dds.h b/src/core/ddsc/include/ddsc/dds.h index 813399f..47e3977 100644 --- a/src/core/ddsc/include/ddsc/dds.h +++ b/src/core/ddsc/include/ddsc/dds.h @@ -173,8 +173,6 @@ typedef struct dds_sample_info uint32_t generation_rank; /** difference in generations between the sample and most recent sample of the same instance when read/take was called */ uint32_t absolute_generation_rank; - /** timestamp of a data instance when it is added to a read queue */ - dds_time_t reception_timestamp; /* NOTE: VLite extension */ } dds_sample_info_t; diff --git a/src/core/ddsc/src/dds__rhc.h b/src/core/ddsc/src/dds__rhc.h index 0e5f9f8..315bd84 100644 --- a/src/core/ddsc/src/dds__rhc.h +++ b/src/core/ddsc/src/dds__rhc.h @@ -25,8 +25,6 @@ struct rhc; struct nn_xqos; struct serdata; struct tkmap_instance; -struct tkmap; -struct nn_rsample_info; struct proxy_writer_info; struct rhc * dds_rhc_new (dds_reader * reader, const struct sertopic * topic); @@ -37,7 +35,7 @@ uint32_t dds_rhc_lock_samples (struct rhc * rhc); DDS_EXPORT bool dds_rhc_store ( - struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo, + struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct serdata * __restrict sample, struct tkmap_instance * __restrict tk ); void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info); diff --git a/src/core/ddsc/src/dds__tkmap.h b/src/core/ddsc/src/dds__tkmap.h index 1c8ffdb..64d78d2 100644 --- a/src/core/ddsc/src/dds__tkmap.h +++ b/src/core/ddsc/src/dds__tkmap.h @@ -38,7 +38,6 @@ void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk); uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct serdata *serdata); _Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample); _Check_return_ struct tkmap_instance * dds_tkmap_find( - _In_opt_ const struct dds_topic * topic, _In_ struct serdata * sd, _In_ const bool rd, _In_ const bool create); diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 5e9970d..785dd1d 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -263,14 +263,14 @@ void ddsi_plugin_init (void) /* Register read cache functions */ - ddsi_plugin.rhc_free_fn = dds_rhc_free; - ddsi_plugin.rhc_fini_fn = dds_rhc_fini; - ddsi_plugin.rhc_store_fn = dds_rhc_store; - ddsi_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr; - ddsi_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership; - ddsi_plugin.rhc_set_qos_fn = dds_rhc_set_qos; - ddsi_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref; - ddsi_plugin.rhc_unref_fn = dds_tkmap_instance_unref; + ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free; + ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini; + ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store; + ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr; + ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership; + ddsi_plugin.rhc_plugin.rhc_set_qos_fn = dds_rhc_set_qos; + ddsi_plugin.rhc_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref; + ddsi_plugin.rhc_plugin.rhc_unref_fn = dds_tkmap_instance_unref; /* Register iid generator */ diff --git a/src/core/ddsc/src/dds_instance.c b/src/core/ddsc/src/dds_instance.c index a32b595..f39a748 100644 --- a/src/core/ddsc/src/dds_instance.c +++ b/src/core/ddsc/src/dds_instance.c @@ -71,7 +71,7 @@ dds_instance_find( _In_ const bool create) { serdata_t sd = serialize_key (gv.serpool, topic->m_stopic, data); - struct tkmap_instance * inst = dds_tkmap_find (topic, sd, false, create); + struct tkmap_instance * inst = dds_tkmap_find (sd, false, create); ddsi_serdata_unref (sd); return inst; } diff --git a/src/core/ddsc/src/dds_key.c b/src/core/ddsc/src/dds_key.c index 092821b..b3cd8dd 100644 --- a/src/core/ddsc/src/dds_key.c +++ b/src/core/ddsc/src/dds_key.c @@ -17,6 +17,14 @@ #include "ddsi/q_bswap.h" #include "ddsi/q_md5.h" +#ifndef NDEBUG +static bool keyhash_is_reset(const dds_key_hash_t *kh) +{ + static const char nullhash[sizeof(kh->m_hash)]; + return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; +} +#endif + void dds_key_md5 (dds_key_hash_t * kh) { md5_state_t md5st; @@ -43,8 +51,14 @@ void dds_key_gen uint32_t len = 0; char * dst; - assert (desc->m_nkeys); - assert (kh->m_hash[0] == 0 && kh->m_hash[15] == 0); + assert(keyhash_is_reset(kh)); + + if (desc->m_nkeys == 0) + { + kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; + kh->m_key_len = sizeof (kh->m_hash); + return; + } kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index 33df761..1016964 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -210,6 +210,13 @@ static int lwregs_delete (struct lwregs *rt, uint64_t iid, uint64_t wr_iid) return ut_ehhRemove (rt->regs, &dummy); } +void lwregs_dump (struct lwregs *rt) +{ + struct ut_ehhIter it; + for (struct lwreg *r = ut_ehhIterFirst(rt->regs, &it); r; r = ut_ehhIterNext(&it)) + printf("iid=%"PRIu64" wr_iid=%"PRIu64"\n", r->iid, r->wr_iid); +} + /************************* ****** RHC ****** *************************/ @@ -219,7 +226,6 @@ struct rhc_sample struct serdata *sample; /* serialised data (either just_key or real data) */ struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */ uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */ - nn_wctime_t rtstamp; /* reception timestamp (not really required; perhaps better in serdata) */ bool isread; /* READ or NOT_READ sample state */ unsigned disposed_gen; /* snapshot of instance counter at time of insertion */ unsigned no_writers_gen; /* __/ */ @@ -228,15 +234,16 @@ struct rhc_sample struct rhc_instance { uint64_t iid; /* unique instance id, key of table, also serves as instance handle */ - uint64_t wr_iid; /* unique of id of writer of latest sample or 0 */ + uint64_t wr_iid; /* unique of id of writer of latest sample or 0; if wrcount = 0 it is the wr_iid that caused */ struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */ unsigned nvsamples; /* number of "valid" samples in instance */ unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */ uint32_t wrcount; /* number of live writers */ - bool isnew; /* NEW or NOT_NEW view state */ - bool a_sample_free; /* whether or not a_sample is in use */ - bool isdisposed; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */ - bool has_changed; /* To track changes in an instance - if number of samples are added or data is overwritten */ + unsigned isnew : 1; /* NEW or NOT_NEW view state */ + unsigned a_sample_free : 1; /* whether or not a_sample is in use */ + unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */ + unsigned has_changed : 1; /* To track changes in an instance - if number of samples are added or data is overwritten */ + unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_isread : 1; /* whether or not that state change has been read before */ unsigned disposed_gen; /* bloody generation counters - worst invention of mankind */ @@ -312,9 +319,7 @@ struct trigger_info #define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i)) static unsigned qmask_of_inst (const struct rhc_instance *inst); -static bool update_conditions_locked -(struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample); -static void signal_conditions (struct rhc *rhc); +static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample); #ifndef NDEBUG static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds); #endif @@ -415,7 +420,7 @@ static struct rhc_sample * alloc_sample (struct rhc_instance *inst) { if (inst->a_sample_free) { - inst->a_sample_free = false; + inst->a_sample_free = 0; #if USE_VALGRIND VALGRIND_MAKE_MEM_UNDEFINED (&inst->a_sample, sizeof (inst->a_sample)); #endif @@ -439,7 +444,7 @@ static void free_sample (struct rhc_instance *inst, struct rhc_sample *s) #if USE_VALGRIND VALGRIND_MAKE_MEM_NOACCESS (&inst->a_sample, sizeof (inst->a_sample)); #endif - inst->a_sample_free = true; + inst->a_sample_free = 1; } else { @@ -556,7 +561,7 @@ static void get_trigger_info (struct trigger_info *info, struct rhc_instance *in /* reset instance has_changed before adding/overwriting a sample */ if (pre) { - inst->has_changed = false; + inst->has_changed = 0; } info->has_changed = inst->has_changed; } @@ -571,13 +576,12 @@ static bool add_sample ( struct rhc * rhc, struct rhc_instance * inst, - const struct nn_rsample_info * sampleinfo, + const struct proxy_writer_info * pwr_info, const struct serdata * sample, status_cb_data_t * cb_data ) { struct rhc_sample *s; - assert (sample->v.bswap == sampleinfo->bswap); /* Adding a sample always clears an invalid sample (because the information contained in the invalid sample - the instance state and the generation @@ -604,7 +608,7 @@ static bool add_sample } /* set a flag to indicate instance has changed to notify data_available since the sample is overwritten */ - inst->has_changed = true; + inst->has_changed = 1; } else { @@ -648,8 +652,7 @@ static bool add_sample } s->sample = ddsi_serdata_ref ((serdata_t) sample); /* drops const (tho refcount does change) */ - s->wr_iid = sampleinfo->pwr_info.iid; - s->rtstamp = sampleinfo->reception_timestamp; + s->wr_iid = pwr_info->iid; s->isread = false; s->disposed_gen = inst->disposed_gen; s->no_writers_gen = inst->no_writers_gen; @@ -670,16 +673,15 @@ static bool content_filter_accepts (const struct sertopic * topic, const struct return ret; } -static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct nn_rsample_info *sampleinfo) +static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info) { - return inst->wr_iid == sampleinfo->pwr_info.iid || - memcmp (&sampleinfo->pwr_info.guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0; + return (inst->wr_iid_islive && inst->wr_iid == pwr_info->iid) || memcmp (&pwr_info->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0; } static int inst_accepts_sample ( const struct rhc *rhc, const struct rhc_instance *inst, - const struct nn_rsample_info *sampleinfo, + const struct proxy_writer_info *pwr_info, const struct serdata *sample, const bool has_data ) { @@ -693,7 +695,7 @@ static int inst_accepts_sample { return 0; } - else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo)) + else if (inst_accepts_sample_by_writer_guid (inst, pwr_info)) { /* ok */ } @@ -702,14 +704,14 @@ static int inst_accepts_sample return 0; } } - if (rhc->exclusive_ownership && inst->wr_iid != sampleinfo->pwr_info.iid) + if (rhc->exclusive_ownership && inst->wr_iid_islive && inst->wr_iid != pwr_info->iid) { - uint32_t strength = sampleinfo->pwr_info.ownership_strength; + uint32_t strength = pwr_info->ownership_strength; if (strength > inst->strength) { /* ok */ } else if (strength < inst->strength) { return 0; - } else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo)) { + } else if (inst_accepts_sample_by_writer_guid (inst, pwr_info)) { /* ok */ } else { return 0; @@ -725,14 +727,16 @@ static int inst_accepts_sample static void update_inst ( const struct rhc *rhc, struct rhc_instance *inst, - const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp) + const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp) { - if (inst->wr_iid != pwr_info->iid) - { - inst->wr_guid = pwr_info->guid; - } inst->tstamp = tstamp; - inst->wr_iid = (inst->wrcount == 0) ? 0 : pwr_info->iid; + inst->wr_iid_islive = wr_iid_valid; + if (wr_iid_valid) + { + inst->wr_iid = pwr_info->iid; + if (inst->wr_iid != pwr_info->iid) + inst->wr_guid = pwr_info->guid; + } inst->strength = pwr_info->ownership_strength; } @@ -752,6 +756,8 @@ static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_insta static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update) { + const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0; + TRACE ((" register:")); /* Is an implicitly registering dispose semantically equivalent to @@ -762,7 +768,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 Is a dispose a sample? I don't think so (though a write dispose is). Is a pure register a sample? Don't think so either. */ - if (inst->wr_iid == wr_iid) + if (inst_wr_iid == wr_iid) { /* Same writer as last time => we know it is registered already. This is the fast path -- we don't have to check anything @@ -775,12 +781,13 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 if (inst->wrcount == 0) { /* Currently no writers at all */ - assert (inst->wr_iid == 0); + assert (!inst->wr_iid_islive); /* to avoid wr_iid update when register is called for sample rejected */ if (iid_update) { inst->wr_iid = wr_iid; + inst->wr_iid_islive = 1; } inst->wrcount++; inst->no_writers_gen++; @@ -789,7 +796,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 if (!INST_IS_EMPTY (inst) && !inst->isdisposed) rhc->n_not_alive_no_writers--; } - else if (inst->wr_iid == 0 && inst->wrcount == 1) + else if (inst_wr_iid == 0 && inst->wrcount == 1) { /* Writers exist, but wr_iid is null => someone unregistered. @@ -822,6 +829,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 if (iid_update) { inst->wr_iid = wr_iid; + inst->wr_iid_islive = 1; } } else @@ -833,7 +841,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 /* 2nd writer => properly register the one we knew about */ TRACE (("rescue1")); int x; - x = lwregs_add (&rhc->registrations, inst->iid, inst->wr_iid); + x = lwregs_add (&rhc->registrations, inst->iid, inst_wr_iid); assert (x); (void) x; } @@ -855,6 +863,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 if (iid_update) { inst->wr_iid = wr_iid; + inst->wr_iid_islive = 1; } } } @@ -863,10 +872,7 @@ static void account_for_empty_to_nonempty_transition (struct rhc *rhc, struct rh { assert (INST_NSAMPLES (inst) == 1); add_inst_to_nonempty_list (rhc, inst); - if (inst->isnew) - { - rhc->n_new++; - } + rhc->n_new += inst->isnew; if (inst->isdisposed) rhc->n_not_alive_disposed++; else if (inst->wrcount == 0) @@ -881,8 +887,9 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc TRACE (("unknown(#0)")); return 0; } - else if (inst->wrcount == 1 && inst->wr_iid != 0) + else if (inst->wrcount == 1 && inst->wr_iid_islive) { + assert(inst->wr_iid != 0); if (wr_iid != inst->wr_iid) { TRACE (("unknown(cache)")); @@ -907,7 +914,7 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc afterward there will be 1 writer, it will be cached, and its registration record must go (invariant that with wrcount = 1 and wr_iid != 0 the wr_iid is not in "registrations") */ - if (inst->wrcount == 2 && inst->wr_iid != wr_iid) + if (inst->wrcount == 2 && inst->wr_iid_islive && inst->wr_iid != wr_iid) { TRACE ((",delreg(remain)")); lwregs_delete (&rhc->registrations, inst->iid, inst->wr_iid); @@ -923,56 +930,62 @@ static int rhc_unregister_updateinst { assert (inst->wrcount > 0); - if (pwr_info->iid == inst->wr_iid) - { - /* Next register will have to do real work before we have a cached - wr_iid again */ - inst->wr_iid = 0; - - /* Reset the ownership strength to allow samples to be read from other - writer(s) */ - inst->strength = 0; - TRACE ((",clearcache")); - } - if (--inst->wrcount > 0) - return 0; - else if (!INST_IS_EMPTY (inst)) { - /* Instance still has content - do not drop until application + if (inst->wr_iid_islive && pwr_info->iid == inst->wr_iid) + { + /* Next register will have to do real work before we have a cached + wr_iid again */ + inst->wr_iid_islive = 0; + + /* Reset the ownership strength to allow samples to be read from other + writer(s) */ + inst->strength = 0; + TRACE ((",clearcache")); + } + return 0; + } + else + { + if (!INST_IS_EMPTY (inst)) + { + /* Instance still has content - do not drop until application takes the last sample. Set the invalid sample if the latest sample has been read already, so that the application can read the change to not-alive. (If the latest sample is still unread, we don't bother, even though it means the application won't see the timestamp for the unregister event. It shouldn't care.) */ - if (inst->latest == NULL /*|| inst->latest->isread*/) + if (inst->latest == NULL || inst->latest->isread) + { + inst_set_invsample (rhc, inst); + update_inst (rhc, inst, pwr_info, false, tstamp); + } + if (!inst->isdisposed) + { + rhc->n_not_alive_no_writers++; + } + inst->wr_iid_islive = 0; + return 0; + } + else if (inst->isdisposed) { + /* No content left, no registrations left, so drop */ + TRACE ((",#0,empty,disposed,drop")); + drop_instance_noupdate_no_writers (rhc, inst); + return 1; + } + else + { + /* Add invalid samples for transition to no-writers */ + TRACE ((",#0,empty,nowriters")); + assert (INST_IS_EMPTY (inst)); inst_set_invsample (rhc, inst); - update_inst (rhc, inst, pwr_info, tstamp); + update_inst (rhc, inst, pwr_info, false, tstamp); + account_for_empty_to_nonempty_transition (rhc, inst); + inst->wr_iid_islive = 0; + return 0; } - if (!inst->isdisposed) - { - rhc->n_not_alive_no_writers++; - } - return 0; - } - else if (inst->isdisposed) - { - /* No content left, no registrations left, so drop */ - TRACE ((",#0,empty,disposed,drop")); - drop_instance_noupdate_no_writers (rhc, inst); - return 1; - } - else - { - /* Add invalid samples for transition to no-writers */ - TRACE ((",#0,empty,nowriters")); - assert (INST_IS_EMPTY (inst)); - inst_set_invsample (rhc, inst); - update_inst (rhc, inst, pwr_info, tstamp); - account_for_empty_to_nonempty_transition (rhc, inst); - return 0; } } @@ -1004,7 +1017,7 @@ static void dds_rhc_unregister static struct rhc_instance * alloc_new_instance ( const struct rhc *rhc, - const struct nn_rsample_info *sampleinfo, + const struct proxy_writer_info *pwr_info, struct serdata *serdata, struct tkmap_instance *tk ) @@ -1017,11 +1030,15 @@ static struct rhc_instance * alloc_new_instance inst->tk = tk; inst->wrcount = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->isdisposed = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_DISPOSE); - inst->isnew = true; + inst->isnew = 1; inst->inv_exists = 0; inst->inv_isread = 0; /* don't care */ - inst->a_sample_free = true; - update_inst (rhc, inst, &sampleinfo->pwr_info, serdata->v.msginfo.timestamp); + inst->a_sample_free = 1; + inst->wr_iid = pwr_info->iid; + inst->wr_iid_islive = (inst->wrcount != 0); + inst->wr_guid = pwr_info->guid; + inst->tstamp = serdata->v.msginfo.timestamp; + inst->strength = pwr_info->ownership_strength; return inst; } @@ -1029,7 +1046,7 @@ static rhc_store_result_t rhc_store_new_instance ( struct trigger_info * post, struct rhc *rhc, - const struct nn_rsample_info *sampleinfo, + const struct proxy_writer_info *pwr_info, struct serdata *sample, struct tkmap_instance *tk, const bool has_data, @@ -1069,10 +1086,10 @@ static rhc_store_result_t rhc_store_new_instance return RHC_REJECTED; } - inst = alloc_new_instance (rhc, sampleinfo, sample, tk); + inst = alloc_new_instance (rhc, pwr_info, sample, tk); if (has_data) { - if (!add_sample (rhc, inst, sampleinfo, sample, cb_data)) + if (!add_sample (rhc, inst, pwr_info, sample, cb_data)) { free_instance (inst, rhc); return RHC_REJECTED; @@ -1102,11 +1119,11 @@ static rhc_store_result_t rhc_store_new_instance bool dds_rhc_store ( - struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo, + struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct serdata * __restrict sample, struct tkmap_instance * __restrict tk ) { - const uint64_t wr_iid = sampleinfo->pwr_info.iid; + const uint64_t wr_iid = pwr_info->iid; const unsigned statusinfo = sample->v.msginfo.statusinfo; const bool has_data = (sample->v.st->kind == STK_DATA); const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0; @@ -1150,7 +1167,7 @@ bool dds_rhc_store else { TRACE ((" new instance")); - stored = rhc_store_new_instance (&post, rhc, sampleinfo, sample, tk, has_data, &cb_data); + stored = rhc_store_new_instance (&post, rhc, pwr_info, sample, tk, has_data, &cb_data); if (stored != RHC_STORED) { goto error_or_nochange; @@ -1158,7 +1175,7 @@ bool dds_rhc_store init_trigger_info_nonmatch (&pre); } } - else if (!inst_accepts_sample (rhc, inst, sampleinfo, sample, has_data)) + else if (!inst_accepts_sample (rhc, inst, pwr_info, sample, has_data)) { /* Rejected samples (and disposes) should still register the writer; unregister *must* be processed, or we have a memory leak. (We @@ -1175,7 +1192,7 @@ bool dds_rhc_store } if (statusinfo & NN_STATUSINFO_UNREGISTER) { - dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); + dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp); } else { @@ -1195,6 +1212,8 @@ bool dds_rhc_store { get_trigger_info (&pre, inst, true); + TRACE ((" wc %"PRIu32, inst->wrcount)); + if (has_data || is_dispose) { /* View state must be NEW following receipt of a sample when @@ -1222,7 +1241,7 @@ bool dds_rhc_store if (has_data && not_alive) { TRACE ((" notalive->alive")); - inst->isnew = true; + inst->isnew = 1; } /* Desired effect on instance state and disposed_gen: @@ -1235,12 +1254,12 @@ bool dds_rhc_store if (has_data && inst->isdisposed) { TRACE ((" disposed->notdisposed")); - inst->isdisposed = false; + inst->isdisposed = 0; inst->disposed_gen++; } if (is_dispose) { - inst->isdisposed = true; + inst->isdisposed = 1; inst_became_disposed = !old_isdisposed; TRACE ((" dispose(%d)", inst_became_disposed)); } @@ -1250,7 +1269,7 @@ bool dds_rhc_store if (has_data) { TRACE ((" add_sample")); - if (! add_sample (rhc, inst, sampleinfo, sample, &cb_data)) + if (! add_sample (rhc, inst, pwr_info, sample, &cb_data)) { TRACE (("(reject)")); stored = RHC_REJECTED; @@ -1262,7 +1281,7 @@ bool dds_rhc_store if (inst_became_disposed && (inst->latest == NULL )) inst_set_invsample (rhc, inst); - update_inst (rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); + update_inst (rhc, inst, pwr_info, true, sample->v.msginfo.timestamp); /* Can only add samples => only need to give special treatment to instances that were empty before. It is, however, not @@ -1280,7 +1299,7 @@ bool dds_rhc_store else { rhc->n_not_alive_disposed += inst->isdisposed - old_isdisposed; - rhc->n_new += (inst->isnew ? 1 : 0) - (old_isnew ? 1 : 0); + rhc->n_new += inst->isnew - old_isnew; } } else @@ -1306,7 +1325,7 @@ bool dds_rhc_store mean an application reading "x" after the write and reading it again after the unregister will see a change in the no_writers_generation field? */ - dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); + dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp); } else { @@ -1390,7 +1409,7 @@ void dds_rhc_unregister_wr TRACE (("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose)); for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) { - if (inst->wr_iid == wr_iid || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) + if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) { struct trigger_info pre, post; get_trigger_info (&pre, inst, true); @@ -1400,7 +1419,7 @@ void dds_rhc_unregister_wr assert (inst->wrcount > 0); if (auto_dispose && !inst->isdisposed) { - inst->isdisposed = true; + inst->isdisposed = 1; /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ if (inst->latest) @@ -1412,7 +1431,6 @@ void dds_rhc_unregister_wr { const bool was_empty = INST_IS_EMPTY (inst); inst_set_invsample (rhc, inst); - update_inst (rhc, inst, pwr_info, inst->tstamp); if (was_empty) account_for_empty_to_nonempty_transition (rhc, inst); else @@ -1452,9 +1470,9 @@ void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t w TRACE (("rhc_relinquish_ownership(%"PRIx64":\n", wr_iid)); for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) { - if (inst->wr_iid == wr_iid) + if (inst->wr_iid_islive && inst->wr_iid == wr_iid) { - inst->wr_iid = 0; + inst->wr_iid_islive = 0; } } TRACE ((")\n")); @@ -1564,7 +1582,6 @@ static void set_sample_info (dds_sample_info_t *si, const struct rhc_instance *i si->absolute_generation_rank = (inst->disposed_gen + inst->no_writers_gen) - (sample->disposed_gen + sample->no_writers_gen); si->valid_data = true; si->source_timestamp = sample->sample->v.msginfo.timestamp.v; - si->reception_timestamp = sample->rtstamp.v; } static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_instance *inst) @@ -1581,9 +1598,6 @@ static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_i si->absolute_generation_rank = 0; si->valid_data = false; si->source_timestamp = inst->tstamp.v; - - /* Not storing the underlying "sample" so the reception time is lost */ - si->reception_timestamp = 0; } static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst) @@ -1692,7 +1706,7 @@ static int dds_rhc_read_w_qminv if (n > n_first && inst->isnew) { - inst->isnew = false; + inst->isnew = 0; rhc->n_new--; } if (nread != INST_NREAD (inst)) @@ -1834,7 +1848,7 @@ static int dds_rhc_take_w_qminv if (n > n_first && inst->isnew) { - inst->isnew = false; + inst->isnew = 0; rhc->n_new--; } @@ -1983,7 +1997,7 @@ static int dds_rhc_takecdr_w_qminv if (n > n_first && inst->isnew) { - inst->isnew = false; + inst->isnew = 0; rhc->n_new--; } diff --git a/src/core/ddsc/src/dds_stream.c b/src/core/ddsc/src/dds_stream.c index bbe749b..1d89f85 100644 --- a/src/core/ddsc/src/dds_stream.c +++ b/src/core/ddsc/src/dds_stream.c @@ -1576,6 +1576,14 @@ static uint32_t dds_stream_get_keyhash return (uint32_t) (dst - origin); } +#ifndef NDEBUG +static bool keyhash_is_reset(const dds_key_hash_t *kh) +{ + static const char nullhash[sizeof(kh->m_hash)]; + return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0; +} +#endif + void dds_stream_read_keyhash ( dds_stream_t * is, @@ -1586,14 +1594,19 @@ void dds_stream_read_keyhash { char * dst; - assert (desc->m_keys); + assert (keyhash_is_reset(kh)); + + if (desc->m_nkeys == 0) + { + kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH; + return; + } /* Select key buffer to use */ kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) { - memset (kh->m_hash, 0, 16); kh->m_flags |= DDS_KEY_IS_HASH; dst = kh->m_hash; } @@ -1616,7 +1629,6 @@ void dds_stream_read_keyhash else { /* Hash is md5 of key */ - dds_key_md5 (kh); } } diff --git a/src/core/ddsc/src/dds_tkmap.c b/src/core/ddsc/src/dds_tkmap.c index 12d92c3..a11165b 100644 --- a/src/core/ddsc/src/dds_tkmap.c +++ b/src/core/ddsc/src/dds_tkmap.c @@ -243,7 +243,6 @@ struct tkmap_instance * dds_tkmap_find_by_id (_In_ struct tkmap * map, _In_ uint _Check_return_ struct tkmap_instance * dds_tkmap_find( - _In_opt_ const struct dds_topic * topic, _In_ struct serdata * sd, _In_ const bool rd, _In_ const bool create) @@ -252,46 +251,9 @@ struct tkmap_instance * dds_tkmap_find( struct tkmap_instance * tk; struct tkmap * map = gv.m_tkmap; + assert(sd->v.keyhash.m_flags & DDS_KEY_HASH_SET); dummy.m_sample = sd; - /* Generate key hash if required and not provided */ - - if (topic && topic->m_descriptor->m_nkeys) - { - if ((sd->v.keyhash.m_flags & DDS_KEY_HASH_SET) == 0) - { - dds_stream_t is; - dds_stream_from_serstate (&is, sd->v.st); - dds_stream_read_keyhash (&is, &sd->v.keyhash, topic->m_descriptor, sd->v.st->kind == STK_KEY); - } - else - { - if (topic->m_descriptor->m_flagset & DDS_TOPIC_FIXED_KEY) - { - sd->v.keyhash.m_flags |= DDS_KEY_IS_HASH; - } - -#if DDS_DEBUG_KEYHASH - - { - dds_stream_t is; - dds_key_hash_t kh; - - /* Check that we generate same keyhash as provided */ - - memset (&kh, 0, sizeof (kh)); - dds_stream_from_serstate (&is, sd->v.st); - dds_stream_read_keyhash (&is, &kh, topic->m_descriptor, sd->v.st->kind == STK_KEY); - assert (memcmp (kh.m_hash, sd->v.keyhash.m_hash, 16) == 0); - if (kh.m_key_buff_size) - { - dds_free (kh.m_key_buff); - } - } -#endif - } - } - retry: if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL) { @@ -340,19 +302,15 @@ retry: _Check_return_ struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct serdata * sd) { - dds_topic * topic = sd->v.st->topic ? sd->v.st->topic->status_cb_entity : NULL; - assert (vtime_awake_p (lookup_thread_state ()->vtime)); - #if 0 /* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */ - if (topic == NULL) + if (sd->v.st->topic == NULL) { return NULL; } #endif - - return dds_tkmap_find (topic, sd, true, true); + return dds_tkmap_find (sd, true, true); } void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk) diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index c9a1bc0..54a6b95 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -16,6 +16,7 @@ #include "ddsi/q_error.h" #include "ddsi/q_thread.h" #include "q__osplser.h" +#include "dds__stream.h" #include "dds__err.h" #include "ddsi/q_transmit.h" #include "ddsi/q_ephash.h" @@ -26,14 +27,6 @@ #include -#if OS_ATOMIC64_SUPPORT -typedef os_atomic_uint64_t fake_seq_t; -uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc64_nv (x); } -#else /* HACK */ -typedef os_atomic_uint32_t fake_seq_t; -uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc32_nv (x); } -#endif - _Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) dds_return_t dds_write( @@ -118,30 +111,9 @@ err: return ret; } -static void -init_sampleinfo( - _Out_ struct nn_rsample_info *sampleinfo, - _In_ struct writer *wr, - _In_ int64_t seq, - _In_ serdata_t payload) -{ - memset(sampleinfo, 0, sizeof(*sampleinfo)); - sampleinfo->bswap = 0; - sampleinfo->complex_qos = 0; - sampleinfo->hashash = 0; - sampleinfo->seq = seq; - sampleinfo->reception_timestamp = payload->v.msginfo.timestamp; - sampleinfo->statusinfo = payload->v.msginfo.statusinfo; - sampleinfo->pwr_info.iid = 1; - sampleinfo->pwr_info.auto_dispose = 0; - sampleinfo->pwr_info.guid = wr->e.guid; - sampleinfo->pwr_info.ownership_strength = 0; -} - static int deliver_locally( _In_ struct writer *wr, - _In_ int64_t seq, _In_ serdata_t payload, _In_ struct tkmap_instance *tk) { @@ -150,15 +122,15 @@ deliver_locally( if (wr->rdary.fastpath_ok) { struct reader ** const rdary = wr->rdary.rdary; if (rdary[0]) { - struct nn_rsample_info sampleinfo; + struct proxy_writer_info pwr_info; unsigned i; - init_sampleinfo(&sampleinfo, wr, seq, payload); + make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); for (i = 0; rdary[i]; i++) { bool stored; TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid))); dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC; do { - stored = (ddsi_plugin.rhc_store_fn) (rdary[i]->rhc, &sampleinfo, payload, tk); + stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk); if (!stored) { if (max_block_ms <= 0) { ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT, "The writer could not deliver data on time, probably due to a local reader resources being full."); @@ -185,16 +157,16 @@ deliver_locally( reliable samples that are rejected are simply discarded. */ ut_avlIter_t it; struct pwr_rd_match *m; - struct nn_rsample_info sampleinfo; + struct proxy_writer_info pwr_info; os_mutexUnlock (&wr->rdary.rdary_lock); - init_sampleinfo(&sampleinfo, wr, seq, payload); + make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); os_mutexLock (&wr->e.lock); for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) { struct reader *rd; if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid))); /* Copied the return value ignore from DDSI deliver_user_data() function. */ - (void)(ddsi_plugin.rhc_store_fn) (rd->rhc, &sampleinfo, payload, tk); + (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); } } os_mutexUnlock (&wr->e.lock); @@ -209,7 +181,6 @@ dds_write_impl( _In_ dds_time_t tstamp, _In_ dds_write_action action) { - static fake_seq_t fake_seq; dds_return_t ret = DDS_RETCODE_OK; int w_rc; @@ -252,7 +223,7 @@ dds_write_impl( d->v.msginfo.timestamp.v = tstamp; os_mutexLock (&writer->m_call_lock); ddsi_serdata_ref(d); - tk = (ddsi_plugin.rhc_lookup_fn) (d); + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d); w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk); if (w_rc >= 0) { @@ -271,10 +242,10 @@ dds_write_impl( os_mutexUnlock (&writer->m_call_lock); if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk); + ret = deliver_locally (ddsi_wr, d, tk); } ddsi_serdata_unref(d); - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); if (asleep) { thread_state_asleep (thr); @@ -292,7 +263,6 @@ dds_writecdr_impl( _In_ dds_time_t tstamp, _In_ dds_write_action action) { - static fake_seq_t fake_seq; int ret = DDS_RETCODE_OK; int w_rc; @@ -316,17 +286,16 @@ dds_writecdr_impl( } /* Serialize and write data or key */ - if (writekey) { - abort(); - //d = serialize_key (gv.serpool, ddsi_wr->topic, data); - } else { - serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic); - if (ddsi_wr->topic->nkeys) { - abort(); - //dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample); - } - ddsi_serstate_append_blob(st, 1, sz, cdr); - d = ddsi_serstate_fix(st); + { + serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic); + dds_stream_t is; + ddsi_serstate_append_blob(st, 1, sz, cdr); + d = ddsi_serstate_fix(st); + assert(d->v.keyhash.m_flags == 0); + assert(!d->v.bswap); + dds_stream_from_serstate (&is, d->v.st); + d->v.st->kind = writekey ? STK_KEY : STK_DATA; + dds_stream_read_keyhash (&is, &d->v.keyhash, ddsi_wr->topic->status_cb_entity->m_descriptor, d->v.st->kind == STK_KEY); } /* Set if disposing or unregistering */ @@ -335,7 +304,7 @@ dds_writecdr_impl( d->v.msginfo.timestamp.v = tstamp; os_mutexLock (&wr->m_call_lock); ddsi_serdata_ref(d); - tk = (ddsi_plugin.rhc_lookup_fn) (d); + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d); w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); if (w_rc >= 0) { @@ -354,10 +323,10 @@ dds_writecdr_impl( os_mutexUnlock (&wr->m_call_lock); if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk); + ret = deliver_locally (ddsi_wr, d, tk); } ddsi_serdata_unref(d); - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); if (asleep) { thread_state_asleep (thr); diff --git a/src/core/ddsc/src/q_osplser.c b/src/core/ddsc/src/q_osplser.c index e0ed6e7..5ee8501 100644 --- a/src/core/ddsc/src/q_osplser.c +++ b/src/core/ddsc/src/q_osplser.c @@ -22,11 +22,7 @@ serdata_t serialize (serstatepool_t pool, const struct sertopic * tp, const void { dds_stream_t os; serstate_t st = ddsi_serstate_new (pool, tp); - - if (tp->nkeys) - { - dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample); - } + dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample); dds_stream_from_serstate (&os, st); dds_stream_write_sample (&os, sample, tp); dds_stream_add_to_serstate (&os, st); @@ -65,21 +61,14 @@ int serdata_cmp (const struct serdata *a, const struct serdata *b) serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample) { serdata_t sd; - if (tp->nkeys) - { - dds_stream_t os; - dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type; - serstate_t st = ddsi_serstate_new (pool, tp); - dds_key_gen (desc, &st->data->v.keyhash, (char*) sample); - dds_stream_from_serstate (&os, st); - dds_stream_write_key (&os, sample, desc); - dds_stream_add_to_serstate (&os, st); - sd = st->data; - } - else - { - sd = serialize (pool, tp, sample); - } + dds_stream_t os; + dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type; + serstate_t st = ddsi_serstate_new (pool, tp); + dds_key_gen (desc, &st->data->v.keyhash, (char*) sample); + dds_stream_from_serstate (&os, st); + dds_stream_write_key (&os, sample, desc); + dds_stream_add_to_serstate (&os, st); + sd = st->data; sd->v.st->kind = STK_KEY; return sd; } diff --git a/src/core/ddsc/tests/reader_iterator.c b/src/core/ddsc/tests/reader_iterator.c index 7058ece..e1f9d4f 100644 --- a/src/core/ddsc/tests/reader_iterator.c +++ b/src/core/ddsc/tests/reader_iterator.c @@ -63,6 +63,7 @@ #define MAX_SAMPLES 21 #define RDR_NOT_READ_CNT 11 +#define RDR_INV_READ_CNT 1 int rdr_expected_long_2[RDR_NOT_READ_CNT] = { 0, 1, 2, 6, 7, 9, 11, 13, 14, 16, 19 }; /* Because we only read one sample at a time, only the first sample of an instance @@ -344,13 +345,13 @@ samples_cnt(void) /*************************************************************************************************/ Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) { - dds_return_t cnt = 0; + dds_return_t cnt = 0, cntinv = 0; dds_return_t ret = 1; while (ret == 1){ ret = dds_read_next(g_reader, g_samples, g_info); cr_assert_geq(ret, 0 , "# read %d", ret); - if(ret == 1){ + if(ret == 1 && g_info[0].valid_data){ Space_Type1 *sample = (Space_Type1*)g_samples[0]; PRINT_SAMPLE("ddsc_read_next::reader: Read", (*sample)); @@ -373,10 +374,13 @@ Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].instance_state, expected_ist); cnt ++; + } else if (ret == 1 && !g_info[0].valid_data) { + cntinv ++; } } cr_assert_eq(cnt, RDR_NOT_READ_CNT); + cr_assert_eq(cntinv, RDR_INV_READ_CNT); /* All samples should still be available. */ ret = samples_cnt(); @@ -453,13 +457,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next, invalid_buffers, .in /*************************************************************************************************/ Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) { - dds_return_t cnt = 0; + dds_return_t cnt = 0, cntinv = 0; dds_return_t ret = 1; while (ret == 1){ ret = dds_read_next_wl(g_reader, g_loans, g_info); cr_assert_geq(ret, 0 , "# read %d", ret); - if(ret == 1){ + if(ret == 1 && g_info[0].valid_data){ Space_Type1 *sample = (Space_Type1*)g_loans[0]; PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample)); @@ -482,10 +486,13 @@ Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].instance_state, expected_ist); cnt ++; + } else if (ret == 1 && !g_info[0].valid_data) { + cntinv ++; } } cr_assert_eq(cnt, RDR_NOT_READ_CNT); + cr_assert_eq(cntinv, RDR_INV_READ_CNT); ret = dds_return_loan(g_reader, g_loans, ret); cr_assert_eq (ret, DDS_RETCODE_OK); @@ -564,13 +571,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next_wl, invalid_buffers, /*************************************************************************************************/ Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) { - dds_return_t cnt = 0; + dds_return_t cnt = 0, cntinv = 0; dds_return_t ret = 1; while (ret == 1){ ret = dds_take_next(g_reader, g_samples, g_info); cr_assert_geq(ret, 0 , "# read %d", ret); - if(ret == 1){ + if(ret == 1 && g_info[0].valid_data){ Space_Type1 *sample = (Space_Type1*)g_samples[0]; PRINT_SAMPLE("ddsc_take_next::reader: Read", (*sample)); @@ -593,10 +600,13 @@ Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].instance_state, expected_ist); cnt ++; + } else if (ret == 1 && !g_info[0].valid_data) { + cntinv ++; } } cr_assert_eq(cnt, RDR_NOT_READ_CNT); + cr_assert_eq(cntinv, RDR_INV_READ_CNT); /* All samples should still be available. */ ret = samples_cnt(); @@ -671,13 +681,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_take_next, invalid_buffers, .in /*************************************************************************************************/ Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) { - dds_return_t cnt = 0; + dds_return_t cnt = 0, cntinv = 0; dds_return_t ret = 1; while (ret == 1){ ret = dds_take_next_wl(g_reader, g_loans, g_info); cr_assert_geq(ret, 0 , "# read %d", ret); - if(ret == 1){ + if(ret == 1 && g_info[0].valid_data){ Space_Type1 *sample = (Space_Type1*)g_loans[0]; PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample)); @@ -700,10 +710,13 @@ Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].instance_state, expected_ist); cnt ++; + } else if (ret == 1 && !g_info[0].valid_data) { + cntinv ++; } } cr_assert_eq(cnt, RDR_NOT_READ_CNT); + cr_assert_eq(cntinv, RDR_INV_READ_CNT); ret = dds_return_loan(g_reader, g_loans, ret); cr_assert_eq (ret, DDS_RETCODE_OK); diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 5f7c1a8..e284eb0 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -18,6 +18,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_raweth.c ddsi_ipaddr.c ddsi_mcgroup.c + ddsi_rhc_plugin.c q_addrset.c q_bitset_inlines.c q_bswap.c @@ -67,6 +68,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi" ddsi_raweth.h ddsi_ipaddr.h ddsi_mcgroup.h + ddsi_rhc_plugin.h probes-constants.h q_addrset.h q_align.h diff --git a/src/core/ddsi/include/ddsi/ddsi_rhc_plugin.h b/src/core/ddsi/include/ddsi/ddsi_rhc_plugin.h new file mode 100644 index 0000000..317d382 --- /dev/null +++ b/src/core/ddsi/include/ddsi/ddsi_rhc_plugin.h @@ -0,0 +1,48 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSI_RHC_PLUGIN_H +#define DDSI_RHC_PLUGIN_H + +struct rhc; +struct nn_xqos; +struct tkmap_instance; +struct serdata; +struct sertopic; +struct entity_common; + +struct proxy_writer_info +{ + nn_guid_t guid; + bool auto_dispose; + int32_t ownership_strength; + uint64_t iid; +}; + +struct ddsi_rhc_plugin +{ + void (*rhc_free_fn) (struct rhc *rhc); + void (*rhc_fini_fn) (struct rhc *rhc); + bool (*rhc_store_fn) + (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, + struct serdata * __restrict sample, struct tkmap_instance * __restrict tk); + void (*rhc_unregister_wr_fn) + (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info); + void (*rhc_relinquish_ownership_fn) + (struct rhc * __restrict rhc, const uint64_t wr_iid); + void (*rhc_set_qos_fn) (struct rhc * rhc, const struct nn_xqos * qos); + struct tkmap_instance * (*rhc_lookup_fn) (struct serdata *serdata); + void (*rhc_unref_fn) (struct tkmap_instance *tk); +}; + +void make_proxy_writer_info(struct proxy_writer_info *pwr_info, const struct entity_common *e, const struct nn_xqos *xqos); + +#endif diff --git a/src/core/ddsi/include/ddsi/ddsi_ser.h b/src/core/ddsi/include/ddsi/ddsi_ser.h index 170b39e..d8f4506 100644 --- a/src/core/ddsi/include/ddsi/ddsi_ser.h +++ b/src/core/ddsi/include/ddsi/ddsi_ser.h @@ -100,7 +100,7 @@ struct serdata_base serstate_t st; /* back pointer to (opaque) serstate so RTPS impl only needs serdata */ struct serdata_msginfo msginfo; int hash_valid; /* whether hash is valid or must be computed from key/data */ - uint32_t hash; /* cached serdata hash, valid only if hash_valid != 0 */ + uint32_t hash; /* cached serdata hash, valid only if hash_valid != 0 */ dds_key_hash_t keyhash; bool bswap; /* Whether state is native endian or requires swapping */ }; diff --git a/src/core/ddsi/include/ddsi/q_config.h b/src/core/ddsi/include/ddsi/q_config.h index abfb6bc..b23e812 100644 --- a/src/core/ddsi/include/ddsi/q_config.h +++ b/src/core/ddsi/include/ddsi/q_config.h @@ -22,6 +22,7 @@ #include "ddsi/q_xqos.h" #include "ddsi/ddsi_tran.h" #include "ddsi/q_feature_check.h" +#include "ddsi/ddsi_rhc_plugin.h" #if defined (__cplusplus) extern "C" { @@ -404,35 +405,13 @@ struct config q__schedPrioClass watchdog_sched_priority_class; }; -struct rhc; -struct nn_xqos; -struct tkmap_instance; -struct nn_rsample_info; -struct serdata; -struct sertopic; -struct proxy_writer; -struct proxy_writer_info; - struct ddsi_plugin { int (*init_fn) (void); void (*fini_fn) (void); - /* Read cache */ - - void (*rhc_free_fn) (struct rhc *rhc); - void (*rhc_fini_fn) (struct rhc *rhc); - bool (*rhc_store_fn) - (struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo, - struct serdata * __restrict sample, struct tkmap_instance * __restrict tk); - void (*rhc_unregister_wr_fn) - (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info); - void (*rhc_relinquish_ownership_fn) - (struct rhc * __restrict rhc, const uint64_t wr_iid); - void (*rhc_set_qos_fn) (struct rhc * rhc, const struct nn_xqos * qos); - struct tkmap_instance * (*rhc_lookup_fn) (struct serdata *serdata); - void (*rhc_unref_fn) (struct tkmap_instance *tk); + struct ddsi_rhc_plugin rhc_plugin; /* IID generator */ diff --git a/src/core/ddsi/include/ddsi/q_radmin.h b/src/core/ddsi/include/ddsi/q_radmin.h index 5ee2dc4..8be7b21 100644 --- a/src/core/ddsi/include/ddsi/q_radmin.h +++ b/src/core/ddsi/include/ddsi/q_radmin.h @@ -109,14 +109,6 @@ struct receiver_state { nn_locator_t srcloc; }; -struct proxy_writer_info -{ - nn_guid_t guid; - bool auto_dispose; - int32_t ownership_strength; - uint64_t iid; -}; - struct nn_rsample_info { seqno_t seq; struct receiver_state *rst; @@ -129,9 +121,6 @@ struct nn_rsample_info { unsigned pt_wr_info_zoff: 16; /* PrismTech writer info offset */ unsigned bswap: 1; /* so we can extract well formatted writer info quicker */ unsigned complex_qos: 1; /* includes QoS other than keyhash, 2-bit statusinfo, PT writer info */ - unsigned hashash: 1; /* Do we have a key hash */ - nn_keyhash_t keyhash; /* Key hash. Not currently used by OSPL */ - struct proxy_writer_info pwr_info; }; struct nn_rdata { diff --git a/src/core/ddsi/src/ddsi_rhc_plugin.c b/src/core/ddsi/src/ddsi_rhc_plugin.c new file mode 100644 index 0000000..e5677c2 --- /dev/null +++ b/src/core/ddsi/src/ddsi_rhc_plugin.c @@ -0,0 +1,22 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "ddsi/q_entity.h" +#include "ddsi/q_xqos.h" +#include "ddsi/ddsi_rhc_plugin.h" + +void make_proxy_writer_info(struct proxy_writer_info *pwr_info, const struct entity_common *e, const struct nn_xqos *xqos) +{ + pwr_info->guid = e->guid; + pwr_info->ownership_strength = xqos->ownership_strength.value; + pwr_info->auto_dispose = xqos->writer_data_lifecycle.autodispose_unregistered_instances; + pwr_info->iid = e->iid; +} diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index e56ede9..c63ad23 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1379,12 +1379,8 @@ static void reader_drop_connection (const struct nn_guid *rd_guid, const struct if (rd->rhc) { struct proxy_writer_info pwr_info; - pwr_info.guid = pwr->e.guid; - pwr_info.ownership_strength = pwr->c.xqos->ownership_strength.value; - pwr_info.auto_dispose = pwr->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances; - pwr_info.iid = pwr->e.iid; - - (ddsi_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info); + make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos); + (ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info); } if (rd->status_cb) { @@ -1418,12 +1414,8 @@ static void reader_drop_local_connection (const struct nn_guid *rd_guid, const s { /* FIXME: */ struct proxy_writer_info pwr_info; - pwr_info.guid = wr->e.guid; - pwr_info.ownership_strength = wr->xqos->ownership_strength.value; - pwr_info.auto_dispose = wr->xqos->writer_data_lifecycle.autodispose_unregistered_instances; - pwr_info.iid = wr->e.iid; - - (ddsi_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info); + make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); + (ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info); } if (rd->status_cb) { @@ -1605,26 +1597,6 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd) } } -static void -init_sampleinfo( - _Out_ struct nn_rsample_info *sampleinfo, - _In_ struct writer *wr, - _In_ int64_t seq, - _In_ serdata_t payload) -{ - memset(sampleinfo, 0, sizeof(*sampleinfo)); - sampleinfo->bswap = 0; - sampleinfo->complex_qos = 0; - sampleinfo->hashash = 0; - sampleinfo->seq = seq; - sampleinfo->reception_timestamp = payload->v.msginfo.timestamp; - sampleinfo->statusinfo = payload->v.msginfo.statusinfo; - sampleinfo->pwr_info.iid = 1; - sampleinfo->pwr_info.auto_dispose = 0; - sampleinfo->pwr_info.guid = wr->e.guid; - sampleinfo->pwr_info.ownership_strength = 0; -} - static void writer_add_local_connection (struct writer *wr, struct reader *rd) { struct wr_rd_match *m = os_malloc (sizeof (*m)); @@ -1652,11 +1624,11 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) struct whc_node *n; while ((n = whc_next_node(wr->whc, seq)) != NULL) { - struct nn_rsample_info sampleinfo; + struct proxy_writer_info pwr_info; serdata_t payload = n->serdata; - struct tkmap_instance *tk = (ddsi_plugin.rhc_lookup_fn) (payload); - init_sampleinfo(&sampleinfo, wr, n->seq, payload); - (void)(ddsi_plugin.rhc_store_fn) (rd->rhc, &sampleinfo, payload, tk); + struct tkmap_instance *tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload); + make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); + (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); seq = n->seq; } } @@ -3272,7 +3244,7 @@ static struct reader * new_reader_guid /* set rhc qos for reader */ if (rhc) { - (ddsi_plugin.rhc_set_qos_fn) (rd->rhc, rd->xqos); + (ddsi_plugin.rhc_plugin.rhc_set_qos_fn) (rd->rhc, rd->xqos); } assert (rd->xqos->present & QP_LIVELINESS); if (rd->xqos->liveliness.kind != NN_AUTOMATIC_LIVELINESS_QOS || @@ -3402,7 +3374,7 @@ static void gc_delete_reader (struct gcreq *gcreq) #endif if (rd->rhc) { - (ddsi_plugin.rhc_free_fn) (rd->rhc); + (ddsi_plugin.rhc_plugin.rhc_free_fn) (rd->rhc); } if (rd->status_cb) { @@ -3431,7 +3403,7 @@ int delete_reader (const struct nn_guid *guid) } if (rd->rhc) { - (ddsi_plugin.rhc_fini_fn) (rd->rhc); + (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc); } nn_log (LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); ephash_remove_reader_guid (rd); diff --git a/src/core/ddsi/src/q_plist.c b/src/core/ddsi/src/q_plist.c index 5b11983..edefa65 100644 --- a/src/core/ddsi/src/q_plist.c +++ b/src/core/ddsi/src/q_plist.c @@ -3123,16 +3123,6 @@ unsigned char *nn_plist_quickscan (struct nn_rsample_info *dest, const struct nn { case PID_PAD: break; - case PID_KEYHASH: - if (length < sizeof (dest->keyhash)) - { - TRACE (("plist(vendor %u.%u): quickscan(PID_KEYHASH): buffer too small\n", - src->vendorid.id[0], src->vendorid.id[1])); - return NULL; - } - memcpy (&dest->keyhash, pl, sizeof (dest->keyhash)); - dest->hashash = 1; - break; case PID_STATUSINFO: if (length < 4) { diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 8962942..da90564 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -20,6 +20,7 @@ #include "ddsi/q_md5.h" #include "util/ut_avl.h" #include "q__osplser.h" +#include "dds__stream.h" #include "ddsi/q_protocol.h" #include "ddsi/q_rtps.h" #include "ddsi/q_misc.h" @@ -320,13 +321,6 @@ static void set_sampleinfo_proxy_writer (struct nn_rsample_info *sampleinfo, nn_ { struct proxy_writer * pwr = ephash_lookup_proxy_writer_guid (pwr_guid); sampleinfo->pwr = pwr; - if (pwr) - { - sampleinfo->pwr_info.guid = pwr->e.guid; - sampleinfo->pwr_info.ownership_strength = pwr->c.xqos->ownership_strength.value; - sampleinfo->pwr_info.auto_dispose = pwr->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances; - sampleinfo->pwr_info.iid = pwr->e.iid; - } } static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp) @@ -1818,6 +1812,18 @@ static serstate_t make_raw_serstate return st; } +static serdata_t ddsi_serstate_fix_with_key (serstate_t st, const struct sertopic *topic, bool bswap) +{ + serdata_t sample = ddsi_serstate_fix(st); + dds_stream_t is; + assert(sample->v.keyhash.m_flags == 0); + sample->v.bswap = bswap; + dds_stream_from_serstate (&is, sample->v.st); + /* FIXME: the relationship between dds_topic, topic_descriptor and sertopic clearly needs some work */ + dds_stream_read_keyhash (&is, &sample->v.keyhash, topic->status_cb_entity->m_descriptor, sample->v.st->kind == STK_KEY); + return sample; +} + static serdata_t extract_sample_from_data ( const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, @@ -1845,7 +1851,7 @@ static serdata_t extract_sample_from_data return NULL; } st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp); - sample = ddsi_serstate_fix (st); + sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap); } else if (sampleinfo->size) { @@ -1856,13 +1862,13 @@ static serdata_t extract_sample_from_data if (data_smhdr_flags & DATA_FLAG_KEYFLAG) { st = make_raw_serstate (topic, fragchain, sampleinfo->size, 1, statusinfo, tstamp); - sample = ddsi_serstate_fix (st); + sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap); } else { assert (data_smhdr_flags & DATA_FLAG_DATAFLAG); st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp); - sample = ddsi_serstate_fix (st); + sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap); } } else if (data_smhdr_flags & DATA_FLAG_INLINE_QOS) @@ -1880,7 +1886,7 @@ static serdata_t extract_sample_from_data ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL); st->kind = STK_KEY; ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value); - sample = ddsi_serstate_fix (st); + sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap); } } else @@ -1901,10 +1907,6 @@ static serdata_t extract_sample_from_data failmsg ? failmsg : "for reasons unknown" ); } - else - { - sample->v.bswap = sampleinfo->bswap; - } return sample; } @@ -2036,16 +2038,12 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st us */ { struct tkmap_instance * tk; - - if (sampleinfo->hashash) - { - payload->v.keyhash.m_flags = DDS_KEY_HASH_SET; - memcpy (&payload->v.keyhash.m_hash, &sampleinfo->keyhash, sizeof (payload->v.keyhash.m_hash)); - } - tk = (ddsi_plugin.rhc_lookup_fn) (payload); - + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload); if (tk) { + struct proxy_writer_info pwr_info; + make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos); + if (rdguid == NULL) { TRACE ((" %"PRId64"=>EVERYONE\n", sampleinfo->seq)); @@ -2069,7 +2067,7 @@ retry: for (i = 0; rdary[i]; i++) { TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid))); - if (! (ddsi_plugin.rhc_store_fn) (rdary[i]->rhc, sampleinfo, payload, tk)) + if (! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk)) { if (pwr_locked) os_mutexUnlock (&pwr->e.lock); os_mutexUnlock (&pwr->rdary.rdary_lock); @@ -2100,7 +2098,7 @@ retry: if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid))); - (void) (ddsi_plugin.rhc_store_fn) (rd->rhc, sampleinfo, payload, tk); + (void) (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); } } if (!pwr_locked) os_mutexUnlock (&pwr->e.lock); @@ -2112,14 +2110,14 @@ retry: { struct reader *rd = ephash_lookup_reader_guid (rdguid);; TRACE ((" %"PRId64"=>%x:%x:%x:%x%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?")); - while (rd && ! (ddsi_plugin.rhc_store_fn) (rd->rhc, sampleinfo, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid)) + while (rd && ! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid)) { if (pwr_locked) os_mutexUnlock (&pwr->e.lock); dds_sleepfor (DDS_MSECS (1)); if (pwr_locked) os_mutexLock (&pwr->e.lock); } } - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); } } ddsi_serdata_unref (payload); @@ -2837,7 +2835,6 @@ static int handle_submsg_sequence { struct nn_rsample_info sampleinfo; unsigned char *datap; - sampleinfo.hashash = 0; /* valid_DataFrag does not validate the payload */ if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap)) goto malformed; @@ -2853,7 +2850,6 @@ static int handle_submsg_sequence { struct nn_rsample_info sampleinfo; unsigned char *datap; - sampleinfo.hashash = 0; /* valid_Data does not validate the payload */ if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap)) { diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index e1e3c88..0479dd2 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -1138,9 +1138,9 @@ int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t serd { struct tkmap_instance *tk; int res; - tk = (ddsi_plugin.rhc_lookup_fn) (serdata); + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata); res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1); - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); return res; } @@ -1148,9 +1148,9 @@ int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t se { struct tkmap_instance *tk; int res; - tk = (ddsi_plugin.rhc_lookup_fn) (serdata); + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata); res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0); - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); return res; } diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 5bef152..4d402a2 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -1131,9 +1131,9 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi encoding. */ serdata->hdr.identifier = PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE; - tk = (ddsi_plugin.rhc_lookup_fn) (serdata); + tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata); write_sample_nogc (xp, wr, serdata, tk); - (ddsi_plugin.rhc_unref_fn) (tk); + (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk); #undef PMD_DATA_LENGTH } diff --git a/src/tools/pubsub/pubsub.c b/src/tools/pubsub/pubsub.c index 309dd48..c3058cd 100755 --- a/src/tools/pubsub/pubsub.c +++ b/src/tools/pubsub/pubsub.c @@ -52,11 +52,10 @@ enum readermode { MODE_PRINT, MODE_CHECK, MODE_ZEROLOAD, MODE_DUMP, MODE_NONE }; #define PM_IHANDLE 8u #define PM_PHANDLE 16u #define PM_STIME 32u -#define PM_RTIME 64u -#define PM_DGEN 128u -#define PM_NWGEN 256u -#define PM_RANKS 512u -#define PM_STATE 1024u +#define PM_DGEN 64u +#define PM_NWGEN 128u +#define PM_RANKS 256u +#define PM_STATE 512u static volatile sig_atomic_t termflag = 0; static int pid; @@ -818,8 +817,6 @@ static void print_sampleinfo(dds_time_t *tstart, dds_time_t tnow, const dds_samp sep = " : "; if (print_metadata & PM_STIME) n += printf ("%s%lld.%09lld", n > 0 ? sep : "", (si->source_timestamp/DDS_NSECS_IN_SEC), (si->source_timestamp%DDS_NSECS_IN_SEC)), sep = " "; - if (print_metadata & PM_RTIME) - n += printf ("%s%lld.%09lld", n > 0 ? sep : "", (si->reception_timestamp/DDS_NSECS_IN_SEC), (si->reception_timestamp%DDS_NSECS_IN_SEC)); sep = " : "; if (print_metadata & PM_DGEN) n += printf ("%s%"PRIu32, n > 0 ? sep : "", si->disposed_generation_count), sep = " "; @@ -2102,7 +2099,6 @@ static void set_print_mode(const char *optarg) { { "phandle", PM_PHANDLE }, { "ihandle", PM_IHANDLE }, { "stime", PM_STIME }, - { "rtime", PM_RTIME }, { "dgen", PM_DGEN }, { "nwgen", PM_NWGEN }, { "ranks", PM_RANKS },