Merge branch 'master' into security

This commit is contained in:
Marcel Jordense 2019-12-18 18:24:30 +01:00
commit 35ce7788e1
54 changed files with 2115 additions and 1069 deletions

View file

@ -20,9 +20,12 @@ struct dds_rhc;
struct dds_reader;
struct ddsi_sertopic;
struct q_globals;
struct dds_rhc_default;
struct rhc_sample;
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks);
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
DDS_EXPORT nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow);
#if defined (__cplusplus)
}

View file

@ -19,7 +19,7 @@
extern "C" {
#endif
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash);
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct entity_index *entidx);
#if defined (__cplusplus)
}

View file

@ -257,10 +257,12 @@ void dds__builtin_init (struct dds_domain *dom)
dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv);
dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv);
const struct ephash *gh = dom->gv.guid_hash;
thread_state_awake (lookup_thread_state (), &dom->gv);
const struct entity_index *gh = dom->gv.entity_index;
dom->builtintopic_writer_participant = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dom->builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT, gh));
dom->builtintopic_writer_publications = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dom->builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER, gh));
dom->builtintopic_writer_subscriptions = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dom->builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER, gh));
thread_state_asleep (lookup_thread_state ());
dds_delete_qos (qos);
}

View file

@ -16,7 +16,7 @@
#include "dds__guardcond.h"
#include "dds__participant.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"

View file

@ -17,6 +17,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_bswap.h"
#include "dds__writer.h"
@ -33,7 +34,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
return rc;
else
{
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = wr->m_entity.m_domain->gv.entity_index;
size_t nrds_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
@ -44,7 +45,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
m = ddsrt_avl_iter_next (&it))
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
{
if (nrds_act < nrds)
rds[nrds_act] = prd->e.iid;
@ -56,7 +57,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
if (nrds_act < nrds)
rds[nrds_act] = rd->e.iid;
@ -83,7 +84,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
return rc;
else
{
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = rd->m_entity.m_domain->gv.entity_index;
size_t nwrs_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
@ -94,7 +95,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
m = ddsrt_avl_iter_next (&it))
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
{
if (nwrs_act < nwrs)
wrs[nwrs_act] = pwr->e.iid;
@ -106,7 +107,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
m = ddsrt_avl_iter_next (&it))
{
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
{
if (nwrs_act < nwrs)
wrs[nwrs_act] = wr->e.iid;
@ -148,7 +149,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
return NULL;
else
{
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = wr->m_entity.m_domain->gv.entity_index;
dds_builtintopic_endpoint_t *ret = NULL;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
@ -159,7 +160,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
m = ddsrt_avl_iter_next (&it))
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
{
if (prd->e.iid == ih)
ret = make_builtintopic_endpoint (&prd->e.guid, &prd->c.proxypp->e.guid, prd->c.proxypp->e.iid, prd->c.xqos);
@ -170,7 +171,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
if (rd->e.iid == ih)
ret = make_builtintopic_endpoint (&rd->e.guid, &rd->c.pp->e.guid, rd->c.pp->e.iid, rd->xqos);
@ -191,7 +192,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
return NULL;
else
{
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = rd->m_entity.m_domain->gv.entity_index;
dds_builtintopic_endpoint_t *ret = NULL;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
@ -202,7 +203,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
m = ddsrt_avl_iter_next (&it))
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
{
if (pwr->e.iid == ih)
ret = make_builtintopic_endpoint (&pwr->e.guid, &pwr->c.proxypp->e.guid, pwr->c.proxypp->e.iid, pwr->c.xqos);
@ -213,7 +214,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
m = ddsrt_avl_iter_next (&it))
{
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
{
if (wr->e.iid == ih)
ret = make_builtintopic_endpoint (&wr->e.guid, &wr->c.pp->e.guid, wr->c.pp->e.iid, wr->xqos);

View file

@ -18,6 +18,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/version.h"
#include "dds__init.h"
#include "dds__domain.h"
@ -55,7 +56,7 @@ static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos
{
struct participant *pp;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((pp = ephash_lookup_participant_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((pp = entidx_lookup_participant_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
{
nn_plist_t plist;
nn_plist_init_empty (&plist);

View file

@ -16,7 +16,7 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsc/dds_rhc.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_sertopic.h"

View file

@ -15,7 +15,7 @@
#include "dds/ddsc/dds_rhc.h"
#include "dds__entity.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"

View file

@ -28,6 +28,7 @@
#include "dds/ddsi/q_globals.h"
#include "dds__builtin.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/ddsi_entity_index.h"
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader)
@ -77,7 +78,7 @@ static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, boo
{
struct reader *rd;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((rd = ephash_lookup_reader_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
update_reader_qos (rd, qos);
thread_state_asleep (lookup_thread_state ());
}
@ -512,7 +513,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
pwrguid_next.entityid.u = (pwrguid_next.entityid.u & ~(uint32_t)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY;
}
ddsrt_mutex_unlock (&rd->e.lock);
if ((pwr = ephash_lookup_proxy_writer_guid (dds_entity->m_domain->gv.guid_hash, &pwrguid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (dds_entity->m_domain->gv.entity_index, &pwrguid)) != NULL)
{
ddsrt_mutex_lock (&pwr->e.lock);
pwr->ddsi2direct_cb = cb;

View file

@ -39,6 +39,9 @@
#include "dds/ddsi/q_entity.h" /* proxy_writer_info */
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#include "dds/ddsi/sysdeps.h"
/* INSTANCE MANAGEMENT
@ -122,9 +125,8 @@
DDSI implementation, but still) will be done by also reseting "wr_iid"
when an exclusive ownership writer lowers its strength.
Lifespan, time base filter and deadline, are based on the instance
timestamp ("tstamp"). This time stamp needs to be changed to either source
or reception timestamp, depending on the ordering chosen.
Lifespan is based on the reception timestamp, and the monotonic time is
used for sample expiry if this QoS is set to something else than infinite.
READ CONDITIONS
===============
@ -244,6 +246,10 @@ struct rhc_sample {
bool isread; /* READ or NOT_READ sample state */
uint32_t disposed_gen; /* snapshot of instance counter at time of insertion */
uint32_t no_writers_gen; /* __/ */
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_fhnode lifespan; /* fibheap node for lifespan */
struct rhc_instance *inst; /* reference to rhc instance */
#endif
};
struct rhc_instance {
@ -289,15 +295,15 @@ struct dds_rhc_default {
int32_t max_samples; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
int32_t max_samples_per_instance; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
uint32_t n_instances; /* # instances, including empty [NOT USED] */
uint32_t n_instances; /* # instances, including empty */
uint32_t n_nonempty_instances; /* # non-empty instances */
uint32_t n_not_alive_disposed; /* # disposed, non-empty instances [NOT USED] */
uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances [NOT USED] */
uint32_t n_new; /* # new, non-empty instances [NOT USED] */
uint32_t n_not_alive_disposed; /* # disposed, non-empty instances */
uint32_t n_not_alive_no_writers; /* # not-alive-no-writers, non-empty instances */
uint32_t n_new; /* # new, non-empty instances */
uint32_t n_vsamples; /* # "valid" samples over all instances */
uint32_t n_vread; /* # read "valid" samples over all instances [NOT USED] */
uint32_t n_invsamples; /* # invalid samples over all instances [NOT USED] */
uint32_t n_invread; /* # read invalid samples over all instances [NOT USED] */
uint32_t n_vread; /* # read "valid" samples over all instances */
uint32_t n_invsamples; /* # invalid samples over all instances */
uint32_t n_invread; /* # read invalid samples over all instances */
bool by_source_ordering; /* true if BY_SOURCE, false if BY_RECEPTION */
bool exclusive_ownership; /* true if EXCLUSIVE, false if SHARED */
@ -316,6 +322,9 @@ struct dds_rhc_default {
uint32_t nqconds; /* Number of associated query conditions */
dds_querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */
void *qcond_eval_samplebuf; /* Temporary storage for evaluating query conditions, NULL if no qconds */
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
};
struct trigger_info_cmn {
@ -460,6 +469,11 @@ static void topicless_to_clean_invsample (const struct ddsi_sertopic *topic, con
}
static unsigned qmask_of_inst (const struct rhc_instance *inst);
static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s);
static void get_trigger_info_cmn (struct trigger_info_cmn *info, struct rhc_instance *inst);
static void get_trigger_info_pre (struct trigger_info_pre *info, struct rhc_instance *inst);
static void init_trigger_info_qcond (struct trigger_info_qcond *qc);
static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, struct rhc_instance *inst);
static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst, struct dds_entity *triggers[], size_t *ntriggers);
#ifndef NDEBUG
static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_conds, bool check_qcmask);
@ -531,6 +545,83 @@ static void remove_inst_from_nonempty_list (struct dds_rhc_default *rhc, struct
rhc->n_nonempty_instances--;
}
#ifdef DDSI_INCLUDE_LIFESPAN
static void drop_expired_samples (struct dds_rhc_default *rhc, struct rhc_sample *sample)
{
struct rhc_instance *inst = sample->inst;
struct trigger_info_pre pre;
struct trigger_info_post post;
struct trigger_info_qcond trig_qc;
size_t ntriggers = SIZE_MAX;
assert (!inst_is_empty (inst));
TRACE ("rhc_default %p drop_exp(iid %"PRIx64" wriid %"PRIx64" exp %"PRId64" %s",
rhc, inst->iid, sample->wr_iid, sample->lifespan.t_expire.v, sample->isread ? "read" : "notread");
get_trigger_info_pre (&pre, inst);
init_trigger_info_qcond (&trig_qc);
/* Find prev sample: in case of history depth of 1 this is the sample itself,
* (which is inst->latest). In case of larger history depth the most likely sample
* to be expired is the oldest, in which case inst->latest is the previous
* sample and inst->latest->next points to sample (circular list). We can
* assume that 'sample' is in the list, so a check to avoid infinite loop is not
* required here. */
struct rhc_sample *psample = inst->latest;
while (psample->next != sample)
psample = psample->next;
rhc->n_vsamples--;
if (sample->isread)
{
inst->nvread--;
rhc->n_vread--;
trig_qc.dec_sample_read = true;
}
if (--inst->nvsamples > 0)
{
if (inst->latest == sample)
inst->latest = psample;
psample->next = sample->next;
}
else
{
inst->latest = NULL;
}
trig_qc.dec_conds_sample = sample->conds;
free_sample (rhc, inst, sample);
get_trigger_info_cmn (&post.c, inst);
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
if (inst_is_empty (inst))
{
remove_inst_from_nonempty_list (rhc, inst);
if (inst->isdisposed)
rhc->n_not_alive_disposed--;
if (inst->wrcount == 0)
{
TRACE ("; iid %"PRIx64" #0,empty,drop", inst->iid);
if (!inst->isdisposed)
rhc->n_not_alive_no_writers--;
drop_instance_noupdate_no_writers (rhc, inst);
}
}
TRACE (")\n");
}
nn_mtime_t dds_rhc_default_sample_expired_cb(void *hc, nn_mtime_t tnow)
{
struct dds_rhc_default *rhc = hc;
struct rhc_sample *sample;
nn_mtime_t tnext;
ddsrt_mutex_lock (&rhc->lock);
while ((tnext = lifespan_next_expired_locked (&rhc->lifespan, tnow, (void **)&sample)).v == 0)
drop_expired_samples (rhc, sample);
ddsrt_mutex_unlock (&rhc->lock);
return tnext;
}
#endif /* DDSI_INCLUDE_LIFESPAN */
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks)
{
struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc));
@ -546,6 +637,10 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_global
rhc->gv = gv;
rhc->xchecks = xchecks;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_init (gv, &rhc->lifespan, offsetof(struct dds_rhc_default, lifespan), offsetof(struct rhc_sample, lifespan), dds_rhc_default_sample_expired_cb);
#endif
return &rhc->common;
}
@ -601,9 +696,15 @@ static struct rhc_sample *alloc_sample (struct rhc_instance *inst)
}
}
static void free_sample (struct rhc_instance *inst, struct rhc_sample *s)
static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s)
{
#ifndef DDSI_INCLUDE_LIFESPAN
DDSRT_UNUSED_ARG (rhc);
#endif
ddsi_serdata_unref (s->sample);
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
if (s == &inst->a_sample)
{
assert (!inst->a_sample_free);
@ -665,11 +766,12 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de
struct rhc_sample *s = inst->latest;
const bool was_empty = inst_is_empty (inst);
struct trigger_info_qcond dummy_trig_qc;
if (s)
{
do {
struct rhc_sample * const s1 = s->next;
free_sample (inst, s);
free_sample (rhc, inst, s);
s = s1;
} while (s != inst->latest);
rhc->n_vsamples -= inst->nvsamples;
@ -682,11 +784,10 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de
#endif
inst_clear_invsample_if_exists (rhc, inst, &dummy_trig_qc);
if (!was_empty)
{
remove_inst_from_nonempty_list (rhc, inst);
}
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
ddsrt_free (inst);
if (inst->isnew)
rhc->n_new--;
free_empty_instance(inst, rhc);
}
static uint32_t dds_rhc_default_lock_samples (struct dds_rhc_default *rhc)
@ -708,7 +809,10 @@ static void free_instance_rhc_free_wrap (void *vnode, void *varg)
static void dds_rhc_default_free (struct dds_rhc_default *rhc)
{
assert (rhc_check_counts_locked (rhc, true, true));
#ifdef DDSI_INCLUDE_LIFESPAN
dds_rhc_default_sample_expired_cb (rhc, NN_MTIME_NEVER);
lifespan_fini (&rhc->lifespan);
#endif
ddsrt_hh_enum (rhc->instances, free_instance_rhc_free_wrap, rhc);
assert (rhc->nonempty_instances == NULL);
ddsrt_hh_free (rhc->instances);
@ -789,6 +893,10 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
assert (trig_qc->dec_conds_sample == 0);
ddsi_serdata_unref (s->sample);
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
trig_qc->dec_sample_read = s->isread;
trig_qc->dec_conds_sample = s->conds;
if (s->isread)
@ -843,6 +951,11 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
s->isread = false;
s->disposed_gen = inst->disposed_gen;
s->no_writers_gen = inst->no_writers_gen;
#ifdef DDSI_INCLUDE_LIFESPAN
s->inst = inst;
s->lifespan.t_expire = wrinfo->lifespan_exp;
lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan);
#endif
s->conds = 0;
if (rhc->nqconds != 0)
@ -939,6 +1052,8 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, stru
assert (inst_is_empty (inst));
rhc->n_instances--;
if (inst->isnew)
rhc->n_new--;
ret = ddsrt_hh_remove (rhc->instances, inst);
assert (ret);
@ -1065,7 +1180,6 @@ static void account_for_empty_to_nonempty_transition (struct dds_rhc_default *rh
{
assert (inst_nsamples (inst) == 1);
add_inst_to_nonempty_list (rhc, inst);
rhc->n_new += inst->isnew;
if (inst->isdisposed)
rhc->n_not_alive_disposed++;
else if (inst->wrcount == 0)
@ -1294,6 +1408,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
assert (ret);
(void) ret;
rhc->n_instances++;
rhc->n_new++;
get_trigger_info_cmn (&post->c, inst);
*out_inst = inst;
@ -1486,16 +1601,10 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
if (inst->latest || inst_became_disposed)
{
if (was_empty)
{
/* general function is slightly slower than a specialised
one, but perhaps it is wiser to use the general one */
account_for_empty_to_nonempty_transition (rhc, inst);
}
else
{
rhc->n_not_alive_disposed += (uint32_t)(inst->isdisposed - old_isdisposed);
rhc->n_new += (uint32_t)(inst->isnew - old_isnew);
}
rhc->n_new += (uint32_t)(inst->isnew - old_isnew);
}
else
{
@ -2032,7 +2141,7 @@ static int dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, void **
inst->latest = NULL;
}
free_sample (inst, sample);
free_sample (rhc, inst, sample);
if (++n == max_samples)
{
@ -2179,7 +2288,7 @@ static int dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, stru
else
inst->latest = NULL;
free_sample (inst, sample);
free_sample (rhc, inst, sample);
if (++n == max_samples)
{
@ -2442,7 +2551,11 @@ static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_f
trig_qc->dec_conds_invsample, trig_qc->dec_conds_sample, trig_qc->inc_conds_invsample, trig_qc->inc_conds_sample);
assert (rhc->n_nonempty_instances >= rhc->n_not_alive_disposed + rhc->n_not_alive_no_writers);
#ifndef DDSI_INCLUDE_LIFESPAN
/* If lifespan is disabled, samples cannot expire and therefore
empty instances cannot be in the 'new' state. */
assert (rhc->n_nonempty_instances >= rhc->n_new);
#endif
assert (rhc->n_vsamples >= rhc->n_vread);
iter = rhc->conds;
@ -2687,6 +2800,8 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
bool a_sample_free = true;
n_instances++;
if (inst->isnew)
n_new++;
if (inst_is_empty (inst))
continue;
@ -2695,8 +2810,6 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
n_not_alive_disposed++;
else if (inst->wrcount == 0)
n_not_alive_no_writers++;
if (inst->isnew)
n_new++;
if (inst->latest)
{
@ -2790,9 +2903,7 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
if (check_conds)
{
for (i = 0, rciter = rhc->conds; i < ncheck; i++, rciter = rciter->m_next)
{
assert (cond_match_count[i] == ddsrt_atomic_ld32 (&rciter->m_entity.m_status.m_trigger));
}
}
if (rhc->n_nonempty_instances == 0)

View file

@ -24,6 +24,7 @@
#include "dds__serdata_builtintopic.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_entity_index.h"
static const uint64_t unihashconsts[] = {
UINT64_C (16292676669999574021),
@ -131,7 +132,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
/* keyhash must in host format (which the GUIDs always are internally) */
struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const ddsi_guid_t *) keyhash->value);
struct entity_common *entity = entidx_lookup_guid_untyped (tp->gv->entity_index, (const ddsi_guid_t *) keyhash->value);
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
memcpy (&d->key, keyhash->value, sizeof (d->key));
if (entity)

View file

@ -1288,9 +1288,12 @@ static void dds_stream_extract_key_from_key_prim_op (dds_istream_t * __restrict
#if DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN
static void dds_stream_swap_copy (void * __restrict vdst, const void * __restrict vsrc, uint32_t size, uint32_t num)
{
assert (size == 2 || size == 4 || size == 8);
assert (size == 1 || size == 2 || size == 4 || size == 8);
switch (size)
{
case 1:
memcpy (vdst, vsrc, num);
break;
case 2: {
const uint16_t *src = vsrc;
uint16_t *dst = vdst;
@ -1342,7 +1345,7 @@ static void dds_stream_extract_keyBE_from_key_prim_op (dds_istream_t * __restric
void const * const src = is->m_buffer + is->m_index;
void * const dst = os->x.m_buffer + os->x.m_index;
#if DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN
dds_stream_swap_copy (dst, src, num, align);
dds_stream_swap_copy (dst, src, align, num);
#else
memcpy (dst, src, num * align);
#endif
@ -1722,7 +1725,7 @@ static bool prtf_simple_array (char * __restrict *buf, size_t * __restrict bufsi
abort ();
break;
}
return cont;
return prtf (buf, bufsize, "}");
}
static bool dds_stream_print_sample1 (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, bool add_braces);

View file

@ -25,6 +25,7 @@
#include "dds__get_status.h"
#include "dds__qos.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/q_ddsi_discovery.h"
@ -437,7 +438,7 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
/* Publish Topic */
thread_state_awake (lookup_thread_state (), &par->m_entity.m_domain->gv);
ddsi_pp = ephash_lookup_participant_guid (par->m_entity.m_domain->gv.guid_hash, &par->m_entity.m_guid);
ddsi_pp = entidx_lookup_participant_guid (par->m_entity.m_domain->gv.entity_index, &par->m_entity.m_guid);
assert (ddsi_pp);
if (sedp_plist)
{

View file

@ -12,25 +12,28 @@
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/ddsi_serdata.h"
#ifdef DDSI_INCLUDE_LIFESPAN
#include "dds/ddsi/ddsi_lifespan.h"
#endif
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_config.h"
#include "dds__whc.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_freelist.h"
#include "dds/ddsi/q_globals.h"
#include "dds__whc.h"
#define USE_EHH 0
struct whc_node {
struct whc_node *next_seq; /* next in this interval */
struct whc_node *prev_seq; /* prev in this interval */
@ -44,6 +47,9 @@ struct whc_node {
unsigned borrowed: 1; /* at most one can borrow it at any time */
nn_mtime_t last_rexmit_ts;
uint32_t rexmit_count;
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_fhnode lifespan; /* fibheap node for lifespan */
#endif
struct ddsi_serdata *serdata;
};
@ -95,6 +101,9 @@ struct whc_impl {
#endif
struct ddsrt_hh *idx_hash;
ddsrt_avl_tree_t seq;
#ifdef DDSI_INCLUDE_LIFESPAN
struct lifespan_adm lifespan; /* Lifespan administration */
#endif
};
struct whc_sample_iter_impl {
@ -128,7 +137,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
static uint32_t whc_default_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
static void whc_default_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list);
static void whc_default_get_state (const struct whc *whc, struct whc_state *st);
static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
static int whc_default_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
static seqno_t whc_default_next_seq (const struct whc *whc, seqno_t seq);
static bool whc_default_borrow_sample (const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
static bool whc_default_borrow_sample_key (const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample);
@ -349,6 +358,21 @@ static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct dd
}
}
#ifdef DDSI_INCLUDE_LIFESPAN
static nn_mtime_t whc_sample_expired_cb(void *hc, nn_mtime_t tnow)
{
struct whc_impl *whc = hc;
void *sample;
nn_mtime_t tnext;
ddsrt_mutex_lock (&whc->lock);
while ((tnext = lifespan_next_expired_locked (&whc->lifespan, tnow, &sample)).v == 0)
whc_delete_one (whc, sample);
whc->maxseq_node = whc_findmax_procedurally (whc);
ddsrt_mutex_unlock (&whc->lock);
return tnext;
}
#endif
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth)
{
size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */
@ -384,6 +408,10 @@ struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdep
else
whc->idx_hash = NULL;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_init (gv, &whc->lifespan, offsetof(struct whc_impl, lifespan), offsetof(struct whc_node, lifespan), whc_sample_expired_cb);
#endif
/* seq interval tree: always has an "open" node */
ddsrt_avl_init (&whc_seq_treedef, &whc->seq);
intv = ddsrt_malloc (sizeof (*intv));
@ -417,6 +445,11 @@ void whc_default_free (struct whc *whc_generic)
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
check_whc (whc);
#ifdef DDSI_INCLUDE_LIFESPAN
whc_sample_expired_cb (whc, NN_MTIME_NEVER);
lifespan_fini (&whc->lifespan);
#endif
if (whc->idx_hash)
{
struct ddsrt_hh_iter it;
@ -686,6 +719,10 @@ static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_i
whcn->unacked = 0;
}
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan);
#endif
/* Take it out of seqhash; deleting it from the list ordered on
sequence numbers is left to the caller (it has to be done unconditionally,
but remove_acked_messages defers it until the end or a skipped node). */
@ -869,6 +906,9 @@ static uint32_t whc_default_remove_acked_messages_noidx (struct whc_impl *whc, s
whc->unacked_bytes -= (size_t) (whcn->total_bytes - (*deferred_free_list)->total_bytes + (*deferred_free_list)->size);
for (whcn = *deferred_free_list; whcn; whcn = whcn->next_seq)
{
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_unregister_sample_locked (&whc->lifespan, &whcn->lifespan);
#endif
remove_whcn_from_hash (whc, whcn);
assert (whcn->unacked);
}
@ -1035,10 +1075,16 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn
return cnt;
}
static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata)
static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata)
{
struct whc_node *newn = NULL;
#ifndef DDSI_INCLUDE_LIFESPAN
/* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info'
that contains both lifespan and deadline info of the writer */
DDSRT_UNUSED_ARG (exp);
#endif
if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL)
newn = ddsrt_malloc (sizeof (*newn));
newn->seq = seq;
@ -1062,6 +1108,10 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
if (newn->unacked)
whc->unacked_bytes += newn->size;
#ifdef DDSI_INCLUDE_LIFESPAN
newn->lifespan.t_expire = exp;
#endif
insert_whcn_in_hash (whc, newn);
if (whc->open_intv->first == NULL)
@ -1093,10 +1143,13 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
}
whc->seq_size++;
#ifdef DDSI_INCLUDE_LIFESPAN
lifespan_register_sample_locked (&whc->lifespan, &newn->lifespan);
#endif
return newn;
}
static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
struct whc_node *newn = NULL;
@ -1106,6 +1159,9 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
char pad[sizeof (struct whc_idxnode) + sizeof (struct whc_node *)];
} template;
/* FIXME: the 'exp' arg is used for lifespan, refactor this parameter to a struct 'writer info'
that contains both lifespan als deadline info of the writer */
ddsrt_mutex_lock (&whc->lock);
check_whc (whc);
@ -1113,8 +1169,8 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
{
struct whc_state whcst;
get_state_locked (whc, &whcst);
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, (void *) plist, (void *) serdata, serdata->hash);
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" exp %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, exp.v, (void *) plist, (void *) serdata, serdata->hash);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
}
@ -1128,7 +1184,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
assert (whc->seq_size == 0 || seq > whc->maxseq_node->seq);
/* Always insert in seq admin */
newn = whc_default_insert_seq (whc, max_drop_seq, seq, plist, serdata);
newn = whc_default_insert_seq (whc, max_drop_seq, seq, exp, plist, serdata);
TRACE (" whcn %p:", (void*)newn);

View file

@ -17,7 +17,7 @@
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_tkmap.h"
@ -28,7 +28,7 @@
struct bwhc {
struct whc common;
enum ddsi_sertopic_builtintopic_type type;
const struct ephash *guid_hash;
const struct entity_index *entidx;
};
enum bwhc_iter_state {
@ -42,7 +42,7 @@ struct bwhc_iter {
struct whc_sample_iter_base c;
enum bwhc_iter_state st;
bool have_sample;
struct ephash_enum it;
struct entidx_enum it;
};
/* check that our definition of whc_sample_iter fits in the type that callers allocate */
@ -92,11 +92,11 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
case DSBT_READER: kind = EK_READER; break;
}
assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, whc->guid_hash, kind);
entidx_enum_init (&it->it, whc->entidx, kind);
it->st = BIS_LOCAL;
/* FALLS THROUGH */
case BIS_LOCAL:
while ((entity = ephash_enum_next (&it->it)) != NULL)
while ((entity = entidx_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
@ -104,7 +104,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
entidx_enum_fini (&it->it);
it->st = BIS_INIT_PROXY;
}
/* FALLS THROUGH */
@ -115,11 +115,11 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
case DSBT_READER: kind = EK_PROXY_READER; break;
}
assert (kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, whc->guid_hash, kind);
entidx_enum_init (&it->it, whc->entidx, kind);
it->st = BIS_PROXY;
/* FALLS THROUGH */
case BIS_PROXY:
while ((entity = ephash_enum_next (&it->it)) != NULL)
while ((entity = entidx_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
@ -127,7 +127,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
entidx_enum_fini (&it->it);
return false;
}
}
@ -143,11 +143,12 @@ static void bwhc_get_state (const struct whc *whc, struct whc_state *st)
st->unacked_bytes = 0;
}
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, nn_mtime_t exp, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
(void)whc;
(void)max_drop_seq;
(void)seq;
(void)exp;
(void)serdata;
(void)tk;
if (plist)
@ -192,11 +193,11 @@ static const struct whc_ops bwhc_ops = {
.free = bwhc_free
};
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash)
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct entity_index *entidx)
{
struct bwhc *whc = ddsrt_malloc (sizeof (*whc));
whc->common.ops = &bwhc_ops;
whc->type = type;
whc->guid_hash = guid_hash;
whc->entidx = entidx;
return (struct whc *) whc;
}

View file

@ -20,7 +20,7 @@
#include "dds/ddsi/ddsi_serdata.h"
#include "dds__stream.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_radmin.h"
@ -99,7 +99,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
{
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
struct ddsi_writer_info pwr_info;
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos);
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos, payload->statusinfo);
for (uint32_t i = 0; rdary[i]; i++) {
DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
@ -121,15 +121,15 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
ddsrt_avl_iter_t it;
struct pwr_rd_match *m;
struct ddsi_writer_info wrinfo;
const struct ephash *gh = wr->e.gv->guid_hash;
const struct entity_index *gh = wr->e.gv->entity_index;
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo);
ddsrt_mutex_lock (&wr->e.lock);
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data () function. */

View file

@ -19,6 +19,7 @@
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds__writer.h"
#include "dds__listener.h"
#include "dds__init.h"
@ -217,14 +218,28 @@ static dds_return_t dds_writer_delete (dds_entity *e)
return DDS_RETCODE_OK;
}
static dds_return_t validate_writer_qos (const dds_qos_t *wqos)
{
#ifndef DDSI_INCLUDE_LIFESPAN
if (wqos != NULL && (wqos->present & QP_LIFESPAN) && wqos->lifespan.duration != DDS_INFINITY)
return DDS_RETCODE_BAD_PARAMETER;
#else
DDSRT_UNUSED_ARG (wqos);
#endif
return DDS_RETCODE_OK;
}
static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
{
/* note: e->m_qos is still the old one to allow for failure here */
dds_return_t ret;
if ((ret = validate_writer_qos(qos)) != DDS_RETCODE_OK)
return ret;
if (enabled)
{
struct writer *wr;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((wr = ephash_lookup_writer_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
update_writer_qos (wr, qos);
thread_state_asleep (lookup_thread_state ());
}
@ -319,7 +334,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0);
nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0);
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0)
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0 ||
(rc = validate_writer_qos(wqos)) != DDS_RETCODE_OK)
{
dds_delete_qos(wqos);
goto err_bad_qos;

View file

@ -14,6 +14,7 @@ include(CUnit)
idlc_generate(RoundTrip RoundTrip.idl)
idlc_generate(Space Space.idl)
idlc_generate(TypesArrayKey TypesArrayKey.idl)
idlc_generate(WriteTypes WriteTypes.idl)
set(ddsc_test_sources
"basic.c"
@ -51,15 +52,20 @@ set(ddsc_test_sources
"waitset.c"
"waitset_torture.c"
"write.c"
"write_various_types.c"
"writer.c")
if(ENABLE_LIFESPAN)
list(APPEND ddsc_test_sources "lifespan.c")
endif()
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
target_include_directories(
cunit_ddsc PRIVATE
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src/include/>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsc/src>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsi/include>")
target_link_libraries(cunit_ddsc PRIVATE RoundTrip Space TypesArrayKey ddsc)
target_link_libraries(cunit_ddsc PRIVATE RoundTrip Space TypesArrayKey WriteTypes ddsc)
# Setup environment for config-tests
get_test_property(CUnit_ddsc_config_simple_udp ENVIRONMENT CUnit_ddsc_config_simple_udp_env)

View file

@ -0,0 +1,25 @@
module WriteTypes {
struct a {
octet k[3];
unsigned long long ll;
};
#pragma keylist a k
struct b {
unsigned short k[3];
unsigned long long ll;
};
#pragma keylist b k
struct c {
unsigned long k[3];
unsigned long long ll;
};
#pragma keylist c k
struct d {
unsigned long long k[3];
unsigned long long ll;
};
#pragma keylist d k
};

View file

@ -0,0 +1,164 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_whc.h"
#include "dds__entity.h"
static dds_entity_t g_participant = 0;
static dds_entity_t g_subscriber = 0;
static dds_entity_t g_publisher = 0;
static dds_entity_t g_topic = 0;
static dds_entity_t g_reader = 0;
static dds_entity_t g_writer = 0;
static dds_entity_t g_waitset = 0;
static dds_entity_t g_rcond = 0;
static dds_entity_t g_qcond = 0;
static char*
create_topic_name(const char *prefix, char *name, size_t size)
{
/* Get semi random g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void) snprintf(name, size, "%s_pid%"PRIdPID"_tid%"PRIdTID"", prefix, pid, tid);
return name;
}
static void lifespan_init(void)
{
dds_attach_t triggered;
dds_return_t ret;
char name[100];
dds_qos_t *qos;
qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(qos);
g_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
CU_ASSERT_FATAL(g_participant > 0);
g_subscriber = dds_create_subscriber(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_subscriber > 0);
g_publisher = dds_create_publisher(g_participant, NULL, NULL);
CU_ASSERT_FATAL(g_publisher > 0);
g_waitset = dds_create_waitset(g_participant);
CU_ASSERT_FATAL(g_waitset > 0);
g_topic = dds_create_topic(g_participant, &Space_Type1_desc, create_topic_name("ddsc_qos_lifespan_test", name, sizeof name), NULL, NULL);
CU_ASSERT_FATAL(g_topic > 0);
dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
g_writer = dds_create_writer(g_publisher, g_topic, qos, NULL);
CU_ASSERT_FATAL(g_writer > 0);
g_reader = dds_create_reader(g_subscriber, g_topic, qos, NULL);
CU_ASSERT_FATAL(g_reader > 0);
/* Sync g_reader to g_writer. */
ret = dds_set_status_mask(g_reader, DDS_SUBSCRIPTION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(g_waitset, g_reader, g_reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(g_reader, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(g_waitset, g_reader);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
/* Sync g_writer to g_reader. */
ret = dds_set_status_mask(g_writer, DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_attach(g_waitset, g_writer, g_writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
ret = dds_waitset_wait(g_waitset, &triggered, 1, DDS_SECS(1));
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_EQUAL_FATAL(g_writer, (dds_entity_t)(intptr_t)triggered);
ret = dds_waitset_detach(g_waitset, g_writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
dds_delete_qos(qos);
}
static void lifespan_fini(void)
{
dds_delete(g_rcond);
dds_delete(g_qcond);
dds_delete(g_reader);
dds_delete(g_writer);
dds_delete(g_subscriber);
dds_delete(g_publisher);
dds_delete(g_waitset);
dds_delete(g_topic);
dds_delete(g_participant);
}
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
{
struct dds_entity *wr_entity;
struct writer *wr;
struct whc_state whcst;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
whc_get_state(wr->whc, &whcst);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(wr_entity);
CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
}
CU_Test(ddsc_lifespan, basic, .init=lifespan_init, .fini=lifespan_fini)
{
Space_Type1 sample = { 0, 0, 0 };
dds_return_t ret;
dds_duration_t exp = DDS_MSECS(500);
dds_qos_t *qos;
qos = dds_create_qos();
CU_ASSERT_PTR_NOT_NULL_FATAL(qos);
/* Write with default qos: lifespan inifinite */
ret = dds_write (g_writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
check_whc_state(g_writer, 1, 1);
dds_sleepfor (2 * exp);
check_whc_state(g_writer, 1, 1);
dds_qset_lifespan(qos, exp);
ret = dds_set_qos(g_writer, qos);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
ret = dds_write (g_writer, &sample);
CU_ASSERT_EQUAL_FATAL (ret, DDS_RETCODE_OK);
check_whc_state(g_writer, 2, 2);
dds_sleepfor (2 * exp);
check_whc_state(g_writer, -1, -1);
dds_delete_qos(qos);
}

View file

@ -20,6 +20,7 @@
#include "dds/version.h"
#include "dds__entity.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
@ -97,7 +98,7 @@ static seqno_t get_pmd_seqno(dds_entity_t participant)
struct writer *wr;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
pp = entidx_lookup_participant_guid(pp_entity->m_domain->gv.entity_index, &pp_entity->m_guid);
wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
@ -117,7 +118,7 @@ static dds_duration_t get_pmd_interval(dds_entity_t participant)
struct participant *pp;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
pp = entidx_lookup_participant_guid(pp_entity->m_domain->gv.entity_index, &pp_entity->m_guid);
intv = pp_get_pmd_interval(pp);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);

View file

@ -0,0 +1,249 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "WriteTypes.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_writetypes_various.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
static uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
static dds_entity_t g_pub_participant = 0;
static dds_entity_t g_pub_publisher = 0;
static dds_entity_t g_sub_domain = 0;
static dds_entity_t g_sub_participant = 0;
static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name (const char *prefix, uint32_t nr, char *name, size_t size)
{
/* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid ();
ddsrt_tid_t tid = ddsrt_gettid ();
(void) snprintf (name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
return name;
}
static void writetypes_init(void)
{
/* Domains for pub and sub use a different domain id, but the portgain setting
* in configuration is 0, so that both domains will map to the same port number.
* This allows to create two domains in a single test process. */
char *conf_pub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_pub_domain = dds_create_domain (DDS_DOMAINID_PUB, conf_pub);
g_sub_domain = dds_create_domain (DDS_DOMAINID_SUB, conf_sub);
dds_free (conf_pub);
dds_free (conf_sub);
g_pub_participant = dds_create_participant (DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL (g_pub_participant > 0);
g_sub_participant = dds_create_participant (DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL (g_sub_participant > 0);
g_pub_publisher = dds_create_publisher (g_pub_participant, NULL, NULL);
CU_ASSERT_FATAL (g_pub_publisher > 0);
g_sub_subscriber = dds_create_subscriber (g_sub_participant, NULL, NULL);
CU_ASSERT_FATAL (g_sub_subscriber > 0);
}
static void writetypes_fini (void)
{
dds_delete (g_sub_subscriber);
dds_delete (g_pub_publisher);
dds_delete (g_sub_participant);
dds_delete (g_pub_participant);
dds_delete (g_sub_domain);
dds_delete (g_pub_domain);
}
typedef bool (*compare_fn_t) (const void *a, const void *b);
#define ABCD_CMP(typ_) \
static bool typ_##_cmp (const void *va, const void *vb) \
{ \
const struct WriteTypes_##typ_ *a = va; \
const struct WriteTypes_##typ_ *b = vb; \
return a->k[0] == b->k[0] && a->k[1] == b->k[1] && a->k[2] == b->k[2] && a->ll == b->ll; \
}
ABCD_CMP (a)
ABCD_CMP (b)
ABCD_CMP (c)
ABCD_CMP (d)
#undef ABCD_CMP
struct sample {
bool in_result;
const void *data;
};
#define S(n) &(struct WriteTypes_##n)
static const struct sample a_samples[] = {
{ 1, S(a) { .k={1,2,3}, .ll = UINT64_C (0x1234567890abcdef) } },
{ 0, S(a) { .k={3,2,1}, .ll = UINT64_C (0) } },
{ 1, S(a) { .k={3,2,1}, .ll = UINT64_C (1) } },
};
static const struct sample b_samples[] = {
{ 1, S(b) { .k={1001,1002,1003}, .ll = UINT64_C (0x1234567890abcdef) } },
{ 0, S(b) { .k={1003,1002,1001}, .ll = UINT64_C (0) } },
{ 1, S(b) { .k={1003,1002,1001}, .ll = UINT64_C (1) } },
};
static const struct sample c_samples[] = {
{ 1, S(c) { .k={12340001,12340002,12340003}, .ll = UINT64_C (0x1234567890abcdef) } },
{ 0, S(c) { .k={12340003,12340002,12340001}, .ll = UINT64_C (0) } },
{ 1, S(c) { .k={12340003,12340002,12340001}, .ll = UINT64_C (1) } },
};
static const struct sample d_samples[] = {
{ 1, S(d) { .k={123400056780001,2,3}, .ll = UINT64_C (0x1234567890abcdef) } },
{ 0, S(d) { .k={123400056780003,2,1}, .ll = UINT64_C (0) } },
{ 1, S(d) { .k={123400056780003,2,1}, .ll = UINT64_C (1) } },
};
#undef S
#define T(n) &WriteTypes_##n##_desc
#define C(n) &n##_cmp
#define N(n) (sizeof (n##_samples) / sizeof (n##_samples[0]))
#define S(n) n##_samples
CU_TheoryDataPoints(ddsc_writetypes, various) = {
CU_DataPoints(const dds_topic_descriptor_t *, T(a), T(b), T(c), T(d)),
CU_DataPoints(compare_fn_t, C(a), C(b), C(c), C(d)),
CU_DataPoints(size_t, N(a), N(b), N(c), N(d)),
CU_DataPoints(const struct sample *, S(a), S(b), S(c), S(d)),
};
#undef S
#undef N
#undef C
#undef T
#define MAX_SAMPLES 5
CU_Theory((const dds_topic_descriptor_t *desc, compare_fn_t cmp, size_t nsamples, const struct sample *samples), ddsc_writetypes, various, .init = writetypes_init, .fini = writetypes_fini, .timeout = 10)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
dds_qos_t *qos;
dds_return_t rc;
char name[100];
/* nsamples < MAX_SAMPLES so there is room for an invalid sample if we need it */
CU_ASSERT_FATAL (nsamples < MAX_SAMPLES);
qos = dds_create_qos ();
CU_ASSERT_FATAL (qos != NULL);
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1));
dds_qset_writer_data_lifecycle (qos, false);
create_topic_name ("ddsc_writetypes_various", g_topic_nr++, name, sizeof name);
pub_topic = dds_create_topic (g_pub_participant, desc, name, qos, NULL);
CU_ASSERT_FATAL (pub_topic > 0);
sub_topic = dds_create_topic (g_sub_participant, desc, name, qos, NULL);
CU_ASSERT_FATAL (sub_topic > 0);
dds_delete_qos (qos);
reader = dds_create_reader (g_sub_participant, sub_topic, NULL, NULL);
CU_ASSERT_FATAL (reader > 0);
writer = dds_create_writer (g_pub_participant, pub_topic, NULL, NULL);
CU_ASSERT_FATAL (writer > 0);
/* simple-minded polling until reader/writer have matched each other */
while (1)
{
dds_publication_matched_status_t st;
rc = dds_get_publication_matched_status (writer, &st);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
if (st.current_count > 0)
break;
dds_sleepfor (DDS_MSECS (1));
}
while (1)
{
dds_subscription_matched_status_t st;
rc = dds_get_subscription_matched_status (reader, &st);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
if (st.current_count > 0)
break;
dds_sleepfor (DDS_MSECS (1));
}
/* write samples */
for (size_t i = 0; i < nsamples; i++) {
rc = dds_write (writer, samples[i].data);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}
/* delete writer, wait until no matching writer: writer lingering should ensure the data
has been delivered at that point */
rc = dds_delete (writer);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
while (1)
{
dds_subscription_matched_status_t st;
rc = dds_get_subscription_matched_status (reader, &st);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
if (st.current_count == 0)
break;
dds_sleepfor (DDS_MSECS (1));
}
/* instances are unordered; this is a woefully inefficient way of comparing the sets,
but for the numbers of samples we do here, it really doesn't matter */
dds_sample_info_t si[MAX_SAMPLES];
void *xs[MAX_SAMPLES];
xs[0] = NULL;
int32_t n;
n = dds_read (reader, xs, si, MAX_SAMPLES, MAX_SAMPLES);
CU_ASSERT_FATAL (n > 0);
size_t nvalid = 0;
for (int32_t j = 0; j < n; j++)
{
if (si[j].valid_data)
nvalid++;
}
for (size_t i = 0; i < nsamples; i++)
{
if (samples[i].in_result)
{
/* sample must be present, erase it by marking it invalid */
int32_t j;
for (j = 0; j < n; j++)
if (si[j].valid_data && cmp (samples[i].data, xs[j]))
break;
CU_ASSERT (j < n);
si[j].valid_data = 0;
nvalid--;
}
}
/* all valid samples must be accounted for */
CU_ASSERT_FATAL (nvalid == 0);
rc = dds_return_loan (reader, xs, n);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
/* cleanup */
rc = dds_delete (reader);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_delete (sub_topic);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_delete (pub_topic);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}