Fix dds_sample_info.publication_handle incorrectly set to 1 as well as some corner cases where it ended up at 0 and some related assertion failures (#8)
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
79ec9e1cd4
commit
daa17704db
24 changed files with 343 additions and 385 deletions
|
@ -173,8 +173,6 @@ typedef struct dds_sample_info
|
|||
uint32_t generation_rank;
|
||||
/** difference in generations between the sample and most recent sample of the same instance when read/take was called */
|
||||
uint32_t absolute_generation_rank;
|
||||
/** timestamp of a data instance when it is added to a read queue */
|
||||
dds_time_t reception_timestamp; /* NOTE: VLite extension */
|
||||
}
|
||||
dds_sample_info_t;
|
||||
|
||||
|
|
|
@ -25,8 +25,6 @@ struct rhc;
|
|||
struct nn_xqos;
|
||||
struct serdata;
|
||||
struct tkmap_instance;
|
||||
struct tkmap;
|
||||
struct nn_rsample_info;
|
||||
struct proxy_writer_info;
|
||||
|
||||
struct rhc * dds_rhc_new (dds_reader * reader, const struct sertopic * topic);
|
||||
|
@ -37,7 +35,7 @@ uint32_t dds_rhc_lock_samples (struct rhc * rhc);
|
|||
|
||||
DDS_EXPORT bool dds_rhc_store
|
||||
(
|
||||
struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo,
|
||||
struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
|
||||
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk
|
||||
);
|
||||
void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
|
||||
|
|
|
@ -38,7 +38,6 @@ void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk);
|
|||
uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct serdata *serdata);
|
||||
_Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample);
|
||||
_Check_return_ struct tkmap_instance * dds_tkmap_find(
|
||||
_In_opt_ const struct dds_topic * topic,
|
||||
_In_ struct serdata * sd,
|
||||
_In_ const bool rd,
|
||||
_In_ const bool create);
|
||||
|
|
|
@ -263,14 +263,14 @@ void ddsi_plugin_init (void)
|
|||
|
||||
/* Register read cache functions */
|
||||
|
||||
ddsi_plugin.rhc_free_fn = dds_rhc_free;
|
||||
ddsi_plugin.rhc_fini_fn = dds_rhc_fini;
|
||||
ddsi_plugin.rhc_store_fn = dds_rhc_store;
|
||||
ddsi_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr;
|
||||
ddsi_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership;
|
||||
ddsi_plugin.rhc_set_qos_fn = dds_rhc_set_qos;
|
||||
ddsi_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref;
|
||||
ddsi_plugin.rhc_unref_fn = dds_tkmap_instance_unref;
|
||||
ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
|
||||
ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini;
|
||||
ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store;
|
||||
ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr;
|
||||
ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership;
|
||||
ddsi_plugin.rhc_plugin.rhc_set_qos_fn = dds_rhc_set_qos;
|
||||
ddsi_plugin.rhc_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref;
|
||||
ddsi_plugin.rhc_plugin.rhc_unref_fn = dds_tkmap_instance_unref;
|
||||
|
||||
/* Register iid generator */
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ dds_instance_find(
|
|||
_In_ const bool create)
|
||||
{
|
||||
serdata_t sd = serialize_key (gv.serpool, topic->m_stopic, data);
|
||||
struct tkmap_instance * inst = dds_tkmap_find (topic, sd, false, create);
|
||||
struct tkmap_instance * inst = dds_tkmap_find (sd, false, create);
|
||||
ddsi_serdata_unref (sd);
|
||||
return inst;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,14 @@
|
|||
#include "ddsi/q_bswap.h"
|
||||
#include "ddsi/q_md5.h"
|
||||
|
||||
#ifndef NDEBUG
|
||||
static bool keyhash_is_reset(const dds_key_hash_t *kh)
|
||||
{
|
||||
static const char nullhash[sizeof(kh->m_hash)];
|
||||
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
void dds_key_md5 (dds_key_hash_t * kh)
|
||||
{
|
||||
md5_state_t md5st;
|
||||
|
@ -43,8 +51,14 @@ void dds_key_gen
|
|||
uint32_t len = 0;
|
||||
char * dst;
|
||||
|
||||
assert (desc->m_nkeys);
|
||||
assert (kh->m_hash[0] == 0 && kh->m_hash[15] == 0);
|
||||
assert(keyhash_is_reset(kh));
|
||||
|
||||
if (desc->m_nkeys == 0)
|
||||
{
|
||||
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
|
||||
kh->m_key_len = sizeof (kh->m_hash);
|
||||
return;
|
||||
}
|
||||
|
||||
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;
|
||||
|
||||
|
|
|
@ -210,6 +210,13 @@ static int lwregs_delete (struct lwregs *rt, uint64_t iid, uint64_t wr_iid)
|
|||
return ut_ehhRemove (rt->regs, &dummy);
|
||||
}
|
||||
|
||||
void lwregs_dump (struct lwregs *rt)
|
||||
{
|
||||
struct ut_ehhIter it;
|
||||
for (struct lwreg *r = ut_ehhIterFirst(rt->regs, &it); r; r = ut_ehhIterNext(&it))
|
||||
printf("iid=%"PRIu64" wr_iid=%"PRIu64"\n", r->iid, r->wr_iid);
|
||||
}
|
||||
|
||||
/*************************
|
||||
****** RHC ******
|
||||
*************************/
|
||||
|
@ -219,7 +226,6 @@ struct rhc_sample
|
|||
struct serdata *sample; /* serialised data (either just_key or real data) */
|
||||
struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */
|
||||
uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */
|
||||
nn_wctime_t rtstamp; /* reception timestamp (not really required; perhaps better in serdata) */
|
||||
bool isread; /* READ or NOT_READ sample state */
|
||||
unsigned disposed_gen; /* snapshot of instance counter at time of insertion */
|
||||
unsigned no_writers_gen; /* __/ */
|
||||
|
@ -228,15 +234,16 @@ struct rhc_sample
|
|||
struct rhc_instance
|
||||
{
|
||||
uint64_t iid; /* unique instance id, key of table, also serves as instance handle */
|
||||
uint64_t wr_iid; /* unique of id of writer of latest sample or 0 */
|
||||
uint64_t wr_iid; /* unique of id of writer of latest sample or 0; if wrcount = 0 it is the wr_iid that caused */
|
||||
struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */
|
||||
unsigned nvsamples; /* number of "valid" samples in instance */
|
||||
unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */
|
||||
uint32_t wrcount; /* number of live writers */
|
||||
bool isnew; /* NEW or NOT_NEW view state */
|
||||
bool a_sample_free; /* whether or not a_sample is in use */
|
||||
bool isdisposed; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */
|
||||
bool has_changed; /* To track changes in an instance - if number of samples are added or data is overwritten */
|
||||
unsigned isnew : 1; /* NEW or NOT_NEW view state */
|
||||
unsigned a_sample_free : 1; /* whether or not a_sample is in use */
|
||||
unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */
|
||||
unsigned has_changed : 1; /* To track changes in an instance - if number of samples are added or data is overwritten */
|
||||
unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */
|
||||
unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */
|
||||
unsigned inv_isread : 1; /* whether or not that state change has been read before */
|
||||
unsigned disposed_gen; /* bloody generation counters - worst invention of mankind */
|
||||
|
@ -312,9 +319,7 @@ struct trigger_info
|
|||
#define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i))
|
||||
|
||||
static unsigned qmask_of_inst (const struct rhc_instance *inst);
|
||||
static bool update_conditions_locked
|
||||
(struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample);
|
||||
static void signal_conditions (struct rhc *rhc);
|
||||
static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample);
|
||||
#ifndef NDEBUG
|
||||
static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds);
|
||||
#endif
|
||||
|
@ -415,7 +420,7 @@ static struct rhc_sample * alloc_sample (struct rhc_instance *inst)
|
|||
{
|
||||
if (inst->a_sample_free)
|
||||
{
|
||||
inst->a_sample_free = false;
|
||||
inst->a_sample_free = 0;
|
||||
#if USE_VALGRIND
|
||||
VALGRIND_MAKE_MEM_UNDEFINED (&inst->a_sample, sizeof (inst->a_sample));
|
||||
#endif
|
||||
|
@ -439,7 +444,7 @@ static void free_sample (struct rhc_instance *inst, struct rhc_sample *s)
|
|||
#if USE_VALGRIND
|
||||
VALGRIND_MAKE_MEM_NOACCESS (&inst->a_sample, sizeof (inst->a_sample));
|
||||
#endif
|
||||
inst->a_sample_free = true;
|
||||
inst->a_sample_free = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -556,7 +561,7 @@ static void get_trigger_info (struct trigger_info *info, struct rhc_instance *in
|
|||
/* reset instance has_changed before adding/overwriting a sample */
|
||||
if (pre)
|
||||
{
|
||||
inst->has_changed = false;
|
||||
inst->has_changed = 0;
|
||||
}
|
||||
info->has_changed = inst->has_changed;
|
||||
}
|
||||
|
@ -571,13 +576,12 @@ static bool add_sample
|
|||
(
|
||||
struct rhc * rhc,
|
||||
struct rhc_instance * inst,
|
||||
const struct nn_rsample_info * sampleinfo,
|
||||
const struct proxy_writer_info * pwr_info,
|
||||
const struct serdata * sample,
|
||||
status_cb_data_t * cb_data
|
||||
)
|
||||
{
|
||||
struct rhc_sample *s;
|
||||
assert (sample->v.bswap == sampleinfo->bswap);
|
||||
|
||||
/* Adding a sample always clears an invalid sample (because the information
|
||||
contained in the invalid sample - the instance state and the generation
|
||||
|
@ -604,7 +608,7 @@ static bool add_sample
|
|||
}
|
||||
|
||||
/* set a flag to indicate instance has changed to notify data_available since the sample is overwritten */
|
||||
inst->has_changed = true;
|
||||
inst->has_changed = 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -648,8 +652,7 @@ static bool add_sample
|
|||
}
|
||||
|
||||
s->sample = ddsi_serdata_ref ((serdata_t) sample); /* drops const (tho refcount does change) */
|
||||
s->wr_iid = sampleinfo->pwr_info.iid;
|
||||
s->rtstamp = sampleinfo->reception_timestamp;
|
||||
s->wr_iid = pwr_info->iid;
|
||||
s->isread = false;
|
||||
s->disposed_gen = inst->disposed_gen;
|
||||
s->no_writers_gen = inst->no_writers_gen;
|
||||
|
@ -670,16 +673,15 @@ static bool content_filter_accepts (const struct sertopic * topic, const struct
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct nn_rsample_info *sampleinfo)
|
||||
static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info)
|
||||
{
|
||||
return inst->wr_iid == sampleinfo->pwr_info.iid ||
|
||||
memcmp (&sampleinfo->pwr_info.guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
|
||||
return (inst->wr_iid_islive && inst->wr_iid == pwr_info->iid) || memcmp (&pwr_info->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
|
||||
}
|
||||
|
||||
static int inst_accepts_sample
|
||||
(
|
||||
const struct rhc *rhc, const struct rhc_instance *inst,
|
||||
const struct nn_rsample_info *sampleinfo,
|
||||
const struct proxy_writer_info *pwr_info,
|
||||
const struct serdata *sample, const bool has_data
|
||||
)
|
||||
{
|
||||
|
@ -693,7 +695,7 @@ static int inst_accepts_sample
|
|||
{
|
||||
return 0;
|
||||
}
|
||||
else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo))
|
||||
else if (inst_accepts_sample_by_writer_guid (inst, pwr_info))
|
||||
{
|
||||
/* ok */
|
||||
}
|
||||
|
@ -702,14 +704,14 @@ static int inst_accepts_sample
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
if (rhc->exclusive_ownership && inst->wr_iid != sampleinfo->pwr_info.iid)
|
||||
if (rhc->exclusive_ownership && inst->wr_iid_islive && inst->wr_iid != pwr_info->iid)
|
||||
{
|
||||
uint32_t strength = sampleinfo->pwr_info.ownership_strength;
|
||||
uint32_t strength = pwr_info->ownership_strength;
|
||||
if (strength > inst->strength) {
|
||||
/* ok */
|
||||
} else if (strength < inst->strength) {
|
||||
return 0;
|
||||
} else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo)) {
|
||||
} else if (inst_accepts_sample_by_writer_guid (inst, pwr_info)) {
|
||||
/* ok */
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -725,14 +727,16 @@ static int inst_accepts_sample
|
|||
static void update_inst
|
||||
(
|
||||
const struct rhc *rhc, struct rhc_instance *inst,
|
||||
const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp)
|
||||
const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp)
|
||||
{
|
||||
if (inst->wr_iid != pwr_info->iid)
|
||||
{
|
||||
inst->wr_guid = pwr_info->guid;
|
||||
}
|
||||
inst->tstamp = tstamp;
|
||||
inst->wr_iid = (inst->wrcount == 0) ? 0 : pwr_info->iid;
|
||||
inst->wr_iid_islive = wr_iid_valid;
|
||||
if (wr_iid_valid)
|
||||
{
|
||||
inst->wr_iid = pwr_info->iid;
|
||||
if (inst->wr_iid != pwr_info->iid)
|
||||
inst->wr_guid = pwr_info->guid;
|
||||
}
|
||||
inst->strength = pwr_info->ownership_strength;
|
||||
}
|
||||
|
||||
|
@ -752,6 +756,8 @@ static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_insta
|
|||
|
||||
static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update)
|
||||
{
|
||||
const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0;
|
||||
|
||||
TRACE ((" register:"));
|
||||
|
||||
/* Is an implicitly registering dispose semantically equivalent to
|
||||
|
@ -762,7 +768,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
|
||||
Is a dispose a sample? I don't think so (though a write dispose
|
||||
is). Is a pure register a sample? Don't think so either. */
|
||||
if (inst->wr_iid == wr_iid)
|
||||
if (inst_wr_iid == wr_iid)
|
||||
{
|
||||
/* Same writer as last time => we know it is registered already.
|
||||
This is the fast path -- we don't have to check anything
|
||||
|
@ -775,12 +781,13 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
if (inst->wrcount == 0)
|
||||
{
|
||||
/* Currently no writers at all */
|
||||
assert (inst->wr_iid == 0);
|
||||
assert (!inst->wr_iid_islive);
|
||||
|
||||
/* to avoid wr_iid update when register is called for sample rejected */
|
||||
if (iid_update)
|
||||
{
|
||||
inst->wr_iid = wr_iid;
|
||||
inst->wr_iid_islive = 1;
|
||||
}
|
||||
inst->wrcount++;
|
||||
inst->no_writers_gen++;
|
||||
|
@ -789,7 +796,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
if (!INST_IS_EMPTY (inst) && !inst->isdisposed)
|
||||
rhc->n_not_alive_no_writers--;
|
||||
}
|
||||
else if (inst->wr_iid == 0 && inst->wrcount == 1)
|
||||
else if (inst_wr_iid == 0 && inst->wrcount == 1)
|
||||
{
|
||||
/* Writers exist, but wr_iid is null => someone unregistered.
|
||||
|
||||
|
@ -822,6 +829,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
if (iid_update)
|
||||
{
|
||||
inst->wr_iid = wr_iid;
|
||||
inst->wr_iid_islive = 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -833,7 +841,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
/* 2nd writer => properly register the one we knew about */
|
||||
TRACE (("rescue1"));
|
||||
int x;
|
||||
x = lwregs_add (&rhc->registrations, inst->iid, inst->wr_iid);
|
||||
x = lwregs_add (&rhc->registrations, inst->iid, inst_wr_iid);
|
||||
assert (x);
|
||||
(void) x;
|
||||
}
|
||||
|
@ -855,6 +863,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
|
|||
if (iid_update)
|
||||
{
|
||||
inst->wr_iid = wr_iid;
|
||||
inst->wr_iid_islive = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -863,10 +872,7 @@ static void account_for_empty_to_nonempty_transition (struct rhc *rhc, struct rh
|
|||
{
|
||||
assert (INST_NSAMPLES (inst) == 1);
|
||||
add_inst_to_nonempty_list (rhc, inst);
|
||||
if (inst->isnew)
|
||||
{
|
||||
rhc->n_new++;
|
||||
}
|
||||
rhc->n_new += inst->isnew;
|
||||
if (inst->isdisposed)
|
||||
rhc->n_not_alive_disposed++;
|
||||
else if (inst->wrcount == 0)
|
||||
|
@ -881,8 +887,9 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc
|
|||
TRACE (("unknown(#0)"));
|
||||
return 0;
|
||||
}
|
||||
else if (inst->wrcount == 1 && inst->wr_iid != 0)
|
||||
else if (inst->wrcount == 1 && inst->wr_iid_islive)
|
||||
{
|
||||
assert(inst->wr_iid != 0);
|
||||
if (wr_iid != inst->wr_iid)
|
||||
{
|
||||
TRACE (("unknown(cache)"));
|
||||
|
@ -907,7 +914,7 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc
|
|||
afterward there will be 1 writer, it will be cached, and its
|
||||
registration record must go (invariant that with wrcount = 1
|
||||
and wr_iid != 0 the wr_iid is not in "registrations") */
|
||||
if (inst->wrcount == 2 && inst->wr_iid != wr_iid)
|
||||
if (inst->wrcount == 2 && inst->wr_iid_islive && inst->wr_iid != wr_iid)
|
||||
{
|
||||
TRACE ((",delreg(remain)"));
|
||||
lwregs_delete (&rhc->registrations, inst->iid, inst->wr_iid);
|
||||
|
@ -923,56 +930,62 @@ static int rhc_unregister_updateinst
|
|||
{
|
||||
assert (inst->wrcount > 0);
|
||||
|
||||
if (pwr_info->iid == inst->wr_iid)
|
||||
{
|
||||
/* Next register will have to do real work before we have a cached
|
||||
wr_iid again */
|
||||
inst->wr_iid = 0;
|
||||
|
||||
/* Reset the ownership strength to allow samples to be read from other
|
||||
writer(s) */
|
||||
inst->strength = 0;
|
||||
TRACE ((",clearcache"));
|
||||
}
|
||||
|
||||
if (--inst->wrcount > 0)
|
||||
return 0;
|
||||
else if (!INST_IS_EMPTY (inst))
|
||||
{
|
||||
/* Instance still has content - do not drop until application
|
||||
if (inst->wr_iid_islive && pwr_info->iid == inst->wr_iid)
|
||||
{
|
||||
/* Next register will have to do real work before we have a cached
|
||||
wr_iid again */
|
||||
inst->wr_iid_islive = 0;
|
||||
|
||||
/* Reset the ownership strength to allow samples to be read from other
|
||||
writer(s) */
|
||||
inst->strength = 0;
|
||||
TRACE ((",clearcache"));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!INST_IS_EMPTY (inst))
|
||||
{
|
||||
/* Instance still has content - do not drop until application
|
||||
takes the last sample. Set the invalid sample if the latest
|
||||
sample has been read already, so that the application can
|
||||
read the change to not-alive. (If the latest sample is still
|
||||
unread, we don't bother, even though it means the application
|
||||
won't see the timestamp for the unregister event. It shouldn't
|
||||
care.) */
|
||||
if (inst->latest == NULL /*|| inst->latest->isread*/)
|
||||
if (inst->latest == NULL || inst->latest->isread)
|
||||
{
|
||||
inst_set_invsample (rhc, inst);
|
||||
update_inst (rhc, inst, pwr_info, false, tstamp);
|
||||
}
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
rhc->n_not_alive_no_writers++;
|
||||
}
|
||||
inst->wr_iid_islive = 0;
|
||||
return 0;
|
||||
}
|
||||
else if (inst->isdisposed)
|
||||
{
|
||||
/* No content left, no registrations left, so drop */
|
||||
TRACE ((",#0,empty,disposed,drop"));
|
||||
drop_instance_noupdate_no_writers (rhc, inst);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Add invalid samples for transition to no-writers */
|
||||
TRACE ((",#0,empty,nowriters"));
|
||||
assert (INST_IS_EMPTY (inst));
|
||||
inst_set_invsample (rhc, inst);
|
||||
update_inst (rhc, inst, pwr_info, tstamp);
|
||||
update_inst (rhc, inst, pwr_info, false, tstamp);
|
||||
account_for_empty_to_nonempty_transition (rhc, inst);
|
||||
inst->wr_iid_islive = 0;
|
||||
return 0;
|
||||
}
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
rhc->n_not_alive_no_writers++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
else if (inst->isdisposed)
|
||||
{
|
||||
/* No content left, no registrations left, so drop */
|
||||
TRACE ((",#0,empty,disposed,drop"));
|
||||
drop_instance_noupdate_no_writers (rhc, inst);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Add invalid samples for transition to no-writers */
|
||||
TRACE ((",#0,empty,nowriters"));
|
||||
assert (INST_IS_EMPTY (inst));
|
||||
inst_set_invsample (rhc, inst);
|
||||
update_inst (rhc, inst, pwr_info, tstamp);
|
||||
account_for_empty_to_nonempty_transition (rhc, inst);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1004,7 +1017,7 @@ static void dds_rhc_unregister
|
|||
static struct rhc_instance * alloc_new_instance
|
||||
(
|
||||
const struct rhc *rhc,
|
||||
const struct nn_rsample_info *sampleinfo,
|
||||
const struct proxy_writer_info *pwr_info,
|
||||
struct serdata *serdata,
|
||||
struct tkmap_instance *tk
|
||||
)
|
||||
|
@ -1017,11 +1030,15 @@ static struct rhc_instance * alloc_new_instance
|
|||
inst->tk = tk;
|
||||
inst->wrcount = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1;
|
||||
inst->isdisposed = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_DISPOSE);
|
||||
inst->isnew = true;
|
||||
inst->isnew = 1;
|
||||
inst->inv_exists = 0;
|
||||
inst->inv_isread = 0; /* don't care */
|
||||
inst->a_sample_free = true;
|
||||
update_inst (rhc, inst, &sampleinfo->pwr_info, serdata->v.msginfo.timestamp);
|
||||
inst->a_sample_free = 1;
|
||||
inst->wr_iid = pwr_info->iid;
|
||||
inst->wr_iid_islive = (inst->wrcount != 0);
|
||||
inst->wr_guid = pwr_info->guid;
|
||||
inst->tstamp = serdata->v.msginfo.timestamp;
|
||||
inst->strength = pwr_info->ownership_strength;
|
||||
return inst;
|
||||
}
|
||||
|
||||
|
@ -1029,7 +1046,7 @@ static rhc_store_result_t rhc_store_new_instance
|
|||
(
|
||||
struct trigger_info * post,
|
||||
struct rhc *rhc,
|
||||
const struct nn_rsample_info *sampleinfo,
|
||||
const struct proxy_writer_info *pwr_info,
|
||||
struct serdata *sample,
|
||||
struct tkmap_instance *tk,
|
||||
const bool has_data,
|
||||
|
@ -1069,10 +1086,10 @@ static rhc_store_result_t rhc_store_new_instance
|
|||
return RHC_REJECTED;
|
||||
}
|
||||
|
||||
inst = alloc_new_instance (rhc, sampleinfo, sample, tk);
|
||||
inst = alloc_new_instance (rhc, pwr_info, sample, tk);
|
||||
if (has_data)
|
||||
{
|
||||
if (!add_sample (rhc, inst, sampleinfo, sample, cb_data))
|
||||
if (!add_sample (rhc, inst, pwr_info, sample, cb_data))
|
||||
{
|
||||
free_instance (inst, rhc);
|
||||
return RHC_REJECTED;
|
||||
|
@ -1102,11 +1119,11 @@ static rhc_store_result_t rhc_store_new_instance
|
|||
|
||||
bool dds_rhc_store
|
||||
(
|
||||
struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo,
|
||||
struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
|
||||
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk
|
||||
)
|
||||
{
|
||||
const uint64_t wr_iid = sampleinfo->pwr_info.iid;
|
||||
const uint64_t wr_iid = pwr_info->iid;
|
||||
const unsigned statusinfo = sample->v.msginfo.statusinfo;
|
||||
const bool has_data = (sample->v.st->kind == STK_DATA);
|
||||
const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0;
|
||||
|
@ -1150,7 +1167,7 @@ bool dds_rhc_store
|
|||
else
|
||||
{
|
||||
TRACE ((" new instance"));
|
||||
stored = rhc_store_new_instance (&post, rhc, sampleinfo, sample, tk, has_data, &cb_data);
|
||||
stored = rhc_store_new_instance (&post, rhc, pwr_info, sample, tk, has_data, &cb_data);
|
||||
if (stored != RHC_STORED)
|
||||
{
|
||||
goto error_or_nochange;
|
||||
|
@ -1158,7 +1175,7 @@ bool dds_rhc_store
|
|||
init_trigger_info_nonmatch (&pre);
|
||||
}
|
||||
}
|
||||
else if (!inst_accepts_sample (rhc, inst, sampleinfo, sample, has_data))
|
||||
else if (!inst_accepts_sample (rhc, inst, pwr_info, sample, has_data))
|
||||
{
|
||||
/* Rejected samples (and disposes) should still register the writer;
|
||||
unregister *must* be processed, or we have a memory leak. (We
|
||||
|
@ -1175,7 +1192,7 @@ bool dds_rhc_store
|
|||
}
|
||||
if (statusinfo & NN_STATUSINFO_UNREGISTER)
|
||||
{
|
||||
dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp);
|
||||
dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1195,6 +1212,8 @@ bool dds_rhc_store
|
|||
{
|
||||
get_trigger_info (&pre, inst, true);
|
||||
|
||||
TRACE ((" wc %"PRIu32, inst->wrcount));
|
||||
|
||||
if (has_data || is_dispose)
|
||||
{
|
||||
/* View state must be NEW following receipt of a sample when
|
||||
|
@ -1222,7 +1241,7 @@ bool dds_rhc_store
|
|||
if (has_data && not_alive)
|
||||
{
|
||||
TRACE ((" notalive->alive"));
|
||||
inst->isnew = true;
|
||||
inst->isnew = 1;
|
||||
}
|
||||
|
||||
/* Desired effect on instance state and disposed_gen:
|
||||
|
@ -1235,12 +1254,12 @@ bool dds_rhc_store
|
|||
if (has_data && inst->isdisposed)
|
||||
{
|
||||
TRACE ((" disposed->notdisposed"));
|
||||
inst->isdisposed = false;
|
||||
inst->isdisposed = 0;
|
||||
inst->disposed_gen++;
|
||||
}
|
||||
if (is_dispose)
|
||||
{
|
||||
inst->isdisposed = true;
|
||||
inst->isdisposed = 1;
|
||||
inst_became_disposed = !old_isdisposed;
|
||||
TRACE ((" dispose(%d)", inst_became_disposed));
|
||||
}
|
||||
|
@ -1250,7 +1269,7 @@ bool dds_rhc_store
|
|||
if (has_data)
|
||||
{
|
||||
TRACE ((" add_sample"));
|
||||
if (! add_sample (rhc, inst, sampleinfo, sample, &cb_data))
|
||||
if (! add_sample (rhc, inst, pwr_info, sample, &cb_data))
|
||||
{
|
||||
TRACE (("(reject)"));
|
||||
stored = RHC_REJECTED;
|
||||
|
@ -1262,7 +1281,7 @@ bool dds_rhc_store
|
|||
if (inst_became_disposed && (inst->latest == NULL ))
|
||||
inst_set_invsample (rhc, inst);
|
||||
|
||||
update_inst (rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp);
|
||||
update_inst (rhc, inst, pwr_info, true, sample->v.msginfo.timestamp);
|
||||
|
||||
/* Can only add samples => only need to give special treatment
|
||||
to instances that were empty before. It is, however, not
|
||||
|
@ -1280,7 +1299,7 @@ bool dds_rhc_store
|
|||
else
|
||||
{
|
||||
rhc->n_not_alive_disposed += inst->isdisposed - old_isdisposed;
|
||||
rhc->n_new += (inst->isnew ? 1 : 0) - (old_isnew ? 1 : 0);
|
||||
rhc->n_new += inst->isnew - old_isnew;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -1306,7 +1325,7 @@ bool dds_rhc_store
|
|||
mean an application reading "x" after the write and reading it
|
||||
again after the unregister will see a change in the
|
||||
no_writers_generation field? */
|
||||
dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp);
|
||||
dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1390,7 +1409,7 @@ void dds_rhc_unregister_wr
|
|||
TRACE (("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose));
|
||||
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
|
||||
{
|
||||
if (inst->wr_iid == wr_iid || lwregs_contains (&rhc->registrations, inst->iid, wr_iid))
|
||||
if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid))
|
||||
{
|
||||
struct trigger_info pre, post;
|
||||
get_trigger_info (&pre, inst, true);
|
||||
|
@ -1400,7 +1419,7 @@ void dds_rhc_unregister_wr
|
|||
assert (inst->wrcount > 0);
|
||||
if (auto_dispose && !inst->isdisposed)
|
||||
{
|
||||
inst->isdisposed = true;
|
||||
inst->isdisposed = 1;
|
||||
|
||||
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */
|
||||
if (inst->latest)
|
||||
|
@ -1412,7 +1431,6 @@ void dds_rhc_unregister_wr
|
|||
{
|
||||
const bool was_empty = INST_IS_EMPTY (inst);
|
||||
inst_set_invsample (rhc, inst);
|
||||
update_inst (rhc, inst, pwr_info, inst->tstamp);
|
||||
if (was_empty)
|
||||
account_for_empty_to_nonempty_transition (rhc, inst);
|
||||
else
|
||||
|
@ -1452,9 +1470,9 @@ void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t w
|
|||
TRACE (("rhc_relinquish_ownership(%"PRIx64":\n", wr_iid));
|
||||
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
|
||||
{
|
||||
if (inst->wr_iid == wr_iid)
|
||||
if (inst->wr_iid_islive && inst->wr_iid == wr_iid)
|
||||
{
|
||||
inst->wr_iid = 0;
|
||||
inst->wr_iid_islive = 0;
|
||||
}
|
||||
}
|
||||
TRACE ((")\n"));
|
||||
|
@ -1564,7 +1582,6 @@ static void set_sample_info (dds_sample_info_t *si, const struct rhc_instance *i
|
|||
si->absolute_generation_rank = (inst->disposed_gen + inst->no_writers_gen) - (sample->disposed_gen + sample->no_writers_gen);
|
||||
si->valid_data = true;
|
||||
si->source_timestamp = sample->sample->v.msginfo.timestamp.v;
|
||||
si->reception_timestamp = sample->rtstamp.v;
|
||||
}
|
||||
|
||||
static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_instance *inst)
|
||||
|
@ -1581,9 +1598,6 @@ static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_i
|
|||
si->absolute_generation_rank = 0;
|
||||
si->valid_data = false;
|
||||
si->source_timestamp = inst->tstamp.v;
|
||||
|
||||
/* Not storing the underlying "sample" so the reception time is lost */
|
||||
si->reception_timestamp = 0;
|
||||
}
|
||||
|
||||
static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst)
|
||||
|
@ -1692,7 +1706,7 @@ static int dds_rhc_read_w_qminv
|
|||
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst->isnew = false;
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
if (nread != INST_NREAD (inst))
|
||||
|
@ -1834,7 +1848,7 @@ static int dds_rhc_take_w_qminv
|
|||
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst->isnew = false;
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
|
||||
|
@ -1983,7 +1997,7 @@ static int dds_rhc_takecdr_w_qminv
|
|||
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst->isnew = false;
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
|
||||
|
|
|
@ -1576,6 +1576,14 @@ static uint32_t dds_stream_get_keyhash
|
|||
return (uint32_t) (dst - origin);
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
static bool keyhash_is_reset(const dds_key_hash_t *kh)
|
||||
{
|
||||
static const char nullhash[sizeof(kh->m_hash)];
|
||||
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
void dds_stream_read_keyhash
|
||||
(
|
||||
dds_stream_t * is,
|
||||
|
@ -1586,14 +1594,19 @@ void dds_stream_read_keyhash
|
|||
{
|
||||
char * dst;
|
||||
|
||||
assert (desc->m_keys);
|
||||
assert (keyhash_is_reset(kh));
|
||||
|
||||
if (desc->m_nkeys == 0)
|
||||
{
|
||||
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Select key buffer to use */
|
||||
|
||||
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;
|
||||
if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
|
||||
{
|
||||
memset (kh->m_hash, 0, 16);
|
||||
kh->m_flags |= DDS_KEY_IS_HASH;
|
||||
dst = kh->m_hash;
|
||||
}
|
||||
|
@ -1616,7 +1629,6 @@ void dds_stream_read_keyhash
|
|||
else
|
||||
{
|
||||
/* Hash is md5 of key */
|
||||
|
||||
dds_key_md5 (kh);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -243,7 +243,6 @@ struct tkmap_instance * dds_tkmap_find_by_id (_In_ struct tkmap * map, _In_ uint
|
|||
|
||||
_Check_return_
|
||||
struct tkmap_instance * dds_tkmap_find(
|
||||
_In_opt_ const struct dds_topic * topic,
|
||||
_In_ struct serdata * sd,
|
||||
_In_ const bool rd,
|
||||
_In_ const bool create)
|
||||
|
@ -252,46 +251,9 @@ struct tkmap_instance * dds_tkmap_find(
|
|||
struct tkmap_instance * tk;
|
||||
struct tkmap * map = gv.m_tkmap;
|
||||
|
||||
assert(sd->v.keyhash.m_flags & DDS_KEY_HASH_SET);
|
||||
dummy.m_sample = sd;
|
||||
|
||||
/* Generate key hash if required and not provided */
|
||||
|
||||
if (topic && topic->m_descriptor->m_nkeys)
|
||||
{
|
||||
if ((sd->v.keyhash.m_flags & DDS_KEY_HASH_SET) == 0)
|
||||
{
|
||||
dds_stream_t is;
|
||||
dds_stream_from_serstate (&is, sd->v.st);
|
||||
dds_stream_read_keyhash (&is, &sd->v.keyhash, topic->m_descriptor, sd->v.st->kind == STK_KEY);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (topic->m_descriptor->m_flagset & DDS_TOPIC_FIXED_KEY)
|
||||
{
|
||||
sd->v.keyhash.m_flags |= DDS_KEY_IS_HASH;
|
||||
}
|
||||
|
||||
#if DDS_DEBUG_KEYHASH
|
||||
|
||||
{
|
||||
dds_stream_t is;
|
||||
dds_key_hash_t kh;
|
||||
|
||||
/* Check that we generate same keyhash as provided */
|
||||
|
||||
memset (&kh, 0, sizeof (kh));
|
||||
dds_stream_from_serstate (&is, sd->v.st);
|
||||
dds_stream_read_keyhash (&is, &kh, topic->m_descriptor, sd->v.st->kind == STK_KEY);
|
||||
assert (memcmp (kh.m_hash, sd->v.keyhash.m_hash, 16) == 0);
|
||||
if (kh.m_key_buff_size)
|
||||
{
|
||||
dds_free (kh.m_key_buff);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
retry:
|
||||
if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL)
|
||||
{
|
||||
|
@ -340,19 +302,15 @@ retry:
|
|||
_Check_return_
|
||||
struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct serdata * sd)
|
||||
{
|
||||
dds_topic * topic = sd->v.st->topic ? sd->v.st->topic->status_cb_entity : NULL;
|
||||
|
||||
assert (vtime_awake_p (lookup_thread_state ()->vtime));
|
||||
|
||||
#if 0
|
||||
/* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */
|
||||
if (topic == NULL)
|
||||
if (sd->v.st->topic == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
return dds_tkmap_find (topic, sd, true, true);
|
||||
return dds_tkmap_find (sd, true, true);
|
||||
}
|
||||
|
||||
void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk)
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "ddsi/q_error.h"
|
||||
#include "ddsi/q_thread.h"
|
||||
#include "q__osplser.h"
|
||||
#include "dds__stream.h"
|
||||
#include "dds__err.h"
|
||||
#include "ddsi/q_transmit.h"
|
||||
#include "ddsi/q_ephash.h"
|
||||
|
@ -26,14 +27,6 @@
|
|||
#include <string.h>
|
||||
|
||||
|
||||
#if OS_ATOMIC64_SUPPORT
|
||||
typedef os_atomic_uint64_t fake_seq_t;
|
||||
uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc64_nv (x); }
|
||||
#else /* HACK */
|
||||
typedef os_atomic_uint32_t fake_seq_t;
|
||||
uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc32_nv (x); }
|
||||
#endif
|
||||
|
||||
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
|
||||
dds_return_t
|
||||
dds_write(
|
||||
|
@ -118,30 +111,9 @@ err:
|
|||
return ret;
|
||||
}
|
||||
|
||||
static void
|
||||
init_sampleinfo(
|
||||
_Out_ struct nn_rsample_info *sampleinfo,
|
||||
_In_ struct writer *wr,
|
||||
_In_ int64_t seq,
|
||||
_In_ serdata_t payload)
|
||||
{
|
||||
memset(sampleinfo, 0, sizeof(*sampleinfo));
|
||||
sampleinfo->bswap = 0;
|
||||
sampleinfo->complex_qos = 0;
|
||||
sampleinfo->hashash = 0;
|
||||
sampleinfo->seq = seq;
|
||||
sampleinfo->reception_timestamp = payload->v.msginfo.timestamp;
|
||||
sampleinfo->statusinfo = payload->v.msginfo.statusinfo;
|
||||
sampleinfo->pwr_info.iid = 1;
|
||||
sampleinfo->pwr_info.auto_dispose = 0;
|
||||
sampleinfo->pwr_info.guid = wr->e.guid;
|
||||
sampleinfo->pwr_info.ownership_strength = 0;
|
||||
}
|
||||
|
||||
static int
|
||||
deliver_locally(
|
||||
_In_ struct writer *wr,
|
||||
_In_ int64_t seq,
|
||||
_In_ serdata_t payload,
|
||||
_In_ struct tkmap_instance *tk)
|
||||
{
|
||||
|
@ -150,15 +122,15 @@ deliver_locally(
|
|||
if (wr->rdary.fastpath_ok) {
|
||||
struct reader ** const rdary = wr->rdary.rdary;
|
||||
if (rdary[0]) {
|
||||
struct nn_rsample_info sampleinfo;
|
||||
struct proxy_writer_info pwr_info;
|
||||
unsigned i;
|
||||
init_sampleinfo(&sampleinfo, wr, seq, payload);
|
||||
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
|
||||
for (i = 0; rdary[i]; i++) {
|
||||
bool stored;
|
||||
TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)));
|
||||
dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC;
|
||||
do {
|
||||
stored = (ddsi_plugin.rhc_store_fn) (rdary[i]->rhc, &sampleinfo, payload, tk);
|
||||
stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk);
|
||||
if (!stored) {
|
||||
if (max_block_ms <= 0) {
|
||||
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT, "The writer could not deliver data on time, probably due to a local reader resources being full.");
|
||||
|
@ -185,16 +157,16 @@ deliver_locally(
|
|||
reliable samples that are rejected are simply discarded. */
|
||||
ut_avlIter_t it;
|
||||
struct pwr_rd_match *m;
|
||||
struct nn_rsample_info sampleinfo;
|
||||
struct proxy_writer_info pwr_info;
|
||||
os_mutexUnlock (&wr->rdary.rdary_lock);
|
||||
init_sampleinfo(&sampleinfo, wr, seq, payload);
|
||||
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
|
||||
os_mutexLock (&wr->e.lock);
|
||||
for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) {
|
||||
struct reader *rd;
|
||||
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) {
|
||||
TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)));
|
||||
/* Copied the return value ignore from DDSI deliver_user_data() function. */
|
||||
(void)(ddsi_plugin.rhc_store_fn) (rd->rhc, &sampleinfo, payload, tk);
|
||||
(void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
|
||||
}
|
||||
}
|
||||
os_mutexUnlock (&wr->e.lock);
|
||||
|
@ -209,7 +181,6 @@ dds_write_impl(
|
|||
_In_ dds_time_t tstamp,
|
||||
_In_ dds_write_action action)
|
||||
{
|
||||
static fake_seq_t fake_seq;
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
int w_rc;
|
||||
|
||||
|
@ -252,7 +223,7 @@ dds_write_impl(
|
|||
d->v.msginfo.timestamp.v = tstamp;
|
||||
os_mutexLock (&writer->m_call_lock);
|
||||
ddsi_serdata_ref(d);
|
||||
tk = (ddsi_plugin.rhc_lookup_fn) (d);
|
||||
tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d);
|
||||
w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk);
|
||||
|
||||
if (w_rc >= 0) {
|
||||
|
@ -271,10 +242,10 @@ dds_write_impl(
|
|||
os_mutexUnlock (&writer->m_call_lock);
|
||||
|
||||
if (ret == DDS_RETCODE_OK) {
|
||||
ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk);
|
||||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
}
|
||||
ddsi_serdata_unref(d);
|
||||
(ddsi_plugin.rhc_unref_fn) (tk);
|
||||
(ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
|
@ -292,7 +263,6 @@ dds_writecdr_impl(
|
|||
_In_ dds_time_t tstamp,
|
||||
_In_ dds_write_action action)
|
||||
{
|
||||
static fake_seq_t fake_seq;
|
||||
int ret = DDS_RETCODE_OK;
|
||||
int w_rc;
|
||||
|
||||
|
@ -316,17 +286,16 @@ dds_writecdr_impl(
|
|||
}
|
||||
|
||||
/* Serialize and write data or key */
|
||||
if (writekey) {
|
||||
abort();
|
||||
//d = serialize_key (gv.serpool, ddsi_wr->topic, data);
|
||||
} else {
|
||||
serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic);
|
||||
if (ddsi_wr->topic->nkeys) {
|
||||
abort();
|
||||
//dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
|
||||
}
|
||||
ddsi_serstate_append_blob(st, 1, sz, cdr);
|
||||
d = ddsi_serstate_fix(st);
|
||||
{
|
||||
serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic);
|
||||
dds_stream_t is;
|
||||
ddsi_serstate_append_blob(st, 1, sz, cdr);
|
||||
d = ddsi_serstate_fix(st);
|
||||
assert(d->v.keyhash.m_flags == 0);
|
||||
assert(!d->v.bswap);
|
||||
dds_stream_from_serstate (&is, d->v.st);
|
||||
d->v.st->kind = writekey ? STK_KEY : STK_DATA;
|
||||
dds_stream_read_keyhash (&is, &d->v.keyhash, ddsi_wr->topic->status_cb_entity->m_descriptor, d->v.st->kind == STK_KEY);
|
||||
}
|
||||
|
||||
/* Set if disposing or unregistering */
|
||||
|
@ -335,7 +304,7 @@ dds_writecdr_impl(
|
|||
d->v.msginfo.timestamp.v = tstamp;
|
||||
os_mutexLock (&wr->m_call_lock);
|
||||
ddsi_serdata_ref(d);
|
||||
tk = (ddsi_plugin.rhc_lookup_fn) (d);
|
||||
tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d);
|
||||
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
|
||||
|
||||
if (w_rc >= 0) {
|
||||
|
@ -354,10 +323,10 @@ dds_writecdr_impl(
|
|||
os_mutexUnlock (&wr->m_call_lock);
|
||||
|
||||
if (ret == DDS_RETCODE_OK) {
|
||||
ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk);
|
||||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
}
|
||||
ddsi_serdata_unref(d);
|
||||
(ddsi_plugin.rhc_unref_fn) (tk);
|
||||
(ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
|
|
|
@ -22,11 +22,7 @@ serdata_t serialize (serstatepool_t pool, const struct sertopic * tp, const void
|
|||
{
|
||||
dds_stream_t os;
|
||||
serstate_t st = ddsi_serstate_new (pool, tp);
|
||||
|
||||
if (tp->nkeys)
|
||||
{
|
||||
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
|
||||
}
|
||||
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
|
||||
dds_stream_from_serstate (&os, st);
|
||||
dds_stream_write_sample (&os, sample, tp);
|
||||
dds_stream_add_to_serstate (&os, st);
|
||||
|
@ -65,21 +61,14 @@ int serdata_cmp (const struct serdata *a, const struct serdata *b)
|
|||
serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample)
|
||||
{
|
||||
serdata_t sd;
|
||||
if (tp->nkeys)
|
||||
{
|
||||
dds_stream_t os;
|
||||
dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type;
|
||||
serstate_t st = ddsi_serstate_new (pool, tp);
|
||||
dds_key_gen (desc, &st->data->v.keyhash, (char*) sample);
|
||||
dds_stream_from_serstate (&os, st);
|
||||
dds_stream_write_key (&os, sample, desc);
|
||||
dds_stream_add_to_serstate (&os, st);
|
||||
sd = st->data;
|
||||
}
|
||||
else
|
||||
{
|
||||
sd = serialize (pool, tp, sample);
|
||||
}
|
||||
dds_stream_t os;
|
||||
dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type;
|
||||
serstate_t st = ddsi_serstate_new (pool, tp);
|
||||
dds_key_gen (desc, &st->data->v.keyhash, (char*) sample);
|
||||
dds_stream_from_serstate (&os, st);
|
||||
dds_stream_write_key (&os, sample, desc);
|
||||
dds_stream_add_to_serstate (&os, st);
|
||||
sd = st->data;
|
||||
sd->v.st->kind = STK_KEY;
|
||||
return sd;
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
#define MAX_SAMPLES 21
|
||||
|
||||
#define RDR_NOT_READ_CNT 11
|
||||
#define RDR_INV_READ_CNT 1
|
||||
int rdr_expected_long_2[RDR_NOT_READ_CNT] = { 0, 1, 2, 6, 7, 9, 11, 13, 14, 16, 19 };
|
||||
|
||||
/* Because we only read one sample at a time, only the first sample of an instance
|
||||
|
@ -344,13 +345,13 @@ samples_cnt(void)
|
|||
/*************************************************************************************************/
|
||||
Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
|
||||
{
|
||||
dds_return_t cnt = 0;
|
||||
dds_return_t cnt = 0, cntinv = 0;
|
||||
dds_return_t ret = 1;
|
||||
|
||||
while (ret == 1){
|
||||
ret = dds_read_next(g_reader, g_samples, g_info);
|
||||
cr_assert_geq(ret, 0 , "# read %d", ret);
|
||||
if(ret == 1){
|
||||
if(ret == 1 && g_info[0].valid_data){
|
||||
Space_Type1 *sample = (Space_Type1*)g_samples[0];
|
||||
PRINT_SAMPLE("ddsc_read_next::reader: Read", (*sample));
|
||||
|
||||
|
@ -373,10 +374,13 @@ Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f
|
|||
cr_assert_eq(g_info[0].view_state, expected_vst);
|
||||
cr_assert_eq(g_info[0].instance_state, expected_ist);
|
||||
cnt ++;
|
||||
} else if (ret == 1 && !g_info[0].valid_data) {
|
||||
cntinv ++;
|
||||
}
|
||||
}
|
||||
|
||||
cr_assert_eq(cnt, RDR_NOT_READ_CNT);
|
||||
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
|
||||
|
||||
/* All samples should still be available. */
|
||||
ret = samples_cnt();
|
||||
|
@ -453,13 +457,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next, invalid_buffers, .in
|
|||
/*************************************************************************************************/
|
||||
Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
|
||||
{
|
||||
dds_return_t cnt = 0;
|
||||
dds_return_t cnt = 0, cntinv = 0;
|
||||
dds_return_t ret = 1;
|
||||
|
||||
while (ret == 1){
|
||||
ret = dds_read_next_wl(g_reader, g_loans, g_info);
|
||||
cr_assert_geq(ret, 0 , "# read %d", ret);
|
||||
if(ret == 1){
|
||||
if(ret == 1 && g_info[0].valid_data){
|
||||
Space_Type1 *sample = (Space_Type1*)g_loans[0];
|
||||
PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample));
|
||||
|
||||
|
@ -482,10 +486,13 @@ Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato
|
|||
cr_assert_eq(g_info[0].view_state, expected_vst);
|
||||
cr_assert_eq(g_info[0].instance_state, expected_ist);
|
||||
cnt ++;
|
||||
} else if (ret == 1 && !g_info[0].valid_data) {
|
||||
cntinv ++;
|
||||
}
|
||||
}
|
||||
|
||||
cr_assert_eq(cnt, RDR_NOT_READ_CNT);
|
||||
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
|
||||
|
||||
ret = dds_return_loan(g_reader, g_loans, ret);
|
||||
cr_assert_eq (ret, DDS_RETCODE_OK);
|
||||
|
@ -564,13 +571,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next_wl, invalid_buffers,
|
|||
/*************************************************************************************************/
|
||||
Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
|
||||
{
|
||||
dds_return_t cnt = 0;
|
||||
dds_return_t cnt = 0, cntinv = 0;
|
||||
dds_return_t ret = 1;
|
||||
|
||||
while (ret == 1){
|
||||
ret = dds_take_next(g_reader, g_samples, g_info);
|
||||
cr_assert_geq(ret, 0 , "# read %d", ret);
|
||||
if(ret == 1){
|
||||
if(ret == 1 && g_info[0].valid_data){
|
||||
Space_Type1 *sample = (Space_Type1*)g_samples[0];
|
||||
PRINT_SAMPLE("ddsc_take_next::reader: Read", (*sample));
|
||||
|
||||
|
@ -593,10 +600,13 @@ Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f
|
|||
cr_assert_eq(g_info[0].view_state, expected_vst);
|
||||
cr_assert_eq(g_info[0].instance_state, expected_ist);
|
||||
cnt ++;
|
||||
} else if (ret == 1 && !g_info[0].valid_data) {
|
||||
cntinv ++;
|
||||
}
|
||||
}
|
||||
|
||||
cr_assert_eq(cnt, RDR_NOT_READ_CNT);
|
||||
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
|
||||
|
||||
/* All samples should still be available. */
|
||||
ret = samples_cnt();
|
||||
|
@ -671,13 +681,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_take_next, invalid_buffers, .in
|
|||
/*************************************************************************************************/
|
||||
Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
|
||||
{
|
||||
dds_return_t cnt = 0;
|
||||
dds_return_t cnt = 0, cntinv = 0;
|
||||
dds_return_t ret = 1;
|
||||
|
||||
while (ret == 1){
|
||||
ret = dds_take_next_wl(g_reader, g_loans, g_info);
|
||||
cr_assert_geq(ret, 0 , "# read %d", ret);
|
||||
if(ret == 1){
|
||||
if(ret == 1 && g_info[0].valid_data){
|
||||
Space_Type1 *sample = (Space_Type1*)g_loans[0];
|
||||
PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample));
|
||||
|
||||
|
@ -700,10 +710,13 @@ Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato
|
|||
cr_assert_eq(g_info[0].view_state, expected_vst);
|
||||
cr_assert_eq(g_info[0].instance_state, expected_ist);
|
||||
cnt ++;
|
||||
} else if (ret == 1 && !g_info[0].valid_data) {
|
||||
cntinv ++;
|
||||
}
|
||||
}
|
||||
|
||||
cr_assert_eq(cnt, RDR_NOT_READ_CNT);
|
||||
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
|
||||
|
||||
ret = dds_return_loan(g_reader, g_loans, ret);
|
||||
cr_assert_eq (ret, DDS_RETCODE_OK);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue