diff --git a/.travis.yml b/.travis.yml index f2f8ef3..1be51fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -215,7 +215,7 @@ script: ${SCAN_BUILD} cmake --build . --config ${BUILD_TYPE} --target install ;; esac - - CYCLONEDDS_URI='allconfigstderr' ctest -j 4 --output-on-failure -T test -E '^CUnit_ddsrt_random_default_random$' -C ${BUILD_TYPE} + - CYCLONEDDS_URI='alltrueconfigstderr' ctest -j 4 --output-on-failure -T test -E '^CUnit_ddsrt_random_default_random$' -C ${BUILD_TYPE} - if [ "${ASAN}" != "none" ]; then CMAKE_LINKER_FLAGS="-DCMAKE_LINKER_FLAGS=-fsanitize=${USE_SANITIZER}"; CMAKE_C_FLAGS="-DCMAKE_C_FLAGS=-fsanitize=${USE_SANITIZER}"; diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index 0976c5f..856e897 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -814,3 +814,272 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp) } while (!test_finished); } #undef MAX_WRITERS + +/** + * Check that manual-by-participant/topic writers with lease duration 0ns and 1ns work. + */ +struct liveliness_changed_state { + ddsrt_mutex_t lock; + dds_instance_handle_t w0_handle; + bool weirdness; + uint32_t w0_alive, w0_not_alive; +}; + +static void liveliness_changed_listener (dds_entity_t rd, const dds_liveliness_changed_status_t status, void *arg) +{ + struct liveliness_changed_state *st = arg; + (void) rd; + + ddsrt_mutex_lock (&st->lock); + if (status.last_publication_handle != st->w0_handle) + { + if (st->w0_handle == 0) + { + printf ("liveliness_changed_listener: w0 = %"PRIx64"\n", status.last_publication_handle); + st->w0_handle = status.last_publication_handle; + } + else + { + printf ("liveliness_changed_listener: too many writer handles\n"); + st->weirdness = true; + } + } + + if (status.alive_count_change != 0 || status.not_alive_count_change != 0) + { + switch (status.alive_count_change) + { + case -1: + break; + case 1: + if (status.last_publication_handle == st->w0_handle) + st->w0_alive++; + else + { + printf ("liveliness_changed_listener: alive_count_change = %d: unrecognized writer\n", status.alive_count_change); + st->weirdness = true; + } + break; + default: + printf ("liveliness_changed_listener: alive_count_change = %d\n", status.alive_count_change); + st->weirdness = true; + } + + switch (status.not_alive_count_change) + { + case -1: + break; + case 1: + if (status.last_publication_handle == st->w0_handle) + st->w0_not_alive++; + else + { + printf ("liveliness_changed_listener: not_alive_count_change = %d: unrecognized writer\n", status.not_alive_count_change); + st->weirdness = true; + } + break; + default: + printf ("liveliness_changed_listener: not_alive_count_change = %d\n", status.not_alive_count_change); + st->weirdness = true; + } + } + ddsrt_mutex_unlock (&st->lock); +} + +static bool get_and_check_status (dds_entity_t reader, dds_entity_t writer_active) +{ + struct dds_liveliness_changed_status lstatus; + struct dds_subscription_matched_status sstatus; + struct dds_publication_matched_status pstatus; + dds_return_t rc; + rc = dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_get_publication_matched_status(writer_active, &pstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + CU_ASSERT_FATAL(lstatus.alive_count + lstatus.not_alive_count <= 2); + printf ("%d %d %d %d\n", (int)sstatus.current_count, (int)lstatus.alive_count, (int)lstatus.not_alive_count, (int)pstatus.current_count); + return (sstatus.current_count == 2 && lstatus.not_alive_count == 2 && pstatus.current_count == 1); +} + +static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur) +{ + const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 100 : 10; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writer_active; /* writing */ + dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */ + dds_entity_t waitset; + dds_listener_t *listener; + dds_qos_t *qos; + dds_return_t rc; + struct dds_liveliness_changed_status lstatus; + char name[100]; + Space_Type1 sample = {1, 0, 0}; + struct liveliness_changed_state listener_state = { + .weirdness = false, + .w0_handle = 0, + .w0_alive = 0, + .w0_not_alive = 0, + }; + ddsrt_mutex_init (&listener_state.lock); + + waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); + CU_ASSERT_FATAL(waitset > 0); + + qos = dds_create_qos(); + CU_ASSERT_FATAL(qos != NULL); + dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); + dds_qset_history(qos, DDS_HISTORY_KEEP_ALL, 0); + + create_topic_name("ddsc_liveliness_lease_duration_zero", g_topic_nr++, name, sizeof name); + pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, qos, NULL); + CU_ASSERT_FATAL(pub_topic > 0); + sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, qos, NULL); + CU_ASSERT_FATAL(sub_topic > 0); + + /* reader liveliness is always automatic/infinity */ + dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + reader = dds_create_reader(g_sub_participant, sub_topic, qos, NULL); + CU_ASSERT_FATAL(reader > 0); + rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_waitset_attach(waitset, reader, reader); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + /* writer liveliness varies */ + dds_qset_liveliness(qos, lkind, ldur); + writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); + CU_ASSERT_FATAL(writer_active > 0); + writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); + CU_ASSERT_FATAL(writer_inactive > 0); + rc = dds_set_status_mask(writer_active, DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_waitset_attach(waitset, writer_active, writer_active); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + dds_delete_qos(qos); + + /* wait for writers to be discovered and to have lost their liveliness, and for + writer_active to have discovered the reader */ + while (!get_and_check_status (reader, writer_active)) + { + rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5)); + if (rc < 1) + { + get_and_check_status (reader, writer_active); + CU_ASSERT_FATAL(rc >= 1); + } + } + + /* switch to using a listener: those allow us to observe all events */ + listener = dds_create_listener (&listener_state); + dds_lset_liveliness_changed(listener, liveliness_changed_listener); + rc = dds_set_listener (reader, listener); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + dds_delete_listener (listener); + + /* write as fast as possible - we don't expect this to cause the writers + to gain and lose liveliness once for each sample, but it should have + become alive at least once and fall back to not alive afterward */ + for (uint32_t i = 0; i < nsamples; i++) + { + rc = dds_write(writer_active, &sample); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + if (sleep && i < nsamples - 1) + dds_sleepfor(sleep); + } + + rc = dds_wait_for_acks(writer_active, DDS_SECS(5)); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + /* verify the reader received all samples */ + { + void *raw[] = { &sample }; + dds_sample_info_t si; + uint32_t cnt = 0; + while (dds_take(reader, raw, &si, 1, 1) == 1) + cnt++; + CU_ASSERT(cnt == nsamples); + } + + /* transition to not alive is not necessarily immediate */ + { + int retries = 100; + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + + ddsrt_mutex_lock (&listener_state.lock); + printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); + CU_ASSERT(!listener_state.weirdness); + CU_ASSERT(listener_state.w0_handle != 0); + while (listener_state.w0_not_alive < listener_state.w0_alive && retries-- > 0) + { + ddsrt_mutex_unlock(&listener_state.lock); + dds_sleepfor(DDS_MSECS(10)); + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + ddsrt_mutex_lock(&listener_state.lock); + } + + printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); + CU_ASSERT(listener_state.w0_alive == listener_state.w0_not_alive); + if (sleep == 0) + { + /* if not sleeping, it's ok if the transition happens only once */ + CU_ASSERT(listener_state.w0_alive > 0); + } + else if (sleep <= DDS_MSECS(10)) + { + /* if sleeping briefly, expect the a good number of writes to toggle liveliness */ + CU_ASSERT(listener_state.w0_alive >= nsamples / 3); + } + else + { + /* if sleeping, expect the vast majority (90%) of the writes to toggle liveliness */ + CU_ASSERT(listener_state.w0_alive >= nsamples - nsamples / 10); + } + ddsrt_mutex_unlock(&listener_state.lock); + } + + /* cleanup */ + rc = dds_delete(waitset); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_delete(reader); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_delete(writer_active); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_delete(writer_inactive); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_delete(sub_topic); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + rc = dds_delete(pub_topic); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + ddsrt_mutex_destroy(&listener_state.lock); +} + +CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) +{ + static const dds_duration_t sleep[] = { 0, DDS_MSECS(10), DDS_MSECS(100) }; + static const dds_liveliness_kind_t lkind[] = { DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_LIVELINESS_MANUAL_BY_TOPIC }; + static const dds_duration_t ldur[] = { 0, 1 }; + for (size_t sleep_idx = 0; sleep_idx < sizeof (sleep) / sizeof (sleep[0]); sleep_idx++) + { + for (size_t lkind_idx = 0; lkind_idx < sizeof (lkind) / sizeof (lkind[0]); lkind_idx++) + { + for (size_t ldur_idx = 0; ldur_idx < sizeof (ldur) / sizeof (ldur[0]); ldur_idx++) + { + dds_duration_t s = sleep[sleep_idx]; + dds_liveliness_kind_t k = lkind[lkind_idx]; + dds_duration_t d = ldur[ldur_idx]; + printf ("lease_duration_zero_or_one: sleep = %"PRId64" lkind = %d ldur = %"PRId64"\n", s, (int) k, d); + lease_duration_zero_or_one_impl (s, k, d); + } + } + } +}