Refactor storing and unregistering writers in RHC

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-04-29 20:08:48 +02:00 committed by eboasson
parent 6800887a74
commit ebdb3fc5cf

View file

@ -695,7 +695,7 @@ static void inst_clear_invsample_if_exists (struct dds_rhc_default *rhc, struct
inst_clear_invsample (rhc, inst, trig_qc);
}
static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool *nda)
static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool * __restrict nda)
{
if (inst->inv_exists && !inst->inv_isread)
{
@ -842,7 +842,7 @@ static bool trigger_info_differs (const struct dds_rhc_default *rhc, const struc
trig_qc->dec_sample_read != trig_qc->inc_sample_read);
}
static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc)
static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc, bool * __restrict nda)
{
struct rhc_sample *s;
@ -936,6 +936,7 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
trig_qc->inc_conds_sample = s->conds;
inst->latest = s;
*nda = true;
return true;
}
@ -1033,7 +1034,7 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *__restric
*instptr = NULL;
}
static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update)
static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update, bool * __restrict nda)
{
const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0;
@ -1054,10 +1055,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
else. */
TRACE ("cached");
assert (inst->wrcount > 0);
return;
}
if (inst->wrcount == 0)
else if (inst->wrcount == 0)
{
/* Currently no writers at all */
assert (!inst->wr_iid_islive);
@ -1075,6 +1074,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
if (!inst_is_empty (inst) && !inst->isdisposed)
rhc->n_not_alive_no_writers--;
*nda = true;
}
else if (inst_wr_iid == 0 && inst->wrcount == 1)
{
@ -1136,6 +1136,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
inst->wrcount++;
if (autodispose)
inst->autodispose = 1;
*nda = true;
}
else
{
@ -1181,7 +1182,7 @@ static void account_for_nonempty_to_empty_transition (struct dds_rhc_default *__
}
}
static int rhc_unregister_isreg_w_sideeffects (struct dds_rhc_default *rhc, const struct rhc_instance *inst, uint64_t wr_iid)
static int rhc_unregister_delete_registration (struct dds_rhc_default *rhc, const struct rhc_instance *inst, uint64_t wr_iid)
{
/* Returns 1 if last registration just disappeared */
if (inst->wrcount == 0)
@ -1225,9 +1226,8 @@ static int rhc_unregister_isreg_w_sideeffects (struct dds_rhc_default *rhc, cons
}
}
static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance **instptr, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool *nda)
static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool * __restrict nda)
{
struct rhc_instance * const inst = *instptr;
assert (inst->wrcount > 0);
if (wrinfo->auto_dispose)
inst->autodispose = 1;
@ -1273,6 +1273,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
inst->isdisposed = 1;
rhc->n_not_alive_disposed++;
}
*nda = true;
}
inst->wr_iid_islive = 0;
return 0;
@ -1297,34 +1298,27 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
}
account_for_empty_to_nonempty_transition (rhc, inst);
inst->wr_iid_islive = 0;
*nda = true;
return 0;
}
}
}
static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance **instptr, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
static void dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, bool * __restrict nda)
{
struct rhc_instance * const inst = *instptr;
bool notify_data_available = false;
/* 'post' always gets set; instance may have been freed upon return. */
/* 'post' always gets set */
TRACE (" unregister:");
if (!rhc_unregister_isreg_w_sideeffects (rhc, inst, wrinfo->iid))
{
if (!rhc_unregister_delete_registration (rhc, inst, wrinfo->iid)) {
/* other registrations remain */
get_trigger_info_cmn (&post->c, inst);
}
else if (rhc_unregister_updateinst (rhc, instptr, wrinfo, tstamp, trig_qc, &notify_data_available))
{
} else if (rhc_unregister_updateinst (rhc, inst, wrinfo, tstamp, trig_qc, nda)) {
/* instance dropped */
init_trigger_info_cmn_nonmatch (&post->c);
}
else
{
} else {
/* no writers remain, but instance not empty */
get_trigger_info_cmn (&post->c, inst);
}
return notify_data_available;
TRACE (" nda=%d\n", *nda);
}
static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
@ -1336,7 +1330,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con
memset (inst, 0, sizeof (*inst));
inst->iid = tk->m_iid;
inst->tk = tk;
inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1;
inst->wrcount = 1;
inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0;
inst->autodispose = wrinfo->auto_dispose;
inst->deadline_reg = 0;
@ -1362,7 +1356,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con
return inst;
}
static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc, bool * __restrict nda)
{
struct rhc_instance *inst;
int ret;
@ -1400,7 +1394,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
inst = alloc_new_instance (rhc, wrinfo, sample, tk);
if (has_data)
{
if (!add_sample (rhc, inst, wrinfo, sample, cb_data, trig_qc))
if (!add_sample (rhc, inst, wrinfo, sample, cb_data, trig_qc, nda))
{
free_empty_instance (inst, rhc);
return RHC_REJECTED;
@ -1408,10 +1402,8 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
}
else
{
if (inst->isdisposed) {
bool nda_dummy = false;
inst_set_invsample (rhc, inst, trig_qc, &nda_dummy);
}
if (inst->isdisposed)
inst_set_invsample (rhc, inst, trig_qc, nda);
}
account_for_empty_to_nonempty_transition (rhc, inst);
@ -1420,7 +1412,6 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
(void) ret;
rhc->n_instances++;
rhc->n_new++;
get_trigger_info_cmn (&post->c, inst);
*out_inst = inst;
return RHC_STORED;
@ -1464,6 +1455,43 @@ static void postprocess_instance_update (struct dds_rhc_default * __restrict rhc
assert (rhc_check_counts_locked (rhc, true, true));
}
static void update_viewstate_and_disposedness (struct dds_rhc_default * __restrict rhc, struct rhc_instance * __restrict inst, bool has_data, bool not_alive, bool is_dispose, bool * __restrict nda)
{
/* Sample arriving for a NOT_ALIVE instance => view state NEW */
if (has_data && not_alive)
{
TRACE (" notalive->alive");
inst->isnew = 1;
*nda = true;
}
/* Desired effect on instance state and disposed_gen:
op DISPOSED NOT_DISPOSED
W ND;gen++ ND
D D D
WD D;gen++ D
Simplest way is to toggle istate when it is currently DISPOSED
and the operation is WD. */
if (has_data && inst->isdisposed)
{
TRACE (" disposed->notdisposed");
inst->disposed_gen++;
if (!is_dispose)
inst->isdisposed = 0;
*nda = true;
}
if (is_dispose)
{
bool wasdisposed = inst->isdisposed;
if (!inst->isdisposed)
{
inst->isdisposed = 1;
*nda = true;
}
TRACE (" dispose(%d)", !wasdisposed);
}
}
/*
dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample
delivered (true unless a reliable sample rejected).
@ -1521,14 +1549,12 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
}
else
{
TRACE (" new instance");
stored = rhc_store_new_instance (&inst, rhc, wrinfo, sample, tk, has_data, &cb_data, &post, &trig_qc);
TRACE (" new instance\n");
stored = rhc_store_new_instance (&inst, rhc, wrinfo, sample, tk, has_data, &cb_data, &trig_qc, &notify_data_available);
if (stored != RHC_STORED)
{
goto error_or_nochange;
}
init_trigger_info_cmn_nonmatch (&pre.c);
notify_data_available = true;
}
}
else if (!inst_accepts_sample (rhc, inst, wrinfo, sample, has_data))
@ -1542,25 +1568,13 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
get_trigger_info_pre (&pre, inst);
if (has_data || is_dispose)
{
dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false);
}
if (statusinfo & NN_STATUSINFO_UNREGISTER)
{
if (dds_rhc_unregister (rhc, &inst, wrinfo, sample->timestamp, &post, &trig_qc))
notify_data_available = true;
}
else
{
get_trigger_info_cmn (&post.c, inst);
}
/* notify sample lost */
dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false, &notify_data_available);
/* notify sample lost */
cb_data.raw_status_id = (int) DDS_SAMPLE_LOST_STATUS_ID;
cb_data.extra = 0;
cb_data.handle = 0;
cb_data.add = true;
goto error_or_nochange;
}
else
{
@ -1579,7 +1593,6 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
const bool old_isdisposed = inst->isdisposed;
const bool old_isnew = inst->isnew;
const bool was_empty = inst_is_empty (inst);
int inst_became_disposed = 0;
/* Not just an unregister, so a write and/or a dispose (possibly
combined with an unregister). Write & dispose create a
@ -1589,66 +1602,29 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
(i.e., out-of-memory), abort the operation and hope that the
caller can still notify the application. */
dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true);
dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true, &notify_data_available);
update_viewstate_and_disposedness (rhc, inst, has_data, not_alive, is_dispose, &notify_data_available);
/* Sample arriving for a NOT_ALIVE instance => view state NEW */
if (has_data && not_alive)
{
TRACE (" notalive->alive");
inst->isnew = 1;
}
/* Desired effect on instance state and disposed_gen:
op DISPOSED NOT_DISPOSED
W ND;gen++ ND
D D D
WD D;gen++ D
Simplest way is to toggle istate when it is currently DISPOSED
and the operation is WD. */
if (has_data && inst->isdisposed)
{
TRACE (" disposed->notdisposed");
inst->isdisposed = 0;
inst->disposed_gen++;
}
if (is_dispose)
{
inst->isdisposed = 1;
inst_became_disposed = !old_isdisposed;
TRACE (" dispose(%d)", inst_became_disposed);
}
/* Only need to add a sample to the history if the input actually
is a sample. */
/* Only need to add a sample to the history if the input actually is a sample. */
if (has_data)
{
TRACE (" add_sample");
if (!add_sample (rhc, inst, wrinfo, sample, &cb_data, &trig_qc))
if (!add_sample (rhc, inst, wrinfo, sample, &cb_data, &trig_qc, &notify_data_available))
{
TRACE ("(reject)");
TRACE ("(reject)\n");
stored = RHC_REJECTED;
/* FIXME: fix the bad rejection handling, probably put back in a proper rollback, until then a band-aid like this will have to do: */
inst->isnew = old_isnew;
if (old_isdisposed)
{
inst->disposed_gen--;
if (!inst->isdisposed)
{
inst->isdisposed = 1;
}
}
else if (inst->isdisposed)
{
inst->isdisposed = 0;
}
inst->isdisposed = old_isdisposed;
goto error_or_nochange;
}
notify_data_available = true;
}
/* If instance became disposed, add an invalid sample if there are no samples left */
if (inst_became_disposed && (inst->latest == NULL || inst->latest->isread))
if ((bool) inst->isdisposed > old_isdisposed && (inst->latest == NULL || inst->latest->isread))
inst_set_invsample (rhc, inst, &trig_qc, &notify_data_available);
update_inst (inst, wrinfo, true, sample->timestamp);
@ -1658,7 +1634,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
guaranteed that we end up with a non-empty instance: for
example, if the instance was disposed & empty, nothing
changes. */
if (inst->latest || inst_became_disposed)
if (inst->latest || (bool) inst->isdisposed > old_isdisposed)
{
if (was_empty)
account_for_empty_to_nonempty_transition (rhc, inst);
@ -1672,29 +1648,30 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
}
}
TRACE(" nda=%d\n", notify_data_available);
assert (rhc_check_counts_locked (rhc, false, false));
}
if (statusinfo & NN_STATUSINFO_UNREGISTER)
{
/* Either a pure unregister, or the instance rejected the sample
because of time stamps, content filter, or something else. If
the writer unregisters the instance, I think we should ignore
the acceptance filters and process it anyway.
if (statusinfo & NN_STATUSINFO_UNREGISTER)
{
/* Either a pure unregister, or the instance rejected the sample
because of time stamps, content filter, or something else. If
the writer unregisters the instance, I think we should ignore
the acceptance filters and process it anyway.
It is a bit unclear what
It is a bit unclear what
write_w_timestamp(x,1) ; unregister_w_timestamp(x,0)
write_w_timestamp(x,1) ; unregister_w_timestamp(x,0)
actually means if BY_SOURCE ordering is selected: does that
mean an application reading "x" after the write and reading it
again after the unregister will see a change in the
no_writers_generation field? */
dds_rhc_unregister (rhc, &inst, wrinfo, sample->timestamp, &post, &trig_qc);
}
else
{
get_trigger_info_cmn (&post.c, inst);
}
actually means if BY_SOURCE ordering is selected: does that
mean an application reading "x" after the write and reading it
again after the unregister will see a change in the
no_writers_generation field? */
dds_rhc_unregister (rhc, inst, wrinfo, sample->timestamp, &post, &trig_qc, &notify_data_available);
}
else
{
get_trigger_info_cmn (&post.c, inst);
}
postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, triggers, &ntriggers);
@ -1736,7 +1713,6 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
struct rhc_instance *inst;
struct ddsrt_hh_iter iter;
const uint64_t wr_iid = wrinfo->iid;
const bool auto_dispose = wrinfo->auto_dispose;
size_t ntriggers = SIZE_MAX;
ddsrt_mutex_lock (&rhc->lock);
@ -1751,47 +1727,16 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
struct trigger_info_qcond trig_qc;
get_trigger_info_pre (&pre, inst);
init_trigger_info_qcond (&trig_qc);
TRACE (" %"PRIx64":", inst->iid);
assert (inst->wrcount > 0);
if (auto_dispose && !inst->isdisposed)
{
notify_data_available = true;
inst->isdisposed = 1;
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */
if (inst->latest && !inst->latest->isread)
{
assert (!inst->inv_exists);
rhc->n_not_alive_disposed++;
}
else
{
const bool was_empty = inst_is_empty (inst);
inst_set_invsample (rhc, inst, &trig_qc, &notify_data_available);
if (was_empty)
account_for_empty_to_nonempty_transition (rhc, inst);
else
rhc->n_not_alive_disposed++;
}
}
(void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc);
notify_data_available = true;
dds_rhc_unregister (rhc, inst, wrinfo, inst->tstamp, &post, &trig_qc, &notify_data_available);
postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, NULL, &ntriggers);
TRACE ("\n");
}
}
TRACE (")\n");
ddsrt_mutex_unlock (&rhc->lock);
if (rhc->reader)
{
if (notify_data_available)
dds_reader_data_available_cb (rhc->reader);
}
if (rhc->reader && notify_data_available)
dds_reader_data_available_cb (rhc->reader);
}
static void dds_rhc_default_relinquish_ownership (struct ddsi_rhc * __restrict rhc_common, const uint64_t wr_iid)
@ -2044,7 +1989,6 @@ static int32_t read_w_qminv_inst (struct dds_rhc_default * const __restrict rhc,
to_sample (sample->sample, values + n, 0, 0);
if (!sample->isread)
{
TRACE ("s");
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false);
sample->isread = true;
inst->nvread++;
@ -2063,7 +2007,6 @@ static int32_t read_w_qminv_inst (struct dds_rhc_default * const __restrict rhc,
to_invsample (rhc->topic, inst->tk->m_sample, values + n, 0, 0);
if (!inst->inv_isread)
{
TRACE ("i");
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false);
inst->inv_isread = 1;
rhc->n_invread++;