Lifespan QoS implementation

This commit enables specifying a duration for data to be valid when writing
samples. After this duration, samples are dropped from the reader and writer
history cache. See section 2.2.3.16 of the DDS specification for more
details on this QoS.

The expiration of samples in the reader history cache is calculated
based on the reception timestamp of the sample and uses the monotonic
clock. As a result, the current implementation does not rely on clock
synchronisation between reader and writer. There may be reasons to
change this behavior in future and use the source timestamp instead.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2019-12-17 10:58:00 +01:00 committed by eboasson
parent 1ec9c3a194
commit ef0f4c2ae7
21 changed files with 660 additions and 81 deletions

View file

@ -20,9 +20,12 @@ struct dds_rhc;
struct dds_reader;
struct ddsi_sertopic;
struct q_globals;
struct dds_rhc_default;
struct rhc_sample;
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks);
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
DDS_EXPORT nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow);
#if defined (__cplusplus)
}

View file

@ -39,6 +39,9 @@
#include "dds/ddsi/q_entity.h" /* proxy_writer_info */
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#include "dds/ddsi/sysdeps.h"
/* INSTANCE MANAGEMENT
@ -122,9 +125,8 @@
DDSI implementation, but still) will be done by also reseting "wr_iid"
when an exclusive ownership writer lowers its strength.
Lifespan, time base filter and deadline, are based on the instance
timestamp ("tstamp"). This time stamp needs to be changed to either source
or reception timestamp, depending on the ordering chosen.
Lifespan is based on the reception timestamp, and the monotonic time is
used for sample expiry if this QoS is set to something else than infinite.
READ CONDITIONS
===============
@ -244,6 +246,10 @@ struct rhc_sample {
bool isread; /* READ or NOT_READ sample state */
uint32_t disposed_gen; /* snapshot of instance counter at time of insertion */
uint32_t no_writers_gen; /* __/ */
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_fhnode lifespan; /* fibheap node for lifespan */
struct rhc_instance *inst; /* reference to rhc instance */
#endif
};
struct rhc_instance {
@ -289,15 +295,15 @@ struct dds_rhc_default {
int32_t max_samples; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
int32_t max_samples_per_instance; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
uint32_t n_instances; /* # instances, including empty [NOT USED] */
uint32_t n_instances; /* # instances, including empty */
uint32_t n_nonempty_instances; /* # non-empty instances */
uint32_t n_not_alive_disposed; /* # disposed, non-empty instances [NOT USED] */
uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances [NOT USED] */
uint32_t n_new; /* # new, non-empty instances [NOT USED] */
uint32_t n_not_alive_disposed; /* # disposed, non-empty instances */
uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances */
uint32_t n_new; /* # new, non-empty instances */
uint32_t n_vsamples; /* # "valid" samples over all instances */
uint32_t n_vread; /* # read "valid" samples over all instances [NOT USED] */
uint32_t n_invsamples; /* # invalid samples over all instances [NOT USED] */
uint32_t n_invread; /* # read invalid samples over all instances [NOT USED] */
uint32_t n_vread; /* # read "valid" samples over all instances */
uint32_t n_invsamples; /* # invalid samples over all instances */
uint32_t n_invread; /* # read invalid samples over all instances */
bool by_source_ordering; /* true if BY_SOURCE, false if BY_RECEPTION */
bool exclusive_ownership; /* true if EXCLUSIVE, false if SHARED */
@ -316,6 +322,9 @@ struct dds_rhc_default {
uint32_t nqconds; /* Number of associated query conditions */
dds_querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */
void *qcond_eval_samplebuf; /* Temporary storage for evaluating query conditions, NULL if no qconds */
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
};
struct trigger_info_cmn {
@ -460,6 +469,11 @@ static void topicless_to_clean_invsample (const struct ddsi_sertopic *topic, con
}
static unsigned qmask_of_inst (const struct rhc_instance *inst);
static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s);
static void get_trigger_info_cmn (struct trigger_info_cmn *info, struct rhc_instance *inst);
static void get_trigger_info_pre (struct trigger_info_pre *info, struct rhc_instance *inst);
static void init_trigger_info_qcond (struct trigger_info_qcond *qc);
static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, struct rhc_instance *inst);
static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst, struct dds_entity *triggers[], size_t *ntriggers);
#ifndef NDEBUG
static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_conds, bool check_qcmask);
@ -531,6 +545,83 @@ static void remove_inst_from_nonempty_list (struct dds_rhc_default *rhc, struct
rhc->n_nonempty_instances--;
}
#ifdef DDSI_INCLUDE_LIFESPAN
static void drop_expired_samples (struct dds_rhc_default *rhc, struct rhc_sample *sample)
{
struct rhc_instance *inst = sample->inst;
struct trigger_info_pre pre;
struct trigger_info_post post;
struct trigger_info_qcond trig_qc;
size_t ntriggers = SIZE_MAX;
assert (!inst_is_empty (inst));
TRACE ("rhc_default %p drop_exp(iid %"PRIx64" wriid %"PRIx64" exp %"PRId64" %s",
rhc, inst->iid, sample->wr_iid, sample->lifespan.t_expire.v, sample->isread ? "read" : "notread");
get_trigger_info_pre (&pre, inst);
init_trigger_info_qcond (&trig_qc);
/* Find prev sample: in case of history depth of 1 this is the sample itself,
* (which is inst->latest). In case of larger history depth the most likely sample
* to be expired is the oldest, in which case inst->latest is the previous
* sample and inst->latest->next points to sample (circular list). We can
* assume that 'sample' is in the list, so a check to avoid infinite loop is not
* required here. */
struct rhc_sample *psample = inst->latest;
while (psample->next != sample)
psample = psample->next;
rhc->n_vsamples--;
if (sample->isread)
{
inst->nvread--;
rhc->n_vread--;
trig_qc.dec_sample_read = true;
}
if (--inst->nvsamples > 0)
{
if (inst->latest == sample)
inst->latest = psample;
psample->next = sample->next;
}
else
{
inst->latest = NULL;
}
trig_qc.dec_conds_sample = sample->conds;
free_sample (rhc, inst, sample);
get_trigger_info_cmn (&post.c, inst);
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
if (inst_is_empty (inst))
{
remove_inst_from_nonempty_list (rhc, inst);
if (inst->isdisposed)
rhc->n_not_alive_disposed--;
if (inst->wrcount == 0)
{
TRACE ("; iid %"PRIx64" #0,empty,drop", inst->iid);
if (!inst->isdisposed)
rhc->n_not_alive_no_writers--;
drop_instance_noupdate_no_writers (rhc, inst);
}
}
TRACE (")\n");
}
nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow)
{
struct dds_rhc_default *rhc = hc;
struct rhc_sample *sample;
nn_mtime_t tnext;
ddsrt_mutex_lock (&rhc->lock);
while ((tnext = lifespan_next_expired_locked (&rhc->lifespan, tnow, (void **)&sample)).v == 0)
drop_expired_samples (rhc, sample);
ddsrt_mutex_unlock (&rhc->lock);
return tnext;
}
#endif /* DDSI_INCLUDE_LIFESPAN */
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks)
{
struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc));
@ -546,6 +637,10 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_global
rhc->gv = gv;
rhc->xchecks = xchecks;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_init (gv, &rhc->lifespan, offsetof(struct dds_rhc_default, lifespan), offsetof(struct rhc_sample, lifespan), dds_rhc_default_sample_expired_cb);
#endif
return &rhc->common;
}
@ -601,9 +696,15 @@ static struct rhc_sample *alloc_sample (struct rhc_instance *inst)
}
}
static void free_sample (struct rhc_instance *inst, struct rhc_sample *s)
static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s)
{
#ifndef DDSI_INCLUDE_LIFESPAN
DDSRT_UNUSED_ARG (rhc);
#endif
ddsi_serdata_unref (s->sample);
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
if (s == &inst->a_sample)
{
assert (!inst->a_sample_free);
@ -665,11 +766,12 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de
struct rhc_sample *s = inst->latest;
const bool was_empty = inst_is_empty (inst);
struct trigger_info_qcond dummy_trig_qc;
if (s)
{
do {
struct rhc_sample * const s1 = s->next;
free_sample (inst, s);
free_sample (rhc, inst, s);
s = s1;
} while (s != inst->latest);
rhc->n_vsamples -= inst->nvsamples;
@ -682,11 +784,10 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de
#endif
inst_clear_invsample_if_exists (rhc, inst, &dummy_trig_qc);
if (!was_empty)
{
remove_inst_from_nonempty_list (rhc, inst);
}
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
ddsrt_free (inst);
if (inst->isnew)
rhc->n_new--;
free_empty_instance(inst, rhc);
}
static uint32_t dds_rhc_default_lock_samples (struct dds_rhc_default *rhc)
@ -708,7 +809,10 @@ static void free_instance_rhc_free_wrap (void *vnode, void *varg)
static void dds_rhc_default_free (struct dds_rhc_default *rhc)
{
assert (rhc_check_counts_locked (rhc, true, true));
#ifdef DDSI_INCLUDE_LIFESPAN
dds_rhc_default_sample_expired_cb (rhc, NN_MTIME_NEVER);
lifespan_fini (&rhc->lifespan);
#endif
ddsrt_hh_enum (rhc->instances, free_instance_rhc_free_wrap, rhc);
assert (rhc->nonempty_instances == NULL);
ddsrt_hh_free (rhc->instances);
@ -789,6 +893,10 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
assert (trig_qc->dec_conds_sample == 0);
ddsi_serdata_unref (s->sample);
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
trig_qc->dec_sample_read = s->isread;
trig_qc->dec_conds_sample = s->conds;
if (s->isread)
@ -843,6 +951,11 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
s->isread = false;
s->disposed_gen = inst->disposed_gen;
s->no_writers_gen = inst->no_writers_gen;
#ifdef DDSI_INCLUDE_LIFESPAN
s->inst = inst;
s->lifespan.t_expire = wrinfo->lifespan_exp;
lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
s->conds = 0;
if (rhc->nqconds != 0)
@ -939,6 +1052,8 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, stru
assert (inst_is_empty (inst));
rhc->n_instances--;
if (inst->isnew)
rhc->n_new--;
ret = ddsrt_hh_remove (rhc->instances, inst);
assert (ret);
@ -1065,7 +1180,6 @@ static void account_for_empty_to_nonempty_transition (struct dds_rhc_default *rh
{
assert (inst_nsamples (inst) == 1);
add_inst_to_nonempty_list (rhc, inst);
rhc->n_new += inst->isnew;
if (inst->isdisposed)
rhc->n_not_alive_disposed++;
else if (inst->wrcount == 0)
@ -1294,6 +1408,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
assert (ret);
(void) ret;
rhc->n_instances++;
rhc->n_new++;
get_trigger_info_cmn (&post->c, inst);
*out_inst = inst;
@ -1486,16 +1601,10 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
if (inst->latest || inst_became_disposed)
{
if (was_empty)
{
/* general function is slightly slower than a specialised
one, but perhaps it is wiser to use the general one */
account_for_empty_to_nonempty_transition (rhc, inst);
}
else
{
rhc->n_not_alive_disposed += (uint32_t)(inst->isdisposed - old_isdisposed);
rhc->n_new += (uint32_t)(inst->isnew - old_isnew);
}
rhc->n_new += (uint32_t)(inst->isnew - old_isnew);
}
else
{
@ -2032,7 +2141,7 @@ static int dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, void **
inst->latest = NULL;
}
free_sample (inst, sample);
free_sample (rhc, inst, sample);
if (++n == max_samples)
{
@ -2179,7 +2288,7 @@ static int dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, stru
else
inst->latest = NULL;
free_sample (inst, sample);
free_sample (rhc, inst, sample);
if (++n == max_samples)
{
@ -2442,7 +2551,11 @@ static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_f
trig_qc->dec_conds_invsample, trig_qc->dec_conds_sample, trig_qc->inc_conds_invsample, trig_qc->inc_conds_sample);
assert (rhc->n_nonempty_instances >= rhc->n_not_alive_disposed + rhc->n_not_alive_no_writers);
#ifndef DDSI_INCLUDE_LIFESPAN
/* If lifespan is disabled, samples cannot expire and therefore
empty instances cannot be in the 'new' state. */
assert (rhc->n_nonempty_instances >= rhc->n_new);
#endif
assert (rhc->n_vsamples >= rhc->n_vread);
iter = rhc->conds;
@ -2687,6 +2800,8 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
bool a_sample_free = true;
n_instances++;
if (inst->isnew)
n_new++;
if (inst_is_empty (inst))
continue;
@ -2695,8 +2810,6 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
n_not_alive_disposed++;
else if (inst->wrcount == 0)
n_not_alive_no_writers++;
if (inst->isnew)
n_new++;
if (inst->latest)
{
@ -2790,9 +2903,7 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
if (check_conds)
{
for (i = 0, rciter = rhc->conds; i < ncheck; i++, rciter = rciter->m_next)
{
assert (cond_match_count[i] == ddsrt_atomic_ld32 (&rciter->m_entity.m_status.m_trigger));
}
}
if (rhc->n_nonempty_instances == 0)

View file

@ -12,25 +12,28 @@
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/ddsi_serdata.h"
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_config.h"
#include "dds__whc.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_freelist.h"
#include "dds/ddsi/q_globals.h"
#include "dds__whc.h"
#define USE_EHH 0
struct whc_node {
struct whc_node *next_seq; /* next in this interval */
struct whc_node *prev_seq; /* prev in this interval */
@ -44,6 +47,9 @@ struct whc_node {
unsigned borrowed: 1; /* at most one can borrow it at any time */
nn_mtime_t last_rexmit_ts;
uint32_t rexmit_count;
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_fhnode lifespan; /* fibheap node for lifespan */
#endif
struct ddsi_serdata *serdata;
};
@ -95,6 +101,9 @@ struct whc_impl {
#endif
struct ddsrt_hh *idx_hash;
ddsrt_avl_tree_t seq;
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
};
struct whc_sample_iter_impl {
@ -128,7 +137,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
static uint32_t whc_default_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
static void whc_default_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list);
static void whc_default_get_state (const struct whc *whc, struct whc_state *st);
static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
static seqno_t whc_default_next_seq (const struct whc *whc, seqno_t seq);
static bool whc_default_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
static bool whc_default_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample);
@ -349,6 +358,21 @@ static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct dd
}
}
#ifdef DDSI_INCLUDE_LIFESPAN
static nn_mtime_t whc_sample_expired_cb(void *hc, nn_mtime_t tnow)
{
struct whc_impl *whc = hc;
void *sample;
nn_mtime_t tnext;
ddsrt_mutex_lock (&whc->lock);
while ((tnext = lifespan_next_expired_locked (&whc->lifespan, tnow, &sample)).v == 0)
whc_delete_one (whc, sample);
whc->maxseq_node = whc_findmax_procedurally (whc);
ddsrt_mutex_unlock (&whc->lock);
return tnext;
}
#endif
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth)
{
size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */
@ -384,6 +408,10 @@ struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdep
else
whc->idx_hash = NULL;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_init (gv, &whc->lifespan, offsetof(struct whc_impl, lifespan), offsetof(struct whc_node, lifespan), whc_sample_expired_cb);
#endif
/* seq interval tree: always has an "open" node */
ddsrt_avl_init (&whc_seq_treedef, &whc->seq);
intv = ddsrt_malloc (sizeof (*intv));
@ -417,6 +445,11 @@ void whc_default_free (struct whc *whc_generic)
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
check_whc (whc);
#ifdef DDSI_INCLUDE_LIFESPAN
whc_sample_expired_cb (whc, NN_MTIME_NEVER);
lifespan_fini (&whc->lifespan);
#endif
if (whc->idx_hash)
{
struct ddsrt_hh_iter it;
@ -686,6 +719,10 @@ static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_i
whcn->unacked = 0;
}
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan);
#endif
/* Take it out of seqhash; deleting it from the list ordered on
sequence numbers is left to the caller (it has to be done unconditionally,
but remove_acked_messages defers it until the end or a skipped node). */
@ -869,6 +906,9 @@ static uint32_t whc_default_remove_acked_messages_noidx (struct whc_impl *whc, s
whc->unacked_bytes -= (size_t) (whcn->total_bytes - (*deferred_free_list)->total_bytes + (*deferred_free_list)->size);
for (whcn = *deferred_free_list; whcn; whcn = whcn->next_seq)
{
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan);
#endif
remove_whcn_from_hash (whc, whcn);
assert (whcn->unacked);
}
@ -1035,10 +1075,16 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn
return cnt;
}
static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata)
static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata)
{
struct whc_node *newn = NULL;
#ifndef DDSI_INCLUDE_LIFESPAN
/* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info'
that contains both lifespan and deadline info of the writer */
DDSRT_UNUSED_ARG (exp);
#endif
if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL)
newn = ddsrt_malloc (sizeof (*newn));
newn->seq = seq;
@ -1062,6 +1108,10 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
if (newn->unacked)
whc->unacked_bytes += newn->size;
#ifdef DDSI_INCLUDE_LIFESPAN
newn->lifespan.t_expire = exp;
#endif
insert_whcn_in_hash (whc, newn);
if (whc->open_intv->first == NULL)
@ -1093,10 +1143,13 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
}
whc->seq_size++;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_register_sample_locked (&whc->lifespan, &newn->lifespan);
#endif
return newn;
}
static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
struct whc_node *newn = NULL;
@ -1106,6 +1159,9 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
char pad[sizeof (struct whc_idxnode) + sizeof (struct whc_node *)];
} template;
/* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info'
that contains both lifespan als deadline info of the writer */
ddsrt_mutex_lock (&whc->lock);
check_whc (whc);
@ -1113,8 +1169,8 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
{
struct whc_state whcst;
get_state_locked (whc, &whcst);
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, (void *) plist, (void *) serdata, serdata->hash);
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" exp %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, exp.v, (void *) plist, (void *) serdata, serdata->hash);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
}
@ -1128,7 +1184,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq);
/* Always insert in seq admin */
newn = whc_default_insert_seq (whc, max_drop_seq, seq, plist, serdata);
newn = whc_default_insert_seq (whc, max_drop_seq, seq, exp, plist, serdata);
TRACE (" whcn %p:", (void*)newn);

View file

@ -143,11 +143,12 @@ static void bwhc_get_state (const struct whc *whc, struct whc_state *st)
st->unacked_bytes = 0;
}
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
(void)whc;
(void)max_drop_seq;
(void)seq;
(void)exp;
(void)serdata;
(void)tk;
if (plist)

View file

@ -99,7 +99,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
{
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
struct ddsi_writer_info pwr_info;
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos);
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos, payload->statusinfo);
for (uint32_t i = 0; rdary[i]; i++) {
DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
@ -124,7 +124,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
const struct entity_index *gh = wr->e.gv->entity_index;
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo);
ddsrt_mutex_lock (&wr->e.lock);
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{

View file

@ -218,9 +218,23 @@ static dds_return_t dds_writer_delete (dds_entity *e)
return DDS_RETCODE_OK;
}
static dds_return_t validate_writer_qos (const dds_qos_t *wqos)
{
#ifndef DDSI_INCLUDE_LIFESPAN
if (wqos != NULL && (wqos->present & QP_LIFESPAN) && wqos->lifespan.duration != DDS_INFINITY)
return DDS_RETCODE_BAD_PARAMETER;
#else
DDSRT_UNUSED_ARG (wqos);
#endif
return DDS_RETCODE_OK;
}
static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
{
/* note: e->m_qos is still the old one to allow for failure here */
dds_return_t ret;
if ((ret = validate_writer_qos(qos)) != DDS_RETCODE_OK)
return ret;
if (enabled)
{
struct writer *wr;
@ -320,7 +334,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0);
nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0);
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0)
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0 ||
(rc = validate_writer_qos(wqos)) != DDS_RETCODE_OK)
{
dds_delete_qos(wqos);
goto err_bad_qos;

View file

@ -55,6 +55,10 @@ set(ddsc_test_sources
"write_various_types.c"
"writer.c")
if(ENABLE_LIFESPAN)
list(APPEND ddsc_test_sources "lifespan.c")
endif()
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
target_include_directories(
cunit_ddsc PRIVATE

View file

@ -0,0 +1,164 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_whc.h"
#include "dds__entity.h"
static dds_entity_t g_participant = 0;
static dds_entity_t g_subscriber = 0;
static dds_entity_t g_publisher = 0;
static dds_entity_t g_topic = 0;
static dds_entity_t g_reader = 0;
static dds_entity_t g_writer = 0;
static dds_entity_t g_waitset = 0;
static dds_entity_t g_rcond = 0;
static dds_entity_t g_qcond = 0;
static char*
create_topic_name(const char *prefix, char *name, size_t size)
{
/* Get semi random g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void) snprintf(name, size, "%s_pid%"PRIdPID"_tid%"PRIdTID"", prefix, pid, tid);
return name;
}
static void lifespan_init(void)
{
dds_attach_t triggered;
dds_return_t ret;
char name[100];
dds_qos_t *qos;
qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(qos);
g_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
CU_ASSERT_FATAL(g_participant > 0);
g_subscriber = dds_create_subscriber(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_subscriber > 0);
g_publisher = dds_create_publisher(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_publisher > 0);
g_waitset = dds_create_waitset(g_participant);
CU_ASSERT_FATAL(g_waitset > 0);
g_topic = dds_create_topic(g_participant, &Space_Type1_desc, create_topic_name("ddsc_qos_lifespan_test", name, sizeof name), NULL, NULL);
CU_ASSERT_FATAL(g_topic > 0);
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
g_writer = dds_create_writer(g_publisher, g_topic, qos, NULL);
CU_ASSERT_FATAL(g_writer > 0);
g_reader = dds_create_reader(g_subscriber, g_topic, qos, NULL);
CU_ASSERT_FATAL(g_reader > 0);
/* Sync g_reader to g_writer. */
ret = dds_set_status_mask(g_reader, DDS_SUBSCRIPTION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(g_waitset, g_reader, g_reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(g_reader, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(g_waitset, g_reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
/* Sync g_writer to g_reader. */
ret = dds_set_status_mask(g_writer, DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(g_waitset, g_writer, g_writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(g_writer, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(g_waitset, g_writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
dds_delete_qos(qos);
}
static void lifespan_fini(void)
{
dds_delete(g_rcond);
dds_delete(g_qcond);
dds_delete(g_reader);
dds_delete(g_writer);
dds_delete(g_subscriber);
dds_delete(g_publisher);
dds_delete(g_waitset);
dds_delete(g_topic);
dds_delete(g_participant);
}
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
{
struct dds_entity *wr_entity;
struct writer *wr;
struct whc_state whcst;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
whc_get_state(wr->whc, &whcst);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(wr_entity);
CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
}
CU_Test(ddsc_lifespan, basic, .init=lifespan_init, .fini=lifespan_fini)
{
Space_Type1 sample = { 0, 0, 0 };
dds_return_t ret;
dds_duration_t exp = DDS_MSECS(500);
dds_qos_t *qos;
qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(qos);
/* Write with default qos: lifespan inifinite */
ret = dds_write (g_writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
check_whc_state(g_writer, 1, 1);
dds_sleepfor (2 * exp);
check_whc_state(g_writer, 1, 1);
dds_qset_lifespan(qos, exp);
ret = dds_set_qos(g_writer, qos);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
ret = dds_write (g_writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
check_whc_state(g_writer, 2, 2);
dds_sleepfor (2 * exp);
check_whc_state(g_writer, -1, -1);
dds_delete_qos(qos);
}