Deadline Missed QoS implementation

This commit contains the implementation of the deadline QoS
for readers and writers. The description of this QoS in
the DDS specification (section 2.2.3.7):

"This policy is useful for cases where a Topic is expected to
have each instance updated periodically. On the publishing side this
setting establishes a contract that the application must meet.
On the subscribing side the setting establishes a minimum
requirement for the remote publishers that are expected to supply
the data values."

On the writer side, the deadline missed event also needs to trigger in
case only local readers exist. The implementation for this inserts
the sample in the writer history cache temporary, so that an instance
is created in the whc. Immediately after inserting the sample, it is
removed again. With the creation of the instance, the deadline missed event
is created, which will take care of triggering the deadline missed
callback if required. In case the instance already existed, the timer
of the event is renewed.

To verify the changes to the writer history cache, add an additional
test to check the write history cache state. This test checks the state
of the whc after writing samples by a writer with specific combinations
of qos settings. The state of the whc is checked for stored
samples (min/max sequence number) and the absence of unacked data, after
writing samples and wait for acks by the local and/or remote
readers (which is also a parameter for this test). This test is
introduced as part of the deadline implementation, but its scope is
wider than only the changes that were made in the whc implementation for
the deadline qos.

This test showed that even before the deadline support was added,
whc_default_remove_acked_messages_full data was not marked as acked in
case of transient-local keep-all. This resulted in data in whc that
never gets in acked state. This has been fixed as well.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2019-12-18 16:22:31 +01:00 committed by eboasson
parent 219cb6cf4f
commit 231cb8c9f7
21 changed files with 1417 additions and 187 deletions

View file

@ -142,33 +142,33 @@ windows_vs2017: &windows_vs2017
jobs:
include:
- <<: *linux_gcc8
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles", COVERITY_SCAN=true ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles", COVERITY_SCAN=true ]
if: type = cron
- <<: *linux_gcc8
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *linux_gcc8
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *linux_gcc8
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=NO, LIFESPAN=NO, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=NO, LIFESPAN=NO, DEADLINE=NO, GENERATOR="Unix Makefiles" ]
- <<: *linux_clang
env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *linux_clang
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *osx_xcode9
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
if: type = cron
- <<: *osx_xcode
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, GENERATOR="Unix Makefiles", MACOSX_DEPLOYMENT_TARGET=10.12 ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=NO, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles", MACOSX_DEPLOYMENT_TARGET=10.12 ]
- <<: *osx_xcode
env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=address, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *osx_xcode
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Unix Makefiles" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Unix Makefiles" ]
- <<: *windows_vs2017
env: [ ARCH=x86, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017" ]
env: [ ARCH=x86, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017" ]
- <<: *windows_vs2017
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017 Win64" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Debug, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017 Win64" ]
- <<: *windows_vs2017
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, GENERATOR="Visual Studio 15 2017 Win64" ]
env: [ ARCH=x86_64, ASAN=none, BUILD_TYPE=Release, SSL=YES, LIFESPAN=YES, DEADLINE=YES, GENERATOR="Visual Studio 15 2017 Win64" ]
before_script:
- conan profile new default --detect
@ -200,6 +200,7 @@ script:
-DUSE_SANITIZER=${ASAN}
-DENABLE_SSL=${SSL}
-DENABLE_LIFESPAN=${LIFESPAN}
-DENABLE_DEADLINE_MISSED=${DEADLINE}
-DBUILD_TESTING=on
-DWERROR=on
-G "${GENERATOR}" ..

View file

@ -32,6 +32,11 @@ if(ENABLE_LIFESPAN)
add_definitions(-DDDSI_INCLUDE_LIFESPAN)
endif()
option(ENABLE_DEADLINE_MISSED "Enable Deadline Missed QoS support" ON)
if(ENABLE_DEADLINE_MISSED)
add_definitions(-DDDSI_INCLUDE_DEADLINE_MISSED)
endif()
# OpenSSL is huge, raising the RSS by 1MB or so, and moreover find_package(OpenSSL) causes
# trouble on some older CMake versions that otherwise work fine, so provide an option to avoid
# all OpenSSL related things.

View file

@ -25,7 +25,12 @@ struct rhc_sample;
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks);
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
#ifdef DDSI_INCLUDE_LIFESPAN
DDS_EXPORT nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
DDS_EXPORT nn_mtime_t dds_rhc_default_deadline_missed_cb(void *hc, nn_mtime_t tnow);
#endif
#if defined (__cplusplus)
}

View file

@ -19,7 +19,12 @@ extern "C" {
#endif
struct q_globals;
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth);
struct whc_writer_info;
struct dds_writer;
struct whc *whc_new (struct q_globals *gv, const struct whc_writer_info *wrinfo);
struct whc_writer_info *whc_make_wrinfo (struct dds_writer *wr, const dds_qos_t *qos);
void whc_free_wrinfo (struct whc_writer_info *);
#if defined (__cplusplus)
}

View file

@ -20,6 +20,10 @@ extern "C" {
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER)
struct status_cb_data;
void dds_writer_status_cb (void *entity, const struct status_cb_data * data);
#if defined (__cplusplus)
}
#endif

View file

@ -71,9 +71,23 @@ static dds_return_t dds_reader_delete (dds_entity *e)
return DDS_RETCODE_OK;
}
static dds_return_t validate_reader_qos (const dds_qos_t *rqos)
{
#ifndef DDSI_INCLUDE_DEADLINE_MISSED
if (rqos != NULL && (rqos->present & QP_DEADLINE) && rqos->deadline.deadline != DDS_INFINITY)
return DDS_RETCODE_BAD_PARAMETER;
#else
DDSRT_UNUSED_ARG (rqos);
#endif
return DDS_RETCODE_OK;
}
static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
{
/* note: e->m_qos is still the old one to allow for failure here */
dds_return_t ret;
if ((ret = validate_reader_qos(qos)) != DDS_RETCODE_OK)
return ret;
if (enabled)
{
struct reader *rd;
@ -420,7 +434,8 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0);
nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0);
if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) != DDS_RETCODE_OK)
if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 ||
(ret = validate_reader_qos(rqos)) != DDS_RETCODE_OK)
{
dds_delete_qos (rqos);
reader = ret;

View file

@ -43,6 +43,9 @@
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
#include "dds/ddsi/ddsi_deadline.h"
#endif
#include "dds/ddsi/sysdeps.h"
/* INSTANCE MANAGEMENT
@ -273,6 +276,9 @@ struct rhc_instance {
ddsi_guid_t wr_guid; /* guid of last writer (if wr_iid != 0 then wr_guid is the corresponding guid, else undef) */
nn_wctime_t tstamp; /* source time stamp of last update */
struct ddsrt_circlist_elem nonempty_list; /* links non-empty instances in arbitrary ordering */
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
struct deadline_elem deadline; /* element in deadline missed administration */
#endif
struct ddsi_tkmap_instance *tk; /* backref into TK for unref'ing */
struct rhc_sample a_sample; /* pre-allocated storage for 1 sample */
};
@ -325,6 +331,9 @@ struct dds_rhc_default {
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
struct deadline_adm deadline; /* Deadline missed administration */
#endif
};
struct trigger_info_cmn {
@ -598,6 +607,36 @@ nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow)
}
#endif /* DDSI_INCLUDE_LIFESPAN */
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
nn_mtime_t dds_rhc_default_deadline_missed_cb(void *hc, nn_mtime_t tnow)
{
struct dds_rhc_default *rhc = hc;
void *vinst;
nn_mtime_t tnext;
ddsrt_mutex_lock (&rhc->lock);
while ((tnext = deadline_next_missed_locked (&rhc->deadline, tnow, &vinst)).v == 0)
{
struct rhc_instance *inst = vinst;
deadline_reregister_instance_locked (&rhc->deadline, &inst->deadline, tnow);
inst->wr_iid_islive = 0;
status_cb_data_t cb_data;
cb_data.raw_status_id = (int) DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID;
cb_data.extra = 0;
cb_data.handle = inst->iid;
cb_data.add = true;
ddsrt_mutex_unlock (&rhc->lock);
dds_reader_status_cb (&rhc->reader->m_entity, &cb_data);
ddsrt_mutex_lock (&rhc->lock);
tnow = now_mt ();
}
ddsrt_mutex_unlock (&rhc->lock);
return tnext;
}
#endif /* DDSI_INCLUDE_DEADLINE_MISSED */
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks)
{
struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc));
@ -618,6 +657,11 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_global
lifespan_init (gv, &rhc->lifespan, offsetof(struct dds_rhc_default, lifespan), offsetof(struct rhc_sample, lifespan), dds_rhc_default_sample_expired_cb);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
rhc->deadline.dur = (reader != NULL) ? reader->m_entity.m_qos->deadline.deadline : DDS_INFINITY;
deadline_init (gv, &rhc->deadline, offsetof(struct dds_rhc_default, deadline), offsetof(struct rhc_instance, deadline), dds_rhc_default_deadline_missed_cb);
#endif
return &rhc->common;
}
@ -638,6 +682,8 @@ static void dds_rhc_default_set_qos (struct dds_rhc_default * rhc, const dds_qos
rhc->reliable = (qos->reliability.kind == DDS_RELIABILITY_RELIABLE);
assert(qos->history.kind != DDS_HISTORY_KEEP_LAST || qos->history.depth > 0);
rhc->history_depth = (qos->history.kind == DDS_HISTORY_KEEP_LAST) ? (uint32_t)qos->history.depth : ~0u;
/* FIXME: updating deadline duration not yet supported
rhc->deadline.dur = qos->deadline.deadline; */
}
static bool eval_predicate_sample (const struct dds_rhc_default *rhc, const struct ddsi_serdata *sample, bool (*pred) (const void *sample))
@ -735,6 +781,10 @@ static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_defau
{
assert (inst_is_empty (inst));
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!inst->isdisposed)
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
ddsrt_free (inst);
}
@ -789,9 +839,15 @@ static void dds_rhc_default_free (struct dds_rhc_default *rhc)
#ifdef DDSI_INCLUDE_LIFESPAN
dds_rhc_default_sample_expired_cb (rhc, NN_MTIME_NEVER);
lifespan_fini (&rhc->lifespan);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_stop (&rhc->deadline);
#endif
ddsrt_hh_enum (rhc->instances, free_instance_rhc_free_wrap, rhc);
assert (ddsrt_circlist_isempty (&rhc->nonempty_instances));
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_fini (&rhc->deadline);
#endif
ddsrt_hh_free (rhc->instances);
lwregs_fini (&rhc->registrations);
if (rhc->qcond_eval_samplebuf != NULL)
@ -863,7 +919,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
{
/* replace oldest sample; latest points to the latest one, the
list is circular from old -> new, so latest->next is the oldest */
inst_clear_invsample_if_exists (rhc, inst, trig_qc);
assert (inst->latest != NULL);
s = inst->latest->next;
@ -885,7 +940,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
else
{
/* Check if resource max_samples QoS exceeded */
if (rhc->reader && rhc->max_samples != DDS_LENGTH_UNLIMITED && rhc->n_vsamples >= (uint32_t) rhc->max_samples)
{
cb_data->raw_status_id = (int) DDS_SAMPLE_REJECTED_STATUS_ID;
@ -896,7 +950,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
}
/* Check if resource max_samples_per_instance QoS exceeded */
if (rhc->reader && rhc->max_samples_per_instance != DDS_LENGTH_UNLIMITED && inst->nvsamples >= (uint32_t) rhc->max_samples_per_instance)
{
cb_data->raw_status_id = (int) DDS_SAMPLE_REJECTED_STATUS_ID;
@ -907,7 +960,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
}
/* add new latest sample */
s = alloc_sample (inst);
inst_clear_invsample_if_exists (rhc, inst, trig_qc);
if (inst->latest == NULL)
@ -933,6 +985,11 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
s->lifespan.t_expire = wrinfo->lifespan_exp;
lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
/* Only renew the deadline missed counter in case the sample is actually stored in the rhc */
if (!inst->isdisposed)
deadline_renew_instance_locked (&rhc->deadline, &inst->deadline);
#endif
s->conds = 0;
if (rhc->nqconds != 0)
@ -1294,7 +1351,7 @@ static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance
return notify_data_available;
}
static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
struct rhc_instance *inst;
@ -1325,6 +1382,11 @@ static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rh
}
}
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!inst->isdisposed)
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ());
#endif
return inst;
}
@ -1485,8 +1547,6 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
cb_data.handle = 0;
cb_data.add = true;
goto error_or_nochange;
/* FIXME: deadline (and other) QoS? */
}
else
{
@ -1536,11 +1596,19 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
TRACE (" disposed->notdisposed");
inst->isdisposed = 0;
inst->disposed_gen++;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!is_dispose)
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ());
#endif
}
if (is_dispose)
{
inst->isdisposed = 1;
inst_became_disposed = !old_isdisposed;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (inst_became_disposed)
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
TRACE (" dispose(%d)", inst_became_disposed);
}
@ -1556,9 +1624,24 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
/* FIXME: fix the bad rejection handling, probably put back in a proper rollback, until then a band-aid like this will have to do: */
inst->isnew = old_isnew;
inst->isdisposed = old_isdisposed;
if (old_isdisposed)
{
inst->disposed_gen--;
if (!inst->isdisposed)
{
inst->isdisposed = 1;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
}
}
else if (inst->isdisposed)
{
inst->isdisposed = 0;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, now_mt ());
#endif
}
goto error_or_nochange;
}
notify_data_available = true;
@ -1695,6 +1778,9 @@ static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict r
if (auto_dispose && !inst->isdisposed)
{
inst->isdisposed = 1;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */
if (inst->latest)

View file

@ -22,6 +22,9 @@
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
#include "dds/ddsi/ddsi_deadline.h"
#endif
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/ddsi_tkmap.h"
@ -29,7 +32,10 @@
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_freelist.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_entity.h"
#include "dds__whc.h"
#include "dds__entity.h"
#include "dds__writer.h"
#define USE_EHH 0
@ -66,6 +72,9 @@ struct whc_idxnode {
seqno_t prune_seq;
struct ddsi_tkmap_instance *tk;
uint32_t headidx;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
struct deadline_elem deadline; /* list element for deadline missed */
#endif
struct whc_node *hist[];
};
@ -76,6 +85,15 @@ struct whc_seq_entry {
};
#endif
struct whc_writer_info {
dds_writer * writer; /* can be NULL, eg in case of whc for built-in writers */
unsigned is_transient_local: 1;
unsigned has_deadline: 1;
uint32_t hdepth; /* 0 = unlimited */
uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
uint32_t idxdepth; /* = max (hdepth, tldepth) */
};
struct whc_impl {
struct whc common;
ddsrt_mutex_t lock;
@ -84,13 +102,10 @@ struct whc_impl {
size_t sample_overhead;
uint32_t fragment_size;
uint64_t total_bytes; /* total number of bytes pushed in */
unsigned is_transient_local: 1;
unsigned xchecks: 1;
struct q_globals *gv;
struct ddsi_tkmap *tkmap;
uint32_t hdepth; /* 0 = unlimited */
uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
uint32_t idxdepth; /* = max (hdepth, tldepth) */
struct whc_writer_info wrinfo;
seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */
struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */
struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */
@ -104,6 +119,9 @@ struct whc_impl {
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
struct deadline_adm deadline; /* Deadline missed administration */
#endif
};
struct whc_sample_iter_impl {
@ -373,45 +391,91 @@ static nn_mtime_t whc_sample_expired_cb(void *hc, nn_mtime_t tnow)
}
#endif
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth)
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
static nn_mtime_t whc_deadline_missed_cb(void *hc, nn_mtime_t tnow)
{
struct whc_impl *whc = hc;
void *vidxnode;
nn_mtime_t tnext;
ddsrt_mutex_lock (&whc->lock);
while ((tnext = deadline_next_missed_locked (&whc->deadline, tnow, &vidxnode)).v == 0)
{
struct whc_idxnode *idxnode = vidxnode;
deadline_reregister_instance_locked (&whc->deadline, &idxnode->deadline, tnow);
status_cb_data_t cb_data;
cb_data.raw_status_id = (int) DDS_OFFERED_DEADLINE_MISSED_STATUS_ID;
cb_data.extra = 0;
cb_data.handle = 0;
cb_data.add = true;
ddsrt_mutex_unlock (&whc->lock);
dds_writer_status_cb (&whc->wrinfo.writer->m_entity, &cb_data);
ddsrt_mutex_lock (&whc->lock);
tnow = now_mt ();
}
ddsrt_mutex_unlock (&whc->lock);
return tnext;
}
#endif
struct whc_writer_info *whc_make_wrinfo (struct dds_writer *wr, const dds_qos_t *qos)
{
struct whc_writer_info *wrinfo = ddsrt_malloc (sizeof (*wrinfo));
wrinfo->writer = wr;
wrinfo->is_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL);
wrinfo->has_deadline = (qos->deadline.deadline != DDS_INFINITY);
wrinfo->hdepth = (qos->history.kind == DDS_HISTORY_KEEP_ALL) ? 0 : (unsigned) qos->history.depth;
if (!wrinfo->is_transient_local)
wrinfo->tldepth = 0;
else
wrinfo->tldepth = (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL) ? 0 : (unsigned) qos->durability_service.history.depth;
wrinfo->idxdepth = wrinfo->hdepth > wrinfo->tldepth ? wrinfo->hdepth : wrinfo->tldepth;
return wrinfo;
}
void whc_free_wrinfo (struct whc_writer_info *wrinfo)
{
ddsrt_free (wrinfo);
}
struct whc *whc_new (struct q_globals *gv, const struct whc_writer_info *wrinfo)
{
size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */
struct whc_impl *whc;
struct whc_intvnode *intv;
assert ((hdepth == 0 || tldepth <= hdepth) || is_transient_local);
assert ((wrinfo->hdepth == 0 || wrinfo->tldepth <= wrinfo->hdepth) || wrinfo->is_transient_local);
whc = ddsrt_malloc (sizeof (*whc));
whc->common.ops = &whc_ops;
ddsrt_mutex_init (&whc->lock);
whc->is_transient_local = is_transient_local ? 1 : 0;
whc->xchecks = (gv->config.enabled_xchecks & DDS_XCHECK_WHC) != 0;
whc->gv = gv;
whc->tkmap = gv->m_tkmap;
whc->hdepth = hdepth;
whc->tldepth = tldepth;
whc->idxdepth = hdepth > tldepth ? hdepth : tldepth;
memcpy (&whc->wrinfo, wrinfo, sizeof (*wrinfo));
whc->seq_size = 0;
whc->max_drop_seq = 0;
whc->unacked_bytes = 0;
whc->total_bytes = 0;
whc->sample_overhead = sample_overhead;
whc->fragment_size = gv->config.fragment_size;
whc->idx_hash = ddsrt_hh_new (1, whc_idxnode_hash_key, whc_idxnode_eq_key);
#if USE_EHH
whc->seq_hash = ddsrt_ehh_new (sizeof (struct whc_seq_entry), 32, whc_seq_entry_hash, whc_seq_entry_eq);
#else
whc->seq_hash = ddsrt_hh_new (1, whc_node_hash, whc_node_eq);
#endif
if (whc->idxdepth > 0)
whc->idx_hash = ddsrt_hh_new (1, whc_idxnode_hash_key, whc_idxnode_eq_key);
else
whc->idx_hash = NULL;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_init (gv, &whc->lifespan, offsetof(struct whc_impl, lifespan), offsetof(struct whc_node, lifespan), whc_sample_expired_cb);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
whc->deadline.dur = (wrinfo->writer != NULL) ? wrinfo->writer->m_entity.m_qos->deadline.deadline : DDS_INFINITY;
deadline_init (gv, &whc->deadline, offsetof(struct whc_impl, deadline), offsetof(struct whc_idxnode, deadline), whc_deadline_missed_cb);
#endif
/* seq interval tree: always has an "open" node */
ddsrt_avl_init (&whc_seq_treedef, &whc->seq);
intv = ddsrt_malloc (sizeof (*intv));
@ -450,14 +514,19 @@ void whc_default_free (struct whc *whc_generic)
lifespan_fini (&whc->lifespan);
#endif
if (whc->idx_hash)
{
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_stop (&whc->deadline);
ddsrt_mutex_lock (&whc->lock);
deadline_clear (&whc->deadline);
ddsrt_mutex_unlock (&whc->lock);
deadline_fini (&whc->deadline);
#endif
struct ddsrt_hh_iter it;
struct whc_idxnode *n;
for (n = ddsrt_hh_iter_first (whc->idx_hash, &it); n != NULL; n = ddsrt_hh_iter_next (&it))
ddsrt_free (n);
struct whc_idxnode *idxn;
for (idxn = ddsrt_hh_iter_first (whc->idx_hash, &it); idxn != NULL; idxn = ddsrt_hh_iter_next (&it))
ddsrt_free (idxn);
ddsrt_hh_free (whc->idx_hash);
}
{
struct whc_node *whcn = whc->maxseq_node;
@ -577,31 +646,19 @@ static seqno_t whc_default_next_seq (const struct whc *whc_generic, seqno_t seq)
return nseq;
}
static void delete_one_sample_from_idx (struct whc_impl *whc, struct whc_node *whcn)
static void delete_one_sample_from_idx (struct whc_node *whcn)
{
struct whc_idxnode * const idxn = whcn->idxnode;
assert (idxn != NULL);
assert (idxn->hist[idxn->headidx] != NULL);
assert (idxn->hist[whcn->idxnode_pos] == whcn);
if (whcn->idxnode_pos != idxn->headidx)
idxn->hist[whcn->idxnode_pos] = NULL;
else
{
#ifndef NDEBUG
for (uint32_t i = 0; i < whc->idxdepth; i++)
assert (i == idxn->headidx || idxn->hist[i] == NULL);
#endif
if (!ddsrt_hh_remove (whc->idx_hash, idxn))
assert (0);
ddsi_tkmap_instance_unref (whc->tkmap, idxn->tk);
ddsrt_free (idxn);
}
whcn->idxnode = NULL;
}
static void free_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_idxnode *idxn)
{
for (uint32_t i = 0; i < whc->idxdepth; i++)
for (uint32_t i = 0; i < whc->wrinfo.idxdepth; i++)
{
if (idxn->hist[i])
{
@ -622,6 +679,9 @@ static void delete_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop
{
if (!ddsrt_hh_remove (whc->idx_hash, idxn))
assert (0);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&whc->deadline, &idxn->deadline);
#endif
free_one_instance_from_idx (whc, max_drop_seq, idxn);
}
@ -631,9 +691,9 @@ static int whcn_in_tlidx (const struct whc_impl *whc, const struct whc_idxnode *
return 0;
else
{
uint32_t d = (idxn->headidx + (pos > idxn->headidx ? whc->idxdepth : 0)) - pos;
assert (d < whc->idxdepth);
return d < whc->tldepth;
uint32_t d = (idxn->headidx + (pos > idxn->headidx ? whc->wrinfo.idxdepth : 0)) - pos;
assert (d < whc->wrinfo.idxdepth);
return d < whc->wrinfo.tldepth;
}
}
@ -649,7 +709,7 @@ static uint32_t whc_default_downgrade_to_volatile (struct whc *whc_generic, stru
ddsrt_mutex_lock (&whc->lock);
check_whc (whc);
if (whc->idxdepth == 0)
if (whc->wrinfo.idxdepth == 0)
{
/* if not maintaining an index at all, this is nonsense */
get_state_locked (whc, st);
@ -657,19 +717,24 @@ static uint32_t whc_default_downgrade_to_volatile (struct whc *whc_generic, stru
return 0;
}
assert (!whc->is_transient_local);
if (whc->tldepth > 0)
assert (!whc->wrinfo.is_transient_local);
if (whc->wrinfo.tldepth > 0)
{
assert (whc->hdepth == 0 || whc->tldepth <= whc->hdepth);
whc->tldepth = 0;
if (whc->hdepth == 0)
assert (whc->wrinfo.hdepth == 0 || whc->wrinfo.tldepth <= whc->wrinfo.hdepth);
whc->wrinfo.tldepth = 0;
if (whc->wrinfo.hdepth == 0)
{
struct ddsrt_hh_iter it;
struct whc_idxnode *n;
for (n = ddsrt_hh_iter_first (whc->idx_hash, &it); n != NULL; n = ddsrt_hh_iter_next (&it))
free_one_instance_from_idx (whc, 0, n);
struct whc_idxnode *idxn;
for (idxn = ddsrt_hh_iter_first (whc->idx_hash, &it); idxn != NULL; idxn = ddsrt_hh_iter_next (&it))
{
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&whc->deadline, &idxn->deadline);
#endif
free_one_instance_from_idx (whc, 0, idxn);
}
ddsrt_hh_free (whc->idx_hash);
whc->idxdepth = 0;
whc->wrinfo.idxdepth = 0;
whc->idx_hash = NULL;
}
}
@ -711,7 +776,7 @@ static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_i
/* If it is in the tlidx, take it out. Transient-local data never gets here */
if (whcn->idxnode)
delete_one_sample_from_idx (whc, whcn);
delete_one_sample_from_idx (whcn);
if (whcn->unacked)
{
assert (whc->unacked_bytes >= whcn->size);
@ -927,15 +992,27 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
struct whc_node deferred_list_head, *last_to_free = &deferred_list_head;
uint32_t ndropped = 0;
if (whc->is_transient_local && whc->tldepth == 0)
whcn = find_nextseq_intv (&intv, whc, whc->max_drop_seq);
if (whc->wrinfo.is_transient_local && whc->wrinfo.tldepth == 0)
{
/* KEEP_ALL on transient local, so we can never ever delete anything */
TRACE (" KEEP_ALL transient-local: do nothing\n");
/* KEEP_ALL on transient local, so we can never ever delete anything, but
we have to ack the data in whc */
TRACE (" KEEP_ALL transient-local: ack data\n");
while (whcn && whcn->seq <= max_drop_seq)
{
if (whcn->unacked)
{
assert (whc->unacked_bytes >= whcn->size);
whc->unacked_bytes -= whcn->size;
whcn->unacked = 0;
}
whcn = whcn->next_seq;
}
whc->max_drop_seq = max_drop_seq;
*deferred_free_list = NULL;
return 0;
}
whcn = find_nextseq_intv (&intv, whc, whc->max_drop_seq);
deferred_list_head.next_seq = NULL;
prev_seq = whcn ? whcn->prev_seq : NULL;
while (whcn && whcn->seq <= max_drop_seq)
@ -982,10 +1059,10 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
the T-L history but that are not anymore. Writing new samples will eventually push these
out, but if the difference is large and the update rate low, it may take a long time.
Thus, we had better prune them. */
if (whc->tldepth > 0 && whc->idxdepth > whc->tldepth)
if (whc->wrinfo.tldepth > 0 && whc->wrinfo.idxdepth > whc->wrinfo.tldepth)
{
assert (whc->hdepth == whc->idxdepth);
TRACE (" idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->idxdepth, whc->tldepth);
assert (whc->wrinfo.hdepth == whc->wrinfo.idxdepth);
TRACE (" idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->wrinfo.idxdepth, whc->wrinfo.tldepth);
/* Do a second pass over the sequence number range we just processed: this time we only
encounter samples that were retained because of the transient-local durability setting
@ -1010,11 +1087,11 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
idxn->prune_seq = max_drop_seq;
idx = idxn->headidx;
cnt = whc->idxdepth - whc->tldepth;
cnt = whc->wrinfo.idxdepth - whc->wrinfo.tldepth;
while (cnt--)
{
struct whc_node *oldn;
if (++idx == whc->idxdepth)
if (++idx == whc->wrinfo.idxdepth)
idx = 0;
if ((oldn = idxn->hist[idx]) != NULL)
{
@ -1061,12 +1138,16 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn
get_state_locked (whc, &tmp);
TRACE ("whc_default_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->wrinfo.hdepth, whc->wrinfo.tldepth);
}
check_whc (whc);
if (whc->idxdepth == 0)
/* In case a deadline is set, a sample may be added to whc temporarily, which could be
stored in acked state. The _noidx variant of removing messages assumes that unacked
data exists in whc. So in case of a deadline, the _full variant is used instead,
even when index depth is 0 */
if (whc->wrinfo.idxdepth == 0 && !whc->wrinfo.has_deadline && !whc->wrinfo.is_transient_local)
cnt = whc_default_remove_acked_messages_noidx (whc, max_drop_seq, deferred_free_list);
else
cnt = whc_default_remove_acked_messages_full (whc, max_drop_seq, deferred_free_list);
@ -1080,8 +1161,6 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
struct whc_node *newn = NULL;
#ifndef DDSI_INCLUDE_LIFESPAN
/* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info'
that contains both lifespan and deadline info of the writer */
DDSRT_UNUSED_ARG (exp);
#endif
@ -1172,7 +1251,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" exp %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, exp.v, (void *) plist, (void *) serdata, serdata->hash);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->wrinfo.hdepth, whc->wrinfo.tldepth);
}
assert (max_drop_seq < MAX_SEQ_NUMBER);
@ -1189,7 +1268,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
TRACE (" whcn %p:", (void*)newn);
/* Special case of empty data (such as commit messages) can't go into index, and if we're not maintaining an index, we're done, too */
if (serdata->kind == SDK_EMPTY || whc->idxdepth == 0)
if (serdata->kind == SDK_EMPTY)
{
TRACE (" empty or no hist\n");
ddsrt_mutex_unlock (&whc->lock);
@ -1215,8 +1294,13 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
}
else
{
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_renew_instance_locked (&whc->deadline, &idxn->deadline);
#endif
if (whc->wrinfo.idxdepth > 0)
{
struct whc_node *oldn;
if (++idxn->headidx == whc->idxdepth)
if (++idxn->headidx == whc->wrinfo.idxdepth)
idxn->headidx = 0;
if ((oldn = idxn->hist[idxn->headidx]) != NULL)
{
@ -1227,22 +1311,24 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
newn->idxnode = idxn;
newn->idxnode_pos = idxn->headidx;
if (oldn && (whc->hdepth > 0 || oldn->seq <= max_drop_seq))
if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && whc->wrinfo.tldepth > 0)
{
TRACE (" prune whcn %p", (void *)oldn);
assert (oldn != whc->maxseq_node);
assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline);
whc_delete_one (whc, oldn);
if (oldn == whc->maxseq_node)
whc->maxseq_node = whc_findmax_procedurally (whc);
}
/* Special case for dropping everything beyond T-L history when the new sample is being
auto-acknowledged (for lack of reliable readers), and the keep-last T-L history is
shallower than the keep-last regular history (normal path handles this via pruning in
whc_default_remove_acked_messages, but that never happens when there are no readers). */
if (seq <= max_drop_seq && whc->tldepth > 0 && whc->idxdepth > whc->tldepth)
if (seq <= max_drop_seq && whc->wrinfo.tldepth > 0 && whc->wrinfo.idxdepth > whc->wrinfo.tldepth)
{
uint32_t pos = idxn->headidx + whc->idxdepth - whc->tldepth;
if (pos >= whc->idxdepth)
pos -= whc->idxdepth;
uint32_t pos = idxn->headidx + whc->wrinfo.idxdepth - whc->wrinfo.tldepth;
if (pos >= whc->wrinfo.idxdepth)
pos -= whc->wrinfo.idxdepth;
if ((oldn = idxn->hist[pos]) != NULL)
{
TRACE (" prune tl whcn %p", (void *)oldn);
@ -1253,26 +1339,33 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
TRACE ("\n");
}
}
}
else
{
TRACE (" newkey");
/* Ignore unregisters, but insert everything else */
if (!(serdata->statusinfo & NN_STATUSINFO_UNREGISTER))
{
idxn = ddsrt_malloc (sizeof (*idxn) + whc->idxdepth * sizeof (idxn->hist[0]));
idxn = ddsrt_malloc (sizeof (*idxn) + whc->wrinfo.idxdepth * sizeof (idxn->hist[0]));
TRACE (" idxn %p", (void *)idxn);
ddsi_tkmap_instance_ref (tk);
idxn->iid = tk->m_iid;
idxn->tk = tk;
idxn->prune_seq = 0;
idxn->headidx = 0;
if (whc->wrinfo.idxdepth > 0)
{
idxn->hist[0] = newn;
for (uint32_t i = 1; i < whc->idxdepth; i++)
for (uint32_t i = 1; i < whc->wrinfo.idxdepth; i++)
idxn->hist[i] = NULL;
newn->idxnode = idxn;
newn->idxnode_pos = 0;
}
if (!ddsrt_hh_add (whc->idx_hash, idxn))
assert (0);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_register_instance_locked (&whc->deadline, &idxn->deadline, now_mt ());
#endif
}
else
{

View file

@ -50,9 +50,9 @@ static dds_return_t dds_writer_status_validate (uint32_t mask)
then status conditions is not triggered.
*/
static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
void dds_writer_status_cb (void *entity, const struct status_cb_data *data)
{
dds_writer * const wr = ventity;
dds_writer * const wr = entity;
/* When data is NULL, it means that the DDSI reader is deleted. */
if (data == NULL)
@ -223,7 +223,12 @@ static dds_return_t validate_writer_qos (const dds_qos_t *wqos)
#ifndef DDSI_INCLUDE_LIFESPAN
if (wqos != NULL && (wqos->present & QP_LIFESPAN) && wqos->lifespan.duration != DDS_INFINITY)
return DDS_RETCODE_BAD_PARAMETER;
#else
#endif
#ifndef DDSI_INCLUDE_DEADLINE_MISSED
if (wqos != NULL && (wqos->present & QP_DEADLINE) && wqos->deadline.deadline != DDS_INFINITY)
return DDS_RETCODE_BAD_PARAMETER;
#endif
#if defined(DDSI_INCLUDE_LIFESPAN) && defined(DDSI_INCLUDE_DEADLINE_MISSED)
DDSRT_UNUSED_ARG (wqos);
#endif
return DDS_RETCODE_OK;
@ -246,30 +251,6 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo
return DDS_RETCODE_OK;
}
static struct whc *make_whc (struct dds_domain *dom, const dds_qos_t *qos)
{
bool handle_as_transient_local;
uint32_t hdepth, tldepth;
/* Construct WHC -- if aggressive_keep_last1 is set, the WHC will
drop all samples for which a later update is available. This
forces it to maintain a tlidx. */
handle_as_transient_local = (qos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL);
if (qos->history.kind == DDS_HISTORY_KEEP_ALL)
hdepth = 0;
else
hdepth = (unsigned) qos->history.depth;
if (!handle_as_transient_local)
tldepth = 0;
else
{
if (qos->durability_service.history.kind == DDS_HISTORY_KEEP_ALL)
tldepth = 0;
else
tldepth = (unsigned) qos->durability_service.history.depth;
}
return whc_new (&dom->gv, handle_as_transient_local, hdepth, tldepth);
}
const struct dds_entity_deriver dds_entity_deriver_writer = {
.interrupt = dds_writer_interrupt,
.close = dds_writer_close,
@ -288,6 +269,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
dds_participant *pp;
dds_topic *tp;
dds_entity_t publisher;
struct whc_writer_info *wrinfo;
{
dds_entity *p_or_p;
@ -348,7 +330,9 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
wr->m_topic = tp;
dds_entity_add_ref_locked (&tp->m_entity);
wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), pub->m_entity.m_domain->gv.config.xpack_send_async);
wr->m_whc = make_whc (pub->m_entity.m_domain, wqos);
wrinfo = whc_make_wrinfo (wr, wqos);
wr->m_whc = whc_new (&pub->m_entity.m_domain->gv, wrinfo);
whc_free_wrinfo (wrinfo);
wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch;
thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv);

View file

@ -51,6 +51,7 @@ set(ddsc_test_sources
"unsupported.c"
"waitset.c"
"waitset_torture.c"
"whc.c"
"write.c"
"write_various_types.c"
"writer.c")
@ -59,6 +60,10 @@ if(ENABLE_LIFESPAN)
list(APPEND ddsc_test_sources "lifespan.c")
endif()
if(ENABLE_DEADLINE_MISSED)
list(APPEND ddsc_test_sources "deadline.c")
endif()
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
target_include_directories(
cunit_ddsc PRIVATE

View file

@ -22,6 +22,12 @@ module Space {
long long_3;
};
#pragma keylist Type2 long_1
struct Type3 {
long long_1;
long long_2;
long long_3;
};
#pragma keylist Type3
struct simpletypes {
long l;

View file

@ -0,0 +1,484 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_whc.h"
#include "dds__entity.h"
#define MAX_RUNS 4
#define WRITER_DEADLINE DDS_MSECS(50)
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
static dds_entity_t g_domain = 0;
static dds_entity_t g_participant = 0;
static dds_entity_t g_subscriber = 0;
static dds_entity_t g_publisher = 0;
static dds_entity_t g_topic = 0;
static dds_qos_t *g_qos;
static dds_entity_t g_remote_domain = 0;
static dds_entity_t g_remote_participant = 0;
static dds_entity_t g_remote_subscriber = 0;
static dds_entity_t g_remote_topic = 0;
static char * create_topic_name(const char *prefix, char *name, size_t size)
{
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void) snprintf(name, size, "%s_pid%"PRIdPID"_tid%"PRIdTID"", prefix, pid, tid);
return name;
}
static void sync_reader_writer(dds_entity_t participant, dds_entity_t reader, dds_entity_t writer)
{
dds_attach_t triggered;
dds_return_t ret;
dds_entity_t waitset_rd = dds_create_waitset(participant);
CU_ASSERT_FATAL(waitset_rd > 0);
dds_entity_t waitset_wr = dds_create_waitset(g_participant);
CU_ASSERT_FATAL(waitset_wr > 0);
/* Sync reader to writer. */
ret = dds_set_status_mask(reader, DDS_SUBSCRIPTION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(waitset_rd, reader, reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(waitset_rd, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(reader, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(waitset_rd, reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
dds_delete(waitset_rd);
/* Sync writer to reader. */
ret = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(waitset_wr, writer, writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(waitset_wr, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(writer, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(waitset_wr, writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
dds_delete(waitset_wr);
}
static dds_entity_t create_and_sync_reader(dds_entity_t participant, dds_entity_t subscriber, dds_entity_t topic, dds_qos_t *qos, dds_entity_t writer)
{
dds_entity_t reader = dds_create_reader(subscriber, topic, qos, NULL);
CU_ASSERT_FATAL(reader > 0);
sync_reader_writer (participant, reader, writer);
dds_return_t ret = dds_set_status_mask(reader, DDS_REQUESTED_DEADLINE_MISSED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
return reader;
}
static void deadline_init(void)
{
char name[100];
/* Domains for pub and sub use a different domain id, but the portgain setting
* in configuration is 0, so that both domains will map to the same port number.
* This allows to create two domains in a single test process. */
char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub);
g_remote_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub);
dds_free(conf_pub);
dds_free(conf_sub);
g_qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos);
g_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_participant > 0);
g_remote_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL(g_remote_participant > 0);
g_subscriber = dds_create_subscriber(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_subscriber > 0);
g_remote_subscriber = dds_create_subscriber(g_remote_participant, NULL, NULL);
CU_ASSERT_FATAL(g_remote_subscriber > 0);
g_publisher = dds_create_publisher(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_publisher > 0);
create_topic_name("ddsc_qos_deadline_test", name, sizeof name);
g_topic = dds_create_topic(g_participant, &Space_Type1_desc, name, NULL, NULL);
CU_ASSERT_FATAL(g_topic > 0);
g_remote_topic = dds_create_topic(g_remote_participant, &Space_Type1_desc, name, NULL, NULL);
CU_ASSERT_FATAL(g_remote_topic > 0);
dds_qset_history(g_qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
dds_qset_durability(g_qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_reliability(g_qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
}
static void deadline_fini(void)
{
dds_delete_qos(g_qos);
dds_delete(g_subscriber);
dds_delete(g_remote_subscriber);
dds_delete(g_publisher);
dds_delete(g_topic);
dds_delete(g_participant);
dds_delete(g_remote_participant);
dds_delete(g_domain);
dds_delete(g_remote_domain);
}
static void msg(const char *msg, ...)
{
va_list args;
dds_time_t t;
t = dds_time();
printf("%d.%06d ", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
va_start(args, msg);
vprintf(msg, args);
va_end(args);
printf("\n");
}
static void sleepfor(dds_duration_t sleep_dur)
{
dds_sleepfor (sleep_dur);
msg("after sleeping %"PRId64, sleep_dur);
}
static bool check_missed_deadline_reader(dds_entity_t reader, uint32_t exp_missed_total, int32_t exp_missed_change)
{
struct dds_requested_deadline_missed_status dstatus;
dds_return_t ret = dds_get_requested_deadline_missed_status(reader, &dstatus);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
msg("- check reader total actual %u == expected %u / change actual %d == expected %d", dstatus.total_count, exp_missed_total, dstatus.total_count_change, exp_missed_change);
return dstatus.total_count == exp_missed_total && dstatus.total_count_change == exp_missed_change;
}
static bool check_missed_deadline_writer(dds_entity_t writer, uint32_t exp_missed_total, int32_t exp_missed_change)
{
struct dds_offered_deadline_missed_status dstatus;
dds_return_t ret = dds_get_offered_deadline_missed_status(writer, &dstatus);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
msg("- check writer total actual %u == expected %u / change actual %d == expected %d", dstatus.total_count, exp_missed_total, dstatus.total_count_change, exp_missed_change);
return dstatus.total_count == exp_missed_total && dstatus.total_count_change == exp_missed_change;
}
CU_Test(ddsc_deadline, basic, .init=deadline_init, .fini=deadline_fini)
{
Space_Type1 sample = { 0, 0, 0 };
dds_entity_t reader, reader_remote, reader_dl, reader_dl_remote, writer;
dds_return_t ret;
dds_duration_t deadline_dur = WRITER_DEADLINE;
uint32_t run = 1;
bool test_finished = false;
do
{
msg("deadline test: duration %"PRId64, deadline_dur);
dds_qset_deadline(g_qos, deadline_dur);
writer = dds_create_writer(g_publisher, g_topic, g_qos, NULL);
CU_ASSERT_FATAL(writer > 0);
reader_dl = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer);
reader_dl_remote = create_and_sync_reader(g_remote_participant, g_remote_subscriber, g_remote_topic, g_qos, writer);
dds_qset_deadline(g_qos, DDS_INFINITY);
reader = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer);
reader_remote = create_and_sync_reader(g_remote_participant, g_remote_subscriber, g_remote_topic, g_qos, writer);
ret = dds_set_status_mask(writer, DDS_OFFERED_DEADLINE_MISSED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
/* Write first sample */
msg("write sample 1");
ret = dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
/* Sleep 0.5 * deadline_dur: expect no missed deadlines for reader and writer */
sleepfor(deadline_dur / 2);
if (!check_missed_deadline_reader(reader, 0, 0) ||
!check_missed_deadline_reader(reader_remote, 0, 0) ||
!check_missed_deadline_reader(reader_dl, 0, 0) ||
!check_missed_deadline_reader(reader_dl_remote, 0, 0) ||
!check_missed_deadline_writer(writer, 0, 0))
deadline_dur *= 10 / (run + 1);
else
{
/* Write another sample */
msg("write sample 2");
ret = dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
/* Sleep 0.5 * deadline_dur: expect no missed deadlines for reader and writer */
sleepfor(deadline_dur / 2);
if (!check_missed_deadline_reader(reader, 0, 0) ||
!check_missed_deadline_reader(reader_remote, 0, 0) ||
!check_missed_deadline_reader(reader_dl, 0, 0) ||
!check_missed_deadline_reader(reader_dl_remote, 0, 0) ||
!check_missed_deadline_writer(writer, 0, 0))
deadline_dur *= 10 / (run + 1);
else
{
/* Sleep deadline_dur: expect deadline reader to have 1 missed deadline */
sleepfor(deadline_dur);
if (!check_missed_deadline_reader(reader, 0, 0) ||
!check_missed_deadline_reader(reader_remote, 0, 0) ||
!check_missed_deadline_reader(reader_dl, 1, 1) ||
!check_missed_deadline_reader(reader_dl_remote, 1, 1) ||
!check_missed_deadline_writer(writer, 1, 1))
deadline_dur *= 10 / (run + 1);
else
{
/* Sleep another 2 * deadline_duration: expect 2 new triggers for missed deadline for both reader and writer */
sleepfor(2 * deadline_dur);
if (!check_missed_deadline_reader(reader, 0, 0) ||
!check_missed_deadline_reader(reader_remote, 0, 0) ||
!check_missed_deadline_reader(reader_dl, 3, 2) ||
!check_missed_deadline_reader(reader_dl_remote, 3, 2) ||
!check_missed_deadline_writer(writer, 3, 2))
deadline_dur *= 10 / (run + 1);
else
test_finished = true;
}
}
}
dds_delete(reader);
dds_delete(writer);
if (!test_finished)
{
if (++run > MAX_RUNS)
{
msg("run limit reached, test failed");
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
}
else
{
msg("restarting test with deadline duration %"PRId64, deadline_dur);
sleepfor(deadline_dur);
}
}
} while (!test_finished);
}
#define V DDS_DURABILITY_VOLATILE
#define TL DDS_DURABILITY_TRANSIENT_LOCAL
#define R DDS_RELIABILITY_RELIABLE
#define BE DDS_RELIABILITY_BEST_EFFORT
#define KA DDS_HISTORY_KEEP_ALL
#define KL DDS_HISTORY_KEEP_LAST
CU_TheoryDataPoints(ddsc_deadline, writer_types) = {
CU_DataPoints(dds_durability_kind_t, V, V, V, V, TL, TL, TL, TL),
CU_DataPoints(dds_reliability_kind_t, BE, BE, R, R, BE, BE, R, R),
CU_DataPoints(dds_history_kind_t, KA, KL, KA, KL, KA, KL, KA, KL)
};
#undef V
#undef TL
#undef R
#undef BE
#undef KA
#undef KL
CU_Theory((dds_durability_kind_t dur_kind, dds_reliability_kind_t rel_kind, dds_history_kind_t hist_kind), ddsc_deadline, writer_types, .init = deadline_init, .fini = deadline_fini)
{
Space_Type1 sample = { 0, 0, 0 };
dds_entity_t reader, writer;
dds_qos_t *qos;
dds_return_t ret;
void * samples[1];
dds_sample_info_t info;
Space_Type1 rd_sample;
samples[0] = &rd_sample;
struct dds_offered_deadline_missed_status dstatus;
uint32_t run = 1;
dds_duration_t deadline_dur = WRITER_DEADLINE;
bool test_finished = false;
do
{
msg("deadline test: duration %"PRId64", writer type %d %d %s", deadline_dur, dur_kind, rel_kind, hist_kind == DDS_HISTORY_KEEP_ALL ? "all" : "1");
qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(qos);
dds_qset_durability(qos, dur_kind);
dds_qset_reliability(qos, rel_kind, DDS_INFINITY);
dds_qset_history(qos, hist_kind, (hist_kind == DDS_HISTORY_KEEP_ALL) ? 0 : 1);
dds_qset_deadline(qos, deadline_dur);
writer = dds_create_writer(g_publisher, g_topic, qos, NULL);
CU_ASSERT_FATAL(writer > 0);
reader = create_and_sync_reader(g_participant, g_subscriber, g_topic, qos, writer);
/* Set status mask on writer to get offered deadline missed status */
ret = dds_set_status_mask(writer, DDS_OFFERED_DEADLINE_MISSED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
/* Write sample */
ret = dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
/* Take sample */
ret = dds_take (reader, samples, &info, 1, 1);
CU_ASSERT_EQUAL_FATAL (ret, 1);
/* Sleep 2 * deadline_dur: expect missed deadlines for writer */
sleepfor(2 * deadline_dur);
ret = dds_get_offered_deadline_missed_status(writer, &dstatus);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
msg("- check writer total actual %u > 0 / change actual %d > 0", dstatus.total_count, dstatus.total_count_change);
if (dstatus.total_count == 0 || dstatus.total_count_change == 0)
deadline_dur *= 10 / (run + 1);
else
{
uint32_t prev_cnt = dstatus.total_count;
/* Sleep 3 * deadline_dur: expect more missed deadlines for writer */
sleepfor(3 * deadline_dur);
ret = dds_get_offered_deadline_missed_status(writer, &dstatus);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
msg("- check reader total actual %u > expected %u / change actual %d > 0", dstatus.total_count, prev_cnt, dstatus.total_count_change);
if (dstatus.total_count <= prev_cnt || dstatus.total_count_change == 0)
deadline_dur *= 10 / (run + 1);
else
test_finished = true;
}
dds_delete_qos(qos);
dds_delete(reader);
dds_delete(writer);
if (!test_finished)
{
if (++run > MAX_RUNS)
{
msg("run limit reached, test failed");
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
}
else
{
msg("restarting test with deadline duration %"PRId64, deadline_dur);
sleepfor(deadline_dur);
}
}
} while (!test_finished);
}
CU_TheoryDataPoints(ddsc_deadline, instances) = {
CU_DataPoints(int32_t, 1, 10, 10, 100), /* instance count */
CU_DataPoints(uint8_t, 0, 0, 4, 10), /* unregister every n-th instance */
CU_DataPoints(uint8_t, 0, 0, 5, 20), /* dispose every n-th instance */
};
CU_Theory((int32_t n_inst, uint8_t unreg_nth, uint8_t dispose_nth), ddsc_deadline, instances, .init = deadline_init, .fini = deadline_fini, .timeout = 60)
{
Space_Type1 sample = { 0, 0, 0 };
dds_entity_t reader_dl, writer;
dds_return_t ret;
int32_t n, n_unreg, n_dispose, n_alive, run = 1;
bool test_finished = false;
dds_duration_t deadline_dur = WRITER_DEADLINE;
do
{
msg("deadline test: duration %"PRId64", instance count %d, unreg %dth, dispose %dth", deadline_dur, n_inst, unreg_nth, dispose_nth);
dds_qset_deadline(g_qos, deadline_dur);
CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos);
writer = dds_create_writer(g_publisher, g_topic, g_qos, NULL);
CU_ASSERT_FATAL(writer > 0);
reader_dl = create_and_sync_reader(g_participant, g_subscriber, g_topic, g_qos, writer);
/* Write first sample for each instance */
n_unreg = n_dispose = 0;
for (n = 1; n <= n_inst; n++)
{
sample.long_1 = n;
ret = dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
if (unreg_nth && n % unreg_nth == 0)
{
ret = dds_unregister_instance (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
n_unreg++;
}
else if (dispose_nth && n % dispose_nth == 0)
{
ret = dds_dispose (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
n_dispose++;
}
}
n_alive = n_inst - n_dispose - n_unreg;
/* Sleep deadline_dur + 50% and check missed deadline count */
sleepfor(3 * deadline_dur / 2);
if (!check_missed_deadline_reader(reader_dl, (uint32_t)n_alive, n_alive))
deadline_dur *= 10 / (run + 1);
else
{
/* Sleep another deadline_dur: expect new trigger for missed deadline for all non-disposed instances */
sleepfor(deadline_dur);
if (!check_missed_deadline_reader(reader_dl, 2 * (uint32_t)n_alive, n_alive))
deadline_dur *= 10 / (run + 1);
else
{
/* Write second sample for all (including disposed) instances */
for (n = 1; n <= n_inst; n++)
{
sample.long_1 = n;
ret = dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
}
/* Sleep deadline_dur + 25%: expect new trigger for missed deadline for non-disposed instances */
sleepfor(5 * deadline_dur / 4);
if (!check_missed_deadline_reader(reader_dl, 2 * (uint32_t)n_alive + (uint32_t)n_inst, n_inst))
deadline_dur *= 10 / (run + 1);
else
test_finished = true;
}
}
dds_delete(reader_dl);
dds_delete(writer);
if (!test_finished)
{
if (++run > MAX_RUNS)
{
msg("run limit reached, test failed");
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
}
else
{
msg("restarting test with deadline duration %"PRId64, deadline_dur);
sleepfor(deadline_dur);
}
}
} while (!test_finished);
}

279
src/core/ddsc/tests/whc.c Normal file
View file

@ -0,0 +1,279 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_whc.h"
#include "dds__entity.h"
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_whc_test.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
#define SAMPLE_COUNT 5
#define DEADLINE_DURATION DDS_MSECS(1)
static uint32_t g_topic_nr = 0;
static dds_entity_t g_domain = 0;
static dds_entity_t g_participant = 0;
static dds_entity_t g_subscriber = 0;
static dds_entity_t g_publisher = 0;
static dds_qos_t *g_qos;
static dds_entity_t g_remote_domain = 0;
static dds_entity_t g_remote_participant = 0;
static dds_entity_t g_remote_subscriber = 0;
static char *create_topic_name (const char *prefix, uint32_t nr, char *name, size_t size)
{
/* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid ();
ddsrt_tid_t tid = ddsrt_gettid ();
(void) snprintf (name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
return name;
}
static void whc_init(void)
{
/* Domains for pub and sub use a different domain id, but the portgain setting
* in configuration is 0, so that both domains will map to the same port number.
* This allows to create two domains in a single test process. */
char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub);
g_remote_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub);
dds_free(conf_pub);
dds_free(conf_sub);
g_qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos);
g_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_participant > 0);
g_remote_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL(g_remote_participant > 0);
g_subscriber = dds_create_subscriber(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_subscriber > 0);
g_remote_subscriber = dds_create_subscriber(g_remote_participant, NULL, NULL);
CU_ASSERT_FATAL(g_remote_subscriber > 0);
g_publisher = dds_create_publisher(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_publisher > 0);
}
static void whc_fini (void)
{
dds_delete_qos(g_qos);
dds_delete(g_subscriber);
dds_delete(g_remote_subscriber);
dds_delete(g_publisher);
dds_delete(g_participant);
dds_delete(g_remote_participant);
dds_delete(g_domain);
dds_delete(g_remote_domain);
}
static dds_entity_t create_and_sync_reader(dds_entity_t subscriber, dds_entity_t topic, dds_qos_t *qos, dds_entity_t writer)
{
dds_return_t ret;
dds_entity_t reader = dds_create_reader(subscriber, topic, qos, NULL);
CU_ASSERT_FATAL(reader > 0);
while (1)
{
dds_publication_matched_status_t st;
ret = dds_get_publication_matched_status (writer, &st);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
if (st.current_count_change == 1)
break;
dds_sleepfor (DDS_MSECS (1));
}
return reader;
}
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
{
struct dds_entity *wr_entity;
struct writer *wr;
struct whc_state whcst;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
whc_get_state(wr->whc, &whcst);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(wr_entity);
printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0);
CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
}
#define V DDS_DURABILITY_VOLATILE
#define TL DDS_DURABILITY_TRANSIENT_LOCAL
#define R DDS_RELIABILITY_RELIABLE
#define BE DDS_RELIABILITY_BEST_EFFORT
#define KA DDS_HISTORY_KEEP_ALL
#define KL DDS_HISTORY_KEEP_LAST
static void test_whc_end_state(dds_durability_kind_t d, dds_reliability_kind_t r, dds_history_kind_t h, int32_t hd, dds_history_kind_t dh,
int32_t dhd, bool lrd, bool rrd, int32_t ni, bool k, bool dl)
{
char name[100];
Space_Type1 sample = { 0, 0, 0 };
Space_Type3 sample_keyless = { 0, 0, 0 };
dds_entity_t reader, reader_remote, writer;
dds_entity_t topic;
dds_entity_t remote_topic;
dds_return_t ret;
int32_t s, i;
printf ("test_whc_end_state: %s, %s, %s(%d), durability %s(%d), readers: %u local, %u remote, instances: %u, key %u, deadline %"PRId64"\n",
d == V ? "volatile" : "TL",
r == BE ? "best-effort" : "reliable",
h == KA ? "keep-all" : "keep-last", h == KA ? 0 : hd,
dh == KA ? "keep-all" : "keep-last", dh == KA ? 0 : dhd,
lrd, rrd, ni, k,
dl ? DEADLINE_DURATION : INT64_C(-1));
dds_qset_durability (g_qos, d);
dds_qset_reliability (g_qos, r, DDS_INFINITY);
dds_qset_history (g_qos, h, h == KA ? 0 : hd);
dds_qset_deadline (g_qos, dl ? DEADLINE_DURATION : DDS_INFINITY);
dds_qset_durability_service (g_qos, 0, dh, dh == KA ? 0 : dhd, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
create_topic_name ("ddsc_whc_end_state_test", g_topic_nr++, name, sizeof name);
topic = dds_create_topic (g_participant, k ? &Space_Type1_desc : &Space_Type3_desc, name, NULL, NULL);
CU_ASSERT_FATAL(topic > 0);
remote_topic = dds_create_topic (g_remote_participant, k ? &Space_Type1_desc : &Space_Type3_desc, name, NULL, NULL);
CU_ASSERT_FATAL(remote_topic > 0);
writer = dds_create_writer (g_publisher, topic, g_qos, NULL);
CU_ASSERT_FATAL(writer > 0);
ret = dds_set_status_mask(writer, DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK)
reader = lrd ? create_and_sync_reader (g_subscriber, topic, g_qos, writer) : 0;
reader_remote = rrd ? create_and_sync_reader (g_remote_subscriber, remote_topic, g_qos, writer) : 0;
for (s = 0; s < SAMPLE_COUNT; s++)
{
if (k)
for (i = 0; i < ni; i++)
{
sample.long_1 = (int32_t)i;
ret = dds_write (writer, &sample);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
}
else
{
ret = dds_write (writer, &sample_keyless);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
}
}
/* delete readers, wait until no matching reader */
if (rrd)
{
ret = dds_delete (reader_remote);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
}
if (lrd)
{
ret = dds_delete (reader);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
}
while (1)
{
dds_publication_matched_status_t st;
ret = dds_get_publication_matched_status (writer, &st);
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
if (st.current_count == 0)
break;
dds_sleepfor (DDS_MSECS (1));
}
/* check whc state */
int32_t exp_max = (d == TL) ? ni * SAMPLE_COUNT : -1;
int32_t exp_min = (d == TL) ? ((dh == KA) ? 1 : exp_max - dhd * ni + 1) : -1;
check_whc_state (writer, exp_min, exp_max);
dds_delete (writer);
dds_delete (remote_topic);
dds_delete (topic);
}
#define ARRAY_LEN(A) ((int32_t)(sizeof(A) / sizeof(A[0])))
CU_Test(ddsc_whc, check_end_state, .init=whc_init, .fini=whc_fini, .timeout=30)
{
dds_durability_kind_t dur[] = {V, TL};
dds_reliability_kind_t rel[] = {BE, R};
dds_history_kind_t hist[] = {KA, KL};
dds_history_kind_t dhist[] = {KA, KL};
int32_t hist_depth[] = {1, 3};
int32_t dhist_depth[] = {1, 3};
bool loc_rd[] = {false, true};
bool rem_rd[] = {false, true};
int32_t n_inst[] = {1, 3};
bool keyed[] = {false, true};
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
bool deadline[] = {false, true};
#else
bool deadline[] = {false};
#endif
int32_t i_d, i_r, i_h, i_hd, i_dh, i_dhd, i_lrd, i_rrd, i_ni, i_k, i_dl;
for (i_d = 0; i_d < ARRAY_LEN(dur); i_d++)
for (i_r = 0; i_r < ARRAY_LEN(rel); i_r++)
for (i_h = 0; i_h < ARRAY_LEN(hist); i_h++)
for (i_hd = 0; i_hd < ARRAY_LEN(hist_depth); i_hd++)
for (i_dh = 0; i_dh < ARRAY_LEN(dhist); i_dh++)
for (i_dhd = 0; i_dhd < ARRAY_LEN(dhist_depth); i_dhd++)
for (i_lrd = 0; i_lrd < ARRAY_LEN(loc_rd); i_lrd++)
for (i_rrd = 0; i_rrd < ARRAY_LEN(rem_rd); i_rrd++)
for (i_ni = 0; i_ni < ARRAY_LEN(n_inst); i_ni++)
for (i_k = 0; i_k < ARRAY_LEN(keyed); i_k++)
for (i_dl = 0; i_dl < ARRAY_LEN(deadline); i_dl++)
{
if (rel[i_r] == BE && dur[i_d] == TL)
continue;
else if (hist[i_h] == KA && i_hd > 0)
continue;
else if (dhist[i_dh] == KA && i_dhd > 0)
continue;
else
{
test_whc_end_state (dur[i_d], rel[i_r], hist[i_h], hist_depth[i_hd], dhist[i_dh], dhist_depth[i_dhd],
loc_rd[i_lrd], rem_rd[i_rrd], keyed[i_k] ? n_inst[i_ni] : 1, keyed[i_k], deadline[i_dl]);
}
}
}
#undef ARRAY_LEN
#undef V
#undef TL
#undef R
#undef BE
#undef KA
#undef KL

View file

@ -30,6 +30,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_rhc.c
ddsi_pmd.c
ddsi_entity_index.c
ddsi_deadline.c
q_addrset.c
q_bitset_inlines.c
q_bswap.c
@ -63,6 +64,10 @@ if(ENABLE_LIFESPAN)
list(APPEND srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src/ddsi_lifespan.c")
endif()
if(ENABLE_DEADLINE_MISSED)
list(APPEND srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src/ddsi_deadline.c")
endif()
# The includes should reside close to the code. As long as that's not the case,
# pull them in from this CMakeLists.txt.
PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
@ -86,6 +91,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
ddsi_rhc.h
ddsi_guid.h
ddsi_entity_index.h
ddsi_deadline.h
q_addrset.h
q_bitset.h
q_bswap.h
@ -125,6 +131,9 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
if(ENABLE_LIFESPAN)
list(APPEND hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi/ddsi_lifespan.h")
endif()
if(ENABLE_DEADLINE_MISSED)
list(APPEND hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi/ddsi_deadline.h")
endif()
target_sources(ddsc
PRIVATE ${srcs_ddsi} ${hdrs_private_ddsi})

View file

@ -0,0 +1,84 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_DEADLINE_H
#define DDSI_DEADLINE_H
#include "dds/ddsrt/circlist.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_xevent.h"
#if defined (__cplusplus)
extern "C" {
#endif
typedef nn_mtime_t (*deadline_missed_cb_t)(void *hc, nn_mtime_t tnow);
struct deadline_adm {
struct ddsrt_circlist list; /* linked list for deadline missed */
struct xevent *evt; /* xevent that triggers when deadline expires for an instance */
deadline_missed_cb_t deadline_missed_cb; /* callback for deadline missed; this cb can use deadline_next_missed_locked to get next instance that has a missed deadline */
size_t list_offset; /* offset of deadline_adm element in whc or rhc */
size_t elem_offset; /* offset of deadline_elem element in whc or rhc instance */
dds_duration_t dur; /* deadline duration */
};
struct deadline_elem {
struct ddsrt_circlist_elem e;
nn_mtime_t t_deadline;
};
DDS_EXPORT void deadline_init (const struct q_globals *gv, struct deadline_adm *deadline_adm, size_t list_offset, size_t elem_offset, deadline_missed_cb_t deadline_missed_cb);
DDS_EXPORT void deadline_stop (const struct deadline_adm *deadline_adm);
DDS_EXPORT void deadline_clear (struct deadline_adm *deadline_adm);
DDS_EXPORT void deadline_fini (const struct deadline_adm *deadline_adm);
DDS_EXPORT nn_mtime_t deadline_next_missed_locked (struct deadline_adm *deadline_adm, nn_mtime_t tnow, void **instance);
DDS_EXPORT void deadline_register_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tprev, nn_mtime_t tnow);
DDS_EXPORT void deadline_unregister_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem);
DDS_EXPORT void deadline_renew_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem);
inline void deadline_register_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow)
{
if (deadline_adm->dur != T_NEVER)
deadline_register_instance_real (deadline_adm, elem, tnow, tnow);
}
inline void deadline_reregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow)
{
if (deadline_adm->dur != T_NEVER)
deadline_register_instance_real (deadline_adm, elem, elem->t_deadline, tnow);
}
inline void deadline_unregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem)
{
if (deadline_adm->dur != T_NEVER)
{
assert (elem->t_deadline.v != T_NEVER);
deadline_unregister_instance_real (deadline_adm, elem);
}
}
inline void deadline_renew_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem)
{
if (deadline_adm->dur != T_NEVER)
{
assert (elem->t_deadline.v != T_NEVER);
deadline_renew_instance_real (deadline_adm, elem);
}
}
#if defined (__cplusplus)
}
#endif
#endif /* DDSI_DEADLINE_H */

View file

@ -0,0 +1,112 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stddef.h>
#include <stdlib.h>
#include "dds/ddsrt/circlist.h"
#include "dds/ddsi/ddsi_deadline.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_xevent.h"
static void instance_deadline_missed_cb (struct xevent *xev, void *varg, nn_mtime_t tnow)
{
struct deadline_adm * const deadline_adm = varg;
nn_mtime_t next_valid = deadline_adm->deadline_missed_cb((char *)deadline_adm - deadline_adm->list_offset, tnow);
resched_xevent_if_earlier (xev, next_valid);
}
/* Gets the instance from the list in deadline admin that has the earliest missed deadline and
* removes the instance element from the list. If no more instances with missed deadline exist
* in the list, the deadline (nn_mtime_t) for the first instance to 'expire' is returned. If the
* list is empty, NN_MTIME_NEVER is returned */
nn_mtime_t deadline_next_missed_locked (struct deadline_adm *deadline_adm, nn_mtime_t tnow, void **instance)
{
struct deadline_elem *elem = NULL;
if (!ddsrt_circlist_isempty (&deadline_adm->list))
{
struct ddsrt_circlist_elem *list_elem = ddsrt_circlist_oldest (&deadline_adm->list);
elem = DDSRT_FROM_CIRCLIST (struct deadline_elem, e, list_elem);
if (elem->t_deadline.v <= tnow.v)
{
ddsrt_circlist_remove (&deadline_adm->list, &elem->e);
if (instance != NULL)
*instance = (char *)elem - deadline_adm->elem_offset;
return (nn_mtime_t) { 0 };
}
}
if (instance != NULL)
*instance = NULL;
return (elem != NULL) ? elem->t_deadline : NN_MTIME_NEVER;
}
void deadline_init (const struct q_globals *gv, struct deadline_adm *deadline_adm, size_t list_offset, size_t elem_offset, deadline_missed_cb_t deadline_missed_cb)
{
ddsrt_circlist_init (&deadline_adm->list);
deadline_adm->evt = qxev_callback (gv->xevents, NN_MTIME_NEVER, instance_deadline_missed_cb, deadline_adm);
deadline_adm->deadline_missed_cb = deadline_missed_cb;
deadline_adm->list_offset = list_offset;
deadline_adm->elem_offset = elem_offset;
}
void deadline_stop (const struct deadline_adm *deadline_adm)
{
delete_xevent_callback (deadline_adm->evt);
}
void deadline_clear (struct deadline_adm *deadline_adm)
{
while ((deadline_next_missed_locked (deadline_adm, NN_MTIME_NEVER, NULL)).v == 0);
}
void deadline_fini (const struct deadline_adm *deadline_adm)
{
assert (ddsrt_circlist_isempty (&deadline_adm->list));
(void) deadline_adm;
}
extern inline void deadline_register_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow);
extern inline void deadline_reregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tnow);
void deadline_register_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem, nn_mtime_t tprev, nn_mtime_t tnow)
{
ddsrt_circlist_append(&deadline_adm->list, &elem->e);
elem->t_deadline = (tprev.v + deadline_adm->dur >= tnow.v) ? tprev : tnow;
elem->t_deadline.v += deadline_adm->dur;
resched_xevent_if_earlier (deadline_adm->evt, elem->t_deadline);
}
extern inline void deadline_unregister_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem);
void deadline_unregister_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem)
{
/* Updating the scheduled event with the new shortest expiry
* is not required, because the event will be rescheduled when
* this removed element expires. Only remove the element from the
* deadline list */
elem->t_deadline = NN_MTIME_NEVER;
ddsrt_circlist_remove(&deadline_adm->list, &elem->e);
}
extern inline void deadline_renew_instance_locked (struct deadline_adm *deadline_adm, struct deadline_elem *elem);
void deadline_renew_instance_real (struct deadline_adm *deadline_adm, struct deadline_elem *elem)
{
/* move element to end of the list (list->latest) and update deadline
according to current deadline duration in rhc (event with old deadline
will still be triggered, but has no effect on this instance because in
the callback the deadline (which will be the updated value) will be
checked for expiry */
ddsrt_circlist_remove(&deadline_adm->list, &elem->e);
elem->t_deadline = now_mt();
elem->t_deadline.v += deadline_adm->dur;
ddsrt_circlist_append(&deadline_adm->list, &elem->e);
}

View file

@ -486,6 +486,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
{
struct participant *pp;
ddsi_guid_t subguid, group_guid;
struct whc_writer_info *wrinfo;
/* no reserved bits may be set */
assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0);
@ -575,7 +576,9 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->spdp_endpoint_xqos, whc_new(gv, 1, 1, 1), NULL, NULL);
wrinfo = whc_make_wrinfo (NULL, &gv->spdp_endpoint_xqos);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->spdp_endpoint_xqos, whc_new(gv, wrinfo), NULL, NULL);
whc_free_wrinfo (wrinfo);
/* But we need the as_disc address set for SPDP, because we need to
send it to everyone regardless of the existence of readers. */
{
@ -595,14 +598,15 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
entidx_insert_participant_guid (gv->entity_index, pp);
/* SEDP writers: */
wrinfo = whc_make_wrinfo (NULL, &gv->builtin_endpoint_xqos_wr);
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
}
@ -610,7 +614,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
{
/* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER;
}
@ -618,10 +622,12 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, wrinfo), NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER;
}
whc_free_wrinfo (wrinfo);
/* SPDP, SEDP, PMD readers: */
if (!(flags & RTPS_PF_NO_BUILTIN_READERS))
{
@ -3020,12 +3026,10 @@ static void gc_delete_writer (struct gcreq *gcreq)
/* Do last gasp on SEDP and free writer. */
if (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE))
sedp_dispose_unregister_writer (wr);
if (wr->status_cb)
{
(wr->status_cb) (wr->status_cb_entity, NULL);
}
whc_free (wr->whc);
if (wr->status_cb)
(wr->status_cb) (wr->status_cb_entity, NULL);
#ifdef DDSI_INCLUDE_SSM
if (wr->ssm_as)
unref_addrset (wr->ssm_as);

View file

@ -879,7 +879,8 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_
static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
/* returns: < 0 on error, 0 if no need to insert in whc, > 0 if inserted */
int do_insert, insres, res;
int insres, res = 0;
bool wr_deadline = false;
ASSERT_MUTEX_HELD (&wr->e.lock);
@ -900,17 +901,15 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
}
assert (wr->reliable || have_reliable_subs (wr) == 0);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
/* If deadline missed duration is not infinite, the sample is inserted in
the whc so that the instance is created (or renewed) in the whc and the deadline
missed event is registered. The sample is removed immediately after inserting it
as we don't want to store it. */
wr_deadline = wr->xqos->deadline.deadline != DDS_INFINITY;
#endif
if (wr->reliable && have_reliable_subs (wr))
do_insert = 1;
else if (wr->handle_as_transient_local)
do_insert = 1;
else
do_insert = 0;
if (!do_insert)
res = 0;
else
if ((wr->reliable && have_reliable_subs (wr)) || wr_deadline || wr->handle_as_transient_local)
{
nn_mtime_t exp = NN_MTIME_NEVER;
#ifdef DDSI_INCLUDE_LIFESPAN
@ -921,6 +920,20 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
exp = add_duration_to_mtime(serdata->twrite, wr->xqos->lifespan.duration);
#endif
res = ((insres = whc_insert (wr->whc, writer_max_drop_seq (wr), seq, exp, plist, serdata, tk)) < 0) ? insres : 1;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!(wr->reliable && have_reliable_subs (wr)) && !wr->handle_as_transient_local)
{
/* Sample was inserted only because writer has deadline, so we'll remove the sample from whc */
struct whc_node *deferred_free_list = NULL;
struct whc_state whcst;
uint32_t n = whc_remove_acked_messages (wr->whc, seq, &whcst, &deferred_free_list);
(void)n;
assert (n <= 1);
assert (whcst.min_seq == -1 && whcst.max_seq == -1);
whc_free_deferred_free_list (wr->whc, deferred_free_list);
}
#endif
}
#ifndef NDEBUG

View file

@ -342,14 +342,20 @@ void delete_xevent_callback (struct xevent *ev)
struct xeventq *evq = ev->evq;
assert (ev->kind == XEVK_CALLBACK);
ddsrt_mutex_lock (&evq->lock);
/* wait until neither scheduled nor executing; loop in case the callback reschedules the event */
while (ev->tsched.v != T_NEVER || ev->u.callback.executing)
{
if (ev->tsched.v != T_NEVER)
{
assert (ev->tsched.v != TSCHED_DELETE);
ddsrt_fibheap_delete (&evq_xevents_fhdef, &evq->xevents, ev);
ev->tsched.v = TSCHED_DELETE;
ev->tsched.v = T_NEVER;
}
while (ev->u.callback.executing)
if (ev->u.callback.executing)
{
ddsrt_cond_wait (&evq->cond, &evq->lock);
}
}
ddsrt_mutex_unlock (&evq->lock);
free_xevent (evq, ev);
}

View file

@ -107,7 +107,7 @@ static struct ddsi_serdata *mkkeysample (int32_t keyval, unsigned statusinfo)
return sd;
}
#ifdef DDSI_INCLUDE_LIFESPAN
#if defined(DDSI_INCLUDE_LIFESPAN) || defined (DDSI_INCLUDE_DEADLINE_MISSED)
static nn_mtime_t rand_texp ()
{
nn_mtime_t ret = now_mt();
@ -116,6 +116,13 @@ static nn_mtime_t rand_texp ()
}
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
static dds_duration_t rand_deadline ()
{
return (dds_duration_t) (ddsrt_prng_random (&prng) % DDS_MSECS(500));
}
#endif
static uint64_t store (struct ddsi_tkmap *tkmap, struct dds_rhc *rhc, struct proxy_writer *wr, struct ddsi_serdata *sd, bool print, bool lifespan_expiry)
{
#ifndef DDSI_INCLUDE_LIFESPAN
@ -569,6 +576,9 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
dds_qos_t *qos = dds_create_qos ();
dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, MAX_HIST_DEPTH);
dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
dds_qset_deadline (qos, rand_deadline());
#endif
/* two identical readers because we need 63 conditions while we can currently only attach 32 a single reader */
dds_entity_t rd[] = { dds_create_reader (pp, tp, qos, NULL), dds_create_reader (pp, tp, qos, NULL) };
const size_t nrd = sizeof (rd) / sizeof (rd[0]);
@ -673,7 +683,8 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
[9] = "tkc",
[10] = "tkc1",
[11] = "delwr",
[12] = "drpxp"
[12] = "drpxp",
[13] = "dlmis"
};
static const uint32_t opfreqs[] = {
[0] = 500, /* write */
@ -689,9 +700,14 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
[10] = 100, /* take cond, max 1 */
[11] = 1, /* unreg writer */
#ifdef DDSI_INCLUDE_LIFESPAN
[12] = 100 /* drop expired sample */
[12] = 100, /* drop expired sample */
#else
[12] = 0 /* drop expired sample */
[12] = 0, /* drop expired sample */
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
[13] = 100 /* deadline missed */
#else
[13] = 0 /* drop expired sample */
#endif
};
uint32_t opthres[sizeof (opfreqs) / sizeof (opfreqs[0])];
@ -829,6 +845,16 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
for (size_t k = 0; k < nrd; k++)
(void) dds_rhc_default_sample_expired_cb (rhc[k], rand_texp());
thread_state_asleep (lookup_thread_state ());
#endif
break;
}
case 13: {
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
thread_state_awake_domain_ok (lookup_thread_state ());
/* We can assume that rhc[k] is a dds_rhc_default at this point */
for (size_t k = 0; k < nrd; k++)
(void) dds_rhc_default_deadline_missed_cb (rhc[k], rand_texp());
thread_state_asleep (lookup_thread_state ());
#endif
break;
}

View file

@ -98,7 +98,11 @@ static void setqos (dds_qos_t *q, size_t i, bool isrd, bool create)
#else
dds_qset_lifespan (q, DDS_INFINITY);
#endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
dds_qset_deadline (q, INT64_C (67890123456789012) + (int32_t) i);
#else
dds_qset_deadline (q, DDS_INFINITY);
#endif
dds_qset_latency_budget (q, INT64_C (45678901234567890) + (int32_t) i);
dds_qset_ownership (q, (dds_ownership_kind_t) ((i + 1) % 2));
dds_qset_ownership_strength (q, 0x12345670 + (int32_t) i);