Do not scan instances in dds_{read,take}_instance
Scanning all instances was never good for anything: the RHC is organised as hash table on instance id (which is an alias for "instance handle") and it was always designed to do this with a fast lookup. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
5f829684ef
commit
9aef05542f
3 changed files with 320 additions and 425 deletions
|
@ -27,9 +27,8 @@ struct dds_reader;
|
|||
struct ddsi_tkmap;
|
||||
|
||||
typedef dds_return_t (*dds_rhc_associate_t) (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap);
|
||||
typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
typedef int32_t (*dds_rhc_read_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
typedef int32_t (*dds_rhc_read_take_cdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
|
||||
typedef bool (*dds_rhc_add_readcondition_t) (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
typedef void (*dds_rhc_remove_readcondition_t) (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
|
@ -40,9 +39,9 @@ struct dds_rhc_ops {
|
|||
/* A copy of DDSI rhc ops comes first so we can use either interface without
|
||||
additional indirections */
|
||||
struct ddsi_rhc_ops rhc_ops;
|
||||
dds_rhc_read_t read;
|
||||
dds_rhc_take_t take;
|
||||
dds_rhc_takecdr_t takecdr;
|
||||
dds_rhc_read_take_t read;
|
||||
dds_rhc_read_take_t take;
|
||||
dds_rhc_read_take_cdr_t takecdr;
|
||||
dds_rhc_add_readcondition_t add_readcondition;
|
||||
dds_rhc_remove_readcondition_t remove_readcondition;
|
||||
dds_rhc_lock_samples_t lock_samples;
|
||||
|
@ -76,13 +75,13 @@ DDS_EXPORT inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qo
|
|||
DDS_EXPORT inline void dds_rhc_free (struct dds_rhc *rhc) {
|
||||
rhc->common.ops->rhc_ops.free (&rhc->common.rhc);
|
||||
}
|
||||
DDS_EXPORT inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
|
||||
DDS_EXPORT inline int32_t dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
|
||||
return (rhc->common.ops->read) (rhc, lock, values, info_seq, max_samples, mask, handle, cond);
|
||||
}
|
||||
DDS_EXPORT inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
|
||||
DDS_EXPORT inline int32_t dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
|
||||
return rhc->common.ops->take (rhc, lock, values, info_seq, max_samples, mask, handle, cond);
|
||||
}
|
||||
DDS_EXPORT inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
DDS_EXPORT inline int32_t dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
|
||||
}
|
||||
DDS_EXPORT inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond) {
|
||||
|
|
|
@ -40,7 +40,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
#define NC_FREE_BUF 2u
|
||||
#define NC_RESET_BUF 4u
|
||||
|
||||
if (buf == NULL || si == NULL || maxs == 0 || bufsz == 0 || bufsz < maxs)
|
||||
if (buf == NULL || si == NULL || maxs == 0 || bufsz == 0 || bufsz < maxs || maxs > INT32_MAX)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
|
||||
|
@ -61,14 +61,6 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
|
||||
thread_state_awake (ts1, &entity->m_domain->gv);
|
||||
|
||||
if (hand != DDS_HANDLE_NIL)
|
||||
{
|
||||
if (ddsi_tkmap_find_by_id (entity->m_domain->gv.m_tkmap, hand) == NULL) {
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
goto fail_awake_pinned;
|
||||
}
|
||||
}
|
||||
|
||||
/* Allocate samples if not provided (assuming all or none provided) */
|
||||
if (buf[0] == NULL)
|
||||
{
|
||||
|
@ -142,8 +134,6 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
#undef NC_FREE_BUF
|
||||
#undef NC_RESET_BUF
|
||||
|
||||
fail_awake_pinned:
|
||||
thread_state_asleep (ts1);
|
||||
fail_pinned:
|
||||
dds_entity_unpin (entity);
|
||||
fail:
|
||||
|
@ -157,12 +147,8 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
|
|||
struct dds_reader *rd;
|
||||
struct dds_entity *entity;
|
||||
|
||||
assert (take);
|
||||
assert (buf);
|
||||
assert (si);
|
||||
assert (hand == DDS_HANDLE_NIL);
|
||||
assert (maxs > 0);
|
||||
(void)take;
|
||||
if (buf == NULL || si == NULL || maxs == 0 || maxs > INT32_MAX)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
|
||||
return ret;
|
||||
|
|
|
@ -337,7 +337,7 @@ struct dds_rhc_default {
|
|||
};
|
||||
|
||||
struct trigger_info_cmn {
|
||||
unsigned qminst;
|
||||
uint32_t qminst;
|
||||
bool has_read;
|
||||
bool has_not_read;
|
||||
};
|
||||
|
@ -368,9 +368,9 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo);
|
||||
static void dds_rhc_default_relinquish_ownership (struct dds_rhc_default * __restrict rhc, const uint64_t wr_iid);
|
||||
static void dds_rhc_default_set_qos (struct dds_rhc_default *rhc, const struct dds_qos *qos);
|
||||
static int dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
static int32_t dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
|
||||
static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
|
||||
static uint32_t dds_rhc_default_lock_samples (struct dds_rhc_default *rhc);
|
||||
|
@ -390,13 +390,13 @@ static void dds_rhc_default_relinquish_ownership_wrap (struct ddsi_rhc * __restr
|
|||
static void dds_rhc_default_set_qos_wrap (struct ddsi_rhc *rhc, const struct dds_qos *qos) {
|
||||
dds_rhc_default_set_qos ((struct dds_rhc_default *) rhc, qos);
|
||||
}
|
||||
static int dds_rhc_default_read_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
|
||||
static int32_t dds_rhc_default_read_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
|
||||
return dds_rhc_default_read ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, mask, handle, cond);
|
||||
}
|
||||
static int dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
|
||||
static int32_t dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
|
||||
return dds_rhc_default_take ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, mask, handle, cond);
|
||||
}
|
||||
static int dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
static int32_t dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
return dds_rhc_default_takecdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
|
||||
}
|
||||
static bool dds_rhc_default_add_readcondition_wrap (struct dds_rhc *rhc, dds_readcond *cond) {
|
||||
|
@ -432,12 +432,12 @@ static const struct dds_rhc_ops dds_rhc_default_ops = {
|
|||
.associate = dds_rhc_default_associate
|
||||
};
|
||||
|
||||
static unsigned qmask_of_sample (const struct rhc_sample *s)
|
||||
static uint32_t qmask_of_sample (const struct rhc_sample *s)
|
||||
{
|
||||
return s->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE;
|
||||
}
|
||||
|
||||
static unsigned qmask_of_invsample (const struct rhc_instance *i)
|
||||
static uint32_t qmask_of_invsample (const struct rhc_instance *i)
|
||||
{
|
||||
return i->inv_isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE;
|
||||
}
|
||||
|
@ -467,17 +467,17 @@ static bool inst_has_unread (const struct rhc_instance *i)
|
|||
return inst_nread (i) < inst_nsamples (i);
|
||||
}
|
||||
|
||||
static void topicless_to_clean_invsample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim)
|
||||
static bool topicless_to_clean_invsample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim)
|
||||
{
|
||||
/* ddsi_serdata_topicless_to_sample just deals with the key value, without paying any attention to attributes;
|
||||
but that makes life harder for the user: the attributes of an invalid sample would be garbage, but would
|
||||
nonetheless have to be freed in the end. Zero'ing it explicitly solves that problem. */
|
||||
ddsi_sertopic_free_sample (topic, sample, DDS_FREE_CONTENTS);
|
||||
ddsi_sertopic_zero_sample (topic, sample);
|
||||
ddsi_serdata_topicless_to_sample (topic, d, sample, bufptr, buflim);
|
||||
return ddsi_serdata_topicless_to_sample (topic, d, sample, bufptr, buflim);
|
||||
}
|
||||
|
||||
static unsigned qmask_of_inst (const struct rhc_instance *inst);
|
||||
static uint32_t qmask_of_inst (const struct rhc_instance *inst);
|
||||
static void free_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct rhc_sample *s);
|
||||
static void get_trigger_info_cmn (struct trigger_info_cmn *info, struct rhc_instance *inst);
|
||||
static void get_trigger_info_pre (struct trigger_info_pre *info, struct rhc_instance *inst);
|
||||
|
@ -1462,7 +1462,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
|
|||
static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk)
|
||||
{
|
||||
const uint64_t wr_iid = wrinfo->iid;
|
||||
const unsigned statusinfo = sample->statusinfo;
|
||||
const uint32_t statusinfo = sample->statusinfo;
|
||||
const bool has_data = (sample->kind == SDK_DATA);
|
||||
const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0;
|
||||
struct rhc_instance dummy_instance;
|
||||
|
@ -1845,9 +1845,9 @@ static void dds_rhc_default_relinquish_ownership (struct dds_rhc_default * __res
|
|||
instance: ANY, ALIVE, NOT_ALIVE, NOT_ALIVE_NO_WRITERS, NOT_ALIVE_DISPOSED
|
||||
*/
|
||||
|
||||
static unsigned qmask_of_inst (const struct rhc_instance *inst)
|
||||
static uint32_t qmask_of_inst (const struct rhc_instance *inst)
|
||||
{
|
||||
unsigned qm = inst->isnew ? DDS_NEW_VIEW_STATE : DDS_NOT_NEW_VIEW_STATE;
|
||||
uint32_t qm = inst->isnew ? DDS_NEW_VIEW_STATE : DDS_NOT_NEW_VIEW_STATE;
|
||||
|
||||
if (inst->isdisposed)
|
||||
qm |= DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE;
|
||||
|
@ -1905,9 +1905,9 @@ static uint32_t qmask_from_dcpsquery (uint32_t sample_states, uint32_t view_stat
|
|||
return qminv;
|
||||
}
|
||||
|
||||
static unsigned qmask_from_mask_n_cond (uint32_t mask, dds_readcond* cond)
|
||||
static uint32_t qmask_from_mask_n_cond (uint32_t mask, dds_readcond* cond)
|
||||
{
|
||||
unsigned qminv;
|
||||
uint32_t qminv;
|
||||
if (mask == NO_STATE_MASK_SET) {
|
||||
if (cond) {
|
||||
/* No mask set, use the one from the condition. */
|
||||
|
@ -1962,12 +1962,11 @@ static void patch_generations (dds_sample_info_t *si, uint32_t last_of_inst)
|
|||
{
|
||||
if (last_of_inst > 0)
|
||||
{
|
||||
const unsigned ref =
|
||||
const uint32_t ref =
|
||||
si[last_of_inst].disposed_generation_count + si[last_of_inst].no_writers_generation_count;
|
||||
uint32_t i;
|
||||
assert (si[last_of_inst].sample_rank == 0);
|
||||
assert (si[last_of_inst].generation_rank == 0);
|
||||
for (i = 0; i < last_of_inst; i++)
|
||||
for (uint32_t i = 0; i < last_of_inst; i++)
|
||||
{
|
||||
si[i].sample_rank = last_of_inst - i;
|
||||
si[i].generation_rank = ref - (si[i].disposed_generation_count + si[i].no_writers_generation_count);
|
||||
|
@ -2014,418 +2013,329 @@ static bool take_sample_update_conditions (struct dds_rhc_default *rhc, struct t
|
|||
return false;
|
||||
}
|
||||
|
||||
static int dds_rhc_read_w_qminv (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
uint32_t n = 0;
|
||||
typedef bool (*read_take_to_sample_t) (const struct ddsi_serdata * __restrict d, void *__restrict *__restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim);
|
||||
typedef bool (*read_take_to_invsample_t) (const struct ddsi_sertopic * __restrict topic, const struct ddsi_serdata * __restrict d, void *__restrict * __restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim);
|
||||
|
||||
static bool read_take_to_sample (const struct ddsi_serdata * __restrict d, void * __restrict * __restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim)
|
||||
{
|
||||
return ddsi_serdata_to_sample (d, *sample, (void **) bufptr, buflim);
|
||||
}
|
||||
|
||||
static bool read_take_to_invsample (const struct ddsi_sertopic * __restrict topic, const struct ddsi_serdata * __restrict d, void * __restrict * __restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim)
|
||||
{
|
||||
return topicless_to_clean_invsample (topic, d, *sample, (void **) bufptr, buflim);
|
||||
}
|
||||
|
||||
static bool read_take_to_sample_ref (const struct ddsi_serdata * __restrict d, void * __restrict * __restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim)
|
||||
{
|
||||
(void) bufptr; (void) buflim;
|
||||
*sample = ddsi_serdata_ref (d);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool read_take_to_invsample_ref (const struct ddsi_sertopic * __restrict topic, const struct ddsi_serdata * __restrict d, void * __restrict * __restrict sample, void * __restrict * __restrict bufptr, void * __restrict buflim)
|
||||
{
|
||||
(void) topic; (void) bufptr; (void) buflim;
|
||||
*sample = ddsi_serdata_ref (d);
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t read_w_qminv_inst (struct dds_rhc_default * const __restrict rhc, struct rhc_instance * const __restrict inst, void * __restrict * __restrict values, dds_sample_info_t * __restrict info_seq, const int32_t max_samples, const uint32_t qminv, const dds_querycond_mask_t qcmask, read_take_to_sample_t to_sample, read_take_to_invsample_t to_invsample)
|
||||
{
|
||||
assert (max_samples > 0);
|
||||
if (inst_is_empty (inst) || (qmask_of_inst (inst) & qminv) != 0)
|
||||
{
|
||||
/* no samples present, or the instance/view state doesn't match */
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
const uint32_t nread = inst_nread (inst);
|
||||
int32_t n = 0;
|
||||
get_trigger_info_pre (&pre, inst);
|
||||
init_trigger_info_qcond (&trig_qc);
|
||||
|
||||
/* any valid samples precede a possible invalid sample */
|
||||
if (inst->latest)
|
||||
{
|
||||
struct rhc_sample *sample = inst->latest->next, * const end1 = sample;
|
||||
do {
|
||||
if ((qmask_of_sample (sample) & qminv) == 0 && (qcmask == 0 || (sample->conds & qcmask)))
|
||||
{
|
||||
/* sample state matches too */
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
to_sample (sample->sample, values + n, 0, 0);
|
||||
if (!sample->isread)
|
||||
{
|
||||
TRACE ("s");
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false);
|
||||
sample->isread = true;
|
||||
inst->nvread++;
|
||||
rhc->n_vread++;
|
||||
}
|
||||
++n;
|
||||
}
|
||||
sample = sample->next;
|
||||
} while (n < max_samples && sample != end1);
|
||||
}
|
||||
|
||||
/* add an invalid sample if it exists, matches and there is room in the result */
|
||||
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask)))
|
||||
{
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
to_invsample (rhc->topic, inst->tk->m_sample, values + n, 0, 0);
|
||||
if (!inst->inv_isread)
|
||||
{
|
||||
TRACE ("i");
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false);
|
||||
inst->inv_isread = 1;
|
||||
rhc->n_invread++;
|
||||
}
|
||||
++n;
|
||||
}
|
||||
|
||||
/* set generation counts in sample info now that we can compute them; update instance state */
|
||||
bool inst_became_old = false;
|
||||
if (n > 0)
|
||||
{
|
||||
patch_generations (info_seq, (uint32_t) n - 1);
|
||||
if (inst->isnew)
|
||||
{
|
||||
inst_became_old = true;
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
}
|
||||
if (nread != inst_nread (inst) || inst_became_old)
|
||||
{
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
assert (trig_qc.dec_conds_invsample == 0);
|
||||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t take_w_qminv_inst (struct dds_rhc_default * const __restrict rhc, struct rhc_instance * const __restrict inst, void * __restrict * __restrict values, dds_sample_info_t * __restrict info_seq, const int32_t max_samples, const uint32_t qminv, const dds_querycond_mask_t qcmask, size_t * __restrict ntriggers, read_take_to_sample_t to_sample, read_take_to_invsample_t to_invsample)
|
||||
{
|
||||
assert (max_samples > 0);
|
||||
if (inst_is_empty (inst) || (qmask_of_inst (inst) & qminv) != 0)
|
||||
{
|
||||
/* no samples present, or the instance/view state doesn't match */
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
int32_t n = 0;
|
||||
get_trigger_info_pre (&pre, inst);
|
||||
init_trigger_info_qcond (&trig_qc);
|
||||
|
||||
if (inst->latest)
|
||||
{
|
||||
struct rhc_sample *psample = inst->latest;
|
||||
struct rhc_sample *sample = psample->next;
|
||||
uint32_t nvsamples = inst->nvsamples;
|
||||
while (nvsamples--)
|
||||
{
|
||||
struct rhc_sample * const sample1 = sample->next;
|
||||
if ((qmask_of_sample (sample) & qminv) != 0 || (qcmask != 0 && !(sample->conds & qcmask)))
|
||||
{
|
||||
/* sample mask doesn't match, or content predicate doesn't match */
|
||||
psample = sample;
|
||||
}
|
||||
else
|
||||
{
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread);
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
to_sample (sample->sample, values + n, 0, 0);
|
||||
rhc->n_vsamples--;
|
||||
if (sample->isread)
|
||||
{
|
||||
inst->nvread--;
|
||||
rhc->n_vread--;
|
||||
}
|
||||
if (--inst->nvsamples == 0)
|
||||
inst->latest = NULL;
|
||||
else
|
||||
{
|
||||
if (inst->latest == sample)
|
||||
inst->latest = psample;
|
||||
psample->next = sample1;
|
||||
}
|
||||
free_sample (rhc, inst, sample);
|
||||
if (++n == max_samples)
|
||||
break;
|
||||
}
|
||||
sample = sample1;
|
||||
}
|
||||
}
|
||||
|
||||
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask) != 0))
|
||||
{
|
||||
struct trigger_info_qcond dummy_trig_qc;
|
||||
#ifndef NDEBUG
|
||||
init_trigger_info_qcond (&dummy_trig_qc);
|
||||
#endif
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread);
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
to_invsample (rhc->topic, inst->tk->m_sample, values + n, 0, 0);
|
||||
inst_clear_invsample (rhc, inst, &dummy_trig_qc);
|
||||
++n;
|
||||
}
|
||||
|
||||
if (n > 0)
|
||||
{
|
||||
patch_generations (info_seq, (uint32_t) n - 1);
|
||||
if (inst->isnew)
|
||||
{
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
/* if nsamples = 0, it won't match anything, so no need to do anything here for drop_instance_noupdate_no_writers */
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
assert (trig_qc.dec_conds_invsample == 0);
|
||||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, ntriggers);
|
||||
}
|
||||
|
||||
if (inst_is_empty (inst))
|
||||
{
|
||||
remove_inst_from_nonempty_list (rhc, inst);
|
||||
if (inst->isdisposed)
|
||||
rhc->n_not_alive_disposed--;
|
||||
if (inst->wrcount == 0)
|
||||
{
|
||||
TRACE ("take: iid %"PRIx64" #0,empty,drop\n", inst->iid);
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
/* disposed has priority over no writers (why not just 2 bits?) */
|
||||
rhc->n_not_alive_no_writers--;
|
||||
}
|
||||
drop_instance_noupdate_no_writers (rhc, inst);
|
||||
}
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t read_w_qminv (struct dds_rhc_default * __restrict rhc, bool lock, void * __restrict * __restrict values, dds_sample_info_t * __restrict info_seq, int32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond * __restrict cond, read_take_to_sample_t to_sample, read_take_to_invsample_t to_invsample)
|
||||
{
|
||||
int32_t n = 0;
|
||||
assert (max_samples > 0);
|
||||
if (lock)
|
||||
{
|
||||
ddsrt_mutex_lock (&rhc->lock);
|
||||
}
|
||||
|
||||
TRACE ("read_w_qminv(%p,%p,%p,%"PRIu32",%x,%p) - inst %"PRIu32" nonempty %"PRIu32" disp %"PRIu32" nowr %"PRIu32" new %"PRIu32" samples %"PRIu32"+%"PRIu32" read %"PRIu32"+%"PRIu32"\n",
|
||||
(void *) rhc, (void *) values, (void *) info_seq, max_samples, qminv, (void *) cond,
|
||||
TRACE ("read_w_qminv(%p,%p,%p,%"PRId32",%x,%"PRIx64",%p) - inst %"PRIu32" nonempty %"PRIu32" disp %"PRIu32" nowr %"PRIu32" new %"PRIu32" samples %"PRIu32"+%"PRIu32" read %"PRIu32"+%"PRIu32"\n",
|
||||
(void *) rhc, (void *) values, (void *) info_seq, max_samples, qminv, handle, (void *) cond,
|
||||
rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed,
|
||||
rhc->n_not_alive_no_writers, rhc->n_new, rhc->n_vsamples, rhc->n_invsamples,
|
||||
rhc->n_vread, rhc->n_invread);
|
||||
|
||||
if (!ddsrt_circlist_isempty (&rhc->nonempty_instances))
|
||||
const dds_querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0;
|
||||
if (handle)
|
||||
{
|
||||
struct rhc_instance template, *inst;
|
||||
template.iid = handle;
|
||||
if ((inst = ddsrt_hh_lookup (rhc->instances, &template)) != NULL)
|
||||
n = read_w_qminv_inst (rhc, inst, values, info_seq, max_samples, qminv, qcmask, to_sample, to_invsample);
|
||||
else
|
||||
n = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
}
|
||||
else if (!ddsrt_circlist_isempty (&rhc->nonempty_instances))
|
||||
{
|
||||
const dds_querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0;
|
||||
struct rhc_instance * inst = oldest_nonempty_instance (rhc);
|
||||
struct rhc_instance * const end = inst;
|
||||
do
|
||||
{
|
||||
if (handle == DDS_HANDLE_NIL || inst->iid == handle)
|
||||
{
|
||||
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
|
||||
{
|
||||
/* samples present & instance, view state matches */
|
||||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
const unsigned nread = inst_nread (inst);
|
||||
const uint32_t n_first = n;
|
||||
get_trigger_info_pre (&pre, inst);
|
||||
init_trigger_info_qcond (&trig_qc);
|
||||
|
||||
if (inst->latest)
|
||||
{
|
||||
struct rhc_sample *sample = inst->latest->next, * const end1 = sample;
|
||||
do
|
||||
{
|
||||
if ((qmask_of_sample (sample) & qminv) == 0 && (qcmask == 0 || (sample->conds & qcmask)))
|
||||
{
|
||||
/* sample state matches too */
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
ddsi_serdata_to_sample (sample->sample, values[n], 0, 0);
|
||||
if (!sample->isread)
|
||||
{
|
||||
TRACE ("s");
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false);
|
||||
sample->isread = true;
|
||||
inst->nvread++;
|
||||
rhc->n_vread++;
|
||||
}
|
||||
if (++n == max_samples)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
sample = sample->next;
|
||||
}
|
||||
while (sample != end1);
|
||||
}
|
||||
|
||||
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask)))
|
||||
{
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
topicless_to_clean_invsample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
|
||||
if (!inst->inv_isread)
|
||||
{
|
||||
TRACE ("i");
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false);
|
||||
inst->inv_isread = 1;
|
||||
rhc->n_invread++;
|
||||
}
|
||||
++n;
|
||||
}
|
||||
|
||||
bool inst_became_old = false;
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst_became_old = true;
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
if (nread != inst_nread (inst) || inst_became_old)
|
||||
{
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
assert (trig_qc.dec_conds_invsample == 0);
|
||||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (n > n_first) {
|
||||
patch_generations (info_seq + n_first, n - n_first - 1);
|
||||
}
|
||||
}
|
||||
if (inst->iid == handle)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
do {
|
||||
n += read_w_qminv_inst(rhc, inst, values + n, info_seq + n, max_samples - n, qminv, qcmask, to_sample, to_invsample);
|
||||
inst = next_nonempty_instance (inst);
|
||||
}
|
||||
while (inst != end && n < max_samples);
|
||||
} while (inst != end && n < max_samples);
|
||||
}
|
||||
TRACE ("read: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
|
||||
// FIXME: conditional "lock" plus unconditional "unlock" is inexcusably bad design
|
||||
// It appears to have been introduced at some point so another language binding could lock
|
||||
// the RHC using dds_rhc_default_lock_samples to find out the number of samples present,
|
||||
// then allocate stuff and call read/take with lock=true. All that needs fixing.
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
return n;
|
||||
}
|
||||
|
||||
static int dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
static int32_t take_w_qminv (struct dds_rhc_default * __restrict rhc, bool lock, void * __restrict * __restrict values, dds_sample_info_t * __restrict info_seq, int32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond * __restrict cond, read_take_to_sample_t to_sample, read_take_to_invsample_t to_invsample)
|
||||
{
|
||||
uint64_t iid;
|
||||
uint32_t n = 0;
|
||||
int32_t n = 0;
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
|
||||
assert (max_samples > 0);
|
||||
if (lock)
|
||||
{
|
||||
ddsrt_mutex_lock (&rhc->lock);
|
||||
}
|
||||
|
||||
TRACE ("take_w_qminv(%p,%p,%p,%"PRIu32",%x) - inst %"PRIu32" nonempty %"PRIu32" disp %"PRIu32" nowr %"PRIu32" new %"PRIu32" samples %"PRIu32"+%"PRIu32" read %"PRIu32"+%"PRIu32"\n",
|
||||
(void*) rhc, (void*) values, (void*) info_seq, max_samples, qminv,
|
||||
TRACE ("take_w_qminv(%p,%p,%p,%"PRId32",%x,%"PRIx64",%p) - inst %"PRIu32" nonempty %"PRIu32" disp %"PRIu32" nowr %"PRIu32" new %"PRIu32" samples %"PRIu32"+%"PRIu32" read %"PRIu32"+%"PRIu32"\n",
|
||||
(void*) rhc, (void*) values, (void*) info_seq, max_samples, qminv, handle, (void *) cond,
|
||||
rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed,
|
||||
rhc->n_not_alive_no_writers, rhc->n_new, rhc->n_vsamples,
|
||||
rhc->n_invsamples, rhc->n_vread, rhc->n_invread);
|
||||
|
||||
if (!ddsrt_circlist_isempty (&rhc->nonempty_instances))
|
||||
const dds_querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0;
|
||||
if (handle)
|
||||
{
|
||||
struct rhc_instance template, *inst;
|
||||
template.iid = handle;
|
||||
if ((inst = ddsrt_hh_lookup (rhc->instances, &template)) != NULL)
|
||||
n = take_w_qminv_inst (rhc, inst, values, info_seq, max_samples, qminv, qcmask, &ntriggers, to_sample, to_invsample);
|
||||
else
|
||||
n = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
}
|
||||
else if (!ddsrt_circlist_isempty (&rhc->nonempty_instances))
|
||||
{
|
||||
const dds_querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0;
|
||||
struct rhc_instance *inst = oldest_nonempty_instance (rhc);
|
||||
unsigned n_insts = rhc->n_nonempty_instances;
|
||||
uint32_t n_insts = rhc->n_nonempty_instances;
|
||||
while (n_insts-- > 0 && n < max_samples)
|
||||
{
|
||||
struct rhc_instance * const inst1 = next_nonempty_instance (inst);
|
||||
iid = inst->iid;
|
||||
if (handle == DDS_HANDLE_NIL || iid == handle)
|
||||
{
|
||||
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
|
||||
{
|
||||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
unsigned nvsamples = inst->nvsamples;
|
||||
const uint32_t n_first = n;
|
||||
get_trigger_info_pre (&pre, inst);
|
||||
init_trigger_info_qcond (&trig_qc);
|
||||
|
||||
if (inst->latest)
|
||||
{
|
||||
struct rhc_sample *psample = inst->latest;
|
||||
struct rhc_sample *sample = psample->next;
|
||||
while (nvsamples--)
|
||||
{
|
||||
struct rhc_sample * const sample1 = sample->next;
|
||||
|
||||
if ((qmask_of_sample (sample) & qminv) != 0 || (qcmask != 0 && !(sample->conds & qcmask)))
|
||||
{
|
||||
/* sample mask doesn't match, or content predicate doesn't match */
|
||||
psample = sample;
|
||||
}
|
||||
else
|
||||
{
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread);
|
||||
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
ddsi_serdata_to_sample (sample->sample, values[n], 0, 0);
|
||||
rhc->n_vsamples--;
|
||||
if (sample->isread)
|
||||
{
|
||||
inst->nvread--;
|
||||
rhc->n_vread--;
|
||||
}
|
||||
|
||||
if (--inst->nvsamples > 0)
|
||||
{
|
||||
if (inst->latest == sample)
|
||||
inst->latest = psample;
|
||||
psample->next = sample1;
|
||||
}
|
||||
else
|
||||
{
|
||||
inst->latest = NULL;
|
||||
}
|
||||
|
||||
free_sample (rhc, inst, sample);
|
||||
|
||||
if (++n == max_samples)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
sample = sample1;
|
||||
}
|
||||
}
|
||||
|
||||
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask) != 0))
|
||||
{
|
||||
struct trigger_info_qcond dummy_trig_qc;
|
||||
#ifndef NDEBUG
|
||||
init_trigger_info_qcond (&dummy_trig_qc);
|
||||
#endif
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread);
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
topicless_to_clean_invsample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
|
||||
inst_clear_invsample (rhc, inst, &dummy_trig_qc);
|
||||
++n;
|
||||
}
|
||||
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
|
||||
if (n > n_first)
|
||||
{
|
||||
/* if nsamples = 0, it won't match anything, so no need to do
|
||||
anything here for drop_instance_noupdate_no_writers */
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
assert (trig_qc.dec_conds_invsample == 0);
|
||||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (inst_is_empty (inst))
|
||||
{
|
||||
remove_inst_from_nonempty_list (rhc, inst);
|
||||
|
||||
if (inst->isdisposed)
|
||||
{
|
||||
rhc->n_not_alive_disposed--;
|
||||
}
|
||||
if (inst->wrcount == 0)
|
||||
{
|
||||
TRACE ("take: iid %"PRIx64" #0,empty,drop\n", iid);
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
/* disposed has priority over no writers (why not just 2 bits?) */
|
||||
rhc->n_not_alive_no_writers--;
|
||||
}
|
||||
drop_instance_noupdate_no_writers (rhc, inst);
|
||||
}
|
||||
}
|
||||
|
||||
if (n > n_first)
|
||||
patch_generations (info_seq + n_first, n - n_first - 1);
|
||||
}
|
||||
if (iid == handle)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
n += take_w_qminv_inst (rhc, inst, values + n, info_seq + n, max_samples - n, qminv, qcmask, &ntriggers, to_sample, to_invsample);
|
||||
inst = inst1;
|
||||
}
|
||||
}
|
||||
TRACE ("take: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
|
||||
// FIXME: conditional "lock" plus unconditional "unlock" is inexcusably bad design
|
||||
// It appears to have been introduced at some point so another language binding could lock
|
||||
// the RHC using dds_rhc_default_lock_samples to find out the number of samples present,
|
||||
// then allocate stuff and call read/take with lock=true. All that needs fixing.
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
return n;
|
||||
}
|
||||
|
||||
static int dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
static int32_t dds_rhc_read_w_qminv (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
uint64_t iid;
|
||||
uint32_t n = 0;
|
||||
(void)cond;
|
||||
assert (max_samples <= INT32_MAX);
|
||||
return read_w_qminv (rhc, lock, values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample, read_take_to_invsample);
|
||||
}
|
||||
|
||||
if (lock)
|
||||
{
|
||||
ddsrt_mutex_lock (&rhc->lock);
|
||||
}
|
||||
static int32_t dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
assert (max_samples <= INT32_MAX);
|
||||
return take_w_qminv (rhc, lock, values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample, read_take_to_invsample);
|
||||
}
|
||||
|
||||
TRACE ("take_w_qminv(%p,%p,%p,%"PRIu32",%x) - inst %"PRIu32" nonempty %"PRIu32" disp %"PRIu32" nowr %"PRIu32" new %"PRIu32" samples %"PRIu32"+%"PRIu32" read %"PRIu32"+%"PRIu32"\n",
|
||||
(void*) rhc, (void*) values, (void*) info_seq, max_samples, qminv,
|
||||
rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed,
|
||||
rhc->n_not_alive_no_writers, rhc->n_new, rhc->n_vsamples,
|
||||
rhc->n_invsamples, rhc->n_vread, rhc->n_invread);
|
||||
|
||||
if (!ddsrt_circlist_isempty (&rhc->nonempty_instances))
|
||||
{
|
||||
const dds_querycond_mask_t qcmask = (cond && cond->m_query.m_filter) ? cond->m_query.m_qcmask : 0;
|
||||
struct rhc_instance *inst = oldest_nonempty_instance (rhc);
|
||||
unsigned n_insts = rhc->n_nonempty_instances;
|
||||
while (n_insts-- > 0 && n < max_samples)
|
||||
{
|
||||
struct rhc_instance * const inst1 = next_nonempty_instance (inst);
|
||||
iid = inst->iid;
|
||||
if (handle == DDS_HANDLE_NIL || iid == handle)
|
||||
{
|
||||
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
|
||||
{
|
||||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
unsigned nvsamples = inst->nvsamples;
|
||||
const uint32_t n_first = n;
|
||||
get_trigger_info_pre (&pre, inst);
|
||||
init_trigger_info_qcond (&trig_qc);
|
||||
|
||||
if (inst->latest)
|
||||
{
|
||||
struct rhc_sample *psample = inst->latest;
|
||||
struct rhc_sample *sample = psample->next;
|
||||
while (nvsamples--)
|
||||
{
|
||||
struct rhc_sample * const sample1 = sample->next;
|
||||
|
||||
if ((qmask_of_sample (sample) & qminv) != 0 || (qcmask && !(sample->conds & qcmask)))
|
||||
{
|
||||
psample = sample;
|
||||
}
|
||||
else
|
||||
{
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread);
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
values[n] = ddsi_serdata_ref(sample->sample);
|
||||
rhc->n_vsamples--;
|
||||
if (sample->isread)
|
||||
{
|
||||
inst->nvread--;
|
||||
rhc->n_vread--;
|
||||
}
|
||||
|
||||
if (--inst->nvsamples > 0)
|
||||
psample->next = sample1;
|
||||
else
|
||||
inst->latest = NULL;
|
||||
|
||||
free_sample (rhc, inst, sample);
|
||||
|
||||
if (++n == max_samples)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
sample = sample1;
|
||||
}
|
||||
}
|
||||
|
||||
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0 && (qcmask == 0 || (inst->conds & qcmask) != 0))
|
||||
{
|
||||
struct trigger_info_qcond dummy_trig_qc;
|
||||
#ifndef NDEBUG
|
||||
init_trigger_info_qcond (&dummy_trig_qc);
|
||||
#endif
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread);
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
values[n] = ddsi_serdata_ref(inst->tk->m_sample);
|
||||
inst_clear_invsample (rhc, inst, &dummy_trig_qc);
|
||||
++n;
|
||||
}
|
||||
|
||||
if (n > n_first && inst->isnew)
|
||||
{
|
||||
inst->isnew = 0;
|
||||
rhc->n_new--;
|
||||
}
|
||||
|
||||
if (n > n_first)
|
||||
{
|
||||
/* if nsamples = 0, it won't match anything, so no need to do
|
||||
anything here for drop_instance_noupdate_no_writers */
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (inst_is_empty (inst))
|
||||
{
|
||||
remove_inst_from_nonempty_list (rhc, inst);
|
||||
|
||||
if (inst->isdisposed)
|
||||
{
|
||||
rhc->n_not_alive_disposed--;
|
||||
}
|
||||
if (inst->wrcount == 0)
|
||||
{
|
||||
TRACE ("take: iid %"PRIx64" #0,empty,drop\n", iid);
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
/* disposed has priority over no writers (why not just 2 bits?) */
|
||||
rhc->n_not_alive_no_writers--;
|
||||
}
|
||||
drop_instance_noupdate_no_writers (rhc, inst);
|
||||
}
|
||||
}
|
||||
|
||||
if (n > n_first)
|
||||
patch_generations (info_seq + n_first, n - n_first - 1);
|
||||
}
|
||||
if (iid == handle)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
inst = inst1;
|
||||
}
|
||||
}
|
||||
TRACE ("take: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
static int32_t dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *));
|
||||
assert (max_samples <= INT32_MAX);
|
||||
return take_w_qminv (rhc, lock, (void **) values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample_ref, read_take_to_invsample_ref);
|
||||
}
|
||||
|
||||
/*************************
|
||||
|
@ -2805,21 +2715,21 @@ static bool update_conditions_locked (struct dds_rhc_default *rhc, bool called_f
|
|||
****** READ/TAKE ******
|
||||
*************************/
|
||||
|
||||
static int dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
static int32_t dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
unsigned qminv = qmask_from_mask_n_cond (mask, cond);
|
||||
return dds_rhc_read_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond);
|
||||
uint32_t qminv = qmask_from_mask_n_cond (mask, cond);
|
||||
return dds_rhc_read_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond);
|
||||
}
|
||||
|
||||
static int dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
unsigned qminv = qmask_from_mask_n_cond(mask, cond);
|
||||
return dds_rhc_take_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond);
|
||||
uint32_t qminv = qmask_from_mask_n_cond(mask, cond);
|
||||
return dds_rhc_take_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond);
|
||||
}
|
||||
|
||||
static int dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle)
|
||||
static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle)
|
||||
{
|
||||
unsigned qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states);
|
||||
uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states);
|
||||
return dds_rhc_takecdr_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, NULL);
|
||||
}
|
||||
|
||||
|
@ -2835,11 +2745,11 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
|
|||
return 1;
|
||||
|
||||
const uint32_t ncheck = rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS;
|
||||
unsigned n_instances = 0, n_nonempty_instances = 0;
|
||||
unsigned n_not_alive_disposed = 0, n_not_alive_no_writers = 0, n_new = 0;
|
||||
unsigned n_vsamples = 0, n_vread = 0;
|
||||
unsigned n_invsamples = 0, n_invread = 0;
|
||||
unsigned cond_match_count[CHECK_MAX_CONDS];
|
||||
uint32_t n_instances = 0, n_nonempty_instances = 0;
|
||||
uint32_t n_not_alive_disposed = 0, n_not_alive_no_writers = 0, n_new = 0;
|
||||
uint32_t n_vsamples = 0, n_vread = 0;
|
||||
uint32_t n_invsamples = 0, n_invread = 0;
|
||||
uint32_t cond_match_count[CHECK_MAX_CONDS];
|
||||
dds_querycond_mask_t enabled_qcmask = 0;
|
||||
struct rhc_instance *inst;
|
||||
struct ddsrt_hh_iter iter;
|
||||
|
@ -2860,7 +2770,7 @@ static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_cond
|
|||
|
||||
for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter))
|
||||
{
|
||||
unsigned n_vsamples_in_instance = 0, n_read_vsamples_in_instance = 0;
|
||||
uint32_t n_vsamples_in_instance = 0, n_read_vsamples_in_instance = 0;
|
||||
bool a_sample_free = true;
|
||||
|
||||
n_instances++;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue