set masks in samples/instances for query conditions

Whenever a sample or an instance is added, check it against the attached query conditions and indicate which ones match

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-02-10 16:13:40 +01:00
parent 796f6860c7
commit b21c7f032c

View file

@ -229,6 +229,7 @@ struct rhc_sample
struct ddsi_serdata *sample; /* serialised data (either just_key or real data) */
struct rhc_sample *next; /* next sample in time ordering, or oldest sample if most recent */
uint64_t wr_iid; /* unique id for writer of this sample (perhaps better in serdata) */
querycond_mask_t conds; /* matching query conditions */
bool isread; /* READ or NOT_READ sample state */
unsigned disposed_gen; /* snapshot of instance counter at time of insertion */
unsigned no_writers_gen; /* __/ */
@ -241,6 +242,7 @@ struct rhc_instance
struct rhc_sample *latest; /* latest received sample; circular list old->new; null if no sample */
unsigned nvsamples; /* number of "valid" samples in instance */
unsigned nvread; /* number of READ "valid" samples in instance (0 <= nvread <= nvsamples) */
querycond_mask_t conds; /* matching query conditions */
uint32_t wrcount; /* number of live writers */
unsigned isnew : 1; /* NEW or NOT_NEW view state */
unsigned a_sample_free : 1; /* whether or not a_sample is in use */
@ -303,6 +305,7 @@ struct rhc
dds_readcond * conds; /* List of associated read conditions */
uint32_t nconds; /* Number of associated read conditions */
uint32_t nqconds; /* Number of associated query conditions */
querycond_mask_t qconds_samplest; /* Mask of associated query conditions that check the sample state */
};
struct trigger_info
@ -448,7 +451,25 @@ void dds_rhc_set_qos (struct rhc * rhc, const nn_xqos_t * qos)
rhc->history_depth = (qos->history.kind == NN_KEEP_LAST_HISTORY_QOS) ? (uint32_t)qos->history.depth : ~0u;
}
static struct rhc_sample * alloc_sample (struct rhc_instance *inst)
static bool eval_predicate_allocs_buf (const struct ddsi_sertopic *sertopic, const struct ddsi_serdata *sample, bool (*pred) (const void *sample))
{
char *tmp = ddsi_sertopic_alloc_sample (sertopic);
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
bool ret = pred (tmp);
ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL);
return ret;
}
static bool eval_predicate_invsample_allocbuf (const struct ddsi_sertopic *sertopic, const struct rhc_instance *inst, bool (*pred) (const void *sample))
{
char *tmp = ddsi_sertopic_alloc_sample (sertopic);
ddsi_serdata_topicless_to_sample (sertopic, inst->tk->m_sample, tmp, NULL, NULL);
bool ret = pred (tmp);
ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL);
return ret;
}
static struct rhc_sample *alloc_sample (struct rhc_instance *inst)
{
if (inst->a_sample_free)
{
@ -687,8 +708,18 @@ static bool add_sample
s->isread = false;
s->disposed_gen = inst->disposed_gen;
s->no_writers_gen = inst->no_writers_gen;
inst->latest = s;
s->conds = 0;
for (dds_readcond *rc = rhc->conds; rc != NULL; rc = rc->m_rhc_next)
{
if (rc->m_query.m_filter != 0 && eval_predicate_allocs_buf (rhc->topic, s->sample, rc->m_query.m_filter))
{
assert (rc->m_query.m_index < MAX_ATTACHED_QUERYCONDS);
s->conds |= (querycond_mask_t)1 << rc->m_query.m_index;
}
}
inst->latest = s;
return true;
}
@ -1048,12 +1079,7 @@ static void dds_rhc_unregister
}
}
static struct rhc_instance * alloc_new_instance
(
const struct proxy_writer_info *pwr_info,
struct ddsi_serdata *serdata,
struct ddsi_tkmap_instance *tk
)
static struct rhc_instance *alloc_new_instance (const struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
struct rhc_instance *inst;
@ -1066,11 +1092,25 @@ static struct rhc_instance * alloc_new_instance
inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0;
inst->isnew = 1;
inst->a_sample_free = 1;
inst->conds = 0;
inst->wr_iid = pwr_info->iid;
inst->wr_iid_islive = (inst->wrcount != 0);
inst->wr_guid = pwr_info->guid;
inst->tstamp = serdata->timestamp;
inst->strength = pwr_info->ownership_strength;
for (dds_readcond *c = rhc->conds; c != NULL; c = c->m_rhc_next)
{
assert ((dds_entity_kind (&c->m_entity) == DDS_KIND_COND_READ && c->m_query.m_filter == 0) ||
(dds_entity_kind (&c->m_entity) == DDS_KIND_COND_QUERY && c->m_query.m_filter != 0));
/* FIXME: should perhaps have a single sample allocated in the RHC if there are attached query conditions, so that all the alloc/free nonsense can be skipped */
if (c->m_query.m_filter && eval_predicate_invsample_allocbuf (rhc->topic, inst, c->m_query.m_filter))
{
assert (c->m_query.m_index < MAX_ATTACHED_QUERYCONDS);
inst->conds |= (querycond_mask_t)1 << c->m_query.m_index;
}
}
return inst;
}
@ -1118,7 +1158,7 @@ static rhc_store_result_t rhc_store_new_instance
return RHC_REJECTED;
}
inst = alloc_new_instance (pwr_info, sample, tk);
inst = alloc_new_instance (rhc, pwr_info, sample, tk);
if (has_data)
{
if (!add_sample (rhc, inst, pwr_info, sample, cb_data))
@ -2133,6 +2173,22 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd
return m ? 1 : 0;
}
static bool cond_is_sample_state_dependent (uint32_t sample_states)
{
switch (sample_states)
{
case DDS_SST_READ:
case DDS_SST_NOT_READ:
return true;
case DDS_SST_READ | DDS_SST_NOT_READ:
case 0:
return false;
default:
DDS_FATAL("update_readconditions: sample_states invalid: %x\n", sample_states);
return false;
}
}
void dds_rhc_add_readcondition (dds_readcond *cond)
{
/* On the assumption that a readcondition will be attached to a
@ -2141,8 +2197,8 @@ void dds_rhc_add_readcondition (dds_readcond *cond)
between those attached to a waitset or not. */
struct rhc *rhc = cond->m_rhc;
struct ut_hhIter iter;
struct rhc_instance * inst;
struct ut_hhIter it;
querycond_mask_t qcmask = 0;
assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) ||
(dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_QUERY && cond->m_query.m_filter != 0));
@ -2150,20 +2206,20 @@ void dds_rhc_add_readcondition (dds_readcond *cond)
cond->m_qminv = qmask_from_dcpsquery (cond->m_sample_states, cond->m_view_states, cond->m_instance_states);
os_mutexLock (&rhc->lock);
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
if (cond->m_query.m_filter == NULL)
{
if (dds_entity_kind_from_handle(cond->m_entity.m_hdl) == DDS_KIND_COND_READ)
/* Read condition is not cached inside the instances and samples, so it only needs to be evaluated on the non-empty instances */
if (rhc->nonempty_instances)
{
cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond);
if (cond->m_entity.m_trigger) {
dds_entity_status_signal(&cond->m_entity);
}
struct rhc_instance *inst = rhc->nonempty_instances;
do {
cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond);
inst = inst->next;
} while (inst != rhc->nonempty_instances);
}
}
cond->m_rhc_next = rhc->conds;
rhc->nconds++;
if (cond->m_query.m_filter)
else
{
/* Attaching a query condition means selecting an available index in the bitmasks of matching conditions, clearing the bit in all the masks & evaluating the condition on all samples. For the clearing and the setting of the instance mask, we need to scan all instances, not just the non-empty ones. */
dds_readcond *rc;
@ -2181,7 +2237,41 @@ void dds_rhc_add_readcondition (dds_readcond *cond)
index++;
cond->m_query.m_index = index;
/* Evaluate the condition for all instances. Only non-empty instances can match, but the bitmasks need clearing too, so scan all instances */
uint32_t trigger = 0;
qcmask = (querycond_mask_t)1 << index;
for (struct rhc_instance *inst = ut_hhIterFirst (rhc->instances, &it); inst != NULL; inst = ut_hhIterNext (&it))
{
const bool instmatch = eval_predicate_invsample_allocbuf (rhc->topic, inst, cond->m_query.m_filter);;
uint32_t matches = 0;
inst->conds = (inst->conds & ~qcmask) | (instmatch ? qcmask : 0);
if (inst->latest)
{
struct rhc_sample *sample = inst->latest->next, * const end = sample;
do {
const bool m = eval_predicate_allocs_buf (rhc->topic, sample->sample, cond->m_query.m_filter);
sample->conds = (sample->conds & ~qcmask) | (m ? qcmask : 0);
matches += m;
sample = sample->next;
} while (sample != end);
}
if (!inst_is_empty (inst) && rhc_get_cond_trigger (inst, cond))
trigger += (inst->inv_exists ? instmatch : 0) + matches;
}
}
if (cond->m_entity.m_trigger)
dds_entity_status_signal (&cond->m_entity);
cond->m_rhc_next = rhc->conds;
rhc->nconds++;
if (cond->m_query.m_filter)
{
rhc->nqconds++;
rhc->qconds_samplest |= cond_is_sample_state_dependent (cond->m_sample_states) ? qcmask : 0;
}
rhc->conds = cond;
@ -2204,7 +2294,9 @@ void dds_rhc_remove_readcondition (dds_readcond *cond)
rhc->nconds--;
if (cond->m_query.m_filter)
{
const querycond_mask_t qcmask = (querycond_mask_t)1 << cond->m_query.m_index;
rhc->nqconds--;
rhc->qconds_samplest &= ~(cond_is_sample_state_dependent (cond->m_sample_states) ? qcmask : 0);
cond->m_query.m_index = UINT_MAX;
}
os_mutexUnlock (&rhc->lock);
@ -2424,15 +2516,46 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds)
assert (a_sample_free == inst->a_sample_free);
{
dds_readcond * rciter = rhc->conds;
char *tmpsample = ddsi_sertopic_alloc_sample (rhc->topic);
dds_readcond *rciter = rhc->conds;
for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++)
{
if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ)
{
assert ((dds_entity_kind (&rciter->m_entity) == DDS_KIND_COND_READ && rciter->m_query.m_filter == 0) ||
(dds_entity_kind (&rciter->m_entity) == DDS_KIND_COND_QUERY && rciter->m_query.m_filter != 0));
if (rciter->m_query.m_filter == 0)
cond_match_count[i] += rhc_get_cond_trigger (inst, rciter);
else
{
assert (rciter->m_query.m_index < MAX_ATTACHED_QUERYCONDS);
if (rhc_get_cond_trigger (inst, rciter))
{
if (inst->latest)
{
struct rhc_sample *sample = inst->latest->next, * const end = sample;
do {
ddsi_serdata_to_sample (sample->sample, tmpsample, NULL, NULL);
const bool m = rciter->m_query.m_filter (tmpsample);
assert (m == ((sample->conds & ((querycond_mask_t)1 << rciter->m_query.m_index)) != 0));
cond_match_count[i] += (qmask_of_sample (sample) & rciter->m_qminv) == 0 && m;
sample = sample->next;
} while (sample != end);
}
if (inst->inv_exists)
{
/* Have to zero out the attributes before converting doing topicless_to_sample, or the
attributes may be those remaining from the previous sample, messing up the result of m_filter() */
ddsi_sertopic_free_sample (rhc->topic, tmpsample, DDS_FREE_CONTENTS);
ddsi_sertopic_zero_sample (rhc->topic, tmpsample);
ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, tmpsample, NULL, NULL);
const bool m = rciter->m_query.m_filter (tmpsample);
assert (m == ((inst->conds & ((querycond_mask_t)1 << rciter->m_query.m_index)) != 0));
cond_match_count[i] += (qmask_of_invsample (inst) & rciter->m_qminv) == 0 && m;
}
}
}
rciter = rciter->m_rhc_next;
}
ddsi_sertopic_free_sample (rhc->topic, tmpsample, DDS_FREE_ALL);
}
}
}