Merge pull request #11 from eboasson/master

Fix dds_sample_info.publication_handle incorrectly set to 1
This commit is contained in:
eboasson 2018-07-06 21:32:52 +02:00 committed by GitHub
commit 0da8f28265
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 343 additions and 385 deletions

View file

@ -173,8 +173,6 @@ typedef struct dds_sample_info
uint32_t generation_rank; uint32_t generation_rank;
/** difference in generations between the sample and most recent sample of the same instance when read/take was called */ /** difference in generations between the sample and most recent sample of the same instance when read/take was called */
uint32_t absolute_generation_rank; uint32_t absolute_generation_rank;
/** timestamp of a data instance when it is added to a read queue */
dds_time_t reception_timestamp; /* NOTE: VLite extension */
} }
dds_sample_info_t; dds_sample_info_t;

View file

@ -25,8 +25,6 @@ struct rhc;
struct nn_xqos; struct nn_xqos;
struct serdata; struct serdata;
struct tkmap_instance; struct tkmap_instance;
struct tkmap;
struct nn_rsample_info;
struct proxy_writer_info; struct proxy_writer_info;
struct rhc * dds_rhc_new (dds_reader * reader, const struct sertopic * topic); struct rhc * dds_rhc_new (dds_reader * reader, const struct sertopic * topic);
@ -37,7 +35,7 @@ uint32_t dds_rhc_lock_samples (struct rhc * rhc);
DDS_EXPORT bool dds_rhc_store DDS_EXPORT bool dds_rhc_store
( (
struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo, struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk struct serdata * __restrict sample, struct tkmap_instance * __restrict tk
); );
void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info); void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);

View file

@ -38,7 +38,6 @@ void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk);
uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct serdata *serdata); uint64_t dds_tkmap_lookup (_In_ struct tkmap *tkmap, _In_ const struct serdata *serdata);
_Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample); _Check_return_ bool dds_tkmap_get_key (_In_ struct tkmap * map, _In_ uint64_t iid, _Out_ void * sample);
_Check_return_ struct tkmap_instance * dds_tkmap_find( _Check_return_ struct tkmap_instance * dds_tkmap_find(
_In_opt_ const struct dds_topic * topic,
_In_ struct serdata * sd, _In_ struct serdata * sd,
_In_ const bool rd, _In_ const bool rd,
_In_ const bool create); _In_ const bool create);

View file

@ -263,14 +263,14 @@ void ddsi_plugin_init (void)
/* Register read cache functions */ /* Register read cache functions */
ddsi_plugin.rhc_free_fn = dds_rhc_free; ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
ddsi_plugin.rhc_fini_fn = dds_rhc_fini; ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini;
ddsi_plugin.rhc_store_fn = dds_rhc_store; ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store;
ddsi_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr; ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr;
ddsi_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership; ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership;
ddsi_plugin.rhc_set_qos_fn = dds_rhc_set_qos; ddsi_plugin.rhc_plugin.rhc_set_qos_fn = dds_rhc_set_qos;
ddsi_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref; ddsi_plugin.rhc_plugin.rhc_lookup_fn = dds_tkmap_lookup_instance_ref;
ddsi_plugin.rhc_unref_fn = dds_tkmap_instance_unref; ddsi_plugin.rhc_plugin.rhc_unref_fn = dds_tkmap_instance_unref;
/* Register iid generator */ /* Register iid generator */

View file

@ -71,7 +71,7 @@ dds_instance_find(
_In_ const bool create) _In_ const bool create)
{ {
serdata_t sd = serialize_key (gv.serpool, topic->m_stopic, data); serdata_t sd = serialize_key (gv.serpool, topic->m_stopic, data);
struct tkmap_instance * inst = dds_tkmap_find (topic, sd, false, create); struct tkmap_instance * inst = dds_tkmap_find (sd, false, create);
ddsi_serdata_unref (sd); ddsi_serdata_unref (sd);
return inst; return inst;
} }

View file

@ -17,6 +17,14 @@
#include "ddsi/q_bswap.h" #include "ddsi/q_bswap.h"
#include "ddsi/q_md5.h" #include "ddsi/q_md5.h"
#ifndef NDEBUG
static bool keyhash_is_reset(const dds_key_hash_t *kh)
{
static const char nullhash[sizeof(kh->m_hash)];
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
}
#endif
void dds_key_md5 (dds_key_hash_t * kh) void dds_key_md5 (dds_key_hash_t * kh)
{ {
md5_state_t md5st; md5_state_t md5st;
@ -43,8 +51,14 @@ void dds_key_gen
uint32_t len = 0; uint32_t len = 0;
char * dst; char * dst;
assert (desc->m_nkeys); assert(keyhash_is_reset(kh));
assert (kh->m_hash[0] == 0 && kh->m_hash[15] == 0);
if (desc->m_nkeys == 0)
{
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
kh->m_key_len = sizeof (kh->m_hash);
return;
}
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;

View file

@ -210,6 +210,13 @@ static int lwregs_delete (struct lwregs *rt, uint64_t iid, uint64_t wr_iid)
return ut_ehhRemove (rt->regs, &dummy); return ut_ehhRemove (rt->regs, &dummy);
} }
void lwregs_dump (struct lwregs *rt)
{
struct ut_ehhIter it;
for (struct lwreg *r = ut_ehhIterFirst(rt->regs, &it); r; r = ut_ehhIterNext(&it))
printf("iid=%"PRIu64" wr_iid=%"PRIu64"\n", r->iid, r->wr_iid);
}
/************************* /*************************
****** RHC ****** ****** RHC ******
*************************/ *************************/
@ -219,7 +226,6 @@ struct rhc_sample
struct serdata *sample; /* serialised data (either just_key or real data) */ struct serdata *sample; /* serialised data (either just_key or real data) */
struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */ struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */
uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */ uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */
nn_wctime_t rtstamp; /* reception timestamp (not really required; perhaps better in serdata) */
bool isread; /* READ or NOT_READ sample state */ bool isread; /* READ or NOT_READ sample state */
unsigned disposed_gen; /* snapshot of instance counter at time of insertion */ unsigned disposed_gen; /* snapshot of instance counter at time of insertion */
unsigned no_writers_gen; /* __/ */ unsigned no_writers_gen; /* __/ */
@ -228,15 +234,16 @@ struct rhc_sample
struct rhc_instance struct rhc_instance
{ {
uint64_t iid; /* unique instance id, key of table, also serves as instance handle */ uint64_t iid; /* unique instance id, key of table, also serves as instance handle */
uint64_t wr_iid; /* unique of id of writer of latest sample or 0 */ uint64_t wr_iid; /* unique of id of writer of latest sample or 0; if wrcount = 0 it is the wr_iid that caused */
struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */ struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */
unsigned nvsamples; /* number of "valid" samples in instance */ unsigned nvsamples; /* number of "valid" samples in instance */
unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */ unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */
uint32_t wrcount; /* number of live writers */ uint32_t wrcount; /* number of live writers */
bool isnew; /* NEW or NOT_NEW view state */ unsigned isnew : 1; /* NEW or NOT_NEW view state */
bool a_sample_free; /* whether or not a_sample is in use */ unsigned a_sample_free : 1; /* whether or not a_sample is in use */
bool isdisposed; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */ unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */
bool has_changed; /* To track changes in an instance - if number of samples are added or data is overwritten */ unsigned has_changed : 1; /* To track changes in an instance - if number of samples are added or data is overwritten */
unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */
unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */
unsigned inv_isread : 1; /* whether or not that state change has been read before */ unsigned inv_isread : 1; /* whether or not that state change has been read before */
unsigned disposed_gen; /* bloody generation counters - worst invention of mankind */ unsigned disposed_gen; /* bloody generation counters - worst invention of mankind */
@ -312,9 +319,7 @@ struct trigger_info
#define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i)) #define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i))
static unsigned qmask_of_inst (const struct rhc_instance *inst); static unsigned qmask_of_inst (const struct rhc_instance *inst);
static bool update_conditions_locked static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample);
(struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct serdata *sample);
static void signal_conditions (struct rhc *rhc);
#ifndef NDEBUG #ifndef NDEBUG
static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds); static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds);
#endif #endif
@ -415,7 +420,7 @@ static struct rhc_sample * alloc_sample (struct rhc_instance *inst)
{ {
if (inst->a_sample_free) if (inst->a_sample_free)
{ {
inst->a_sample_free = false; inst->a_sample_free = 0;
#if USE_VALGRIND #if USE_VALGRIND
VALGRIND_MAKE_MEM_UNDEFINED (&inst->a_sample, sizeof (inst->a_sample)); VALGRIND_MAKE_MEM_UNDEFINED (&inst->a_sample, sizeof (inst->a_sample));
#endif #endif
@ -439,7 +444,7 @@ static void free_sample (struct rhc_instance *inst, struct rhc_sample *s)
#if USE_VALGRIND #if USE_VALGRIND
VALGRIND_MAKE_MEM_NOACCESS (&inst->a_sample, sizeof (inst->a_sample)); VALGRIND_MAKE_MEM_NOACCESS (&inst->a_sample, sizeof (inst->a_sample));
#endif #endif
inst->a_sample_free = true; inst->a_sample_free = 1;
} }
else else
{ {
@ -556,7 +561,7 @@ static void get_trigger_info (struct trigger_info *info, struct rhc_instance *in
/* reset instance has_changed before adding/overwriting a sample */ /* reset instance has_changed before adding/overwriting a sample */
if (pre) if (pre)
{ {
inst->has_changed = false; inst->has_changed = 0;
} }
info->has_changed = inst->has_changed; info->has_changed = inst->has_changed;
} }
@ -571,13 +576,12 @@ static bool add_sample
( (
struct rhc * rhc, struct rhc * rhc,
struct rhc_instance * inst, struct rhc_instance * inst,
const struct nn_rsample_info * sampleinfo, const struct proxy_writer_info * pwr_info,
const struct serdata * sample, const struct serdata * sample,
status_cb_data_t * cb_data status_cb_data_t * cb_data
) )
{ {
struct rhc_sample *s; struct rhc_sample *s;
assert (sample->v.bswap == sampleinfo->bswap);
/* Adding a sample always clears an invalid sample (because the information /* Adding a sample always clears an invalid sample (because the information
contained in the invalid sample - the instance state and the generation contained in the invalid sample - the instance state and the generation
@ -604,7 +608,7 @@ static bool add_sample
} }
/* set a flag to indicate instance has changed to notify data_available since the sample is overwritten */ /* set a flag to indicate instance has changed to notify data_available since the sample is overwritten */
inst->has_changed = true; inst->has_changed = 1;
} }
else else
{ {
@ -648,8 +652,7 @@ static bool add_sample
} }
s->sample = ddsi_serdata_ref ((serdata_t) sample); /* drops const (tho refcount does change) */ s->sample = ddsi_serdata_ref ((serdata_t) sample); /* drops const (tho refcount does change) */
s->wr_iid = sampleinfo->pwr_info.iid; s->wr_iid = pwr_info->iid;
s->rtstamp = sampleinfo->reception_timestamp;
s->isread = false; s->isread = false;
s->disposed_gen = inst->disposed_gen; s->disposed_gen = inst->disposed_gen;
s->no_writers_gen = inst->no_writers_gen; s->no_writers_gen = inst->no_writers_gen;
@ -670,16 +673,15 @@ static bool content_filter_accepts (const struct sertopic * topic, const struct
return ret; return ret;
} }
static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct nn_rsample_info *sampleinfo) static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info)
{ {
return inst->wr_iid == sampleinfo->pwr_info.iid || return (inst->wr_iid_islive && inst->wr_iid == pwr_info->iid) || memcmp (&pwr_info->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
memcmp (&sampleinfo->pwr_info.guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
} }
static int inst_accepts_sample static int inst_accepts_sample
( (
const struct rhc *rhc, const struct rhc_instance *inst, const struct rhc *rhc, const struct rhc_instance *inst,
const struct nn_rsample_info *sampleinfo, const struct proxy_writer_info *pwr_info,
const struct serdata *sample, const bool has_data const struct serdata *sample, const bool has_data
) )
{ {
@ -693,7 +695,7 @@ static int inst_accepts_sample
{ {
return 0; return 0;
} }
else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo)) else if (inst_accepts_sample_by_writer_guid (inst, pwr_info))
{ {
/* ok */ /* ok */
} }
@ -702,14 +704,14 @@ static int inst_accepts_sample
return 0; return 0;
} }
} }
if (rhc->exclusive_ownership && inst->wr_iid != sampleinfo->pwr_info.iid) if (rhc->exclusive_ownership && inst->wr_iid_islive && inst->wr_iid != pwr_info->iid)
{ {
uint32_t strength = sampleinfo->pwr_info.ownership_strength; uint32_t strength = pwr_info->ownership_strength;
if (strength > inst->strength) { if (strength > inst->strength) {
/* ok */ /* ok */
} else if (strength < inst->strength) { } else if (strength < inst->strength) {
return 0; return 0;
} else if (inst_accepts_sample_by_writer_guid (inst, sampleinfo)) { } else if (inst_accepts_sample_by_writer_guid (inst, pwr_info)) {
/* ok */ /* ok */
} else { } else {
return 0; return 0;
@ -725,14 +727,16 @@ static int inst_accepts_sample
static void update_inst static void update_inst
( (
const struct rhc *rhc, struct rhc_instance *inst, const struct rhc *rhc, struct rhc_instance *inst,
const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp) const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp)
{ {
if (inst->wr_iid != pwr_info->iid)
{
inst->wr_guid = pwr_info->guid;
}
inst->tstamp = tstamp; inst->tstamp = tstamp;
inst->wr_iid = (inst->wrcount == 0) ? 0 : pwr_info->iid; inst->wr_iid_islive = wr_iid_valid;
if (wr_iid_valid)
{
inst->wr_iid = pwr_info->iid;
if (inst->wr_iid != pwr_info->iid)
inst->wr_guid = pwr_info->guid;
}
inst->strength = pwr_info->ownership_strength; inst->strength = pwr_info->ownership_strength;
} }
@ -752,6 +756,8 @@ static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_insta
static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update) static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update)
{ {
const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0;
TRACE ((" register:")); TRACE ((" register:"));
/* Is an implicitly registering dispose semantically equivalent to /* Is an implicitly registering dispose semantically equivalent to
@ -762,7 +768,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
Is a dispose a sample? I don't think so (though a write dispose Is a dispose a sample? I don't think so (though a write dispose
is). Is a pure register a sample? Don't think so either. */ is). Is a pure register a sample? Don't think so either. */
if (inst->wr_iid == wr_iid) if (inst_wr_iid == wr_iid)
{ {
/* Same writer as last time => we know it is registered already. /* Same writer as last time => we know it is registered already.
This is the fast path -- we don't have to check anything This is the fast path -- we don't have to check anything
@ -775,12 +781,13 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
if (inst->wrcount == 0) if (inst->wrcount == 0)
{ {
/* Currently no writers at all */ /* Currently no writers at all */
assert (inst->wr_iid == 0); assert (!inst->wr_iid_islive);
/* to avoid wr_iid update when register is called for sample rejected */ /* to avoid wr_iid update when register is called for sample rejected */
if (iid_update) if (iid_update)
{ {
inst->wr_iid = wr_iid; inst->wr_iid = wr_iid;
inst->wr_iid_islive = 1;
} }
inst->wrcount++; inst->wrcount++;
inst->no_writers_gen++; inst->no_writers_gen++;
@ -789,7 +796,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
if (!INST_IS_EMPTY (inst) && !inst->isdisposed) if (!INST_IS_EMPTY (inst) && !inst->isdisposed)
rhc->n_not_alive_no_writers--; rhc->n_not_alive_no_writers--;
} }
else if (inst->wr_iid == 0 && inst->wrcount == 1) else if (inst_wr_iid == 0 && inst->wrcount == 1)
{ {
/* Writers exist, but wr_iid is null => someone unregistered. /* Writers exist, but wr_iid is null => someone unregistered.
@ -822,6 +829,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
if (iid_update) if (iid_update)
{ {
inst->wr_iid = wr_iid; inst->wr_iid = wr_iid;
inst->wr_iid_islive = 1;
} }
} }
else else
@ -833,7 +841,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
/* 2nd writer => properly register the one we knew about */ /* 2nd writer => properly register the one we knew about */
TRACE (("rescue1")); TRACE (("rescue1"));
int x; int x;
x = lwregs_add (&rhc->registrations, inst->iid, inst->wr_iid); x = lwregs_add (&rhc->registrations, inst->iid, inst_wr_iid);
assert (x); assert (x);
(void) x; (void) x;
} }
@ -855,6 +863,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
if (iid_update) if (iid_update)
{ {
inst->wr_iid = wr_iid; inst->wr_iid = wr_iid;
inst->wr_iid_islive = 1;
} }
} }
} }
@ -863,10 +872,7 @@ static void account_for_empty_to_nonempty_transition (struct rhc *rhc, struct rh
{ {
assert (INST_NSAMPLES (inst) == 1); assert (INST_NSAMPLES (inst) == 1);
add_inst_to_nonempty_list (rhc, inst); add_inst_to_nonempty_list (rhc, inst);
if (inst->isnew) rhc->n_new += inst->isnew;
{
rhc->n_new++;
}
if (inst->isdisposed) if (inst->isdisposed)
rhc->n_not_alive_disposed++; rhc->n_not_alive_disposed++;
else if (inst->wrcount == 0) else if (inst->wrcount == 0)
@ -881,8 +887,9 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc
TRACE (("unknown(#0)")); TRACE (("unknown(#0)"));
return 0; return 0;
} }
else if (inst->wrcount == 1 && inst->wr_iid != 0) else if (inst->wrcount == 1 && inst->wr_iid_islive)
{ {
assert(inst->wr_iid != 0);
if (wr_iid != inst->wr_iid) if (wr_iid != inst->wr_iid)
{ {
TRACE (("unknown(cache)")); TRACE (("unknown(cache)"));
@ -907,7 +914,7 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc
afterward there will be 1 writer, it will be cached, and its afterward there will be 1 writer, it will be cached, and its
registration record must go (invariant that with wrcount = 1 registration record must go (invariant that with wrcount = 1
and wr_iid != 0 the wr_iid is not in "registrations") */ and wr_iid != 0 the wr_iid is not in "registrations") */
if (inst->wrcount == 2 && inst->wr_iid != wr_iid) if (inst->wrcount == 2 && inst->wr_iid_islive && inst->wr_iid != wr_iid)
{ {
TRACE ((",delreg(remain)")); TRACE ((",delreg(remain)"));
lwregs_delete (&rhc->registrations, inst->iid, inst->wr_iid); lwregs_delete (&rhc->registrations, inst->iid, inst->wr_iid);
@ -923,56 +930,62 @@ static int rhc_unregister_updateinst
{ {
assert (inst->wrcount > 0); assert (inst->wrcount > 0);
if (pwr_info->iid == inst->wr_iid)
{
/* Next register will have to do real work before we have a cached
wr_iid again */
inst->wr_iid = 0;
/* Reset the ownership strength to allow samples to be read from other
writer(s) */
inst->strength = 0;
TRACE ((",clearcache"));
}
if (--inst->wrcount > 0) if (--inst->wrcount > 0)
return 0;
else if (!INST_IS_EMPTY (inst))
{ {
/* Instance still has content - do not drop until application if (inst->wr_iid_islive && pwr_info->iid == inst->wr_iid)
{
/* Next register will have to do real work before we have a cached
wr_iid again */
inst->wr_iid_islive = 0;
/* Reset the ownership strength to allow samples to be read from other
writer(s) */
inst->strength = 0;
TRACE ((",clearcache"));
}
return 0;
}
else
{
if (!INST_IS_EMPTY (inst))
{
/* Instance still has content - do not drop until application
takes the last sample. Set the invalid sample if the latest takes the last sample. Set the invalid sample if the latest
sample has been read already, so that the application can sample has been read already, so that the application can
read the change to not-alive. (If the latest sample is still read the change to not-alive. (If the latest sample is still
unread, we don't bother, even though it means the application unread, we don't bother, even though it means the application
won't see the timestamp for the unregister event. It shouldn't won't see the timestamp for the unregister event. It shouldn't
care.) */ care.) */
if (inst->latest == NULL /*|| inst->latest->isread*/) if (inst->latest == NULL || inst->latest->isread)
{
inst_set_invsample (rhc, inst);
update_inst (rhc, inst, pwr_info, false, tstamp);
}
if (!inst->isdisposed)
{
rhc->n_not_alive_no_writers++;
}
inst->wr_iid_islive = 0;
return 0;
}
else if (inst->isdisposed)
{ {
/* No content left, no registrations left, so drop */
TRACE ((",#0,empty,disposed,drop"));
drop_instance_noupdate_no_writers (rhc, inst);
return 1;
}
else
{
/* Add invalid samples for transition to no-writers */
TRACE ((",#0,empty,nowriters"));
assert (INST_IS_EMPTY (inst));
inst_set_invsample (rhc, inst); inst_set_invsample (rhc, inst);
update_inst (rhc, inst, pwr_info, tstamp); update_inst (rhc, inst, pwr_info, false, tstamp);
account_for_empty_to_nonempty_transition (rhc, inst);
inst->wr_iid_islive = 0;
return 0;
} }
if (!inst->isdisposed)
{
rhc->n_not_alive_no_writers++;
}
return 0;
}
else if (inst->isdisposed)
{
/* No content left, no registrations left, so drop */
TRACE ((",#0,empty,disposed,drop"));
drop_instance_noupdate_no_writers (rhc, inst);
return 1;
}
else
{
/* Add invalid samples for transition to no-writers */
TRACE ((",#0,empty,nowriters"));
assert (INST_IS_EMPTY (inst));
inst_set_invsample (rhc, inst);
update_inst (rhc, inst, pwr_info, tstamp);
account_for_empty_to_nonempty_transition (rhc, inst);
return 0;
} }
} }
@ -1004,7 +1017,7 @@ static void dds_rhc_unregister
static struct rhc_instance * alloc_new_instance static struct rhc_instance * alloc_new_instance
( (
const struct rhc *rhc, const struct rhc *rhc,
const struct nn_rsample_info *sampleinfo, const struct proxy_writer_info *pwr_info,
struct serdata *serdata, struct serdata *serdata,
struct tkmap_instance *tk struct tkmap_instance *tk
) )
@ -1017,11 +1030,15 @@ static struct rhc_instance * alloc_new_instance
inst->tk = tk; inst->tk = tk;
inst->wrcount = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->wrcount = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1;
inst->isdisposed = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_DISPOSE); inst->isdisposed = (serdata->v.msginfo.statusinfo & NN_STATUSINFO_DISPOSE);
inst->isnew = true; inst->isnew = 1;
inst->inv_exists = 0; inst->inv_exists = 0;
inst->inv_isread = 0; /* don't care */ inst->inv_isread = 0; /* don't care */
inst->a_sample_free = true; inst->a_sample_free = 1;
update_inst (rhc, inst, &sampleinfo->pwr_info, serdata->v.msginfo.timestamp); inst->wr_iid = pwr_info->iid;
inst->wr_iid_islive = (inst->wrcount != 0);
inst->wr_guid = pwr_info->guid;
inst->tstamp = serdata->v.msginfo.timestamp;
inst->strength = pwr_info->ownership_strength;
return inst; return inst;
} }
@ -1029,7 +1046,7 @@ static rhc_store_result_t rhc_store_new_instance
( (
struct trigger_info * post, struct trigger_info * post,
struct rhc *rhc, struct rhc *rhc,
const struct nn_rsample_info *sampleinfo, const struct proxy_writer_info *pwr_info,
struct serdata *sample, struct serdata *sample,
struct tkmap_instance *tk, struct tkmap_instance *tk,
const bool has_data, const bool has_data,
@ -1069,10 +1086,10 @@ static rhc_store_result_t rhc_store_new_instance
return RHC_REJECTED; return RHC_REJECTED;
} }
inst = alloc_new_instance (rhc, sampleinfo, sample, tk); inst = alloc_new_instance (rhc, pwr_info, sample, tk);
if (has_data) if (has_data)
{ {
if (!add_sample (rhc, inst, sampleinfo, sample, cb_data)) if (!add_sample (rhc, inst, pwr_info, sample, cb_data))
{ {
free_instance (inst, rhc); free_instance (inst, rhc);
return RHC_REJECTED; return RHC_REJECTED;
@ -1102,11 +1119,11 @@ static rhc_store_result_t rhc_store_new_instance
bool dds_rhc_store bool dds_rhc_store
( (
struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo, struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk struct serdata * __restrict sample, struct tkmap_instance * __restrict tk
) )
{ {
const uint64_t wr_iid = sampleinfo->pwr_info.iid; const uint64_t wr_iid = pwr_info->iid;
const unsigned statusinfo = sample->v.msginfo.statusinfo; const unsigned statusinfo = sample->v.msginfo.statusinfo;
const bool has_data = (sample->v.st->kind == STK_DATA); const bool has_data = (sample->v.st->kind == STK_DATA);
const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0; const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0;
@ -1150,7 +1167,7 @@ bool dds_rhc_store
else else
{ {
TRACE ((" new instance")); TRACE ((" new instance"));
stored = rhc_store_new_instance (&post, rhc, sampleinfo, sample, tk, has_data, &cb_data); stored = rhc_store_new_instance (&post, rhc, pwr_info, sample, tk, has_data, &cb_data);
if (stored != RHC_STORED) if (stored != RHC_STORED)
{ {
goto error_or_nochange; goto error_or_nochange;
@ -1158,7 +1175,7 @@ bool dds_rhc_store
init_trigger_info_nonmatch (&pre); init_trigger_info_nonmatch (&pre);
} }
} }
else if (!inst_accepts_sample (rhc, inst, sampleinfo, sample, has_data)) else if (!inst_accepts_sample (rhc, inst, pwr_info, sample, has_data))
{ {
/* Rejected samples (and disposes) should still register the writer; /* Rejected samples (and disposes) should still register the writer;
unregister *must* be processed, or we have a memory leak. (We unregister *must* be processed, or we have a memory leak. (We
@ -1175,7 +1192,7 @@ bool dds_rhc_store
} }
if (statusinfo & NN_STATUSINFO_UNREGISTER) if (statusinfo & NN_STATUSINFO_UNREGISTER)
{ {
dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp);
} }
else else
{ {
@ -1195,6 +1212,8 @@ bool dds_rhc_store
{ {
get_trigger_info (&pre, inst, true); get_trigger_info (&pre, inst, true);
TRACE ((" wc %"PRIu32, inst->wrcount));
if (has_data || is_dispose) if (has_data || is_dispose)
{ {
/* View state must be NEW following receipt of a sample when /* View state must be NEW following receipt of a sample when
@ -1222,7 +1241,7 @@ bool dds_rhc_store
if (has_data && not_alive) if (has_data && not_alive)
{ {
TRACE ((" notalive->alive")); TRACE ((" notalive->alive"));
inst->isnew = true; inst->isnew = 1;
} }
/* Desired effect on instance state and disposed_gen: /* Desired effect on instance state and disposed_gen:
@ -1235,12 +1254,12 @@ bool dds_rhc_store
if (has_data && inst->isdisposed) if (has_data && inst->isdisposed)
{ {
TRACE ((" disposed->notdisposed")); TRACE ((" disposed->notdisposed"));
inst->isdisposed = false; inst->isdisposed = 0;
inst->disposed_gen++; inst->disposed_gen++;
} }
if (is_dispose) if (is_dispose)
{ {
inst->isdisposed = true; inst->isdisposed = 1;
inst_became_disposed = !old_isdisposed; inst_became_disposed = !old_isdisposed;
TRACE ((" dispose(%d)", inst_became_disposed)); TRACE ((" dispose(%d)", inst_became_disposed));
} }
@ -1250,7 +1269,7 @@ bool dds_rhc_store
if (has_data) if (has_data)
{ {
TRACE ((" add_sample")); TRACE ((" add_sample"));
if (! add_sample (rhc, inst, sampleinfo, sample, &cb_data)) if (! add_sample (rhc, inst, pwr_info, sample, &cb_data))
{ {
TRACE (("(reject)")); TRACE (("(reject)"));
stored = RHC_REJECTED; stored = RHC_REJECTED;
@ -1262,7 +1281,7 @@ bool dds_rhc_store
if (inst_became_disposed && (inst->latest == NULL )) if (inst_became_disposed && (inst->latest == NULL ))
inst_set_invsample (rhc, inst); inst_set_invsample (rhc, inst);
update_inst (rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); update_inst (rhc, inst, pwr_info, true, sample->v.msginfo.timestamp);
/* Can only add samples => only need to give special treatment /* Can only add samples => only need to give special treatment
to instances that were empty before. It is, however, not to instances that were empty before. It is, however, not
@ -1280,7 +1299,7 @@ bool dds_rhc_store
else else
{ {
rhc->n_not_alive_disposed += inst->isdisposed - old_isdisposed; rhc->n_not_alive_disposed += inst->isdisposed - old_isdisposed;
rhc->n_new += (inst->isnew ? 1 : 0) - (old_isnew ? 1 : 0); rhc->n_new += inst->isnew - old_isnew;
} }
} }
else else
@ -1306,7 +1325,7 @@ bool dds_rhc_store
mean an application reading "x" after the write and reading it mean an application reading "x" after the write and reading it
again after the unregister will see a change in the again after the unregister will see a change in the
no_writers_generation field? */ no_writers_generation field? */
dds_rhc_unregister (&post, rhc, inst, &sampleinfo->pwr_info, sample->v.msginfo.timestamp); dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->v.msginfo.timestamp);
} }
else else
{ {
@ -1390,7 +1409,7 @@ void dds_rhc_unregister_wr
TRACE (("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose)); TRACE (("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose));
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
{ {
if (inst->wr_iid == wr_iid || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid))
{ {
struct trigger_info pre, post; struct trigger_info pre, post;
get_trigger_info (&pre, inst, true); get_trigger_info (&pre, inst, true);
@ -1400,7 +1419,7 @@ void dds_rhc_unregister_wr
assert (inst->wrcount > 0); assert (inst->wrcount > 0);
if (auto_dispose && !inst->isdisposed) if (auto_dispose && !inst->isdisposed)
{ {
inst->isdisposed = true; inst->isdisposed = 1;
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */ /* Set invalid sample for disposing it (unregister may also set it for unregistering) */
if (inst->latest) if (inst->latest)
@ -1412,7 +1431,6 @@ void dds_rhc_unregister_wr
{ {
const bool was_empty = INST_IS_EMPTY (inst); const bool was_empty = INST_IS_EMPTY (inst);
inst_set_invsample (rhc, inst); inst_set_invsample (rhc, inst);
update_inst (rhc, inst, pwr_info, inst->tstamp);
if (was_empty) if (was_empty)
account_for_empty_to_nonempty_transition (rhc, inst); account_for_empty_to_nonempty_transition (rhc, inst);
else else
@ -1452,9 +1470,9 @@ void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t w
TRACE (("rhc_relinquish_ownership(%"PRIx64":\n", wr_iid)); TRACE (("rhc_relinquish_ownership(%"PRIx64":\n", wr_iid));
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
{ {
if (inst->wr_iid == wr_iid) if (inst->wr_iid_islive && inst->wr_iid == wr_iid)
{ {
inst->wr_iid = 0; inst->wr_iid_islive = 0;
} }
} }
TRACE ((")\n")); TRACE ((")\n"));
@ -1564,7 +1582,6 @@ static void set_sample_info (dds_sample_info_t *si, const struct rhc_instance *i
si->absolute_generation_rank = (inst->disposed_gen + inst->no_writers_gen) - (sample->disposed_gen + sample->no_writers_gen); si->absolute_generation_rank = (inst->disposed_gen + inst->no_writers_gen) - (sample->disposed_gen + sample->no_writers_gen);
si->valid_data = true; si->valid_data = true;
si->source_timestamp = sample->sample->v.msginfo.timestamp.v; si->source_timestamp = sample->sample->v.msginfo.timestamp.v;
si->reception_timestamp = sample->rtstamp.v;
} }
static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_instance *inst) static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_instance *inst)
@ -1581,9 +1598,6 @@ static void set_sample_info_invsample (dds_sample_info_t *si, const struct rhc_i
si->absolute_generation_rank = 0; si->absolute_generation_rank = 0;
si->valid_data = false; si->valid_data = false;
si->source_timestamp = inst->tstamp.v; si->source_timestamp = inst->tstamp.v;
/* Not storing the underlying "sample" so the reception time is lost */
si->reception_timestamp = 0;
} }
static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst) static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst)
@ -1692,7 +1706,7 @@ static int dds_rhc_read_w_qminv
if (n > n_first && inst->isnew) if (n > n_first && inst->isnew)
{ {
inst->isnew = false; inst->isnew = 0;
rhc->n_new--; rhc->n_new--;
} }
if (nread != INST_NREAD (inst)) if (nread != INST_NREAD (inst))
@ -1834,7 +1848,7 @@ static int dds_rhc_take_w_qminv
if (n > n_first && inst->isnew) if (n > n_first && inst->isnew)
{ {
inst->isnew = false; inst->isnew = 0;
rhc->n_new--; rhc->n_new--;
} }
@ -1983,7 +1997,7 @@ static int dds_rhc_takecdr_w_qminv
if (n > n_first && inst->isnew) if (n > n_first && inst->isnew)
{ {
inst->isnew = false; inst->isnew = 0;
rhc->n_new--; rhc->n_new--;
} }

View file

@ -1576,6 +1576,14 @@ static uint32_t dds_stream_get_keyhash
return (uint32_t) (dst - origin); return (uint32_t) (dst - origin);
} }
#ifndef NDEBUG
static bool keyhash_is_reset(const dds_key_hash_t *kh)
{
static const char nullhash[sizeof(kh->m_hash)];
return kh->m_flags == 0 && memcmp(kh->m_hash, nullhash, sizeof(nullhash)) == 0;
}
#endif
void dds_stream_read_keyhash void dds_stream_read_keyhash
( (
dds_stream_t * is, dds_stream_t * is,
@ -1586,14 +1594,19 @@ void dds_stream_read_keyhash
{ {
char * dst; char * dst;
assert (desc->m_keys); assert (keyhash_is_reset(kh));
if (desc->m_nkeys == 0)
{
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
return;
}
/* Select key buffer to use */ /* Select key buffer to use */
kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET; kh->m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;
if (desc->m_flagset & DDS_TOPIC_FIXED_KEY) if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{ {
memset (kh->m_hash, 0, 16);
kh->m_flags |= DDS_KEY_IS_HASH; kh->m_flags |= DDS_KEY_IS_HASH;
dst = kh->m_hash; dst = kh->m_hash;
} }
@ -1616,7 +1629,6 @@ void dds_stream_read_keyhash
else else
{ {
/* Hash is md5 of key */ /* Hash is md5 of key */
dds_key_md5 (kh); dds_key_md5 (kh);
} }
} }

View file

@ -243,7 +243,6 @@ struct tkmap_instance * dds_tkmap_find_by_id (_In_ struct tkmap * map, _In_ uint
_Check_return_ _Check_return_
struct tkmap_instance * dds_tkmap_find( struct tkmap_instance * dds_tkmap_find(
_In_opt_ const struct dds_topic * topic,
_In_ struct serdata * sd, _In_ struct serdata * sd,
_In_ const bool rd, _In_ const bool rd,
_In_ const bool create) _In_ const bool create)
@ -252,46 +251,9 @@ struct tkmap_instance * dds_tkmap_find(
struct tkmap_instance * tk; struct tkmap_instance * tk;
struct tkmap * map = gv.m_tkmap; struct tkmap * map = gv.m_tkmap;
assert(sd->v.keyhash.m_flags & DDS_KEY_HASH_SET);
dummy.m_sample = sd; dummy.m_sample = sd;
/* Generate key hash if required and not provided */
if (topic && topic->m_descriptor->m_nkeys)
{
if ((sd->v.keyhash.m_flags & DDS_KEY_HASH_SET) == 0)
{
dds_stream_t is;
dds_stream_from_serstate (&is, sd->v.st);
dds_stream_read_keyhash (&is, &sd->v.keyhash, topic->m_descriptor, sd->v.st->kind == STK_KEY);
}
else
{
if (topic->m_descriptor->m_flagset & DDS_TOPIC_FIXED_KEY)
{
sd->v.keyhash.m_flags |= DDS_KEY_IS_HASH;
}
#if DDS_DEBUG_KEYHASH
{
dds_stream_t is;
dds_key_hash_t kh;
/* Check that we generate same keyhash as provided */
memset (&kh, 0, sizeof (kh));
dds_stream_from_serstate (&is, sd->v.st);
dds_stream_read_keyhash (&is, &kh, topic->m_descriptor, sd->v.st->kind == STK_KEY);
assert (memcmp (kh.m_hash, sd->v.keyhash.m_hash, 16) == 0);
if (kh.m_key_buff_size)
{
dds_free (kh.m_key_buff);
}
}
#endif
}
}
retry: retry:
if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL) if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL)
{ {
@ -340,19 +302,15 @@ retry:
_Check_return_ _Check_return_
struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct serdata * sd) struct tkmap_instance * dds_tkmap_lookup_instance_ref (_In_ struct serdata * sd)
{ {
dds_topic * topic = sd->v.st->topic ? sd->v.st->topic->status_cb_entity : NULL;
assert (vtime_awake_p (lookup_thread_state ()->vtime)); assert (vtime_awake_p (lookup_thread_state ()->vtime));
#if 0 #if 0
/* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */ /* Topic might have been deleted -- FIXME: no way the topic may be deleted when there're still users out there */
if (topic == NULL) if (sd->v.st->topic == NULL)
{ {
return NULL; return NULL;
} }
#endif #endif
return dds_tkmap_find (sd, true, true);
return dds_tkmap_find (topic, sd, true, true);
} }
void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk) void dds_tkmap_instance_ref (_In_ struct tkmap_instance *tk)

View file

@ -16,6 +16,7 @@
#include "ddsi/q_error.h" #include "ddsi/q_error.h"
#include "ddsi/q_thread.h" #include "ddsi/q_thread.h"
#include "q__osplser.h" #include "q__osplser.h"
#include "dds__stream.h"
#include "dds__err.h" #include "dds__err.h"
#include "ddsi/q_transmit.h" #include "ddsi/q_transmit.h"
#include "ddsi/q_ephash.h" #include "ddsi/q_ephash.h"
@ -26,14 +27,6 @@
#include <string.h> #include <string.h>
#if OS_ATOMIC64_SUPPORT
typedef os_atomic_uint64_t fake_seq_t;
uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc64_nv (x); }
#else /* HACK */
typedef os_atomic_uint32_t fake_seq_t;
uint64_t fake_seq_next (fake_seq_t *x) { return os_atomic_inc32_nv (x); }
#endif
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) _Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
dds_return_t dds_return_t
dds_write( dds_write(
@ -118,30 +111,9 @@ err:
return ret; return ret;
} }
static void
init_sampleinfo(
_Out_ struct nn_rsample_info *sampleinfo,
_In_ struct writer *wr,
_In_ int64_t seq,
_In_ serdata_t payload)
{
memset(sampleinfo, 0, sizeof(*sampleinfo));
sampleinfo->bswap = 0;
sampleinfo->complex_qos = 0;
sampleinfo->hashash = 0;
sampleinfo->seq = seq;
sampleinfo->reception_timestamp = payload->v.msginfo.timestamp;
sampleinfo->statusinfo = payload->v.msginfo.statusinfo;
sampleinfo->pwr_info.iid = 1;
sampleinfo->pwr_info.auto_dispose = 0;
sampleinfo->pwr_info.guid = wr->e.guid;
sampleinfo->pwr_info.ownership_strength = 0;
}
static int static int
deliver_locally( deliver_locally(
_In_ struct writer *wr, _In_ struct writer *wr,
_In_ int64_t seq,
_In_ serdata_t payload, _In_ serdata_t payload,
_In_ struct tkmap_instance *tk) _In_ struct tkmap_instance *tk)
{ {
@ -150,15 +122,15 @@ deliver_locally(
if (wr->rdary.fastpath_ok) { if (wr->rdary.fastpath_ok) {
struct reader ** const rdary = wr->rdary.rdary; struct reader ** const rdary = wr->rdary.rdary;
if (rdary[0]) { if (rdary[0]) {
struct nn_rsample_info sampleinfo; struct proxy_writer_info pwr_info;
unsigned i; unsigned i;
init_sampleinfo(&sampleinfo, wr, seq, payload); make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
for (i = 0; rdary[i]; i++) { for (i = 0; rdary[i]; i++) {
bool stored; bool stored;
TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid))); TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)));
dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC; dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC;
do { do {
stored = (ddsi_plugin.rhc_store_fn) (rdary[i]->rhc, &sampleinfo, payload, tk); stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk);
if (!stored) { if (!stored) {
if (max_block_ms <= 0) { if (max_block_ms <= 0) {
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT, "The writer could not deliver data on time, probably due to a local reader resources being full."); ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT, "The writer could not deliver data on time, probably due to a local reader resources being full.");
@ -185,16 +157,16 @@ deliver_locally(
reliable samples that are rejected are simply discarded. */ reliable samples that are rejected are simply discarded. */
ut_avlIter_t it; ut_avlIter_t it;
struct pwr_rd_match *m; struct pwr_rd_match *m;
struct nn_rsample_info sampleinfo; struct proxy_writer_info pwr_info;
os_mutexUnlock (&wr->rdary.rdary_lock); os_mutexUnlock (&wr->rdary.rdary_lock);
init_sampleinfo(&sampleinfo, wr, seq, payload); make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
os_mutexLock (&wr->e.lock); os_mutexLock (&wr->e.lock);
for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) { for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) {
struct reader *rd; struct reader *rd;
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) {
TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid))); TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)));
/* Copied the return value ignore from DDSI deliver_user_data() function. */ /* Copied the return value ignore from DDSI deliver_user_data() function. */
(void)(ddsi_plugin.rhc_store_fn) (rd->rhc, &sampleinfo, payload, tk); (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
} }
} }
os_mutexUnlock (&wr->e.lock); os_mutexUnlock (&wr->e.lock);
@ -209,7 +181,6 @@ dds_write_impl(
_In_ dds_time_t tstamp, _In_ dds_time_t tstamp,
_In_ dds_write_action action) _In_ dds_write_action action)
{ {
static fake_seq_t fake_seq;
dds_return_t ret = DDS_RETCODE_OK; dds_return_t ret = DDS_RETCODE_OK;
int w_rc; int w_rc;
@ -252,7 +223,7 @@ dds_write_impl(
d->v.msginfo.timestamp.v = tstamp; d->v.msginfo.timestamp.v = tstamp;
os_mutexLock (&writer->m_call_lock); os_mutexLock (&writer->m_call_lock);
ddsi_serdata_ref(d); ddsi_serdata_ref(d);
tk = (ddsi_plugin.rhc_lookup_fn) (d); tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d);
w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk); w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0) { if (w_rc >= 0) {
@ -271,10 +242,10 @@ dds_write_impl(
os_mutexUnlock (&writer->m_call_lock); os_mutexUnlock (&writer->m_call_lock);
if (ret == DDS_RETCODE_OK) { if (ret == DDS_RETCODE_OK) {
ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk); ret = deliver_locally (ddsi_wr, d, tk);
} }
ddsi_serdata_unref(d); ddsi_serdata_unref(d);
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
if (asleep) { if (asleep) {
thread_state_asleep (thr); thread_state_asleep (thr);
@ -292,7 +263,6 @@ dds_writecdr_impl(
_In_ dds_time_t tstamp, _In_ dds_time_t tstamp,
_In_ dds_write_action action) _In_ dds_write_action action)
{ {
static fake_seq_t fake_seq;
int ret = DDS_RETCODE_OK; int ret = DDS_RETCODE_OK;
int w_rc; int w_rc;
@ -316,17 +286,16 @@ dds_writecdr_impl(
} }
/* Serialize and write data or key */ /* Serialize and write data or key */
if (writekey) { {
abort(); serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic);
//d = serialize_key (gv.serpool, ddsi_wr->topic, data); dds_stream_t is;
} else { ddsi_serstate_append_blob(st, 1, sz, cdr);
serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic); d = ddsi_serstate_fix(st);
if (ddsi_wr->topic->nkeys) { assert(d->v.keyhash.m_flags == 0);
abort(); assert(!d->v.bswap);
//dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample); dds_stream_from_serstate (&is, d->v.st);
} d->v.st->kind = writekey ? STK_KEY : STK_DATA;
ddsi_serstate_append_blob(st, 1, sz, cdr); dds_stream_read_keyhash (&is, &d->v.keyhash, ddsi_wr->topic->status_cb_entity->m_descriptor, d->v.st->kind == STK_KEY);
d = ddsi_serstate_fix(st);
} }
/* Set if disposing or unregistering */ /* Set if disposing or unregistering */
@ -335,7 +304,7 @@ dds_writecdr_impl(
d->v.msginfo.timestamp.v = tstamp; d->v.msginfo.timestamp.v = tstamp;
os_mutexLock (&wr->m_call_lock); os_mutexLock (&wr->m_call_lock);
ddsi_serdata_ref(d); ddsi_serdata_ref(d);
tk = (ddsi_plugin.rhc_lookup_fn) (d); tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (d);
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0) { if (w_rc >= 0) {
@ -354,10 +323,10 @@ dds_writecdr_impl(
os_mutexUnlock (&wr->m_call_lock); os_mutexUnlock (&wr->m_call_lock);
if (ret == DDS_RETCODE_OK) { if (ret == DDS_RETCODE_OK) {
ret = deliver_locally (ddsi_wr, fake_seq_next(&fake_seq), d, tk); ret = deliver_locally (ddsi_wr, d, tk);
} }
ddsi_serdata_unref(d); ddsi_serdata_unref(d);
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
if (asleep) { if (asleep) {
thread_state_asleep (thr); thread_state_asleep (thr);

View file

@ -22,11 +22,7 @@ serdata_t serialize (serstatepool_t pool, const struct sertopic * tp, const void
{ {
dds_stream_t os; dds_stream_t os;
serstate_t st = ddsi_serstate_new (pool, tp); serstate_t st = ddsi_serstate_new (pool, tp);
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
if (tp->nkeys)
{
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
}
dds_stream_from_serstate (&os, st); dds_stream_from_serstate (&os, st);
dds_stream_write_sample (&os, sample, tp); dds_stream_write_sample (&os, sample, tp);
dds_stream_add_to_serstate (&os, st); dds_stream_add_to_serstate (&os, st);
@ -65,21 +61,14 @@ int serdata_cmp (const struct serdata *a, const struct serdata *b)
serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample) serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample)
{ {
serdata_t sd; serdata_t sd;
if (tp->nkeys) dds_stream_t os;
{ dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type;
dds_stream_t os; serstate_t st = ddsi_serstate_new (pool, tp);
dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type; dds_key_gen (desc, &st->data->v.keyhash, (char*) sample);
serstate_t st = ddsi_serstate_new (pool, tp); dds_stream_from_serstate (&os, st);
dds_key_gen (desc, &st->data->v.keyhash, (char*) sample); dds_stream_write_key (&os, sample, desc);
dds_stream_from_serstate (&os, st); dds_stream_add_to_serstate (&os, st);
dds_stream_write_key (&os, sample, desc); sd = st->data;
dds_stream_add_to_serstate (&os, st);
sd = st->data;
}
else
{
sd = serialize (pool, tp, sample);
}
sd->v.st->kind = STK_KEY; sd->v.st->kind = STK_KEY;
return sd; return sd;
} }

View file

@ -63,6 +63,7 @@
#define MAX_SAMPLES 21 #define MAX_SAMPLES 21
#define RDR_NOT_READ_CNT 11 #define RDR_NOT_READ_CNT 11
#define RDR_INV_READ_CNT 1
int rdr_expected_long_2[RDR_NOT_READ_CNT] = { 0, 1, 2, 6, 7, 9, 11, 13, 14, 16, 19 }; int rdr_expected_long_2[RDR_NOT_READ_CNT] = { 0, 1, 2, 6, 7, 9, 11, 13, 14, 16, 19 };
/* Because we only read one sample at a time, only the first sample of an instance /* Because we only read one sample at a time, only the first sample of an instance
@ -344,13 +345,13 @@ samples_cnt(void)
/*************************************************************************************************/ /*************************************************************************************************/
Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
{ {
dds_return_t cnt = 0; dds_return_t cnt = 0, cntinv = 0;
dds_return_t ret = 1; dds_return_t ret = 1;
while (ret == 1){ while (ret == 1){
ret = dds_read_next(g_reader, g_samples, g_info); ret = dds_read_next(g_reader, g_samples, g_info);
cr_assert_geq(ret, 0 , "# read %d", ret); cr_assert_geq(ret, 0 , "# read %d", ret);
if(ret == 1){ if(ret == 1 && g_info[0].valid_data){
Space_Type1 *sample = (Space_Type1*)g_samples[0]; Space_Type1 *sample = (Space_Type1*)g_samples[0];
PRINT_SAMPLE("ddsc_read_next::reader: Read", (*sample)); PRINT_SAMPLE("ddsc_read_next::reader: Read", (*sample));
@ -373,10 +374,13 @@ Test(ddsc_read_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f
cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].view_state, expected_vst);
cr_assert_eq(g_info[0].instance_state, expected_ist); cr_assert_eq(g_info[0].instance_state, expected_ist);
cnt ++; cnt ++;
} else if (ret == 1 && !g_info[0].valid_data) {
cntinv ++;
} }
} }
cr_assert_eq(cnt, RDR_NOT_READ_CNT); cr_assert_eq(cnt, RDR_NOT_READ_CNT);
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
/* All samples should still be available. */ /* All samples should still be available. */
ret = samples_cnt(); ret = samples_cnt();
@ -453,13 +457,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next, invalid_buffers, .in
/*************************************************************************************************/ /*************************************************************************************************/
Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
{ {
dds_return_t cnt = 0; dds_return_t cnt = 0, cntinv = 0;
dds_return_t ret = 1; dds_return_t ret = 1;
while (ret == 1){ while (ret == 1){
ret = dds_read_next_wl(g_reader, g_loans, g_info); ret = dds_read_next_wl(g_reader, g_loans, g_info);
cr_assert_geq(ret, 0 , "# read %d", ret); cr_assert_geq(ret, 0 , "# read %d", ret);
if(ret == 1){ if(ret == 1 && g_info[0].valid_data){
Space_Type1 *sample = (Space_Type1*)g_loans[0]; Space_Type1 *sample = (Space_Type1*)g_loans[0];
PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample)); PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample));
@ -482,10 +486,13 @@ Test(ddsc_read_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato
cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].view_state, expected_vst);
cr_assert_eq(g_info[0].instance_state, expected_ist); cr_assert_eq(g_info[0].instance_state, expected_ist);
cnt ++; cnt ++;
} else if (ret == 1 && !g_info[0].valid_data) {
cntinv ++;
} }
} }
cr_assert_eq(cnt, RDR_NOT_READ_CNT); cr_assert_eq(cnt, RDR_NOT_READ_CNT);
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
ret = dds_return_loan(g_reader, g_loans, ret); ret = dds_return_loan(g_reader, g_loans, ret);
cr_assert_eq (ret, DDS_RETCODE_OK); cr_assert_eq (ret, DDS_RETCODE_OK);
@ -564,13 +571,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_read_next_wl, invalid_buffers,
/*************************************************************************************************/ /*************************************************************************************************/
Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
{ {
dds_return_t cnt = 0; dds_return_t cnt = 0, cntinv = 0;
dds_return_t ret = 1; dds_return_t ret = 1;
while (ret == 1){ while (ret == 1){
ret = dds_take_next(g_reader, g_samples, g_info); ret = dds_take_next(g_reader, g_samples, g_info);
cr_assert_geq(ret, 0 , "# read %d", ret); cr_assert_geq(ret, 0 , "# read %d", ret);
if(ret == 1){ if(ret == 1 && g_info[0].valid_data){
Space_Type1 *sample = (Space_Type1*)g_samples[0]; Space_Type1 *sample = (Space_Type1*)g_samples[0];
PRINT_SAMPLE("ddsc_take_next::reader: Read", (*sample)); PRINT_SAMPLE("ddsc_take_next::reader: Read", (*sample));
@ -593,10 +600,13 @@ Test(ddsc_take_next, reader, .init=reader_iterator_init, .fini=reader_iterator_f
cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].view_state, expected_vst);
cr_assert_eq(g_info[0].instance_state, expected_ist); cr_assert_eq(g_info[0].instance_state, expected_ist);
cnt ++; cnt ++;
} else if (ret == 1 && !g_info[0].valid_data) {
cntinv ++;
} }
} }
cr_assert_eq(cnt, RDR_NOT_READ_CNT); cr_assert_eq(cnt, RDR_NOT_READ_CNT);
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
/* All samples should still be available. */ /* All samples should still be available. */
ret = samples_cnt(); ret = samples_cnt();
@ -671,13 +681,13 @@ Theory((void **buf, dds_sample_info_t *si), ddsc_take_next, invalid_buffers, .in
/*************************************************************************************************/ /*************************************************************************************************/
Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini) Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterator_fini)
{ {
dds_return_t cnt = 0; dds_return_t cnt = 0, cntinv = 0;
dds_return_t ret = 1; dds_return_t ret = 1;
while (ret == 1){ while (ret == 1){
ret = dds_take_next_wl(g_reader, g_loans, g_info); ret = dds_take_next_wl(g_reader, g_loans, g_info);
cr_assert_geq(ret, 0 , "# read %d", ret); cr_assert_geq(ret, 0 , "# read %d", ret);
if(ret == 1){ if(ret == 1 && g_info[0].valid_data){
Space_Type1 *sample = (Space_Type1*)g_loans[0]; Space_Type1 *sample = (Space_Type1*)g_loans[0];
PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample)); PRINT_SAMPLE("ddsc_read_next_wl::reader: Read", (*sample));
@ -700,10 +710,13 @@ Test(ddsc_take_next_wl, reader, .init=reader_iterator_init, .fini=reader_iterato
cr_assert_eq(g_info[0].view_state, expected_vst); cr_assert_eq(g_info[0].view_state, expected_vst);
cr_assert_eq(g_info[0].instance_state, expected_ist); cr_assert_eq(g_info[0].instance_state, expected_ist);
cnt ++; cnt ++;
} else if (ret == 1 && !g_info[0].valid_data) {
cntinv ++;
} }
} }
cr_assert_eq(cnt, RDR_NOT_READ_CNT); cr_assert_eq(cnt, RDR_NOT_READ_CNT);
cr_assert_eq(cntinv, RDR_INV_READ_CNT);
ret = dds_return_loan(g_reader, g_loans, ret); ret = dds_return_loan(g_reader, g_loans, ret);
cr_assert_eq (ret, DDS_RETCODE_OK); cr_assert_eq (ret, DDS_RETCODE_OK);

View file

@ -18,6 +18,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_raweth.c ddsi_raweth.c
ddsi_ipaddr.c ddsi_ipaddr.c
ddsi_mcgroup.c ddsi_mcgroup.c
ddsi_rhc_plugin.c
q_addrset.c q_addrset.c
q_bitset_inlines.c q_bitset_inlines.c
q_bswap.c q_bswap.c
@ -67,6 +68,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_raweth.h ddsi_raweth.h
ddsi_ipaddr.h ddsi_ipaddr.h
ddsi_mcgroup.h ddsi_mcgroup.h
ddsi_rhc_plugin.h
probes-constants.h probes-constants.h
q_addrset.h q_addrset.h
q_align.h q_align.h

View file

@ -0,0 +1,48 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_RHC_PLUGIN_H
#define DDSI_RHC_PLUGIN_H
struct rhc;
struct nn_xqos;
struct tkmap_instance;
struct serdata;
struct sertopic;
struct entity_common;
struct proxy_writer_info
{
nn_guid_t guid;
bool auto_dispose;
int32_t ownership_strength;
uint64_t iid;
};
struct ddsi_rhc_plugin
{
void (*rhc_free_fn) (struct rhc *rhc);
void (*rhc_fini_fn) (struct rhc *rhc);
bool (*rhc_store_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk);
void (*rhc_unregister_wr_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
void (*rhc_relinquish_ownership_fn)
(struct rhc * __restrict rhc, const uint64_t wr_iid);
void (*rhc_set_qos_fn) (struct rhc * rhc, const struct nn_xqos * qos);
struct tkmap_instance * (*rhc_lookup_fn) (struct serdata *serdata);
void (*rhc_unref_fn) (struct tkmap_instance *tk);
};
void make_proxy_writer_info(struct proxy_writer_info *pwr_info, const struct entity_common *e, const struct nn_xqos *xqos);
#endif

View file

@ -100,7 +100,7 @@ struct serdata_base
serstate_t st; /* back pointer to (opaque) serstate so RTPS impl only needs serdata */ serstate_t st; /* back pointer to (opaque) serstate so RTPS impl only needs serdata */
struct serdata_msginfo msginfo; struct serdata_msginfo msginfo;
int hash_valid; /* whether hash is valid or must be computed from key/data */ int hash_valid; /* whether hash is valid or must be computed from key/data */
uint32_t hash; /* cached serdata hash, valid only if hash_valid != 0 */ uint32_t hash; /* cached serdata hash, valid only if hash_valid != 0 */
dds_key_hash_t keyhash; dds_key_hash_t keyhash;
bool bswap; /* Whether state is native endian or requires swapping */ bool bswap; /* Whether state is native endian or requires swapping */
}; };

View file

@ -22,6 +22,7 @@
#include "ddsi/q_xqos.h" #include "ddsi/q_xqos.h"
#include "ddsi/ddsi_tran.h" #include "ddsi/ddsi_tran.h"
#include "ddsi/q_feature_check.h" #include "ddsi/q_feature_check.h"
#include "ddsi/ddsi_rhc_plugin.h"
#if defined (__cplusplus) #if defined (__cplusplus)
extern "C" { extern "C" {
@ -404,35 +405,13 @@ struct config
q__schedPrioClass watchdog_sched_priority_class; q__schedPrioClass watchdog_sched_priority_class;
}; };
struct rhc;
struct nn_xqos;
struct tkmap_instance;
struct nn_rsample_info;
struct serdata;
struct sertopic;
struct proxy_writer;
struct proxy_writer_info;
struct ddsi_plugin struct ddsi_plugin
{ {
int (*init_fn) (void); int (*init_fn) (void);
void (*fini_fn) (void); void (*fini_fn) (void);
/* Read cache */ /* Read cache */
struct ddsi_rhc_plugin rhc_plugin;
void (*rhc_free_fn) (struct rhc *rhc);
void (*rhc_fini_fn) (struct rhc *rhc);
bool (*rhc_store_fn)
(struct rhc * __restrict rhc, const struct nn_rsample_info * __restrict sampleinfo,
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk);
void (*rhc_unregister_wr_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
void (*rhc_relinquish_ownership_fn)
(struct rhc * __restrict rhc, const uint64_t wr_iid);
void (*rhc_set_qos_fn) (struct rhc * rhc, const struct nn_xqos * qos);
struct tkmap_instance * (*rhc_lookup_fn) (struct serdata *serdata);
void (*rhc_unref_fn) (struct tkmap_instance *tk);
/* IID generator */ /* IID generator */

View file

@ -109,14 +109,6 @@ struct receiver_state {
nn_locator_t srcloc; nn_locator_t srcloc;
}; };
struct proxy_writer_info
{
nn_guid_t guid;
bool auto_dispose;
int32_t ownership_strength;
uint64_t iid;
};
struct nn_rsample_info { struct nn_rsample_info {
seqno_t seq; seqno_t seq;
struct receiver_state *rst; struct receiver_state *rst;
@ -129,9 +121,6 @@ struct nn_rsample_info {
unsigned pt_wr_info_zoff: 16; /* PrismTech writer info offset */ unsigned pt_wr_info_zoff: 16; /* PrismTech writer info offset */
unsigned bswap: 1; /* so we can extract well formatted writer info quicker */ unsigned bswap: 1; /* so we can extract well formatted writer info quicker */
unsigned complex_qos: 1; /* includes QoS other than keyhash, 2-bit statusinfo, PT writer info */ unsigned complex_qos: 1; /* includes QoS other than keyhash, 2-bit statusinfo, PT writer info */
unsigned hashash: 1; /* Do we have a key hash */
nn_keyhash_t keyhash; /* Key hash. Not currently used by OSPL */
struct proxy_writer_info pwr_info;
}; };
struct nn_rdata { struct nn_rdata {

View file

@ -0,0 +1,22 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "ddsi/q_entity.h"
#include "ddsi/q_xqos.h"
#include "ddsi/ddsi_rhc_plugin.h"
void make_proxy_writer_info(struct proxy_writer_info *pwr_info, const struct entity_common *e, const struct nn_xqos *xqos)
{
pwr_info->guid = e->guid;
pwr_info->ownership_strength = xqos->ownership_strength.value;
pwr_info->auto_dispose = xqos->writer_data_lifecycle.autodispose_unregistered_instances;
pwr_info->iid = e->iid;
}

View file

@ -1379,12 +1379,8 @@ static void reader_drop_connection (const struct nn_guid *rd_guid, const struct
if (rd->rhc) if (rd->rhc)
{ {
struct proxy_writer_info pwr_info; struct proxy_writer_info pwr_info;
pwr_info.guid = pwr->e.guid; make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos);
pwr_info.ownership_strength = pwr->c.xqos->ownership_strength.value; (ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info);
pwr_info.auto_dispose = pwr->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances;
pwr_info.iid = pwr->e.iid;
(ddsi_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info);
} }
if (rd->status_cb) if (rd->status_cb)
{ {
@ -1418,12 +1414,8 @@ static void reader_drop_local_connection (const struct nn_guid *rd_guid, const s
{ {
/* FIXME: */ /* FIXME: */
struct proxy_writer_info pwr_info; struct proxy_writer_info pwr_info;
pwr_info.guid = wr->e.guid; make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
pwr_info.ownership_strength = wr->xqos->ownership_strength.value; (ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info);
pwr_info.auto_dispose = wr->xqos->writer_data_lifecycle.autodispose_unregistered_instances;
pwr_info.iid = wr->e.iid;
(ddsi_plugin.rhc_unregister_wr_fn) (rd->rhc, &pwr_info);
} }
if (rd->status_cb) if (rd->status_cb)
{ {
@ -1605,26 +1597,6 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd)
} }
} }
static void
init_sampleinfo(
_Out_ struct nn_rsample_info *sampleinfo,
_In_ struct writer *wr,
_In_ int64_t seq,
_In_ serdata_t payload)
{
memset(sampleinfo, 0, sizeof(*sampleinfo));
sampleinfo->bswap = 0;
sampleinfo->complex_qos = 0;
sampleinfo->hashash = 0;
sampleinfo->seq = seq;
sampleinfo->reception_timestamp = payload->v.msginfo.timestamp;
sampleinfo->statusinfo = payload->v.msginfo.statusinfo;
sampleinfo->pwr_info.iid = 1;
sampleinfo->pwr_info.auto_dispose = 0;
sampleinfo->pwr_info.guid = wr->e.guid;
sampleinfo->pwr_info.ownership_strength = 0;
}
static void writer_add_local_connection (struct writer *wr, struct reader *rd) static void writer_add_local_connection (struct writer *wr, struct reader *rd)
{ {
struct wr_rd_match *m = os_malloc (sizeof (*m)); struct wr_rd_match *m = os_malloc (sizeof (*m));
@ -1652,11 +1624,11 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
struct whc_node *n; struct whc_node *n;
while ((n = whc_next_node(wr->whc, seq)) != NULL) while ((n = whc_next_node(wr->whc, seq)) != NULL)
{ {
struct nn_rsample_info sampleinfo; struct proxy_writer_info pwr_info;
serdata_t payload = n->serdata; serdata_t payload = n->serdata;
struct tkmap_instance *tk = (ddsi_plugin.rhc_lookup_fn) (payload); struct tkmap_instance *tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload);
init_sampleinfo(&sampleinfo, wr, n->seq, payload); make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
(void)(ddsi_plugin.rhc_store_fn) (rd->rhc, &sampleinfo, payload, tk); (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
seq = n->seq; seq = n->seq;
} }
} }
@ -3272,7 +3244,7 @@ static struct reader * new_reader_guid
/* set rhc qos for reader */ /* set rhc qos for reader */
if (rhc) if (rhc)
{ {
(ddsi_plugin.rhc_set_qos_fn) (rd->rhc, rd->xqos); (ddsi_plugin.rhc_plugin.rhc_set_qos_fn) (rd->rhc, rd->xqos);
} }
assert (rd->xqos->present & QP_LIVELINESS); assert (rd->xqos->present & QP_LIVELINESS);
if (rd->xqos->liveliness.kind != NN_AUTOMATIC_LIVELINESS_QOS || if (rd->xqos->liveliness.kind != NN_AUTOMATIC_LIVELINESS_QOS ||
@ -3402,7 +3374,7 @@ static void gc_delete_reader (struct gcreq *gcreq)
#endif #endif
if (rd->rhc) if (rd->rhc)
{ {
(ddsi_plugin.rhc_free_fn) (rd->rhc); (ddsi_plugin.rhc_plugin.rhc_free_fn) (rd->rhc);
} }
if (rd->status_cb) if (rd->status_cb)
{ {
@ -3431,7 +3403,7 @@ int delete_reader (const struct nn_guid *guid)
} }
if (rd->rhc) if (rd->rhc)
{ {
(ddsi_plugin.rhc_fini_fn) (rd->rhc); (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc);
} }
nn_log (LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); nn_log (LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid));
ephash_remove_reader_guid (rd); ephash_remove_reader_guid (rd);

View file

@ -3123,16 +3123,6 @@ unsigned char *nn_plist_quickscan (struct nn_rsample_info *dest, const struct nn
{ {
case PID_PAD: case PID_PAD:
break; break;
case PID_KEYHASH:
if (length < sizeof (dest->keyhash))
{
TRACE (("plist(vendor %u.%u): quickscan(PID_KEYHASH): buffer too small\n",
src->vendorid.id[0], src->vendorid.id[1]));
return NULL;
}
memcpy (&dest->keyhash, pl, sizeof (dest->keyhash));
dest->hashash = 1;
break;
case PID_STATUSINFO: case PID_STATUSINFO:
if (length < 4) if (length < 4)
{ {

View file

@ -20,6 +20,7 @@
#include "ddsi/q_md5.h" #include "ddsi/q_md5.h"
#include "util/ut_avl.h" #include "util/ut_avl.h"
#include "q__osplser.h" #include "q__osplser.h"
#include "dds__stream.h"
#include "ddsi/q_protocol.h" #include "ddsi/q_protocol.h"
#include "ddsi/q_rtps.h" #include "ddsi/q_rtps.h"
#include "ddsi/q_misc.h" #include "ddsi/q_misc.h"
@ -320,13 +321,6 @@ static void set_sampleinfo_proxy_writer (struct nn_rsample_info *sampleinfo, nn_
{ {
struct proxy_writer * pwr = ephash_lookup_proxy_writer_guid (pwr_guid); struct proxy_writer * pwr = ephash_lookup_proxy_writer_guid (pwr_guid);
sampleinfo->pwr = pwr; sampleinfo->pwr = pwr;
if (pwr)
{
sampleinfo->pwr_info.guid = pwr->e.guid;
sampleinfo->pwr_info.ownership_strength = pwr->c.xqos->ownership_strength.value;
sampleinfo->pwr_info.auto_dispose = pwr->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances;
sampleinfo->pwr_info.iid = pwr->e.iid;
}
} }
static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp) static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp)
@ -1818,6 +1812,18 @@ static serstate_t make_raw_serstate
return st; return st;
} }
static serdata_t ddsi_serstate_fix_with_key (serstate_t st, const struct sertopic *topic, bool bswap)
{
serdata_t sample = ddsi_serstate_fix(st);
dds_stream_t is;
assert(sample->v.keyhash.m_flags == 0);
sample->v.bswap = bswap;
dds_stream_from_serstate (&is, sample->v.st);
/* FIXME: the relationship between dds_topic, topic_descriptor and sertopic clearly needs some work */
dds_stream_read_keyhash (&is, &sample->v.keyhash, topic->status_cb_entity->m_descriptor, sample->v.st->kind == STK_KEY);
return sample;
}
static serdata_t extract_sample_from_data static serdata_t extract_sample_from_data
( (
const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags,
@ -1845,7 +1851,7 @@ static serdata_t extract_sample_from_data
return NULL; return NULL;
} }
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp); st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
sample = ddsi_serstate_fix (st); sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
} }
else if (sampleinfo->size) else if (sampleinfo->size)
{ {
@ -1856,13 +1862,13 @@ static serdata_t extract_sample_from_data
if (data_smhdr_flags & DATA_FLAG_KEYFLAG) if (data_smhdr_flags & DATA_FLAG_KEYFLAG)
{ {
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 1, statusinfo, tstamp); st = make_raw_serstate (topic, fragchain, sampleinfo->size, 1, statusinfo, tstamp);
sample = ddsi_serstate_fix (st); sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
} }
else else
{ {
assert (data_smhdr_flags & DATA_FLAG_DATAFLAG); assert (data_smhdr_flags & DATA_FLAG_DATAFLAG);
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp); st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
sample = ddsi_serstate_fix (st); sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
} }
} }
else if (data_smhdr_flags & DATA_FLAG_INLINE_QOS) else if (data_smhdr_flags & DATA_FLAG_INLINE_QOS)
@ -1880,7 +1886,7 @@ static serdata_t extract_sample_from_data
ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL); ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL);
st->kind = STK_KEY; st->kind = STK_KEY;
ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value); ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value);
sample = ddsi_serstate_fix (st); sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
} }
} }
else else
@ -1901,10 +1907,6 @@ static serdata_t extract_sample_from_data
failmsg ? failmsg : "for reasons unknown" failmsg ? failmsg : "for reasons unknown"
); );
} }
else
{
sample->v.bswap = sampleinfo->bswap;
}
return sample; return sample;
} }
@ -2036,16 +2038,12 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
us */ us */
{ {
struct tkmap_instance * tk; struct tkmap_instance * tk;
tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload);
if (sampleinfo->hashash)
{
payload->v.keyhash.m_flags = DDS_KEY_HASH_SET;
memcpy (&payload->v.keyhash.m_hash, &sampleinfo->keyhash, sizeof (payload->v.keyhash.m_hash));
}
tk = (ddsi_plugin.rhc_lookup_fn) (payload);
if (tk) if (tk)
{ {
struct proxy_writer_info pwr_info;
make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos);
if (rdguid == NULL) if (rdguid == NULL)
{ {
TRACE ((" %"PRId64"=>EVERYONE\n", sampleinfo->seq)); TRACE ((" %"PRId64"=>EVERYONE\n", sampleinfo->seq));
@ -2069,7 +2067,7 @@ retry:
for (i = 0; rdary[i]; i++) for (i = 0; rdary[i]; i++)
{ {
TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid))); TRACE (("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)));
if (! (ddsi_plugin.rhc_store_fn) (rdary[i]->rhc, sampleinfo, payload, tk)) if (! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk))
{ {
if (pwr_locked) os_mutexUnlock (&pwr->e.lock); if (pwr_locked) os_mutexUnlock (&pwr->e.lock);
os_mutexUnlock (&pwr->rdary.rdary_lock); os_mutexUnlock (&pwr->rdary.rdary_lock);
@ -2100,7 +2098,7 @@ retry:
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
{ {
TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid))); TRACE (("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)));
(void) (ddsi_plugin.rhc_store_fn) (rd->rhc, sampleinfo, payload, tk); (void) (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
} }
} }
if (!pwr_locked) os_mutexUnlock (&pwr->e.lock); if (!pwr_locked) os_mutexUnlock (&pwr->e.lock);
@ -2112,14 +2110,14 @@ retry:
{ {
struct reader *rd = ephash_lookup_reader_guid (rdguid);; struct reader *rd = ephash_lookup_reader_guid (rdguid);;
TRACE ((" %"PRId64"=>%x:%x:%x:%x%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?")); TRACE ((" %"PRId64"=>%x:%x:%x:%x%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?"));
while (rd && ! (ddsi_plugin.rhc_store_fn) (rd->rhc, sampleinfo, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid)) while (rd && ! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid))
{ {
if (pwr_locked) os_mutexUnlock (&pwr->e.lock); if (pwr_locked) os_mutexUnlock (&pwr->e.lock);
dds_sleepfor (DDS_MSECS (1)); dds_sleepfor (DDS_MSECS (1));
if (pwr_locked) os_mutexLock (&pwr->e.lock); if (pwr_locked) os_mutexLock (&pwr->e.lock);
} }
} }
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
} }
} }
ddsi_serdata_unref (payload); ddsi_serdata_unref (payload);
@ -2837,7 +2835,6 @@ static int handle_submsg_sequence
{ {
struct nn_rsample_info sampleinfo; struct nn_rsample_info sampleinfo;
unsigned char *datap; unsigned char *datap;
sampleinfo.hashash = 0;
/* valid_DataFrag does not validate the payload */ /* valid_DataFrag does not validate the payload */
if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap)) if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap))
goto malformed; goto malformed;
@ -2853,7 +2850,6 @@ static int handle_submsg_sequence
{ {
struct nn_rsample_info sampleinfo; struct nn_rsample_info sampleinfo;
unsigned char *datap; unsigned char *datap;
sampleinfo.hashash = 0;
/* valid_Data does not validate the payload */ /* valid_Data does not validate the payload */
if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap)) if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap))
{ {

View file

@ -1138,9 +1138,9 @@ int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t serd
{ {
struct tkmap_instance *tk; struct tkmap_instance *tk;
int res; int res;
tk = (ddsi_plugin.rhc_lookup_fn) (serdata); tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata);
res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1); res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1);
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
return res; return res;
} }
@ -1148,9 +1148,9 @@ int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t se
{ {
struct tkmap_instance *tk; struct tkmap_instance *tk;
int res; int res;
tk = (ddsi_plugin.rhc_lookup_fn) (serdata); tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata);
res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0); res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0);
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
return res; return res;
} }

View file

@ -1131,9 +1131,9 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi
encoding. */ encoding. */
serdata->hdr.identifier = PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE; serdata->hdr.identifier = PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE;
tk = (ddsi_plugin.rhc_lookup_fn) (serdata); tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata);
write_sample_nogc (xp, wr, serdata, tk); write_sample_nogc (xp, wr, serdata, tk);
(ddsi_plugin.rhc_unref_fn) (tk); (ddsi_plugin.rhc_plugin.rhc_unref_fn) (tk);
#undef PMD_DATA_LENGTH #undef PMD_DATA_LENGTH
} }

View file

@ -52,11 +52,10 @@ enum readermode { MODE_PRINT, MODE_CHECK, MODE_ZEROLOAD, MODE_DUMP, MODE_NONE };
#define PM_IHANDLE 8u #define PM_IHANDLE 8u
#define PM_PHANDLE 16u #define PM_PHANDLE 16u
#define PM_STIME 32u #define PM_STIME 32u
#define PM_RTIME 64u #define PM_DGEN 64u
#define PM_DGEN 128u #define PM_NWGEN 128u
#define PM_NWGEN 256u #define PM_RANKS 256u
#define PM_RANKS 512u #define PM_STATE 512u
#define PM_STATE 1024u
static volatile sig_atomic_t termflag = 0; static volatile sig_atomic_t termflag = 0;
static int pid; static int pid;
@ -818,8 +817,6 @@ static void print_sampleinfo(dds_time_t *tstart, dds_time_t tnow, const dds_samp
sep = " : "; sep = " : ";
if (print_metadata & PM_STIME) if (print_metadata & PM_STIME)
n += printf ("%s%lld.%09lld", n > 0 ? sep : "", (si->source_timestamp/DDS_NSECS_IN_SEC), (si->source_timestamp%DDS_NSECS_IN_SEC)), sep = " "; n += printf ("%s%lld.%09lld", n > 0 ? sep : "", (si->source_timestamp/DDS_NSECS_IN_SEC), (si->source_timestamp%DDS_NSECS_IN_SEC)), sep = " ";
if (print_metadata & PM_RTIME)
n += printf ("%s%lld.%09lld", n > 0 ? sep : "", (si->reception_timestamp/DDS_NSECS_IN_SEC), (si->reception_timestamp%DDS_NSECS_IN_SEC));
sep = " : "; sep = " : ";
if (print_metadata & PM_DGEN) if (print_metadata & PM_DGEN)
n += printf ("%s%"PRIu32, n > 0 ? sep : "", si->disposed_generation_count), sep = " "; n += printf ("%s%"PRIu32, n > 0 ? sep : "", si->disposed_generation_count), sep = " ";
@ -2102,7 +2099,6 @@ static void set_print_mode(const char *optarg) {
{ "phandle", PM_PHANDLE }, { "phandle", PM_PHANDLE },
{ "ihandle", PM_IHANDLE }, { "ihandle", PM_IHANDLE },
{ "stime", PM_STIME }, { "stime", PM_STIME },
{ "rtime", PM_RTIME },
{ "dgen", PM_DGEN }, { "dgen", PM_DGEN },
{ "nwgen", PM_NWGEN }, { "nwgen", PM_NWGEN },
{ "ranks", PM_RANKS }, { "ranks", PM_RANKS },