diff --git a/.travis.yml b/.travis.yml index d4d10aa..f2f8ef3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -142,33 +142,33 @@ windows_vs2017: &windows_vs2017 jobs: include: - <<: *linux_gcc8 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles", COVERITY_SCAN=true ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles", COVERITY_SCAN=true ] if: type = cron - <<: *linux_gcc8 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *linux_gcc8 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *linux_gcc8 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=NO, LIFESPAN=NO, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=NO, LIFESPAN=NO, DEADLINE=NO, GENERATOR="Unix Makefiles" ] - <<: *linux_clang - env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *linux_clang - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *osx_xcode9 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] if: type = cron - <<: *osx_xcode - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, GENERATOR="Unix Makefiles", MACOSX_DEPLOYMENT_TARGET=10.12 ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles", MACOSX_DEPLOYMENT_TARGET=10.12 ] - <<: *osx_xcode - env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *osx_xcode - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ] - <<: *windows_vs2017 - env: [ ARCH=x86, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017" ] + env: [ ARCH=x86, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017" ] - <<: *windows_vs2017 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017 Win64" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017 Win64" ] - <<: *windows_vs2017 - env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017 Win64" ] + env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017 Win64" ] before_script: - conan profile new default --detect @@ -200,6 +200,7 @@ script: -DUSE_SANITIZER=${ASAN} -DENABLE_SSL=${SSL} -DENABLE_LIFESPAN=${LIFESPAN} + -DENABLE_DEADLINE_MISSED=${DEADLINE} -DBUILD_TESTING=on -DWERROR=on -G "${GENERATOR}" .. diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index aba5f20..ab95de9 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -32,6 +32,11 @@ if(ENABLE_LIFESPAN) add_definitions(-DDDSI_INCLUDE_LIFESPAN) endif() +option(ENABLE_DEADLINE_MISSED "Enable Deadline Missed QoS support" ON) +if(ENABLE_DEADLINE_MISSED) + add_definitions(-DDDSI_INCLUDE_DEADLINE_MISSED) +endif() + # OpenSSL is huge, raising the RSS by 1MB or so, and moreover find_package(OpenSSL) causes # trouble on some older CMake versions that otherwise work fine, so provide an option to avoid # all OpenSSL related things. diff --git a/src/core/ddsc/src/dds__rhc_default.h b/src/core/ddsc/src/dds__rhc_default.h index 7a8a8e1..f6b2413 100644 --- a/src/core/ddsc/src/dds__rhc_default.h +++ b/src/core/ddsc/src/dds__rhc_default.h @@ -25,7 +25,12 @@ struct rhc_sample; DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks); DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic); +#ifdef DDSI_INCLUDE_LIFESPAN DDS_EXPORT nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow); +#endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +DDS_EXPORT nn_mtime_t dds_rhc_default_deadline_missed_cb(void *hc, nn_mtime_t tnow); +#endif #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__whc.h b/src/core/ddsc/src/dds__whc.h index dcaeb42..a252ba0 100644 --- a/src/core/ddsc/src/dds__whc.h +++ b/src/core/ddsc/src/dds__whc.h @@ -19,7 +19,12 @@ extern "C" { #endif struct q_globals; -struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth); +struct whc_writer_info; +struct dds_writer; + +struct whc *whc_new (struct q_globals *gv, const struct whc_writer_info *wrinfo); +struct whc_writer_info *whc_make_wrinfo (struct dds_writer *wr, const dds_qos_t *qos); +void whc_free_wrinfo (struct whc_writer_info *); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index c2c9336..0f1b7ef 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -20,6 +20,10 @@ extern "C" { DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER) +struct status_cb_data; + +void dds_writer_status_cb (void *entity, const struct status_cb_data * data); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 28964f9..ff0b8cc 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -71,9 +71,23 @@ static dds_return_t dds_reader_delete (dds_entity *e) return DDS_RETCODE_OK; } +static dds_return_t validate_reader_qos (const dds_qos_t *rqos) +{ +#ifndef DDSI_INCLUDE_DEADLINE_MISSED + if (rqos != NULL && (rqos->present & QP_DEADLINE) && rqos->deadline.deadline != DDS_INFINITY) + return DDS_RETCODE_BAD_PARAMETER; +#else + DDSRT_UNUSED_ARG (rqos); +#endif + return DDS_RETCODE_OK; +} + static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { /* note: e->m_qos is still the old one to allow for failure here */ + dds_return_t ret; + if ((ret = validate_reader_qos(qos)) != DDS_RETCODE_OK) + return ret; if (enabled) { struct reader *rd; @@ -420,7 +434,8 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0); nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0); - if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) != DDS_RETCODE_OK) + if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 || + (ret = validate_reader_qos(rqos)) != DDS_RETCODE_OK) { dds_delete_qos (rqos); reader = ret; diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 15bf67d..cdce42e 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -43,6 +43,9 @@ #ifdef DDSI_INCLUDE_LIFESPAN #include "dds/ddsi/ddsi_lifespan.h" #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +#include "dds/ddsi/ddsi_deadline.h" +#endif #include "dds/ddsi/sysdeps.h" /* INSTANCE MANAGEMENT @@ -273,6 +276,9 @@ struct rhc_instance { ddsi_guid_t wr_guid; /* guid of last writer (if wr_iid != 0 then wr_guid is the corresponding guid, else undef) */ nn_wctime_t tstamp; /* source time stamp of last update */ struct ddsrt_circlist_elem nonempty_list; /* links non-empty instances in arbitrary ordering */ +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + struct deadline_elem deadline; /* element in deadline missed administration */ +#endif struct ddsi_tkmap_instance *tk; /* backref into TK for unref'ing */ struct rhc_sample a_sample; /* pre-allocated storage for 1 sample */ }; @@ -325,6 +331,9 @@ struct dds_rhc_default { #ifdef DDSI_INCLUDE_LIFESPAN struct lifespan_adm lifespan; /* Lifespan administration */ #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + struct deadline_adm deadline; /* Deadline missed administration */ +#endif }; struct trigger_info_cmn { @@ -598,6 +607,36 @@ nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow) } #endif /* DDSI_INCLUDE_LIFESPAN */ +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +nn_mtime_t dds_rhc_default_deadline_missed_cb(void *hc, nn_mtime_t tnow) +{ + struct dds_rhc_default *rhc = hc; + void *vinst; + nn_mtime_t tnext; + ddsrt_mutex_lock (&rhc->lock); + while ((tnext = deadline_next_missed_locked (&rhc->deadline, tnow, &vinst)).v == 0) + { + struct rhc_instance *inst = vinst; + deadline_reregister_instance_locked (&rhc->deadline, &inst->deadline, tnow); + + inst->wr_iid_islive = 0; + + status_cb_data_t cb_data; + cb_data.raw_status_id = (int) DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID; + cb_data.extra = 0; + cb_data.handle = inst->iid; + cb_data.add = true; + ddsrt_mutex_unlock (&rhc->lock); + dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); + ddsrt_mutex_lock (&rhc->lock); + + tnow = now_mt (); + } + ddsrt_mutex_unlock (&rhc->lock); + return tnext; +} +#endif /* DDSI_INCLUDE_DEADLINE_MISSED */ + struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks) { struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc)); @@ -618,6 +657,11 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_global lifespan_init (gv, &rhc->lifespan, offsetof(struct dds_rhc_default, lifespan), offsetof(struct rhc_sample, lifespan), dds_rhc_default_sample_expired_cb); #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + rhc->deadline.dur = (reader != NULL) ? reader->m_entity.m_qos->deadline.deadline : DDS_INFINITY; + deadline_init (gv, &rhc->deadline, offsetof(struct dds_rhc_default, deadline), offsetof(struct rhc_instance, deadline), dds_rhc_default_deadline_missed_cb); +#endif + return &rhc->common; } @@ -638,6 +682,8 @@ static void dds_rhc_default_set_qos (struct dds_rhc_default * rhc, const dds_qos rhc->reliable = (qos->reliability.kind == DDS_RELIABILITY_RELIABLE); assert(qos->history.kind != DDS_HISTORY_KEEP_LAST || qos->history.depth > 0); rhc->history_depth = (qos->history.kind == DDS_HISTORY_KEEP_LAST) ? (uint32_t)qos->history.depth : ~0u; + /* FIXME: updating deadline duration not yet supported + rhc->deadline.dur = qos->deadline.deadline; */ } static bool eval_predicate_sample (const struct dds_rhc_default *rhc, const struct ddsi_serdata *sample, bool (*pred) (const void *sample)) @@ -735,6 +781,10 @@ static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_defau { assert (inst_is_empty (inst)); ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (!inst->isdisposed) + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); +#endif ddsrt_free (inst); } @@ -789,9 +839,15 @@ static void dds_rhc_default_free (struct dds_rhc_default *rhc) #ifdef DDSI_INCLUDE_LIFESPAN dds_rhc_default_sample_expired_cb (rhc, NN_MTIME_NEVER); lifespan_fini (&rhc->lifespan); +#endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_stop (&rhc->deadline); #endif ddsrt_hh_enum (rhc->instances, free_instance_rhc_free_wrap, rhc); assert (ddsrt_circlist_isempty (&rhc->nonempty_instances)); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_fini (&rhc->deadline); +#endif ddsrt_hh_free (rhc->instances); lwregs_fini (&rhc->registrations); if (rhc->qcond_eval_samplebuf != NULL) @@ -863,7 +919,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, { /* replace oldest sample; latest points to the latest one, the list is circular from old -> new, so latest->next is the oldest */ - inst_clear_invsample_if_exists (rhc, inst, trig_qc); assert (inst->latest != NULL); s = inst->latest->next; @@ -885,7 +940,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, else { /* Check if resource max_samples QoS exceeded */ - if (rhc->reader && rhc->max_samples != DDS_LENGTH_UNLIMITED && rhc->n_vsamples >= (uint32_t) rhc->max_samples) { cb_data->raw_status_id = (int) DDS_SAMPLE_REJECTED_STATUS_ID; @@ -896,7 +950,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, } /* Check if resource max_samples_per_instance QoS exceeded */ - if (rhc->reader && rhc->max_samples_per_instance != DDS_LENGTH_UNLIMITED && inst->nvsamples >= (uint32_t) rhc->max_samples_per_instance) { cb_data->raw_status_id = (int) DDS_SAMPLE_REJECTED_STATUS_ID; @@ -907,7 +960,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, } /* add new latest sample */ - s = alloc_sample (inst); inst_clear_invsample_if_exists (rhc, inst, trig_qc); if (inst->latest == NULL) @@ -933,6 +985,11 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, s->lifespan.t_expire = wrinfo->lifespan_exp; lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan); #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + /* Only renew the deadline missed counter in case the sample is actually stored in the rhc */ + if (!inst->isdisposed) + deadline_renew_instance_locked (&rhc->deadline, &inst->deadline); +#endif s->conds = 0; if (rhc->nqconds != 0) @@ -1294,7 +1351,7 @@ static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance return notify_data_available; } -static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) +static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { struct rhc_instance *inst; @@ -1325,6 +1382,11 @@ static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rh } } +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (!inst->isdisposed) + deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ()); +#endif + return inst; } @@ -1485,8 +1547,6 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons cb_data.handle = 0; cb_data.add = true; goto error_or_nochange; - - /* FIXME: deadline (and other) QoS? */ } else { @@ -1536,11 +1596,19 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons TRACE (" disposed->notdisposed"); inst->isdisposed = 0; inst->disposed_gen++; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (!is_dispose) + deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ()); +#endif } if (is_dispose) { inst->isdisposed = 1; inst_became_disposed = !old_isdisposed; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (inst_became_disposed) + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); +#endif TRACE (" dispose(%d)", inst_became_disposed); } @@ -1556,9 +1624,24 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons /* FIXME: fix the bad rejection handling, probably put back in a proper rollback, until then a band-aid like this will have to do: */ inst->isnew = old_isnew; - inst->isdisposed = old_isdisposed; if (old_isdisposed) + { inst->disposed_gen--; + if (!inst->isdisposed) + { + inst->isdisposed = 1; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); +#endif + } + } + else if (inst->isdisposed) + { + inst->isdisposed = 0; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ()); +#endif + } goto error_or_nochange; } notify_data_available = true; @@ -1695,6 +1778,9 @@ static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict r if (auto_dispose && !inst->isdisposed) { inst->isdisposed = 1; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); +#endif /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ if (inst->latest) diff --git a/src/core/ddsc/src/dds_whc.c b/src/core/ddsc/src/dds_whc.c index 5197a30..8c071ec 100644 --- a/src/core/ddsc/src/dds_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -22,6 +22,9 @@ #ifdef DDSI_INCLUDE_LIFESPAN #include "dds/ddsi/ddsi_lifespan.h" #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +#include "dds/ddsi/ddsi_deadline.h" +#endif #include "dds/ddsi/q_unused.h" #include "dds/ddsi/q_config.h" #include "dds/ddsi/ddsi_tkmap.h" @@ -29,7 +32,10 @@ #include "dds/ddsi/q_rtps.h" #include "dds/ddsi/q_freelist.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_entity.h" #include "dds__whc.h" +#include "dds__entity.h" +#include "dds__writer.h" #define USE_EHH 0 @@ -66,6 +72,9 @@ struct whc_idxnode { seqno_t prune_seq; struct ddsi_tkmap_instance *tk; uint32_t headidx; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + struct deadline_elem deadline; /* list element for deadline missed */ +#endif struct whc_node *hist[]; }; @@ -76,6 +85,15 @@ struct whc_seq_entry { }; #endif +struct whc_writer_info { + dds_writer * writer; /* can be NULL, eg in case of whc for built-in writers */ + unsigned is_transient_local: 1; + unsigned has_deadline: 1; + uint32_t hdepth; /* 0 = unlimited */ + uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */ + uint32_t idxdepth; /* = max (hdepth, tldepth) */ +}; + struct whc_impl { struct whc common; ddsrt_mutex_t lock; @@ -84,13 +102,10 @@ struct whc_impl { size_t sample_overhead; uint32_t fragment_size; uint64_t total_bytes; /* total number of bytes pushed in */ - unsigned is_transient_local: 1; unsigned xchecks: 1; struct q_globals *gv; struct ddsi_tkmap *tkmap; - uint32_t hdepth; /* 0 = unlimited */ - uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */ - uint32_t idxdepth; /* = max (hdepth, tldepth) */ + struct whc_writer_info wrinfo; seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */ struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */ struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */ @@ -104,6 +119,9 @@ struct whc_impl { #ifdef DDSI_INCLUDE_LIFESPAN struct lifespan_adm lifespan; /* Lifespan administration */ #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + struct deadline_adm deadline; /* Deadline missed administration */ +#endif }; struct whc_sample_iter_impl { @@ -373,45 +391,91 @@ static nn_mtime_t whc_sample_expired_cb(void *hc, nn_mtime_t tnow) } #endif -struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth) +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +static nn_mtime_t whc_deadline_missed_cb(void *hc, nn_mtime_t tnow) +{ + struct whc_impl *whc = hc; + void *vidxnode; + nn_mtime_t tnext; + ddsrt_mutex_lock (&whc->lock); + while ((tnext = deadline_next_missed_locked (&whc->deadline, tnow, &vidxnode)).v == 0) + { + struct whc_idxnode *idxnode = vidxnode; + deadline_reregister_instance_locked (&whc->deadline, &idxnode->deadline, tnow); + + status_cb_data_t cb_data; + cb_data.raw_status_id = (int) DDS_OFFERED_DEADLINE_MISSED_STATUS_ID; + cb_data.extra = 0; + cb_data.handle = 0; + cb_data.add = true; + ddsrt_mutex_unlock (&whc->lock); + dds_writer_status_cb (&whc->wrinfo.writer->m_entity, &cb_data); + ddsrt_mutex_lock (&whc->lock); + + tnow = now_mt (); + } + ddsrt_mutex_unlock (&whc->lock); + return tnext; +} +#endif + +struct whc_writer_info *whc_make_wrinfo (struct dds_writer *wr, const dds_qos_t *qos) +{ + struct whc_writer_info *wrinfo = ddsrt_malloc (sizeof (*wrinfo)); + wrinfo->writer = wr; + wrinfo->is_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL); + wrinfo->has_deadline = (qos->deadline.deadline != DDS_INFINITY); + wrinfo->hdepth = (qos->history.kind == DDS_HISTORY_KEEP_ALL) ? 0 : (unsigned) qos->history.depth; + if (!wrinfo->is_transient_local) + wrinfo->tldepth = 0; + else + wrinfo->tldepth = (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL) ? 0 : (unsigned) qos->durability_service.history.depth; + wrinfo->idxdepth = wrinfo->hdepth > wrinfo->tldepth ? wrinfo->hdepth : wrinfo->tldepth; + return wrinfo; +} + +void whc_free_wrinfo (struct whc_writer_info *wrinfo) +{ + ddsrt_free (wrinfo); +} + +struct whc *whc_new (struct q_globals *gv, const struct whc_writer_info *wrinfo) { size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */ struct whc_impl *whc; struct whc_intvnode *intv; - assert ((hdepth == 0 || tldepth <= hdepth) || is_transient_local); + assert ((wrinfo->hdepth == 0 || wrinfo->tldepth <= wrinfo->hdepth) || wrinfo->is_transient_local); whc = ddsrt_malloc (sizeof (*whc)); whc->common.ops = &whc_ops; ddsrt_mutex_init (&whc->lock); - whc->is_transient_local = is_transient_local ? 1 : 0; whc->xchecks = (gv->config.enabled_xchecks & DDS_XCHECK_WHC) != 0; whc->gv = gv; whc->tkmap = gv->m_tkmap; - whc->hdepth = hdepth; - whc->tldepth = tldepth; - whc->idxdepth = hdepth > tldepth ? hdepth : tldepth; + memcpy (&whc->wrinfo, wrinfo, sizeof (*wrinfo)); whc->seq_size = 0; whc->max_drop_seq = 0; whc->unacked_bytes = 0; whc->total_bytes = 0; whc->sample_overhead = sample_overhead; whc->fragment_size = gv->config.fragment_size; + whc->idx_hash = ddsrt_hh_new (1, whc_idxnode_hash_key, whc_idxnode_eq_key); #if USE_EHH whc->seq_hash = ddsrt_ehh_new (sizeof (struct whc_seq_entry), 32, whc_seq_entry_hash, whc_seq_entry_eq); #else whc->seq_hash = ddsrt_hh_new (1, whc_node_hash, whc_node_eq); #endif - if (whc->idxdepth > 0) - whc->idx_hash = ddsrt_hh_new (1, whc_idxnode_hash_key, whc_idxnode_eq_key); - else - whc->idx_hash = NULL; - #ifdef DDSI_INCLUDE_LIFESPAN lifespan_init (gv, &whc->lifespan, offsetof(struct whc_impl, lifespan), offsetof(struct whc_node, lifespan), whc_sample_expired_cb); #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + whc->deadline.dur = (wrinfo->writer != NULL) ? wrinfo->writer->m_entity.m_qos->deadline.deadline : DDS_INFINITY; + deadline_init (gv, &whc->deadline, offsetof(struct whc_impl, deadline), offsetof(struct whc_idxnode, deadline), whc_deadline_missed_cb); +#endif + /* seq interval tree: always has an "open" node */ ddsrt_avl_init (&whc_seq_treedef, &whc->seq); intv = ddsrt_malloc (sizeof (*intv)); @@ -450,14 +514,19 @@ void whc_default_free (struct whc *whc_generic) lifespan_fini (&whc->lifespan); #endif - if (whc->idx_hash) - { - struct ddsrt_hh_iter it; - struct whc_idxnode *n; - for (n = ddsrt_hh_iter_first (whc->idx_hash, &it); n != NULL; n = ddsrt_hh_iter_next (&it)) - ddsrt_free (n); - ddsrt_hh_free (whc->idx_hash); - } +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_stop (&whc->deadline); + ddsrt_mutex_lock (&whc->lock); + deadline_clear (&whc->deadline); + ddsrt_mutex_unlock (&whc->lock); + deadline_fini (&whc->deadline); +#endif + + struct ddsrt_hh_iter it; + struct whc_idxnode *idxn; + for (idxn = ddsrt_hh_iter_first (whc->idx_hash, &it); idxn != NULL; idxn = ddsrt_hh_iter_next (&it)) + ddsrt_free (idxn); + ddsrt_hh_free (whc->idx_hash); { struct whc_node *whcn = whc->maxseq_node; @@ -577,31 +646,19 @@ static seqno_t whc_default_next_seq (const struct whc *whc_generic, seqno_t seq) return nseq; } -static void delete_one_sample_from_idx (struct whc_impl *whc, struct whc_node *whcn) +static void delete_one_sample_from_idx (struct whc_node *whcn) { struct whc_idxnode * const idxn = whcn->idxnode; assert (idxn != NULL); assert (idxn->hist[idxn->headidx] != NULL); assert (idxn->hist[whcn->idxnode_pos] == whcn); - if (whcn->idxnode_pos != idxn->headidx) - idxn->hist[whcn->idxnode_pos] = NULL; - else - { -#ifndef NDEBUG - for (uint32_t i = 0; i < whc->idxdepth; i++) - assert (i == idxn->headidx || idxn->hist[i] == NULL); -#endif - if (!ddsrt_hh_remove (whc->idx_hash, idxn)) - assert (0); - ddsi_tkmap_instance_unref (whc->tkmap, idxn->tk); - ddsrt_free (idxn); - } + idxn->hist[whcn->idxnode_pos] = NULL; whcn->idxnode = NULL; } static void free_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn) { - for (uint32_t i = 0; i < whc->idxdepth; i++) + for (uint32_t i = 0; i < whc->wrinfo.idxdepth; i++) { if (idxn->hist[i]) { @@ -622,6 +679,9 @@ static void delete_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop { if (!ddsrt_hh_remove (whc->idx_hash, idxn)) assert (0); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_unregister_instance_locked (&whc->deadline, &idxn->deadline); +#endif free_one_instance_from_idx (whc, max_drop_seq, idxn); } @@ -631,9 +691,9 @@ static int whcn_in_tlidx (const struct whc_impl *whc, const struct whc_idxnode * return 0; else { - uint32_t d = (idxn->headidx + (pos > idxn->headidx ? whc->idxdepth : 0)) - pos; - assert (d < whc->idxdepth); - return d < whc->tldepth; + uint32_t d = (idxn->headidx + (pos > idxn->headidx ? whc->wrinfo.idxdepth : 0)) - pos; + assert (d < whc->wrinfo.idxdepth); + return d < whc->wrinfo.tldepth; } } @@ -649,7 +709,7 @@ static uint32_t whc_default_downgrade_to_volatile (struct whc *whc_generic, stru ddsrt_mutex_lock (&whc->lock); check_whc (whc); - if (whc->idxdepth == 0) + if (whc->wrinfo.idxdepth == 0) { /* if not maintaining an index at all, this is nonsense */ get_state_locked (whc, st); @@ -657,19 +717,24 @@ static uint32_t whc_default_downgrade_to_volatile (struct whc *whc_generic, stru return 0; } - assert (!whc->is_transient_local); - if (whc->tldepth > 0) + assert (!whc->wrinfo.is_transient_local); + if (whc->wrinfo.tldepth > 0) { - assert (whc->hdepth == 0 || whc->tldepth <= whc->hdepth); - whc->tldepth = 0; - if (whc->hdepth == 0) + assert (whc->wrinfo.hdepth == 0 || whc->wrinfo.tldepth <= whc->wrinfo.hdepth); + whc->wrinfo.tldepth = 0; + if (whc->wrinfo.hdepth == 0) { struct ddsrt_hh_iter it; - struct whc_idxnode *n; - for (n = ddsrt_hh_iter_first (whc->idx_hash, &it); n != NULL; n = ddsrt_hh_iter_next (&it)) - free_one_instance_from_idx (whc, 0, n); + struct whc_idxnode *idxn; + for (idxn = ddsrt_hh_iter_first (whc->idx_hash, &it); idxn != NULL; idxn = ddsrt_hh_iter_next (&it)) + { +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_unregister_instance_locked (&whc->deadline, &idxn->deadline); +#endif + free_one_instance_from_idx (whc, 0, idxn); + } ddsrt_hh_free (whc->idx_hash); - whc->idxdepth = 0; + whc->wrinfo.idxdepth = 0; whc->idx_hash = NULL; } } @@ -711,7 +776,7 @@ static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_i /* If it is in the tlidx, take it out. Transient-local data never gets here */ if (whcn->idxnode) - delete_one_sample_from_idx (whc, whcn); + delete_one_sample_from_idx (whcn); if (whcn->unacked) { assert (whc->unacked_bytes >= whcn->size); @@ -927,15 +992,27 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se struct whc_node deferred_list_head, *last_to_free = &deferred_list_head; uint32_t ndropped = 0; - if (whc->is_transient_local && whc->tldepth == 0) + whcn = find_nextseq_intv (&intv, whc, whc->max_drop_seq); + if (whc->wrinfo.is_transient_local && whc->wrinfo.tldepth == 0) { - /* KEEP_ALL on transient local, so we can never ever delete anything */ - TRACE (" KEEP_ALL transient-local: do nothing\n"); + /* KEEP_ALL on transient local, so we can never ever delete anything, but + we have to ack the data in whc */ + TRACE (" KEEP_ALL transient-local: ack data\n"); + while (whcn && whcn->seq <= max_drop_seq) + { + if (whcn->unacked) + { + assert (whc->unacked_bytes >= whcn->size); + whc->unacked_bytes -= whcn->size; + whcn->unacked = 0; + } + whcn = whcn->next_seq; + } + whc->max_drop_seq = max_drop_seq; *deferred_free_list = NULL; return 0; } - whcn = find_nextseq_intv (&intv, whc, whc->max_drop_seq); deferred_list_head.next_seq = NULL; prev_seq = whcn ? whcn->prev_seq : NULL; while (whcn && whcn->seq <= max_drop_seq) @@ -982,10 +1059,10 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se the T-L history but that are not anymore. Writing new samples will eventually push these out, but if the difference is large and the update rate low, it may take a long time. Thus, we had better prune them. */ - if (whc->tldepth > 0 && whc->idxdepth > whc->tldepth) + if (whc->wrinfo.tldepth > 0 && whc->wrinfo.idxdepth > whc->wrinfo.tldepth) { - assert (whc->hdepth == whc->idxdepth); - TRACE (" idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->idxdepth, whc->tldepth); + assert (whc->wrinfo.hdepth == whc->wrinfo.idxdepth); + TRACE (" idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->wrinfo.idxdepth, whc->wrinfo.tldepth); /* Do a second pass over the sequence number range we just processed: this time we only encounter samples that were retained because of the transient-local durability setting @@ -1010,11 +1087,11 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se idxn->prune_seq = max_drop_seq; idx = idxn->headidx; - cnt = whc->idxdepth - whc->tldepth; + cnt = whc->wrinfo.idxdepth - whc->wrinfo.tldepth; while (cnt--) { struct whc_node *oldn; - if (++idx == whc->idxdepth) + if (++idx == whc->wrinfo.idxdepth) idx = 0; if ((oldn = idxn->hist[idx]) != NULL) { @@ -1061,12 +1138,16 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn get_state_locked (whc, &tmp); TRACE ("whc_default_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq); TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n", - tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth); + tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->wrinfo.hdepth, whc->wrinfo.tldepth); } check_whc (whc); - if (whc->idxdepth == 0) + /* In case a deadline is set, a sample may be added to whc temporarily, which could be + stored in acked state. The _noidx variant of removing messages assumes that unacked + data exists in whc. So in case of a deadline, the _full variant is used instead, + even when index depth is 0 */ + if (whc->wrinfo.idxdepth == 0 && !whc->wrinfo.has_deadline && !whc->wrinfo.is_transient_local) cnt = whc_default_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list); else cnt = whc_default_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list); @@ -1080,8 +1161,6 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma struct whc_node *newn = NULL; #ifndef DDSI_INCLUDE_LIFESPAN - /* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info' - that contains both lifespan and deadline info of the writer */ DDSRT_UNUSED_ARG (exp); #endif @@ -1172,7 +1251,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" exp %"PRId64" plist %p serdata %p:%"PRIx32")\n", (void *) whc, max_drop_seq, seq, exp.v, (void *) plist, (void *) serdata, serdata->hash); TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n", - whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth); + whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->wrinfo.hdepth, whc->wrinfo.tldepth); } assert (max_drop_seq < MAX_SEQ_NUMBER); @@ -1189,7 +1268,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se TRACE (" whcn %p:", (void*)newn); /* Special case of empty data (such as commit messages) can't go into index, and if we're not maintaining an index, we're done, too */ - if (serdata->kind == SDK_EMPTY || whc->idxdepth == 0) + if (serdata->kind == SDK_EMPTY) { TRACE (" empty or no hist\n"); ddsrt_mutex_unlock (&whc->lock); @@ -1215,42 +1294,50 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se } else { - struct whc_node *oldn; - if (++idxn->headidx == whc->idxdepth) - idxn->headidx = 0; - if ((oldn = idxn->hist[idxn->headidx]) != NULL) +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_renew_instance_locked (&whc->deadline, &idxn->deadline); +#endif + if (whc->wrinfo.idxdepth > 0) { - TRACE (" overwrite whcn %p", (void *)oldn); - oldn->idxnode = NULL; - } - idxn->hist[idxn->headidx] = newn; - newn->idxnode = idxn; - newn->idxnode_pos = idxn->headidx; - - if (oldn && (whc->hdepth > 0 || oldn->seq <= max_drop_seq)) - { - TRACE (" prune whcn %p", (void *)oldn); - assert (oldn != whc->maxseq_node); - whc_delete_one (whc, oldn); - } - - /* Special case for dropping everything beyond T-L history when the new sample is being - auto-acknowledged (for lack of reliable readers), and the keep-last T-L history is - shallower than the keep-last regular history (normal path handles this via pruning in - whc_default_remove_acked_messages, but that never happens when there are no readers). */ - if (seq <= max_drop_seq && whc->tldepth > 0 && whc->idxdepth > whc->tldepth) - { - uint32_t pos = idxn->headidx + whc->idxdepth - whc->tldepth; - if (pos >= whc->idxdepth) - pos -= whc->idxdepth; - if ((oldn = idxn->hist[pos]) != NULL) + struct whc_node *oldn; + if (++idxn->headidx == whc->wrinfo.idxdepth) + idxn->headidx = 0; + if ((oldn = idxn->hist[idxn->headidx]) != NULL) { - TRACE (" prune tl whcn %p", (void *)oldn); - assert (oldn != whc->maxseq_node); - whc_delete_one (whc, oldn); + TRACE (" overwrite whcn %p", (void *)oldn); + oldn->idxnode = NULL; } + idxn->hist[idxn->headidx] = newn; + newn->idxnode = idxn; + newn->idxnode_pos = idxn->headidx; + + if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && whc->wrinfo.tldepth > 0) + { + TRACE (" prune whcn %p", (void *)oldn); + assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline); + whc_delete_one (whc, oldn); + if (oldn == whc->maxseq_node) + whc->maxseq_node = whc_findmax_procedurally (whc); + } + + /* Special case for dropping everything beyond T-L history when the new sample is being + auto-acknowledged (for lack of reliable readers), and the keep-last T-L history is + shallower than the keep-last regular history (normal path handles this via pruning in + whc_default_remove_acked_messages, but that never happens when there are no readers). */ + if (seq <= max_drop_seq && whc->wrinfo.tldepth > 0 && whc->wrinfo.idxdepth > whc->wrinfo.tldepth) + { + uint32_t pos = idxn->headidx + whc->wrinfo.idxdepth - whc->wrinfo.tldepth; + if (pos >= whc->wrinfo.idxdepth) + pos -= whc->wrinfo.idxdepth; + if ((oldn = idxn->hist[pos]) != NULL) + { + TRACE (" prune tl whcn %p", (void *)oldn); + assert (oldn != whc->maxseq_node); + whc_delete_one (whc, oldn); + } + } + TRACE ("\n"); } - TRACE ("\n"); } } else @@ -1259,20 +1346,26 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se /* Ignore unregisters, but insert everything else */ if (!(serdata->statusinfo & NN_STATUSINFO_UNREGISTER)) { - idxn = ddsrt_malloc (sizeof (*idxn) + whc->idxdepth * sizeof (idxn->hist[0])); + idxn = ddsrt_malloc (sizeof (*idxn) + whc->wrinfo.idxdepth * sizeof (idxn->hist[0])); TRACE (" idxn %p", (void *)idxn); ddsi_tkmap_instance_ref (tk); idxn->iid = tk->m_iid; idxn->tk = tk; idxn->prune_seq = 0; idxn->headidx = 0; - idxn->hist[0] = newn; - for (uint32_t i = 1; i < whc->idxdepth; i++) - idxn->hist[i] = NULL; - newn->idxnode = idxn; - newn->idxnode_pos = 0; + if (whc->wrinfo.idxdepth > 0) + { + idxn->hist[0] = newn; + for (uint32_t i = 1; i < whc->wrinfo.idxdepth; i++) + idxn->hist[i] = NULL; + newn->idxnode = idxn; + newn->idxnode_pos = 0; + } if (!ddsrt_hh_add (whc->idx_hash, idxn)) assert (0); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + deadline_register_instance_locked (&whc->deadline, &idxn->deadline, now_mt ()); +#endif } else { diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 0dbe254..1e023c1 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -50,9 +50,9 @@ static dds_return_t dds_writer_status_validate (uint32_t mask) then status conditions is not triggered. */ -static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data) +void dds_writer_status_cb (void *entity, const struct status_cb_data *data) { - dds_writer * const wr = ventity; + dds_writer * const wr = entity; /* When data is NULL, it means that the DDSI reader is deleted. */ if (data == NULL) @@ -223,7 +223,12 @@ static dds_return_t validate_writer_qos (const dds_qos_t *wqos) #ifndef DDSI_INCLUDE_LIFESPAN if (wqos != NULL && (wqos->present & QP_LIFESPAN) && wqos->lifespan.duration != DDS_INFINITY) return DDS_RETCODE_BAD_PARAMETER; -#else +#endif +#ifndef DDSI_INCLUDE_DEADLINE_MISSED + if (wqos != NULL && (wqos->present & QP_DEADLINE) && wqos->deadline.deadline != DDS_INFINITY) + return DDS_RETCODE_BAD_PARAMETER; +#endif +#if defined(DDSI_INCLUDE_LIFESPAN) && defined(DDSI_INCLUDE_DEADLINE_MISSED) DDSRT_UNUSED_ARG (wqos); #endif return DDS_RETCODE_OK; @@ -246,30 +251,6 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo return DDS_RETCODE_OK; } -static struct whc *make_whc (struct dds_domain *dom, const dds_qos_t *qos) -{ - bool handle_as_transient_local; - uint32_t hdepth, tldepth; - /* Construct WHC -- if aggressive_keep_last1 is set, the WHC will - drop all samples for which a later update is available. This - forces it to maintain a tlidx. */ - handle_as_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL); - if (qos->history.kind == DDS_HISTORY_KEEP_ALL) - hdepth = 0; - else - hdepth = (unsigned) qos->history.depth; - if (!handle_as_transient_local) - tldepth = 0; - else - { - if (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL) - tldepth = 0; - else - tldepth = (unsigned) qos->durability_service.history.depth; - } - return whc_new (&dom->gv, handle_as_transient_local, hdepth, tldepth); -} - const struct dds_entity_deriver dds_entity_deriver_writer = { .interrupt = dds_writer_interrupt, .close = dds_writer_close, @@ -288,6 +269,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit dds_participant *pp; dds_topic *tp; dds_entity_t publisher; + struct whc_writer_info *wrinfo; { dds_entity *p_or_p; @@ -348,7 +330,9 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit wr->m_topic = tp; dds_entity_add_ref_locked (&tp->m_entity); wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), pub->m_entity.m_domain->gv.config.xpack_send_async); - wr->m_whc = make_whc (pub->m_entity.m_domain, wqos); + wrinfo = whc_make_wrinfo (wr, wqos); + wr->m_whc = whc_new (&pub->m_entity.m_domain->gv, wrinfo); + whc_free_wrinfo (wrinfo); wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch; thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv); diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt index bb6dbe7..3f902dd 100644 --- a/src/core/ddsc/tests/CMakeLists.txt +++ b/src/core/ddsc/tests/CMakeLists.txt @@ -51,6 +51,7 @@ set(ddsc_test_sources "unsupported.c" "waitset.c" "waitset_torture.c" + "whc.c" "write.c" "write_various_types.c" "writer.c") @@ -59,6 +60,10 @@ if(ENABLE_LIFESPAN) list(APPEND ddsc_test_sources "lifespan.c") endif() +if(ENABLE_DEADLINE_MISSED) + list(APPEND ddsc_test_sources "deadline.c") +endif() + add_cunit_executable(cunit_ddsc ${ddsc_test_sources}) target_include_directories( cunit_ddsc PRIVATE diff --git a/src/core/ddsc/tests/Space.idl b/src/core/ddsc/tests/Space.idl index 3ebbc48..d4d0149 100644 --- a/src/core/ddsc/tests/Space.idl +++ b/src/core/ddsc/tests/Space.idl @@ -22,6 +22,12 @@ module Space { long long_3; }; #pragma keylist Type2 long_1 + struct Type3 { + long long_1; + long long_2; + long long_3; + }; +#pragma keylist Type3 struct simpletypes { long l; diff --git a/src/core/ddsc/tests/deadline.c b/src/core/ddsc/tests/deadline.c new file mode 100644 index 0000000..818847a --- /dev/null +++ b/src/core/ddsc/tests/deadline.c @@ -0,0 +1,484 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/dds.h" +#include "CUnit/Theory.h" +#include "Space.h" + +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/environ.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_whc.h" +#include "dds__entity.h" + +#define MAX_RUNS 4 +#define WRITER_DEADLINE DDS_MSECS(50) + +#define DDS_DOMAINID_PUB 0 +#define DDS_DOMAINID_SUB 1 +#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0" + +static dds_entity_t g_domain = 0; +static dds_entity_t g_participant = 0; +static dds_entity_t g_subscriber = 0; +static dds_entity_t g_publisher = 0; +static dds_entity_t g_topic = 0; +static dds_qos_t *g_qos; +static dds_entity_t g_remote_domain = 0; +static dds_entity_t g_remote_participant = 0; +static dds_entity_t g_remote_subscriber = 0; +static dds_entity_t g_remote_topic = 0; + + +static char * create_topic_name(const char *prefix, char *name, size_t size) +{ + ddsrt_pid_t pid = ddsrt_getpid(); + ddsrt_tid_t tid = ddsrt_gettid(); + (void) snprintf(name, size, "%s_pid%"PRIdPID"_tid%"PRIdTID"", prefix, pid, tid); + return name; +} + +static void sync_reader_writer(dds_entity_t participant, dds_entity_t reader, dds_entity_t writer) +{ + dds_attach_t triggered; + dds_return_t ret; + dds_entity_t waitset_rd = dds_create_waitset(participant); + CU_ASSERT_FATAL(waitset_rd > 0); + dds_entity_t waitset_wr = dds_create_waitset(g_participant); + CU_ASSERT_FATAL(waitset_wr > 0); + + /* Sync reader to writer. */ + ret = dds_set_status_mask(reader, DDS_SUBSCRIPTION_MATCHED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_attach(waitset_rd, reader, reader); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_wait(waitset_rd, &triggered, 1, DDS_SECS(1)); + CU_ASSERT_EQUAL_FATAL(ret, 1); + CU_ASSERT_EQUAL_FATAL(reader, (dds_entity_t)(intptr_t)triggered); + ret = dds_waitset_detach(waitset_rd, reader); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + dds_delete(waitset_rd); + + /* Sync writer to reader. */ + ret = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_attach(waitset_wr, writer, writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_waitset_wait(waitset_wr, &triggered, 1, DDS_SECS(1)); + CU_ASSERT_EQUAL_FATAL(ret, 1); + CU_ASSERT_EQUAL_FATAL(writer, (dds_entity_t)(intptr_t)triggered); + ret = dds_waitset_detach(waitset_wr, writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + dds_delete(waitset_wr); +} + +static dds_entity_t create_and_sync_reader(dds_entity_t participant, dds_entity_t subscriber, dds_entity_t topic, dds_qos_t *qos, dds_entity_t writer) +{ + dds_entity_t reader = dds_create_reader(subscriber, topic, qos, NULL); + CU_ASSERT_FATAL(reader > 0); + sync_reader_writer (participant, reader, writer); + dds_return_t ret = dds_set_status_mask(reader, DDS_REQUESTED_DEADLINE_MISSED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + return reader; +} + +static void deadline_init(void) +{ + char name[100]; + + /* Domains for pub and sub use a different domain id, but the portgain setting + * in configuration is 0, so that both domains will map to the same port number. + * This allows to create two domains in a single test process. */ + char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB); + char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB); + g_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub); + g_remote_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub); + dds_free(conf_pub); + dds_free(conf_sub); + + g_qos = dds_create_qos(); + CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos); + + g_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); + CU_ASSERT_FATAL(g_participant > 0); + g_remote_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL); + CU_ASSERT_FATAL(g_remote_participant > 0); + + g_subscriber = dds_create_subscriber(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_subscriber > 0); + + g_remote_subscriber = dds_create_subscriber(g_remote_participant, NULL, NULL); + CU_ASSERT_FATAL(g_remote_subscriber > 0); + + g_publisher = dds_create_publisher(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_publisher > 0); + + create_topic_name("ddsc_qos_deadline_test", name, sizeof name); + g_topic = dds_create_topic(g_participant, &Space_Type1_desc, name, NULL, NULL); + CU_ASSERT_FATAL(g_topic > 0); + g_remote_topic = dds_create_topic(g_remote_participant, &Space_Type1_desc, name, NULL, NULL); + CU_ASSERT_FATAL(g_remote_topic > 0); + + dds_qset_history(g_qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED); + dds_qset_durability(g_qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_reliability(g_qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); +} + +static void deadline_fini(void) +{ + dds_delete_qos(g_qos); + dds_delete(g_subscriber); + dds_delete(g_remote_subscriber); + dds_delete(g_publisher); + dds_delete(g_topic); + dds_delete(g_participant); + dds_delete(g_remote_participant); + dds_delete(g_domain); + dds_delete(g_remote_domain); +} + +static void msg(const char *msg, ...) +{ + va_list args; + dds_time_t t; + t = dds_time(); + printf("%d.%06d ", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); + va_start(args, msg); + vprintf(msg, args); + va_end(args); + printf("\n"); +} + +static void sleepfor(dds_duration_t sleep_dur) +{ + dds_sleepfor (sleep_dur); + msg("after sleeping %"PRId64, sleep_dur); +} + +static bool check_missed_deadline_reader(dds_entity_t reader, uint32_t exp_missed_total, int32_t exp_missed_change) +{ + struct dds_requested_deadline_missed_status dstatus; + dds_return_t ret = dds_get_requested_deadline_missed_status(reader, &dstatus); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + msg("- check reader total actual %u == expected %u / change actual %d == expected %d", dstatus.total_count, exp_missed_total, dstatus.total_count_change, exp_missed_change); + return dstatus.total_count == exp_missed_total && dstatus.total_count_change == exp_missed_change; +} + +static bool check_missed_deadline_writer(dds_entity_t writer, uint32_t exp_missed_total, int32_t exp_missed_change) +{ + struct dds_offered_deadline_missed_status dstatus; + dds_return_t ret = dds_get_offered_deadline_missed_status(writer, &dstatus); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + msg("- check writer total actual %u == expected %u / change actual %d == expected %d", dstatus.total_count, exp_missed_total, dstatus.total_count_change, exp_missed_change); + return dstatus.total_count == exp_missed_total && dstatus.total_count_change == exp_missed_change; +} + +CU_Test(ddsc_deadline, basic, .init=deadline_init, .fini=deadline_fini) +{ + Space_Type1 sample = { 0, 0, 0 }; + dds_entity_t reader, reader_remote, reader_dl, reader_dl_remote, writer; + dds_return_t ret; + dds_duration_t deadline_dur = WRITER_DEADLINE; + uint32_t run = 1; + bool test_finished = false; + + do + { + msg("deadline test: duration %"PRId64, deadline_dur); + + dds_qset_deadline(g_qos, deadline_dur); + writer = dds_create_writer(g_publisher, g_topic, g_qos, NULL); + CU_ASSERT_FATAL(writer > 0); + + reader_dl = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer); + reader_dl_remote = create_and_sync_reader(g_remote_participant, g_remote_subscriber, g_remote_topic, g_qos, writer); + + dds_qset_deadline(g_qos, DDS_INFINITY); + reader = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer); + reader_remote = create_and_sync_reader(g_remote_participant, g_remote_subscriber, g_remote_topic, g_qos, writer); + + ret = dds_set_status_mask(writer, DDS_OFFERED_DEADLINE_MISSED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + /* Write first sample */ + msg("write sample 1"); + ret = dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + + /* Sleep 0.5 * deadline_dur: expect no missed deadlines for reader and writer */ + sleepfor(deadline_dur / 2); + if (!check_missed_deadline_reader(reader, 0, 0) || + !check_missed_deadline_reader(reader_remote, 0, 0) || + !check_missed_deadline_reader(reader_dl, 0, 0) || + !check_missed_deadline_reader(reader_dl_remote, 0, 0) || + !check_missed_deadline_writer(writer, 0, 0)) + deadline_dur *= 10 / (run + 1); + else + { + /* Write another sample */ + msg("write sample 2"); + ret = dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + + /* Sleep 0.5 * deadline_dur: expect no missed deadlines for reader and writer */ + sleepfor(deadline_dur / 2); + if (!check_missed_deadline_reader(reader, 0, 0) || + !check_missed_deadline_reader(reader_remote, 0, 0) || + !check_missed_deadline_reader(reader_dl, 0, 0) || + !check_missed_deadline_reader(reader_dl_remote, 0, 0) || + !check_missed_deadline_writer(writer, 0, 0)) + deadline_dur *= 10 / (run + 1); + else + { + /* Sleep deadline_dur: expect deadline reader to have 1 missed deadline */ + sleepfor(deadline_dur); + if (!check_missed_deadline_reader(reader, 0, 0) || + !check_missed_deadline_reader(reader_remote, 0, 0) || + !check_missed_deadline_reader(reader_dl, 1, 1) || + !check_missed_deadline_reader(reader_dl_remote, 1, 1) || + !check_missed_deadline_writer(writer, 1, 1)) + deadline_dur *= 10 / (run + 1); + else + { + /* Sleep another 2 * deadline_duration: expect 2 new triggers for missed deadline for both reader and writer */ + sleepfor(2 * deadline_dur); + if (!check_missed_deadline_reader(reader, 0, 0) || + !check_missed_deadline_reader(reader_remote, 0, 0) || + !check_missed_deadline_reader(reader_dl, 3, 2) || + !check_missed_deadline_reader(reader_dl_remote, 3, 2) || + !check_missed_deadline_writer(writer, 3, 2)) + deadline_dur *= 10 / (run + 1); + else + test_finished = true; + } + } + } + + dds_delete(reader); + dds_delete(writer); + + if (!test_finished) + { + if (++run > MAX_RUNS) + { + msg("run limit reached, test failed"); + CU_FAIL_FATAL("Run limit reached"); + test_finished = true; + } + else + { + msg("restarting test with deadline duration %"PRId64, deadline_dur); + sleepfor(deadline_dur); + } + } + } while (!test_finished); +} + +#define V DDS_DURABILITY_VOLATILE +#define TL DDS_DURABILITY_TRANSIENT_LOCAL +#define R DDS_RELIABILITY_RELIABLE +#define BE DDS_RELIABILITY_BEST_EFFORT +#define KA DDS_HISTORY_KEEP_ALL +#define KL DDS_HISTORY_KEEP_LAST +CU_TheoryDataPoints(ddsc_deadline, writer_types) = { + CU_DataPoints(dds_durability_kind_t, V, V, V, V, TL, TL, TL, TL), + CU_DataPoints(dds_reliability_kind_t, BE, BE, R, R, BE, BE, R, R), + CU_DataPoints(dds_history_kind_t, KA, KL, KA, KL, KA, KL, KA, KL) +}; +#undef V +#undef TL +#undef R +#undef BE +#undef KA +#undef KL +CU_Theory((dds_durability_kind_t dur_kind, dds_reliability_kind_t rel_kind, dds_history_kind_t hist_kind), ddsc_deadline, writer_types, .init = deadline_init, .fini = deadline_fini) +{ + Space_Type1 sample = { 0, 0, 0 }; + dds_entity_t reader, writer; + dds_qos_t *qos; + dds_return_t ret; + void * samples[1]; + dds_sample_info_t info; + Space_Type1 rd_sample; + samples[0] = &rd_sample; + struct dds_offered_deadline_missed_status dstatus; + uint32_t run = 1; + dds_duration_t deadline_dur = WRITER_DEADLINE; + bool test_finished = false; + + do + { + msg("deadline test: duration %"PRId64", writer type %d %d %s", deadline_dur, dur_kind, rel_kind, hist_kind == DDS_HISTORY_KEEP_ALL ? "all" : "1"); + + qos = dds_create_qos(); + CU_ASSERT_PTR_NOT_NULL_FATAL(qos); + dds_qset_durability(qos, dur_kind); + dds_qset_reliability(qos, rel_kind, DDS_INFINITY); + dds_qset_history(qos, hist_kind, (hist_kind == DDS_HISTORY_KEEP_ALL) ? 0 : 1); + dds_qset_deadline(qos, deadline_dur); + writer = dds_create_writer(g_publisher, g_topic, qos, NULL); + CU_ASSERT_FATAL(writer > 0); + reader = create_and_sync_reader(g_participant, g_subscriber, g_topic, qos, writer); + + /* Set status mask on writer to get offered deadline missed status */ + ret = dds_set_status_mask(writer, DDS_OFFERED_DEADLINE_MISSED_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + /* Write sample */ + ret = dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + + /* Take sample */ + ret = dds_take (reader, samples, &info, 1, 1); + CU_ASSERT_EQUAL_FATAL (ret, 1); + + /* Sleep 2 * deadline_dur: expect missed deadlines for writer */ + sleepfor(2 * deadline_dur); + ret = dds_get_offered_deadline_missed_status(writer, &dstatus); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + msg("- check writer total actual %u > 0 / change actual %d > 0", dstatus.total_count, dstatus.total_count_change); + if (dstatus.total_count == 0 || dstatus.total_count_change == 0) + deadline_dur *= 10 / (run + 1); + else + { + uint32_t prev_cnt = dstatus.total_count; + + /* Sleep 3 * deadline_dur: expect more missed deadlines for writer */ + sleepfor(3 * deadline_dur); + ret = dds_get_offered_deadline_missed_status(writer, &dstatus); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + msg("- check reader total actual %u > expected %u / change actual %d > 0", dstatus.total_count, prev_cnt, dstatus.total_count_change); + if (dstatus.total_count <= prev_cnt || dstatus.total_count_change == 0) + deadline_dur *= 10 / (run + 1); + else + test_finished = true; + } + + dds_delete_qos(qos); + dds_delete(reader); + dds_delete(writer); + + if (!test_finished) + { + if (++run > MAX_RUNS) + { + msg("run limit reached, test failed"); + CU_FAIL_FATAL("Run limit reached"); + test_finished = true; + } + else + { + msg("restarting test with deadline duration %"PRId64, deadline_dur); + sleepfor(deadline_dur); + } + } + } while (!test_finished); +} + +CU_TheoryDataPoints(ddsc_deadline, instances) = { + CU_DataPoints(int32_t, 1, 10, 10, 100), /* instance count */ + CU_DataPoints(uint8_t, 0, 0, 4, 10), /* unregister every n-th instance */ + CU_DataPoints(uint8_t, 0, 0, 5, 20), /* dispose every n-th instance */ +}; +CU_Theory((int32_t n_inst, uint8_t unreg_nth, uint8_t dispose_nth), ddsc_deadline, instances, .init = deadline_init, .fini = deadline_fini, .timeout = 60) +{ + Space_Type1 sample = { 0, 0, 0 }; + dds_entity_t reader_dl, writer; + dds_return_t ret; + int32_t n, n_unreg, n_dispose, n_alive, run = 1; + bool test_finished = false; + dds_duration_t deadline_dur = WRITER_DEADLINE; + + do + { + msg("deadline test: duration %"PRId64", instance count %d, unreg %dth, dispose %dth", deadline_dur, n_inst, unreg_nth, dispose_nth); + dds_qset_deadline(g_qos, deadline_dur); + CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos); + + writer = dds_create_writer(g_publisher, g_topic, g_qos, NULL); + CU_ASSERT_FATAL(writer > 0); + reader_dl = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer); + + /* Write first sample for each instance */ + n_unreg = n_dispose = 0; + for (n = 1; n <= n_inst; n++) + { + sample.long_1 = n; + ret = dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + if (unreg_nth && n % unreg_nth == 0) + { + ret = dds_unregister_instance (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + n_unreg++; + } + else if (dispose_nth && n % dispose_nth == 0) + { + ret = dds_dispose (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + n_dispose++; + } + } + n_alive = n_inst - n_dispose - n_unreg; + + /* Sleep deadline_dur + 50% and check missed deadline count */ + sleepfor(3 * deadline_dur / 2); + if (!check_missed_deadline_reader(reader_dl, (uint32_t)n_alive, n_alive)) + deadline_dur *= 10 / (run + 1); + else + { + /* Sleep another deadline_dur: expect new trigger for missed deadline for all non-disposed instances */ + sleepfor(deadline_dur); + if (!check_missed_deadline_reader(reader_dl, 2 * (uint32_t)n_alive, n_alive)) + deadline_dur *= 10 / (run + 1); + else + { + /* Write second sample for all (including disposed) instances */ + for (n = 1; n <= n_inst; n++) + { + sample.long_1 = n; + ret = dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK); + } + + /* Sleep deadline_dur + 25%: expect new trigger for missed deadline for non-disposed instances */ + sleepfor(5 * deadline_dur / 4); + if (!check_missed_deadline_reader(reader_dl, 2 * (uint32_t)n_alive + (uint32_t)n_inst, n_inst)) + deadline_dur *= 10 / (run + 1); + else + test_finished = true; + } + } + + dds_delete(reader_dl); + dds_delete(writer); + + if (!test_finished) + { + if (++run > MAX_RUNS) + { + msg("run limit reached, test failed"); + CU_FAIL_FATAL("Run limit reached"); + test_finished = true; + } + else + { + msg("restarting test with deadline duration %"PRId64, deadline_dur); + sleepfor(deadline_dur); + } + } + } while (!test_finished); +} diff --git a/src/core/ddsc/tests/whc.c b/src/core/ddsc/tests/whc.c new file mode 100644 index 0000000..212c51a --- /dev/null +++ b/src/core/ddsc/tests/whc.c @@ -0,0 +1,279 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/dds.h" +#include "CUnit/Theory.h" +#include "Space.h" + +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/environ.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_whc.h" +#include "dds__entity.h" + +#define DDS_DOMAINID_PUB 0 +#define DDS_DOMAINID_SUB 1 +#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0" +#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}cyclonedds_whc_test.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.logfinest0" + +#define SAMPLE_COUNT 5 +#define DEADLINE_DURATION DDS_MSECS(1) + +static uint32_t g_topic_nr = 0; +static dds_entity_t g_domain = 0; +static dds_entity_t g_participant = 0; +static dds_entity_t g_subscriber = 0; +static dds_entity_t g_publisher = 0; +static dds_qos_t *g_qos; +static dds_entity_t g_remote_domain = 0; +static dds_entity_t g_remote_participant = 0; +static dds_entity_t g_remote_subscriber = 0; + +static char *create_topic_name (const char *prefix, uint32_t nr, char *name, size_t size) +{ + /* Get unique g_topic name. */ + ddsrt_pid_t pid = ddsrt_getpid (); + ddsrt_tid_t tid = ddsrt_gettid (); + (void) snprintf (name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid); + return name; +} + +static void whc_init(void) +{ + /* Domains for pub and sub use a different domain id, but the portgain setting + * in configuration is 0, so that both domains will map to the same port number. + * This allows to create two domains in a single test process. */ + char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB); + char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB); + g_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub); + g_remote_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub); + dds_free(conf_pub); + dds_free(conf_sub); + + g_qos = dds_create_qos(); + CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos); + + g_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); + CU_ASSERT_FATAL(g_participant > 0); + g_remote_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL); + CU_ASSERT_FATAL(g_remote_participant > 0); + + g_subscriber = dds_create_subscriber(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_subscriber > 0); + + g_remote_subscriber = dds_create_subscriber(g_remote_participant, NULL, NULL); + CU_ASSERT_FATAL(g_remote_subscriber > 0); + + g_publisher = dds_create_publisher(g_participant, NULL, NULL); + CU_ASSERT_FATAL(g_publisher > 0); +} + +static void whc_fini (void) +{ + dds_delete_qos(g_qos); + dds_delete(g_subscriber); + dds_delete(g_remote_subscriber); + dds_delete(g_publisher); + dds_delete(g_participant); + dds_delete(g_remote_participant); + dds_delete(g_domain); + dds_delete(g_remote_domain); +} + +static dds_entity_t create_and_sync_reader(dds_entity_t subscriber, dds_entity_t topic, dds_qos_t *qos, dds_entity_t writer) +{ + dds_return_t ret; + dds_entity_t reader = dds_create_reader(subscriber, topic, qos, NULL); + CU_ASSERT_FATAL(reader > 0); + while (1) + { + dds_publication_matched_status_t st; + ret = dds_get_publication_matched_status (writer, &st); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + if (st.current_count_change == 1) + break; + dds_sleepfor (DDS_MSECS (1)); + } + return reader; +} + +static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max) +{ + struct dds_entity *wr_entity; + struct writer *wr; + struct whc_state whcst; + CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0); + thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv); + wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid); + CU_ASSERT_FATAL(wr != NULL); + assert(wr != NULL); /* for Clang's static analyzer */ + whc_get_state(wr->whc, &whcst); + thread_state_asleep(lookup_thread_state()); + dds_entity_unpin(wr_entity); + + printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max); + CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0); + CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min); + CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max); +} + + +#define V DDS_DURABILITY_VOLATILE +#define TL DDS_DURABILITY_TRANSIENT_LOCAL +#define R DDS_RELIABILITY_RELIABLE +#define BE DDS_RELIABILITY_BEST_EFFORT +#define KA DDS_HISTORY_KEEP_ALL +#define KL DDS_HISTORY_KEEP_LAST +static void test_whc_end_state(dds_durability_kind_t d, dds_reliability_kind_t r, dds_history_kind_t h, int32_t hd, dds_history_kind_t dh, + int32_t dhd, bool lrd, bool rrd, int32_t ni, bool k, bool dl) +{ + char name[100]; + Space_Type1 sample = { 0, 0, 0 }; + Space_Type3 sample_keyless = { 0, 0, 0 }; + dds_entity_t reader, reader_remote, writer; + dds_entity_t topic; + dds_entity_t remote_topic; + dds_return_t ret; + int32_t s, i; + + printf ("test_whc_end_state: %s, %s, %s(%d), durability %s(%d), readers: %u local, %u remote, instances: %u, key %u, deadline %"PRId64"\n", + d == V ? "volatile" : "TL", + r == BE ? "best-effort" : "reliable", + h == KA ? "keep-all" : "keep-last", h == KA ? 0 : hd, + dh == KA ? "keep-all" : "keep-last", dh == KA ? 0 : dhd, + lrd, rrd, ni, k, + dl ? DEADLINE_DURATION : INT64_C(-1)); + + dds_qset_durability (g_qos, d); + dds_qset_reliability (g_qos, r, DDS_INFINITY); + dds_qset_history (g_qos, h, h == KA ? 0 : hd); + dds_qset_deadline (g_qos, dl ? DEADLINE_DURATION : DDS_INFINITY); + dds_qset_durability_service (g_qos, 0, dh, dh == KA ? 0 : dhd, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + + create_topic_name ("ddsc_whc_end_state_test", g_topic_nr++, name, sizeof name); + topic = dds_create_topic (g_participant, k ? &Space_Type1_desc : &Space_Type3_desc, name, NULL, NULL); + CU_ASSERT_FATAL(topic > 0); + remote_topic = dds_create_topic (g_remote_participant, k ? &Space_Type1_desc : &Space_Type3_desc, name, NULL, NULL); + CU_ASSERT_FATAL(remote_topic > 0); + + writer = dds_create_writer (g_publisher, topic, g_qos, NULL); + CU_ASSERT_FATAL(writer > 0); + ret = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK) + + reader = lrd ? create_and_sync_reader (g_subscriber, topic, g_qos, writer) : 0; + reader_remote = rrd ? create_and_sync_reader (g_remote_subscriber, remote_topic, g_qos, writer) : 0; + + for (s = 0; s < SAMPLE_COUNT; s++) + { + if (k) + for (i = 0; i < ni; i++) + { + sample.long_1 = (int32_t)i; + ret = dds_write (writer, &sample); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + } + else + { + ret = dds_write (writer, &sample_keyless); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + } + } + + /* delete readers, wait until no matching reader */ + if (rrd) + { + ret = dds_delete (reader_remote); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + } + if (lrd) + { + ret = dds_delete (reader); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + } + while (1) + { + dds_publication_matched_status_t st; + ret = dds_get_publication_matched_status (writer, &st); + CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); + if (st.current_count == 0) + break; + dds_sleepfor (DDS_MSECS (1)); + } + + /* check whc state */ + int32_t exp_max = (d == TL) ? ni * SAMPLE_COUNT : -1; + int32_t exp_min = (d == TL) ? ((dh == KA) ? 1 : exp_max - dhd * ni + 1) : -1; + check_whc_state (writer, exp_min, exp_max); + + dds_delete (writer); + dds_delete (remote_topic); + dds_delete (topic); +} + +#define ARRAY_LEN(A) ((int32_t)(sizeof(A) / sizeof(A[0]))) +CU_Test(ddsc_whc, check_end_state, .init=whc_init, .fini=whc_fini, .timeout=30) +{ + dds_durability_kind_t dur[] = {V, TL}; + dds_reliability_kind_t rel[] = {BE, R}; + dds_history_kind_t hist[] = {KA, KL}; + dds_history_kind_t dhist[] = {KA, KL}; + int32_t hist_depth[] = {1, 3}; + int32_t dhist_depth[] = {1, 3}; + bool loc_rd[] = {false, true}; + bool rem_rd[] = {false, true}; + int32_t n_inst[] = {1, 3}; + bool keyed[] = {false, true}; +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + bool deadline[] = {false, true}; +#else + bool deadline[] = {false}; +#endif + int32_t i_d, i_r, i_h, i_hd, i_dh, i_dhd, i_lrd, i_rrd, i_ni, i_k, i_dl; + + for (i_d = 0; i_d < ARRAY_LEN(dur); i_d++) + for (i_r = 0; i_r < ARRAY_LEN(rel); i_r++) + for (i_h = 0; i_h < ARRAY_LEN(hist); i_h++) + for (i_hd = 0; i_hd < ARRAY_LEN(hist_depth); i_hd++) + for (i_dh = 0; i_dh < ARRAY_LEN(dhist); i_dh++) + for (i_dhd = 0; i_dhd < ARRAY_LEN(dhist_depth); i_dhd++) + for (i_lrd = 0; i_lrd < ARRAY_LEN(loc_rd); i_lrd++) + for (i_rrd = 0; i_rrd < ARRAY_LEN(rem_rd); i_rrd++) + for (i_ni = 0; i_ni < ARRAY_LEN(n_inst); i_ni++) + for (i_k = 0; i_k < ARRAY_LEN(keyed); i_k++) + for (i_dl = 0; i_dl < ARRAY_LEN(deadline); i_dl++) + { + if (rel[i_r] == BE && dur[i_d] == TL) + continue; + else if (hist[i_h] == KA && i_hd > 0) + continue; + else if (dhist[i_dh] == KA && i_dhd > 0) + continue; + else + { + test_whc_end_state (dur[i_d], rel[i_r], hist[i_h], hist_depth[i_hd], dhist[i_dh], dhist_depth[i_dhd], + loc_rd[i_lrd], rem_rd[i_rrd], keyed[i_k] ? n_inst[i_ni] : 1, keyed[i_k], deadline[i_dl]); + } + } +} + +#undef ARRAY_LEN +#undef V +#undef TL +#undef R +#undef BE +#undef KA +#undef KL diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 9f85d8e..464f39b 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -30,6 +30,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_rhc.c ddsi_pmd.c ddsi_entity_index.c + ddsi_deadline.c q_addrset.c q_bitset_inlines.c q_bswap.c @@ -63,6 +64,10 @@ if(ENABLE_LIFESPAN) list(APPEND srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src/ddsi_lifespan.c") endif() +if(ENABLE_DEADLINE_MISSED) + list(APPEND srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src/ddsi_deadline.c") +endif() + # The includes should reside close to the code. As long as that's not the case, # pull them in from this CMakeLists.txt. PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" @@ -86,6 +91,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" ddsi_rhc.h ddsi_guid.h ddsi_entity_index.h + ddsi_deadline.h q_addrset.h q_bitset.h q_bswap.h @@ -125,6 +131,9 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" if(ENABLE_LIFESPAN) list(APPEND hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi/ddsi_lifespan.h") endif() +if(ENABLE_DEADLINE_MISSED) + list(APPEND hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi/ddsi_deadline.h") +endif() target_sources(ddsc PRIVATE ${srcs_ddsi} ${hdrs_private_ddsi}) diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_deadline.h b/src/core/ddsi/include/dds/ddsi/ddsi_deadline.h new file mode 100644 index 0000000..53c250d --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_deadline.h @@ -0,0 +1,84 @@ +/* + * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSI_DEADLINE_H +#define DDSI_DEADLINE_H + +#include "dds/ddsrt/circlist.h" +#include "dds/ddsi/q_time.h" +#include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_xevent.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +typedef nn_mtime_t (*deadline_missed_cb_t)(void *hc, nn_mtime_t tnow); + +struct deadline_adm { + struct ddsrt_circlist list; /* linked list for deadline missed */ + struct xevent *evt; /* xevent that triggers when deadline expires for an instance */ + deadline_missed_cb_t deadline_missed_cb; /* callback for deadline missed; this cb can use deadline_next_missed_locked to get next instance that has a missed deadline */ + size_t list_offset; /* offset of deadline_adm element in whc or rhc */ + size_t elem_offset; /* offset of deadline_elem element in whc or rhc instance */ + dds_duration_t dur; /* deadline duration */ +}; + +struct deadline_elem { + struct ddsrt_circlist_elem e; + nn_mtime_t t_deadline; +}; + +DDS_EXPORT void deadline_init (const struct q_globals *gv, struct deadline_adm *deadline_adm, size_t list_offset, size_t elem_offset, deadline_missed_cb_t deadline_missed_cb); +DDS_EXPORT void deadline_stop (const struct deadline_adm *deadline_adm); +DDS_EXPORT void deadline_clear (struct deadline_adm *deadline_adm); +DDS_EXPORT void deadline_fini (const struct deadline_adm *deadline_adm); +DDS_EXPORT nn_mtime_t deadline_next_missed_locked (struct deadline_adm *deadline_adm, nn_mtime_t tnow, void **instance); +DDS_EXPORT void deadline_register_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tprev, nn_mtime_t tnow); +DDS_EXPORT void deadline_unregister_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem); +DDS_EXPORT void deadline_renew_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem); + +inline void deadline_register_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow) +{ + if (deadline_adm->dur != T_NEVER) + deadline_register_instance_real (deadline_adm, elem, tnow, tnow); +} + +inline void deadline_reregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow) +{ + if (deadline_adm->dur != T_NEVER) + deadline_register_instance_real (deadline_adm, elem, elem->t_deadline, tnow); +} + +inline void deadline_unregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem) +{ + if (deadline_adm->dur != T_NEVER) + { + assert (elem->t_deadline.v != T_NEVER); + deadline_unregister_instance_real (deadline_adm, elem); + } +} + +inline void deadline_renew_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem) +{ + if (deadline_adm->dur != T_NEVER) + { + assert (elem->t_deadline.v != T_NEVER); + deadline_renew_instance_real (deadline_adm, elem); + } +} + +#if defined (__cplusplus) +} +#endif + +#endif /* DDSI_DEADLINE_H */ + diff --git a/src/core/ddsi/src/ddsi_deadline.c b/src/core/ddsi/src/ddsi_deadline.c new file mode 100644 index 0000000..a4775c8 --- /dev/null +++ b/src/core/ddsi/src/ddsi_deadline.c @@ -0,0 +1,112 @@ +/* + * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include "dds/ddsrt/circlist.h" +#include "dds/ddsi/ddsi_deadline.h" +#include "dds/ddsi/q_time.h" +#include "dds/ddsi/q_xevent.h" + +static void instance_deadline_missed_cb (struct xevent *xev, void *varg, nn_mtime_t tnow) +{ + struct deadline_adm * const deadline_adm = varg; + nn_mtime_t next_valid = deadline_adm->deadline_missed_cb((char *)deadline_adm - deadline_adm->list_offset, tnow); + resched_xevent_if_earlier (xev, next_valid); +} + +/* Gets the instance from the list in deadline admin that has the earliest missed deadline and + * removes the instance element from the list. If no more instances with missed deadline exist + * in the list, the deadline (nn_mtime_t) for the first instance to 'expire' is returned. If the + * list is empty, NN_MTIME_NEVER is returned */ +nn_mtime_t deadline_next_missed_locked (struct deadline_adm *deadline_adm, nn_mtime_t tnow, void **instance) +{ + struct deadline_elem *elem = NULL; + if (!ddsrt_circlist_isempty (&deadline_adm->list)) + { + struct ddsrt_circlist_elem *list_elem = ddsrt_circlist_oldest (&deadline_adm->list); + elem = DDSRT_FROM_CIRCLIST (struct deadline_elem, e, list_elem); + if (elem->t_deadline.v <= tnow.v) + { + ddsrt_circlist_remove (&deadline_adm->list, &elem->e); + if (instance != NULL) + *instance = (char *)elem - deadline_adm->elem_offset; + return (nn_mtime_t) { 0 }; + } + } + if (instance != NULL) + *instance = NULL; + return (elem != NULL) ? elem->t_deadline : NN_MTIME_NEVER; +} + +void deadline_init (const struct q_globals *gv, struct deadline_adm *deadline_adm, size_t list_offset, size_t elem_offset, deadline_missed_cb_t deadline_missed_cb) +{ + ddsrt_circlist_init (&deadline_adm->list); + deadline_adm->evt = qxev_callback (gv->xevents, NN_MTIME_NEVER, instance_deadline_missed_cb, deadline_adm); + deadline_adm->deadline_missed_cb = deadline_missed_cb; + deadline_adm->list_offset = list_offset; + deadline_adm->elem_offset = elem_offset; +} + +void deadline_stop (const struct deadline_adm *deadline_adm) +{ + delete_xevent_callback (deadline_adm->evt); +} + +void deadline_clear (struct deadline_adm *deadline_adm) +{ + while ((deadline_next_missed_locked (deadline_adm, NN_MTIME_NEVER, NULL)).v == 0); +} + +void deadline_fini (const struct deadline_adm *deadline_adm) +{ + assert (ddsrt_circlist_isempty (&deadline_adm->list)); + (void) deadline_adm; +} + +extern inline void deadline_register_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow); +extern inline void deadline_reregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow); + +void deadline_register_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tprev, nn_mtime_t tnow) +{ + ddsrt_circlist_append(&deadline_adm->list, &elem->e); + elem->t_deadline = (tprev.v + deadline_adm->dur >= tnow.v) ? tprev : tnow; + elem->t_deadline.v += deadline_adm->dur; + resched_xevent_if_earlier (deadline_adm->evt, elem->t_deadline); +} + +extern inline void deadline_unregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem); + +void deadline_unregister_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem) +{ + /* Updating the scheduled event with the new shortest expiry + * is not required, because the event will be rescheduled when + * this removed element expires. Only remove the element from the + * deadline list */ + + elem->t_deadline = NN_MTIME_NEVER; + ddsrt_circlist_remove(&deadline_adm->list, &elem->e); +} + +extern inline void deadline_renew_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem); + +void deadline_renew_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem) +{ + /* move element to end of the list (list->latest) and update deadline + according to current deadline duration in rhc (event with old deadline + will still be triggered, but has no effect on this instance because in + the callback the deadline (which will be the updated value) will be + checked for expiry */ + ddsrt_circlist_remove(&deadline_adm->list, &elem->e); + elem->t_deadline = now_mt(); + elem->t_deadline.v += deadline_adm->dur; + ddsrt_circlist_append(&deadline_adm->list, &elem->e); +} diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index e887e35..a4380c8 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -486,6 +486,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals * { struct participant *pp; ddsi_guid_t subguid, group_guid; + struct whc_writer_info *wrinfo; /* no reserved bits may be set */ assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0); @@ -575,7 +576,9 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals * if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); - new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->spdp_endpoint_xqos, whc_new(gv, 1, 1, 1), NULL, NULL); + wrinfo = whc_make_wrinfo (NULL, &gv->spdp_endpoint_xqos); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->spdp_endpoint_xqos, whc_new(gv, wrinfo), NULL, NULL); + whc_free_wrinfo (wrinfo); /* But we need the as_disc address set for SPDP, because we need to send it to everyone regardless of the existence of readers. */ { @@ -595,14 +598,15 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals * entidx_insert_participant_guid (gv->entity_index, pp); /* SEDP writers: */ + wrinfo = whc_make_wrinfo (NULL, &gv->builtin_endpoint_xqos_wr); if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER); - new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER); - new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER; } @@ -610,7 +614,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals * { /* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */ subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER); - new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER; } @@ -618,10 +622,12 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals * if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER); - new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER; } + whc_free_wrinfo (wrinfo); + /* SPDP, SEDP, PMD readers: */ if (!(flags & RTPS_PF_NO_BUILTIN_READERS)) { @@ -3020,12 +3026,10 @@ static void gc_delete_writer (struct gcreq *gcreq) /* Do last gasp on SEDP and free writer. */ if (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE)) sedp_dispose_unregister_writer (wr); - if (wr->status_cb) - { - (wr->status_cb) (wr->status_cb_entity, NULL); - } - whc_free (wr->whc); + if (wr->status_cb) + (wr->status_cb) (wr->status_cb_entity, NULL); + #ifdef DDSI_INCLUDE_SSM if (wr->ssm_as) unref_addrset (wr->ssm_as); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 0edf47f..14587a1 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -879,7 +879,8 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { /* returns: < 0 on error, 0 if no need to insert in whc, > 0 if inserted */ - int do_insert, insres, res; + int insres, res = 0; + bool wr_deadline = false; ASSERT_MUTEX_HELD (&wr->e.lock); @@ -900,17 +901,15 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist } assert (wr->reliable || have_reliable_subs (wr) == 0); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + /* If deadline missed duration is not infinite, the sample is inserted in + the whc so that the instance is created (or renewed) in the whc and the deadline + missed event is registered. The sample is removed immediately after inserting it + as we don't want to store it. */ + wr_deadline = wr->xqos->deadline.deadline != DDS_INFINITY; +#endif - if (wr->reliable && have_reliable_subs (wr)) - do_insert = 1; - else if (wr->handle_as_transient_local) - do_insert = 1; - else - do_insert = 0; - - if (!do_insert) - res = 0; - else + if ((wr->reliable && have_reliable_subs (wr)) || wr_deadline || wr->handle_as_transient_local) { nn_mtime_t exp = NN_MTIME_NEVER; #ifdef DDSI_INCLUDE_LIFESPAN @@ -921,6 +920,20 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist exp = add_duration_to_mtime(serdata->twrite, wr->xqos->lifespan.duration); #endif res = ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, exp, plist, serdata, tk)) < 0) ? insres : 1; + +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (!(wr->reliable && have_reliable_subs (wr)) && !wr->handle_as_transient_local) + { + /* Sample was inserted only because writer has deadline, so we'll remove the sample from whc */ + struct whc_node *deferred_free_list = NULL; + struct whc_state whcst; + uint32_t n = whc_remove_acked_messages (wr->whc, seq, &whcst, &deferred_free_list); + (void)n; + assert (n <= 1); + assert (whcst.min_seq == -1 && whcst.max_seq == -1); + whc_free_deferred_free_list (wr->whc, deferred_free_list); + } +#endif } #ifndef NDEBUG diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 69826a2..fc178d1 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -342,14 +342,20 @@ void delete_xevent_callback (struct xevent *ev) struct xeventq *evq = ev->evq; assert (ev->kind == XEVK_CALLBACK); ddsrt_mutex_lock (&evq->lock); - if (ev->tsched.v != T_NEVER) + /* wait until neither scheduled nor executing; loop in case the callback reschedules the event */ + while (ev->tsched.v != T_NEVER || ev->u.callback.executing) { - assert (ev->tsched.v != TSCHED_DELETE); - ddsrt_fibheap_delete (&evq_xevents_fhdef, &evq->xevents, ev); - ev->tsched.v = TSCHED_DELETE; + if (ev->tsched.v != T_NEVER) + { + assert (ev->tsched.v != TSCHED_DELETE); + ddsrt_fibheap_delete (&evq_xevents_fhdef, &evq->xevents, ev); + ev->tsched.v = T_NEVER; + } + if (ev->u.callback.executing) + { + ddsrt_cond_wait (&evq->cond, &evq->lock); + } } - while (ev->u.callback.executing) - ddsrt_cond_wait (&evq->cond, &evq->lock); ddsrt_mutex_unlock (&evq->lock); free_xevent (evq, ev); } diff --git a/src/core/xtests/rhc_torture/rhc_torture.c b/src/core/xtests/rhc_torture/rhc_torture.c index 080f993..8568eeb 100644 --- a/src/core/xtests/rhc_torture/rhc_torture.c +++ b/src/core/xtests/rhc_torture/rhc_torture.c @@ -107,7 +107,7 @@ static struct ddsi_serdata *mkkeysample (int32_t keyval, unsigned statusinfo) return sd; } -#ifdef DDSI_INCLUDE_LIFESPAN +#if defined(DDSI_INCLUDE_LIFESPAN) || defined (DDSI_INCLUDE_DEADLINE_MISSED) static nn_mtime_t rand_texp () { nn_mtime_t ret = now_mt(); @@ -116,6 +116,13 @@ static nn_mtime_t rand_texp () } #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED +static dds_duration_t rand_deadline () +{ + return (dds_duration_t) (ddsrt_prng_random (&prng) % DDS_MSECS(500)); +} +#endif + static uint64_t store (struct ddsi_tkmap *tkmap, struct dds_rhc *rhc, struct proxy_writer *wr, struct ddsi_serdata *sd, bool print, bool lifespan_expiry) { #ifndef DDSI_INCLUDE_LIFESPAN @@ -569,6 +576,9 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, dds_qos_t *qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, MAX_HIST_DEPTH); dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + dds_qset_deadline (qos, rand_deadline()); +#endif /* two identical readers because we need 63 conditions while we can currently only attach 32 a single reader */ dds_entity_t rd[] = { dds_create_reader (pp, tp, qos, NULL), dds_create_reader (pp, tp, qos, NULL) }; const size_t nrd = sizeof (rd) / sizeof (rd[0]); @@ -673,7 +683,8 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, [9] = "tkc", [10] = "tkc1", [11] = "delwr", - [12] = "drpxp" + [12] = "drpxp", + [13] = "dlmis" }; static const uint32_t opfreqs[] = { [0] = 500, /* write */ @@ -689,9 +700,14 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, [10] = 100, /* take cond, max 1 */ [11] = 1, /* unreg writer */ #ifdef DDSI_INCLUDE_LIFESPAN - [12] = 100 /* drop expired sample */ + [12] = 100, /* drop expired sample */ #else - [12] = 0 /* drop expired sample */ + [12] = 0, /* drop expired sample */ +#endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + [13] = 100 /* deadline missed */ +#else + [13] = 0 /* drop expired sample */ #endif }; uint32_t opthres[sizeof (opfreqs) / sizeof (opfreqs[0])]; @@ -829,6 +845,16 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count, for (size_t k = 0; k < nrd; k++) (void) dds_rhc_default_sample_expired_cb (rhc[k], rand_texp()); thread_state_asleep (lookup_thread_state ()); +#endif + break; + } + case 13: { +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + thread_state_awake_domain_ok (lookup_thread_state ()); + /* We can assume that rhc[k] is a dds_rhc_default at this point */ + for (size_t k = 0; k < nrd; k++) + (void) dds_rhc_default_deadline_missed_cb (rhc[k], rand_texp()); + thread_state_asleep (lookup_thread_state ()); #endif break; } diff --git a/src/mpt/tests/qos/procs/rw.c b/src/mpt/tests/qos/procs/rw.c index 10cceee..6f7ad0b 100644 --- a/src/mpt/tests/qos/procs/rw.c +++ b/src/mpt/tests/qos/procs/rw.c @@ -98,7 +98,11 @@ static void setqos (dds_qos_t *q, size_t i, bool isrd, bool create) #else dds_qset_lifespan (q, DDS_INFINITY); #endif +#ifdef DDSI_INCLUDE_DEADLINE_MISSED dds_qset_deadline (q, INT64_C (67890123456789012) + (int32_t) i); +#else + dds_qset_deadline (q, DDS_INFINITY); +#endif dds_qset_latency_budget (q, INT64_C (45678901234567890) + (int32_t) i); dds_qset_ownership (q, (dds_ownership_kind_t) ((i + 1) % 2)); dds_qset_ownership_strength (q, 0x12345670 + (int32_t) i);