diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index e1a17b1..2ee62f2 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -270,6 +270,7 @@ struct rhc_instance { unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_isread : 1; /* whether or not that state change has been read before */ + unsigned deadline_reg : 1; /* whether or not registered for a deadline (== isdisposed, except store() defers updates) */ uint32_t disposed_gen; /* bloody generation counters - worst invention of mankind */ uint32_t no_writers_gen; /* __/ */ int32_t strength; /* "current" ownership strength */ @@ -713,7 +714,7 @@ static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_defau assert (inst_is_empty (inst)); ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk); #ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!inst->isdisposed) + if (inst->deadline_reg) deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); #endif ddsrt_free (inst); @@ -918,11 +919,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, s->lifespan.t_expire = wrinfo->lifespan_exp; lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan); #endif -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - /* Only renew the deadline missed counter in case the sample is actually stored in the rhc */ - if (!inst->isdisposed) - deadline_renew_instance_locked (&rhc->deadline, &inst->deadline); -#endif s->conds = 0; if (rhc->nqconds != 0) @@ -1264,8 +1260,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in else if (inst->isdisposed) { /* No content left, no registrations left, so drop */ - TRACE (",#0,empty,disposed,drop"); - drop_instance_noupdate_no_writers (rhc, instptr); + TRACE (",#0,empty,nowriters,disposed"); return 1; } else @@ -1318,6 +1313,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con inst->tk = tk; inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; + inst->deadline_reg = 0; inst->isnew = 1; inst->a_sample_free = 1; inst->conds = 0; @@ -1337,12 +1333,6 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con inst->conds |= c->m_query.m_qcmask; } } - -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!inst->isdisposed) - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif - return inst; } @@ -1410,6 +1400,44 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst return RHC_STORED; } +static void postprocess_instance_update (struct dds_rhc_default * __restrict rhc, struct rhc_instance * __restrict * __restrict instptr, const struct trigger_info_pre *pre, const struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, dds_entity *triggers[], size_t *ntriggers) +{ + { + struct rhc_instance *inst = *instptr; + +#ifdef DDSI_INCLUDE_DEADLINE_MISSED + if (inst->isdisposed) + { + if (inst->deadline_reg) + { + inst->deadline_reg = 0; + deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); + } + } + else + { + if (inst->deadline_reg) + deadline_renew_instance_locked (&rhc->deadline, &inst->deadline); + else + { + deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); + inst->deadline_reg = 1; + } + } +#endif + + if (inst_is_empty (inst) && inst->wrcount == 0) + { + drop_instance_noupdate_no_writers (rhc, instptr); + } + } + + if (trigger_info_differs (rhc, pre, post, trig_qc)) + update_conditions_locked (rhc, true, pre, post, trig_qc, *instptr, triggers, ntriggers); + + assert (rhc_check_counts_locked (rhc, true, true)); +} + /* dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample delivered (true unless a reliable sample rejected). @@ -1429,22 +1457,25 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons struct trigger_info_qcond trig_qc; rhc_store_result_t stored; status_cb_data_t cb_data; /* Callback data for reader status callback */ - bool delivered = true; - bool notify_data_available = false; + bool notify_data_available; + dds_entity *triggers[MAX_FAST_TRIGGERS]; + size_t ntriggers; - TRACE ("rhc_store(%"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); + TRACE ("rhc_store %"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); if (!has_data && statusinfo == 0) { /* Write with nothing but a key -- I guess that would be a register, which we do implicitly. (Currently DDSI2 won't allow it through anyway.) */ - TRACE (" ignore explicit register)\n"); - return delivered; + TRACE (" ignore explicit register\n"); + return true; } + notify_data_available = false; dummy_instance.iid = tk->m_iid; stored = RHC_FILTERED; cb_data.raw_status_id = -1; + ntriggers = 0; init_trigger_info_qcond (&trig_qc); @@ -1459,7 +1490,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons */ if (!has_data && !is_dispose) { - TRACE (" disp/unreg on unknown instance"); + TRACE (" unreg on unknown instance\n"); goto error_or_nochange; } else @@ -1481,7 +1512,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons will raise a SAMPLE_REJECTED, and indicate that the system should kill itself.) Not letting instances go to ALIVE or NEW based on a rejected sample - (no one knows, it seemed) */ - TRACE (" instance rejects sample"); + TRACE (" instance rejects sample\n"); get_trigger_info_pre (&pre, inst); if (has_data || is_dispose) @@ -1553,19 +1584,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons TRACE (" disposed->notdisposed"); inst->isdisposed = 0; inst->disposed_gen++; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (!is_dispose) - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif } if (is_dispose) { inst->isdisposed = 1; inst_became_disposed = !old_isdisposed; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - if (inst_became_disposed) - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif TRACE (" dispose(%d)", inst_became_disposed); } @@ -1587,17 +1610,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons if (!inst->isdisposed) { inst->isdisposed = 1; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif } } else if (inst->isdisposed) { inst->isdisposed = 0; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ()); -#endif } goto error_or_nochange; } @@ -1654,15 +1671,9 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons } } - TRACE (")\n"); - - dds_entity *triggers[MAX_FAST_TRIGGERS]; - size_t ntriggers = 0; - if (trigger_info_differs (rhc, &pre, &post, &trig_qc)) - update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, triggers, &ntriggers); - - assert (rhc_check_counts_locked (rhc, true, true)); + postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, triggers, &ntriggers); +error_or_nochange: ddsrt_mutex_unlock (&rhc->lock); if (rhc->reader) @@ -1671,25 +1682,10 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons dds_reader_data_available_cb (rhc->reader); for (size_t i = 0; i < ntriggers; i++) dds_entity_status_signal (triggers[i], 0); + if (cb_data.raw_status_id >= 0) + dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); } - - return delivered; - -error_or_nochange: - - if (rhc->reliable && (stored == RHC_REJECTED)) - { - delivered = false; - } - - ddsrt_mutex_unlock (&rhc->lock); - TRACE (")\n"); - - /* Make any reader status callback */ - - if (cb_data.raw_status_id >= 0 && rhc->reader) - dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); - return delivered; + return !(rhc->reliable && stored == RHC_REJECTED); } static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_common, const struct ddsi_writer_info * __restrict wrinfo) @@ -1736,9 +1732,6 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm if (auto_dispose && !inst->isdisposed) { inst->isdisposed = 1; -#ifdef DDSI_INCLUDE_DEADLINE_MISSED - deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); -#endif /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ if (inst->latest) @@ -1758,13 +1751,9 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm } (void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc); - - TRACE ("\n"); - notify_data_available = true; - if (trigger_info_differs (rhc, &pre, &post, &trig_qc)) - update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, NULL, &ntriggers); - assert (rhc_check_counts_locked (rhc, true, false)); + postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, NULL, &ntriggers); + TRACE ("\n"); } } TRACE (")\n");