From 58f21af36e8f5593a633f17cafdd35082e8f94dc Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 10 Apr 2019 11:00:10 +0200 Subject: [PATCH] set DATA_AVAILABLE when deleting writer (#148) Deleting a writer causes unregisters (and possibly disposes) in the rest of the network, and these updates to the instances should trigger DATA_AVAILABLE. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_rhc.c | 17 ++++- src/core/ddsc/tests/listener.c | 110 ++++++++++++++++++++++++++++++++- 2 files changed, 124 insertions(+), 3 deletions(-) diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index b427df3..30dc4f3 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -1523,6 +1523,7 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ built-in topics and in get_instance_handle, and one used internally for tracking registrations and unregistrations. */ bool trigger_waitsets = false; + bool notify_data_available = false; struct rhc_instance *inst; struct ut_hhIter iter; const uint64_t wr_iid = pwr_info->iid; @@ -1568,16 +1569,28 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ TRACE ("\n"); + notify_data_available = true; if (trigger_info_differs (&pre, &post, &trig_qc) && update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst)) trigger_waitsets = true; assert (rhc_check_counts_locked (rhc, true, false)); } } TRACE (")\n"); + ddsrt_mutex_unlock (&rhc->lock); - if (trigger_waitsets) - dds_entity_status_signal (&rhc->reader->m_entity); + if (rhc->reader) + { + if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) + { + ddsrt_atomic_inc32 (&rhc->n_cbs); + dds_reader_data_available_cb (rhc->reader); + ddsrt_atomic_dec32 (&rhc->n_cbs); + } + + if (trigger_waitsets) + dds_entity_status_signal (&rhc->reader->m_entity); + } } void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t wr_iid) diff --git a/src/core/ddsc/tests/listener.c b/src/core/ddsc/tests/listener.c index e3f5c65..d049c85 100644 --- a/src/core/ddsc/tests/listener.c +++ b/src/core/ddsc/tests/listener.c @@ -400,7 +400,8 @@ static void fini_triggering_test(void) { dds_delete(g_reader); - dds_delete(g_writer); + if (g_writer) + dds_delete(g_writer); fini_triggering_base(); } @@ -847,6 +848,113 @@ CU_Test(ddsc_listener, data_available, .init=init_triggering_test, .fini=fini_tr ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(status, 0); + + /* Deleting the writer causes unregisters (or dispose+unregister), and those + should trigger DATA_AVAILABLE as well */ + cb_called = 0; + cb_reader = 0; + ret = dds_delete (g_writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + g_writer = 0; + triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + + /* The listener should have swallowed the status. */ + ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); + ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); +} + +CU_Test(ddsc_listener, data_available_delete_writer, .init=init_triggering_test, .fini=fini_triggering_test) +{ + dds_return_t ret; + uint32_t triggered; + uint32_t status; + RoundTripModule_DataType sample; + memset (&sample, 0, sizeof (sample)); + + /* We are interested in data available notifications. */ + dds_lset_data_available(g_listener, data_available_cb); + ret = dds_set_listener(g_reader, g_listener); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + /* Write sample, wait for the listener to swallow the status. */ + ret = dds_write(g_writer, &sample); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + + /* Deleting the writer must trigger DATA_AVAILABLE as well */ + cb_called = 0; + cb_reader = 0; + ret = dds_delete (g_writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + g_writer = 0; + triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + + /* The listener should have swallowed the status. */ + ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); + ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); +} + +CU_Test(ddsc_listener, data_available_delete_writer_disposed, .init=init_triggering_test, .fini=fini_triggering_test) +{ + dds_return_t ret; + uint32_t triggered; + uint32_t status; + RoundTripModule_DataType sample; + memset (&sample, 0, sizeof (sample)); + + /* We are interested in data available notifications. */ + dds_lset_data_available(g_listener, data_available_cb); + ret = dds_set_listener(g_reader, g_listener); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + + /* Write & dispose sample and take it so that the instance is empty & disposed. Then deleting + the writer should silently drop the instance. */ + ret = dds_write(g_writer, &sample); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ret = dds_dispose(g_writer, &sample); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + + /* Take all data so that the instance becomes empty & disposed */ + do { + void *sampleptr = &sample; + dds_sample_info_t info; + ret = dds_take (g_reader, &sampleptr, &info, 1, 1); + } while (ret > 0); + + /* Deleting the writer should not trigger DATA_AVAILABLE with all instances empty & disposed */ + cb_called = 0; + cb_reader = 0; + ret = dds_delete (g_writer); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + g_writer = 0; + triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + + /* The listener should have swallowed the status. */ + ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); + ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(status, 0); } CU_Test(ddsc_listener, data_on_readers, .init=init_triggering_test, .fini=fini_triggering_test)