Auto-dispose when the instance goes to NO_WRITERS

This changes the behaviour of auto-dispose writers: instead of always
disposing when the writer disposes the data, it now only disposes the
data when the instance would otherwise go to the "no writers" state.
This only affects the behaviour when there are multiple writers for the
same instance.

In case the writers use a different value for the auto-dispose setting,
it now tracks whether an instance has ever been touched by an writer
with auto-dispose enabled, and treats auto-disposes the instance when
the last writer leaves if this is the case.  This way, if an instance is
registered by one auto-dispose and one non-auto-dispose writer, the
order of unregistering does not matter.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-04-29 17:59:22 +02:00 committed by eboasson
parent c29a81b339
commit 7f8f1d1a12
3 changed files with 84 additions and 33 deletions

View file

@ -267,6 +267,7 @@ struct rhc_instance {
unsigned isnew : 1; /* NEW or NOT_NEW view state */ unsigned isnew : 1; /* NEW or NOT_NEW view state */
unsigned a_sample_free : 1; /* whether or not a_sample is in use */ unsigned a_sample_free : 1; /* whether or not a_sample is in use */
unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */ unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */
unsigned autodispose : 1; /* wrcount > 0 => at least one registered writer has had auto-dispose set on some update */
unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */
unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */
unsigned inv_isread : 1; /* whether or not that state change has been read before */ unsigned inv_isread : 1; /* whether or not that state change has been read before */
@ -696,7 +697,12 @@ static void inst_clear_invsample_if_exists (struct dds_rhc_default *rhc, struct
static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool *nda) static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool *nda)
{ {
if (!inst->inv_exists || inst->inv_isread) if (inst->inv_exists && !inst->inv_isread)
{
/* FIXME: should this indeed trigger a "notify data available" event?*/
*nda = true;
}
else
{ {
/* Obviously optimisable, but that is perhaps not worth the bother */ /* Obviously optimisable, but that is perhaps not worth the bother */
inst_clear_invsample_if_exists (rhc, inst, trig_qc); inst_clear_invsample_if_exists (rhc, inst, trig_qc);
@ -1027,7 +1033,7 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *__restric
*instptr = NULL; *instptr = NULL;
} }
static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update) static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update)
{ {
const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0; const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0;
@ -1064,6 +1070,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
} }
inst->wrcount++; inst->wrcount++;
inst->no_writers_gen++; inst->no_writers_gen++;
inst->autodispose = autodispose;
TRACE ("new1"); TRACE ("new1");
if (!inst_is_empty (inst) && !inst->isdisposed) if (!inst_is_empty (inst) && !inst->isdisposed)
@ -1089,6 +1096,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
if (lwregs_add (&rhc->registrations, inst->iid, wr_iid)) if (lwregs_add (&rhc->registrations, inst->iid, wr_iid))
{ {
inst->wrcount++; inst->wrcount++;
if (autodispose)
inst->autodispose = 1;
TRACE ("new2iidnull"); TRACE ("new2iidnull");
} }
else else
@ -1125,6 +1134,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *
registers a previously unknown writer or not */ registers a previously unknown writer or not */
TRACE ("new3"); TRACE ("new3");
inst->wrcount++; inst->wrcount++;
if (autodispose)
inst->autodispose = 1;
} }
else else
{ {
@ -1218,6 +1229,8 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
{ {
struct rhc_instance * const inst = *instptr; struct rhc_instance * const inst = *instptr;
assert (inst->wrcount > 0); assert (inst->wrcount > 0);
if (wrinfo->auto_dispose)
inst->autodispose = 1;
if (--inst->wrcount > 0) if (--inst->wrcount > 0)
{ {
@ -1245,14 +1258,21 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
unread, we don't bother, even though it means the application unread, we don't bother, even though it means the application
won't see the timestamp for the unregister event. It shouldn't won't see the timestamp for the unregister event. It shouldn't
care.) */ care.) */
if (!inst->isdisposed)
{
if (inst->latest == NULL || inst->latest->isread) if (inst->latest == NULL || inst->latest->isread)
{ {
inst_set_invsample (rhc, inst, trig_qc, nda); inst_set_invsample (rhc, inst, trig_qc, nda);
update_inst (inst, wrinfo, false, tstamp); update_inst (inst, wrinfo, false, tstamp);
} }
if (!inst->isdisposed) if (!inst->autodispose)
{
rhc->n_not_alive_no_writers++; rhc->n_not_alive_no_writers++;
else
{
TRACE (",autodispose");
inst->isdisposed = 1;
rhc->n_not_alive_disposed++;
}
} }
inst->wr_iid_islive = 0; inst->wr_iid_islive = 0;
return 0; return 0;
@ -1270,6 +1290,11 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
assert (inst_is_empty (inst)); assert (inst_is_empty (inst));
inst_set_invsample (rhc, inst, trig_qc, nda); inst_set_invsample (rhc, inst, trig_qc, nda);
update_inst (inst, wrinfo, false, tstamp); update_inst (inst, wrinfo, false, tstamp);
if (inst->autodispose)
{
TRACE (",autodispose");
inst->isdisposed = 1;
}
account_for_empty_to_nonempty_transition (rhc, inst); account_for_empty_to_nonempty_transition (rhc, inst);
inst->wr_iid_islive = 0; inst->wr_iid_islive = 0;
return 0; return 0;
@ -1313,6 +1338,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con
inst->tk = tk; inst->tk = tk;
inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1;
inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0;
inst->autodispose = wrinfo->auto_dispose;
inst->deadline_reg = 0; inst->deadline_reg = 0;
inst->isnew = 1; inst->isnew = 1;
inst->a_sample_free = 1; inst->a_sample_free = 1;
@ -1517,7 +1543,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
get_trigger_info_pre (&pre, inst); get_trigger_info_pre (&pre, inst);
if (has_data || is_dispose) if (has_data || is_dispose)
{ {
dds_rhc_register (rhc, inst, wr_iid, false); dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false);
} }
if (statusinfo & NN_STATUSINFO_UNREGISTER) if (statusinfo & NN_STATUSINFO_UNREGISTER)
{ {
@ -1563,7 +1589,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons
(i.e., out-of-memory), abort the operation and hope that the (i.e., out-of-memory), abort the operation and hope that the
caller can still notify the application. */ caller can still notify the application. */
dds_rhc_register (rhc, inst, wr_iid, true); dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true);
/* Sample arriving for a NOT_ALIVE instance => view state NEW */ /* Sample arriving for a NOT_ALIVE instance => view state NEW */
if (has_data && not_alive) if (has_data && not_alive)
@ -1710,16 +1736,16 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
struct rhc_instance *inst; struct rhc_instance *inst;
struct ddsrt_hh_iter iter; struct ddsrt_hh_iter iter;
const uint64_t wr_iid = wrinfo->iid; const uint64_t wr_iid = wrinfo->iid;
const int auto_dispose = wrinfo->auto_dispose; const bool auto_dispose = wrinfo->auto_dispose;
size_t ntriggers = SIZE_MAX; size_t ntriggers = SIZE_MAX;
ddsrt_mutex_lock (&rhc->lock); ddsrt_mutex_lock (&rhc->lock);
TRACE ("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose); TRACE ("rhc_unregister_wr_iid %"PRIx64",%d:\n", wr_iid, wrinfo->auto_dispose);
for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter)) for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter))
{ {
if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid))
{ {
assert (inst->wrcount > 0);
struct trigger_info_pre pre; struct trigger_info_pre pre;
struct trigger_info_post post; struct trigger_info_post post;
struct trigger_info_qcond trig_qc; struct trigger_info_qcond trig_qc;
@ -1731,10 +1757,11 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm
assert (inst->wrcount > 0); assert (inst->wrcount > 0);
if (auto_dispose && !inst->isdisposed) if (auto_dispose && !inst->isdisposed)
{ {
notify_data_available = true;
inst->isdisposed = 1; inst->isdisposed = 1;
/* Set invalid sample for disposing it (unregister may also set it for unregistering) */ /* Set invalid sample for disposing it (unregister may also set it for unregistering) */
if (inst->latest) if (inst->latest && !inst->latest->isread)
{ {
assert (!inst->inv_exists); assert (!inst->inv_exists);
rhc->n_not_alive_disposed++; rhc->n_not_alive_disposed++;

View file

@ -997,9 +997,9 @@ CU_Test(ddsc_listener, data_available_delete_writer_disposed, .init=init_trigger
ret = dds_delete (g_writer); ret = dds_delete (g_writer);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
g_writer = 0; g_writer = 0;
triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); ddsrt_mutex_lock(&g_mutex);
CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); CU_ASSERT_EQUAL_FATAL(cb_called & DDS_DATA_AVAILABLE_STATUS_ID, 0);
CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); ddsrt_mutex_unlock(&g_mutex);
/* The listener should have swallowed the status. */ /* The listener should have swallowed the status. */
ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS);

View file

@ -935,6 +935,7 @@ int main (int argc, char **argv)
printf ("************* 0 *************\n"); printf ("************* 0 *************\n");
struct dds_rhc *rhc = mkrhc (gv, NULL, DDS_HISTORY_KEEP_LAST, 1, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); struct dds_rhc *rhc = mkrhc (gv, NULL, DDS_HISTORY_KEEP_LAST, 1, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP);
struct proxy_writer *wr0 = mkwr (gv, 1); struct proxy_writer *wr0 = mkwr (gv, 1);
struct proxy_writer *wr1 = mkwr (gv, 1);
uint64_t iid0, iid1, iid_t; uint64_t iid0, iid1, iid_t;
iid0 = store (tkmap, rhc, wr0, mksample (0, 0), print, false); iid0 = store (tkmap, rhc, wr0, mksample (0, 0), print, false);
iid1 = store (tkmap, rhc, wr0, mksample (1, NN_STATUSINFO_DISPOSE), print, false); iid1 = store (tkmap, rhc, wr0, mksample (1, NN_STATUSINFO_DISPOSE), print, false);
@ -944,17 +945,38 @@ int main (int argc, char **argv)
{ 0, 0, 0, 0, 0, 0, 0, 0 } { 0, 0, 0, 0, 0, 0, 0, 0 }
}; };
rdall (rhc, c0, print, states_seen); rdall (rhc, c0, print, states_seen);
iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); /* write instance 0 with 2nd writer to have 2 live writers */
iid_t = store (tkmap, rhc, wr1, mksample (0, 0), print, false);
assert (iid_t == iid0); assert (iid_t == iid0);
(void)iid0;
(void)iid_t;
const struct check c1[] = { const struct check c1[] = {
{ "ROU", iid0, wr0->e.iid, 0,0, 1, 0,1 }, { "NOA", iid0, wr1->e.iid, 0,0, 1, 0,3 },
{ "NOU", iid0, 0, 0,0, 0, 0,0 },
{ "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 },
{ 0, 0, 0, 0, 0, 0, 0, 0 } { 0, 0, 0, 0, 0, 0, 0, 0 }
}; };
rdall (rhc, c1, print, states_seen); rdall (rhc, c1, print, states_seen);
/* unregister instance 0 with wr0 - autodispose, but 2nd writer keeps it alive, no visible change */
iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false);
assert (iid_t == iid0);
const struct check c2[] = {
{ "ROA", iid0, wr1->e.iid, 0,0, 1, 0,3 },
{ "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 },
{ 0, 0, 0, 0, 0, 0, 0, 0 }
};
rdall (rhc, c2, print, states_seen);
/* unregistering instance 0 again should be a no-op because wr0 no longer has it registered */
iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false);
assert (iid_t == iid0);
rdall (rhc, c2, print, states_seen);
/* unregistering instance 0 with wr1 - autodispose, no live writers -> dispose */
iid_t = store (tkmap, rhc, wr1, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false);
assert (iid_t == iid0);
const struct check c3[] = {
{ "ROD", iid0, wr1->e.iid, 0,0, 1, 0,3 },
{ "NOD", iid0, 0, 0,0, 0, 0,0 },
{ "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 },
{ 0, 0, 0, 0, 0, 0, 0, 0 }
};
rdall (rhc, c3, print, states_seen);
thread_state_awake_domain_ok (lookup_thread_state ()); thread_state_awake_domain_ok (lookup_thread_state ());
struct ddsi_writer_info wr0_info; struct ddsi_writer_info wr0_info;
wr0_info.auto_dispose = wr0->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances; wr0_info.auto_dispose = wr0->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances;
@ -966,16 +988,18 @@ int main (int argc, char **argv)
#endif #endif
dds_rhc_unregister_wr (rhc, &wr0_info); dds_rhc_unregister_wr (rhc, &wr0_info);
thread_state_asleep (lookup_thread_state ()); thread_state_asleep (lookup_thread_state ());
const struct check c2[] = { const struct check c4[] = {
{ "ROU", iid0, wr0->e.iid, 0,0, 1, 0,1 }, { "ROD", iid0, wr1->e.iid, 0,0, 1, 0,3 },
{ "ROU", iid0, 0, 0,0, 0, 0,0 }, { "ROD", iid0, 0, 0,0, 0, 0,0 },
{ "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 },
{ "NOD", iid1, 0, 0,0, 0, 1,0 }, // { "NOD", iid1, 0, 0,0, 0, 1,0 }, doesn't exist because it is already disposed
{ 0, 0, 0, 0, 0, 0, 0, 0 } { 0, 0, 0, 0, 0, 0, 0, 0 }
}; };
tkall (rhc, c2, print, states_seen); tkall (rhc, c4, print, states_seen);
frhc (rhc); frhc (rhc);
fwr (wr0); fwr (wr0);
fwr (wr1);
(void)iid_t;
} }
if (1 >= first) if (1 >= first)