From 06a0ba26c935421297cc5f77841011e6ef0f7e2b Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 29 Apr 2020 20:10:14 +0200 Subject: [PATCH] Rework listener tests A few failures to signal DATA_AVAILABLE (as well as some where it was signalled unnecessarily) were discovered while refactoring the RHC despite the tests all passing. Clearly the tests were inadequate. The enormous amount of boilerplate in the tests prompted a small rewrite to a programmable listener invocation tester that one simply feeds a noise-like one-liner in a string. This trades the boilerplate for somewhat inscrutable code. Signed-off-by: Erik Boasson --- src/core/ddsc/tests/listener.c | 2321 ++++++++++++++++---------------- 1 file changed, 1158 insertions(+), 1163 deletions(-) diff --git a/src/core/ddsc/tests/listener.c b/src/core/ddsc/tests/listener.c index 0ab0108..6738cf7 100644 --- a/src/core/ddsc/tests/listener.c +++ b/src/core/ddsc/tests/listener.c @@ -1,5 +1,5 @@ /* - * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * Copyright(c) 2006 to 2020 ADLINK Technology Limited and others * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -9,1254 +9,1249 @@ * * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ +#include + #include "dds/dds.h" - -#include "dds/ddsrt/cdtors.h" #include "dds/ddsrt/misc.h" -#include "dds/ddsrt/process.h" #include "dds/ddsrt/sync.h" -#include "dds/ddsrt/threads.h" - +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/environ.h" #include "test_common.h" -/**************************************************************************** - * TODO: Add DDS_INCONSISTENT_TOPIC_STATUS test - * TODO: Add DDS_OFFERED/REQUESTED_DEADLINE_MISSED_STATUS test - * TODO: Add DDS_LIVELINESS_LOST_STATUS test - * TODO: Check DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS intermittent fail (total_count != 1) - ****************************************************************************/ +static ddsrt_mutex_t g_mutex; +static ddsrt_cond_t g_cond; +static uint32_t cb_called; +static dds_entity_t cb_topic, cb_writer, cb_reader, cb_subscriber; + +#define DEFINE_STATUS_CALLBACK(name, NAME, kind) \ + static dds_##name##_status_t cb_##name##_status; \ + static void name##_cb (dds_entity_t kind, const dds_##name##_status_t status, void *arg) \ + { \ + (void) arg; \ + ddsrt_mutex_lock (&g_mutex); \ + cb_##kind = kind; \ + cb_##name##_status = status; \ + cb_called |= DDS_##NAME##_STATUS; \ + ddsrt_cond_broadcast (&g_cond); \ + ddsrt_mutex_unlock (&g_mutex); \ + } + +DEFINE_STATUS_CALLBACK (inconsistent_topic, INCONSISTENT_TOPIC, topic) +DEFINE_STATUS_CALLBACK (liveliness_changed, LIVELINESS_CHANGED, reader) +DEFINE_STATUS_CALLBACK (liveliness_lost, LIVELINESS_LOST, writer) +DEFINE_STATUS_CALLBACK (offered_deadline_missed, OFFERED_DEADLINE_MISSED, writer) +DEFINE_STATUS_CALLBACK (offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS, writer) +DEFINE_STATUS_CALLBACK (publication_matched, PUBLICATION_MATCHED, writer) +DEFINE_STATUS_CALLBACK (requested_deadline_missed, REQUESTED_DEADLINE_MISSED, reader) +DEFINE_STATUS_CALLBACK (requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS, reader) +DEFINE_STATUS_CALLBACK (sample_lost, SAMPLE_LOST, reader) +DEFINE_STATUS_CALLBACK (sample_rejected, SAMPLE_REJECTED, reader) +DEFINE_STATUS_CALLBACK (subscription_matched, SUBSCRIPTION_MATCHED, reader) + +static void data_on_readers_cb (dds_entity_t subscriber, void *arg) +{ + (void) arg; + ddsrt_mutex_lock (&g_mutex); + cb_subscriber = subscriber; + cb_called |= DDS_DATA_ON_READERS_STATUS; + ddsrt_cond_broadcast (&g_cond); + ddsrt_mutex_unlock (&g_mutex); +} + +static void data_available_cb (dds_entity_t reader, void *arg) +{ + (void)arg; + ddsrt_mutex_lock (&g_mutex); + cb_reader = reader; + cb_called |= DDS_DATA_AVAILABLE_STATUS; + ddsrt_cond_broadcast (&g_cond); + ddsrt_mutex_unlock (&g_mutex); +} + +static void dummy_data_on_readers_cb (dds_entity_t subscriber, void *arg) +{ + (void)subscriber; + (void)arg; +} + +static void dummy_data_available_cb (dds_entity_t reader, void *arg) +{ + (void)reader; + (void)arg; +} + +static void dummy_subscription_matched_cb (dds_entity_t reader, const dds_subscription_matched_status_t status, void *arg) +{ + (void)reader; + (void)status; + (void)arg; +} + +static void dummy_liveliness_changed_cb (dds_entity_t reader, const dds_liveliness_changed_status_t status, void *arg) +{ + (void)reader; + (void)status; + (void)arg; +} + +static void dummy_cb (void) +{ + // Used as a listener function in checking merging of listeners, + // and for that purpose, casting it to whatever function type is + // required is ok. It is not supposed to ever be called. + abort (); +} + +#undef DEFINE_STATUS_CALLBACK + +/************************************************** + **** **** + **** create/delete/get/set/copy/merge/reset **** + **** **** + **************************************************/ + +static void set_all_const (dds_listener_t *l, void (*c) (void)) +{ + dds_lset_data_available (l, (dds_on_data_available_fn) c); + dds_lset_data_on_readers (l, (dds_on_data_on_readers_fn) c); + dds_lset_inconsistent_topic (l, (dds_on_inconsistent_topic_fn) c); + dds_lset_liveliness_changed (l, (dds_on_liveliness_changed_fn) c); + dds_lset_liveliness_lost (l, (dds_on_liveliness_lost_fn) c); + dds_lset_offered_deadline_missed (l, (dds_on_offered_deadline_missed_fn) c); + dds_lset_offered_incompatible_qos (l, (dds_on_offered_incompatible_qos_fn) c); + dds_lset_publication_matched (l, (dds_on_publication_matched_fn) c); + dds_lset_requested_deadline_missed (l, (dds_on_requested_deadline_missed_fn) c); + dds_lset_requested_incompatible_qos (l, (dds_on_requested_incompatible_qos_fn) c); + dds_lset_sample_lost (l, (dds_on_sample_lost_fn) c); + dds_lset_sample_rejected (l, (dds_on_sample_rejected_fn) c); + dds_lset_subscription_matched (l, (dds_on_subscription_matched_fn) c); +} + +static void set_all (dds_listener_t *l) +{ + dds_lset_data_available (l, data_available_cb); + dds_lset_data_on_readers (l, data_on_readers_cb); + dds_lset_inconsistent_topic (l, inconsistent_topic_cb); + dds_lset_liveliness_changed (l, liveliness_changed_cb); + dds_lset_liveliness_lost (l, liveliness_lost_cb); + dds_lset_offered_deadline_missed (l, offered_deadline_missed_cb); + dds_lset_offered_incompatible_qos (l, offered_incompatible_qos_cb); + dds_lset_publication_matched (l, publication_matched_cb); + dds_lset_requested_deadline_missed (l, requested_deadline_missed_cb); + dds_lset_requested_incompatible_qos (l, requested_incompatible_qos_cb); + dds_lset_sample_lost (l, sample_lost_cb); + dds_lset_sample_rejected (l, sample_rejected_cb); + dds_lset_subscription_matched (l, subscription_matched_cb); +} -/**************************************************************************** - * Convenience test macros. - ****************************************************************************/ #define ASSERT_CALLBACK_EQUAL(fntype, listener, expected) \ - do { \ - dds_on_##fntype##_fn cb; \ - dds_lget_##fntype(listener, &cb); \ - CU_ASSERT_EQUAL(cb, expected); \ - } while (0) + do { \ + dds_on_##fntype##_fn cb; \ + dds_lget_##fntype(listener, &cb); \ + CU_ASSERT_EQUAL(cb, expected); \ + } while (0) -#define STR(fntype) #fntype##_cb - -#define TEST_GET_SET(listener, fntype, cb) \ - do { \ - dds_on_##fntype##_fn dummy = NULL; \ - /* Initially expect DDS_LUNSET on a newly created listener */ \ - ASSERT_CALLBACK_EQUAL(fntype, listener, DDS_LUNSET); \ - /* Using listener or callback NULL, shouldn't crash and be noop */ \ - dds_lset_##fntype(NULL, NULL); \ - dds_lget_##fntype(NULL, NULL); \ - dds_lget_##fntype(listener, NULL); \ - dds_lget_##fntype(NULL, &dummy); \ - CU_ASSERT_EQUAL_FATAL(dummy, NULL); \ - /* Set to NULL, get to confirm it succeeds */ \ - dds_lset_##fntype(listener, NULL); \ - ASSERT_CALLBACK_EQUAL(fntype, listener, NULL); \ - /* Set to a proper cb method, get to confirm it succeeds */ \ - dds_lset_##fntype(listener, cb); \ - ASSERT_CALLBACK_EQUAL(fntype, listener, cb); \ - } while (0) - - - -/**************************************************************************** - * Test globals. - ****************************************************************************/ -static dds_entity_t g_participant = 0; -static dds_entity_t g_subscriber = 0; -static dds_entity_t g_publisher = 0; -static dds_entity_t g_topic = 0; -static dds_entity_t g_writer = 0; -static dds_entity_t g_reader = 0; - -static dds_listener_t *g_listener = NULL; -static dds_qos_t *g_qos = NULL; -static ddsrt_mutex_t g_mutex; -static ddsrt_cond_t g_cond; - - - -/**************************************************************************** - * Callback stuff. - ****************************************************************************/ -static uint32_t cb_called = 0; -static dds_entity_t cb_topic = 0; -static dds_entity_t cb_writer = 0; -static dds_entity_t cb_reader = 0; -static dds_entity_t cb_subscriber = 0; - -static dds_inconsistent_topic_status_t cb_inconsistent_topic_status; -static dds_liveliness_lost_status_t cb_liveliness_lost_status; -static dds_offered_deadline_missed_status_t cb_offered_deadline_missed_status; -static dds_offered_incompatible_qos_status_t cb_offered_incompatible_qos_status; -static dds_sample_lost_status_t cb_sample_lost_status; -static dds_sample_rejected_status_t cb_sample_rejected_status; -static dds_liveliness_changed_status_t cb_liveliness_changed_status; -static dds_requested_deadline_missed_status_t cb_requested_deadline_missed_status; -static dds_requested_incompatible_qos_status_t cb_requested_incompatible_qos_status; -static dds_publication_matched_status_t cb_publication_matched_status; -static dds_subscription_matched_status_t cb_subscription_matched_status; - - -static void -inconsistent_topic_cb( - dds_entity_t topic, - const dds_inconsistent_topic_status_t status, void* arg) +static void check_all_const (const dds_listener_t *l, void (*c) (void)) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_topic = topic; - cb_inconsistent_topic_status = status; - cb_called |= DDS_INCONSISTENT_TOPIC_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); + ASSERT_CALLBACK_EQUAL (data_available, l, (dds_on_data_available_fn) c); + ASSERT_CALLBACK_EQUAL (data_on_readers, l, (dds_on_data_on_readers_fn) c); + ASSERT_CALLBACK_EQUAL (inconsistent_topic, l, (dds_on_inconsistent_topic_fn) c); + ASSERT_CALLBACK_EQUAL (liveliness_changed, l, (dds_on_liveliness_changed_fn) c); + ASSERT_CALLBACK_EQUAL (liveliness_lost, l, (dds_on_liveliness_lost_fn) c); + ASSERT_CALLBACK_EQUAL (offered_deadline_missed, l, (dds_on_offered_deadline_missed_fn) c); + ASSERT_CALLBACK_EQUAL (offered_incompatible_qos, l, (dds_on_offered_incompatible_qos_fn) c); + ASSERT_CALLBACK_EQUAL (publication_matched, l, (dds_on_publication_matched_fn) c); + ASSERT_CALLBACK_EQUAL (requested_deadline_missed, l, (dds_on_requested_deadline_missed_fn) c); + ASSERT_CALLBACK_EQUAL (requested_incompatible_qos, l, (dds_on_requested_incompatible_qos_fn) c); + ASSERT_CALLBACK_EQUAL (sample_lost, l, (dds_on_sample_lost_fn) c); + ASSERT_CALLBACK_EQUAL (sample_rejected, l, (dds_on_sample_rejected_fn) c); + ASSERT_CALLBACK_EQUAL (subscription_matched, l, (dds_on_subscription_matched_fn) c); } -static void -liveliness_lost_cb( - dds_entity_t writer, - const dds_liveliness_lost_status_t status, - void* arg) +static void check_all (const dds_listener_t *l) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_writer = writer; - cb_liveliness_lost_status = status; - cb_called |= DDS_LIVELINESS_LOST_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); + ASSERT_CALLBACK_EQUAL (data_available, l, data_available_cb); + ASSERT_CALLBACK_EQUAL (data_on_readers, l, data_on_readers_cb); + ASSERT_CALLBACK_EQUAL (inconsistent_topic, l, inconsistent_topic_cb); + ASSERT_CALLBACK_EQUAL (liveliness_changed, l, liveliness_changed_cb); + ASSERT_CALLBACK_EQUAL (liveliness_lost, l, liveliness_lost_cb); + ASSERT_CALLBACK_EQUAL (offered_deadline_missed, l, offered_deadline_missed_cb); + ASSERT_CALLBACK_EQUAL (offered_incompatible_qos, l, offered_incompatible_qos_cb); + ASSERT_CALLBACK_EQUAL (publication_matched, l, publication_matched_cb); + ASSERT_CALLBACK_EQUAL (requested_deadline_missed, l, requested_deadline_missed_cb); + ASSERT_CALLBACK_EQUAL (requested_incompatible_qos, l, requested_incompatible_qos_cb); + ASSERT_CALLBACK_EQUAL (sample_lost, l, sample_lost_cb); + ASSERT_CALLBACK_EQUAL (sample_rejected, l, sample_rejected_cb); + ASSERT_CALLBACK_EQUAL (subscription_matched, l, subscription_matched_cb); } -static void -offered_deadline_missed_cb( - dds_entity_t writer, - const dds_offered_deadline_missed_status_t status, - void* arg) +CU_Test (ddsc_listener, create_and_delete) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_writer = writer; - cb_offered_deadline_missed_status = status; - cb_called |= DDS_OFFERED_DEADLINE_MISSED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); + dds_listener_t *listener = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener); + check_all_const (listener, 0); + dds_delete_listener (listener); + + // check delete_listeners handles a null pointer gracefully + dds_delete_listener (NULL); } -static void -offered_incompatible_qos_cb( - dds_entity_t writer, - const dds_offered_incompatible_qos_status_t status, - void* arg) +CU_Test (ddsc_listener, reset) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_writer = writer; - cb_offered_incompatible_qos_status = status; - cb_called |= DDS_OFFERED_INCOMPATIBLE_QOS_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); + dds_listener_t *listener = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener); + + set_all (listener); + + // all callbacks should revert to default after reset + dds_reset_listener (listener); + check_all_const (listener, 0); + dds_delete_listener (listener); + + // check reset_listeners handles a null pointer gracefully + dds_reset_listener (NULL); } -static void -data_on_readers_cb( - dds_entity_t subscriber, - void* arg) +CU_Test (ddsc_listener, copy) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_subscriber = subscriber; - cb_called |= DDS_DATA_ON_READERS_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); + dds_listener_t *listener1 = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener1); + set_all (listener1); + + dds_listener_t *listener2 = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener2); + dds_copy_listener (listener2, listener1); + check_all (listener2); + + // Calling copy with NULL should not crash and be noops + dds_copy_listener (listener2, NULL); + dds_copy_listener (NULL, listener1); + dds_copy_listener (NULL, NULL); + + dds_delete_listener (listener1); + dds_delete_listener (listener2); } -static void -sample_lost_cb( - dds_entity_t reader, - const dds_sample_lost_status_t status, - void* arg) +CU_Test (ddsc_listener, merge) { - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_sample_lost_status = status; - cb_called |= DDS_SAMPLE_LOST_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} + dds_listener_t *listener1 = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener1); + set_all (listener1); -static void -data_available_cb( - dds_entity_t reader, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_called |= DDS_DATA_AVAILABLE_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} + // Merging listener1 into empty listener2 be like a copy + dds_listener_t *listener2 = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener2); + dds_merge_listener (listener2, listener1); + check_all (listener2); -static void -sample_rejected_cb( - dds_entity_t reader, - const dds_sample_rejected_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_sample_rejected_status = status; - cb_called |= DDS_SAMPLE_REJECTED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} + // Merging listener into a full listener2 should not overwrite anything + set_all_const (listener2, dummy_cb); + dds_merge_listener (listener2, listener1); + check_all_const (listener2, dummy_cb); -static void -liveliness_changed_cb( - dds_entity_t reader, - const dds_liveliness_changed_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_liveliness_changed_status = status; - cb_called |= DDS_LIVELINESS_CHANGED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} + // Using NULLs shouldn't crash and be noops + dds_merge_listener (listener2, NULL); + dds_merge_listener (NULL, listener1); + dds_merge_listener (NULL, NULL); -static void -requested_deadline_missed_cb( - dds_entity_t reader, - const dds_requested_deadline_missed_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_requested_deadline_missed_status = status; - cb_called |= DDS_REQUESTED_DEADLINE_MISSED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} - -static void -requested_incompatible_qos_cb( - dds_entity_t reader, - const dds_requested_incompatible_qos_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_requested_incompatible_qos_status = status; - cb_called |= DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} - -static void -publication_matched_cb( - dds_entity_t writer, - const dds_publication_matched_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_writer = writer; - cb_publication_matched_status = status; - cb_called |= DDS_PUBLICATION_MATCHED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} - -static void -subscription_matched_cb( - dds_entity_t reader, - const dds_subscription_matched_status_t status, - void* arg) -{ - (void)arg; - ddsrt_mutex_lock(&g_mutex); - cb_reader = reader; - cb_subscription_matched_status = status; - cb_called |= DDS_SUBSCRIPTION_MATCHED_STATUS; - ddsrt_cond_broadcast(&g_cond); - ddsrt_mutex_unlock(&g_mutex); -} - -static void -callback_dummy(void) -{ -} - -static uint32_t -waitfor_cb(uint32_t expected) -{ - dds_time_t timeout = 5 * DDS_NSECS_IN_SEC; - bool signalled = true; - ddsrt_mutex_lock(&g_mutex); - while (((cb_called & expected) != expected) && (signalled)) { - signalled = ddsrt_cond_waitfor(&g_cond, &g_mutex, timeout); - } - ddsrt_mutex_unlock(&g_mutex); - return cb_called; -} - - - -/**************************************************************************** - * Test initializations and teardowns. - ****************************************************************************/ - -static void -init_triggering_base(void) -{ - char name[100]; - - ddsrt_init(); - - ddsrt_mutex_init(&g_mutex); - ddsrt_cond_init(&g_cond); - - g_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL); - CU_ASSERT_FATAL(g_participant > 0); - - g_subscriber = dds_create_subscriber(g_participant, NULL, NULL); - CU_ASSERT_FATAL(g_subscriber > 0); - - g_publisher = dds_create_publisher(g_participant, NULL, NULL); - CU_ASSERT_FATAL(g_publisher > 0); - - g_topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, create_unique_topic_name("ddsc_listener_test", name, 100), NULL, NULL); - CU_ASSERT_FATAL(g_topic > 0); - - g_listener = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(g_listener); - - g_qos = dds_create_qos(); - CU_ASSERT_PTR_NOT_NULL_FATAL(g_qos); - dds_qset_reliability(g_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(1)); - dds_qset_history(g_qos, DDS_HISTORY_KEEP_ALL, 0); - - cb_called = 0; -} - -static void -init_triggering_test(void) -{ - uint32_t triggered; - - /* Initialize base. */ - init_triggering_base(); - - /* Set QoS Policies that'll help us test various status callbacks. */ - dds_qset_destination_order(g_qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); - dds_qset_reliability(g_qos, DDS_RELIABILITY_BEST_EFFORT, DDS_MSECS(100)); - dds_qset_resource_limits(g_qos, 1, 1, 1); - - /* Use these to be sure reader and writer know each other. */ - dds_lset_publication_matched(g_listener, publication_matched_cb); - dds_lset_subscription_matched(g_listener, subscription_matched_cb); - dds_lset_liveliness_changed(g_listener, liveliness_changed_cb); - - /* Create reader and writer with proper listeners. */ - g_writer = dds_create_writer(g_publisher, g_topic, g_qos, g_listener); - CU_ASSERT(g_writer > 0); - g_reader = dds_create_reader(g_subscriber, g_topic, g_qos, g_listener); - CU_ASSERT(g_reader > 0); - - /* Sync. */ - triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_LIVELINESS_CHANGED_STATUS, DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS); -} - -static void -fini_triggering_base(void) -{ - dds_delete_qos(g_qos); - dds_delete_listener(g_listener); - dds_delete(g_participant); - ddsrt_cond_destroy(&g_cond); - ddsrt_mutex_destroy(&g_mutex); - ddsrt_fini(); -} - -static void -fini_triggering_test(void) -{ - dds_delete(g_reader); - if (g_writer) - dds_delete(g_writer); - fini_triggering_base(); -} - - -/**************************************************************************** - * API tests - ****************************************************************************/ -CU_Test(ddsc_listener, create_and_delete) -{ - /* Verify create doesn't return null */ - dds_listener_t *listener; - listener = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener); - - /* Check default cb's are set */ - ASSERT_CALLBACK_EQUAL(inconsistent_topic, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(liveliness_lost, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(offered_deadline_missed, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(offered_incompatible_qos, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(data_on_readers, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(sample_lost, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(sample_rejected, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(liveliness_changed, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(requested_deadline_missed, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(requested_incompatible_qos, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(publication_matched, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(subscription_matched, listener, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(data_available, listener, DDS_LUNSET); - - dds_delete_listener(listener); - DDSRT_WARNING_MSVC_OFF(6387); /* Disable SAL warning on intentional misuse of the API */ - dds_delete_listener(NULL); - DDSRT_WARNING_MSVC_ON(6387); -} - -CU_Test(ddsc_listener, reset) -{ - dds_listener_t *listener; - listener = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener); - - /* Set a listener cb to a non-default value */ - dds_lset_data_available(listener, NULL); - ASSERT_CALLBACK_EQUAL(data_available, listener, NULL); - - /* Listener cb should revert to default after reset */ - dds_reset_listener(listener); - ASSERT_CALLBACK_EQUAL(data_available, listener, DDS_LUNSET); - - /* Resetting a NULL listener should not crash */ - dds_reset_listener(NULL); - - dds_delete_listener(listener); -} - -CU_Test(ddsc_listener, copy) -{ - dds_listener_t *listener1 = NULL, *listener2 = NULL; - listener1 = dds_create_listener(NULL); - listener2 = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener1); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener2); - - /* Set some listener1 callbacks to non-default values */ - dds_lset_data_available(listener1, NULL); - dds_lset_sample_lost(listener1, sample_lost_cb); - ASSERT_CALLBACK_EQUAL(data_available, listener1, NULL); - ASSERT_CALLBACK_EQUAL(sample_lost, listener1, sample_lost_cb); - ASSERT_CALLBACK_EQUAL(data_available, listener2, DDS_LUNSET); - ASSERT_CALLBACK_EQUAL(sample_lost, listener2, DDS_LUNSET); - - /* Cb's should be copied to listener2 */ - dds_copy_listener(listener2, listener1); - ASSERT_CALLBACK_EQUAL(data_available, listener1, NULL); - ASSERT_CALLBACK_EQUAL(data_available, listener2, NULL); - ASSERT_CALLBACK_EQUAL(sample_lost, listener1, sample_lost_cb); - ASSERT_CALLBACK_EQUAL(sample_lost, listener2, sample_lost_cb); - - /* Calling copy with NULL should not crash and be noops. */ - DDSRT_WARNING_MSVC_OFF(6387); /* Disable SAL warning on intentional misuse of the API */ - dds_copy_listener(listener2, NULL); - dds_copy_listener(NULL, listener1); - dds_copy_listener(NULL, NULL); - DDSRT_WARNING_MSVC_ON(6387); - - dds_delete_listener(listener1); - dds_delete_listener(listener2); -} - -CU_Test(ddsc_listener, merge) -{ - dds_listener_t *listener1 = NULL, *listener2 = NULL; - listener1 = dds_create_listener(NULL); - listener2 = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener1); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener2); - - /* Set all listener1 callbacks to non-default values */ - dds_lset_inconsistent_topic (listener1, inconsistent_topic_cb); - dds_lset_liveliness_lost (listener1, liveliness_lost_cb); - dds_lset_offered_deadline_missed (listener1, offered_deadline_missed_cb); - dds_lset_offered_incompatible_qos (listener1, offered_incompatible_qos_cb); - dds_lset_data_on_readers (listener1, data_on_readers_cb); - dds_lset_sample_lost (listener1, sample_lost_cb); - dds_lset_data_available (listener1, data_available_cb); - dds_lset_sample_rejected (listener1, sample_rejected_cb); - dds_lset_liveliness_changed (listener1, liveliness_changed_cb); - dds_lset_requested_deadline_missed (listener1, requested_deadline_missed_cb); - dds_lset_requested_incompatible_qos (listener1, requested_incompatible_qos_cb); - dds_lset_publication_matched (listener1, publication_matched_cb); - dds_lset_subscription_matched (listener1, subscription_matched_cb); - - /* Merging listener1 into empty listener2 should act a bit like a copy. */ - dds_merge_listener(listener2, listener1); - ASSERT_CALLBACK_EQUAL(inconsistent_topic, listener2, inconsistent_topic_cb); - ASSERT_CALLBACK_EQUAL(liveliness_lost, listener2, liveliness_lost_cb); - ASSERT_CALLBACK_EQUAL(offered_deadline_missed, listener2, offered_deadline_missed_cb); - ASSERT_CALLBACK_EQUAL(offered_incompatible_qos, listener2, offered_incompatible_qos_cb); - ASSERT_CALLBACK_EQUAL(data_on_readers, listener2, data_on_readers_cb); - ASSERT_CALLBACK_EQUAL(sample_lost, listener2, sample_lost_cb); - ASSERT_CALLBACK_EQUAL(data_available, listener2, data_available_cb); - ASSERT_CALLBACK_EQUAL(sample_rejected, listener2, sample_rejected_cb); - ASSERT_CALLBACK_EQUAL(liveliness_changed, listener2, liveliness_changed_cb); - ASSERT_CALLBACK_EQUAL(requested_deadline_missed, listener2, requested_deadline_missed_cb); - ASSERT_CALLBACK_EQUAL(requested_incompatible_qos, listener2, requested_incompatible_qos_cb); - ASSERT_CALLBACK_EQUAL(publication_matched, listener2, publication_matched_cb); - ASSERT_CALLBACK_EQUAL(subscription_matched, listener2, subscription_matched_cb); - - /* Merging listener into a full listener2 should act as a noop. */ - dds_lset_inconsistent_topic (listener2, (dds_on_inconsistent_topic_fn)callback_dummy); - dds_lset_liveliness_lost (listener2, (dds_on_liveliness_lost_fn)callback_dummy); - dds_lset_offered_deadline_missed (listener2, (dds_on_offered_deadline_missed_fn)callback_dummy); - dds_lset_offered_incompatible_qos (listener2, (dds_on_offered_incompatible_qos_fn)callback_dummy); - dds_lset_data_on_readers (listener2, (dds_on_data_on_readers_fn)callback_dummy); - dds_lset_sample_lost (listener2, (dds_on_sample_lost_fn)callback_dummy); - dds_lset_data_available (listener2, (dds_on_data_available_fn)callback_dummy); - dds_lset_sample_rejected (listener2, (dds_on_sample_rejected_fn)callback_dummy); - dds_lset_liveliness_changed (listener2, (dds_on_liveliness_changed_fn)callback_dummy); - dds_lset_requested_deadline_missed (listener2, (dds_on_requested_deadline_missed_fn)callback_dummy); - dds_lset_requested_incompatible_qos (listener2, (dds_on_requested_incompatible_qos_fn)callback_dummy); - dds_lset_publication_matched (listener2, (dds_on_publication_matched_fn)callback_dummy); - dds_lset_subscription_matched (listener2, (dds_on_subscription_matched_fn)callback_dummy); - dds_merge_listener(listener2, listener1); - ASSERT_CALLBACK_EQUAL(inconsistent_topic, listener2, (dds_on_inconsistent_topic_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(liveliness_lost, listener2, (dds_on_liveliness_lost_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(offered_deadline_missed, listener2, (dds_on_offered_deadline_missed_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(offered_incompatible_qos, listener2, (dds_on_offered_incompatible_qos_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(data_on_readers, listener2, (dds_on_data_on_readers_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(sample_lost, listener2, (dds_on_sample_lost_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(data_available, listener2, (dds_on_data_available_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(sample_rejected, listener2, (dds_on_sample_rejected_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(liveliness_changed, listener2, (dds_on_liveliness_changed_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(requested_deadline_missed, listener2, (dds_on_requested_deadline_missed_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(requested_incompatible_qos, listener2, (dds_on_requested_incompatible_qos_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(publication_matched, listener2, (dds_on_publication_matched_fn)callback_dummy); - ASSERT_CALLBACK_EQUAL(subscription_matched, listener2, (dds_on_subscription_matched_fn)callback_dummy); - - /* Using NULLs shouldn't crash and be noops. */ - dds_merge_listener(listener2, NULL); - dds_merge_listener(NULL, listener1); - dds_merge_listener(NULL, NULL); - - dds_delete_listener(listener1); - dds_delete_listener(listener2); + dds_delete_listener (listener1); + dds_delete_listener (listener2); } CU_Test(ddsc_listener, getters_setters) { - /* test all individual cb get/set methods */ - dds_listener_t *listener = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener); + // test all individual cb get/set methods + dds_listener_t *listener = dds_create_listener (NULL); + CU_ASSERT_PTR_NOT_NULL_FATAL (listener); - DDSRT_WARNING_MSVC_OFF(6387); /* Disable SAL warning on intentional misuse of the API */ \ - TEST_GET_SET(listener, inconsistent_topic, inconsistent_topic_cb); - TEST_GET_SET(listener, liveliness_lost, liveliness_lost_cb); - TEST_GET_SET(listener, offered_deadline_missed, offered_deadline_missed_cb); - TEST_GET_SET(listener, offered_incompatible_qos, offered_incompatible_qos_cb); - TEST_GET_SET(listener, data_on_readers, data_on_readers_cb); - TEST_GET_SET(listener, sample_lost, sample_lost_cb); - TEST_GET_SET(listener, sample_rejected, sample_rejected_cb); - TEST_GET_SET(listener, liveliness_changed, liveliness_changed_cb); - TEST_GET_SET(listener, requested_deadline_missed, requested_deadline_missed_cb); - TEST_GET_SET(listener, requested_incompatible_qos, requested_incompatible_qos_cb); - TEST_GET_SET(listener, publication_matched, publication_matched_cb); - TEST_GET_SET(listener, subscription_matched, subscription_matched_cb); - TEST_GET_SET(listener, data_available, data_available_cb); - DDSRT_WARNING_MSVC_ON(6387); +#define TEST_GET_SET(listener, fntype, cb) \ + do { \ + dds_on_##fntype##_fn dummy = NULL; \ + /* Initially expect DDS_LUNSET on a newly created listener */ \ + ASSERT_CALLBACK_EQUAL (fntype, listener, 0); \ + /* Using listener or callback NULL, shouldn't crash and be noop */ \ + dds_lset_##fntype (NULL, NULL); \ + dds_lget_##fntype (NULL, NULL); \ + dds_lget_##fntype (listener, NULL); \ + dds_lget_##fntype (NULL, &dummy); \ + CU_ASSERT_EQUAL_FATAL (dummy, NULL); \ + /* Set to NULL, get to confirm it succeeds */ \ + dds_lset_##fntype (listener, NULL); \ + ASSERT_CALLBACK_EQUAL (fntype, listener, NULL); \ + /* Set to a proper cb method, get to confirm it succeeds */ \ + dds_lset_##fntype (listener, cb); \ + ASSERT_CALLBACK_EQUAL (fntype, listener, cb); \ + } while (0) + TEST_GET_SET (listener, data_available, data_available_cb); + TEST_GET_SET (listener, data_on_readers, data_on_readers_cb); + TEST_GET_SET (listener, inconsistent_topic, inconsistent_topic_cb); + TEST_GET_SET (listener, liveliness_changed, liveliness_changed_cb); + TEST_GET_SET (listener, liveliness_lost, liveliness_lost_cb); + TEST_GET_SET (listener, offered_deadline_missed, offered_deadline_missed_cb); + TEST_GET_SET (listener, offered_incompatible_qos, offered_incompatible_qos_cb); + TEST_GET_SET (listener, publication_matched, publication_matched_cb); + TEST_GET_SET (listener, requested_deadline_missed, requested_deadline_missed_cb); + TEST_GET_SET (listener, requested_incompatible_qos, requested_incompatible_qos_cb); + TEST_GET_SET (listener, sample_lost, sample_lost_cb); + TEST_GET_SET (listener, sample_rejected, sample_rejected_cb); + TEST_GET_SET (listener, subscription_matched, subscription_matched_cb); +#undef TEST_GET_SET - dds_delete_listener(listener); + dds_delete_listener (listener); } +#undef ASSERT_CALLBACK_EQUAL +/************************************************** + **** **** + **** programmable listener checker **** + **** **** + **************************************************/ -/**************************************************************************** - * Triggering tests - ****************************************************************************/ -CU_Test(ddsc_listener, propagation, .init=init_triggering_base, .fini=fini_triggering_base) +// These had better match the corresponding type definitions! +// n uint32_t ...count +// c int32_t ...count_change +// I instance handle of a data instance +// P uint32_t QoS policy ID +// E instance handle of an entity +// R sample_rejected_status_kind +static const struct { + size_t size; // size of status struct + const char *desc; // description of status struct + uint32_t mask; // status mask, bit in "cb_called" + const dds_entity_t *cb_entity; // which cb_... entity to look at + const void *cb_status; // cb_..._status to look at +} lldesc[] = { + { 0, NULL, DDS_DATA_AVAILABLE_STATUS, &cb_reader, NULL }, // data available + { 0, NULL, DDS_DATA_ON_READERS_STATUS, &cb_subscriber, NULL }, // data on readers + { sizeof (dds_inconsistent_topic_status_t), "nc", DDS_INCONSISTENT_TOPIC_STATUS, &cb_topic, &cb_inconsistent_topic_status }, + { sizeof (dds_liveliness_changed_status_t), "nnccE", DDS_LIVELINESS_CHANGED_STATUS, &cb_reader, &cb_liveliness_changed_status }, + { sizeof (dds_liveliness_lost_status_t), "nc", DDS_LIVELINESS_LOST_STATUS, &cb_writer, &cb_liveliness_lost_status }, + { sizeof (dds_offered_deadline_missed_status_t), "ncI", DDS_OFFERED_DEADLINE_MISSED_STATUS, &cb_writer, &cb_offered_deadline_missed_status }, + { sizeof (dds_offered_incompatible_qos_status_t), "ncP", DDS_OFFERED_INCOMPATIBLE_QOS_STATUS, &cb_writer, &cb_offered_incompatible_qos_status }, + { sizeof (dds_publication_matched_status_t), "ncncE", DDS_PUBLICATION_MATCHED_STATUS, &cb_writer, &cb_publication_matched_status }, + { sizeof (dds_requested_deadline_missed_status_t), "ncI", DDS_REQUESTED_DEADLINE_MISSED_STATUS, &cb_reader, &cb_requested_deadline_missed_status }, + { sizeof (dds_requested_incompatible_qos_status_t), "ncP", DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS, &cb_reader, &cb_requested_incompatible_qos_status }, + { sizeof (dds_sample_lost_status_t), "nc", DDS_SAMPLE_LOST_STATUS, &cb_reader, &cb_sample_lost_status }, + { sizeof (dds_sample_rejected_status_t), "ncRI", DDS_SAMPLE_REJECTED_STATUS, &cb_reader, &cb_sample_rejected_status }, + { sizeof (dds_subscription_matched_status_t), "ncncE", DDS_SUBSCRIPTION_MATCHED_STATUS, &cb_reader, &cb_subscription_matched_status } +}; + +static const void *advance (const void *status, size_t *off, char code) { - dds_listener_t *listener_par = NULL; - dds_listener_t *listener_pub = NULL; - dds_listener_t *listener_sub = NULL; - uint32_t triggered; +#define alignof(type_) offsetof (struct { char c; type_ d; }, d) + size_t align = 1, size = 1; + switch (code) + { + case 'n': case 'c': case 'P': + align = alignof (uint32_t); size = sizeof (uint32_t); + break; + case 'E': case 'I': + align = alignof (dds_instance_handle_t); size = sizeof (dds_instance_handle_t); + break; + case 'R': + align = alignof (dds_sample_rejected_status_kind); size = sizeof (dds_sample_rejected_status_kind); + break; + default: + abort (); + } +#undef alignof + *off = (*off + align - 1) & ~(align - 1); + const void *p = (const char *) status + *off; + *off += size; + return p; +} + +static void get_status (int ll, dds_entity_t ent, void *status) +{ + dds_return_t ret; + switch (ll) + { + case 2: ret = dds_get_inconsistent_topic_status (ent, status); break; + case 3: ret = dds_get_liveliness_changed_status (ent, status); break; + case 4: ret = dds_get_liveliness_lost_status (ent, status); break; + case 5: ret = dds_get_offered_deadline_missed_status (ent, status); break; + case 6: ret = dds_get_offered_incompatible_qos_status (ent, status); break; + case 7: ret = dds_get_publication_matched_status (ent, status); break; + case 8: ret = dds_get_requested_deadline_missed_status (ent, status); break; + case 9: ret = dds_get_requested_incompatible_qos_status (ent, status); break; + case 10: ret = dds_get_sample_lost_status (ent, status); break; + case 11: ret = dds_get_sample_rejected_status (ent, status); break; + case 12: ret = dds_get_subscription_matched_status (ent, status); break; + default: abort (); + } + CU_ASSERT_FATAL (ret == 0); +} + +static void assert_status_change_fields_are_0 (int ll, dds_entity_t ent) +{ + if (lldesc[ll].desc) + { + const char *d = lldesc[ll].desc; + void *status = malloc (lldesc[ll].size); + get_status (ll, ent, status); + size_t off = 0; + while (*d) + { + const uint32_t *p = advance (status, &off, *d); + if (*d == 'c') + CU_ASSERT_FATAL (*p == 0); + d++; + } + assert (off <= lldesc[ll].size); + free (status); + } +} + +static int getentity (const char *tok, bool *isbang, bool *ishash) +{ + static const char *known = "PRWrstwxy"; + const char *p; + if (isbang) + *isbang = false; + if (ishash) + *ishash = false; + if ((p = strchr (known, *tok)) == NULL) + return -1; + int ent = (int) (p - known); + if (*++tok == 0) + return ent; + if (*tok == '\'') + { + ent += (int) strlen (known); + tok++; + } + while (*tok == '!' || *tok == '#') + { + if (strchr (known + 3, *p) == NULL) + return -1; // only readers, writers + if (*tok == '!' && isbang) + *isbang = true; + else if (*tok == '#' && ishash) + *ishash = true; + tok++; + } + return (*tok == 0) ? ent : -1; +} + +static int getlistener (const char *tok, bool *isbang) +{ + // note: sort order is on full name (so sample rejected precedes subscription matched) + static const char *ls[] = { + "da", "dor", "it", "lc", "ll", "odm", "oiq", "pm", "rdm", "riq", "sl", "sr", "sm" + }; + if (isbang) + *isbang = false; + for (size_t i = 0; i < sizeof (ls) / sizeof (*ls); i++) + { + size_t n = strlen (ls[i]); + if (strncmp (tok, ls[i], n) == 0 && (tok[n] == 0 || tok[n+1] == ',')) + { + if (isbang) + *isbang = (tok[n] == '!'); + return (int) i; + } + } + return -1; +} + +struct ents { + dds_entity_t es[2 * 9]; + dds_entity_t tps[2]; + dds_entity_t doms[2]; + dds_instance_handle_t esi[2 * 9]; + // built-in topic readers for cross-referencing instance handles + dds_entity_t pubrd[2]; + dds_entity_t subrd[2]; +}; + +static void make_participant (struct ents *es, const char *topicname, int ent, const dds_qos_t *qos, dds_listener_t *list) +{ + const dds_domainid_t domid = (ent < 9) ? 0 : 1; + char *conf = ddsrt_expand_envvars ("${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0", domid); + printf ("create domain %"PRIu32, domid); + fflush (stdout); + es->doms[domid] = dds_create_domain (domid, conf); + CU_ASSERT_FATAL (es->doms[domid] > 0); + ddsrt_free (conf); + printf (" create participant P%s", (ent < 9) ? "" : "'"); + fflush (stdout); + es->es[ent] = dds_create_participant (domid, NULL, list); + CU_ASSERT_FATAL (es->es[ent] > 0); + es->tps[domid] = dds_create_topic (es->es[ent], &Space_Type1_desc, topicname, qos, NULL); + CU_ASSERT_FATAL (es->tps[domid] > 0); + + // Create the built-in topic readers with a dummy listener to avoid any event (data available comes to mind) + // from propagating to the normal data available listener, in case it has been set on the participant. + // + // - dummy_cb aborts when it is invoked, but all reader-related listeners that can possibly trigger are set + // separately (incompatible qos, deadline missed, sample lost and sample rejected are all impossible by + // construction) + // - regarding data_on_readers: Cyclone handles listeners installed on an ancestor by *inheriting* them, + // rather than by walking up ancestor chain. Setting data_on_readers on the reader therefore overrides the + // listener set on the subscriber. It is a nice feature! + dds_listener_t *dummylist = dds_create_listener (NULL); + set_all_const (dummylist, dummy_cb); + dds_lset_data_available (dummylist, dummy_data_available_cb); + dds_lset_data_on_readers (dummylist, dummy_data_on_readers_cb); + dds_lset_subscription_matched (dummylist, dummy_subscription_matched_cb); + dds_lset_liveliness_changed (dummylist, dummy_liveliness_changed_cb); + es->pubrd[domid] = dds_create_reader (es->es[ent], DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, dummylist); + CU_ASSERT_FATAL (es->pubrd[domid] > 0); + es->subrd[domid] = dds_create_reader (es->es[ent], DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, dummylist); + CU_ASSERT_FATAL (es->subrd[domid] > 0); + dds_delete_listener (dummylist); + printf ("pubrd %"PRId32" subrd %"PRId32" sub %"PRId32"\n", es->pubrd[domid], es->subrd[domid], dds_get_parent (es->pubrd[domid])); +} + +static void make_entity1 (struct ents *es, const char *topicname, int ent, bool isbang, bool ishash, const dds_qos_t *qos, dds_qos_t *rwqos, dds_listener_t *list) +{ + dds_return_t ret; + switch (ent) + { + case 0: case 9: + make_participant (es, topicname, ent, qos, list); + break; + case 1: case 10: + if (es->es[ent-1] == 0) + { + printf ("["); + make_entity1 (es, topicname, ent-1, false, false, qos, rwqos, NULL); + printf ("] "); + } + printf ("create subscriber R%s", (ent < 9) ? "" : "'"); + fflush (stdout); + es->es[ent] = dds_create_subscriber (es->es[ent-1], NULL, list); + break; + case 2: case 11: + if (es->es[ent-2] == 0) + { + printf ("["); + make_entity1 (es, topicname, ent-2, false, false, qos, rwqos, NULL); + printf ("] "); + } + printf ("create publisher W%s", (ent < 9) ? "" : "'"); + fflush (stdout); + es->es[ent] = dds_create_publisher (es->es[ent-2], NULL, list); + break; + case 3: case 4: case 5: case 12: case 13: case 14: + if (es->es[ent < 9 ? 1 : 10] == 0) + { + printf ("["); + make_entity1 (es, topicname, ent < 9 ? 1 : 10, false, false, qos, rwqos, NULL); + printf ("] "); + } + printf ("create %s reader %c%s", isbang ? "best-effort" : "reliable", 'r' + (ent < 9 ? ent-3 : ent-12), (ent < 9) ? "" : "'"); + fflush (stdout); + dds_reset_qos (rwqos); + if (isbang) + dds_qset_reliability (rwqos, DDS_RELIABILITY_BEST_EFFORT, DDS_MSECS (100)); + if (ishash) + dds_qset_resource_limits (rwqos, 1, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + es->es[ent] = dds_create_reader (es->es[ent < 9 ? 1 : 10], es->tps[ent < 9 ? 0 : 1], rwqos, list); + break; + case 6: case 7: case 8: case 15: case 16: case 17: + if (es->es[ent < 9 ? 2 : 11] == 0) + { + printf ("["); + make_entity1 (es, topicname, ent < 9 ? 2 : 11, false, false, qos, rwqos, NULL); + printf ("] "); + } + printf ("create %s writer %c%s", isbang ? "best-effort" : "reliable", 'w' + (ent < 9 ? ent-6 : ent-15), (ent < 9) ? "" : "'"); + fflush (stdout); + dds_reset_qos (rwqos); + if (isbang) + dds_qset_reliability (rwqos, DDS_RELIABILITY_BEST_EFFORT, DDS_MSECS (100)); + if (ishash) + dds_qset_resource_limits (rwqos, 1, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + es->es[ent] = dds_create_writer (es->es[ent < 9 ? 2 : 11], es->tps[ent < 9 ? 0 : 1], rwqos, list); + break; + default: + abort (); + } + printf (" = %"PRId32, es->es[ent]); + fflush (stdout); + CU_ASSERT_FATAL (es->es[ent] > 0); + ret = dds_get_instance_handle (es->es[ent], &es->esi[ent]); + //printf (" %"PRIx64, es->esi[ent]); + //fflush (stdout); + CU_ASSERT_FATAL (ret == 0); +} + +static void make_entity (struct ents *es, const char *topicname, int ent, bool isbang, bool ishash, const dds_qos_t *qos, dds_qos_t *rwqos, dds_listener_t *list) +{ + make_entity1 (es, topicname, ent, isbang, ishash, qos, rwqos, list); + printf ("\n"); +} + +static char *strsep_noempty (char **cursor, const char *sep) +{ + char *tok; + while ((tok = ddsrt_strsep (cursor, sep)) != NULL && *tok == 0) { } + return tok; +} + +static dds_instance_handle_t lookup_insthandle (const struct ents *es, int ent, int ent1) +{ + // if both are in the same domain, it's easy + if (ent / 9 == ent1 / 9) + return es->esi[ent1]; + else + { + // if they aren't ... find GUID from instance handle in the one domain, + // then find instance handle for GUID in the other + dds_entity_t rd1 = 0, rd2 = 0; + switch (ent1) + { + case 3: case 4: case 5: rd1 = es->subrd[0]; rd2 = es->subrd[1]; break; + case 12: case 13: case 14: rd1 = es->subrd[1]; rd2 = es->subrd[0]; break; + case 6: case 7: case 8: rd1 = es->pubrd[0]; rd2 = es->pubrd[1]; break; + case 15: case 16: case 17: rd1 = es->pubrd[1]; rd2 = es->pubrd[0]; break; + default: abort (); + } + dds_return_t ret; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); - - /* Let participant be interested in data. */ - listener_par = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener_par); - dds_lset_data_on_readers(listener_par, data_on_readers_cb); - ret = dds_set_listener(g_participant, listener_par); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - dds_delete_listener(listener_par); - - /* Let publisher be interested in publication matched. */ - listener_pub = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener_pub); - dds_lset_publication_matched(listener_pub, publication_matched_cb); - ret = dds_set_listener(g_publisher, listener_pub); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - dds_delete_listener(listener_pub); - - /* Let subscriber be interested in subscription matched. */ - listener_sub = dds_create_listener(NULL); - CU_ASSERT_PTR_NOT_NULL_FATAL(listener_pub); - dds_lset_subscription_matched(listener_sub, subscription_matched_cb); - ret = dds_set_listener(g_subscriber, listener_sub); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - dds_delete_listener(listener_sub); - - /* Create reader and writer without listeners. */ - g_reader = dds_create_reader(g_subscriber, g_topic, g_qos, NULL); - CU_ASSERT_FATAL(g_reader > 0); - g_writer = dds_create_writer(g_publisher, g_topic, g_qos, NULL); - CU_ASSERT_FATAL(g_writer > 0); - - /* Publication and Subscription should be matched. */ - triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_writer, g_writer); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - /* Write sample. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Data on readers should be triggered with the right status. */ - triggered = waitfor_cb(DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_ON_READERS_STATUS, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_subscriber, g_subscriber); - CU_ASSERT_NOT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - - dds_delete(g_writer); - dds_delete(g_reader); + dds_builtintopic_endpoint_t keysample; + //printf ("(in %"PRId32" %"PRIx64" -> ", rd1, es->esi[ent1]); + //fflush (stdout); + ret = dds_instance_get_key (rd1, es->esi[ent1], &keysample); + CU_ASSERT_FATAL (ret == 0); + // In principle, only key fields are set in sample returned by get_key; + // in the case of a built-in topic that is extended to the participant + // key. The qos and topic/type names should not be set, and there is no + // (therefore) memory allocated for the sample. + CU_ASSERT_FATAL (keysample.qos == NULL); + CU_ASSERT_FATAL (keysample.topic_name == NULL); + CU_ASSERT_FATAL (keysample.type_name == NULL); + //for (size_t j = 0; j < sizeof (keysample.key.v); j++) + // printf ("%s%02x", (j > 0 && j % 4 == 0) ? ":" : "", keysample.key.v[j]); + const dds_instance_handle_t ih = dds_lookup_instance (rd2, &keysample); + CU_ASSERT_FATAL (ih != 0); + //printf (" -> %"PRIx64")", ih); + //fflush (stdout); + return ih; + } } - -CU_Test(ddsc_listener, matched, .init=init_triggering_base, .fini=fini_triggering_base) +static void checkstatus (int ll, const struct ents *es, int ent, const char *args, const void *status) { - uint32_t triggered; - - /* We will basically do the same as the 'normal' init_triggering_test() and - * fini_triggering_test() calls. It's just that we do it in a different - * order and use the participant iso subscriber and publisher. */ - - /* We are interested in matched notifications. */ - dds_lset_publication_matched(g_listener, publication_matched_cb); - dds_lset_subscription_matched(g_listener, subscription_matched_cb); - - /* Create reader and writer with proper listeners. - * The creation order is deliberately different from publication_matched and subscription_matched. */ - g_reader = dds_create_reader(g_participant, g_topic, g_qos, g_listener); - CU_ASSERT_FATAL(g_reader > 0); - g_writer = dds_create_writer(g_participant, g_topic, g_qos, g_listener); - CU_ASSERT_FATAL(g_writer > 0); - - /* Both matched should be triggered on the right entities. */ - triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_writer, g_writer); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - dds_delete(g_writer); - dds_delete(g_reader); + DDSRT_WARNING_MSVC_OFF(4996); // use of sscanf triggers a warning + if (*args == 0) + return; + if (*args++ != '(') + abort (); + assert (lldesc[ll].desc != NULL); + const char *d = lldesc[ll].desc; + const char *sep = "("; + size_t off = 0; + while (*d) + { + const void *p = advance (status, &off, *d); + char str[32]; + unsigned u; + int i, pos = -1; + switch (*d) + { + case 'n': + if (sscanf (args, "%u%n", &u, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) + abort (); + printf ("%s%"PRIu32" %u", sep, *(uint32_t *)p, u); fflush (stdout); + CU_ASSERT_FATAL (*(uint32_t *)p == u); + break; + case 'c': + if (sscanf (args, "%d%n", &i, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) + abort (); + printf ("%s%"PRId32" %d", sep, *(int32_t *)p, i); fflush (stdout); + CU_ASSERT_FATAL (*(int32_t *)p == i); + break; + case 'P': // policy id: currently fixed at reliability + pos = -1; // not actually consuming an argument + printf ("%s%"PRIu32" %d", sep, *(uint32_t *)p, (int) DDS_RELIABILITY_QOS_POLICY_ID); fflush (stdout); + CU_ASSERT_FATAL (*(uint32_t *)p == (uint32_t) DDS_RELIABILITY_QOS_POLICY_ID); + break; + case 'R': + if (sscanf (args, "%31[^,)]%n", str, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) + abort (); + if (strcmp (str, "i") == 0) + i = (int) DDS_REJECTED_BY_INSTANCES_LIMIT; + else if (strcmp (str, "s") == 0) + i = (int) DDS_REJECTED_BY_SAMPLES_LIMIT; + else if (strcmp (str, "spi") == 0) + i = (int) DDS_REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT; + else + abort (); + printf ("%s%d %d", sep, (int) *(dds_sample_rejected_status_kind *)p, i); fflush (stdout); + CU_ASSERT_FATAL (*(dds_sample_rejected_status_kind *)p == (dds_sample_rejected_status_kind) i); + break; + case 'I': // instance handle is too complicated + pos = -1; // not actually consuming an argument + break; + case 'E': { + int ent1 = -1; + dds_instance_handle_t esi1 = 0; + if (sscanf (args, "%31[^,)]%n", str, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) + abort (); + if (strcmp (str, "*") != 0 && (ent1 = getentity (str, NULL, NULL)) < 0) + abort (); + if (ent1 != -1) + esi1 = lookup_insthandle (es, ent, ent1); + printf ("%s%"PRIx64" %"PRIx64, sep, *(dds_instance_handle_t *)p, esi1); fflush (stdout); + CU_ASSERT_FATAL (ent1 == -1 || *(dds_instance_handle_t *)p == esi1); + break; + } + default: abort (); + } + args += pos + 1; + sep = ", "; + d++; + } + printf (")"); + assert (*args == 0); + assert (off <= lldesc[ll].size); + DDSRT_WARNING_MSVC_ON(4996); } -CU_Test(ddsc_listener, publication_matched, .init=init_triggering_test, .fini=fini_triggering_test) +/** @brief run a "test" consisting of a sequence of simplish operations + * + * This operation takes a test description, really a program in a bizarre syntax, and executes it. Any failures, + * be it because of error codes coming out of the Cyclone calls or expected values being wrong cause it to fail + * the test via CU_ASSERT_FATAL. While it is doing this, it outputs the test steps to stdout including some + * actual values. An invalid program is mostly reported by calling abort(). It is geared towards checking for + * listener invocations and the effects on statuses. + * + * Entities in play: + * + * - participants: P P' + * - subscribers: R R' + * - publishers: W W' + * - readers: r s t r' s' t' + * - writers: w x y w' x' y' + * + * The unprimed ones exist in domain 0, the primed ones in domain 1 (but configured such that it talks to + * domain 0), so that network-related listener invocations can be checked as well. + * + * The first mention of an entity creates it as well as its ancestors. Implicitly created ancestors always have + * standard QoS and have no listeners. There is one topic that is created implicitly when the participant is + * created. + * + * Standard QoS is: default + reliable (100ms), by-source-timestamp, keep-all. + * The QoS of a reader/writer can be altered at the first mention of it by suffixing its name with "!" and/or "#" + * (the apostrophe is part of the name, so w#! or r'! are valid). Those suffixes are ignored if the entity + * already exists. + * + * A program consists of a sequence of operations separated by whitespace, ';' or '/' (there is no meaning to the + * separators, they exist to allow visual grouping): + * + * PROGRAM ::= (OP (\s+|[/;])*)* + * + * OP ::= (LISTENER)* ENTITY-NAME + * if entity ENTITY-NAME does not exist: + * creates the entity with the given listeners installed + * else + * changes the entity's listeners to the specified ones + * (see above for the valid ENTITY-NAMEs) + * | -ENTITY-NAME + * deletes the specified entity + * | WRITE-LIKE[fail][@DT] KEY + * writes/disposes/unregisters key KEY (an integer), if "fail" is appended, the + * expectation is that it fails with a timeout, if @DT is appended, the timestamp is the + * start time of the test +
s rather than the current time; DT is a floating-point + * number + * | READ-LIKE[(A,B))] + * reads/takes at most 10 samples, counting the number of valid and invalid samples seen + * and checking it against A and B if given + * | ?LISTENER[(ARGS)] + * waits until the specified listener has been invoked on using a flag set + * by the listener function, resets the flag and verifies that neither the entity status + * bit nor the "change" fields in the various statuses were set + * ARGS is used to check the status argument most recently passed to the listener: + * it (A,B) verifies count and change match A and B, policy matches RELIABILITY + * lc (A,B,C,D,E) verifies that alive and not-alive counts match A and B, that + * alive and not-alive changes match C and D and that the last handle matches + * E if an entity name (ignored if E = "*") + * ll (A,B) verifies count and change match A and B + * odm (A,B) verifies count and change match A and B, last handle is ignored + * oiq (A,B) verifies that total count and change match A and B and that the + * mismatching QoS is reliability (the only one that can for now) + * pm (A,B,C,D,E) verifies that total count and change match A and B, that + * current count and change match C and D and that the last handle matches E + * if an entity name (ignored if E = "*") + * rdm see odm + * riq see oiq + * sl (A,B) verifies that total count and change match A and B + * sr (A,B,C) verifies total count and change match A and B, and that the reason + * matches C (one of "s" for samples, "i" for instances, "spi" for samples + * per instance) + * sm see pm + * | ?!LISTENER + * (not listener) tests that LISTENER has not been invoked since last reset + * | sleep D + * delay program execution for D s (D is a floating-point number) + * WRITE-LIKE ::= wr write + * | wrdisp write-dispose + * | disp dispose + * | unreg unregister + * READ-LIKE ::= read dds_read (so any state) + * | take dds_take (so any state) + * LISTENER ::= da data available (acts on a reader) + * | dor data on readers (acts on a subcsriber) + * | it incompatible topic (acts on a topic) + * | lc liveliness changed (acts on a reader) + * | ll liveliness lost (acts on a writer) + * | odm offered deadline missed (acts on a writer) + * | oiq offered incompatible QoS (acts on a writer) + * | pm publication matched (acts on a writer) + * | rdm requested deadline missed (acts on a reader) + * | riq requested incompatible QoS (acts on a reader) + * | sl sample lost (acts on a reader) + * | sr sample rejected (acts on a reader) + * | sm subscription matched (acts on a reader) + * + * All entities share the listeners with their global state. Only the latest invocation is visible. + * + * @param[in] ops Program to execute. + */ +static void dotest (const char *ops) { - dds_publication_matched_status_t publication_matched; - dds_instance_handle_t reader_hdl; - dds_return_t ret; - uint32_t triggered; - uint32_t status; + DDSRT_WARNING_MSVC_OFF(4996); // use of sscanf triggers a warning + static const char *sep = " /;\n\t\r\v"; + char *opscopy = ddsrt_strdup (ops), *cursor = opscopy, *tok; + struct ents es; + dds_return_t ret; + Space_Type1 sample; + char topicname[100]; + dds_qos_t *qos = dds_create_qos (), *rwqos = dds_create_qos (); + dds_listener_t *list = dds_create_listener (NULL); + const dds_time_t tref = dds_time (); + CU_ASSERT_FATAL (qos != NULL); + CU_ASSERT_FATAL (rwqos != NULL); + CU_ASSERT_FATAL (list != NULL); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100)); + dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + memset (&es, 0, sizeof (es)); + memset (&sample, 0, sizeof (sample)); - /* Get reader handle that should be part of the status. */ - ret = dds_get_instance_handle(g_reader, &reader_hdl); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + ddsrt_mutex_init (&g_mutex); + ddsrt_cond_init (&g_cond); + ddsrt_mutex_lock (&g_mutex); + cb_called = 0; + ddsrt_mutex_unlock (&g_mutex); - /* Publication matched should be triggered with the right status. */ - triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_writer, g_writer); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.current_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.current_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.total_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.last_subscription_handle, reader_hdl); + create_unique_topic_name ("ddsc_listener_test", topicname, 100); + printf ("dotest: %s\n", ops); + printf ("topic: %s\n", topicname); + while ((tok = strsep_noempty (&cursor, sep)) != NULL) + { + int ent, ll; + bool isbang, ishash; + if ((ent = getentity (tok, &isbang, &ishash)) >= 0) + { + make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); + } + else if (*tok == '-' && (ent = getentity (tok + 1, NULL, NULL)) >= 0) + { + // delete deliberately leaves the instance handle in place for checking + // the publication/subscription handle in subscription matched/publication + // matched for a lost match + printf ("delete %"PRId32"\n", es.es[ent]); + ret = dds_delete (es.es[ent]); + CU_ASSERT_FATAL (ret == 0); + es.es[ent] = 0; + } + else if ((ll = getlistener (tok, &isbang)) >= 0) + { + printf ("set listener:"); + dds_reset_listener (list); + do { + printf (" %s", tok); + switch (ll) + { + case 0: dds_lset_data_available (list, isbang ? 0 : data_available_cb); break; + case 1: dds_lset_data_on_readers (list, isbang ? 0 : data_on_readers_cb); break; + case 2: dds_lset_inconsistent_topic (list, isbang ? 0: inconsistent_topic_cb); break; + case 3: dds_lset_liveliness_changed (list, isbang ? 0 : liveliness_changed_cb); break; + case 4: dds_lset_liveliness_lost (list, isbang ? 0 : liveliness_lost_cb); break; + case 5: dds_lset_offered_deadline_missed (list, isbang ? 0 : offered_deadline_missed_cb); break; + case 6: dds_lset_offered_incompatible_qos (list, isbang ? 0 : offered_incompatible_qos_cb); break; + case 7: dds_lset_publication_matched (list, isbang ? 0 : publication_matched_cb); break; + case 8: dds_lset_requested_deadline_missed (list, isbang ? 0 : requested_deadline_missed_cb); break; + case 9: dds_lset_requested_incompatible_qos (list, isbang ? 0 : requested_incompatible_qos_cb); break; + case 10: dds_lset_sample_lost (list, isbang ? 0 : sample_lost_cb); break; + case 11: dds_lset_sample_rejected (list, isbang ? 0 : sample_rejected_cb); break; + case 12: dds_lset_subscription_matched (list, isbang ? 0 : subscription_matched_cb); break; + default: abort (); + } + } while ((tok = strsep_noempty (&cursor, sep)) != NULL && (ll = getlistener (tok, &isbang)) >= 0); + if (tok == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) + abort (); + if (es.es[ent] == 0) + { + printf (" for "); + make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, list); + } + else + { + dds_listener_t *tmplist = dds_create_listener (NULL); + CU_ASSERT_FATAL (tmplist != NULL); + ret = dds_get_listener (es.es[ent], tmplist); + CU_ASSERT_FATAL (ret == 0); + dds_merge_listener (list, tmplist); + dds_delete_listener (tmplist); + printf (" on entity %"PRId32"\n", es.es[ent]); + ret = dds_set_listener (es.es[ent], list); + CU_ASSERT_FATAL (ret == 0); + } + } + else if (strncmp (tok, "wr", 2) == 0 || strncmp (tok, "disp", 4) == 0 || strncmp (tok, "unreg", 5) == 0) + { + dds_return_t (*fn) (dds_entity_t wr, const void *sample, dds_time_t ts) = 0; + double dt = 0.0; + dds_time_t ts = dds_time (); + char *cmd = tok; + bool expectfail = false; + int off, pos, key; + if ((tok = strsep_noempty (&cursor, sep)) == NULL) + abort (); + if (sscanf (tok, "%d%n", &key, &pos) != 1 || tok[pos] != 0) + abort (); + if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) + abort (); + if (es.es[ent] == 0) + make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); + switch (cmd[0]) + { + case 'w': + if (strncmp (cmd + 2, "disp", 4) == 0) { + off = 6; fn = dds_writedispose_ts; + } else { + off = 2; fn = dds_write_ts; + } + break; + case 'd': off = 4; fn = dds_dispose_ts; break; + case 'u': off = 5; fn = dds_unregister_instance_ts; break; + default: abort (); + } + if (strncmp (cmd + off, "fail", 4) == 0) + { + expectfail = true; + off += 4; + } + if (cmd[off] == '@') + { + if (sscanf (cmd + off, "@%lf%n", &dt, &pos) != 1 || cmd[off + pos] != 0) + abort (); + ts = tref + (dds_time_t) (dt * 1e9); + } + sample.long_1 = key; + printf ("entity %"PRId32": %*.*s@%"PRId64".%09"PRId64" %d\n", es.es[ent], off, off, cmd, ts / DDS_NSECS_IN_SEC, ts % DDS_NSECS_IN_SEC, key); + ret = fn (es.es[ent], &sample, ts); + if (expectfail) { + CU_ASSERT_FATAL (ret == DDS_RETCODE_TIMEOUT); + } else { + CU_ASSERT_FATAL (ret == 0); + } + } + else if (strncmp (tok, "take", 4) == 0 || strncmp(tok, "read", 4) == 0) + { + char *args = (tok[4] ? tok + 4 : NULL); + int exp_nvalid = -1, exp_ninvalid = -1, pos; + dds_return_t (*fn) (dds_entity_t, void **buf, dds_sample_info_t *, size_t, uint32_t); + fn = (strncmp (tok, "take", 4) == 0) ? dds_take : dds_read; + assert (args == NULL || *args == '('); + if (args && (sscanf (args, "(%d,%d)%n", &exp_nvalid, &exp_ninvalid, &pos) != 2 || args[pos] != 0)) + abort (); + if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) + abort (); + if (es.es[ent] == 0) + make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); + printf ("entity %"PRId32": %s", es.es[ent], (fn == dds_take) ? "take" : "read"); + fflush (stdout); + void *raw[10] = { NULL }; + dds_sample_info_t si[10]; + const uint32_t maxs = (uint32_t) (sizeof (raw) / sizeof (raw[0])); + int count[2] = { 0, 0 }; + ret = fn (es.es[ent], raw, si, maxs, maxs); + CU_ASSERT_FATAL (ret >= 0); + for (int32_t i = 0; i < ret; i++) + count[si[i].valid_data]++; + ret = dds_return_loan (es.es[ent], raw, ret); + CU_ASSERT_FATAL (ret == 0); + printf (" valid %d %d invalid %d %d\n", count[1], exp_nvalid, count[0], exp_ninvalid); + if (exp_nvalid >= 0) + CU_ASSERT_FATAL (count[1] == exp_nvalid); + if (exp_ninvalid >= 0) + CU_ASSERT_FATAL (count[0] == exp_ninvalid); + } + else if (tok[0] == '?') + { + const bool expectclear = (tok[1] == '!'); + const char *llname = tok + (expectclear ? 2 : 1); + char *checkargs; + if ((checkargs = strchr (llname, '(')) != NULL) + *checkargs = 0; // clear so getlistener groks the input + if ((ll = getlistener (llname, NULL)) < 0) + abort (); + if (expectclear) + { + printf ("listener %s: check not called", llname); + fflush (stdout); + ddsrt_mutex_lock (&g_mutex); + printf (" cb_called %"PRIx32" %s\n", cb_called, (cb_called & lldesc[ll].mask) == 0 ? "ok" : "fail"); + CU_ASSERT_FATAL ((cb_called & lldesc[ll].mask) == 0); + ddsrt_mutex_unlock (&g_mutex); + } + else + { + bool signalled = true; + uint32_t status; + if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) + abort (); + if (es.es[ent] == 0) + make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); + if ((size_t) ll >= sizeof (lldesc) / sizeof (*lldesc)) + abort (); + printf ("listener %s: check called for entity %"PRId32, llname, es.es[ent]); + fflush (stdout); + ddsrt_mutex_lock (&g_mutex); + while ((cb_called & lldesc[ll].mask) == 0 && signalled) + signalled = ddsrt_cond_waitfor (&g_cond, &g_mutex, DDS_SECS (5)); + printf (" cb_called %"PRIx32" (%s)", cb_called, (cb_called & lldesc[ll].mask) != 0 ? "ok" : "fail"); + fflush (stdout); + CU_ASSERT_FATAL ((cb_called & lldesc[ll].mask) != 0); + printf (" cb_entity %"PRId32" %"PRId32" (%s)", *lldesc[ll].cb_entity, es.es[ent], (*lldesc[ll].cb_entity == es.es[ent]) ? "ok" : "fail"); + fflush (stdout); + CU_ASSERT_FATAL (*lldesc[ll].cb_entity == es.es[ent]); + if (!(es.doms[0] && es.doms[1])) + { + // FIXME: two domains: listener invocation happens on another thread and we can observe non-0 "change" fields + // they get updated, listener gets invoked, then they get reset -- pretty sure it is allowed by the spec, but + // not quite elegant + assert_status_change_fields_are_0 (ll, es.es[ent]); + } + if (checkargs && lldesc[ll].cb_status) + { + *checkargs = '('; // restore ( so checkargs function gets a more sensible input + checkstatus (ll, &es, ent, checkargs, lldesc[ll].cb_status); + } + printf ("\n"); + cb_called &= ~lldesc[ll].mask; + ddsrt_mutex_unlock (&g_mutex); + ret = dds_get_status_changes (es.es[ent], &status); + CU_ASSERT_FATAL (ret == 0); + CU_ASSERT_FATAL ((status & lldesc[ll].mask) == 0); + } + } + else if (strcmp (tok, "sleep") == 0) + { + if ((tok = strsep_noempty (&cursor, sep)) == NULL) + abort (); + double d; int pos; + if (sscanf (tok, "%lf%n", &d, &pos) != 1 || tok[pos] != 0) + abort (); + printf ("sleep %fs\n", d); + dds_sleepfor ((dds_duration_t) (d * 1e9)); + } + else + { + printf ("tok '%s': unrecognized\n", tok); + abort (); + } + } - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_writer, &status, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_publication_matched_status(g_writer, &publication_matched); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(publication_matched.current_count, 1); - CU_ASSERT_EQUAL_FATAL(publication_matched.current_count_change, 0); - CU_ASSERT_EQUAL_FATAL(publication_matched.total_count, 1); - CU_ASSERT_EQUAL_FATAL(publication_matched.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(publication_matched.last_subscription_handle, reader_hdl); - - /* Reset the trigger flags. */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - ddsrt_mutex_unlock(&g_mutex); - - /* Un-match the publication by deleting the reader. */ - dds_delete(g_reader); - - /* Publication matched should be triggered with the right status. */ - triggered = waitfor_cb(DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_PUBLICATION_MATCHED_STATUS, DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_writer, g_writer); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.current_count, 0); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.current_count_change, -1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(cb_publication_matched_status.last_subscription_handle, reader_hdl); - - /* The listener should have reset the count_change. */ - ret = dds_get_publication_matched_status(g_writer, &publication_matched); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(publication_matched.current_count, 0); - CU_ASSERT_EQUAL_FATAL(publication_matched.current_count_change, 0); - CU_ASSERT_EQUAL_FATAL(publication_matched.total_count, 1); - CU_ASSERT_EQUAL_FATAL(publication_matched.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(publication_matched.last_subscription_handle, reader_hdl); + dds_delete_listener (list); + dds_delete_qos (rwqos); + dds_delete_qos (qos); + // prevent any listeners from being invoked so we can safely delete the + // mutex and the condition variable -- must do this going down the + // hierarchy, or listeners may remain set through inheritance + for (size_t i = 0; i < sizeof (es.es) / sizeof (es.es[0]); i++) + { + if (es.es[i]) + { + ret = dds_set_listener (es.es[i], NULL); + CU_ASSERT_FATAL (ret == 0); + } + } + ddsrt_mutex_destroy (&g_mutex); + ddsrt_cond_destroy (&g_cond); + for (size_t i = 0; i < sizeof (es.doms) / sizeof (es.doms[0]); i++) + { + if (es.doms[i]) + { + ret = dds_delete (es.doms[i]); + CU_ASSERT_FATAL (ret == 0); + } + } + ddsrt_free (opscopy); + DDSRT_WARNING_MSVC_ON(4996); } -CU_Test(ddsc_listener, subscription_matched, .init=init_triggering_test, .fini=fini_triggering_test) +/************************************************** + **** **** + **** listener invocation checks **** + **** **** + **************************************************/ + +CU_Test (ddsc_listener, propagation) { - dds_subscription_matched_status_t subscription_matched; - dds_instance_handle_t writer_hdl; - dds_return_t ret; - uint32_t triggered; - uint32_t status; - - /* Get writer handle that should be part of the status. */ - ret = dds_get_instance_handle(g_writer, &writer_hdl); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Subscription matched should be triggered with the right status. */ - triggered = waitfor_cb(DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.current_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.current_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.total_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.last_publication_handle, writer_hdl); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_reader, &status, DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_subscription_matched_status(g_reader, &subscription_matched); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(subscription_matched.current_count, 1); - CU_ASSERT_EQUAL_FATAL(subscription_matched.current_count_change, 0); - CU_ASSERT_EQUAL_FATAL(subscription_matched.total_count, 1); - CU_ASSERT_EQUAL_FATAL(subscription_matched.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(subscription_matched.last_publication_handle, writer_hdl); - - /* Reset the trigger flags. */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - ddsrt_mutex_unlock(&g_mutex); - - /* Un-match the subscription by deleting the writer. */ - dds_delete(g_writer); - - /* Subscription matched should be triggered with the right status. */ - triggered = waitfor_cb(DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SUBSCRIPTION_MATCHED_STATUS, DDS_SUBSCRIPTION_MATCHED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.current_count, 0); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.current_count_change, -1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(cb_subscription_matched_status.last_publication_handle, writer_hdl); - - /* The listener should have reset the count_change. */ - ret = dds_get_subscription_matched_status(g_reader, &subscription_matched); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(subscription_matched.current_count, 0); - CU_ASSERT_EQUAL_FATAL(subscription_matched.current_count_change, 0); - CU_ASSERT_EQUAL_FATAL(subscription_matched.total_count, 1); - CU_ASSERT_EQUAL_FATAL(subscription_matched.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(subscription_matched.last_publication_handle, writer_hdl); + // data-on-readers set on a participant at creation time must not trigger for + // the readers for DCPSPublication and DCPSSubscription: those events must be + // invisible for the test logic to work reliably. Installing a dummy listener + // for it on the reader should prevent that from happening + dotest ("da dor lc sm P ; ?!dor ?!da ?!sm ?!lc"); + // writing data should trigger data-available unless data-on-readers is set + dotest ("da lc sm P ; r ; wr 0 w ; ?da r ?sm r ?lc r"); + dotest ("da dor lc sm P ; r ; wr 0 w ; ?!da ; ?dor R ?sm r ?lc r"); + // setting listeners after entity creation should work, too + dotest ("P W R ; dor P pm W sm R ; r w ; ?sm r ?pm w ; wr 0 w ; ?dor R ; ?!da"); } -CU_Test(ddsc_listener, incompatible_qos, .init=init_triggering_base, .fini=fini_triggering_base) +CU_Test (ddsc_listener, matched) { - dds_offered_incompatible_qos_status_t offered_incompatible_qos; - dds_requested_incompatible_qos_status_t requested_incompatible_qos; - dds_return_t ret; - uint32_t triggered; - uint32_t status; - - /* We are interested in incompatible qos notifications. */ - dds_lset_offered_incompatible_qos(g_listener, offered_incompatible_qos_cb); - dds_lset_requested_incompatible_qos(g_listener, requested_incompatible_qos_cb); - - /* Create reader and writer with proper listeners. - * But create reader with persistent durability to get incompatible qos. */ - g_writer = dds_create_writer(g_participant, g_topic, g_qos, g_listener); - CU_ASSERT_FATAL(g_writer > 0); - dds_qset_durability (g_qos, DDS_DURABILITY_PERSISTENT); - g_reader = dds_create_reader(g_participant, g_topic, g_qos, g_listener); - CU_ASSERT_FATAL(g_reader > 0); - - /* Incompatible QoS should be triggered with the right status. */ - triggered = waitfor_cb(DDS_OFFERED_INCOMPATIBLE_QOS_STATUS | DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_writer, g_writer); - CU_ASSERT_EQUAL_FATAL(cb_offered_incompatible_qos_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_offered_incompatible_qos_status.total_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_offered_incompatible_qos_status.last_policy_id, DDS_DURABILITY_QOS_POLICY_ID); - CU_ASSERT_EQUAL_FATAL(cb_requested_incompatible_qos_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_requested_incompatible_qos_status.total_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_requested_incompatible_qos_status.last_policy_id, DDS_DURABILITY_QOS_POLICY_ID); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_writer, &status, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_offered_incompatible_qos_status(g_writer, &offered_incompatible_qos); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - ret = dds_get_requested_incompatible_qos_status(g_reader, &requested_incompatible_qos); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(offered_incompatible_qos.total_count, 1); - CU_ASSERT_EQUAL_FATAL(offered_incompatible_qos.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(offered_incompatible_qos.last_policy_id, DDS_DURABILITY_QOS_POLICY_ID); - CU_ASSERT_EQUAL_FATAL(requested_incompatible_qos.total_count, 1); - CU_ASSERT_EQUAL_FATAL(requested_incompatible_qos.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(requested_incompatible_qos.last_policy_id, DDS_DURABILITY_QOS_POLICY_ID); - - dds_delete(g_writer); - dds_delete(g_reader); + // publication & subscription matched must both trigger; note: reader/writer matching inside + // a process is synchronous, no need to check everywhere + dotest ("sm r pm w ?pm w ?sm r"); + // across the network it should work just as well (matching happens on different threads for + // remote & local entity creation, so it is meaningfully different test) + dotest ("sm r pm w' ?pm w' ?sm r"); } -CU_Test(ddsc_listener, data_available, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, publication_matched) { - dds_return_t ret; - uint32_t triggered; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); + // regardless of order of creation, the writer should see one reader come & then go + dotest ("sm r pm w ; ?pm(1,1,1,1,r) w ?sm r ; -r ; ?pm(1,0,0,-1,r) w"); + dotest ("pm w sm r ; ?pm(1,1,1,1,r) w ?sm r ; -r ; ?pm(1,0,0,-1,r) w"); - /* We are interested in data available notifications. */ - dds_lset_data_available(g_listener, data_available_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + // regardless of order of creation, the writer should see one reader come & then go, also + // when a second reader introduced + dotest ("sm r pm w ; ?pm(1,1,1,1,r) w ?sm r ; t ?pm(2,1,2,1,t) w ; -r ; ?pm(2,0,1,-1,r) w"); + dotest ("pm w sm r ; ?pm(1,1,1,1,r) w ?sm r ; t ?pm(2,1,2,1,t) w ; -t ; ?pm(2,0,1,-1,t) w"); - /* Write sample. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + // same with 2 domains + dotest ("sm r pm w' ; ?pm(1,1,1,1,r) w' ?sm r ; -r ; ?pm(1,0,0,-1,r) w'"); + dotest ("pm w sm r' ; ?pm(1,1,1,1,r') w ?sm r' ; -r' ; ?pm(1,0,0,-1,r') w"); - /* Data available should be triggered with the right status. */ - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* Deleting the writer causes unregisters (or dispose+unregister), and those - should trigger DATA_AVAILABLE as well */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - cb_reader = 0; - ddsrt_mutex_unlock(&g_mutex); - ret = dds_delete (g_writer); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - g_writer = 0; - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); + dotest ("sm r pm w' ; ?pm(1,1,1,1,r) w' ?sm r ; t ?pm(2,1,2,1,t) w' ; -r ; ?pm(2,0,1,-1,r) w'"); + dotest ("pm w sm r' ; ?pm(1,1,1,1,r') w ?sm r' ; t ?pm(2,1,2,1,t) w ; -t ; ?pm(2,0,1,-1,t) w"); + dotest ("sm r pm w' ; ?pm(1,1,1,1,r) w' ?sm r ; t' ?pm(2,1,2,1,t') w' ; -r ; ?pm(2,0,1,-1,r) w'"); + dotest ("pm w sm r' ; ?pm(1,1,1,1,r') w ?sm r' ; t' ?pm(2,1,2,1,t') w ; -t' ; ?pm(2,0,1,-1,t') w"); } -CU_Test(ddsc_listener, data_available_delete_writer, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, subscription_matched) { - dds_return_t ret; - uint32_t triggered; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); + // regardless of order of creation, the reader should see one writer come & then go + dotest ("sm r pm w ; ?pm w ?sm(1,1,1,1,w) r ; -w ; ?sm(1,0,0,-1,w) r"); + dotest ("pm w sm r ; ?pm w ?sm(1,1,1,1,w) r ; -w ; ?sm(1,0,0,-1,w) r"); - /* We are interested in data available notifications. */ - dds_lset_data_available(g_listener, data_available_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + // regardless of order of creation, the reader should see one writer come & then go, also + // when a second writer is introduced + dotest ("sm r pm w ; ?pm w ?sm(1,1,1,1,w) r ; x ?sm(2,1,2,1,x) r ; -w ; ?sm(2,0,1,-1,w) r"); + dotest ("pm w sm r ; ?pm w ?sm(1,1,1,1,w) r ; x ?sm(2,1,2,1,x) r ; -x ; ?sm(2,0,1,-1,x) r"); - /* Write sample, wait for the listener to swallow the status. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); + // same with 2 domains + dotest ("sm r pm w' ; ?pm w' ?sm(1,1,1,1,w') r ; -w' ; ?sm(1,0,0,-1,w') r"); + dotest ("pm w sm r' ; ?pm w ?sm(1,1,1,1,w) r' ; -w ; ?sm(1,0,0,-1,w) r'"); - /* Deleting the writer must trigger DATA_AVAILABLE as well */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - cb_reader = 0; - ddsrt_mutex_unlock(&g_mutex); - ret = dds_delete (g_writer); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - g_writer = 0; - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); + dotest ("sm r pm w' ; ?pm w' ?sm(1,1,1,1,w') r ; x ?sm(2,1,2,1,x) r ; -w' ; ?sm(2,0,1,-1,w') r"); + dotest ("pm w sm r' ; ?pm w ?sm(1,1,1,1,w) r' ; x ?sm(2,1,2,1,x) r' ; -x ; ?sm(2,0,1,-1,x) r'"); + dotest ("sm r pm w' ; ?pm w' ?sm(1,1,1,1,w') r ; x' ?sm(2,1,2,1,x') r ; -w' ; ?sm(2,0,1,-1,w') r"); + dotest ("pm w sm r' ; ?pm w ?sm(1,1,1,1,w) r' ; x' ?sm(2,1,2,1,x') r' ; -x' ; ?sm(2,0,1,-1,x') r'"); } -CU_Test(ddsc_listener, data_available_delete_writer_disposed, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, incompatible_qos) { - dds_return_t ret; - uint32_t triggered; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); - - /* We are interested in data available notifications. */ - dds_lset_data_available(g_listener, data_available_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Write & dispose sample and take it so that the instance is empty & disposed. Then deleting - the writer should silently drop the instance. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - ret = dds_dispose(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - triggered = waitfor_cb(DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - - /* Take all data so that the instance becomes empty & disposed */ - do { - void *sampleptr = &sample; - dds_sample_info_t info; - ret = dds_take (g_reader, &sampleptr, &info, 1, 1); - } while (ret > 0); - - /* Deleting the writer should not trigger DATA_AVAILABLE with all instances empty & disposed */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - cb_reader = 0; - ddsrt_mutex_unlock(&g_mutex); - ret = dds_delete (g_writer); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - g_writer = 0; - ddsrt_mutex_lock(&g_mutex); - CU_ASSERT_EQUAL_FATAL(cb_called & DDS_DATA_AVAILABLE_STATUS_ID, 0); - ddsrt_mutex_unlock(&g_mutex); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); + // best-effort writer & reliable reader: both must trigger incompatible QoS event + dotest ("oiq w! riq r ; ?oiq(1,1) w ?riq(1,1) r"); + dotest ("riq r oiq w! ; ?oiq(1,1) w ?riq(1,1) r"); } -CU_Test(ddsc_listener, data_on_readers, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, data_available) { - dds_return_t ret; - uint32_t triggered; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); - - /* We are interested in data available notifications. */ - dds_lset_data_on_readers(g_listener, data_on_readers_cb); - ret = dds_set_listener(g_subscriber, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Setting data available notifications should not 'sabotage' the on_readers call. */ - dds_lset_data_available(g_listener, data_available_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Write sample. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Data on readers should be triggered with the right status. */ - triggered = waitfor_cb(DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_DATA_ON_READERS_STATUS, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_subscriber, g_subscriber); - CU_ASSERT_NOT_EQUAL_FATAL(triggered & DDS_DATA_AVAILABLE_STATUS, DDS_DATA_AVAILABLE_STATUS); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_subscriber, &status, DDS_DATA_ON_READERS_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - ret = dds_read_status(g_reader, &status, DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); + // data available on reader + dotest ("da sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); + // data available set on subscriber + dotest ("da R sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); + // data available set on participant + dotest ("da P sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); } - -CU_Test(ddsc_listener, sample_lost, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, data_available_delete_writer) { - dds_sample_lost_status_t sample_lost; - dds_return_t ret; - uint32_t triggered; - dds_time_t the_past; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); - - /* Get a time that should be historic on all platforms.*/ - the_past = dds_time() - 1000000; - - /* We are interested in sample lost notifications. */ - dds_lset_sample_lost(g_listener, sample_lost_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Write first sample with current timestamp. */ - ret = dds_write_ts(g_writer, &sample, dds_time()); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Write second sample with older timestamp. */ - ret = dds_write_ts(g_writer, &sample, the_past); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Sample lost should be triggered with the right status. */ - triggered = waitfor_cb(DDS_SAMPLE_LOST_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SAMPLE_LOST_STATUS, DDS_SAMPLE_LOST_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_sample_lost_status.total_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_sample_lost_status.total_count_change, 1); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_reader, &status, DDS_SAMPLE_LOST_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_sample_lost_status(g_reader, &sample_lost); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(sample_lost.total_count, 1); - CU_ASSERT_EQUAL_FATAL(sample_lost.total_count_change, 0); + // unmatching a writer that didn't read anything has no visible effect on RHC + // subscription-matched event is generated synchronously, so "?sm r" doesn't + // really add anything (it'd be different if there are two domain instances) + dotest ("da sm r w ; -w ?sm r ?!da ; take(0,0) r"); + // after writing: auto-dispose should always trigger data available, an invalid + // sample needs to show up if there isn't an unread sample to use instead + dotest ("da r w ; wr 0 w ?da r ; -w ?da r ; take(1,0) r"); + dotest ("da r w ; wr 0 w ?da r ; read(1,0) r ; -w ?da r ; take(1,1) r"); + dotest ("da r w ; wr 0 w ?da r ; take(1,0) r ; -w ?da r ; take(0,1) r"); + // same with two writers (no point in doing this also with two domains) + dotest ("da r w x ; -w ?!da -x ?!da ; take(0,0) r"); + dotest ("da r w x ; wr 0 w ?da r ; -x ?!da ; -w ?da r ; take(1,0) r"); + dotest ("da r w x ; wr 0 w ?da r ; -w ?da r ; take(1,0) r ; -x ?!da ; take(0,0) r"); + dotest ("da r w x ; wr 0 w wr 0 x ?da r ; -w ?!da ; take(2,0) r ; -x ?da r ; take(0,1) r"); + dotest ("da r w x ; wr 0 w wr 0 x ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); + dotest ("da r w x ; wr 0 w wr 0 x ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); + dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; -w ?!da -x ?da r ; take(2,0) r"); + dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; -x ?!da -w ?da r ; take(2,0) r"); + dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); + dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); + dotest ("da r w x ; wr 0 w wr 0 x ?da r ; take(2,0) r ; -w ?!da -x ?da r ; take(0,1) r"); + dotest ("da r w x ; wr 0 w wr 0 x ?da r ; take(2,0) r ; -x ?!da -w ?da r ; take(0,1) r"); } -CU_Test(ddsc_listener, sample_rejected, .init=init_triggering_test, .fini=fini_triggering_test) +CU_Test (ddsc_listener, data_available_delete_writer_disposed) { - dds_sample_rejected_status_t sample_rejected; - dds_return_t ret; - uint32_t triggered; - uint32_t status; - RoundTripModule_DataType sample; - memset (&sample, 0, sizeof (sample)); + // same as data_available_delete_writer, but now with the instance disposed first + dotest ("da r w ; wr 0 w disp 0 w ?da r ; -w ?!da"); + dotest ("da r w ; wr 0 w disp 0 w ?da r ; read(1,0) r ; -w ?!da"); + dotest ("da r w ; wr 0 w disp 0 w ?da r ; take(1,0) r ; -w ?!da"); - /* We are interested in sample rejected notifications. */ - dds_lset_sample_rejected(g_listener, sample_rejected_cb); - ret = dds_set_listener(g_reader, g_listener); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 w ?da r ; read(1,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 w ?da r ; take(0,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 w ?da r ; read(1,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 w ?da r ; take(0,1) r ; -x ?!da -w ?!da"); - /* Write more than resource limits set by the reader. */ - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - ret = dds_write(g_writer, &sample); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Sample lost should be triggered with the right status. */ - triggered = waitfor_cb(DDS_SAMPLE_REJECTED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_SAMPLE_REJECTED_STATUS, DDS_SAMPLE_REJECTED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_sample_rejected_status.total_count, 2); - CU_ASSERT_EQUAL_FATAL(cb_sample_rejected_status.total_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_sample_rejected_status.last_reason, DDS_REJECTED_BY_SAMPLES_LIMIT); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_reader, &status, DDS_SAMPLE_REJECTED_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_sample_rejected_status(g_reader, &sample_rejected); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(sample_rejected.total_count, 2); - CU_ASSERT_EQUAL_FATAL(sample_rejected.total_count_change, 0); - CU_ASSERT_EQUAL_FATAL(cb_sample_rejected_status.last_reason, DDS_REJECTED_BY_SAMPLES_LIMIT); + dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 x ?da r ; read(1,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 x ?da r ; take(0,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 x ?da r ; read(1,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 x ?da r ; take(0,1) r ; -x ?!da -w ?!da"); } -CU_Test(ddsc_listener, liveliness_changed, .init=init_triggering_test, .fini=fini_triggering_base) +CU_Test (ddsc_listener, data_on_readers) { - dds_liveliness_changed_status_t liveliness_changed; - dds_instance_handle_t writer_hdl; - dds_return_t ret; - uint32_t triggered; - uint32_t status; - - /* The init_triggering_test_byliveliness set our interest in liveliness. */ - - /* Get writer handle that should be part of the status. */ - ret = dds_get_instance_handle(g_writer, &writer_hdl); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - - /* Liveliness changed should be triggered with the right status. */ - triggered = waitfor_cb(DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_LIVELINESS_CHANGED_STATUS, DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count, 1); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count_change, 1); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count, 0); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.last_publication_handle, writer_hdl); - - /* The listener should have swallowed the status. */ - ret = dds_read_status(g_reader, &status, DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(status, 0); - - /* The listener should have reset the count_change. */ - ret = dds_get_liveliness_changed_status(g_reader, &liveliness_changed); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count, 1); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.last_publication_handle, writer_hdl); - - /* Reset the trigger flags. */ - ddsrt_mutex_lock(&g_mutex); - cb_called = 0; - ddsrt_mutex_unlock(&g_mutex); - - /* Change liveliness again by deleting the writer. */ - dds_delete(g_writer); - - /* Liveliness changed should be triggered with the right status. */ - triggered = waitfor_cb(DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(triggered & DDS_LIVELINESS_CHANGED_STATUS, DDS_LIVELINESS_CHANGED_STATUS); - CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count, 0); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count_change, -1); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count, 0); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.last_publication_handle, writer_hdl); - - /* The listener should have reset the count_change. */ - ret = dds_get_liveliness_changed_status(g_reader, &liveliness_changed); - CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change, 0); - CU_ASSERT_EQUAL_FATAL(liveliness_changed.last_publication_handle, writer_hdl); + // data on readers wins from data available + dotest ("dor R da r ; wr 0 w ; ?dor R ?!da"); + dotest ("dor P da r ; wr 0 w ; ?dor R ?!da"); } -#if 0 -/* This is basically the same as the Lite test, but inconsistent topic is not triggered. - * That is actually what I would expect, because the code doesn't seem to be the way - * to go to test for inconsistent topic. */ -Test(ddsc_listener, inconsistent_topic, .init=init_triggering_base, .fini=fini_triggering_base) +CU_Test (ddsc_listener, sample_lost) { - dds_entity_t wr_topic; - dds_entity_t rd_topic; - dds_entity_t writer; - dds_entity_t reader; - uint32_t triggered; - - os_osInit(); - - ddsrt_mutex_init(&g_mutex); - ddsrt_cond_init(&g_cond); - - g_qos = dds_create_qos(); - cr_assert_not_null(g_qos, "Failed to create prerequisite g_qos"); - - g_listener = dds_create_listener(NULL); - cr_assert_not_null(g_listener, "Failed to create prerequisite g_listener"); - - g_participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL); - cr_assert_gt(g_participant, 0, "Failed to create prerequisite g_participant"); - - /* We are interested in inconsistent topics. */ - dds_lset_inconsistent_topic(g_listener, inconsistent_topic_cb); - - wr_topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, "WRITER_TOPIC", NULL, g_listener); - cr_assert_gt(g_topic, 0, "Failed to create prerequisite wr_topic"); - - rd_topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, "READER_TOPIC", NULL, g_listener); - cr_assert_gt(g_topic, 0, "Failed to create prerequisite rd_topic"); - - /* Create reader and writer. */ - writer = dds_create_writer(g_participant, g_topic, NULL, NULL); - cr_assert_gt(writer, 0, "Failed to create prerequisite writer"); - dds_qset_reliability (g_qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (1)); - dds_qset_history (g_qos, DDS_HISTORY_KEEP_ALL, 0); - reader = dds_create_reader(g_subscriber, g_topic, g_qos, NULL); - cr_assert_gt(reader, 0, "Failed to create prerequisite reader"); - - /* Inconsistent topic should be triggered with the right status. */ - triggered = waitfor_cb(DDS_INCONSISTENT_TOPIC_STATUS); - cr_assert_eq(triggered & DDS_INCONSISTENT_TOPIC_STATUS, DDS_INCONSISTENT_TOPIC_STATUS, "DDS_INCONSISTENT_TOPIC_STATUS not triggered"); - - dds_delete(reader); - dds_delete(writer); - dds_delete(rd_topic); - dds_delete(wr_topic); - dds_delete(g_participant); - - dds_delete_listener(g_listener); - dds_delete_qos(g_qos); + // FIXME: figure out what really constitutes a "lost sample" + dotest ("sl r ; wr@0 0 w ?!sl ; wr@-1 0 w ?sl(1,1) r"); } -#endif +CU_Test (ddsc_listener, sample_rejected) +{ + // FIXME: rejection counts with retries? + // reliable: expect timeout on the write when max samples has been reached + // invalid samples don't count towards resource limits, so dispose should + // not be blocked + dotest ("sr r# ; wr 0 w wrfail 0 w wrfail 0 w ; ?sr r"); + dotest ("sr r# ; wr 0 w wrfail 0 w ; read(1,0) r ; disp 0 w ; read(1,1) r ; ?sr r"); + + // best-effort: writes should succeed despite not delivering the data adding + // the data in the RHC, also check number of samples rejected + dotest ("sr r#! ; wr 0 w! wr 0 w wr 0 w ; ?sr(2,1,s) r"); + dotest ("sr r#! ; wr 0 w! wr 0 w ; read(1,0) r ; disp 0 w ; read(1,1) r ; ?sr(1,1,s) r"); +} + +CU_Test (ddsc_listener, liveliness_changed) +{ + // liveliness changed should trigger along with matching + dotest ("pm w lc sm r ; ?pm w ?sm r ; ?lc(1,0,1,0,w) r ; -w ; ?lc(0,0,-1,0,w) r"); + dotest ("pm w lc sm r' ; ?pm w ?sm r' ; ?lc(1,0,1,0,w) r' ; -w ; ?lc(0,0,-1,0,w) r'"); +}