diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 2ee62f2..09989e0 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -267,6 +267,7 @@ struct rhc_instance { unsigned isnew : 1; /* NEW or NOT_NEW view state */ unsigned a_sample_free : 1; /* whether or not a_sample is in use */ unsigned isdisposed : 1; /* DISPOSED or NOT_DISPOSED (if not disposed, wrcount determines ALIVE/NOT_ALIVE_NO_WRITERS) */ + unsigned autodispose : 1; /* wrcount > 0 => at least one registered writer has had auto-dispose set on some update */ unsigned wr_iid_islive : 1; /* whether wr_iid is of a live writer */ unsigned inv_exists : 1; /* whether or not state change occurred since last sample (i.e., must return invalid sample) */ unsigned inv_isread : 1; /* whether or not that state change has been read before */ @@ -696,7 +697,12 @@ static void inst_clear_invsample_if_exists (struct dds_rhc_default *rhc, struct static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance *inst, struct trigger_info_qcond *trig_qc, bool *nda) { - if (!inst->inv_exists || inst->inv_isread) + if (inst->inv_exists && !inst->inv_isread) + { + /* FIXME: should this indeed trigger a "notify data available" event?*/ + *nda = true; + } + else { /* Obviously optimisable, but that is perhaps not worth the bother */ inst_clear_invsample_if_exists (rhc, inst, trig_qc); @@ -1027,7 +1033,7 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *__restric *instptr = NULL; } -static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update) +static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool autodispose, bool iid_update) { const uint64_t inst_wr_iid = inst->wr_iid_islive ? inst->wr_iid : 0; @@ -1064,6 +1070,7 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * } inst->wrcount++; inst->no_writers_gen++; + inst->autodispose = autodispose; TRACE ("new1"); if (!inst_is_empty (inst) && !inst->isdisposed) @@ -1089,6 +1096,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * if (lwregs_add (&rhc->registrations, inst->iid, wr_iid)) { inst->wrcount++; + if (autodispose) + inst->autodispose = 1; TRACE ("new2iidnull"); } else @@ -1125,6 +1134,8 @@ static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance * registers a previously unknown writer or not */ TRACE ("new3"); inst->wrcount++; + if (autodispose) + inst->autodispose = 1; } else { @@ -1218,6 +1229,8 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in { struct rhc_instance * const inst = *instptr; assert (inst->wrcount > 0); + if (wrinfo->auto_dispose) + inst->autodispose = 1; if (--inst->wrcount > 0) { @@ -1239,20 +1252,27 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in if (!inst_is_empty (inst)) { /* Instance still has content - do not drop until application - takes the last sample. Set the invalid sample if the latest - sample has been read already, so that the application can - read the change to not-alive. (If the latest sample is still - unread, we don't bother, even though it means the application - won't see the timestamp for the unregister event. It shouldn't - care.) */ - if (inst->latest == NULL || inst->latest->isread) - { - inst_set_invsample (rhc, inst, trig_qc, nda); - update_inst (inst, wrinfo, false, tstamp); - } + takes the last sample. Set the invalid sample if the latest + sample has been read already, so that the application can + read the change to not-alive. (If the latest sample is still + unread, we don't bother, even though it means the application + won't see the timestamp for the unregister event. It shouldn't + care.) */ if (!inst->isdisposed) { - rhc->n_not_alive_no_writers++; + if (inst->latest == NULL || inst->latest->isread) + { + inst_set_invsample (rhc, inst, trig_qc, nda); + update_inst (inst, wrinfo, false, tstamp); + } + if (!inst->autodispose) + rhc->n_not_alive_no_writers++; + else + { + TRACE (",autodispose"); + inst->isdisposed = 1; + rhc->n_not_alive_disposed++; + } } inst->wr_iid_islive = 0; return 0; @@ -1270,6 +1290,11 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in assert (inst_is_empty (inst)); inst_set_invsample (rhc, inst, trig_qc, nda); update_inst (inst, wrinfo, false, tstamp); + if (inst->autodispose) + { + TRACE (",autodispose"); + inst->isdisposed = 1; + } account_for_empty_to_nonempty_transition (rhc, inst); inst->wr_iid_islive = 0; return 0; @@ -1313,6 +1338,7 @@ static struct rhc_instance *alloc_new_instance (struct dds_rhc_default *rhc, con inst->tk = tk; inst->wrcount = (serdata->statusinfo & NN_STATUSINFO_UNREGISTER) ? 0 : 1; inst->isdisposed = (serdata->statusinfo & NN_STATUSINFO_DISPOSE) != 0; + inst->autodispose = wrinfo->auto_dispose; inst->deadline_reg = 0; inst->isnew = 1; inst->a_sample_free = 1; @@ -1517,7 +1543,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons get_trigger_info_pre (&pre, inst); if (has_data || is_dispose) { - dds_rhc_register (rhc, inst, wr_iid, false); + dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, false); } if (statusinfo & NN_STATUSINFO_UNREGISTER) { @@ -1563,7 +1589,7 @@ static bool dds_rhc_default_store (struct ddsi_rhc * __restrict rhc_common, cons (i.e., out-of-memory), abort the operation and hope that the caller can still notify the application. */ - dds_rhc_register (rhc, inst, wr_iid, true); + dds_rhc_register (rhc, inst, wr_iid, wrinfo->auto_dispose, true); /* Sample arriving for a NOT_ALIVE instance => view state NEW */ if (has_data && not_alive) @@ -1710,16 +1736,16 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm struct rhc_instance *inst; struct ddsrt_hh_iter iter; const uint64_t wr_iid = wrinfo->iid; - const int auto_dispose = wrinfo->auto_dispose; - + const bool auto_dispose = wrinfo->auto_dispose; size_t ntriggers = SIZE_MAX; ddsrt_mutex_lock (&rhc->lock); - TRACE ("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose); + TRACE ("rhc_unregister_wr_iid %"PRIx64",%d:\n", wr_iid, wrinfo->auto_dispose); for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter)) { if ((inst->wr_iid_islive && inst->wr_iid == wr_iid) || lwregs_contains (&rhc->registrations, inst->iid, wr_iid)) { + assert (inst->wrcount > 0); struct trigger_info_pre pre; struct trigger_info_post post; struct trigger_info_qcond trig_qc; @@ -1731,10 +1757,11 @@ static void dds_rhc_default_unregister_wr (struct ddsi_rhc * __restrict rhc_comm assert (inst->wrcount > 0); if (auto_dispose && !inst->isdisposed) { + notify_data_available = true; inst->isdisposed = 1; /* Set invalid sample for disposing it (unregister may also set it for unregistering) */ - if (inst->latest) + if (inst->latest && !inst->latest->isread) { assert (!inst->inv_exists); rhc->n_not_alive_disposed++; diff --git a/src/core/ddsc/tests/listener.c b/src/core/ddsc/tests/listener.c index 1ad9f71..0ab0108 100644 --- a/src/core/ddsc/tests/listener.c +++ b/src/core/ddsc/tests/listener.c @@ -997,9 +997,9 @@ CU_Test(ddsc_listener, data_available_delete_writer_disposed, .init=init_trigger ret = dds_delete (g_writer); CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); g_writer = 0; - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + ddsrt_mutex_lock(&g_mutex); + CU_ASSERT_EQUAL_FATAL(cb_called & DDS_DATA_AVAILABLE_STATUS_ID, 0); + ddsrt_mutex_unlock(&g_mutex); /* The listener should have swallowed the status. */ ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); diff --git a/src/core/xtests/rhc_torture/rhc_torture.c b/src/core/xtests/rhc_torture/rhc_torture.c index fe9dc53..132fbc6 100644 --- a/src/core/xtests/rhc_torture/rhc_torture.c +++ b/src/core/xtests/rhc_torture/rhc_torture.c @@ -935,6 +935,7 @@ int main (int argc, char **argv) printf ("************* 0 *************\n"); struct dds_rhc *rhc = mkrhc (gv, NULL, DDS_HISTORY_KEEP_LAST, 1, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); struct proxy_writer *wr0 = mkwr (gv, 1); + struct proxy_writer *wr1 = mkwr (gv, 1); uint64_t iid0, iid1, iid_t; iid0 = store (tkmap, rhc, wr0, mksample (0, 0), print, false); iid1 = store (tkmap, rhc, wr0, mksample (1, NN_STATUSINFO_DISPOSE), print, false); @@ -944,17 +945,38 @@ int main (int argc, char **argv) { 0, 0, 0, 0, 0, 0, 0, 0 } }; rdall (rhc, c0, print, states_seen); - iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); + /* write instance 0 with 2nd writer to have 2 live writers */ + iid_t = store (tkmap, rhc, wr1, mksample (0, 0), print, false); assert (iid_t == iid0); - (void)iid0; - (void)iid_t; const struct check c1[] = { - { "ROU", iid0, wr0->e.iid, 0,0, 1, 0,1 }, - { "NOU", iid0, 0, 0,0, 0, 0,0 }, + { "NOA", iid0, wr1->e.iid, 0,0, 1, 0,3 }, { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, { 0, 0, 0, 0, 0, 0, 0, 0 } }; rdall (rhc, c1, print, states_seen); + /* unregister instance 0 with wr0 - autodispose, but 2nd writer keeps it alive, no visible change */ + iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); + assert (iid_t == iid0); + const struct check c2[] = { + { "ROA", iid0, wr1->e.iid, 0,0, 1, 0,3 }, + { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, + { 0, 0, 0, 0, 0, 0, 0, 0 } + }; + rdall (rhc, c2, print, states_seen); + /* unregistering instance 0 again should be a no-op because wr0 no longer has it registered */ + iid_t = store (tkmap, rhc, wr0, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); + assert (iid_t == iid0); + rdall (rhc, c2, print, states_seen); + /* unregistering instance 0 with wr1 - autodispose, no live writers -> dispose */ + iid_t = store (tkmap, rhc, wr1, mkkeysample (0, NN_STATUSINFO_UNREGISTER), print, false); + assert (iid_t == iid0); + const struct check c3[] = { + { "ROD", iid0, wr1->e.iid, 0,0, 1, 0,3 }, + { "NOD", iid0, 0, 0,0, 0, 0,0 }, + { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, + { 0, 0, 0, 0, 0, 0, 0, 0 } + }; + rdall (rhc, c3, print, states_seen); thread_state_awake_domain_ok (lookup_thread_state ()); struct ddsi_writer_info wr0_info; wr0_info.auto_dispose = wr0->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances; @@ -966,16 +988,18 @@ int main (int argc, char **argv) #endif dds_rhc_unregister_wr (rhc, &wr0_info); thread_state_asleep (lookup_thread_state ()); - const struct check c2[] = { - { "ROU", iid0, wr0->e.iid, 0,0, 1, 0,1 }, - { "ROU", iid0, 0, 0,0, 0, 0,0 }, + const struct check c4[] = { + { "ROD", iid0, wr1->e.iid, 0,0, 1, 0,3 }, + { "ROD", iid0, 0, 0,0, 0, 0,0 }, { "ROD", iid1, wr0->e.iid, 0,0, 1, 1,2 }, - { "NOD", iid1, 0, 0,0, 0, 1,0 }, + // { "NOD", iid1, 0, 0,0, 0, 1,0 }, doesn't exist because it is already disposed { 0, 0, 0, 0, 0, 0, 0, 0 } }; - tkall (rhc, c2, print, states_seen); + tkall (rhc, c4, print, states_seen); frhc (rhc); fwr (wr0); + fwr (wr1); + (void)iid_t; } if (1 >= first)