From ebdb3fc5cf38c3bee423155d064842f4859d1e8d Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 29 Apr 2020 20:08:48 +0200 Subject: [PATCH] Refactor storing and unregistering writers in RHC Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_rhc_default.c | 245 +++++++++++----------------- 1 file changed, 94 insertions(+), 151 deletions(-) diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 0a373e7..9195cd9 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -695,7 +695,7 @@ static void inst_clear_invsample_if_exists (struct dds_rhc_default *rhc, struct inst_clear_invsample (rhc, inst, trig_qc); } -static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool *nda) +static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool * __restrict nda) { if (inst->inv_exists && !inst->inv_isread) { @@ -842,7 +842,7 @@ static bool trigger_info_differs (const struct dds_rhc_default *rhc, const struc trig_qc->dec_sample_read != trig_qc->inc_sample_read); } -static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc) +static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc, bool * __restrict nda) { struct rhc_sample *s; @@ -936,6 +936,7 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, trig_qc->inc_conds_sample = s->conds; inst->latest = s; + *nda = true; return true; } @@ -1033,7 +1034,7 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *__restric *instptr = NULL; } -static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update) +static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update, bool * __restrict nda) { const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0; @@ -1054,10 +1055,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * else. */ TRACE ("cached"); assert (inst->wrcount > 0); - return; } - - if (inst->wrcount == 0) + else if (inst->wrcount == 0) { /* Currently no writers at all */ assert (!inst->wr_iid_islive); @@ -1075,6 +1074,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * if (!inst_is_empty (inst) && !inst->isdisposed) rhc->n_not_alive_no_writers--; + *nda = true; } else if (inst_wr_iid == 0 && inst->wrcount == 1) { @@ -1136,6 +1136,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * inst->wrcount++; if (autodispose) inst->autodispose = 1; + *nda = true; } else { @@ -1181,7 +1182,7 @@ static void account_for_nonempty_to_empty_transition (struct dds_rhc_default *__ } } -static int rhc_unregister_isreg_w_sideeffects (struct dds_rhc_default *rhc, const struct rhc_instance *inst, uint64_t wr_iid) +static int rhc_unregister_delete_registration (struct dds_rhc_default *rhc, const struct rhc_instance *inst, uint64_t wr_iid) { /* Returns 1 if last registration just disappeared */ if (inst->wrcount == 0) @@ -1225,9 +1226,8 @@ static int rhc_unregister_isreg_w_sideeffects (struct dds_rhc_default *rhc, cons } } -static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance **instptr, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool *nda) +static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool * __restrict nda) { - struct rhc_instance * const inst = *instptr; assert (inst->wrcount > 0); if (wrinfo->auto_dispose) inst->autodispose = 1; @@ -1273,6 +1273,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in inst->isdisposed = 1; rhc->n_not_alive_disposed++; } + *nda = true; } inst->wr_iid_islive = 0; return 0; @@ -1297,34 +1298,27 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in } account_for_empty_to_nonempty_transition (rhc, inst); inst->wr_iid_islive = 0; + *nda = true; return 0; } } } -static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance **instptr, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) +static void dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, ddsrt_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, bool * __restrict nda) { - struct rhc_instance * const inst = *instptr; - bool notify_data_available = false; - - /* 'post' always gets set; instance may have been freed upon return. */ + /* 'post' always gets set */ TRACE (" unregister:"); - if (!rhc_unregister_isreg_w_sideeffects (rhc, inst, wrinfo->iid)) - { + if (!rhc_unregister_delete_registration (rhc, inst, wrinfo->iid)) { /* other registrations remain */ get_trigger_info_cmn (&post->c, inst); - } - else if (rhc_unregister_updateinst (rhc, instptr, wrinfo, tstamp, trig_qc, ¬ify_data_available)) - { + } else if (rhc_unregister_updateinst (rhc, inst, wrinfo, tstamp, trig_qc, nda)) { /* instance dropped */ init_trigger_info_cmn_nonmatch (&post->c); - } - else - { + } else { /* no writers remain, but instance not empty */ get_trigger_info_cmn (&post->c, inst); } - return notify_data_available; + TRACE (" nda=%d\n", *nda); } static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) @@ -1336,7 +1330,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con memset (inst, 0, sizeof (*inst)); inst->iid = tk->m_iid; inst->tk = tk; - inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; + inst->wrcount = 1; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; inst->autodispose = wrinfo->auto_dispose; inst->deadline_reg = 0; @@ -1362,7 +1356,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con return inst; } -static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc) +static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc, bool * __restrict nda) { struct rhc_instance *inst; int ret; @@ -1400,7 +1394,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst inst = alloc_new_instance (rhc, wrinfo, sample, tk); if (has_data) { - if (!add_sample (rhc, inst, wrinfo, sample, cb_data, trig_qc)) + if (!add_sample (rhc, inst, wrinfo, sample, cb_data, trig_qc, nda)) { free_empty_instance (inst, rhc); return RHC_REJECTED; @@ -1408,10 +1402,8 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst } else { - if (inst->isdisposed) { - bool nda_dummy = false; - inst_set_invsample (rhc, inst, trig_qc, &nda_dummy); - } + if (inst->isdisposed) + inst_set_invsample (rhc, inst, trig_qc, nda); } account_for_empty_to_nonempty_transition (rhc, inst); @@ -1420,7 +1412,6 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst (void) ret; rhc->n_instances++; rhc->n_new++; - get_trigger_info_cmn (&post->c, inst); *out_inst = inst; return RHC_STORED; @@ -1464,6 +1455,43 @@ static void postprocess_instance_update (struct dds_rhc_default * __restrict rhc assert (rhc_check_counts_locked (rhc, true, true)); } +static void update_viewstate_and_disposedness (struct dds_rhc_default * __restrict rhc, struct rhc_instance * __restrict inst, bool has_data, bool not_alive, bool is_dispose, bool * __restrict nda) +{ + /* Sample arriving for a NOT_ALIVE instance => view state NEW */ + if (has_data && not_alive) + { + TRACE (" notalive->alive"); + inst->isnew = 1; + *nda = true; + } + + /* Desired effect on instance state and disposed_gen: + op DISPOSED NOT_DISPOSED + W ND;gen++ ND + D D D + WD D;gen++ D + Simplest way is to toggle istate when it is currently DISPOSED + and the operation is WD. */ + if (has_data && inst->isdisposed) + { + TRACE (" disposed->notdisposed"); + inst->disposed_gen++; + if (!is_dispose) + inst->isdisposed = 0; + *nda = true; + } + if (is_dispose) + { + bool wasdisposed = inst->isdisposed; + if (!inst->isdisposed) + { + inst->isdisposed = 1; + *nda = true; + } + TRACE (" dispose(%d)", !wasdisposed); + } +} + /* dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample delivered (true unless a reliable sample rejected). @@ -1521,14 +1549,12 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons } else { - TRACE (" new instance"); - stored = rhc_store_new_instance (&inst, rhc, wrinfo, sample, tk, has_data, &cb_data, &post, &trig_qc); + TRACE (" new instance\n"); + stored = rhc_store_new_instance (&inst, rhc, wrinfo, sample, tk, has_data, &cb_data, &trig_qc, ¬ify_data_available); if (stored != RHC_STORED) - { goto error_or_nochange; - } + init_trigger_info_cmn_nonmatch (&pre.c); - notify_data_available = true; } } else if (!inst_accepts_sample (rhc, inst, wrinfo, sample, has_data)) @@ -1542,25 +1568,13 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons get_trigger_info_pre (&pre, inst); if (has_data || is_dispose) - { - dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false); - } - if (statusinfo & NN_STATUSINFO_UNREGISTER) - { - if (dds_rhc_unregister (rhc, &inst, wrinfo, sample->timestamp, &post, &trig_qc)) - notify_data_available = true; - } - else - { - get_trigger_info_cmn (&post.c, inst); - } - /* notify sample lost */ + dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false, ¬ify_data_available); + /* notify sample lost */ cb_data.raw_status_id = (int) DDS_SAMPLE_LOST_STATUS_ID; cb_data.extra = 0; cb_data.handle = 0; cb_data.add = true; - goto error_or_nochange; } else { @@ -1579,7 +1593,6 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons const bool old_isdisposed = inst->isdisposed; const bool old_isnew = inst->isnew; const bool was_empty = inst_is_empty (inst); - int inst_became_disposed = 0; /* Not just an unregister, so a write and/or a dispose (possibly combined with an unregister). Write & dispose create a @@ -1589,66 +1602,29 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons (i.e., out-of-memory), abort the operation and hope that the caller can still notify the application. */ - dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true); + dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true, ¬ify_data_available); + update_viewstate_and_disposedness (rhc, inst, has_data, not_alive, is_dispose, ¬ify_data_available); - /* Sample arriving for a NOT_ALIVE instance => view state NEW */ - if (has_data && not_alive) - { - TRACE (" notalive->alive"); - inst->isnew = 1; - } - - /* Desired effect on instance state and disposed_gen: - op DISPOSED NOT_DISPOSED - W ND;gen++ ND - D D D - WD D;gen++ D - Simplest way is to toggle istate when it is currently DISPOSED - and the operation is WD. */ - if (has_data && inst->isdisposed) - { - TRACE (" disposed->notdisposed"); - inst->isdisposed = 0; - inst->disposed_gen++; - } - if (is_dispose) - { - inst->isdisposed = 1; - inst_became_disposed = !old_isdisposed; - TRACE (" dispose(%d)", inst_became_disposed); - } - - /* Only need to add a sample to the history if the input actually - is a sample. */ + /* Only need to add a sample to the history if the input actually is a sample. */ if (has_data) { TRACE (" add_sample"); - if (!add_sample (rhc, inst, wrinfo, sample, &cb_data, &trig_qc)) + if (!add_sample (rhc, inst, wrinfo, sample, &cb_data, &trig_qc, ¬ify_data_available)) { - TRACE ("(reject)"); + TRACE ("(reject)\n"); stored = RHC_REJECTED; /* FIXME: fix the bad rejection handling, probably put back in a proper rollback, until then a band-aid like this will have to do: */ inst->isnew = old_isnew; if (old_isdisposed) - { inst->disposed_gen--; - if (!inst->isdisposed) - { - inst->isdisposed = 1; - } - } - else if (inst->isdisposed) - { - inst->isdisposed = 0; - } + inst->isdisposed = old_isdisposed; goto error_or_nochange; } - notify_data_available = true; } /* If instance became disposed, add an invalid sample if there are no samples left */ - if (inst_became_disposed && (inst->latest == NULL || inst->latest->isread)) + if ((bool) inst->isdisposed > old_isdisposed && (inst->latest == NULL || inst->latest->isread)) inst_set_invsample (rhc, inst, &trig_qc, ¬ify_data_available); update_inst (inst, wrinfo, true, sample->timestamp); @@ -1658,7 +1634,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons guaranteed that we end up with a non-empty instance: for example, if the instance was disposed & empty, nothing changes. */ - if (inst->latest || inst_became_disposed) + if (inst->latest || (bool) inst->isdisposed > old_isdisposed) { if (was_empty) account_for_empty_to_nonempty_transition (rhc, inst); @@ -1672,29 +1648,30 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons } } + TRACE(" nda=%d\n", notify_data_available); assert (rhc_check_counts_locked (rhc, false, false)); + } - if (statusinfo & NN_STATUSINFO_UNREGISTER) - { - /* Either a pure unregister, or the instance rejected the sample - because of time stamps, content filter, or something else. If - the writer unregisters the instance, I think we should ignore - the acceptance filters and process it anyway. + if (statusinfo & NN_STATUSINFO_UNREGISTER) + { + /* Either a pure unregister, or the instance rejected the sample + because of time stamps, content filter, or something else. If + the writer unregisters the instance, I think we should ignore + the acceptance filters and process it anyway. - It is a bit unclear what + It is a bit unclear what - write_w_timestamp(x,1) ; unregister_w_timestamp(x,0) + write_w_timestamp(x,1) ; unregister_w_timestamp(x,0) - actually means if BY_SOURCE ordering is selected: does that - mean an application reading "x" after the write and reading it - again after the unregister will see a change in the - no_writers_generation field? */ - dds_rhc_unregister (rhc, &inst, wrinfo, sample->timestamp, &post, &trig_qc); - } - else - { - get_trigger_info_cmn (&post.c, inst); - } + actually means if BY_SOURCE ordering is selected: does that + mean an application reading "x" after the write and reading it + again after the unregister will see a change in the + no_writers_generation field? */ + dds_rhc_unregister (rhc, inst, wrinfo, sample->timestamp, &post, &trig_qc, ¬ify_data_available); + } + else + { + get_trigger_info_cmn (&post.c, inst); } postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, triggers, &ntriggers); @@ -1736,7 +1713,6 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm struct rhc_instance *inst; struct ddsrt_hh_iter iter; const uint64_t wr_iid = wrinfo->iid; - const bool auto_dispose = wrinfo->auto_dispose; size_t ntriggers = SIZE_MAX; ddsrt_mutex_lock (&rhc->lock); @@ -1751,47 +1727,16 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm struct trigger_info_qcond trig_qc; get_trigger_info_pre (&pre, inst); init_trigger_info_qcond (&trig_qc); - TRACE (" %"PRIx64":", inst->iid); - - assert (inst->wrcount > 0); - if (auto_dispose && !inst->isdisposed) - { - notify_data_available = true; - inst->isdisposed = 1; - - /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ - if (inst->latest && !inst->latest->isread) - { - assert (!inst->inv_exists); - rhc->n_not_alive_disposed++; - } - else - { - const bool was_empty = inst_is_empty (inst); - inst_set_invsample (rhc, inst, &trig_qc, ¬ify_data_available); - if (was_empty) - account_for_empty_to_nonempty_transition (rhc, inst); - else - rhc->n_not_alive_disposed++; - } - } - - (void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc); - notify_data_available = true; + dds_rhc_unregister (rhc, inst, wrinfo, inst->tstamp, &post, &trig_qc, ¬ify_data_available); postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, NULL, &ntriggers); TRACE ("\n"); } } - TRACE (")\n"); - ddsrt_mutex_unlock (&rhc->lock); - if (rhc->reader) - { - if (notify_data_available) - dds_reader_data_available_cb (rhc->reader); - } + if (rhc->reader && notify_data_available) + dds_reader_data_available_cb (rhc->reader); } static void dds_rhc_default_relinquish_ownership (struct ddsi_rhc * __restrict rhc_common, const uint64_t wr_iid) @@ -2044,7 +1989,6 @@ static int32_t read_w_qminv_inst (struct dds_rhc_default * const __restrict rhc, to_sample (sample->sample, values + n, 0, 0); if (!sample->isread) { - TRACE ("s"); read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false); sample->isread = true; inst->nvread++; @@ -2063,7 +2007,6 @@ static int32_t read_w_qminv_inst (struct dds_rhc_default * const __restrict rhc, to_invsample (rhc->topic, inst->tk->m_sample, values + n, 0, 0); if (!inst->inv_isread) { - TRACE ("i"); read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false); inst->inv_isread = 1; rhc->n_invread++;