From c29a81b339de245dd3fd13152c00e70bbadb9c8e Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 29 Apr 2020 17:47:13 +0200 Subject: [PATCH] Track deadline registration, consolidate updates Deadline registration, renewal and deregistration was somewhat spread through the code and relied on the "isdisposed" flag as a proxy for whether it was registered or not. This consolidates the deadline handling code in a final step of updating the instance and uses a separate flag to track whether the instance is currently registered in the deadline administration or not. This also makes it possible to trivially change the rules for when deadline notifications are required, and so allows for, e.g., adding a mode in which instances in the "no writers" state do not trigger any deadline missed notifications, or just once (both of which seem useful modes). Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_rhc_default.c | 129 +++++++++++++--------------- 1 file changed, 59 insertions(+), 70 deletions(-) diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index e1a17b1..2ee62f2 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -270,6 +270,7 @@ struct rhc_instance { unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_isread : 1; /* whether or not that state change has been read before */ + unsigned deadline_reg : 1; /* whether or not registered for a deadline (== isdisposed, except store() defers updates) */ uint32_t disposed_gen; /* bloody generation counters - worst invention of mankind */ uint32_t no_writers_gen; /* __/ */ int32_t strength; /* "current" ownership strength */ @@ -713,7 +714,7 @@ static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_defau assert (inst_is_empty (inst)); ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk); #ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!inst->isdisposed) + if (inst->deadline_reg) deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); #endif ddsrt_free (inst); @@ -918,11 +919,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, s->lifespan.t_expire = wrinfo->lifespan_exp; lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan); #endif -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - /* Only renew the deadline missed counter in case the sample is actually stored in the rhc */ - if (!inst->isdisposed) - deadline_renew_instance_locked (&rhc->deadline, &inst->deadline); -#endif s->conds = 0; if (rhc->nqconds != 0) @@ -1264,8 +1260,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in else if (inst->isdisposed) { /* No content left, no registrations left, so drop */ - TRACE (",#0,empty,disposed,drop"); - drop_instance_noupdate_no_writers (rhc, instptr); + TRACE (",#0,empty,nowriters,disposed"); return 1; } else @@ -1318,6 +1313,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con inst->tk = tk; inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; + inst->deadline_reg = 0; inst->isnew = 1; inst->a_sample_free = 1; inst->conds = 0; @@ -1337,12 +1333,6 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con inst->conds |= c->m_query.m_qcmask; } } - -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!inst->isdisposed) - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif - return inst; } @@ -1410,6 +1400,44 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst return RHC_STORED; } +static void postprocess_instance_update (struct dds_rhc_default * __restrict rhc, struct rhc_instance * __restrict * __restrict instptr, const struct trigger_info_pre *pre, const struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, dds_entity *triggers[], size_t *ntriggers) +{ + { + struct rhc_instance *inst = *instptr; + +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (inst->isdisposed) + { + if (inst->deadline_reg) + { + inst->deadline_reg = 0; + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); + } + } + else + { + if (inst->deadline_reg) + deadline_renew_instance_locked (&rhc->deadline, &inst->deadline); + else + { + deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); + inst->deadline_reg = 1; + } + } +#endif + + if (inst_is_empty (inst) && inst->wrcount == 0) + { + drop_instance_noupdate_no_writers (rhc, instptr); + } + } + + if (trigger_info_differs (rhc, pre, post, trig_qc)) + update_conditions_locked (rhc, true, pre, post, trig_qc, *instptr, triggers, ntriggers); + + assert (rhc_check_counts_locked (rhc, true, true)); +} + /* dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample delivered (true unless a reliable sample rejected). @@ -1429,22 +1457,25 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons struct trigger_info_qcond trig_qc; rhc_store_result_t stored; status_cb_data_t cb_data; /* Callback data for reader status callback */ - bool delivered = true; - bool notify_data_available = false; + bool notify_data_available; + dds_entity *triggers[MAX_FAST_TRIGGERS]; + size_t ntriggers; - TRACE ("rhc_store(%"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); + TRACE ("rhc_store %"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); if (!has_data && statusinfo == 0) { /* Write with nothing but a key -- I guess that would be a register, which we do implicitly. (Currently DDSI2 won't allow it through anyway.) */ - TRACE (" ignore explicit register)\n"); - return delivered; + TRACE (" ignore explicit register\n"); + return true; } + notify_data_available = false; dummy_instance.iid = tk->m_iid; stored = RHC_FILTERED; cb_data.raw_status_id = -1; + ntriggers = 0; init_trigger_info_qcond (&trig_qc); @@ -1459,7 +1490,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons */ if (!has_data && !is_dispose) { - TRACE (" disp/unreg on unknown instance"); + TRACE (" unreg on unknown instance\n"); goto error_or_nochange; } else @@ -1481,7 +1512,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons will raise a SAMPLE_REJECTED, and indicate that the system should kill itself.) Not letting instances go to ALIVE or NEW based on a rejected sample - (no one knows, it seemed) */ - TRACE (" instance rejects sample"); + TRACE (" instance rejects sample\n"); get_trigger_info_pre (&pre, inst); if (has_data || is_dispose) @@ -1553,19 +1584,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons TRACE (" disposed->notdisposed"); inst->isdisposed = 0; inst->disposed_gen++; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!is_dispose) - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif } if (is_dispose) { inst->isdisposed = 1; inst_became_disposed = !old_isdisposed; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (inst_became_disposed) - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif TRACE (" dispose(%d)", inst_became_disposed); } @@ -1587,17 +1610,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons if (!inst->isdisposed) { inst->isdisposed = 1; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif } } else if (inst->isdisposed) { inst->isdisposed = 0; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif } goto error_or_nochange; } @@ -1654,15 +1671,9 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons } } - TRACE (")\n"); - - dds_entity *triggers[MAX_FAST_TRIGGERS]; - size_t ntriggers = 0; - if (trigger_info_differs (rhc, &pre, &post, &trig_qc)) - update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, triggers, &ntriggers); - - assert (rhc_check_counts_locked (rhc, true, true)); + postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, triggers, &ntriggers); +error_or_nochange: ddsrt_mutex_unlock (&rhc->lock); if (rhc->reader) @@ -1671,25 +1682,10 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons dds_reader_data_available_cb (rhc->reader); for (size_t i = 0; i < ntriggers; i++) dds_entity_status_signal (triggers[i], 0); + if (cb_data.raw_status_id >= 0) + dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); } - - return delivered; - -error_or_nochange: - - if (rhc->reliable && (stored == RHC_REJECTED)) - { - delivered = false; - } - - ddsrt_mutex_unlock (&rhc->lock); - TRACE (")\n"); - - /* Make any reader status callback */ - - if (cb_data.raw_status_id >= 0 && rhc->reader) - dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); - return delivered; + return !(rhc->reliable && stored == RHC_REJECTED); } static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_common, const struct ddsi_writer_info * __restrict wrinfo) @@ -1736,9 +1732,6 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm if (auto_dispose && !inst->isdisposed) { inst->isdisposed = 1; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ if (inst->latest) @@ -1758,13 +1751,9 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm } (void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc); - - TRACE ("\n"); - notify_data_available = true; - if (trigger_info_differs (rhc, &pre, &post, &trig_qc)) - update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, NULL, &ntriggers); - assert (rhc_check_counts_locked (rhc, true, false)); + postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, NULL, &ntriggers); + TRACE ("\n"); } } TRACE (")\n");