diff --git a/src/core/ddsc/src/dds__rhc.h b/src/core/ddsc/src/dds__rhc.h index 3007692..a3b3cb7 100644 --- a/src/core/ddsc/src/dds__rhc.h +++ b/src/core/ddsc/src/dds__rhc.h @@ -64,7 +64,7 @@ dds_rhc_take( void dds_rhc_set_qos (struct rhc * rhc, const struct nn_xqos * qos); -void dds_rhc_add_readcondition (dds_readcond * cond); +bool dds_rhc_add_readcondition (dds_readcond * cond); void dds_rhc_remove_readcondition (dds_readcond * cond); bool dds_rhc_add_waitset (dds_readcond * cond, dds_waitset * waitset, dds_attach_t x); diff --git a/src/core/ddsc/src/dds_querycond.c b/src/core/ddsc/src/dds_querycond.c index 5da179c..73cbda1 100644 --- a/src/core/ddsc/src/dds_querycond.c +++ b/src/core/ddsc/src/dds_querycond.c @@ -34,8 +34,14 @@ dds_create_querycondition( if (rc == DDS_RETCODE_OK) { dds_readcond *cond = dds_create_readcond(r, DDS_KIND_COND_QUERY, mask, filter); assert(cond); - hdl = cond->m_entity.m_hdl; + const bool success = (cond->m_entity.m_deriver.delete != 0); dds_reader_unlock(r); + if (success) { + hdl = cond->m_entity.m_hdl; + } else { + dds_delete (cond->m_entity.m_hdl); + hdl = DDS_ERRNO(DDS_RETCODE_OUT_OF_RESOURCES); + } } else { DDS_ERROR("Error occurred on locking reader\n"); hdl = DDS_ERRNO(rc); diff --git a/src/core/ddsc/src/dds_readcond.c b/src/core/ddsc/src/dds_readcond.c index 503e072..b6cdc00 100644 --- a/src/core/ddsc/src/dds_readcond.c +++ b/src/core/ddsc/src/dds_readcond.c @@ -47,7 +47,12 @@ dds_create_readcond( cond->m_query.m_filter = filter; cond->m_query.m_index = UINT_MAX; } - dds_rhc_add_readcondition (cond); + if (!dds_rhc_add_readcondition (cond)) { + /* FIXME: current entity management code can't deal with an error late in the creation of the + entity because it doesn't allow deleting it again ... instead use a hack to signal a problem + to the caller and let that one handle it. */ + cond->m_entity.m_deriver.delete = 0; + } return cond; } @@ -65,6 +70,7 @@ dds_create_readcondition( if (rc == DDS_RETCODE_OK) { dds_readcond *cond = dds_create_readcond(rd, DDS_KIND_COND_READ, mask, 0); assert(cond); + assert(cond->m_entity.m_deriver.delete); hdl = cond->m_entity.m_hdl; dds_reader_unlock(rd); } else { diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index ec3f2ab..bcab20d 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -224,8 +224,7 @@ void lwregs_dump (struct lwregs *rt) ****** RHC ****** *************************/ -struct rhc_sample -{ +struct rhc_sample { struct ddsi_serdata *sample; /* serialised data (either just_key or real data) */ struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */ uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */ @@ -235,8 +234,7 @@ struct rhc_sample unsigned no_writers_gen; /* __/ */ }; -struct rhc_instance -{ +struct rhc_instance { uint64_t iid; /* unique instance id, key of table, also serves as instance handle */ uint64_t wr_iid; /* unique of id of writer of latest sample or 0; if wrcount = 0 it is the wr_iid that caused */ struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */ @@ -262,16 +260,13 @@ struct rhc_instance struct rhc_sample a_sample; /* pre-allocated storage for 1 sample */ }; -typedef enum rhc_store_result -{ +typedef enum rhc_store_result { RHC_STORED, RHC_FILTERED, RHC_REJECTED -} -rhc_store_result_t; +} rhc_store_result_t; -struct rhc -{ +struct rhc { struct ut_hh *instances; struct rhc_instance *nonempty_instances; /* circular, points to most recently added one, NULL if none */ struct lwregs registrations; /* should be a global one (with lock-free lookups) */ @@ -308,14 +303,34 @@ struct rhc querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */ }; -struct trigger_info -{ +struct trigger_info_cmn { unsigned qminst; bool has_read; bool has_not_read; bool has_changed; }; +struct trigger_info_pre { + struct trigger_info_cmn c; +}; + +struct trigger_info_qcond { + /* 0 or inst->conds depending on whether an invalid/valid sample was pushed out/added; + inc_xxx_read is there so read can indicate a sample changed from unread to read */ + bool dec_invsample_read; + bool dec_sample_read; + bool inc_invsample_read; + bool inc_sample_read; + querycond_mask_t dec_conds_invsample; + querycond_mask_t dec_conds_sample; + querycond_mask_t inc_conds_invsample; + querycond_mask_t inc_conds_sample; +}; + +struct trigger_info_post { + struct trigger_info_cmn c; +}; + static unsigned qmask_of_sample (const struct rhc_sample *s) { return s->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE; @@ -352,7 +367,7 @@ static bool inst_has_unread (const struct rhc_instance *i) } static unsigned qmask_of_inst (const struct rhc_instance *inst); -static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct ddsi_serdata *sample); +static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst); #ifndef NDEBUG static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds); #endif @@ -505,40 +520,52 @@ static void free_sample (struct rhc_instance *inst, struct rhc_sample *s) } } -static void inst_clear_invsample (struct rhc *rhc, struct rhc_instance *inst) +static void inst_clear_invsample (struct rhc *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc) { assert (inst->inv_exists); + assert (trig_qc->dec_conds_invsample == 0); inst->inv_exists = 0; + trig_qc->dec_conds_invsample = inst->conds; if (inst->inv_isread) { + trig_qc->dec_invsample_read = true; rhc->n_invread--; } rhc->n_invsamples--; } -static void inst_clear_invsample_if_exists (struct rhc *rhc, struct rhc_instance *inst) +static void inst_clear_invsample_if_exists (struct rhc *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc) { if (inst->inv_exists) + inst_clear_invsample (rhc, inst, trig_qc); +} + +static void inst_set_invsample (struct rhc *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc) +{ + if (!inst->inv_exists || inst->inv_isread) { - inst_clear_invsample (rhc, inst); + /* Obviously optimisable, but that is perhaps not worth the bother */ + inst_clear_invsample_if_exists (rhc, inst, trig_qc); + assert (trig_qc->inc_conds_invsample == 0); + trig_qc->inc_conds_invsample = inst->conds; + inst->inv_exists = 1; + inst->inv_isread = 0; + rhc->n_invsamples++; } } -static void inst_set_invsample (struct rhc *rhc, struct rhc_instance *inst) +static void free_empty_instance (struct rhc_instance *inst) { - /* Obviously optimisable, but that is perhaps not worth the bother */ - inst_clear_invsample_if_exists (rhc, inst); - inst->inv_exists = 1; - inst->inv_isread = 0; - rhc->n_invsamples++; + assert (inst_is_empty (inst)); + ddsi_tkmap_instance_unref (inst->tk); + os_free (inst); } -static void free_instance (void *vnode, void *varg) +static void free_instance_rhc_free (struct rhc_instance *inst, struct rhc *rhc) { - struct rhc *rhc = varg; - struct rhc_instance *inst = vnode; struct rhc_sample *s = inst->latest; const bool was_empty = inst_is_empty (inst); + struct trigger_info_qcond dummy_trig_qc; if (s) { do { @@ -551,7 +578,10 @@ static void free_instance (void *vnode, void *varg) inst->nvsamples = 0; inst->nvread = 0; } - inst_clear_invsample_if_exists (rhc, inst); +#ifndef NDEBUG + memset (&dummy_trig_qc, 0, sizeof (dummy_trig_qc)); +#endif + inst_clear_invsample_if_exists (rhc, inst, &dummy_trig_qc); if (!was_empty) { remove_inst_from_nonempty_list (rhc, inst); @@ -572,10 +602,15 @@ uint32_t dds_rhc_lock_samples (struct rhc *rhc) return no; } +static void free_instance_rhc_free_wrap (void *vnode, void *varg) +{ + free_instance_rhc_free (vnode, varg); +} + void dds_rhc_free (struct rhc *rhc) { assert (rhc_check_counts_locked (rhc, true)); - ut_hhEnum (rhc->instances, free_instance, rhc); + ut_hhEnum (rhc->instances, free_instance_rhc_free_wrap, rhc); assert (rhc->nonempty_instances == NULL); ut_hhFree (rhc->instances); lwregs_fini (&rhc->registrations); @@ -597,7 +632,7 @@ void dds_rhc_fini (struct rhc * rhc) } } -static void init_trigger_info_nonmatch (struct trigger_info *info) +static void init_trigger_info_cmn_nonmatch (struct trigger_info_cmn *info) { info->qminst = ~0u; info->has_read = false; @@ -605,33 +640,44 @@ static void init_trigger_info_nonmatch (struct trigger_info *info) info->has_changed = false; } -static void get_trigger_info (struct trigger_info *info, struct rhc_instance *inst, bool pre) +static void get_trigger_info_cmn (struct trigger_info_cmn *info, struct rhc_instance *inst) { info->qminst = qmask_of_inst (inst); info->has_read = inst_has_read (inst); info->has_not_read = inst_has_unread (inst); - /* reset instance has_changed before adding/overwriting a sample */ - if (pre) - { - inst->has_changed = 0; - } info->has_changed = inst->has_changed; } -static bool trigger_info_differs (const struct trigger_info *pre, const struct trigger_info *post) +static void get_trigger_info_pre (struct trigger_info_pre *info, struct rhc_instance *inst) { - return pre->qminst != post->qminst || pre->has_read != post->has_read || pre->has_not_read != post->has_not_read || - pre->has_changed != post->has_changed; + inst->has_changed = 0; + get_trigger_info_cmn (&info->c, inst); } -static bool add_sample -( - struct rhc * rhc, - struct rhc_instance * inst, - const struct proxy_writer_info * pwr_info, - const struct ddsi_serdata * sample, - status_cb_data_t * cb_data -) +static void init_trigger_info_qcond (struct trigger_info_qcond *qc) +{ + qc->dec_invsample_read = false; + qc->dec_sample_read = false; + qc->inc_invsample_read = false; + qc->inc_sample_read = false; + qc->dec_conds_invsample = 0; + qc->dec_conds_sample = 0; + qc->inc_conds_invsample = 0; + qc->inc_conds_sample = 0; +} + +static bool trigger_info_differs (const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc) +{ + return (pre->c.qminst != post->c.qminst || + pre->c.has_read != post->c.has_read || + pre->c.has_not_read != post->c.has_not_read || + pre->c.has_changed != post->c.has_changed || + trig_qc->dec_conds_invsample != trig_qc->inc_conds_invsample || + trig_qc->dec_conds_sample != trig_qc->inc_conds_sample || + trig_qc->dec_invsample_read != trig_qc->inc_invsample_read); +} + +static bool add_sample (struct rhc *rhc, struct rhc_instance *inst, const struct proxy_writer_info *pwr_info, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc) { struct rhc_sample *s; @@ -649,10 +695,14 @@ static bool add_sample /* replace oldest sample; latest points to the latest one, the list is circular from old -> new, so latest->next is the oldest */ - inst_clear_invsample_if_exists (rhc, inst); + inst_clear_invsample_if_exists (rhc, inst, trig_qc); assert (inst->latest != NULL); s = inst->latest->next; + assert (trig_qc->dec_conds_sample == 0); ddsi_serdata_unref (s->sample); + + trig_qc->dec_sample_read = s->isread; + trig_qc->dec_conds_sample = s->conds; if (s->isread) { inst->nvread--; @@ -689,7 +739,7 @@ static bool add_sample /* add new latest sample */ s = alloc_sample (inst); - inst_clear_invsample_if_exists (rhc, inst); + inst_clear_invsample_if_exists (rhc, inst, trig_qc); if (inst->latest == NULL) { s->next = s; @@ -719,6 +769,7 @@ static bool add_sample } } + trig_qc->inc_conds_sample = s->conds; inst->latest = s; return true; } @@ -729,10 +780,10 @@ static bool content_filter_accepts (const struct ddsi_sertopic *sertopic, const const struct dds_topic *tp = sertopic->status_cb_entity; if (tp->filter_fn) { - char *tmp = ddsi_sertopic_alloc_sample (tp->m_stopic); + char *tmp = ddsi_sertopic_alloc_sample (sertopic); ddsi_serdata_to_sample (sample, tmp, NULL, NULL); ret = (tp->filter_fn) (tmp, tp->filter_ctx); - ddsi_sertopic_free_sample (tp->m_stopic, tmp, DDS_FREE_ALL); + ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL); } return ret; } @@ -742,12 +793,7 @@ static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, return (inst->wr_iid_islive && inst->wr_iid == pwr_info->iid) || memcmp (&pwr_info->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0; } -static int inst_accepts_sample -( - const struct rhc *rhc, const struct rhc_instance *inst, - const struct proxy_writer_info *pwr_info, - const struct ddsi_serdata *sample, const bool has_data -) +static int inst_accepts_sample (const struct rhc *rhc, const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info, const struct ddsi_serdata *sample, const bool has_data) { if (rhc->by_source_ordering) { @@ -788,12 +834,8 @@ static int inst_accepts_sample return 1; } -static void update_inst -( - const struct rhc *rhc, struct rhc_instance *inst, - const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp) +static void update_inst (struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp) { - (void)rhc; inst->tstamp = tstamp; inst->wr_iid_islive = wr_iid_valid; if (wr_iid_valid) @@ -816,7 +858,7 @@ static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_insta assert (ret); (void) ret; - free_instance (inst, rhc); + free_empty_instance (inst); } static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update) @@ -988,10 +1030,7 @@ static int rhc_unregister_isreg_w_sideeffects (struct rhc *rhc, const struct rhc } } -static int rhc_unregister_updateinst -( - struct rhc *rhc, struct rhc_instance *inst, - const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp) +static int rhc_unregister_updateinst (struct rhc *rhc, struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp, struct trigger_info_qcond *trig_qc) { assert (inst->wrcount > 0); @@ -1023,8 +1062,8 @@ static int rhc_unregister_updateinst care.) */ if (inst->latest == NULL || inst->latest->isread) { - inst_set_invsample (rhc, inst); - update_inst (rhc, inst, pwr_info, false, tstamp); + inst_set_invsample (rhc, inst, trig_qc); + update_inst (inst, pwr_info, false, tstamp); } if (!inst->isdisposed) { @@ -1045,8 +1084,8 @@ static int rhc_unregister_updateinst /* Add invalid samples for transition to no-writers */ TRACE (",#0,empty,nowriters"); assert (inst_is_empty (inst)); - inst_set_invsample (rhc, inst); - update_inst (rhc, inst, pwr_info, false, tstamp); + inst_set_invsample (rhc, inst, trig_qc); + update_inst (inst, pwr_info, false, tstamp); account_for_empty_to_nonempty_transition (rhc, inst); inst->wr_iid_islive = 0; return 0; @@ -1054,28 +1093,24 @@ static int rhc_unregister_updateinst } } -static void dds_rhc_unregister -( - struct trigger_info *post, struct rhc *rhc, struct rhc_instance *inst, - const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp -) +static void dds_rhc_unregister (struct rhc *rhc, struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) { /* 'post' always gets set; instance may have been freed upon return. */ TRACE (" unregister:"); if (!rhc_unregister_isreg_w_sideeffects (rhc, inst, pwr_info->iid)) { /* other registrations remain */ - get_trigger_info (post, inst, false); + get_trigger_info_cmn (&post->c, inst); } - else if (rhc_unregister_updateinst (rhc, inst, pwr_info, tstamp)) + else if (rhc_unregister_updateinst (rhc, inst, pwr_info, tstamp, trig_qc)) { /* instance dropped */ - init_trigger_info_nonmatch (post); + init_trigger_info_cmn_nonmatch (&post->c); } else { /* no writers remain, but instance not empty */ - get_trigger_info (post, inst, false); + get_trigger_info_cmn (&post->c, inst); } } @@ -1114,16 +1149,7 @@ static struct rhc_instance *alloc_new_instance (const struct rhc *rhc, const str return inst; } -static rhc_store_result_t rhc_store_new_instance -( - struct trigger_info * post, - struct rhc *rhc, - const struct proxy_writer_info *pwr_info, - struct ddsi_serdata *sample, - struct ddsi_tkmap_instance *tk, - const bool has_data, - status_cb_data_t * cb_data -) +static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) { struct rhc_instance *inst; int ret; @@ -1161,16 +1187,16 @@ static rhc_store_result_t rhc_store_new_instance inst = alloc_new_instance (rhc, pwr_info, sample, tk); if (has_data) { - if (!add_sample (rhc, inst, pwr_info, sample, cb_data)) + if (!add_sample (rhc, inst, pwr_info, sample, cb_data, trig_qc)) { - free_instance (inst, rhc); + free_empty_instance (inst); return RHC_REJECTED; } } else { if (inst->isdisposed) { - inst_set_invsample(rhc, inst); + inst_set_invsample (rhc, inst, trig_qc); } } @@ -1179,8 +1205,9 @@ static rhc_store_result_t rhc_store_new_instance assert (ret); (void) ret; rhc->n_instances++; - get_trigger_info (post, inst, false); + get_trigger_info_cmn (&post->c, inst); + *out_inst = inst; return RHC_STORED; } @@ -1189,11 +1216,7 @@ static rhc_store_result_t rhc_store_new_instance delivered (true unless a reliable sample rejected). */ -bool dds_rhc_store -( - struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, - struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk -) +bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { const uint64_t wr_iid = pwr_info->iid; const unsigned statusinfo = sample->statusinfo; @@ -1201,11 +1224,12 @@ bool dds_rhc_store const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0; struct rhc_instance dummy_instance; struct rhc_instance *inst; - struct trigger_info pre, post; + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; bool trigger_waitsets; rhc_store_result_t stored; status_cb_data_t cb_data; /* Callback data for reader status callback */ - bool notify_data_available = true; bool delivered = true; TRACE ("rhc_store(%"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); @@ -1222,6 +1246,8 @@ bool dds_rhc_store stored = RHC_FILTERED; cb_data.raw_status_id = -1; + init_trigger_info_qcond (&trig_qc); + os_mutexLock (&rhc->lock); inst = ut_hhLookup (rhc->instances, &dummy_instance); @@ -1239,12 +1265,12 @@ bool dds_rhc_store else { TRACE (" new instance"); - stored = rhc_store_new_instance (&post, rhc, pwr_info, sample, tk, has_data, &cb_data); + stored = rhc_store_new_instance (&inst, rhc, pwr_info, sample, tk, has_data, &cb_data, &post, &trig_qc); if (stored != RHC_STORED) { goto error_or_nochange; } - init_trigger_info_nonmatch (&pre); + init_trigger_info_cmn_nonmatch (&pre.c); } } else if (!inst_accepts_sample (rhc, inst, pwr_info, sample, has_data)) @@ -1256,19 +1282,18 @@ bool dds_rhc_store a rejected sample - (no one knows, it seemed) */ TRACE (" instance rejects sample"); - - get_trigger_info (&pre, inst, true); + get_trigger_info_pre (&pre, inst); if (has_data || is_dispose) { dds_rhc_register (rhc, inst, wr_iid, false); } if (statusinfo & NN_STATUSINFO_UNREGISTER) { - dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->timestamp); + dds_rhc_unregister (rhc, inst, pwr_info, sample->timestamp, &post, &trig_qc); } else { - get_trigger_info (&post, inst, false); + get_trigger_info_cmn (&post.c, inst); } /* notify sample lost */ @@ -1282,7 +1307,7 @@ bool dds_rhc_store } else { - get_trigger_info (&pre, inst, true); + get_trigger_info_pre (&pre, inst); TRACE (" wc %"PRIu32, inst->wrcount); @@ -1340,8 +1365,8 @@ bool dds_rhc_store is a sample. */ if (has_data) { - if (! add_sample (rhc, inst, pwr_info, sample, &cb_data)) TRACE (" add_sample"); + if (!add_sample (rhc, inst, pwr_info, sample, &cb_data, &trig_qc)) { TRACE ("(reject)"); stored = RHC_REJECTED; @@ -1356,10 +1381,10 @@ bool dds_rhc_store } /* If instance became disposed, add an invalid sample if there are no samples left */ - if (inst_became_disposed && (inst->latest == NULL )) - inst_set_invsample (rhc, inst); + if (inst_became_disposed && inst->latest == NULL) + inst_set_invsample (rhc, inst, &trig_qc); - update_inst (rhc, inst, pwr_info, true, sample->timestamp); + update_inst (inst, pwr_info, true, sample->timestamp); /* Can only add samples => only need to give special treatment to instances that were empty before. It is, however, not @@ -1403,41 +1428,46 @@ bool dds_rhc_store mean an application reading "x" after the write and reading it again after the unregister will see a change in the no_writers_generation field? */ - dds_rhc_unregister (&post, rhc, inst, pwr_info, sample->timestamp); + dds_rhc_unregister (rhc, inst, pwr_info, sample->timestamp, &post, &trig_qc); } else { - get_trigger_info (&post, inst, false); + get_trigger_info_cmn (&post.c, inst); } } TRACE (")\n"); + const bool update_read_conditions = trigger_info_differs (&pre, &post, &trig_qc); + bool notify_data_available; /* do not send data available notification when an instance is dropped */ - if ((post.qminst == ~0u) && (post.has_read == 0) && (post.has_not_read == 0) && (post.has_changed == false)) - { + if ((post.c.qminst == ~0u) && (post.c.has_read == 0) && (post.c.has_not_read == 0) && (post.c.has_changed == false)) notify_data_available = false; - } - trigger_waitsets = trigger_info_differs (&pre, &post) - && update_conditions_locked (rhc, &pre, &post, sample); + else /* FIXME: now that trigger_info_differs incorporates details on samples added/removed, this might well be wrong */ + notify_data_available = update_read_conditions; + + if (update_read_conditions) + trigger_waitsets = update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst); + else + trigger_waitsets = false; assert (rhc_check_counts_locked (rhc, true)); os_mutexUnlock (&rhc->lock); - if (notify_data_available && (trigger_info_differs (&pre, &post))) + if (rhc->reader) { - if (rhc->reader && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) + if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) { os_atomic_inc32 (&rhc->n_cbs); dds_reader_data_available_cb (rhc->reader); os_atomic_dec32 (&rhc->n_cbs); } - } - if (rhc->reader && trigger_waitsets) - { + if (trigger_waitsets) + { dds_entity_status_signal(&rhc->reader->m_entity); + } } return delivered; @@ -1464,11 +1494,7 @@ error_or_nochange: return delivered; } -void dds_rhc_unregister_wr -( - struct rhc * __restrict rhc, - const struct proxy_writer_info * __restrict pwr_info -) +void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info) { /* Only to be called when writer with ID WR_IID has died. @@ -1497,8 +1523,11 @@ void dds_rhc_unregister_wr { if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) { - struct trigger_info pre, post; - get_trigger_info (&pre, inst, true); + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; + get_trigger_info_pre (&pre, inst); + init_trigger_info_qcond (&trig_qc); TRACE (" %"PRIx64":", inst->iid); @@ -1516,7 +1545,7 @@ void dds_rhc_unregister_wr else { const bool was_empty = inst_is_empty (inst); - inst_set_invsample (rhc, inst); + inst_set_invsample (rhc, inst, &trig_qc); if (was_empty) account_for_empty_to_nonempty_transition (rhc, inst); else @@ -1524,18 +1553,12 @@ void dds_rhc_unregister_wr } } - dds_rhc_unregister (&post, rhc, inst, pwr_info, inst->tstamp); + dds_rhc_unregister (rhc, inst, pwr_info, inst->tstamp, &post, &trig_qc); TRACE ("\n"); - if (trigger_info_differs (&pre, &post)) - { - if (update_conditions_locked (rhc, &pre, &post, NULL)) - { - trigger_waitsets = true; - } - } - + if (trigger_info_differs (&pre, &post, &trig_qc) && update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst)) + trigger_waitsets = true; assert (rhc_check_counts_locked (rhc, true)); } } @@ -1543,9 +1566,7 @@ void dds_rhc_unregister_wr os_mutexUnlock (&rhc->lock); if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal (&rhc->reader->m_entity); } void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t wr_iid) @@ -1703,11 +1724,45 @@ static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst) } } -static int dds_rhc_read_w_qminv -( - struct rhc *rhc, bool lock, void ** values, dds_sample_info_t *info_seq, - uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond -) +static bool read_sample_update_conditions (struct rhc *rhc, struct trigger_info_pre *pre, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, struct rhc_instance *inst, querycond_mask_t conds, bool sample_wasread) +{ + /* No query conditions that are dependent on sample states */ + if (rhc->qconds_samplest == 0) + return false; + + /* Some, but perhaps none that matches this sample */ + if ((conds & rhc->qconds_samplest) == 0) + return false; + + TRACE("read_sample_update_conditions\n"); + trig_qc->dec_conds_sample = trig_qc->inc_conds_sample = conds; + trig_qc->dec_sample_read = sample_wasread; + trig_qc->inc_sample_read = true; + get_trigger_info_cmn (&post->c, inst); + const bool trigger_waitsets = update_conditions_locked (rhc, false, pre, post, trig_qc, inst); + trig_qc->dec_conds_sample = trig_qc->inc_conds_sample = 0; + pre->c = post->c; + return trigger_waitsets; +} + +static bool take_sample_update_conditions (struct rhc *rhc, struct trigger_info_pre *pre, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, struct rhc_instance *inst, querycond_mask_t conds, bool sample_wasread) +{ + /* Mostly the same as read_...: but we are deleting samples (so no "inc sample") and need to process all query conditions that match this sample. */ + if (rhc->nqconds == 0 || conds == 0) + return false; + + TRACE("take_sample_update_conditions\n"); + trig_qc->dec_conds_sample = conds; + trig_qc->dec_sample_read = sample_wasread; + get_trigger_info_cmn (&post->c, inst); + const bool trigger_waitsets = update_conditions_locked (rhc, false, pre, post, trig_qc, inst); + trig_qc->dec_conds_sample = 0; + pre->c = post->c; + return trigger_waitsets; +} + + +static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond) { bool trigger_waitsets = false; uint32_t n = 0; @@ -1725,6 +1780,7 @@ static int dds_rhc_read_w_qminv if (rhc->nonempty_instances) { + const querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? (querycond_mask_t)1 << cond->m_query.m_index : 0; struct rhc_instance * inst = rhc->nonempty_instances->next; struct rhc_instance * const end = inst; do @@ -1734,42 +1790,36 @@ static int dds_rhc_read_w_qminv if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { /* samples present & instance, view state matches */ - struct trigger_info pre, post; + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; const unsigned nread = inst_nread (inst); const uint32_t n_first = n; - get_trigger_info (&pre, inst, true); + get_trigger_info_pre (&pre, inst); + init_trigger_info_qcond (&trig_qc); if (inst->latest) { struct rhc_sample *sample = inst->latest->next, * const end1 = sample; do { - if ((qmask_of_sample (sample) & qminv) == 0) + if ((qmask_of_sample (sample) & qminv) == 0 && (qcmask == 0 || (sample->conds & qcmask))) { /* sample state matches too */ set_sample_info (info_seq + n, inst, sample); ddsi_serdata_to_sample (sample->sample, values[n], 0, 0); - if (cond == NULL - || (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) - || (cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n]))) + if (!sample->isread) { - if (!sample->isread) - { - TRACE ("s"); - sample->isread = true; - inst->nvread++; - rhc->n_vread++; - } - - if (++n == max_samples) - { - break; - } + TRACE ("s"); + if (read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false)) + trigger_waitsets = true; + sample->isread = true; + inst->nvread++; + rhc->n_vread++; } - else + if (++n == max_samples) { - /* The filter didn't match, so free the deserialised copy. */ - ddsi_sertopic_free_sample (rhc->topic, values[n], DDS_FREE_CONTENTS); + break; } } sample = sample->next; @@ -1777,12 +1827,15 @@ static int dds_rhc_read_w_qminv while (sample != end1); } - if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask))) { set_sample_info_invsample (info_seq + n, inst); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); if (!inst->inv_isread) { + TRACE ("i"); + if (read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false)) + trigger_waitsets = true; inst->inv_isread = 1; rhc->n_invread++; } @@ -1798,15 +1851,17 @@ static int dds_rhc_read_w_qminv } if (nread != inst_nread (inst) || inst_became_old) { - get_trigger_info (&post, inst, false); - if (update_conditions_locked (rhc, &pre, &post, NULL)) - { + get_trigger_info_cmn (&post.c, inst); + assert (trig_qc.dec_conds_invsample == 0); + assert (trig_qc.dec_conds_sample == 0); + assert (trig_qc.inc_conds_invsample == 0); + assert (trig_qc.inc_conds_sample == 0); + if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst)) trigger_waitsets = true; - } } if (n > n_first) { - patch_generations (info_seq + n_first, n - n_first - 1); + patch_generations (info_seq + n_first, n - n_first - 1); } } if (inst->iid == handle) @@ -1823,19 +1878,13 @@ static int dds_rhc_read_w_qminv os_mutexUnlock (&rhc->lock); if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal (&rhc->reader->m_entity); assert (n <= INT_MAX); return (int)n; } -static int dds_rhc_take_w_qminv -( - struct rhc *rhc, bool lock, void ** values, dds_sample_info_t *info_seq, - uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond -) +static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond) { bool trigger_waitsets = false; uint64_t iid; @@ -1854,6 +1903,7 @@ static int dds_rhc_take_w_qminv if (rhc->nonempty_instances) { + const querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? (querycond_mask_t)1 << cond->m_query.m_index : 0; struct rhc_instance *inst = rhc->nonempty_instances->next; unsigned n_insts = rhc->n_nonempty_instances; while (n_insts-- > 0 && n < max_samples) @@ -1864,73 +1914,74 @@ static int dds_rhc_take_w_qminv { if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { - struct trigger_info pre, post; + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; unsigned nvsamples = inst->nvsamples; const uint32_t n_first = n; - get_trigger_info (&pre, inst, true); + get_trigger_info_pre (&pre, inst); + init_trigger_info_qcond (&trig_qc); if (inst->latest) { - struct rhc_sample * psample = inst->latest; - struct rhc_sample * sample = psample->next; + struct rhc_sample *psample = inst->latest; + struct rhc_sample *sample = psample->next; while (nvsamples--) { struct rhc_sample * const sample1 = sample->next; - if ((qmask_of_sample (sample) & qminv) != 0) + if ((qmask_of_sample (sample) & qminv) != 0 || (qcmask != 0 && !(sample->conds & qcmask))) { + /* sample mask doesn't match, or content predicate doesn't match */ psample = sample; } else { + if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread)) + trigger_waitsets = true; + set_sample_info (info_seq + n, inst, sample); ddsi_serdata_to_sample (sample->sample, values[n], 0, 0); - if (cond == NULL - || (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) - || ( cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n]))) + rhc->n_vsamples--; + if (sample->isread) { - rhc->n_vsamples--; - if (sample->isread) - { - inst->nvread--; - rhc->n_vread--; - } + inst->nvread--; + rhc->n_vread--; + } - if (--inst->nvsamples > 0) - { - if (inst->latest == sample) { - inst->latest = psample; - } - psample->next = sample1; - } - else - { - inst->latest = NULL; - } - - free_sample (inst, sample); - - if (++n == max_samples) - { - break; - } + if (--inst->nvsamples > 0) + { + if (inst->latest == sample) + inst->latest = psample; + psample->next = sample1; } else { - /* The filter didn't match, so free the deserialised copy. */ - ddsi_sertopic_free_sample (rhc->topic, values[n], DDS_FREE_CONTENTS); - psample = sample; + inst->latest = NULL; + } + + free_sample (inst, sample); + + if (++n == max_samples) + { + break; } } sample = sample1; } } - if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask) != 0)) { + struct trigger_info_qcond dummy_trig_qc; +#ifndef NDEBUG + init_trigger_info_qcond (&dummy_trig_qc); +#endif + if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread)) + trigger_waitsets = true; set_sample_info_invsample (info_seq + n, inst); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); - inst_clear_invsample (rhc, inst); + inst_clear_invsample (rhc, inst, &dummy_trig_qc); ++n; } @@ -1944,11 +1995,13 @@ static int dds_rhc_take_w_qminv { /* if nsamples = 0, it won't match anything, so no need to do anything here for drop_instance_noupdate_no_writers */ - get_trigger_info (&post, inst, false); - if (update_conditions_locked (rhc, &pre, &post, NULL)) - { + get_trigger_info_cmn (&post.c, inst); + assert (trig_qc.dec_conds_invsample == 0); + assert (trig_qc.dec_conds_sample == 0); + assert (trig_qc.inc_conds_invsample == 0); + assert (trig_qc.inc_conds_sample == 0); + if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst)) trigger_waitsets = true; - } } if (inst_is_empty (inst)) @@ -1971,9 +2024,8 @@ static int dds_rhc_take_w_qminv } } - if (n > n_first) { - patch_generations (info_seq + n_first, n - n_first - 1); - } + if (n > n_first) + patch_generations (info_seq + n_first, n - n_first - 1); } if (iid == handle) { @@ -1988,19 +2040,13 @@ static int dds_rhc_take_w_qminv os_mutexUnlock (&rhc->lock); if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal(&rhc->reader->m_entity); assert (n <= INT_MAX); return (int)n; } -static int dds_rhc_takecdr_w_qminv -( - struct rhc *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, - uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond - ) +static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond) { bool trigger_waitsets = false; uint64_t iid; @@ -2020,6 +2066,7 @@ static int dds_rhc_takecdr_w_qminv if (rhc->nonempty_instances) { + const querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? (querycond_mask_t)1 << cond->m_query.m_index : 0; struct rhc_instance *inst = rhc->nonempty_instances->next; unsigned n_insts = rhc->n_nonempty_instances; while (n_insts-- > 0 && n < max_samples) @@ -2030,25 +2077,31 @@ static int dds_rhc_takecdr_w_qminv { if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { - struct trigger_info pre, post; + struct trigger_info_pre pre; + struct trigger_info_post post; + struct trigger_info_qcond trig_qc; unsigned nvsamples = inst->nvsamples; const uint32_t n_first = n; - get_trigger_info (&pre, inst, true); + get_trigger_info_pre (&pre, inst); + init_trigger_info_qcond (&trig_qc); if (inst->latest) { - struct rhc_sample * psample = inst->latest; - struct rhc_sample * sample = psample->next; + struct rhc_sample *psample = inst->latest; + struct rhc_sample *sample = psample->next; while (nvsamples--) { struct rhc_sample * const sample1 = sample->next; - if ((qmask_of_sample (sample) & qminv) != 0) + if ((qmask_of_sample (sample) & qminv) != 0 || (qcmask && !(sample->conds & qcmask))) { psample = sample; } else { + if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread)) + trigger_waitsets = true; + set_sample_info (info_seq + n, inst, sample); values[n] = ddsi_serdata_ref(sample->sample); rhc->n_vsamples--; @@ -2057,16 +2110,13 @@ static int dds_rhc_takecdr_w_qminv inst->nvread--; rhc->n_vread--; } - free_sample (inst, sample); if (--inst->nvsamples > 0) - { psample->next = sample1; - } else - { inst->latest = NULL; - } + + free_sample (inst, sample); if (++n == max_samples) { @@ -2077,11 +2127,17 @@ static int dds_rhc_takecdr_w_qminv } } - if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask) != 0)) { + struct trigger_info_qcond dummy_trig_qc; +#ifndef NDEBUG + init_trigger_info_qcond (&dummy_trig_qc); +#endif + if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread)) + trigger_waitsets = true; set_sample_info_invsample (info_seq + n, inst); values[n] = ddsi_serdata_ref(inst->tk->m_sample); - inst_clear_invsample (rhc, inst); + inst_clear_invsample (rhc, inst, &dummy_trig_qc); ++n; } @@ -2095,11 +2151,9 @@ static int dds_rhc_takecdr_w_qminv { /* if nsamples = 0, it won't match anything, so no need to do anything here for drop_instance_noupdate_no_writers */ - get_trigger_info (&post, inst, false); - if (update_conditions_locked (rhc, &pre, &post, NULL)) - { + get_trigger_info_cmn (&post.c, inst); + if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst)) trigger_waitsets = true; - } } if (inst_is_empty (inst)) @@ -2122,9 +2176,8 @@ static int dds_rhc_takecdr_w_qminv } } - if (n > n_first) { - patch_generations (info_seq + n_first, n - n_first - 1); - } + if (n > n_first) + patch_generations (info_seq + n_first, n - n_first - 1); } if (iid == handle) { @@ -2139,9 +2192,7 @@ static int dds_rhc_takecdr_w_qminv os_mutexUnlock (&rhc->lock); if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal (&rhc->reader->m_entity); assert (n <= INT_MAX); return (int)n; @@ -2153,6 +2204,7 @@ static int dds_rhc_takecdr_w_qminv static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dds_readcond * const c) { + assert (!inst_is_empty (inst)); bool m = ((qmask_of_inst (inst) & c->m_qminv) == 0); switch (c->m_sample_states) { @@ -2163,7 +2215,7 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd m = m && inst_has_unread (inst); break; case DDS_SST_READ | DDS_SST_NOT_READ: - case 0: + case 0: /* note: we get here only if inst not empty, so this is a no-op */ m = m && !inst_is_empty (inst); break; @@ -2189,7 +2241,7 @@ static bool cond_is_sample_state_dependent (uint32_t sample_states) } } -void dds_rhc_add_readcondition (dds_readcond *cond) +bool dds_rhc_add_readcondition (dds_readcond *cond) { /* On the assumption that a readcondition will be attached to a waitset for nearly all of its life, we keep track of all @@ -2202,6 +2254,7 @@ void dds_rhc_add_readcondition (dds_readcond *cond) assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) || (dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_QUERY && cond->m_query.m_filter != 0)); + assert (cond->m_entity.m_trigger == 0); cond->m_qminv = qmask_from_dcpsquery (cond->m_sample_states, cond->m_view_states, cond->m_instance_states); @@ -2231,7 +2284,13 @@ void dds_rhc_add_readcondition (dds_readcond *cond) assert (rc->m_query.m_index < MAX_ATTACHED_QUERYCONDS); inv_qcmask &= ~((querycond_mask_t)1 << rc->m_query.m_index); } - if (inv_qcmask == 0) abort (); /* FIXME: must return an error instead of crashing */ + if (inv_qcmask == 0) + { + /* no available indices */ + os_mutexUnlock (&rhc->lock); + return false; + } + unsigned index = 0; while (!(inv_qcmask & ((querycond_mask_t)1 << index))) index++; @@ -2261,6 +2320,7 @@ void dds_rhc_add_readcondition (dds_readcond *cond) if (!inst_is_empty (inst) && rhc_get_cond_trigger (inst, cond)) trigger += (inst->inv_exists ? instmatch : 0) + matches; } + cond->m_entity.m_trigger = trigger; } if (cond->m_entity.m_trigger) @@ -2280,6 +2340,7 @@ void dds_rhc_add_readcondition (dds_readcond *cond) cond->m_instance_states, (void *) cond, cond->m_qminv, rhc->nconds); os_mutexUnlock (&rhc->lock); + return true; } void dds_rhc_remove_readcondition (dds_readcond *cond) @@ -2302,23 +2363,19 @@ void dds_rhc_remove_readcondition (dds_readcond *cond) os_mutexUnlock (&rhc->lock); } -static bool update_conditions_locked -( - struct rhc *rhc, const struct trigger_info *pre, - const struct trigger_info *post, - const struct ddsi_serdata *sample -) +static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst) { /* Pre: rhc->lock held; returns 1 if triggering required, else 0. */ bool trigger = false; dds_readcond * iter; - int m_pre; - int m_post; - char *tmp = NULL; + bool m_pre, m_post; - TRACE ("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n", - (void *) rhc, rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed, + TRACE ("update_conditions_locked(%p %p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n", + (void *) rhc, (void *) inst, rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed, rhc->n_not_alive_no_writers, rhc->n_new, rhc->n_vsamples, rhc->n_vread); + TRACE (" read -[%d,%d]+[%d,%d] qcmask -[%x,%x]+[%x,%x]\n", + trig_qc->dec_invsample_read, trig_qc->dec_sample_read, trig_qc->inc_invsample_read, trig_qc->inc_sample_read, + trig_qc->dec_conds_invsample, trig_qc->dec_conds_sample, trig_qc->inc_conds_invsample, trig_qc->inc_conds_sample); assert (rhc->n_nonempty_instances >= rhc->n_not_alive_disposed + rhc->n_not_alive_no_writers); assert (rhc->n_nonempty_instances >= rhc->n_new); @@ -2327,76 +2384,169 @@ static bool update_conditions_locked iter = rhc->conds; while (iter) { - m_pre = ((pre->qminst & iter->m_qminv) == 0); - m_post = ((post->qminst & iter->m_qminv) == 0); + m_pre = ((pre->c.qminst & iter->m_qminv) == 0); + m_post = ((post->c.qminst & iter->m_qminv) == 0); /* FIXME: use bitmask? */ switch (iter->m_sample_states) { case DDS_SST_READ: - m_pre = m_pre && pre->has_read; - m_post = m_post && post->has_read; + m_pre = m_pre && pre->c.has_read; + m_post = m_post && post->c.has_read; break; case DDS_SST_NOT_READ: - m_pre = m_pre && pre->has_not_read; - m_post = m_post && post->has_not_read; + m_pre = m_pre && pre->c.has_not_read; + m_post = m_post && post->c.has_not_read; break; case DDS_SST_READ | DDS_SST_NOT_READ: case 0: - m_pre = m_pre && (pre->has_read + pre->has_not_read); - m_post = m_post && (post->has_read + post->has_not_read); + m_pre = m_pre && (pre->c.has_read + pre->c.has_not_read); + m_post = m_post && (post->c.has_read + post->c.has_not_read); break; default: - DDS_FATAL("update_readconditions: sample_states invalid: %x\n", iter->m_sample_states); + DDS_FATAL ("update_readconditions: sample_states invalid: %x\n", iter->m_sample_states); } - TRACE (" cond %p: ", (void *) iter); - if (m_pre == m_post) + TRACE (" cond %p %d: ", (void *) iter, iter->m_query.m_filter ? (int) iter->m_query.m_index : -1); + if (iter->m_query.m_filter == 0) { - TRACE ("no change"); - } - else if (m_pre < m_post) - { - if (sample && tmp == NULL && (dds_entity_kind_from_handle(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY)) - { - tmp = ddsi_sertopic_alloc_sample (rhc->topic); - ddsi_serdata_to_sample (sample, tmp, NULL, NULL); - } - if - ( - (sample == NULL) - || (dds_entity_kind_from_handle(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY) - || (iter->m_query.m_filter != NULL && iter->m_query.m_filter (tmp)) - ) + assert (dds_entity_kind (&iter->m_entity) == DDS_KIND_COND_READ); + if (m_pre == m_post) + TRACE ("no change"); + else if (m_pre < m_post) { TRACE ("now matches"); - if (iter->m_entity.m_trigger++ == 0) - { + trigger = (iter->m_entity.m_trigger++ == 0); + if (trigger) TRACE (" (cond now triggers)"); - trigger = true; - } + } + else + { + TRACE ("no longer matches"); + if (--iter->m_entity.m_trigger == 0) + TRACE (" (cond no longer triggers)"); } } else { - TRACE ("no longer matches"); - if (--iter->m_entity.m_trigger == 0) + assert (dds_entity_kind (&iter->m_entity) == DDS_KIND_COND_QUERY); + const querycond_mask_t qcmask = (querycond_mask_t)1 << iter->m_query.m_index; + int32_t minc = 0, mdec = 0; /* note: signed */ + + switch (iter->m_sample_states) { - TRACE (" (cond no longer triggers)"); + case DDS_SST_READ: + if (trig_qc->dec_invsample_read) + mdec += (trig_qc->dec_conds_invsample & qcmask) != 0; + if (trig_qc->dec_sample_read) + mdec += (trig_qc->dec_conds_sample & qcmask) != 0; + if (trig_qc->inc_invsample_read) + minc += (trig_qc->inc_conds_invsample & qcmask) != 0; + if (trig_qc->inc_sample_read) + minc += (trig_qc->inc_conds_sample & qcmask) != 0; + break; + case DDS_SST_NOT_READ: + if (!trig_qc->dec_invsample_read) + mdec += (trig_qc->dec_conds_invsample & qcmask) != 0; + if (!trig_qc->dec_sample_read) + mdec += (trig_qc->dec_conds_sample & qcmask) != 0; + if (!trig_qc->inc_invsample_read) + minc += (trig_qc->inc_conds_invsample & qcmask) != 0; + if (!trig_qc->inc_sample_read) + minc += (trig_qc->inc_conds_sample & qcmask) != 0; + break; + case DDS_SST_READ | DDS_SST_NOT_READ: + case 0: + mdec += (trig_qc->dec_conds_invsample & qcmask) != 0; + mdec += (trig_qc->dec_conds_sample & qcmask) != 0; + minc += (trig_qc->inc_conds_invsample & qcmask) != 0; + minc += (trig_qc->inc_conds_sample & qcmask) != 0; + break; + default: + DDS_FATAL ("update_readconditions: sample_states invalid: %x\n", iter->m_sample_states); + } + + if (m_pre == m_post) + { + /* no change at instance level, but samples pushed out or in may still affect it */ + if (m_pre) + { + /* there was a match at read-condition level + - therefore the matching samples in the instance are accounted for in the trigger count + - therefore an incremental update is required + there is always space for a valid and an invalid sample, both add and remove + inserting an update always has unread data added, but a read pretends it is a removal + of whatever and an insertion of read data */ + const int32_t m = minc - mdec; + assert (m >= 0 || iter->m_entity.m_trigger >= (uint32_t) -m); + if (m == 0) + TRACE ("no change @ %"PRIu32" (0)", iter->m_entity.m_trigger); + else + TRACE ("m=%"PRId32" @ %"PRIu32" (0)", m, iter->m_entity.m_trigger + (uint32_t) m); + /* even though it matches now and matched before, it is not a given that any of the samples + matched before, so m_trigger may still be 0 */ + if (m > 0 && iter->m_entity.m_trigger == 0) + trigger = true; + iter->m_entity.m_trigger += (uint32_t) m; + if (trigger) + TRACE (" (cond now triggers)"); + else if (m < 0 && iter->m_entity.m_trigger == 0) + TRACE (" (cond no longer triggers)"); + } + else + { + /* there was and is no match at the read-condition level and all is well */ + TRACE ("no change (1)"); + } + } + else + { + /* There either was no match at read-condition level, now there is: scan all samples for matches; + or there was a match and now there is not: so also scan all samples for matches. The only + difference is in whether the number of matches should be added or subtracted. */ + int32_t mrem = 0; + if (inst->inv_exists) + mrem += (qmask_of_invsample (inst) & iter->m_qminv) == 0 && (inst->conds & qcmask) != 0; + if (inst->latest) + { + struct rhc_sample *sample = inst->latest->next, * const end = sample; + do { + mrem += (qmask_of_sample (sample) & iter->m_qminv) == 0 && (sample->conds & qcmask) != 0; + sample = sample->next; + } while (sample != end); + } + if (mdec == 0 && minc == 0 && mrem == 0) + TRACE ("no change @ %"PRIu32" (2)", iter->m_entity.m_trigger); + else if (m_pre < m_post) + { + /* No match previously, so the instance wasn't accounted for at all in the trigger value, and therefore when inserting data, all that matters is how many currently match (including dec/inc would double-count); when reading or taking it is evaluated incrementally _before_ changing the state of the sample, so mrem reflects the state before the change, and the incremental change is required, too */ + const int32_t m = called_from_insert ? mrem : mrem - mdec + minc; + TRACE ("minc=%"PRId32" mdec=%"PRId32" mrem=%"PRId32" => %"PRId32" => %"PRIu32" (2a)", minc, mdec, mrem, m, iter->m_entity.m_trigger + (uint32_t) m); + assert (m >= 0 || iter->m_entity.m_trigger >= (uint32_t) -m); + trigger = (iter->m_entity.m_trigger == 0) && m > 0; + iter->m_entity.m_trigger += (uint32_t) m; + if (trigger) + TRACE (" (cond now triggers)"); + } + else + { + /* Previously matched, but no longer, which means we need to subtract the current number of matches as well as those that were removed just before, hence need the incremental change as well */ + const int32_t m = mrem + mdec - minc; + TRACE ("minc=%"PRId32" mdec=%"PRId32" mrem=%"PRId32" => %"PRId32" => %"PRIu32" (2b)", minc, mdec, mrem, m, iter->m_entity.m_trigger - (uint32_t) m); + assert (m < 0 || iter->m_entity.m_trigger >= (uint32_t) m); + iter->m_entity.m_trigger -= (uint32_t) m; + if (iter->m_entity.m_trigger == 0) + TRACE (" (cond no longer triggers)"); + } } } - if (iter->m_entity.m_trigger) { - dds_entity_status_signal(&(iter->m_entity)); - } + + if (iter->m_entity.m_trigger) + dds_entity_status_signal (&iter->m_entity); TRACE ("\n"); iter = iter->m_rhc_next; } - - if (tmp) - { - ddsi_sertopic_free_sample (rhc->topic, tmp, DDS_FREE_ALL); - } return trigger; } @@ -2575,10 +2725,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) dds_readcond * rciter = rhc->conds; for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++) { - if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) - { - assert (cond_match_count[i] == rciter->m_entity.m_trigger); - } + assert (cond_match_count[i] == rciter->m_entity.m_trigger); rciter = rciter->m_rhc_next; } }