diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index 2c704d5..ec3f2ab 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -229,6 +229,7 @@ struct rhc_sample struct ddsi_serdata *sample; /* serialised data (either just_key or real data) */ struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */ uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */ + querycond_mask_t conds; /* matching query conditions */ bool isread; /* READ or NOT_READ sample state */ unsigned disposed_gen; /* snapshot of instance counter at time of insertion */ unsigned no_writers_gen; /* __/ */ @@ -241,6 +242,7 @@ struct rhc_instance struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */ unsigned nvsamples; /* number of "valid" samples in instance */ unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */ + querycond_mask_t conds; /* matching query conditions */ uint32_t wrcount; /* number of live writers */ unsigned isnew : 1; /* NEW or NOT_NEW view state */ unsigned a_sample_free : 1; /* whether or not a_sample is in use */ @@ -303,6 +305,7 @@ struct rhc dds_readcond * conds; /* List of associated read conditions */ uint32_t nconds; /* Number of associated read conditions */ uint32_t nqconds; /* Number of associated query conditions */ + querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */ }; struct trigger_info @@ -448,7 +451,25 @@ void dds_rhc_set_qos (struct rhc * rhc, const nn_xqos_t * qos) rhc->history_depth = (qos->history.kind == NN_KEEP_LAST_HISTORY_QOS) ? (uint32_t)qos->history.depth : ~0u; } -static struct rhc_sample * alloc_sample (struct rhc_instance *inst) +static bool eval_predicate_allocs_buf (const struct ddsi_sertopic *sertopic, const struct ddsi_serdata *sample, bool (*pred) (const void *sample)) +{ + char *tmp = ddsi_sertopic_alloc_sample (sertopic); + ddsi_serdata_to_sample (sample, tmp, NULL, NULL); + bool ret = pred (tmp); + ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL); + return ret; +} + +static bool eval_predicate_invsample_allocbuf (const struct ddsi_sertopic *sertopic, const struct rhc_instance *inst, bool (*pred) (const void *sample)) +{ + char *tmp = ddsi_sertopic_alloc_sample (sertopic); + ddsi_serdata_topicless_to_sample (sertopic, inst->tk->m_sample, tmp, NULL, NULL); + bool ret = pred (tmp); + ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL); + return ret; +} + +static struct rhc_sample *alloc_sample (struct rhc_instance *inst) { if (inst->a_sample_free) { @@ -687,8 +708,18 @@ static bool add_sample s->isread = false; s->disposed_gen = inst->disposed_gen; s->no_writers_gen = inst->no_writers_gen; - inst->latest = s; + s->conds = 0; + for (dds_readcond *rc = rhc->conds; rc != NULL; rc = rc->m_rhc_next) + { + if (rc->m_query.m_filter != 0 && eval_predicate_allocs_buf (rhc->topic, s->sample, rc->m_query.m_filter)) + { + assert (rc->m_query.m_index < MAX_ATTACHED_QUERYCONDS); + s->conds |= (querycond_mask_t)1 << rc->m_query.m_index; + } + } + + inst->latest = s; return true; } @@ -1048,12 +1079,7 @@ static void dds_rhc_unregister } } -static struct rhc_instance * alloc_new_instance -( - const struct proxy_writer_info *pwr_info, - struct ddsi_serdata *serdata, - struct ddsi_tkmap_instance *tk -) +static struct rhc_instance *alloc_new_instance (const struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) { struct rhc_instance *inst; @@ -1066,11 +1092,25 @@ static struct rhc_instance * alloc_new_instance inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; inst->isnew = 1; inst->a_sample_free = 1; + inst->conds = 0; inst->wr_iid = pwr_info->iid; inst->wr_iid_islive = (inst->wrcount != 0); inst->wr_guid = pwr_info->guid; inst->tstamp = serdata->timestamp; inst->strength = pwr_info->ownership_strength; + + for (dds_readcond *c = rhc->conds; c != NULL; c = c->m_rhc_next) + { + assert ((dds_entity_kind (&c->m_entity) == DDS_KIND_COND_READ && c->m_query.m_filter == 0) || + (dds_entity_kind (&c->m_entity) == DDS_KIND_COND_QUERY && c->m_query.m_filter != 0)); + /* FIXME: should perhaps have a single sample allocated in the RHC if there are attached query conditions, so that all the alloc/free nonsense can be skipped */ + if (c->m_query.m_filter && eval_predicate_invsample_allocbuf (rhc->topic, inst, c->m_query.m_filter)) + { + assert (c->m_query.m_index < MAX_ATTACHED_QUERYCONDS); + inst->conds |= (querycond_mask_t)1 << c->m_query.m_index; + } + } + return inst; } @@ -1118,7 +1158,7 @@ static rhc_store_result_t rhc_store_new_instance return RHC_REJECTED; } - inst = alloc_new_instance (pwr_info, sample, tk); + inst = alloc_new_instance (rhc, pwr_info, sample, tk); if (has_data) { if (!add_sample (rhc, inst, pwr_info, sample, cb_data)) @@ -2133,6 +2173,22 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd return m ? 1 : 0; } +static bool cond_is_sample_state_dependent (uint32_t sample_states) +{ + switch (sample_states) + { + case DDS_SST_READ: + case DDS_SST_NOT_READ: + return true; + case DDS_SST_READ | DDS_SST_NOT_READ: + case 0: + return false; + default: + DDS_FATAL("update_readconditions: sample_states invalid: %x\n", sample_states); + return false; + } +} + void dds_rhc_add_readcondition (dds_readcond *cond) { /* On the assumption that a readcondition will be attached to a @@ -2141,8 +2197,8 @@ void dds_rhc_add_readcondition (dds_readcond *cond) between those attached to a waitset or not. */ struct rhc *rhc = cond->m_rhc; - struct ut_hhIter iter; - struct rhc_instance * inst; + struct ut_hhIter it; + querycond_mask_t qcmask = 0; assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) || (dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_QUERY && cond->m_query.m_filter != 0)); @@ -2150,20 +2206,20 @@ void dds_rhc_add_readcondition (dds_readcond *cond) cond->m_qminv = qmask_from_dcpsquery (cond->m_sample_states, cond->m_view_states, cond->m_instance_states); os_mutexLock (&rhc->lock); - for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) + + if (cond->m_query.m_filter == NULL) { - if (dds_entity_kind_from_handle(cond->m_entity.m_hdl) == DDS_KIND_COND_READ) + /* Read condition is not cached inside the instances and samples, so it only needs to be evaluated on the non-empty instances */ + if (rhc->nonempty_instances) { - cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond); - if (cond->m_entity.m_trigger) { - dds_entity_status_signal(&cond->m_entity); - } + struct rhc_instance *inst = rhc->nonempty_instances; + do { + cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond); + inst = inst->next; + } while (inst != rhc->nonempty_instances); } } - - cond->m_rhc_next = rhc->conds; - rhc->nconds++; - if (cond->m_query.m_filter) + else { /* Attaching a query condition means selecting an available index in the bitmasks of matching conditions, clearing the bit in all the masks & evaluating the condition on all samples. For the clearing and the setting of the instance mask, we need to scan all instances, not just the non-empty ones. */ dds_readcond *rc; @@ -2181,7 +2237,41 @@ void dds_rhc_add_readcondition (dds_readcond *cond) index++; cond->m_query.m_index = index; + /* Evaluate the condition for all instances. Only non-empty instances can match, but the bitmasks need clearing too, so scan all instances */ + uint32_t trigger = 0; + qcmask = (querycond_mask_t)1 << index; + for (struct rhc_instance *inst = ut_hhIterFirst (rhc->instances, &it); inst != NULL; inst = ut_hhIterNext (&it)) + { + const bool instmatch = eval_predicate_invsample_allocbuf (rhc->topic, inst, cond->m_query.m_filter);; + uint32_t matches = 0; + + inst->conds = (inst->conds & ~qcmask) | (instmatch ? qcmask : 0); + + if (inst->latest) + { + struct rhc_sample *sample = inst->latest->next, * const end = sample; + do { + const bool m = eval_predicate_allocs_buf (rhc->topic, sample->sample, cond->m_query.m_filter); + sample->conds = (sample->conds & ~qcmask) | (m ? qcmask : 0); + matches += m; + sample = sample->next; + } while (sample != end); + } + + if (!inst_is_empty (inst) && rhc_get_cond_trigger (inst, cond)) + trigger += (inst->inv_exists ? instmatch : 0) + matches; + } + } + + if (cond->m_entity.m_trigger) + dds_entity_status_signal (&cond->m_entity); + + cond->m_rhc_next = rhc->conds; + rhc->nconds++; + if (cond->m_query.m_filter) + { rhc->nqconds++; + rhc->qconds_samplest |= cond_is_sample_state_dependent (cond->m_sample_states) ? qcmask : 0; } rhc->conds = cond; @@ -2204,7 +2294,9 @@ void dds_rhc_remove_readcondition (dds_readcond *cond) rhc->nconds--; if (cond->m_query.m_filter) { + const querycond_mask_t qcmask = (querycond_mask_t)1 << cond->m_query.m_index; rhc->nqconds--; + rhc->qconds_samplest &= ~(cond_is_sample_state_dependent (cond->m_sample_states) ? qcmask : 0); cond->m_query.m_index = UINT_MAX; } os_mutexUnlock (&rhc->lock); @@ -2424,15 +2516,46 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) assert (a_sample_free == inst->a_sample_free); { - dds_readcond * rciter = rhc->conds; + char *tmpsample = ddsi_sertopic_alloc_sample (rhc->topic); + dds_readcond *rciter = rhc->conds; for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++) { - if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) - { + assert ((dds_entity_kind (&rciter->m_entity) == DDS_KIND_COND_READ && rciter->m_query.m_filter == 0) || + (dds_entity_kind (&rciter->m_entity) == DDS_KIND_COND_QUERY && rciter->m_query.m_filter != 0)); + if (rciter->m_query.m_filter == 0) cond_match_count[i] += rhc_get_cond_trigger (inst, rciter); + else + { + assert (rciter->m_query.m_index < MAX_ATTACHED_QUERYCONDS); + if (rhc_get_cond_trigger (inst, rciter)) + { + if (inst->latest) + { + struct rhc_sample *sample = inst->latest->next, * const end = sample; + do { + ddsi_serdata_to_sample (sample->sample, tmpsample, NULL, NULL); + const bool m = rciter->m_query.m_filter (tmpsample); + assert (m == ((sample->conds & ((querycond_mask_t)1 << rciter->m_query.m_index)) != 0)); + cond_match_count[i] += (qmask_of_sample (sample) & rciter->m_qminv) == 0 && m; + sample = sample->next; + } while (sample != end); + } + if (inst->inv_exists) + { + /* Have to zero out the attributes before converting doing topicless_to_sample, or the + attributes may be those remaining from the previous sample, messing up the result of m_filter() */ + ddsi_sertopic_free_sample (rhc->topic, tmpsample, DDS_FREE_CONTENTS); + ddsi_sertopic_zero_sample (rhc->topic, tmpsample); + ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, tmpsample, NULL, NULL); + const bool m = rciter->m_query.m_filter (tmpsample); + assert (m == ((inst->conds & ((querycond_mask_t)1 << rciter->m_query.m_index)) != 0)); + cond_match_count[i] += (qmask_of_invsample (inst) & rciter->m_qminv) == 0 && m; + } + } } rciter = rciter->m_rhc_next; } + ddsi_sertopic_free_sample (rhc->topic, tmpsample, DDS_FREE_ALL); } } }