diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index e2edb53..aba5f20 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -27,6 +27,11 @@ endif() add_definitions(-DDDSI_INCLUDE_NETWORK_PARTITIONS -DDDSI_INCLUDE_SSM) +option(ENABLE_LIFESPAN "Enable Lifespan QoS support" ON) +if(ENABLE_LIFESPAN) + add_definitions(-DDDSI_INCLUDE_LIFESPAN) +endif() + # OpenSSL is huge, raising the RSS by 1MB or so, and moreover find_package(OpenSSL) causes # trouble on some older CMake versions that otherwise work fine, so provide an option to avoid # all OpenSSL related things. diff --git a/src/core/ddsc/src/dds__rhc_default.h b/src/core/ddsc/src/dds__rhc_default.h index 654577a..7a8a8e1 100644 --- a/src/core/ddsc/src/dds__rhc_default.h +++ b/src/core/ddsc/src/dds__rhc_default.h @@ -20,9 +20,12 @@ struct dds_rhc; struct dds_reader; struct ddsi_sertopic; struct q_globals; +struct dds_rhc_default; +struct rhc_sample; DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks); DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic); +DDS_EXPORT nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index ff19588..a3922cb 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -39,6 +39,9 @@ #include "dds/ddsi/q_entity.h" /* proxy_writer_info */ #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata_default.h" +#ifdef DDSI_INCLUDE_LIFESPAN +#include "dds/ddsi/ddsi_lifespan.h" +#endif #include "dds/ddsi/sysdeps.h" /* INSTANCE MANAGEMENT @@ -122,9 +125,8 @@ DDSI implementation, but still) will be done by also reseting "wr_iid" when an exclusive ownership writer lowers its strength. - Lifespan, time base filter and deadline, are based on the instance - timestamp ("tstamp"). This time stamp needs to be changed to either source - or reception timestamp, depending on the ordering chosen. + Lifespan is based on the reception timestamp, and the monotonic time is + used for sample expiry if this QoS is set to something else than infinite. READ CONDITIONS =============== @@ -244,6 +246,10 @@ struct rhc_sample { bool isread; /* READ or NOT_READ sample state */ uint32_t disposed_gen; /* snapshot of instance counter at time of insertion */ uint32_t no_writers_gen; /* __/ */ +#ifdef DDSI_INCLUDE_LIFESPAN + struct lifespan_fhnode lifespan; /* fibheap node for lifespan */ + struct rhc_instance *inst; /* reference to rhc instance */ +#endif }; struct rhc_instance { @@ -289,15 +295,15 @@ struct dds_rhc_default { int32_t max_samples; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */ int32_t max_samples_per_instance; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */ - uint32_t n_instances; /* # instances, including empty [NOT USED] */ + uint32_t n_instances; /* # instances, including empty */ uint32_t n_nonempty_instances; /* # non-empty instances */ - uint32_t n_not_alive_disposed; /* # disposed, non-empty instances [NOT USED] */ - uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances [NOT USED] */ - uint32_t n_new; /* # new, non-empty instances [NOT USED] */ + uint32_t n_not_alive_disposed; /* # disposed, non-empty instances */ + uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances */ + uint32_t n_new; /* # new, non-empty instances */ uint32_t n_vsamples; /* # "valid" samples over all instances */ - uint32_t n_vread; /* # read "valid" samples over all instances [NOT USED] */ - uint32_t n_invsamples; /* # invalid samples over all instances [NOT USED] */ - uint32_t n_invread; /* # read invalid samples over all instances [NOT USED] */ + uint32_t n_vread; /* # read "valid" samples over all instances */ + uint32_t n_invsamples; /* # invalid samples over all instances */ + uint32_t n_invread; /* # read invalid samples over all instances */ bool by_source_ordering; /* true if BY_SOURCE, false if BY_RECEPTION */ bool exclusive_ownership; /* true if EXCLUSIVE, false if SHARED */ @@ -316,6 +322,9 @@ struct dds_rhc_default { uint32_t nqconds; /* Number of associated query conditions */ dds_querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */ void *qcond_eval_samplebuf; /* Temporary storage for evaluating query conditions, NULL if no qconds */ +#ifdef DDSI_INCLUDE_LIFESPAN + struct lifespan_adm lifespan; /* Lifespan administration */ +#endif }; struct trigger_info_cmn { @@ -460,6 +469,11 @@ static void topicless_to_clean_invsample (const struct ddsi_sertopic *topic, con } static unsigned qmask_of_inst (const struct rhc_instance *inst); +static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s); +static void get_trigger_info_cmn (struct trigger_info_cmn *info, struct rhc_instance *inst); +static void get_trigger_info_pre (struct trigger_info_pre *info, struct rhc_instance *inst); +static void init_trigger_info_qcond (struct trigger_info_qcond *qc); +static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, struct rhc_instance *inst); static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst, struct dds_entity *triggers[], size_t *ntriggers); #ifndef NDEBUG static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_conds, bool check_qcmask); @@ -531,6 +545,83 @@ static void remove_inst_from_nonempty_list (struct dds_rhc_default *rhc, struct rhc->n_nonempty_instances--; } +#ifdef DDSI_INCLUDE_LIFESPAN +static void drop_expired_samples (struct dds_rhc_default *rhc, struct rhc_sample *sample) +{ + struct rhc_instance *inst = sample->inst; + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; + size_t ntriggers = SIZE_MAX; + + assert (!inst_is_empty (inst)); + + TRACE ("rhc_default %p drop_exp(iid %"PRIx64" wriid %"PRIx64" exp %"PRId64" %s", + rhc, inst->iid, sample->wr_iid, sample->lifespan.t_expire.v, sample->isread ? "read" : "notread"); + + get_trigger_info_pre (&pre, inst); + init_trigger_info_qcond (&trig_qc); + + /* Find prev sample: in case of history depth of 1 this is the sample itself, + * (which is inst->latest). In case of larger history depth the most likely sample + * to be expired is the oldest, in which case inst->latest is the previous + * sample and inst->latest->next points to sample (circular list). We can + * assume that 'sample' is in the list, so a check to avoid infinite loop is not + * required here. */ + struct rhc_sample *psample = inst->latest; + while (psample->next != sample) + psample = psample->next; + + rhc->n_vsamples--; + if (sample->isread) + { + inst->nvread--; + rhc->n_vread--; + trig_qc.dec_sample_read = true; + } + if (--inst->nvsamples > 0) + { + if (inst->latest == sample) + inst->latest = psample; + psample->next = sample->next; + } + else + { + inst->latest = NULL; + } + trig_qc.dec_conds_sample = sample->conds; + free_sample (rhc, inst, sample); + get_trigger_info_cmn (&post.c, inst); + update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers); + if (inst_is_empty (inst)) + { + remove_inst_from_nonempty_list (rhc, inst); + if (inst->isdisposed) + rhc->n_not_alive_disposed--; + if (inst->wrcount == 0) + { + TRACE ("; iid %"PRIx64" #0,empty,drop", inst->iid); + if (!inst->isdisposed) + rhc->n_not_alive_no_writers--; + drop_instance_noupdate_no_writers (rhc, inst); + } + } + TRACE (")\n"); +} + +nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow) +{ + struct dds_rhc_default *rhc = hc; + struct rhc_sample *sample; + nn_mtime_t tnext; + ddsrt_mutex_lock (&rhc->lock); + while ((tnext = lifespan_next_expired_locked (&rhc->lifespan, tnow, (void **)&sample)).v == 0) + drop_expired_samples (rhc, sample); + ddsrt_mutex_unlock (&rhc->lock); + return tnext; +} +#endif /* DDSI_INCLUDE_LIFESPAN */ + struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks) { struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc)); @@ -546,6 +637,10 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_global rhc->gv = gv; rhc->xchecks = xchecks; +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_init (gv, &rhc->lifespan, offsetof(struct dds_rhc_default, lifespan), offsetof(struct rhc_sample, lifespan), dds_rhc_default_sample_expired_cb); +#endif + return &rhc->common; } @@ -601,9 +696,15 @@ static struct rhc_sample *alloc_sample (struct rhc_instance *inst) } } -static void free_sample (struct rhc_instance *inst, struct rhc_sample *s) +static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s) { +#ifndef DDSI_INCLUDE_LIFESPAN + DDSRT_UNUSED_ARG (rhc); +#endif ddsi_serdata_unref (s->sample); +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan); +#endif if (s == &inst->a_sample) { assert (!inst->a_sample_free); @@ -665,11 +766,12 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de struct rhc_sample *s = inst->latest; const bool was_empty = inst_is_empty (inst); struct trigger_info_qcond dummy_trig_qc; + if (s) { do { struct rhc_sample * const s1 = s->next; - free_sample (inst, s); + free_sample (rhc, inst, s); s = s1; } while (s != inst->latest); rhc->n_vsamples -= inst->nvsamples; @@ -682,11 +784,10 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de #endif inst_clear_invsample_if_exists (rhc, inst, &dummy_trig_qc); if (!was_empty) - { remove_inst_from_nonempty_list (rhc, inst); - } - ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk); - ddsrt_free (inst); + if (inst->isnew) + rhc->n_new--; + free_empty_instance(inst, rhc); } static uint32_t dds_rhc_default_lock_samples (struct dds_rhc_default *rhc) @@ -708,7 +809,10 @@ static void free_instance_rhc_free_wrap (void *vnode, void *varg) static void dds_rhc_default_free (struct dds_rhc_default *rhc) { - assert (rhc_check_counts_locked (rhc, true, true)); +#ifdef DDSI_INCLUDE_LIFESPAN + dds_rhc_default_sample_expired_cb (rhc, NN_MTIME_NEVER); + lifespan_fini (&rhc->lifespan); +#endif ddsrt_hh_enum (rhc->instances, free_instance_rhc_free_wrap, rhc); assert (rhc->nonempty_instances == NULL); ddsrt_hh_free (rhc->instances); @@ -789,6 +893,10 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, assert (trig_qc->dec_conds_sample == 0); ddsi_serdata_unref (s->sample); +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan); +#endif + trig_qc->dec_sample_read = s->isread; trig_qc->dec_conds_sample = s->conds; if (s->isread) @@ -843,6 +951,11 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, s->isread = false; s->disposed_gen = inst->disposed_gen; s->no_writers_gen = inst->no_writers_gen; +#ifdef DDSI_INCLUDE_LIFESPAN + s->inst = inst; + s->lifespan.t_expire = wrinfo->lifespan_exp; + lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan); +#endif s->conds = 0; if (rhc->nqconds != 0) @@ -939,6 +1052,8 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, stru assert (inst_is_empty (inst)); rhc->n_instances--; + if (inst->isnew) + rhc->n_new--; ret = ddsrt_hh_remove (rhc->instances, inst); assert (ret); @@ -1065,7 +1180,6 @@ static void account_for_empty_to_nonempty_transition (struct dds_rhc_default *rh { assert (inst_nsamples (inst) == 1); add_inst_to_nonempty_list (rhc, inst); - rhc->n_new += inst->isnew; if (inst->isdisposed) rhc->n_not_alive_disposed++; else if (inst->wrcount == 0) @@ -1294,6 +1408,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst assert (ret); (void) ret; rhc->n_instances++; + rhc->n_new++; get_trigger_info_cmn (&post->c, inst); *out_inst = inst; @@ -1486,16 +1601,10 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons if (inst->latest || inst_became_disposed) { if (was_empty) - { - /* general function is slightly slower than a specialised - one, but perhaps it is wiser to use the general one */ account_for_empty_to_nonempty_transition (rhc, inst); - } else - { rhc->n_not_alive_disposed += (uint32_t)(inst->isdisposed - old_isdisposed); - rhc->n_new += (uint32_t)(inst->isnew - old_isnew); - } + rhc->n_new += (uint32_t)(inst->isnew - old_isnew); } else { @@ -2032,7 +2141,7 @@ static int dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, void ** inst->latest = NULL; } - free_sample (inst, sample); + free_sample (rhc, inst, sample); if (++n == max_samples) { @@ -2179,7 +2288,7 @@ static int dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, stru else inst->latest = NULL; - free_sample (inst, sample); + free_sample (rhc, inst, sample); if (++n == max_samples) { @@ -2442,7 +2551,11 @@ static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_f trig_qc->dec_conds_invsample, trig_qc->dec_conds_sample, trig_qc->inc_conds_invsample, trig_qc->inc_conds_sample); assert (rhc->n_nonempty_instances >= rhc->n_not_alive_disposed + rhc->n_not_alive_no_writers); +#ifndef DDSI_INCLUDE_LIFESPAN + /* If lifespan is disabled, samples cannot expire and therefore + empty instances cannot be in the 'new' state. */ assert (rhc->n_nonempty_instances >= rhc->n_new); +#endif assert (rhc->n_vsamples >= rhc->n_vread); iter = rhc->conds; @@ -2687,6 +2800,8 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond bool a_sample_free = true; n_instances++; + if (inst->isnew) + n_new++; if (inst_is_empty (inst)) continue; @@ -2695,8 +2810,6 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond n_not_alive_disposed++; else if (inst->wrcount == 0) n_not_alive_no_writers++; - if (inst->isnew) - n_new++; if (inst->latest) { @@ -2790,9 +2903,7 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond if (check_conds) { for (i = 0, rciter = rhc->conds; i < ncheck; i++, rciter = rciter->m_next) - { assert (cond_match_count[i] == ddsrt_atomic_ld32 (&rciter->m_entity.m_status.m_trigger)); - } } if (rhc->n_nonempty_instances == 0) diff --git a/src/core/ddsc/src/dds_whc.c b/src/core/ddsc/src/dds_whc.c index e3c3913..5197a30 100644 --- a/src/core/ddsc/src/dds_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -12,25 +12,28 @@ #include #include #include - #include "dds/ddsrt/heap.h" #include "dds/ddsrt/sync.h" #include "dds/ddsrt/misc.h" +#include "dds/ddsrt/avl.h" +#include "dds/ddsrt/fibheap.h" +#include "dds/ddsrt/hopscotch.h" #include "dds/ddsi/ddsi_serdata.h" +#ifdef DDSI_INCLUDE_LIFESPAN +#include "dds/ddsi/ddsi_lifespan.h" +#endif #include "dds/ddsi/q_unused.h" #include "dds/ddsi/q_config.h" -#include "dds__whc.h" #include "dds/ddsi/ddsi_tkmap.h" - -#include "dds/ddsrt/avl.h" -#include "dds/ddsrt/hopscotch.h" #include "dds/ddsi/q_time.h" #include "dds/ddsi/q_rtps.h" #include "dds/ddsi/q_freelist.h" #include "dds/ddsi/q_globals.h" +#include "dds__whc.h" #define USE_EHH 0 + struct whc_node { struct whc_node *next_seq; /* next in this interval */ struct whc_node *prev_seq; /* prev in this interval */ @@ -44,6 +47,9 @@ struct whc_node { unsigned borrowed: 1; /* at most one can borrow it at any time */ nn_mtime_t last_rexmit_ts; uint32_t rexmit_count; +#ifdef DDSI_INCLUDE_LIFESPAN + struct lifespan_fhnode lifespan; /* fibheap node for lifespan */ +#endif struct ddsi_serdata *serdata; }; @@ -95,6 +101,9 @@ struct whc_impl { #endif struct ddsrt_hh *idx_hash; ddsrt_avl_tree_t seq; +#ifdef DDSI_INCLUDE_LIFESPAN + struct lifespan_adm lifespan; /* Lifespan administration */ +#endif }; struct whc_sample_iter_impl { @@ -128,7 +137,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se static uint32_t whc_default_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); static void whc_default_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); static void whc_default_get_state (const struct whc *whc, struct whc_state *st); -static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); +static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); static seqno_t whc_default_next_seq (const struct whc *whc, seqno_t seq); static bool whc_default_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample); static bool whc_default_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample); @@ -349,6 +358,21 @@ static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct dd } } +#ifdef DDSI_INCLUDE_LIFESPAN +static nn_mtime_t whc_sample_expired_cb(void *hc, nn_mtime_t tnow) +{ + struct whc_impl *whc = hc; + void *sample; + nn_mtime_t tnext; + ddsrt_mutex_lock (&whc->lock); + while ((tnext = lifespan_next_expired_locked (&whc->lifespan, tnow, &sample)).v == 0) + whc_delete_one (whc, sample); + whc->maxseq_node = whc_findmax_procedurally (whc); + ddsrt_mutex_unlock (&whc->lock); + return tnext; +} +#endif + struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth) { size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */ @@ -384,6 +408,10 @@ struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdep else whc->idx_hash = NULL; +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_init (gv, &whc->lifespan, offsetof(struct whc_impl, lifespan), offsetof(struct whc_node, lifespan), whc_sample_expired_cb); +#endif + /* seq interval tree: always has an "open" node */ ddsrt_avl_init (&whc_seq_treedef, &whc->seq); intv = ddsrt_malloc (sizeof (*intv)); @@ -417,6 +445,11 @@ void whc_default_free (struct whc *whc_generic) struct whc_impl * const whc = (struct whc_impl *)whc_generic; check_whc (whc); +#ifdef DDSI_INCLUDE_LIFESPAN + whc_sample_expired_cb (whc, NN_MTIME_NEVER); + lifespan_fini (&whc->lifespan); +#endif + if (whc->idx_hash) { struct ddsrt_hh_iter it; @@ -686,6 +719,10 @@ static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_i whcn->unacked = 0; } +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan); +#endif + /* Take it out of seqhash; deleting it from the list ordered on sequence numbers is left to the caller (it has to be done unconditionally, but remove_acked_messages defers it until the end or a skipped node). */ @@ -869,6 +906,9 @@ static uint32_t whc_default_remove_acked_messages_noidx (struct whc_impl *whc, s whc->unacked_bytes -= (size_t) (whcn->total_bytes - (*deferred_free_list)->total_bytes + (*deferred_free_list)->size); for (whcn = *deferred_free_list; whcn; whcn = whcn->next_seq) { +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan); +#endif remove_whcn_from_hash (whc, whcn); assert (whcn->unacked); } @@ -1035,10 +1075,16 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn return cnt; } -static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata) +static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata) { struct whc_node *newn = NULL; +#ifndef DDSI_INCLUDE_LIFESPAN + /* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info' + that contains both lifespan and deadline info of the writer */ + DDSRT_UNUSED_ARG (exp); +#endif + if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL) newn = ddsrt_malloc (sizeof (*newn)); newn->seq = seq; @@ -1062,6 +1108,10 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma if (newn->unacked) whc->unacked_bytes += newn->size; +#ifdef DDSI_INCLUDE_LIFESPAN + newn->lifespan.t_expire = exp; +#endif + insert_whcn_in_hash (whc, newn); if (whc->open_intv->first == NULL) @@ -1093,10 +1143,13 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma } whc->seq_size++; +#ifdef DDSI_INCLUDE_LIFESPAN + lifespan_register_sample_locked (&whc->lifespan, &newn->lifespan); +#endif return newn; } -static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) +static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { struct whc_impl * const whc = (struct whc_impl *)whc_generic; struct whc_node *newn = NULL; @@ -1106,6 +1159,9 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se char pad[sizeof (struct whc_idxnode) + sizeof (struct whc_node *)]; } template; + /* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info' + that contains both lifespan als deadline info of the writer */ + ddsrt_mutex_lock (&whc->lock); check_whc (whc); @@ -1113,8 +1169,8 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se { struct whc_state whcst; get_state_locked (whc, &whcst); - TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n", - (void *) whc, max_drop_seq, seq, (void *) plist, (void *) serdata, serdata->hash); + TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" exp %"PRId64" plist %p serdata %p:%"PRIx32")\n", + (void *) whc, max_drop_seq, seq, exp.v, (void *) plist, (void *) serdata, serdata->hash); TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n", whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth); } @@ -1128,7 +1184,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq); /* Always insert in seq admin */ - newn = whc_default_insert_seq (whc, max_drop_seq, seq, plist, serdata); + newn = whc_default_insert_seq (whc, max_drop_seq, seq, exp, plist, serdata); TRACE (" whcn %p:", (void*)newn); diff --git a/src/core/ddsc/src/dds_whc_builtintopic.c b/src/core/ddsc/src/dds_whc_builtintopic.c index 8cf1e10..36a27f0 100644 --- a/src/core/ddsc/src/dds_whc_builtintopic.c +++ b/src/core/ddsc/src/dds_whc_builtintopic.c @@ -143,11 +143,12 @@ static void bwhc_get_state (const struct whc *whc, struct whc_state *st) st->unacked_bytes = 0; } -static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) +static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { (void)whc; (void)max_drop_seq; (void)seq; + (void)exp; (void)serdata; (void)tk; if (plist) diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 8abd3e7..0f65cc6 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -99,7 +99,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay { dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time; struct ddsi_writer_info pwr_info; - ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos); + ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos, payload->statusinfo); for (uint32_t i = 0; rdary[i]; i++) { DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) @@ -124,7 +124,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay const struct entity_index *gh = wr->e.gv->entity_index; dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time; ddsrt_mutex_unlock (&wr->rdary.rdary_lock); - ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos); + ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo); ddsrt_mutex_lock (&wr->e.lock); for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) { diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 87d9b51..0dbe254 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -218,9 +218,23 @@ static dds_return_t dds_writer_delete (dds_entity *e) return DDS_RETCODE_OK; } +static dds_return_t validate_writer_qos (const dds_qos_t *wqos) +{ +#ifndef DDSI_INCLUDE_LIFESPAN + if (wqos != NULL && (wqos->present & QP_LIFESPAN) && wqos->lifespan.duration != DDS_INFINITY) + return DDS_RETCODE_BAD_PARAMETER; +#else + DDSRT_UNUSED_ARG (wqos); +#endif + return DDS_RETCODE_OK; +} + static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { /* note: e->m_qos is still the old one to allow for failure here */ + dds_return_t ret; + if ((ret = validate_writer_qos(qos)) != DDS_RETCODE_OK) + return ret; if (enabled) { struct writer *wr; @@ -320,7 +334,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0); nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0); - if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0) + if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0 || + (rc = validate_writer_qos(wqos)) != DDS_RETCODE_OK) { dds_delete_qos(wqos); goto err_bad_qos; diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt index 530361d..bb6dbe7 100644 --- a/src/core/ddsc/tests/CMakeLists.txt +++ b/src/core/ddsc/tests/CMakeLists.txt @@ -55,6 +55,10 @@ set(ddsc_test_sources "write_various_types.c" "writer.c") +if(ENABLE_LIFESPAN) + list(APPEND ddsc_test_sources "lifespan.c") +endif() + add_cunit_executable(cunit_ddsc ${ddsc_test_sources}) target_include_directories( cunit_ddsc PRIVATE diff --git a/src/core/ddsc/tests/lifespan.c b/src/core/ddsc/tests/lifespan.c new file mode 100644 index 0000000..7190a71 --- /dev/null +++ b/src/core/ddsc/tests/lifespan.c @@ -0,0 +1,164 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/dds.h" +#include "CUnit/Theory.h" +#include "Space.h" + +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_whc.h" +#include "dds__entity.h" + +static dds_entity_t g_participant = 0; +static dds_entity_t g_subscriber = 0; +static dds_entity_t g_publisher = 0; +static dds_entity_t g_topic = 0; +static dds_entity_t g_reader = 0; +static dds_entity_t g_writer = 0; +static dds_entity_t g_waitset = 0; +static dds_entity_t g_rcond = 0; +static dds_entity_t g_qcond = 0; + +static char* +create_topic_name(const char *prefix, char *name, size_t size) +{ + /* Get semi random g_topic name. */ + ddsrt_pid_t pid = ddsrt_getpid(); + ddsrt_tid_t tid = ddsrt_gettid(); + (void) snprintf(name, size, "%s_pid%"PRIdPID"_tid%"PRIdTID"", prefix, pid, tid); + return name; +} + +static void lifespan_init(void) +{ + dds_attach_t triggered; + dds_return_t ret; + char name[100]; + dds_qos_t *qos; + + qos = dds_create_qos(); + CU_ASSERT_PTR_NOT_NULL_FATAL(qos); + + g_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL); + CU_ASSERT_FATAL(g_participant > 0); + + g_subscriber = dds_create_subscriber(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_subscriber > 0); + + g_publisher = dds_create_publisher(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_publisher > 0); + + g_waitset = dds_create_waitset(g_participant); + CU_ASSERT_FATAL(g_waitset > 0); + + g_topic = dds_create_topic(g_participant, &Space_Type1_desc, create_topic_name("ddsc_qos_lifespan_test", name, sizeof name), NULL, NULL); + CU_ASSERT_FATAL(g_topic > 0); + + dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); + g_writer = dds_create_writer(g_publisher, g_topic, qos, NULL); + CU_ASSERT_FATAL(g_writer > 0); + g_reader = dds_create_reader(g_subscriber, g_topic, qos, NULL); + CU_ASSERT_FATAL(g_reader > 0); + + /* Sync g_reader to g_writer. */ + ret = dds_set_status_mask(g_reader, DDS_SUBSCRIPTION_MATCHED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_attach(g_waitset, g_reader, g_reader); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1)); + CU_ASSERT_EQUAL_FATAL(ret, 1); + CU_ASSERT_EQUAL_FATAL(g_reader, (dds_entity_t)(intptr_t)triggered); + ret = dds_waitset_detach(g_waitset, g_reader); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + /* Sync g_writer to g_reader. */ + ret = dds_set_status_mask(g_writer, DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_attach(g_waitset, g_writer, g_writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1)); + CU_ASSERT_EQUAL_FATAL(ret, 1); + CU_ASSERT_EQUAL_FATAL(g_writer, (dds_entity_t)(intptr_t)triggered); + ret = dds_waitset_detach(g_waitset, g_writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + dds_delete_qos(qos); +} + +static void lifespan_fini(void) +{ + dds_delete(g_rcond); + dds_delete(g_qcond); + dds_delete(g_reader); + dds_delete(g_writer); + dds_delete(g_subscriber); + dds_delete(g_publisher); + dds_delete(g_waitset); + dds_delete(g_topic); + dds_delete(g_participant); +} + +static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max) +{ + struct dds_entity *wr_entity; + struct writer *wr; + struct whc_state whcst; + CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0); + thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv); + wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid); + CU_ASSERT_FATAL(wr != NULL); + assert(wr != NULL); /* for Clang's static analyzer */ + whc_get_state(wr->whc, &whcst); + thread_state_asleep(lookup_thread_state()); + dds_entity_unpin(wr_entity); + + CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min); + CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max); +} + +CU_Test(ddsc_lifespan, basic, .init=lifespan_init, .fini=lifespan_fini) +{ + Space_Type1 sample = { 0, 0, 0 }; + dds_return_t ret; + dds_duration_t exp = DDS_MSECS(500); + dds_qos_t *qos; + + qos = dds_create_qos(); + CU_ASSERT_PTR_NOT_NULL_FATAL(qos); + + /* Write with default qos: lifespan inifinite */ + ret = dds_write (g_writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + check_whc_state(g_writer, 1, 1); + + dds_sleepfor (2 * exp); + check_whc_state(g_writer, 1, 1); + + dds_qset_lifespan(qos, exp); + ret = dds_set_qos(g_writer, qos); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + ret = dds_write (g_writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + check_whc_state(g_writer, 2, 2); + + dds_sleepfor (2 * exp); + check_whc_state(g_writer, -1, -1); + + dds_delete_qos(qos); +} \ No newline at end of file diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index ba3c40e..9f85d8e 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -59,6 +59,9 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" q_freelist.c sysdeps.c ) +if(ENABLE_LIFESPAN) + list(APPEND srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src/ddsi_lifespan.c") +endif() # The includes should reside close to the code. As long as that's not the case, # pull them in from this CMakeLists.txt. @@ -119,6 +122,9 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" q_xqos.h sysdeps.h ) +if(ENABLE_LIFESPAN) + list(APPEND hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi/ddsi_lifespan.h") +endif() target_sources(ddsc PRIVATE ${srcs_ddsi} ${hdrs_private_ddsi}) diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_lifespan.h b/src/core/ddsi/include/dds/ddsi/ddsi_lifespan.h new file mode 100644 index 0000000..8ce3fee --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_lifespan.h @@ -0,0 +1,61 @@ +/* + * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSI_LIFESPAN_H +#define DDSI_LIFESPAN_H + +#include "dds/ddsrt/fibheap.h" +#include "dds/ddsi/q_time.h" +#include "dds/ddsi/q_globals.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +typedef nn_mtime_t (*sample_expired_cb_t)(void *hc, nn_mtime_t tnow); + +struct lifespan_adm { + ddsrt_fibheap_t ls_exp_heap; /* heap for sample expiration (lifespan) */ + struct xevent *evt; /* xevent that triggers for sample with earliest expiration */ + sample_expired_cb_t sample_expired_cb; /* callback for expired sample; this cb can use lifespan_next_expired_locked to get next expired sample */ + size_t fh_offset; /* offset of lifespan_adm element in whc or rhc */ + size_t fhn_offset; /* offset of lifespan_fhnode element in whc or rhc node (sample) */ +}; + +struct lifespan_fhnode { + ddsrt_fibheap_node_t heapnode; + nn_mtime_t t_expire; +}; + +DDS_EXPORT void lifespan_init (const struct q_globals *gv, struct lifespan_adm *lifespan_adm, size_t fh_offset, size_t fh_node_offset, sample_expired_cb_t sample_expired_cb); +DDS_EXPORT void lifespan_fini (const struct lifespan_adm *lifespan_adm); +DDS_EXPORT nn_mtime_t lifespan_next_expired_locked (const struct lifespan_adm *lifespan_adm, nn_mtime_t tnow, void **sample); +DDS_EXPORT void lifespan_register_sample_real (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node); +DDS_EXPORT void lifespan_unregister_sample_real (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node); + +inline void lifespan_register_sample_locked (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node) +{ + if (node->t_expire.v != T_NEVER) + lifespan_register_sample_real (lifespan_adm, node); +} + +inline void lifespan_unregister_sample_locked (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node) +{ + if (node->t_expire.v != T_NEVER) + lifespan_unregister_sample_real (lifespan_adm, node); +} + +#if defined (__cplusplus) +} +#endif + +#endif /* DDSI_LIFESPAN_H */ + diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h b/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h index 2e57e08..506e407 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h @@ -21,6 +21,7 @@ /* DDS_EXPORT inline i.c.w. __attributes__((visibility...)) and some compilers: */ #include "dds/ddsrt/attributes.h" #include "dds/ddsi/ddsi_guid.h" +#include "dds/ddsi/q_time.h" #if defined (__cplusplus) extern "C" { @@ -38,6 +39,9 @@ struct ddsi_writer_info bool auto_dispose; int32_t ownership_strength; uint64_t iid; +#ifdef DDSI_INCLUDE_LIFESPAN + nn_mtime_t lifespan_exp; +#endif }; typedef void (*ddsi_rhc_free_t) (struct ddsi_rhc *rhc); diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index b70df77..cae655a 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -696,7 +696,7 @@ void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild); void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok); struct ddsi_writer_info; -DDS_EXPORT void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos); +DDS_EXPORT void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos, uint32_t statusinfo); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/dds/ddsi/q_whc.h b/src/core/ddsi/include/dds/ddsi/q_whc.h index 9eb73a5..b726f52 100644 --- a/src/core/ddsi/include/dds/ddsi/q_whc.h +++ b/src/core/ddsi/include/dds/ddsi/q_whc.h @@ -13,6 +13,7 @@ #define Q_WHC_H #include +#include "dds/ddsi/q_time.h" #if defined (__cplusplus) extern "C" { @@ -72,7 +73,7 @@ typedef void (*whc_free_t)(struct whc *whc); reliable readers that have not acknowledged all data */ /* max_drop_seq must go soon, it's way too ugly. */ /* plist may be NULL or ddsrt_malloc'd, WHC takes ownership of plist */ -typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); +typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); typedef uint32_t (*whc_downgrade_to_volatile_t)(struct whc *whc, struct whc_state *st); typedef uint32_t (*whc_remove_acked_messages_t)(struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); typedef void (*whc_free_deferred_free_list_t)(struct whc *whc, struct whc_node *deferred_free_list); @@ -120,8 +121,8 @@ inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_ inline void whc_free (struct whc *whc) { whc->ops->free (whc); } -inline int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { - return whc->ops->insert (whc, max_drop_seq, seq, plist, serdata, tk); +inline int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { + return whc->ops->insert (whc, max_drop_seq, seq, exp, plist, serdata, tk); } inline unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st) { return whc->ops->downgrade_to_volatile (whc, st); diff --git a/src/core/ddsi/src/ddsi_lifespan.c b/src/core/ddsi/src/ddsi_lifespan.c new file mode 100644 index 0000000..1ed3a3c --- /dev/null +++ b/src/core/ddsi/src/ddsi_lifespan.c @@ -0,0 +1,84 @@ +/* + * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/fibheap.h" +#include "dds/ddsi/ddsi_lifespan.h" +#include "dds/ddsi/q_time.h" +#include "dds/ddsi/q_xevent.h" + +static int compare_lifespan_texp (const void *va, const void *vb) +{ + const struct lifespan_fhnode *a = va; + const struct lifespan_fhnode *b = vb; + return (a->t_expire.v == b->t_expire.v) ? 0 : (a->t_expire.v < b->t_expire.v) ? -1 : 1; +} + +const ddsrt_fibheap_def_t lifespan_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct lifespan_fhnode, heapnode), compare_lifespan_texp); + +static void lifespan_rhc_node_exp (struct xevent *xev, void *varg, nn_mtime_t tnow) +{ + struct lifespan_adm * const lifespan_adm = varg; + nn_mtime_t next_valid = lifespan_adm->sample_expired_cb((char *)lifespan_adm - lifespan_adm->fh_offset, tnow); + resched_xevent_if_earlier (xev, next_valid); +} + + +/* Gets the sample from the fibheap in lifespan admin that was expired first. If no more + * expired samples exist in the fibheap, the expiry time (nn_mtime_t) for the next sample to + * expire is returned. If the fibheap contains no more samples, NN_MTIME_NEVER is returned */ +nn_mtime_t lifespan_next_expired_locked (const struct lifespan_adm *lifespan_adm, nn_mtime_t tnow, void **sample) +{ + struct lifespan_fhnode *node; + if ((node = ddsrt_fibheap_min(&lifespan_fhdef, &lifespan_adm->ls_exp_heap)) != NULL && node->t_expire.v <= tnow.v) + { + *sample = (char *)node - lifespan_adm->fhn_offset; + return (nn_mtime_t) { 0 }; + } + *sample = NULL; + return (node != NULL) ? node->t_expire : NN_MTIME_NEVER; +} + +void lifespan_init (const struct q_globals *gv, struct lifespan_adm *lifespan_adm, size_t fh_offset, size_t fh_node_offset, sample_expired_cb_t sample_expired_cb) +{ + ddsrt_fibheap_init (&lifespan_fhdef, &lifespan_adm->ls_exp_heap); + lifespan_adm->evt = qxev_callback (gv->xevents, NN_MTIME_NEVER, lifespan_rhc_node_exp, lifespan_adm); + lifespan_adm->sample_expired_cb = sample_expired_cb; + lifespan_adm->fh_offset = fh_offset; + lifespan_adm->fhn_offset = fh_node_offset; +} + +void lifespan_fini (const struct lifespan_adm *lifespan_adm) +{ + assert (ddsrt_fibheap_min (&lifespan_fhdef, &lifespan_adm->ls_exp_heap) == NULL); + delete_xevent_callback (lifespan_adm->evt); +} + +extern inline void lifespan_register_sample_locked (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node); + +void lifespan_register_sample_real (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node) +{ + ddsrt_fibheap_insert(&lifespan_fhdef, &lifespan_adm->ls_exp_heap, node); + resched_xevent_if_earlier (lifespan_adm->evt, node->t_expire); +} + +extern inline void lifespan_unregister_sample_locked (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node); + +void lifespan_unregister_sample_real (struct lifespan_adm *lifespan_adm, struct lifespan_fhnode *node) +{ + /* Updating the scheduled event with the new shortest expiry + * is not required, because the event will be rescheduled when + * this removed node expires. Only remove the node from the + * lifespan heap */ + ddsrt_fibheap_delete(&lifespan_fhdef, &lifespan_adm->ls_exp_heap, node); +} diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 1107a19..f69e5b7 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -311,12 +311,21 @@ nn_vendorid_t get_entity_vendorid (const struct entity_common *e) return NN_VENDORID_UNKNOWN; } -void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos) +void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos, uint32_t statusinfo) { +#ifndef DDSI_INCLUDE_LIFESPAN + DDSRT_UNUSED_ARG (statusinfo); +#endif wrinfo->guid = e->guid; wrinfo->ownership_strength = xqos->ownership_strength.value; wrinfo->auto_dispose = xqos->writer_data_lifecycle.autodispose_unregistered_instances; wrinfo->iid = e->iid; +#ifdef DDSI_INCLUDE_LIFESPAN + if (xqos->lifespan.duration != DDS_INFINITY && (statusinfo & (NN_STATUSINFO_UNREGISTER | NN_STATUSINFO_DISPOSE)) == 0) + wrinfo->lifespan_exp = add_duration_to_mtime(now_mt(), xqos->lifespan.duration); + else + wrinfo->lifespan_exp = NN_MTIME_NEVER; +#endif } /* DELETED PARTICIPANTS --------------------------------------------- */ @@ -1527,7 +1536,7 @@ static void reader_update_notify_pwr_alive_state (struct reader *rd, const struc if (delta < 0 && rd->rhc) { struct ddsi_writer_info wrinfo; - ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos); + ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos, NN_STATUSINFO_UNREGISTER); ddsi_rhc_unregister_wr (rd->rhc, &wrinfo); } @@ -1570,7 +1579,7 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc if (rd->rhc) { struct ddsi_writer_info wrinfo; - ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos); + ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos, NN_STATUSINFO_UNREGISTER); ddsi_rhc_unregister_wr (rd->rhc, &wrinfo); } if (rd->status_cb) @@ -1607,7 +1616,7 @@ static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const { /* FIXME: */ struct ddsi_writer_info wrinfo; - ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos); + ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, NN_STATUSINFO_UNREGISTER); ddsi_rhc_unregister_wr (rd->rhc, &wrinfo); } if (rd->status_cb) @@ -1834,7 +1843,7 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) struct ddsi_serdata *payload = sample.serdata; /* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */ struct ddsi_tkmap_instance *tk = ddsi_tkmap_lookup_instance_ref (tkmap, payload); - ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos); + ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, sample.serdata->statusinfo); (void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk); ddsi_tkmap_instance_unref (tkmap, tk); } diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 07016e9..24b4bab 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -1950,7 +1950,7 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st /* FIXME: should it be 0, local wall clock time or INVALID? */ const nn_wctime_t tstamp = (sampleinfo->timestamp.v != NN_WCTIME_INVALID.v) ? sampleinfo->timestamp : ((nn_wctime_t) {0}); struct ddsi_writer_info wrinfo; - ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos); + ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos, statusinfo); if (rdguid == NULL) { diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index ebc04f6..0edf47f 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -910,10 +910,18 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist if (!do_insert) res = 0; - else if ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, plist, serdata, tk)) < 0) - res = insres; else - res = 1; + { + nn_mtime_t exp = NN_MTIME_NEVER; +#ifdef DDSI_INCLUDE_LIFESPAN + /* Don't set expiry for samples with flags unregister or dispose, because these are required + * for sample lifecycle and should always be delivered to the reader so that is can clean up + * its history cache. */ + if (wr->xqos->lifespan.duration != DDS_INFINITY && (serdata->statusinfo & (NN_STATUSINFO_UNREGISTER | NN_STATUSINFO_DISPOSE)) == 0) + exp = add_duration_to_mtime(serdata->twrite, wr->xqos->lifespan.duration); +#endif + res = ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, exp, plist, serdata, tk)) < 0) ? insres : 1; + } #ifndef NDEBUG if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER && !is_local_orphan_endpoint (&wr->e)) diff --git a/src/core/ddsi/src/q_whc.c b/src/core/ddsi/src/q_whc.c index 6ec4a1a..0a7d1c8 100644 --- a/src/core/ddsi/src/q_whc.c +++ b/src/core/ddsi/src/q_whc.c @@ -21,7 +21,7 @@ extern inline void whc_return_sample (struct whc *whc, struct whc_borrowed_sampl extern inline void whc_sample_iter_init (const struct whc *whc, struct whc_sample_iter *it); extern inline bool whc_sample_iter_borrow_next (struct whc_sample_iter *it, struct whc_borrowed_sample *sample); extern inline void whc_free (struct whc *whc); -extern int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); +extern int whc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk); extern unsigned whc_downgrade_to_volatile (struct whc *whc, struct whc_state *st); extern unsigned whc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list); extern void whc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list); diff --git a/src/core/xtests/rhc_torture/rhc_torture.c b/src/core/xtests/rhc_torture/rhc_torture.c index 6046519..080f993 100644 --- a/src/core/xtests/rhc_torture/rhc_torture.c +++ b/src/core/xtests/rhc_torture/rhc_torture.c @@ -33,6 +33,9 @@ #include "dds/ddsc/dds_rhc.h" #include "dds__rhc_default.h" #include "dds/ddsi/ddsi_iid.h" +#ifdef DDSI_INCLUDE_LIFESPAN +#include "dds/ddsi/ddsi_lifespan.h" +#endif #include "RhcTypes.h" @@ -104,8 +107,20 @@ static struct ddsi_serdata *mkkeysample (int32_t keyval, unsigned statusinfo) return sd; } -static uint64_t store (struct ddsi_tkmap *tkmap, struct dds_rhc *rhc, struct proxy_writer *wr, struct ddsi_serdata *sd, bool print) +#ifdef DDSI_INCLUDE_LIFESPAN +static nn_mtime_t rand_texp () { + nn_mtime_t ret = now_mt(); + ret.v -= DDS_MSECS(500) + (int64_t) (ddsrt_prng_random (&prng) % DDS_MSECS(1500)); + return ret; +} +#endif + +static uint64_t store (struct ddsi_tkmap *tkmap, struct dds_rhc *rhc, struct proxy_writer *wr, struct ddsi_serdata *sd, bool print, bool lifespan_expiry) +{ +#ifndef DDSI_INCLUDE_LIFESPAN + DDSRT_UNUSED_ARG (lifespan_expiry); +#endif /* beware: unrefs sd */ struct ddsi_tkmap_instance *tk; struct ddsi_writer_info pwr_info; @@ -132,6 +147,12 @@ static uint64_t store (struct ddsi_tkmap *tkmap, struct dds_rhc *rhc, struct pro pwr_info.guid = wr->e.guid; pwr_info.iid = wr->e.iid; pwr_info.ownership_strength = wr->c.xqos->ownership_strength.value; +#ifdef DDSI_INCLUDE_LIFESPAN + if (lifespan_expiry && (sd->statusinfo & (NN_STATUSINFO_UNREGISTER | NN_STATUSINFO_DISPOSE)) == 0) + pwr_info.lifespan_exp = rand_texp(); + else + pwr_info.lifespan_exp = NN_MTIME_NEVER; +#endif dds_rhc_store (rhc, &pwr_info, sd, tk); ddsi_tkmap_instance_unref (tkmap, tk); thread_state_asleep (lookup_thread_state ()); @@ -651,7 +672,8 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, [8] = "rdc", [9] = "tkc", [10] = "tkc1", - [11] = "delwr" + [11] = "delwr", + [12] = "drpxp" }; static const uint32_t opfreqs[] = { [0] = 500, /* write */ @@ -665,7 +687,12 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, [8] = 200, /* read cond */ [9] = 30, /* take cond */ [10] = 100, /* take cond, max 1 */ - [11] = 1 /* unreg writer */ + [11] = 1, /* unreg writer */ +#ifdef DDSI_INCLUDE_LIFESPAN + [12] = 100 /* drop expired sample */ +#else + [12] = 0 /* drop expired sample */ +#endif }; uint32_t opthres[sizeof (opfreqs) / sizeof (opfreqs[0])]; { @@ -715,42 +742,42 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, case 0: { /* wr */ struct ddsi_serdata *s = mksample (keyval, 0); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } case 1: { /* wr disp */ struct ddsi_serdata *s = mksample (keyval, NN_STATUSINFO_DISPOSE); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } case 2: { /* disp */ struct ddsi_serdata *s = mkkeysample (keyval, NN_STATUSINFO_DISPOSE); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } case 3: { /* unreg */ struct ddsi_serdata *s = mkkeysample (keyval, NN_STATUSINFO_UNREGISTER); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } case 4: { /* disp unreg */ struct ddsi_serdata *s = mkkeysample (keyval, NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } case 5: { /* wr disp unreg */ struct ddsi_serdata *s = mksample (keyval, NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER); for (size_t k = 0; k < nrd; k++) - store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0); + store (tkmap, rhc[k], wr[which], ddsi_serdata_ref (s), print && k == 0, true); ddsi_serdata_unref (s); break; } @@ -787,11 +814,24 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, wr_info.guid = wr[which]->e.guid; wr_info.iid = wr[which]->e.iid; wr_info.ownership_strength = wr[which]->c.xqos->ownership_strength.value; +#ifdef DDSI_INCLUDE_LIFESPAN + wr_info.lifespan_exp = NN_MTIME_NEVER; +#endif for (size_t k = 0; k < nrd; k++) dds_rhc_unregister_wr (rhc[k], &wr_info); thread_state_asleep (lookup_thread_state ()); break; } + case 12: { +#ifdef DDSI_INCLUDE_LIFESPAN + thread_state_awake_domain_ok (lookup_thread_state ()); + /* We can assume that rhc[k] is a dds_rhc_default at this point */ + for (size_t k = 0; k < nrd; k++) + (void) dds_rhc_default_sample_expired_cb (rhc[k], rand_texp()); + thread_state_asleep (lookup_thread_state ()); +#endif + break; + } } if ((i % 200) == 0) @@ -870,15 +910,15 @@ int main (int argc, char **argv) struct dds_rhc *rhc = mkrhc (gv, NULL, DDS_HISTORY_KEEP_LAST, 1, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); struct proxy_writer *wr0 = mkwr (gv, 1); uint64_t iid0, iid1, iid_t; - iid0 = store (tkmap, rhc, wr0, mksample (0, 0), print); - iid1 = store (tkmap, rhc, wr0, mksample (1, NN_STATUSINFO_DISPOSE), print); + iid0 = store (tkmap, rhc, wr0, mksample (0, 0), print, false); + iid1 = store (tkmap, rhc, wr0, mksample (1, NN_STATUSINFO_DISPOSE), print, false); const struct check c0[] = { { "NNA", iid0, wr0->e.iid, 0,0, 1, 0,1 }, { "NND", iid1, wr0->e.iid, 0,0, 1, 1,2 }, { 0, 0, 0, 0, 0, 0, 0, 0 } }; rdall (rhc, c0, print, states_seen); - iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print); + iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); assert (iid_t == iid0); (void)iid0; (void)iid_t; @@ -895,6 +935,9 @@ int main (int argc, char **argv) wr0_info.guid = wr0->e.guid; wr0_info.iid = wr0->e.iid; wr0_info.ownership_strength = wr0->c.xqos->ownership_strength.value; +#ifdef DDSI_INCLUDE_LIFESPAN + wr0_info.lifespan_exp = NN_MTIME_NEVER; +#endif dds_rhc_unregister_wr (rhc, &wr0_info); thread_state_asleep (lookup_thread_state ()); const struct check c2[] = { @@ -919,9 +962,9 @@ int main (int argc, char **argv) struct proxy_writer *wr[] = { mkwr (gv, 0), mkwr (gv, 0), mkwr (gv, 0) }; uint64_t iid0, iid_t; int nregs = 3, isreg[] = { 1, 1, 1 }; - iid0 = store (tkmap, rhc, wr[0], mksample (0, 0), print); - iid_t = store (tkmap, rhc, wr[1], mksample (0, 0), print); assert (iid0 == iid_t); - iid_t = store (tkmap, rhc, wr[2], mksample (0, 0), print); assert (iid0 == iid_t); + iid0 = store (tkmap, rhc, wr[0], mksample (0, 0), print, false); + iid_t = store (tkmap, rhc, wr[1], mksample (0, 0), print, false); assert (iid0 == iid_t); + iid_t = store (tkmap, rhc, wr[2], mksample (0, 0), print, false); assert (iid0 == iid_t); (void)iid0; tkall (rhc, NULL, print, states_seen); for (int i = 0; i < 3*3 * 3*3 * 3*3 * 3*3; i++) @@ -933,17 +976,17 @@ int main (int argc, char **argv) switch (oper) { case 0: - iid_t = store (tkmap, rhc, wr[which], mksample (0, 0), print); + iid_t = store (tkmap, rhc, wr[which], mksample (0, 0), print, false); if (!isreg[which]) { nregs++; isreg[which] = 1; } break; case 1: - iid_t = store (tkmap, rhc, wr[which], mkkeysample (0, NN_STATUSINFO_DISPOSE), print); + iid_t = store (tkmap, rhc, wr[which], mkkeysample (0, NN_STATUSINFO_DISPOSE), print, false); if (!isreg[which]) { nregs++; isreg[which] = 1; } break; case 2: if (nregs > 1 || !isreg[which]) { - iid_t = store (tkmap, rhc, wr[which], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print); + iid_t = store (tkmap, rhc, wr[which], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); if (isreg[which]) { isreg[which] = 0; nregs--; } } break; @@ -958,7 +1001,7 @@ int main (int argc, char **argv) { if (isreg[i]) { - iid_t = store (tkmap, rhc, wr[i], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print); + iid_t = store (tkmap, rhc, wr[i], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); assert (iid_t == iid0); isreg[i] = 0; nregs--; @@ -967,10 +1010,10 @@ int main (int argc, char **argv) assert (nregs == 0); tkall (rhc, 0, print, states_seen); wait_gc_cycle (gv->gcreq_queue); - iid_t = store (tkmap, rhc, wr[0], mksample (0, 0), print); + iid_t = store (tkmap, rhc, wr[0], mksample (0, 0), print, false); assert (iid_t != iid0); iid0 = iid_t; - iid_t = store (tkmap, rhc, wr[0], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print); + iid_t = store (tkmap, rhc, wr[0], mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); assert (iid_t == iid0); frhc (rhc); diff --git a/src/mpt/tests/qos/procs/rw.c b/src/mpt/tests/qos/procs/rw.c index f663db9..10cceee 100644 --- a/src/mpt/tests/qos/procs/rw.c +++ b/src/mpt/tests/qos/procs/rw.c @@ -93,7 +93,11 @@ static void setqos (dds_qos_t *q, size_t i, bool isrd, bool create) dds_qset_history (q, (dds_history_kind_t) ((i + 1) % 2), (int32_t) (i + 1)); dds_qset_resource_limits (q, (int32_t) i + 3, (int32_t) i + 2, (int32_t) i + 1); dds_qset_presentation (q, (dds_presentation_access_scope_kind_t) ((psi + 1) % 3), 1, 1); +#ifdef DDSI_INCLUDE_LIFESPAN dds_qset_lifespan (q, INT64_C (23456789012345678) + (int32_t) i); +#else + dds_qset_lifespan (q, DDS_INFINITY); +#endif dds_qset_deadline (q, INT64_C (67890123456789012) + (int32_t) i); dds_qset_latency_budget (q, INT64_C (45678901234567890) + (int32_t) i); dds_qset_ownership (q, (dds_ownership_kind_t) ((i + 1) % 2));