Track deadline registration, consolidate updates

Deadline registration, renewal and deregistration was somewhat spread
through the code and relied on the "isdisposed" flag as a proxy for
whether it was registered or not.  This consolidates the deadline
handling code in a final step of updating the instance and uses a
separate flag to track whether the instance is currently registered in
the deadline administration or not.

This also makes it possible to trivially change the rules for when
deadline notifications are required, and so allows for, e.g., adding a
mode in which instances in the "no writers" state do not trigger any
deadline missed notifications, or just once (both of which seem useful
modes).

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-04-29 17:47:13 +02:00 committed by eboasson
parent ff591ae684
commit c29a81b339

View file

@ -270,6 +270,7 @@ struct rhc_instance {
unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */
unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */
unsigned inv_isread : 1; /* whether or not that state change has been read before */ unsigned inv_isread : 1; /* whether or not that state change has been read before */
unsigned deadline_reg : 1; /* whether or not registered for a deadline (== isdisposed, except store() defers updates) */
uint32_t disposed_gen; /* bloody generation counters - worst invention of mankind */ uint32_t disposed_gen; /* bloody generation counters - worst invention of mankind */
uint32_t no_writers_gen; /* __/ */ uint32_t no_writers_gen; /* __/ */
int32_t strength; /* "current" ownership strength */ int32_t strength; /* "current" ownership strength */
@ -713,7 +714,7 @@ static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_defau
assert (inst_is_empty (inst)); assert (inst_is_empty (inst));
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk); ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
#ifdef DDSI_INCLUDE_DEADLINE_MISSED #ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!inst->isdisposed) if (inst->deadline_reg)
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline); deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif #endif
ddsrt_free (inst); ddsrt_free (inst);
@ -918,11 +919,6 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
s->lifespan.t_expire = wrinfo->lifespan_exp; s->lifespan.t_expire = wrinfo->lifespan_exp;
lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan); lifespan_register_sample_locked (&rhc->lifespan, &s->lifespan);
#endif #endif
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
/* Only renew the deadline missed counter in case the sample is actually stored in the rhc */
if (!inst->isdisposed)
deadline_renew_instance_locked (&rhc->deadline, &inst->deadline);
#endif
s->conds = 0; s->conds = 0;
if (rhc->nqconds != 0) if (rhc->nqconds != 0)
@ -1264,8 +1260,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
else if (inst->isdisposed) else if (inst->isdisposed)
{ {
/* No content left, no registrations left, so drop */ /* No content left, no registrations left, so drop */
TRACE (",#0,empty,disposed,drop"); TRACE (",#0,empty,nowriters,disposed");
drop_instance_noupdate_no_writers (rhc, instptr);
return 1; return 1;
} }
else else
@ -1318,6 +1313,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con
inst->tk = tk; inst->tk = tk;
inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1;
inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0;
inst->deadline_reg = 0;
inst->isnew = 1; inst->isnew = 1;
inst->a_sample_free = 1; inst->a_sample_free = 1;
inst->conds = 0; inst->conds = 0;
@ -1337,12 +1333,6 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con
inst->conds |= c->m_query.m_qcmask; inst->conds |= c->m_query.m_qcmask;
} }
} }
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!inst->isdisposed)
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ());
#endif
return inst; return inst;
} }
@ -1410,6 +1400,44 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
return RHC_STORED; return RHC_STORED;
} }
static void postprocess_instance_update (struct dds_rhc_default * __restrict rhc, struct rhc_instance * __restrict * __restrict instptr, const struct trigger_info_pre *pre, const struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, dds_entity *triggers[], size_t *ntriggers)
{
{
struct rhc_instance *inst = *instptr;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (inst->isdisposed)
{
if (inst->deadline_reg)
{
inst->deadline_reg = 0;
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
}
}
else
{
if (inst->deadline_reg)
deadline_renew_instance_locked (&rhc->deadline, &inst->deadline);
else
{
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ());
inst->deadline_reg = 1;
}
}
#endif
if (inst_is_empty (inst) && inst->wrcount == 0)
{
drop_instance_noupdate_no_writers (rhc, instptr);
}
}
if (trigger_info_differs (rhc, pre, post, trig_qc))
update_conditions_locked (rhc, true, pre, post, trig_qc, *instptr, triggers, ntriggers);
assert (rhc_check_counts_locked (rhc, true, true));
}
/* /*
dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample dds_rhc_store: DDSI up call into read cache to store new sample. Returns whether sample
delivered (true unless a reliable sample rejected). delivered (true unless a reliable sample rejected).
@ -1429,22 +1457,25 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
struct trigger_info_qcond trig_qc; struct trigger_info_qcond trig_qc;
rhc_store_result_t stored; rhc_store_result_t stored;
status_cb_data_t cb_data; /* Callback data for reader status callback */ status_cb_data_t cb_data; /* Callback data for reader status callback */
bool delivered = true; bool notify_data_available;
bool notify_data_available = false; dds_entity *triggers[MAX_FAST_TRIGGERS];
size_t ntriggers;
TRACE ("rhc_store(%"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data); TRACE ("rhc_store %"PRIx64",%"PRIx64" si %x has_data %d:", tk->m_iid, wr_iid, statusinfo, has_data);
if (!has_data && statusinfo == 0) if (!has_data && statusinfo == 0)
{ {
/* Write with nothing but a key -- I guess that would be a /* Write with nothing but a key -- I guess that would be a
register, which we do implicitly. (Currently DDSI2 won't allow register, which we do implicitly. (Currently DDSI2 won't allow
it through anyway.) */ it through anyway.) */
TRACE (" ignore explicit register)\n"); TRACE (" ignore explicit register\n");
return delivered; return true;
} }
notify_data_available = false;
dummy_instance.iid = tk->m_iid; dummy_instance.iid = tk->m_iid;
stored = RHC_FILTERED; stored = RHC_FILTERED;
cb_data.raw_status_id = -1; cb_data.raw_status_id = -1;
ntriggers = 0;
init_trigger_info_qcond (&trig_qc); init_trigger_info_qcond (&trig_qc);
@ -1459,7 +1490,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
*/ */
if (!has_data && !is_dispose) if (!has_data && !is_dispose)
{ {
TRACE (" disp/unreg on unknown instance"); TRACE (" unreg on unknown instance\n");
goto error_or_nochange; goto error_or_nochange;
} }
else else
@ -1481,7 +1512,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
will raise a SAMPLE_REJECTED, and indicate that the system should will raise a SAMPLE_REJECTED, and indicate that the system should
kill itself.) Not letting instances go to ALIVE or NEW based on kill itself.) Not letting instances go to ALIVE or NEW based on
a rejected sample - (no one knows, it seemed) */ a rejected sample - (no one knows, it seemed) */
TRACE (" instance rejects sample"); TRACE (" instance rejects sample\n");
get_trigger_info_pre (&pre, inst); get_trigger_info_pre (&pre, inst);
if (has_data || is_dispose) if (has_data || is_dispose)
@ -1553,19 +1584,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
TRACE (" disposed->notdisposed"); TRACE (" disposed->notdisposed");
inst->isdisposed = 0; inst->isdisposed = 0;
inst->disposed_gen++; inst->disposed_gen++;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (!is_dispose)
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ());
#endif
} }
if (is_dispose) if (is_dispose)
{ {
inst->isdisposed = 1; inst->isdisposed = 1;
inst_became_disposed = !old_isdisposed; inst_became_disposed = !old_isdisposed;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
if (inst_became_disposed)
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
TRACE (" dispose(%d)", inst_became_disposed); TRACE (" dispose(%d)", inst_became_disposed);
} }
@ -1587,17 +1610,11 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
if (!inst->isdisposed) if (!inst->isdisposed)
{ {
inst->isdisposed = 1; inst->isdisposed = 1;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
} }
} }
else if (inst->isdisposed) else if (inst->isdisposed)
{ {
inst->isdisposed = 0; inst->isdisposed = 0;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_register_instance_locked (&rhc->deadline, &inst->deadline, ddsrt_time_monotonic ());
#endif
} }
goto error_or_nochange; goto error_or_nochange;
} }
@ -1654,15 +1671,9 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
} }
} }
TRACE (")\n"); postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, triggers, &ntriggers);
dds_entity *triggers[MAX_FAST_TRIGGERS];
size_t ntriggers = 0;
if (trigger_info_differs (rhc, &pre, &post, &trig_qc))
update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, triggers, &ntriggers);
assert (rhc_check_counts_locked (rhc, true, true));
error_or_nochange:
ddsrt_mutex_unlock (&rhc->lock); ddsrt_mutex_unlock (&rhc->lock);
if (rhc->reader) if (rhc->reader)
@ -1671,25 +1682,10 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
dds_reader_data_available_cb (rhc->reader); dds_reader_data_available_cb (rhc->reader);
for (size_t i = 0; i < ntriggers; i++) for (size_t i = 0; i < ntriggers; i++)
dds_entity_status_signal (triggers[i], 0); dds_entity_status_signal (triggers[i], 0);
if (cb_data.raw_status_id >= 0)
dds_reader_status_cb (&rhc->reader->m_entity, &cb_data);
} }
return !(rhc->reliable && stored == RHC_REJECTED);
return delivered;
error_or_nochange:
if (rhc->reliable && (stored == RHC_REJECTED))
{
delivered = false;
}
ddsrt_mutex_unlock (&rhc->lock);
TRACE (")\n");
/* Make any reader status callback */
if (cb_data.raw_status_id >= 0 && rhc->reader)
dds_reader_status_cb (&rhc->reader->m_entity, &cb_data);
return delivered;
} }
static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_common, const struct ddsi_writer_info * __restrict wrinfo) static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_common, const struct ddsi_writer_info * __restrict wrinfo)
@ -1736,9 +1732,6 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
if (auto_dispose && !inst->isdisposed) if (auto_dispose && !inst->isdisposed)
{ {
inst->isdisposed = 1; inst->isdisposed = 1;
#ifdef DDSI_INCLUDE_DEADLINE_MISSED
deadline_unregister_instance_locked (&rhc->deadline, &inst->deadline);
#endif
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */ /* Set invalid sample for disposing it (unregister may also set it for unregistering) */
if (inst->latest) if (inst->latest)
@ -1758,13 +1751,9 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
} }
(void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc); (void) dds_rhc_unregister (rhc, &inst, wrinfo, inst->tstamp, &post, &trig_qc);
TRACE ("\n");
notify_data_available = true; notify_data_available = true;
if (trigger_info_differs (rhc, &pre, &post, &trig_qc)) postprocess_instance_update (rhc, &inst, &pre, &post, &trig_qc, NULL, &ntriggers);
update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, NULL, &ntriggers); TRACE ("\n");
assert (rhc_check_counts_locked (rhc, true, false));
} }
} }
TRACE (")\n"); TRACE (")\n");