diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt index 0942ce3..bde5eae 100644 --- a/src/core/ddsc/tests/CMakeLists.txt +++ b/src/core/ddsc/tests/CMakeLists.txt @@ -59,7 +59,9 @@ set(ddsc_test_sources "write_various_types.c" "writer.c" "test_common.c" - "test_common.h") + "test_common.h" + "test_oneliner.c" + "test_oneliner.h") if(ENABLE_LIFESPAN) list(APPEND ddsc_test_sources "lifespan.c") diff --git a/src/core/ddsc/tests/listener.c b/src/core/ddsc/tests/listener.c index 6738cf7..2bd4112 100644 --- a/src/core/ddsc/tests/listener.c +++ b/src/core/ddsc/tests/listener.c @@ -18,23 +18,14 @@ #include "dds/ddsrt/string.h" #include "dds/ddsrt/environ.h" #include "test_common.h" - -static ddsrt_mutex_t g_mutex; -static ddsrt_cond_t g_cond; -static uint32_t cb_called; -static dds_entity_t cb_topic, cb_writer, cb_reader, cb_subscriber; +#include "test_oneliner.h" #define DEFINE_STATUS_CALLBACK(name, NAME, kind) \ - static dds_##name##_status_t cb_##name##_status; \ static void name##_cb (dds_entity_t kind, const dds_##name##_status_t status, void *arg) \ { \ (void) arg; \ - ddsrt_mutex_lock (&g_mutex); \ - cb_##kind = kind; \ - cb_##name##_status = status; \ - cb_called |= DDS_##NAME##_STATUS; \ - ddsrt_cond_broadcast (&g_cond); \ - ddsrt_mutex_unlock (&g_mutex); \ + (void) kind; \ + (void) status; \ } DEFINE_STATUS_CALLBACK (inconsistent_topic, INCONSISTENT_TOPIC, topic) @@ -51,48 +42,14 @@ DEFINE_STATUS_CALLBACK (subscription_matched, SUBSCRIPTION_MATCHED, reader) static void data_on_readers_cb (dds_entity_t subscriber, void *arg) { + (void) subscriber; (void) arg; - ddsrt_mutex_lock (&g_mutex); - cb_subscriber = subscriber; - cb_called |= DDS_DATA_ON_READERS_STATUS; - ddsrt_cond_broadcast (&g_cond); - ddsrt_mutex_unlock (&g_mutex); } static void data_available_cb (dds_entity_t reader, void *arg) { - (void)arg; - ddsrt_mutex_lock (&g_mutex); - cb_reader = reader; - cb_called |= DDS_DATA_AVAILABLE_STATUS; - ddsrt_cond_broadcast (&g_cond); - ddsrt_mutex_unlock (&g_mutex); -} - -static void dummy_data_on_readers_cb (dds_entity_t subscriber, void *arg) -{ - (void)subscriber; - (void)arg; -} - -static void dummy_data_available_cb (dds_entity_t reader, void *arg) -{ - (void)reader; - (void)arg; -} - -static void dummy_subscription_matched_cb (dds_entity_t reader, const dds_subscription_matched_status_t status, void *arg) -{ - (void)reader; - (void)status; - (void)arg; -} - -static void dummy_liveliness_changed_cb (dds_entity_t reader, const dds_liveliness_changed_status_t status, void *arg) -{ - (void)reader; - (void)status; - (void)arg; + (void) reader; + (void) arg; } static void dummy_cb (void) @@ -303,796 +260,7 @@ CU_Test(ddsc_listener, getters_setters) #undef ASSERT_CALLBACK_EQUAL -/************************************************** - **** **** - **** programmable listener checker **** - **** **** - **************************************************/ - -// These had better match the corresponding type definitions! -// n uint32_t ...count -// c int32_t ...count_change -// I instance handle of a data instance -// P uint32_t QoS policy ID -// E instance handle of an entity -// R sample_rejected_status_kind -static const struct { - size_t size; // size of status struct - const char *desc; // description of status struct - uint32_t mask; // status mask, bit in "cb_called" - const dds_entity_t *cb_entity; // which cb_... entity to look at - const void *cb_status; // cb_..._status to look at -} lldesc[] = { - { 0, NULL, DDS_DATA_AVAILABLE_STATUS, &cb_reader, NULL }, // data available - { 0, NULL, DDS_DATA_ON_READERS_STATUS, &cb_subscriber, NULL }, // data on readers - { sizeof (dds_inconsistent_topic_status_t), "nc", DDS_INCONSISTENT_TOPIC_STATUS, &cb_topic, &cb_inconsistent_topic_status }, - { sizeof (dds_liveliness_changed_status_t), "nnccE", DDS_LIVELINESS_CHANGED_STATUS, &cb_reader, &cb_liveliness_changed_status }, - { sizeof (dds_liveliness_lost_status_t), "nc", DDS_LIVELINESS_LOST_STATUS, &cb_writer, &cb_liveliness_lost_status }, - { sizeof (dds_offered_deadline_missed_status_t), "ncI", DDS_OFFERED_DEADLINE_MISSED_STATUS, &cb_writer, &cb_offered_deadline_missed_status }, - { sizeof (dds_offered_incompatible_qos_status_t), "ncP", DDS_OFFERED_INCOMPATIBLE_QOS_STATUS, &cb_writer, &cb_offered_incompatible_qos_status }, - { sizeof (dds_publication_matched_status_t), "ncncE", DDS_PUBLICATION_MATCHED_STATUS, &cb_writer, &cb_publication_matched_status }, - { sizeof (dds_requested_deadline_missed_status_t), "ncI", DDS_REQUESTED_DEADLINE_MISSED_STATUS, &cb_reader, &cb_requested_deadline_missed_status }, - { sizeof (dds_requested_incompatible_qos_status_t), "ncP", DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS, &cb_reader, &cb_requested_incompatible_qos_status }, - { sizeof (dds_sample_lost_status_t), "nc", DDS_SAMPLE_LOST_STATUS, &cb_reader, &cb_sample_lost_status }, - { sizeof (dds_sample_rejected_status_t), "ncRI", DDS_SAMPLE_REJECTED_STATUS, &cb_reader, &cb_sample_rejected_status }, - { sizeof (dds_subscription_matched_status_t), "ncncE", DDS_SUBSCRIPTION_MATCHED_STATUS, &cb_reader, &cb_subscription_matched_status } -}; - -static const void *advance (const void *status, size_t *off, char code) -{ -#define alignof(type_) offsetof (struct { char c; type_ d; }, d) - size_t align = 1, size = 1; - switch (code) - { - case 'n': case 'c': case 'P': - align = alignof (uint32_t); size = sizeof (uint32_t); - break; - case 'E': case 'I': - align = alignof (dds_instance_handle_t); size = sizeof (dds_instance_handle_t); - break; - case 'R': - align = alignof (dds_sample_rejected_status_kind); size = sizeof (dds_sample_rejected_status_kind); - break; - default: - abort (); - } -#undef alignof - *off = (*off + align - 1) & ~(align - 1); - const void *p = (const char *) status + *off; - *off += size; - return p; -} - -static void get_status (int ll, dds_entity_t ent, void *status) -{ - dds_return_t ret; - switch (ll) - { - case 2: ret = dds_get_inconsistent_topic_status (ent, status); break; - case 3: ret = dds_get_liveliness_changed_status (ent, status); break; - case 4: ret = dds_get_liveliness_lost_status (ent, status); break; - case 5: ret = dds_get_offered_deadline_missed_status (ent, status); break; - case 6: ret = dds_get_offered_incompatible_qos_status (ent, status); break; - case 7: ret = dds_get_publication_matched_status (ent, status); break; - case 8: ret = dds_get_requested_deadline_missed_status (ent, status); break; - case 9: ret = dds_get_requested_incompatible_qos_status (ent, status); break; - case 10: ret = dds_get_sample_lost_status (ent, status); break; - case 11: ret = dds_get_sample_rejected_status (ent, status); break; - case 12: ret = dds_get_subscription_matched_status (ent, status); break; - default: abort (); - } - CU_ASSERT_FATAL (ret == 0); -} - -static void assert_status_change_fields_are_0 (int ll, dds_entity_t ent) -{ - if (lldesc[ll].desc) - { - const char *d = lldesc[ll].desc; - void *status = malloc (lldesc[ll].size); - get_status (ll, ent, status); - size_t off = 0; - while (*d) - { - const uint32_t *p = advance (status, &off, *d); - if (*d == 'c') - CU_ASSERT_FATAL (*p == 0); - d++; - } - assert (off <= lldesc[ll].size); - free (status); - } -} - -static int getentity (const char *tok, bool *isbang, bool *ishash) -{ - static const char *known = "PRWrstwxy"; - const char *p; - if (isbang) - *isbang = false; - if (ishash) - *ishash = false; - if ((p = strchr (known, *tok)) == NULL) - return -1; - int ent = (int) (p - known); - if (*++tok == 0) - return ent; - if (*tok == '\'') - { - ent += (int) strlen (known); - tok++; - } - while (*tok == '!' || *tok == '#') - { - if (strchr (known + 3, *p) == NULL) - return -1; // only readers, writers - if (*tok == '!' && isbang) - *isbang = true; - else if (*tok == '#' && ishash) - *ishash = true; - tok++; - } - return (*tok == 0) ? ent : -1; -} - -static int getlistener (const char *tok, bool *isbang) -{ - // note: sort order is on full name (so sample rejected precedes subscription matched) - static const char *ls[] = { - "da", "dor", "it", "lc", "ll", "odm", "oiq", "pm", "rdm", "riq", "sl", "sr", "sm" - }; - if (isbang) - *isbang = false; - for (size_t i = 0; i < sizeof (ls) / sizeof (*ls); i++) - { - size_t n = strlen (ls[i]); - if (strncmp (tok, ls[i], n) == 0 && (tok[n] == 0 || tok[n+1] == ',')) - { - if (isbang) - *isbang = (tok[n] == '!'); - return (int) i; - } - } - return -1; -} - -struct ents { - dds_entity_t es[2 * 9]; - dds_entity_t tps[2]; - dds_entity_t doms[2]; - dds_instance_handle_t esi[2 * 9]; - // built-in topic readers for cross-referencing instance handles - dds_entity_t pubrd[2]; - dds_entity_t subrd[2]; -}; - -static void make_participant (struct ents *es, const char *topicname, int ent, const dds_qos_t *qos, dds_listener_t *list) -{ - const dds_domainid_t domid = (ent < 9) ? 0 : 1; - char *conf = ddsrt_expand_envvars ("${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0", domid); - printf ("create domain %"PRIu32, domid); - fflush (stdout); - es->doms[domid] = dds_create_domain (domid, conf); - CU_ASSERT_FATAL (es->doms[domid] > 0); - ddsrt_free (conf); - printf (" create participant P%s", (ent < 9) ? "" : "'"); - fflush (stdout); - es->es[ent] = dds_create_participant (domid, NULL, list); - CU_ASSERT_FATAL (es->es[ent] > 0); - es->tps[domid] = dds_create_topic (es->es[ent], &Space_Type1_desc, topicname, qos, NULL); - CU_ASSERT_FATAL (es->tps[domid] > 0); - - // Create the built-in topic readers with a dummy listener to avoid any event (data available comes to mind) - // from propagating to the normal data available listener, in case it has been set on the participant. - // - // - dummy_cb aborts when it is invoked, but all reader-related listeners that can possibly trigger are set - // separately (incompatible qos, deadline missed, sample lost and sample rejected are all impossible by - // construction) - // - regarding data_on_readers: Cyclone handles listeners installed on an ancestor by *inheriting* them, - // rather than by walking up ancestor chain. Setting data_on_readers on the reader therefore overrides the - // listener set on the subscriber. It is a nice feature! - dds_listener_t *dummylist = dds_create_listener (NULL); - set_all_const (dummylist, dummy_cb); - dds_lset_data_available (dummylist, dummy_data_available_cb); - dds_lset_data_on_readers (dummylist, dummy_data_on_readers_cb); - dds_lset_subscription_matched (dummylist, dummy_subscription_matched_cb); - dds_lset_liveliness_changed (dummylist, dummy_liveliness_changed_cb); - es->pubrd[domid] = dds_create_reader (es->es[ent], DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, dummylist); - CU_ASSERT_FATAL (es->pubrd[domid] > 0); - es->subrd[domid] = dds_create_reader (es->es[ent], DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, dummylist); - CU_ASSERT_FATAL (es->subrd[domid] > 0); - dds_delete_listener (dummylist); - printf ("pubrd %"PRId32" subrd %"PRId32" sub %"PRId32"\n", es->pubrd[domid], es->subrd[domid], dds_get_parent (es->pubrd[domid])); -} - -static void make_entity1 (struct ents *es, const char *topicname, int ent, bool isbang, bool ishash, const dds_qos_t *qos, dds_qos_t *rwqos, dds_listener_t *list) -{ - dds_return_t ret; - switch (ent) - { - case 0: case 9: - make_participant (es, topicname, ent, qos, list); - break; - case 1: case 10: - if (es->es[ent-1] == 0) - { - printf ("["); - make_entity1 (es, topicname, ent-1, false, false, qos, rwqos, NULL); - printf ("] "); - } - printf ("create subscriber R%s", (ent < 9) ? "" : "'"); - fflush (stdout); - es->es[ent] = dds_create_subscriber (es->es[ent-1], NULL, list); - break; - case 2: case 11: - if (es->es[ent-2] == 0) - { - printf ("["); - make_entity1 (es, topicname, ent-2, false, false, qos, rwqos, NULL); - printf ("] "); - } - printf ("create publisher W%s", (ent < 9) ? "" : "'"); - fflush (stdout); - es->es[ent] = dds_create_publisher (es->es[ent-2], NULL, list); - break; - case 3: case 4: case 5: case 12: case 13: case 14: - if (es->es[ent < 9 ? 1 : 10] == 0) - { - printf ("["); - make_entity1 (es, topicname, ent < 9 ? 1 : 10, false, false, qos, rwqos, NULL); - printf ("] "); - } - printf ("create %s reader %c%s", isbang ? "best-effort" : "reliable", 'r' + (ent < 9 ? ent-3 : ent-12), (ent < 9) ? "" : "'"); - fflush (stdout); - dds_reset_qos (rwqos); - if (isbang) - dds_qset_reliability (rwqos, DDS_RELIABILITY_BEST_EFFORT, DDS_MSECS (100)); - if (ishash) - dds_qset_resource_limits (rwqos, 1, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); - es->es[ent] = dds_create_reader (es->es[ent < 9 ? 1 : 10], es->tps[ent < 9 ? 0 : 1], rwqos, list); - break; - case 6: case 7: case 8: case 15: case 16: case 17: - if (es->es[ent < 9 ? 2 : 11] == 0) - { - printf ("["); - make_entity1 (es, topicname, ent < 9 ? 2 : 11, false, false, qos, rwqos, NULL); - printf ("] "); - } - printf ("create %s writer %c%s", isbang ? "best-effort" : "reliable", 'w' + (ent < 9 ? ent-6 : ent-15), (ent < 9) ? "" : "'"); - fflush (stdout); - dds_reset_qos (rwqos); - if (isbang) - dds_qset_reliability (rwqos, DDS_RELIABILITY_BEST_EFFORT, DDS_MSECS (100)); - if (ishash) - dds_qset_resource_limits (rwqos, 1, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); - es->es[ent] = dds_create_writer (es->es[ent < 9 ? 2 : 11], es->tps[ent < 9 ? 0 : 1], rwqos, list); - break; - default: - abort (); - } - printf (" = %"PRId32, es->es[ent]); - fflush (stdout); - CU_ASSERT_FATAL (es->es[ent] > 0); - ret = dds_get_instance_handle (es->es[ent], &es->esi[ent]); - //printf (" %"PRIx64, es->esi[ent]); - //fflush (stdout); - CU_ASSERT_FATAL (ret == 0); -} - -static void make_entity (struct ents *es, const char *topicname, int ent, bool isbang, bool ishash, const dds_qos_t *qos, dds_qos_t *rwqos, dds_listener_t *list) -{ - make_entity1 (es, topicname, ent, isbang, ishash, qos, rwqos, list); - printf ("\n"); -} - -static char *strsep_noempty (char **cursor, const char *sep) -{ - char *tok; - while ((tok = ddsrt_strsep (cursor, sep)) != NULL && *tok == 0) { } - return tok; -} - -static dds_instance_handle_t lookup_insthandle (const struct ents *es, int ent, int ent1) -{ - // if both are in the same domain, it's easy - if (ent / 9 == ent1 / 9) - return es->esi[ent1]; - else - { - // if they aren't ... find GUID from instance handle in the one domain, - // then find instance handle for GUID in the other - dds_entity_t rd1 = 0, rd2 = 0; - switch (ent1) - { - case 3: case 4: case 5: rd1 = es->subrd[0]; rd2 = es->subrd[1]; break; - case 12: case 13: case 14: rd1 = es->subrd[1]; rd2 = es->subrd[0]; break; - case 6: case 7: case 8: rd1 = es->pubrd[0]; rd2 = es->pubrd[1]; break; - case 15: case 16: case 17: rd1 = es->pubrd[1]; rd2 = es->pubrd[0]; break; - default: abort (); - } - - dds_return_t ret; - dds_builtintopic_endpoint_t keysample; - //printf ("(in %"PRId32" %"PRIx64" -> ", rd1, es->esi[ent1]); - //fflush (stdout); - ret = dds_instance_get_key (rd1, es->esi[ent1], &keysample); - CU_ASSERT_FATAL (ret == 0); - // In principle, only key fields are set in sample returned by get_key; - // in the case of a built-in topic that is extended to the participant - // key. The qos and topic/type names should not be set, and there is no - // (therefore) memory allocated for the sample. - CU_ASSERT_FATAL (keysample.qos == NULL); - CU_ASSERT_FATAL (keysample.topic_name == NULL); - CU_ASSERT_FATAL (keysample.type_name == NULL); - //for (size_t j = 0; j < sizeof (keysample.key.v); j++) - // printf ("%s%02x", (j > 0 && j % 4 == 0) ? ":" : "", keysample.key.v[j]); - const dds_instance_handle_t ih = dds_lookup_instance (rd2, &keysample); - CU_ASSERT_FATAL (ih != 0); - //printf (" -> %"PRIx64")", ih); - //fflush (stdout); - return ih; - } -} - -static void checkstatus (int ll, const struct ents *es, int ent, const char *args, const void *status) -{ - DDSRT_WARNING_MSVC_OFF(4996); // use of sscanf triggers a warning - if (*args == 0) - return; - if (*args++ != '(') - abort (); - assert (lldesc[ll].desc != NULL); - const char *d = lldesc[ll].desc; - const char *sep = "("; - size_t off = 0; - while (*d) - { - const void *p = advance (status, &off, *d); - char str[32]; - unsigned u; - int i, pos = -1; - switch (*d) - { - case 'n': - if (sscanf (args, "%u%n", &u, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) - abort (); - printf ("%s%"PRIu32" %u", sep, *(uint32_t *)p, u); fflush (stdout); - CU_ASSERT_FATAL (*(uint32_t *)p == u); - break; - case 'c': - if (sscanf (args, "%d%n", &i, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) - abort (); - printf ("%s%"PRId32" %d", sep, *(int32_t *)p, i); fflush (stdout); - CU_ASSERT_FATAL (*(int32_t *)p == i); - break; - case 'P': // policy id: currently fixed at reliability - pos = -1; // not actually consuming an argument - printf ("%s%"PRIu32" %d", sep, *(uint32_t *)p, (int) DDS_RELIABILITY_QOS_POLICY_ID); fflush (stdout); - CU_ASSERT_FATAL (*(uint32_t *)p == (uint32_t) DDS_RELIABILITY_QOS_POLICY_ID); - break; - case 'R': - if (sscanf (args, "%31[^,)]%n", str, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) - abort (); - if (strcmp (str, "i") == 0) - i = (int) DDS_REJECTED_BY_INSTANCES_LIMIT; - else if (strcmp (str, "s") == 0) - i = (int) DDS_REJECTED_BY_SAMPLES_LIMIT; - else if (strcmp (str, "spi") == 0) - i = (int) DDS_REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT; - else - abort (); - printf ("%s%d %d", sep, (int) *(dds_sample_rejected_status_kind *)p, i); fflush (stdout); - CU_ASSERT_FATAL (*(dds_sample_rejected_status_kind *)p == (dds_sample_rejected_status_kind) i); - break; - case 'I': // instance handle is too complicated - pos = -1; // not actually consuming an argument - break; - case 'E': { - int ent1 = -1; - dds_instance_handle_t esi1 = 0; - if (sscanf (args, "%31[^,)]%n", str, &pos) != 1 || (args[pos] != ',' && args[pos] != ')')) - abort (); - if (strcmp (str, "*") != 0 && (ent1 = getentity (str, NULL, NULL)) < 0) - abort (); - if (ent1 != -1) - esi1 = lookup_insthandle (es, ent, ent1); - printf ("%s%"PRIx64" %"PRIx64, sep, *(dds_instance_handle_t *)p, esi1); fflush (stdout); - CU_ASSERT_FATAL (ent1 == -1 || *(dds_instance_handle_t *)p == esi1); - break; - } - default: abort (); - } - args += pos + 1; - sep = ", "; - d++; - } - printf (")"); - assert (*args == 0); - assert (off <= lldesc[ll].size); - DDSRT_WARNING_MSVC_ON(4996); -} - -/** @brief run a "test" consisting of a sequence of simplish operations - * - * This operation takes a test description, really a program in a bizarre syntax, and executes it. Any failures, - * be it because of error codes coming out of the Cyclone calls or expected values being wrong cause it to fail - * the test via CU_ASSERT_FATAL. While it is doing this, it outputs the test steps to stdout including some - * actual values. An invalid program is mostly reported by calling abort(). It is geared towards checking for - * listener invocations and the effects on statuses. - * - * Entities in play: - * - * - participants: P P' - * - subscribers: R R' - * - publishers: W W' - * - readers: r s t r' s' t' - * - writers: w x y w' x' y' - * - * The unprimed ones exist in domain 0, the primed ones in domain 1 (but configured such that it talks to - * domain 0), so that network-related listener invocations can be checked as well. - * - * The first mention of an entity creates it as well as its ancestors. Implicitly created ancestors always have - * standard QoS and have no listeners. There is one topic that is created implicitly when the participant is - * created. - * - * Standard QoS is: default + reliable (100ms), by-source-timestamp, keep-all. - * The QoS of a reader/writer can be altered at the first mention of it by suffixing its name with "!" and/or "#" - * (the apostrophe is part of the name, so w#! or r'! are valid). Those suffixes are ignored if the entity - * already exists. - * - * A program consists of a sequence of operations separated by whitespace, ';' or '/' (there is no meaning to the - * separators, they exist to allow visual grouping): - * - * PROGRAM ::= (OP (\s+|[/;])*)* - * - * OP ::= (LISTENER)* ENTITY-NAME - * if entity ENTITY-NAME does not exist: - * creates the entity with the given listeners installed - * else - * changes the entity's listeners to the specified ones - * (see above for the valid ENTITY-NAMEs) - * | -ENTITY-NAME - * deletes the specified entity - * | WRITE-LIKE[fail][@DT] KEY - * writes/disposes/unregisters key KEY (an integer), if "fail" is appended, the - * expectation is that it fails with a timeout, if @DT is appended, the timestamp is the - * start time of the test +
s rather than the current time; DT is a floating-point - * number - * | READ-LIKE[(A,B))] - * reads/takes at most 10 samples, counting the number of valid and invalid samples seen - * and checking it against A and B if given - * | ?LISTENER[(ARGS)] - * waits until the specified listener has been invoked on using a flag set - * by the listener function, resets the flag and verifies that neither the entity status - * bit nor the "change" fields in the various statuses were set - * ARGS is used to check the status argument most recently passed to the listener: - * it (A,B) verifies count and change match A and B, policy matches RELIABILITY - * lc (A,B,C,D,E) verifies that alive and not-alive counts match A and B, that - * alive and not-alive changes match C and D and that the last handle matches - * E if an entity name (ignored if E = "*") - * ll (A,B) verifies count and change match A and B - * odm (A,B) verifies count and change match A and B, last handle is ignored - * oiq (A,B) verifies that total count and change match A and B and that the - * mismatching QoS is reliability (the only one that can for now) - * pm (A,B,C,D,E) verifies that total count and change match A and B, that - * current count and change match C and D and that the last handle matches E - * if an entity name (ignored if E = "*") - * rdm see odm - * riq see oiq - * sl (A,B) verifies that total count and change match A and B - * sr (A,B,C) verifies total count and change match A and B, and that the reason - * matches C (one of "s" for samples, "i" for instances, "spi" for samples - * per instance) - * sm see pm - * | ?!LISTENER - * (not listener) tests that LISTENER has not been invoked since last reset - * | sleep D - * delay program execution for D s (D is a floating-point number) - * WRITE-LIKE ::= wr write - * | wrdisp write-dispose - * | disp dispose - * | unreg unregister - * READ-LIKE ::= read dds_read (so any state) - * | take dds_take (so any state) - * LISTENER ::= da data available (acts on a reader) - * | dor data on readers (acts on a subcsriber) - * | it incompatible topic (acts on a topic) - * | lc liveliness changed (acts on a reader) - * | ll liveliness lost (acts on a writer) - * | odm offered deadline missed (acts on a writer) - * | oiq offered incompatible QoS (acts on a writer) - * | pm publication matched (acts on a writer) - * | rdm requested deadline missed (acts on a reader) - * | riq requested incompatible QoS (acts on a reader) - * | sl sample lost (acts on a reader) - * | sr sample rejected (acts on a reader) - * | sm subscription matched (acts on a reader) - * - * All entities share the listeners with their global state. Only the latest invocation is visible. - * - * @param[in] ops Program to execute. - */ -static void dotest (const char *ops) -{ - DDSRT_WARNING_MSVC_OFF(4996); // use of sscanf triggers a warning - static const char *sep = " /;\n\t\r\v"; - char *opscopy = ddsrt_strdup (ops), *cursor = opscopy, *tok; - struct ents es; - dds_return_t ret; - Space_Type1 sample; - char topicname[100]; - dds_qos_t *qos = dds_create_qos (), *rwqos = dds_create_qos (); - dds_listener_t *list = dds_create_listener (NULL); - const dds_time_t tref = dds_time (); - CU_ASSERT_FATAL (qos != NULL); - CU_ASSERT_FATAL (rwqos != NULL); - CU_ASSERT_FATAL (list != NULL); - dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100)); - dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); - dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); - memset (&es, 0, sizeof (es)); - memset (&sample, 0, sizeof (sample)); - - ddsrt_mutex_init (&g_mutex); - ddsrt_cond_init (&g_cond); - ddsrt_mutex_lock (&g_mutex); - cb_called = 0; - ddsrt_mutex_unlock (&g_mutex); - - create_unique_topic_name ("ddsc_listener_test", topicname, 100); - printf ("dotest: %s\n", ops); - printf ("topic: %s\n", topicname); - while ((tok = strsep_noempty (&cursor, sep)) != NULL) - { - int ent, ll; - bool isbang, ishash; - if ((ent = getentity (tok, &isbang, &ishash)) >= 0) - { - make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); - } - else if (*tok == '-' && (ent = getentity (tok + 1, NULL, NULL)) >= 0) - { - // delete deliberately leaves the instance handle in place for checking - // the publication/subscription handle in subscription matched/publication - // matched for a lost match - printf ("delete %"PRId32"\n", es.es[ent]); - ret = dds_delete (es.es[ent]); - CU_ASSERT_FATAL (ret == 0); - es.es[ent] = 0; - } - else if ((ll = getlistener (tok, &isbang)) >= 0) - { - printf ("set listener:"); - dds_reset_listener (list); - do { - printf (" %s", tok); - switch (ll) - { - case 0: dds_lset_data_available (list, isbang ? 0 : data_available_cb); break; - case 1: dds_lset_data_on_readers (list, isbang ? 0 : data_on_readers_cb); break; - case 2: dds_lset_inconsistent_topic (list, isbang ? 0: inconsistent_topic_cb); break; - case 3: dds_lset_liveliness_changed (list, isbang ? 0 : liveliness_changed_cb); break; - case 4: dds_lset_liveliness_lost (list, isbang ? 0 : liveliness_lost_cb); break; - case 5: dds_lset_offered_deadline_missed (list, isbang ? 0 : offered_deadline_missed_cb); break; - case 6: dds_lset_offered_incompatible_qos (list, isbang ? 0 : offered_incompatible_qos_cb); break; - case 7: dds_lset_publication_matched (list, isbang ? 0 : publication_matched_cb); break; - case 8: dds_lset_requested_deadline_missed (list, isbang ? 0 : requested_deadline_missed_cb); break; - case 9: dds_lset_requested_incompatible_qos (list, isbang ? 0 : requested_incompatible_qos_cb); break; - case 10: dds_lset_sample_lost (list, isbang ? 0 : sample_lost_cb); break; - case 11: dds_lset_sample_rejected (list, isbang ? 0 : sample_rejected_cb); break; - case 12: dds_lset_subscription_matched (list, isbang ? 0 : subscription_matched_cb); break; - default: abort (); - } - } while ((tok = strsep_noempty (&cursor, sep)) != NULL && (ll = getlistener (tok, &isbang)) >= 0); - if (tok == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) - abort (); - if (es.es[ent] == 0) - { - printf (" for "); - make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, list); - } - else - { - dds_listener_t *tmplist = dds_create_listener (NULL); - CU_ASSERT_FATAL (tmplist != NULL); - ret = dds_get_listener (es.es[ent], tmplist); - CU_ASSERT_FATAL (ret == 0); - dds_merge_listener (list, tmplist); - dds_delete_listener (tmplist); - printf (" on entity %"PRId32"\n", es.es[ent]); - ret = dds_set_listener (es.es[ent], list); - CU_ASSERT_FATAL (ret == 0); - } - } - else if (strncmp (tok, "wr", 2) == 0 || strncmp (tok, "disp", 4) == 0 || strncmp (tok, "unreg", 5) == 0) - { - dds_return_t (*fn) (dds_entity_t wr, const void *sample, dds_time_t ts) = 0; - double dt = 0.0; - dds_time_t ts = dds_time (); - char *cmd = tok; - bool expectfail = false; - int off, pos, key; - if ((tok = strsep_noempty (&cursor, sep)) == NULL) - abort (); - if (sscanf (tok, "%d%n", &key, &pos) != 1 || tok[pos] != 0) - abort (); - if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) - abort (); - if (es.es[ent] == 0) - make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); - switch (cmd[0]) - { - case 'w': - if (strncmp (cmd + 2, "disp", 4) == 0) { - off = 6; fn = dds_writedispose_ts; - } else { - off = 2; fn = dds_write_ts; - } - break; - case 'd': off = 4; fn = dds_dispose_ts; break; - case 'u': off = 5; fn = dds_unregister_instance_ts; break; - default: abort (); - } - if (strncmp (cmd + off, "fail", 4) == 0) - { - expectfail = true; - off += 4; - } - if (cmd[off] == '@') - { - if (sscanf (cmd + off, "@%lf%n", &dt, &pos) != 1 || cmd[off + pos] != 0) - abort (); - ts = tref + (dds_time_t) (dt * 1e9); - } - sample.long_1 = key; - printf ("entity %"PRId32": %*.*s@%"PRId64".%09"PRId64" %d\n", es.es[ent], off, off, cmd, ts / DDS_NSECS_IN_SEC, ts % DDS_NSECS_IN_SEC, key); - ret = fn (es.es[ent], &sample, ts); - if (expectfail) { - CU_ASSERT_FATAL (ret == DDS_RETCODE_TIMEOUT); - } else { - CU_ASSERT_FATAL (ret == 0); - } - } - else if (strncmp (tok, "take", 4) == 0 || strncmp(tok, "read", 4) == 0) - { - char *args = (tok[4] ? tok + 4 : NULL); - int exp_nvalid = -1, exp_ninvalid = -1, pos; - dds_return_t (*fn) (dds_entity_t, void **buf, dds_sample_info_t *, size_t, uint32_t); - fn = (strncmp (tok, "take", 4) == 0) ? dds_take : dds_read; - assert (args == NULL || *args == '('); - if (args && (sscanf (args, "(%d,%d)%n", &exp_nvalid, &exp_ninvalid, &pos) != 2 || args[pos] != 0)) - abort (); - if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) - abort (); - if (es.es[ent] == 0) - make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); - printf ("entity %"PRId32": %s", es.es[ent], (fn == dds_take) ? "take" : "read"); - fflush (stdout); - void *raw[10] = { NULL }; - dds_sample_info_t si[10]; - const uint32_t maxs = (uint32_t) (sizeof (raw) / sizeof (raw[0])); - int count[2] = { 0, 0 }; - ret = fn (es.es[ent], raw, si, maxs, maxs); - CU_ASSERT_FATAL (ret >= 0); - for (int32_t i = 0; i < ret; i++) - count[si[i].valid_data]++; - ret = dds_return_loan (es.es[ent], raw, ret); - CU_ASSERT_FATAL (ret == 0); - printf (" valid %d %d invalid %d %d\n", count[1], exp_nvalid, count[0], exp_ninvalid); - if (exp_nvalid >= 0) - CU_ASSERT_FATAL (count[1] == exp_nvalid); - if (exp_ninvalid >= 0) - CU_ASSERT_FATAL (count[0] == exp_ninvalid); - } - else if (tok[0] == '?') - { - const bool expectclear = (tok[1] == '!'); - const char *llname = tok + (expectclear ? 2 : 1); - char *checkargs; - if ((checkargs = strchr (llname, '(')) != NULL) - *checkargs = 0; // clear so getlistener groks the input - if ((ll = getlistener (llname, NULL)) < 0) - abort (); - if (expectclear) - { - printf ("listener %s: check not called", llname); - fflush (stdout); - ddsrt_mutex_lock (&g_mutex); - printf (" cb_called %"PRIx32" %s\n", cb_called, (cb_called & lldesc[ll].mask) == 0 ? "ok" : "fail"); - CU_ASSERT_FATAL ((cb_called & lldesc[ll].mask) == 0); - ddsrt_mutex_unlock (&g_mutex); - } - else - { - bool signalled = true; - uint32_t status; - if ((tok = strsep_noempty (&cursor, sep)) == NULL || (ent = getentity (tok, &isbang, &ishash)) < 0) - abort (); - if (es.es[ent] == 0) - make_entity (&es, topicname, ent, isbang, ishash, qos, rwqos, NULL); - if ((size_t) ll >= sizeof (lldesc) / sizeof (*lldesc)) - abort (); - printf ("listener %s: check called for entity %"PRId32, llname, es.es[ent]); - fflush (stdout); - ddsrt_mutex_lock (&g_mutex); - while ((cb_called & lldesc[ll].mask) == 0 && signalled) - signalled = ddsrt_cond_waitfor (&g_cond, &g_mutex, DDS_SECS (5)); - printf (" cb_called %"PRIx32" (%s)", cb_called, (cb_called & lldesc[ll].mask) != 0 ? "ok" : "fail"); - fflush (stdout); - CU_ASSERT_FATAL ((cb_called & lldesc[ll].mask) != 0); - printf (" cb_entity %"PRId32" %"PRId32" (%s)", *lldesc[ll].cb_entity, es.es[ent], (*lldesc[ll].cb_entity == es.es[ent]) ? "ok" : "fail"); - fflush (stdout); - CU_ASSERT_FATAL (*lldesc[ll].cb_entity == es.es[ent]); - if (!(es.doms[0] && es.doms[1])) - { - // FIXME: two domains: listener invocation happens on another thread and we can observe non-0 "change" fields - // they get updated, listener gets invoked, then they get reset -- pretty sure it is allowed by the spec, but - // not quite elegant - assert_status_change_fields_are_0 (ll, es.es[ent]); - } - if (checkargs && lldesc[ll].cb_status) - { - *checkargs = '('; // restore ( so checkargs function gets a more sensible input - checkstatus (ll, &es, ent, checkargs, lldesc[ll].cb_status); - } - printf ("\n"); - cb_called &= ~lldesc[ll].mask; - ddsrt_mutex_unlock (&g_mutex); - ret = dds_get_status_changes (es.es[ent], &status); - CU_ASSERT_FATAL (ret == 0); - CU_ASSERT_FATAL ((status & lldesc[ll].mask) == 0); - } - } - else if (strcmp (tok, "sleep") == 0) - { - if ((tok = strsep_noempty (&cursor, sep)) == NULL) - abort (); - double d; int pos; - if (sscanf (tok, "%lf%n", &d, &pos) != 1 || tok[pos] != 0) - abort (); - printf ("sleep %fs\n", d); - dds_sleepfor ((dds_duration_t) (d * 1e9)); - } - else - { - printf ("tok '%s': unrecognized\n", tok); - abort (); - } - } - - dds_delete_listener (list); - dds_delete_qos (rwqos); - dds_delete_qos (qos); - // prevent any listeners from being invoked so we can safely delete the - // mutex and the condition variable -- must do this going down the - // hierarchy, or listeners may remain set through inheritance - for (size_t i = 0; i < sizeof (es.es) / sizeof (es.es[0]); i++) - { - if (es.es[i]) - { - ret = dds_set_listener (es.es[i], NULL); - CU_ASSERT_FATAL (ret == 0); - } - } - ddsrt_mutex_destroy (&g_mutex); - ddsrt_cond_destroy (&g_cond); - for (size_t i = 0; i < sizeof (es.doms) / sizeof (es.doms[0]); i++) - { - if (es.doms[i]) - { - ret = dds_delete (es.doms[i]); - CU_ASSERT_FATAL (ret == 0); - } - } - ddsrt_free (opscopy); - DDSRT_WARNING_MSVC_ON(4996); -} - -/************************************************** - **** **** - **** listener invocation checks **** - **** **** - **************************************************/ +#define dotest(ops) CU_ASSERT_FATAL (test_oneliner (ops) > 0) CU_Test (ddsc_listener, propagation) { @@ -1102,10 +270,10 @@ CU_Test (ddsc_listener, propagation) // for it on the reader should prevent that from happening dotest ("da dor lc sm P ; ?!dor ?!da ?!sm ?!lc"); // writing data should trigger data-available unless data-on-readers is set - dotest ("da lc sm P ; r ; wr 0 w ; ?da r ?sm r ?lc r"); - dotest ("da dor lc sm P ; r ; wr 0 w ; ?!da ; ?dor R ?sm r ?lc r"); + dotest ("da lc sm P ; r ; wr w 0 ; ?da r ?sm r ?lc r"); + dotest ("da dor lc sm P ; r ; wr w 0 ; ?!da ; ?dor R ?sm r ?lc r"); // setting listeners after entity creation should work, too - dotest ("P W R ; dor P pm W sm R ; r w ; ?sm r ?pm w ; wr 0 w ; ?dor R ; ?!da"); + dotest ("P W R ; dor P pm W sm R ; r w ; ?sm r ?pm w ; wr w 0 ; ?dor R ; ?!da"); } CU_Test (ddsc_listener, matched) @@ -1116,6 +284,47 @@ CU_Test (ddsc_listener, matched) // across the network it should work just as well (matching happens on different threads for // remote & local entity creation, so it is meaningfully different test) dotest ("sm r pm w' ?pm w' ?sm r"); + + // Disconnect + reconnect; "deaf P" means the disconnect is asymmetrical: P no longer observes P' + // but P' still observes P. If r did not ACK the data before losing connectivity, w' will hold + // the data and it will be re-delivered after reconnecting, depending on QoS settings (the "..." + // allows for extra samples) and whether the instance was taken or not + // + // the uncertainty also means we don't really know how many "data available" events there will be + // and the "sleep 0.3" simply gives it a bit more time after the first event + dotest ("sm da r pm w' ; ?sm r ?pm w' ;" + " wr w' 1 ; ?da r take{(1,0,0)} r sleep 1;" + " deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" + " hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0),...} r ; ?!pm"); + // same without taking the "dispose" after disconnect + // sample 1 will be delivered anew + dotest ("sm da r pm w' ; ?sm r ?pm w' ; wr w' 1 ; ?da r take{(1,0,0)} r ;" + " deaf P ; ?sm(1,0,0,-1,w') r ?da r ; wr w' 2 ;" + " hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{d1,(2,0,0)} r ; ?!pm"); + + // if a volatile writer loses the reader temporarily, the data won't show up + dotest ("sm da r pm w' ; ?sm r ?pm w' ; wr w' 1 ; ?da r read{(1,0,0)} r ;" + " deaf P' ; ?!sm ?!da ?pm(1,0,0,-1,r) w' ; wr w' 2 ;" + " hearing P' ; ?!sm ?pm(2,1,1,1,r) w' ?!da ; wr w' 3 ;" + " ?da r sleep 0.3 read{s(1,0,0),f(3,0,0)} r"); + // if a transient-local writer loses the reader temporarily, what data + // has been published during the disconnect must still show up; delete + // writer, &c. checks nothing else showed up afterward + // - first: durability service history depth 1: 2nd write of 2 pushes + // the 1st write of it out of the history and only 2 samples arrive + // - second: d.s. keep-all: both writes are kept and 3 samples arrive + dotest ("sm da r(d=tl) pm w'(d=tl,h=1,ds=0/1) ; ?sm r ?pm w' ;" + " wr w' 1 ; ?da r read{(1,0,0)} r ;" + " deaf P' ; ?pm(1,0,0,-1,r) w' ; wr w' 2 wr w' 2 ;" + " hearing P' ; ?pm(2,1,1,1,r) w' ; wr w' 3 ;" + " ?da(2) r read{s(1,0,0),f(2,0,0),f(3,0,0)} r ;" + " -w' ?sm r ?da r read(3,3) r"); + dotest ("sm da r(d=tl) pm w'(d=tl,h=1,ds=0/all) ; ?sm r ?pm w' ;" + " wr w' 1 ; ?da r read{(1,0,0)} r ;" + " deaf P' ; ?pm(1,0,0,-1,r) w' ; wr w' 2 wr w' 2 ;" + " hearing P' ; ?pm(2,1,1,1,r) w' ; wr w' 3 ;" + " ?da(3) r read{s(1,0,0),f(2,0,0),f(2,0,0),f(3,0,0)} r ;" + " -w' ?sm r ?da r read(4,3) r"); } CU_Test (ddsc_listener, publication_matched) @@ -1163,18 +372,20 @@ CU_Test (ddsc_listener, subscription_matched) CU_Test (ddsc_listener, incompatible_qos) { // best-effort writer & reliable reader: both must trigger incompatible QoS event - dotest ("oiq w! riq r ; ?oiq(1,1) w ?riq(1,1) r"); - dotest ("riq r oiq w! ; ?oiq(1,1) w ?riq(1,1) r"); + dotest ("oiq w(r=be) riq r ; ?oiq(1,1,r) w ?riq(1,1,r) r"); + dotest ("riq r oiq w(r=be) ; ?oiq(1,1,r) w ?riq(1,1,r) r"); + dotest ("oiq w(o=x) riq r ; ?oiq(1,1,o) w ?riq(1,1,o) r"); + dotest ("riq r oiq w(o=x) ; ?oiq(1,1,o) w ?riq(1,1,o) r"); } CU_Test (ddsc_listener, data_available) { // data available on reader - dotest ("da sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); + dotest ("da sm r pm w ?pm w ?sm r wr w 0 ?da r ?!dor"); // data available set on subscriber - dotest ("da R sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); + dotest ("da R sm r pm w ?pm w ?sm r wr w 0 ?da r ?!dor"); // data available set on participant - dotest ("da P sm r pm w ?pm w ?sm r wr 0 w ?da r ?!dor"); + dotest ("da P sm r pm w ?pm w ?sm r wr w 0 ?da r ?!dor"); } CU_Test (ddsc_listener, data_available_delete_writer) @@ -1185,53 +396,53 @@ CU_Test (ddsc_listener, data_available_delete_writer) dotest ("da sm r w ; -w ?sm r ?!da ; take(0,0) r"); // after writing: auto-dispose should always trigger data available, an invalid // sample needs to show up if there isn't an unread sample to use instead - dotest ("da r w ; wr 0 w ?da r ; -w ?da r ; take(1,0) r"); - dotest ("da r w ; wr 0 w ?da r ; read(1,0) r ; -w ?da r ; take(1,1) r"); - dotest ("da r w ; wr 0 w ?da r ; take(1,0) r ; -w ?da r ; take(0,1) r"); + dotest ("da r w ; wr w 0 ?da r ; -w ?da r ; take(1,0) r"); + dotest ("da r w ; wr w 0 ?da r ; read(1,0) r ; -w ?da r ; take(1,1) r"); + dotest ("da r w ; wr w 0 ?da r ; take(1,0) r ; -w ?da r ; take(0,1) r"); // same with two writers (no point in doing this also with two domains) dotest ("da r w x ; -w ?!da -x ?!da ; take(0,0) r"); - dotest ("da r w x ; wr 0 w ?da r ; -x ?!da ; -w ?da r ; take(1,0) r"); - dotest ("da r w x ; wr 0 w ?da r ; -w ?da r ; take(1,0) r ; -x ?!da ; take(0,0) r"); - dotest ("da r w x ; wr 0 w wr 0 x ?da r ; -w ?!da ; take(2,0) r ; -x ?da r ; take(0,1) r"); - dotest ("da r w x ; wr 0 w wr 0 x ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); - dotest ("da r w x ; wr 0 w wr 0 x ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); - dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; -w ?!da -x ?da r ; take(2,0) r"); - dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; -x ?!da -w ?da r ; take(2,0) r"); - dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); - dotest ("da r w x ; wr 0 w read(1,0) r ; wr 0 x ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); - dotest ("da r w x ; wr 0 w wr 0 x ?da r ; take(2,0) r ; -w ?!da -x ?da r ; take(0,1) r"); - dotest ("da r w x ; wr 0 w wr 0 x ?da r ; take(2,0) r ; -x ?!da -w ?da r ; take(0,1) r"); + dotest ("da r w x ; wr w 0 ?da r ; -x ?!da ; -w ?da r ; take(1,0) r"); + dotest ("da r w x ; wr w 0 ?da r ; -w ?da r ; take(1,0) r ; -x ?!da ; take(0,0) r"); + dotest ("da r w x ; wr w 0 wr x 0 ?da r ; -w ?!da ; take(2,0) r ; -x ?da r ; take(0,1) r"); + dotest ("da r w x ; wr w 0 wr x 0 ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); + dotest ("da r w x ; wr w 0 wr x 0 ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); + dotest ("da r w x ; wr w 0 read(1,0) r ; wr x 0 ?da r ; -w ?!da -x ?da r ; take(2,0) r"); + dotest ("da r w x ; wr w 0 read(1,0) r ; wr x 0 ?da r ; -x ?!da -w ?da r ; take(2,0) r"); + dotest ("da r w x ; wr w 0 read(1,0) r ; wr x 0 ?da r ; read(2,0) r ; -w ?!da -x ?da r ; take(2,1) r"); + dotest ("da r w x ; wr w 0 read(1,0) r ; wr x 0 ?da r ; read(2,0) r ; -x ?!da -w ?da r ; take(2,1) r"); + dotest ("da r w x ; wr w 0 wr x 0 ?da r ; take(2,0) r ; -w ?!da -x ?da r ; take(0,1) r"); + dotest ("da r w x ; wr w 0 wr x 0 ?da r ; take(2,0) r ; -x ?!da -w ?da r ; take(0,1) r"); } CU_Test (ddsc_listener, data_available_delete_writer_disposed) { // same as data_available_delete_writer, but now with the instance disposed first - dotest ("da r w ; wr 0 w disp 0 w ?da r ; -w ?!da"); - dotest ("da r w ; wr 0 w disp 0 w ?da r ; read(1,0) r ; -w ?!da"); - dotest ("da r w ; wr 0 w disp 0 w ?da r ; take(1,0) r ; -w ?!da"); + dotest ("da r w ; wr w 0 disp w 0 ?da r ; -w ?!da"); + dotest ("da r w ; wr w 0 disp w 0 ?da r ; read(1,0) r ; -w ?!da"); + dotest ("da r w ; wr w 0 disp w 0 ?da r ; take(1,0) r ; -w ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 w ?da r ; read(1,1) r ; -w ?!da -x ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 w ?da r ; take(0,1) r ; -w ?!da -x ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 w ?da r ; read(1,1) r ; -x ?!da -w ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 w ?da r ; take(0,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; read(1,0) r ; disp w 0 ?da r ; read(1,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; take(1,0) r ; disp w 0 ?da r ; take(0,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; read(1,0) r ; disp w 0 ?da r ; read(1,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; take(1,0) r ; disp w 0 ?da r ; take(0,1) r ; -x ?!da -w ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 x ?da r ; read(1,1) r ; -w ?!da -x ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 x ?da r ; take(0,1) r ; -w ?!da -x ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; read(1,0) r ; disp 0 x ?da r ; read(1,1) r ; -x ?!da -w ?!da"); - dotest ("da r w x ; wr 0 w ?da r ; take(1,0) r ; disp 0 x ?da r ; take(0,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; read(1,0) r ; disp x 0 ?da r ; read(1,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; take(1,0) r ; disp x 0 ?da r ; take(0,1) r ; -w ?!da -x ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; read(1,0) r ; disp x 0 ?da r ; read(1,1) r ; -x ?!da -w ?!da"); + dotest ("da r w x ; wr w 0 ?da r ; take(1,0) r ; disp x 0 ?da r ; take(0,1) r ; -x ?!da -w ?!da"); } CU_Test (ddsc_listener, data_on_readers) { // data on readers wins from data available - dotest ("dor R da r ; wr 0 w ; ?dor R ?!da"); - dotest ("dor P da r ; wr 0 w ; ?dor R ?!da"); + dotest ("dor R da r ; wr w 0 ; ?dor R ?!da"); + dotest ("dor P da r ; wr w 0 ; ?dor R ?!da"); } CU_Test (ddsc_listener, sample_lost) { // FIXME: figure out what really constitutes a "lost sample" - dotest ("sl r ; wr@0 0 w ?!sl ; wr@-1 0 w ?sl(1,1) r"); + dotest ("sl r ; wr w 0@0 ?!sl ; wr w 0@-1 ?sl(1,1) r"); } CU_Test (ddsc_listener, sample_rejected) @@ -1240,13 +451,13 @@ CU_Test (ddsc_listener, sample_rejected) // reliable: expect timeout on the write when max samples has been reached // invalid samples don't count towards resource limits, so dispose should // not be blocked - dotest ("sr r# ; wr 0 w wrfail 0 w wrfail 0 w ; ?sr r"); - dotest ("sr r# ; wr 0 w wrfail 0 w ; read(1,0) r ; disp 0 w ; read(1,1) r ; ?sr r"); + dotest ("sr r(rl=1) ; wr w 0 wrfail w 0 wrfail w 0 ; ?sr r"); + dotest ("sr r(rl=1) ; wr w 0 wrfail w 0 ; read(1,0) r ; disp w 0 ; read(1,1) r ; ?sr r"); // best-effort: writes should succeed despite not delivering the data adding // the data in the RHC, also check number of samples rejected - dotest ("sr r#! ; wr 0 w! wr 0 w wr 0 w ; ?sr(2,1,s) r"); - dotest ("sr r#! ; wr 0 w! wr 0 w ; read(1,0) r ; disp 0 w ; read(1,1) r ; ?sr(1,1,s) r"); + dotest ("sr r(rl=1,r=be) ; wr w(r=be) 0 wr w 0 wr w 0 ; ?sr(2,1,s) r"); + dotest ("sr r(rl=1,r=be) ; wr w(r=be) 0 wr w 0 ; read(1,0) r ; disp w 0 ; read(1,1) r ; ?sr(1,1,s) r"); } CU_Test (ddsc_listener, liveliness_changed) diff --git a/src/core/ddsc/tests/test_oneliner.c b/src/core/ddsc/tests/test_oneliner.c new file mode 100644 index 0000000..255400a --- /dev/null +++ b/src/core/ddsc/tests/test_oneliner.c @@ -0,0 +1,1780 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include +#include +#include + +#include "dds/dds.h" +#include "dds/ddsrt/misc.h" +#include "dds/ddsrt/sync.h" +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/strtod.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/environ.h" + +#include "dds__types.h" +#include "dds__entity.h" +#include "dds/ddsi/q_lease.h" +#include "dds/ddsi/q_xevent.h" +#include "dds/ddsi/ddsi_entity_index.h" + +#include "test_common.h" +#include "test_oneliner.h" + +#define MAXDOMS (sizeof (((struct oneliner_ctx){.result=0}).doms) / sizeof (((struct oneliner_ctx){.result=0}).doms[0])) + +static const char knownentities[] = "PRWrstwxy"; +typedef struct { char n[MAXDOMS + 1]; } entname_t; + +#define DEFINE_STATUS_CALLBACK(name, NAME, kind) \ + static void name##_cb (dds_entity_t kind, const dds_##name##_status_t status, void *arg) \ + { \ + struct oneliner_cb *cb = arg; \ + ddsrt_mutex_lock (&cb->ctx->g_mutex); \ + cb->cb_##kind = kind; \ + cb->cb_##name##_status = status; \ + cb->cb_called[DDS_##NAME##_STATUS_ID]++; \ + ddsrt_cond_broadcast (&cb->ctx->g_cond); \ + ddsrt_mutex_unlock (&cb->ctx->g_mutex); \ + } + +DEFINE_STATUS_CALLBACK (inconsistent_topic, INCONSISTENT_TOPIC, topic) +DEFINE_STATUS_CALLBACK (liveliness_changed, LIVELINESS_CHANGED, reader) +DEFINE_STATUS_CALLBACK (liveliness_lost, LIVELINESS_LOST, writer) +DEFINE_STATUS_CALLBACK (offered_deadline_missed, OFFERED_DEADLINE_MISSED, writer) +DEFINE_STATUS_CALLBACK (offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS, writer) +DEFINE_STATUS_CALLBACK (publication_matched, PUBLICATION_MATCHED, writer) +DEFINE_STATUS_CALLBACK (requested_deadline_missed, REQUESTED_DEADLINE_MISSED, reader) +DEFINE_STATUS_CALLBACK (requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS, reader) +DEFINE_STATUS_CALLBACK (sample_lost, SAMPLE_LOST, reader) +DEFINE_STATUS_CALLBACK (sample_rejected, SAMPLE_REJECTED, reader) +DEFINE_STATUS_CALLBACK (subscription_matched, SUBSCRIPTION_MATCHED, reader) + +static void data_on_readers_cb (dds_entity_t subscriber, void *arg) +{ + struct oneliner_cb *cb = arg; + ddsrt_mutex_lock (&cb->ctx->g_mutex); + cb->cb_subscriber = subscriber; + cb->cb_called[DDS_DATA_ON_READERS_STATUS_ID]++; + ddsrt_cond_broadcast (&cb->ctx->g_cond); + ddsrt_mutex_unlock (&cb->ctx->g_mutex); +} + +static void data_available_cb (dds_entity_t reader, void *arg) +{ + struct oneliner_cb *cb = arg; + ddsrt_mutex_lock (&cb->ctx->g_mutex); + cb->cb_reader = reader; + cb->cb_called[DDS_DATA_AVAILABLE_STATUS_ID]++; + ddsrt_cond_broadcast (&cb->ctx->g_cond); + ddsrt_mutex_unlock (&cb->ctx->g_mutex); +} + +static void dummy_data_on_readers_cb (dds_entity_t subscriber, void *arg) +{ + (void)subscriber; + (void)arg; +} + +static void dummy_data_available_cb (dds_entity_t reader, void *arg) +{ + (void)reader; + (void)arg; +} + +static void dummy_subscription_matched_cb (dds_entity_t reader, const dds_subscription_matched_status_t status, void *arg) +{ + (void)reader; + (void)status; + (void)arg; +} + +static void dummy_liveliness_changed_cb (dds_entity_t reader, const dds_liveliness_changed_status_t status, void *arg) +{ + (void)reader; + (void)status; + (void)arg; +} + +static void dummy_cb (void) +{ + // Used as a listener function in checking merging of listeners, + // and for that purpose, casting it to whatever function type is + // required is ok. It is not supposed to ever be called. + abort (); +} + +#undef DEFINE_STATUS_CALLBACK + +// These had better match the corresponding type definitions! +// n uint32_t ...count +// c int32_t ...count_change +// I instance handle of a data instance +// P uint32_t QoS policy ID +// E instance handle of an entity +// R sample_rejected_status_kind +static const struct { + const char *name; + size_t size; // size of status struct + const char *desc; // description of status struct + dds_status_id_t id; // status id, entry in "cb_called" + size_t cb_entity_off; // which cb_... entity to look at + size_t cb_status_off; // cb_..._status to look at +} lldesc[] = { +#define S0(abbrev, NAME, entity) \ + { abbrev, 0, NULL, DDS_##NAME##_STATUS_ID, offsetof (struct oneliner_cb, cb_##entity), 0 } +#define S(abbrev, name, NAME, desc, entity) \ + { abbrev, sizeof (dds_##name##_status_t), desc, DDS_##NAME##_STATUS_ID, offsetof (struct oneliner_cb, cb_##entity), offsetof (struct oneliner_cb, cb_##name##_status) } + S0 ("da", DATA_AVAILABLE, reader), + S0 ("dor", DATA_ON_READERS, subscriber), + S ("it", inconsistent_topic, INCONSISTENT_TOPIC, "nc", topic), + S ("lc", liveliness_changed, LIVELINESS_CHANGED, "nnccE", reader), + S ("ll", liveliness_lost, LIVELINESS_LOST, "nc", writer), + S ("odm", offered_deadline_missed, OFFERED_DEADLINE_MISSED, "ncI", writer), + S ("oiq", offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS, "ncP", writer), + S ("pm", publication_matched, PUBLICATION_MATCHED, "ncncE", writer), + S ("rdm", requested_deadline_missed, REQUESTED_DEADLINE_MISSED, "ncI", reader), + S ("riq", requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS, "ncP", reader), + S ("sl", sample_lost, SAMPLE_LOST, "nc", reader), + S ("sr", sample_rejected, SAMPLE_REJECTED, "ncRI", reader), + S ("sm", subscription_matched, SUBSCRIPTION_MATCHED, "ncncE", reader) +#undef S +#undef S0 +}; + + +static const void *advance (const void *status, size_t *off, char code) +{ +#define alignof(type_) offsetof (struct { char c; type_ d; }, d) + size_t align = 1, size = 1; + switch (code) + { + case 'n': case 'c': case 'P': + align = alignof (uint32_t); size = sizeof (uint32_t); + break; + case 'E': case 'I': + align = alignof (dds_instance_handle_t); size = sizeof (dds_instance_handle_t); + break; + case 'R': + align = alignof (dds_sample_rejected_status_kind); size = sizeof (dds_sample_rejected_status_kind); + break; + default: + abort (); + } +#undef alignof + *off = (*off + align - 1) & ~(align - 1); + const void *p = (const char *) status + *off; + *off += size; + return p; +} + +static dds_return_t get_status (int ll, dds_entity_t ent, void *status) +{ + dds_return_t ret; + switch (ll) + { + case 2: ret = dds_get_inconsistent_topic_status (ent, status); break; + case 3: ret = dds_get_liveliness_changed_status (ent, status); break; + case 4: ret = dds_get_liveliness_lost_status (ent, status); break; + case 5: ret = dds_get_offered_deadline_missed_status (ent, status); break; + case 6: ret = dds_get_offered_incompatible_qos_status (ent, status); break; + case 7: ret = dds_get_publication_matched_status (ent, status); break; + case 8: ret = dds_get_requested_deadline_missed_status (ent, status); break; + case 9: ret = dds_get_requested_incompatible_qos_status (ent, status); break; + case 10: ret = dds_get_sample_lost_status (ent, status); break; + case 11: ret = dds_get_sample_rejected_status (ent, status); break; + case 12: ret = dds_get_subscription_matched_status (ent, status); break; + default: return -1; + } + return (ret == 0); +} + +static dds_return_t check_status_change_fields_are_0 (int ll, dds_entity_t ent) +{ + if (lldesc[ll].desc) + { + const char *d = lldesc[ll].desc; + void *status = malloc (lldesc[ll].size); + dds_return_t ret; + if ((ret = get_status (ll, ent, status)) <= 0) + { + free (status); + return ret; + } + size_t off = 0; + while (*d) + { + const uint32_t *p = advance (status, &off, *d); + if (*d == 'c' && *p != 0) + { + free (status); + return 0; + } + d++; + } + assert (off <= lldesc[ll].size); + free (status); + } + return 1; +} + +#define TOK_END -1 +#define TOK_NAME -2 +#define TOK_INT -3 +#define TOK_DURATION -4 +#define TOK_TIMESTAMP -5 +#define TOK_ELLIPSIS -6 +#define TOK_INVALID -7 + +static int setresult (struct oneliner_ctx *ctx, int result, const char *msg, ...) ddsrt_attribute_format((printf, 3, 4)); +static void error (struct oneliner_ctx *ctx, const char *msg, ...) ddsrt_attribute_format((printf, 2, 3)); +static void error_dds (struct oneliner_ctx *ctx, dds_return_t ret, const char *msg, ...) ddsrt_attribute_format((printf, 3, 4)); +static void testfail (struct oneliner_ctx *ctx, const char *msg, ...) ddsrt_attribute_format((printf, 2, 3)); + +static void vsetresult (struct oneliner_ctx *ctx, int result, const char *msg, va_list ap) +{ + assert (result <= 0); + ctx->result = result; + vsnprintf (ctx->msg, sizeof (ctx->msg), msg, ap); +} + +static int setresult (struct oneliner_ctx *ctx, int result, const char *msg, ...) +{ + va_list ap; + va_start (ap, msg); + vsetresult (ctx, result, msg, ap); + va_end (ap); + return result; +} + +static void error (struct oneliner_ctx *ctx, const char *msg, ...) +{ + va_list ap; + va_start (ap, msg); + vsetresult (ctx, -1, msg, ap); + va_end (ap); + longjmp (ctx->jb, 1); +} + +static void error_dds (struct oneliner_ctx *ctx, dds_return_t ret, const char *msg, ...) +{ + va_list ap; + va_start (ap, msg); + vsetresult (ctx, -1, msg, ap); + va_end (ap); + size_t n = strlen (ctx->msg); + if (n < sizeof (ctx->msg)) + snprintf (ctx->msg + n, sizeof (ctx->msg) - n, " (%s)", dds_strretcode (ret)); + longjmp (ctx->jb, 1); +} + +static void testfail (struct oneliner_ctx *ctx, const char *msg, ...) +{ + va_list ap; + va_start (ap, msg); + vsetresult (ctx, 0, msg, ap); + va_end (ap); + longjmp (ctx->jb, 1); +} + +static void advancetok (struct oneliner_lex *l) +{ + while (isspace ((unsigned char) *l->inp)) + l->inp++; +} + +static int issymchar0 (char c) +{ + return isalpha ((unsigned char) c) || c == '_'; +} + +static int issymchar (char c) +{ + return isalnum ((unsigned char) c) || c == '_' || c == '\''; +} + +static bool lookingatnum (const struct oneliner_lex *l) +{ + return (isdigit ((unsigned char) l->inp[(l->inp[0] == '-')])); +} + +static int nexttok_dur (struct oneliner_lex *l, union oneliner_tokval *v, bool expecting_duration) +{ + advancetok (l); + if (l->inp[0] == 0) + { + l->tok = TOK_END; + } + else if (strncmp (l->inp, "...", 3) == 0) + { + l->inp += 3; + l->tok = TOK_ELLIPSIS; + } + else if (!expecting_duration && lookingatnum (l)) + { + char *endp; + // strtol: [0-9]+ ; endp = l->inp if no digits present + l->v.i = (int) strtol (l->inp, &endp, 10); + l->inp = endp; + if (v) *v = l->v; + l->tok = TOK_INT; + } + else if (l->inp[0] == '@' || (expecting_duration && lookingatnum (l))) + { + const int ists = (l->inp[0] == '@'); + char *endp; + if (!ists && strncmp (l->inp + ists, "inf", 3) == 0 && !issymchar (l->inp[ists + 3])) + { + l->inp += ists + 3; + l->v.d = DDS_INFINITY; + } + else + { + double d; + if (ddsrt_strtod (l->inp + ists, &endp, &d) != DDS_RETCODE_OK) + return false; + if (!ists && d < 0) + return false; + if (d >= (double) (INT64_MAX / DDS_NSECS_IN_SEC)) + l->v.d = DDS_INFINITY; + else if (d >= 0) + l->v.d = (int64_t) (d * 1e9 + 0.5); + else + l->v.d = -(int64_t) (-d * 1e9 + 0.5); + if (ists) + l->v.d += l->tref; + l->inp = endp; + } + if (v) *v = l->v; + l->tok = ists ? TOK_TIMESTAMP : TOK_DURATION; + } + else if (issymchar0 (l->inp[0])) + { + int p = 0; + while (issymchar (l->inp[p])) + { + if (p == (int) sizeof (l->v.n)) + return TOK_INVALID; + l->v.n[p] = l->inp[p]; + p++; + } + l->v.n[p] = 0; + l->inp += p; + if (v) *v = l->v; + l->tok = TOK_NAME; + } + else + { + l->tok = *l->inp++; + } + return l->tok; +} + +static int nexttok (struct oneliner_lex *l, union oneliner_tokval *v) +{ + return nexttok_dur (l, v, false); +} + +static int peektok (const struct oneliner_lex *l, union oneliner_tokval *v) +{ + struct oneliner_lex l1 = *l; + return nexttok (&l1, v); +} + +static bool nexttok_if (struct oneliner_lex *l, int tok) +{ + if (peektok (l, NULL) != tok) + return false; + nexttok (l, NULL); + return true; +} + +static bool nexttok_int (struct oneliner_lex *l, int *dst) +{ + if (peektok (l, NULL) != TOK_INT) + return false; + (void) nexttok (l, NULL); + *dst = l->v.i; + return true; +} + +struct kvarg { + const char *k; + size_t klen; + int v; + bool (*arg) (struct oneliner_lex *l, void *dst); // *inp unchanged when false + void (*def) (void *dst); +}; + +static void def_kvarg_int0 (void *dst) { *(int *)dst = 0; } +static void def_kvarg_int1 (void *dst) { *(int *)dst = 1; } +static void def_kvarg_dur_inf (void *dst) { *(dds_duration_t *)dst = DDS_INFINITY; } +static void def_kvarg_dur_100ms (void *dst) { *(dds_duration_t *)dst = DDS_MSECS (100); } + +static bool read_kvarg_int (struct oneliner_lex *l, void *dst) +{ + return nexttok_int (l, dst); +} + +static bool read_kvarg_posint (struct oneliner_lex *l, void *dst) +{ + return nexttok_int (l, dst) && l->v.i > 0; +} + +static bool read_kvarg_dur (struct oneliner_lex *l, void *dst) +{ + dds_duration_t *x = dst; + struct oneliner_lex l1 = *l; + if (nexttok_dur (&l1, NULL, true) != TOK_DURATION) + return false; + *x = l1.v.d; + *l = l1; + return true; +} + +static bool read_kvarg_3len (struct oneliner_lex *l, void *dst) +{ + struct oneliner_lex l1 = *l; + int *x = dst, i = 0; + x[0] = x[1] = x[2] = DDS_LENGTH_UNLIMITED; + do { + if (!nexttok_int (&l1, &x[i]) || (x[i] <= 0 && x[i] != DDS_LENGTH_UNLIMITED)) + return false; + } while (++i < 3 && nexttok_if (&l1, '/')); + *l = l1; + return true; +} + +static bool read_kvarg (const struct kvarg *ks, size_t sizeof_ks, struct oneliner_lex *l, int *v, void *arg) +{ + // l points at name, *inp is , or ) terminated; *l unchanged when false + const struct kvarg *kend = ks + sizeof_ks / sizeof (*ks); + struct oneliner_lex l1 = *l; + advancetok (&l1); + for (const struct kvarg *k = ks; k < kend; k++) + { + assert (strlen (k->k) == k->klen); + *v = k->v; + if (k->klen == 0) + { + assert (k->arg != 0 && k->def == 0); + struct oneliner_lex l2 = l1; + if (k->arg (&l2, arg) && (peektok (&l2, NULL) == ',' || peektok (&l2, NULL) == ')')) + { + *l = l2; + return true; + } + } + else if (strncmp (l1.inp, k->k, k->klen) != 0) + { + continue; + } + else + { + /* skip symbol */ + struct oneliner_lex l2 = l1; + l2.inp += k->klen; + if (peektok (&l2, NULL) == ',' || peektok (&l2, NULL) == ')') + { + if (k->arg == 0 || k->def != 0) + { + if (k->def) k->def (arg); + *l = l2; + return true; + } + } + else if (k->arg != 0 && nexttok (&l2, NULL) == ':') + { + if (k->arg (&l2, arg) && (peektok (&l2, NULL) == ',' || peektok (&l2, NULL) == ')')) + { + *l = l2; + return true; + } + } + } + } + return false; +} + +static bool qos_durability (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "v", 1, (int) DDS_DURABILITY_VOLATILE }, + { "tl", 2, (int) DDS_DURABILITY_TRANSIENT_LOCAL }, + { "t", 1, (int) DDS_DURABILITY_TRANSIENT }, + { "p", 1, (int) DDS_DURABILITY_PERSISTENT } + }; + int v; + if (!read_kvarg (ks, sizeof ks, l, &v, NULL)) + return false; + dds_qset_durability (q, (dds_durability_kind_t) v); + return true; +} + +static const struct kvarg ks_history[] = { + { "all", 3, (int) DDS_HISTORY_KEEP_ALL, .def = def_kvarg_int1 }, + { "", 0, (int) DDS_HISTORY_KEEP_LAST, .arg = read_kvarg_posint } +}; + +static bool qos_history (struct oneliner_lex *l, dds_qos_t *q) +{ + int v, x = 1; + if (!read_kvarg (ks_history, sizeof ks_history, l, &v, &x)) + return false; + dds_qset_history (q, (dds_history_kind_t) v, x); + return true; +} + +static bool qos_destination_order (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "r", 1, (int) DDS_DESTINATIONORDER_BY_RECEPTION_TIMESTAMP }, + { "s", 1, (int) DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP } + }; + int v; + if (!read_kvarg (ks, sizeof ks, l, &v, NULL)) + return false; + dds_qset_destination_order (q, (dds_destination_order_kind_t) v); + return true; +} + +static bool qos_ownership (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "s", 1, (int) DDS_OWNERSHIP_SHARED, .def = def_kvarg_int0 }, + { "x", 1, (int) DDS_OWNERSHIP_EXCLUSIVE, .arg = read_kvarg_int, .def = def_kvarg_int0 } + }; + int v, x; + if (!read_kvarg (ks, sizeof ks, l, &v, &x)) + return false; + dds_qset_ownership (q, (dds_ownership_kind_t) v); + dds_qset_ownership_strength (q, x); + return true; +} + +static bool qos_transport_priority (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg k = { "", 0, 0, .arg = read_kvarg_int }; + int v, x; + if (!read_kvarg (&k, sizeof k, l, &v, &x)) + return false; + dds_qset_transport_priority (q, x); + return true; +} + +static bool qos_reliability (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "be", 2, (int) DDS_RELIABILITY_BEST_EFFORT, .def = def_kvarg_dur_100ms }, + { "r", 1, (int) DDS_RELIABILITY_RELIABLE, .def = def_kvarg_dur_100ms, .arg = read_kvarg_dur } + }; + int v; + dds_duration_t x; + if (!read_kvarg (ks, sizeof ks, l, &v, &x)) + return false; + dds_qset_reliability (q, (dds_reliability_kind_t) v, x); + return true; +} + +static bool qos_liveliness (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "a", 1, (int) DDS_LIVELINESS_AUTOMATIC, .def = def_kvarg_dur_inf, .arg = read_kvarg_dur }, + { "p", 1, (int) DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, .arg = read_kvarg_dur }, + { "w", 1, (int) DDS_LIVELINESS_MANUAL_BY_TOPIC, .arg = read_kvarg_dur } + }; + int v; + dds_duration_t x; + if (!read_kvarg (ks, sizeof ks, l, &v, &x)) + return false; + dds_qset_liveliness (q, (dds_liveliness_kind_t) v, x); + return true; +} + +static bool qos_simple_duration (struct oneliner_lex *l, dds_qos_t *q, void (*set) (dds_qos_t * __restrict q, dds_duration_t dur)) +{ + static const struct kvarg k = { "", 0, 0, .arg = read_kvarg_dur }; + int v; + dds_duration_t x; + if (!read_kvarg (&k, sizeof k, l, &v, &x)) + return false; + set (q, x); + return true; +} + +static bool qos_latency_budget (struct oneliner_lex *l, dds_qos_t *q) +{ + return qos_simple_duration (l, q, dds_qset_latency_budget); +} + +static bool qos_deadline (struct oneliner_lex *l, dds_qos_t *q) +{ + return qos_simple_duration (l, q, dds_qset_deadline); +} + +static bool qos_lifespan (struct oneliner_lex *l, dds_qos_t *q) +{ + return qos_simple_duration (l, q, dds_qset_lifespan); +} + +static bool qos_resource_limits (struct oneliner_lex *l, dds_qos_t *q) +{ + int rl[3]; + if (!read_kvarg_3len (l, rl)) + return false; + dds_qset_resource_limits (q, rl[0], rl[1], rl[2]); + return true; +} + +static bool qos_durability_service (struct oneliner_lex *l, dds_qos_t *q) +{ + struct oneliner_lex l1 = *l; + dds_duration_t scd; + int hk = DDS_HISTORY_KEEP_LAST, hd = 1, rl[3]; + if (!read_kvarg_dur (&l1, &scd)) + return false; + if (peektok (&l1, NULL) == '/') + { + (void) nexttok (&l1, NULL); + if (!read_kvarg (ks_history, sizeof ks_history, &l1, &hk, &hd)) + return false; + } + if (peektok (&l1, NULL) != '/') + rl[0] = rl[1] = rl[2] = DDS_LENGTH_UNLIMITED; + else + { + (void) nexttok (&l1, NULL); + if (!read_kvarg_3len (&l1, rl)) + return false; + } + dds_qset_durability_service (q, scd, (dds_history_kind_t) hk, hd, rl[0], rl[1], rl[2]); + *l = l1; + return true; +} + +static bool qos_presentation (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "i", 1, (int) DDS_PRESENTATION_INSTANCE, .def = def_kvarg_int0 }, + { "t", 1, (int) DDS_PRESENTATION_TOPIC, .def = def_kvarg_int1 }, + { "g", 1, (int) DDS_PRESENTATION_GROUP, .def = def_kvarg_int1 } + }; + int v, x; + if (!read_kvarg (ks, sizeof ks, l, &v, &x)) + return false; + dds_qset_presentation (q, (dds_presentation_access_scope_kind_t) v, x, 0); + return true; +} + +static bool qos_autodispose_unregistered_instances (struct oneliner_lex *l, dds_qos_t *q) +{ + static const struct kvarg ks[] = { + { "y", 1, 1 }, + { "n", 1, 0 } + }; + int v; + if (!read_kvarg (ks, sizeof ks, l, &v, NULL)) + return false; + dds_qset_writer_data_lifecycle (q, !!v); + return true; +} + +static const struct { + char *abbrev; + size_t n; + bool (*fn) (struct oneliner_lex *l, dds_qos_t *q); + dds_qos_policy_id_t id; +} qostab[] = { + { "ll", 2, qos_liveliness, DDS_LIVELINESS_QOS_POLICY_ID }, + { "d", 1, qos_durability, DDS_DURABILITY_QOS_POLICY_ID }, + { "dl", 2, qos_deadline, DDS_DEADLINE_QOS_POLICY_ID }, + { "h", 1, qos_history, DDS_HISTORY_QOS_POLICY_ID }, + { "lb", 2, qos_latency_budget, DDS_LATENCYBUDGET_QOS_POLICY_ID }, + { "ls", 2, qos_lifespan, DDS_LIFESPAN_QOS_POLICY_ID }, + { "do", 2, qos_destination_order, DDS_DESTINATIONORDER_QOS_POLICY_ID }, + { "o", 1, qos_ownership, DDS_OWNERSHIP_QOS_POLICY_ID }, + { "tp", 2, qos_transport_priority, DDS_OWNERSHIPSTRENGTH_QOS_POLICY_ID }, + { "p", 1, qos_presentation, DDS_PRESENTATION_QOS_POLICY_ID }, + { "r", 1, qos_reliability, DDS_RELIABILITY_QOS_POLICY_ID }, + { "rl", 2, qos_resource_limits, DDS_RESOURCELIMITS_QOS_POLICY_ID }, + { "ds", 2, qos_durability_service, DDS_DURABILITYSERVICE_QOS_POLICY_ID }, + { "ad", 2, qos_autodispose_unregistered_instances, DDS_WRITERDATALIFECYCLE_QOS_POLICY_ID } +}; + +static bool setqos (struct oneliner_lex *l, dds_qos_t *q) +{ + struct oneliner_lex l1 = *l; + dds_reset_qos (q); + // no whitespace between name & QoS + if (*l1.inp != '(') + return true; + nexttok (&l1, NULL); // eat '(' + do { + size_t i; + union oneliner_tokval name; + if (nexttok (&l1, &name) != TOK_NAME || nexttok (&l1, NULL) != '=') + return false; + for (i = 0; i < sizeof (qostab) / sizeof (qostab[0]); i++) + { + assert (strlen (qostab[i].abbrev) == qostab[i].n); + if (strcmp (name.n, qostab[i].abbrev) == 0) + break; + } + if (i == sizeof (qostab) / sizeof (qostab[0])) + return false; + if (!qostab[i].fn (&l1, q)) + return false; + } while (nexttok_if (&l1, ',')); + if (nexttok (&l1, NULL) != ')') + return false; + *l = l1; + return true; +} + +static int parse_entity1 (struct oneliner_lex *l, dds_qos_t *qos) +{ + struct oneliner_lex l1 = *l; + if (nexttok (&l1, NULL) != TOK_NAME) + return -1; + const char *p; + if ((p = strchr (knownentities, l1.v.n[0])) == NULL) + return -1; + int ent = (int) (p - knownentities); + int i; + for (i = 1; l1.v.n[i] == '\''; i++) + ent += (int) sizeof (knownentities) - 1; + if (l1.v.n[i] != 0) + return -1; + if (ent / 9 >= (int) MAXDOMS) + return -1; + if (!setqos (&l1, qos)) + return -1; + *l = l1; + return ent; +} + +static int parse_entity (struct oneliner_ctx *ctx) +{ + return parse_entity1 (&ctx->l, ctx->rwqos); +} + +static int parse_listener1 (struct oneliner_lex *l) +{ + struct oneliner_lex l1 = *l; + size_t i; + if (nexttok (&l1, NULL) != TOK_NAME) + return -1; + for (i = 0; i < sizeof (lldesc) / sizeof (lldesc[0]); i++) + if (strcmp (l1.v.n, lldesc[i].name) == 0) + break; + if (i == sizeof (lldesc) / sizeof (lldesc[0])) + return -1; + *l = l1; + return (int) i; +} + +static int parse_listener (struct oneliner_ctx *ctx) +{ + return parse_listener1 (&ctx->l); +} + +static const char *getentname (entname_t *name, int ent) +{ + DDSRT_STATIC_ASSERT (sizeof (knownentities) == 10); + DDSRT_STATIC_ASSERT (MAXDOMS == 3); + name->n[0] = knownentities[ent % 9]; + const int dom = ent / 9; + int i; + for (i = 1; i <= dom; i++) + name->n[i] = '\''; + name->n[i] = 0; + return name->n; +} + +static void make_participant (struct oneliner_ctx *ctx, int ent, dds_listener_t *list) +{ + const dds_domainid_t domid = (dds_domainid_t) (ent / 9); + char *conf = ddsrt_expand_envvars ("${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0", domid); + entname_t name; + printf ("create domain %"PRIu32, domid); + fflush (stdout); + if ((ctx->doms[domid] = dds_create_domain (domid, conf)) <= 0) + error_dds (ctx, ctx->doms[domid], "make_participant: create domain %"PRIu32" failed", domid); + ddsrt_free (conf); + printf (" create participant %s", getentname (&name, ent)); + fflush (stdout); + if ((ctx->es[ent] = dds_create_participant (domid, NULL, list)) <= 0) + error_dds (ctx, ctx->es[ent], "make_participant: create participant failed in domain %"PRIu32, domid); + if ((ctx->tps[domid] = dds_create_topic (ctx->es[ent], &Space_Type1_desc, ctx->topicname, ctx->qos, NULL)) <= 0) + error_dds (ctx, ctx->tps[domid], "make_participant: create topic failed in domain %"PRIu32, domid); + + // Create the built-in topic readers with a dummy listener to avoid any event (data available comes to mind) + // from propagating to the normal data available listener, in case it has been set on the participant. + // + // - dummy_cb aborts when it is invoked, but all reader-related listeners that can possibly trigger are set + // separately (incompatible qos, deadline missed, sample lost and sample rejected are all impossible by + // construction) + // - regarding data_on_readers: Cyclone handles listeners installed on an ancestor by *inheriting* them, + // rather than by walking up ancestor chain. Setting data_on_readers on the reader therefore overrides the + // listener set on the subscriber. It is a nice feature! + dds_listener_t *dummylist = dds_create_listener (ctx); + dds_lset_data_available (dummylist, dummy_data_available_cb); + dds_lset_data_on_readers (dummylist, dummy_data_on_readers_cb); + dds_lset_inconsistent_topic (dummylist, (dds_on_inconsistent_topic_fn) dummy_cb); + dds_lset_liveliness_changed (dummylist, dummy_liveliness_changed_cb); + dds_lset_liveliness_lost (dummylist, (dds_on_liveliness_lost_fn) dummy_cb); + dds_lset_offered_deadline_missed (dummylist, (dds_on_offered_deadline_missed_fn) dummy_cb); + dds_lset_offered_incompatible_qos (dummylist, (dds_on_offered_incompatible_qos_fn) dummy_cb); + dds_lset_publication_matched (dummylist, (dds_on_publication_matched_fn) dummy_cb); + dds_lset_requested_deadline_missed (dummylist, (dds_on_requested_deadline_missed_fn) dummy_cb); + dds_lset_requested_incompatible_qos (dummylist, (dds_on_requested_incompatible_qos_fn) dummy_cb); + dds_lset_sample_lost (dummylist, (dds_on_sample_lost_fn) dummy_cb); + dds_lset_sample_rejected (dummylist, (dds_on_sample_rejected_fn) dummy_cb); + dds_lset_subscription_matched (dummylist, dummy_subscription_matched_cb); + if ((ctx->pubrd[domid] = dds_create_reader (ctx->es[ent], DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, dummylist)) <= 0) + error_dds (ctx, ctx->pubrd[domid], "make_participant: create DCPSPublication reader in domain %"PRIu32, domid); + if ((ctx->subrd[domid] = dds_create_reader (ctx->es[ent], DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, dummylist)) <= 0) + error_dds (ctx, ctx->subrd[domid], "make_participant: create DCPSSubscription reader in domain %"PRIu32, domid); + dds_delete_listener (dummylist); + //printf ("pubrd %"PRId32" subrd %"PRId32" sub %"PRId32"\n", es->pubrd[domid], es->subrd[domid], dds_get_parent (es->pubrd[domid])); +} + +static void make_entity1 (struct oneliner_ctx *ctx, int ent, dds_listener_t *list) +{ + entname_t wrname; + dds_return_t ret; + int domid = ent / 9; + int ent1 = ent % 9; + switch (ent1) + { + case 0: + make_participant (ctx, ent, list); + break; + case 1: + if (ctx->es[ent-1] == 0) + { + printf ("["); + make_entity1 (ctx, ent-1, NULL); + printf ("] "); + } + printf ("create subscriber %s", getentname (&wrname, ent)); + fflush (stdout); + ctx->es[ent] = dds_create_subscriber (ctx->es[ent-1], NULL, list); + break; + case 2: + if (ctx->es[ent-2] == 0) + { + printf ("["); + make_entity1 (ctx, ent-2, NULL); + printf ("] "); + } + printf ("create publisher %s", getentname (&wrname, ent)); + fflush (stdout); + ctx->es[ent] = dds_create_publisher (ctx->es[ent-2], NULL, list); + break; + case 3: case 4: case 5: + if (ctx->es[9*domid+1] == 0) + { + printf ("["); + make_entity1 (ctx, 9*domid+1, NULL); + printf ("] "); + } + printf ("create reader %s", getentname (&wrname, ent)); + fflush (stdout); + ctx->es[ent] = dds_create_reader (ctx->es[9*domid+1], ctx->tps[domid], ctx->rwqos, list); + break; + case 6: case 7: case 8: + if (ctx->es[9*domid+2] == 0) + { + printf ("["); + make_entity1 (ctx, 9*domid+2, NULL); + printf ("] "); + } + printf ("create writer %s", getentname (&wrname, ent)); + fflush (stdout); + ctx->es[ent] = dds_create_writer (ctx->es[9*domid+2], ctx->tps[domid], ctx->rwqos, list); + break; + default: + abort (); + } + printf (" = %"PRId32, ctx->es[ent]); + fflush (stdout); + if (ctx->es[ent] <= 0) + error_dds (ctx, ctx->es[ent], "create entity %d failed", ent); + if ((ret = dds_get_instance_handle (ctx->es[ent], &ctx->esi[ent])) != 0) + error_dds (ctx, ret, "get instance handle for entity %"PRId32" failed", ctx->es[ent]); + //printf (" %"PRIx64, es->esi[ent]); + //fflush (stdout); +} + +static void make_entity (struct oneliner_ctx *ctx, int ent, dds_listener_t *list) +{ + make_entity1 (ctx, ent, list); + printf ("\n"); +} + +static void setlistener (struct oneliner_ctx *ctx, struct oneliner_lex *l, int ll, int ent) +{ + printf ("set listener:"); + dds_return_t ret; + int dom = ent / 9; + dds_listener_t *list = ctx->cb[dom].list; + dds_reset_listener (list); + do { + printf (" %s", lldesc[ll].name); + switch (ll) + { + case 0: dds_lset_data_available (list, data_available_cb); break; + case 1: dds_lset_data_on_readers (list, data_on_readers_cb); break; + case 2: dds_lset_inconsistent_topic (list, inconsistent_topic_cb); break; + case 3: dds_lset_liveliness_changed (list, liveliness_changed_cb); break; + case 4: dds_lset_liveliness_lost (list, liveliness_lost_cb); break; + case 5: dds_lset_offered_deadline_missed (list, offered_deadline_missed_cb); break; + case 6: dds_lset_offered_incompatible_qos (list, offered_incompatible_qos_cb); break; + case 7: dds_lset_publication_matched (list, publication_matched_cb); break; + case 8: dds_lset_requested_deadline_missed (list, requested_deadline_missed_cb); break; + case 9: dds_lset_requested_incompatible_qos (list, requested_incompatible_qos_cb); break; + case 10: dds_lset_sample_lost (list, sample_lost_cb); break; + case 11: dds_lset_sample_rejected (list, sample_rejected_cb); break; + case 12: dds_lset_subscription_matched (list, subscription_matched_cb); break; + default: abort (); + } + } while (l && (ll = parse_listener1 (l)) >= 0); + if (ctx->es[ent] == 0) + { + printf (" for "); + make_entity (ctx, ent, list); + } + else + { + dds_listener_t *tmplist = dds_create_listener (&ctx->cb[dom]); + if ((ret = dds_get_listener (ctx->es[ent], tmplist)) != 0) + { + dds_delete_listener (tmplist); + error_dds (ctx, ret, "set listener: dds_get_listener failed on %"PRId32, ctx->es[ent]); + } + dds_merge_listener (list, tmplist); + dds_delete_listener (tmplist); + printf (" on entity %"PRId32"\n", ctx->es[ent]); + if ((ret = dds_set_listener (ctx->es[ent], list)) != 0) + error_dds (ctx, ret, "set listener: dds_set_listener failed on %"PRId32, ctx->es[ent]); + } +} + +static dds_instance_handle_t lookup_insthandle (const struct oneliner_ctx *ctx, int ent, int ent1) +{ + // if both are in the same domain, it's easy + if (ent / 9 == ent1 / 9) + return ctx->esi[ent1]; + else + { + // if they aren't ... find GUID from instance handle in the one domain, + // then find instance handle for GUID in the other + dds_entity_t rd1 = 0, rd2 = 0; + switch (ent1 % 9) + { + case 3: case 4: case 5: rd1 = ctx->subrd[ent1/9]; rd2 = ctx->subrd[ent/9]; break; + case 6: case 7: case 8: rd1 = ctx->pubrd[ent1/9]; rd2 = ctx->pubrd[ent/9]; break; + default: return 0; + } + + dds_builtintopic_endpoint_t keysample; + //printf ("(in %"PRId32" %"PRIx64" -> ", rd1, es->esi[ent1]); + //fflush (stdout); + if (dds_instance_get_key (rd1, ctx->esi[ent1], &keysample) != 0) + return 0; + // In principle, only key fields are set in sample returned by get_key; + // in the case of a built-in topic that is extended to the participant + // key. The qos and topic/type names should not be set, and there is no + // (therefore) memory allocated for the sample. + assert (keysample.qos == NULL); + assert (keysample.topic_name == NULL); + assert (keysample.type_name == NULL); + //for (size_t j = 0; j < sizeof (keysample.key.v); j++) + // printf ("%s%02x", (j > 0 && j % 4 == 0) ? ":" : "", keysample.key.v[j]); + const dds_instance_handle_t ih = dds_lookup_instance (rd2, &keysample); + //printf (" -> %"PRIx64")", ih); + //fflush (stdout); + return ih; + } +} + +static void print_timestamp (struct oneliner_ctx *ctx, dds_time_t ts) +{ + dds_time_t dt = ts - ctx->l.tref; + if ((dt % DDS_NSECS_IN_SEC) == 0) + printf ("@%"PRId64, dt / DDS_NSECS_IN_SEC); + else + { + unsigned frac = (unsigned) (dt % DDS_NSECS_IN_SEC); + int digs = 9; + while ((frac % 10) == 0) + { + digs--; + frac /= 10; + } + printf ("@%"PRId64".%0*u", dt / DDS_NSECS_IN_SEC, digs, frac); + } +} + +static bool parse_sample_value (struct oneliner_ctx *ctx, Space_Type1 *s, bool *valid_data, int def) +{ + s->long_1 = s->long_2 = s->long_3 = def; + if (nexttok (&ctx->l, NULL) == TOK_INT) // key value (invalid sample) + { + if (ctx->l.v.i < 0) + return false; + s->long_1 = ctx->l.v.i; + *valid_data = false; + return true; + } + else if (ctx->l.tok == '(') + { + if (nexttok (&ctx->l, NULL) != TOK_INT || ctx->l.v.i < 0) + return false; + s->long_1 = ctx->l.v.i; + if (nexttok (&ctx->l, NULL) != ',' || nexttok (&ctx->l, NULL) != TOK_INT || ctx->l.v.i < 0) + return false; + s->long_2 = ctx->l.v.i; + if (nexttok (&ctx->l, NULL) != ',' || nexttok (&ctx->l, NULL) != TOK_INT || ctx->l.v.i < 0) + return false; + s->long_3 = ctx->l.v.i; + *valid_data = true; + return nexttok (&ctx->l, NULL) == ')'; + } + else + { + return false; + } +} + +struct doreadlike_sample { + uint32_t state; + bool valid_data; + dds_time_t ts; + int wrent; + dds_instance_handle_t wrih; + Space_Type1 data; +}; + +static bool wrname_from_pubhandle (const struct oneliner_ctx *ctx, int ent, dds_instance_handle_t pubhandle, entname_t *wrname) +{ + dds_builtintopic_endpoint_t inf, inf1; + if (dds_instance_get_key (ctx->pubrd[ent/9], pubhandle, &inf) != 0) + return false; + for (int j = 0; j < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); j++) + { + for (int k = 6; k < 9; k++) + { + if (ctx->esi[9*j+k] != 0) + { + if (dds_instance_get_key (ctx->pubrd[j], ctx->esi[9*j+k], &inf1) != 0) + return false; + if (memcmp (&inf.key, &inf1.key, sizeof (inf.key)) == 0) + { + getentname (wrname, 9*j+k); + return true; + } + } + } + } + return false; +} + +static bool doreadlike_parse_sample (struct oneliner_ctx *ctx, struct doreadlike_sample *s) +{ + static const char *statechars = "fsaudno"; + static const uint32_t statemap[] = { + DDS_NOT_READ_SAMPLE_STATE, DDS_READ_SAMPLE_STATE, + DDS_ALIVE_INSTANCE_STATE, DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE, + DDS_NEW_VIEW_STATE, DDS_NOT_NEW_VIEW_STATE + }; + // syntax: [state]k[pubhandle][@ts] or [state](k,l,m)[pubhandle][@ts] + // the first is an invalid sample, the second a valid one, the third says anything goes + // state is a combination of: sample state (F,S fresh/stale), instance state (A,U,D), view state (N,O) + // unspecified: don't care + s->state = 0; + s->ts = -1; + s->wrent = -1; + s->wrih = 0; + struct oneliner_lex l1 = ctx->l; + if (nexttok_if (&ctx->l, TOK_NAME)) + { + char *inp1 = ctx->l.v.n; + char *p; + while (*inp1 && (p = strchr (statechars, *inp1)) != NULL) + { + s->state |= statemap[(int) (p - statechars)]; + inp1++; + } + if (*inp1 == 0) + ; + else if (!isdigit (*inp1)) + return false; + else // rewind input to digit + ctx->l.inp = l1.inp + (inp1 - ctx->l.v.n); + } + // missing states: allow everything + if ((s->state & (statemap[0] | statemap[1])) == 0) + s->state |= statemap[0] | statemap[1]; + if ((s->state & (statemap[2] | statemap[3] | statemap[4])) == 0) + s->state |= statemap[2] | statemap[3] | statemap[4]; + if ((s->state & (statemap[5] | statemap[6])) == 0) + s->state |= statemap[5] | statemap[6]; + if (!parse_sample_value (ctx, &s->data, &s->valid_data, -1)) + return false; + s->wrent = parse_entity1 (&ctx->l, NULL); + if (nexttok_if (&ctx->l, TOK_TIMESTAMP)) + s->ts = ctx->l.v.d; + return true; +} + +static bool doreadlike_ismatch (const dds_sample_info_t *si, const Space_Type1 *s, const struct doreadlike_sample *exp) +{ + return (si->valid_data == exp->valid_data && + (si->sample_state & exp->state) != 0 && + (si->instance_state & exp->state) != 0 && + (si->view_state & exp->state) != 0 && + (exp->data.long_1 < 0 || s->long_1 == exp->data.long_1) && + (!exp->valid_data || exp->data.long_2 < 0 || s->long_2 == exp->data.long_2) && + (!exp->valid_data || exp->data.long_3 < 0 || s->long_3 == exp->data.long_3) && + (exp->ts < 0 || si->source_timestamp == exp->ts) && + (exp->wrent < 0 || si->publication_handle == exp->wrih)); +} + +static bool doreadlike_matchstep (const dds_sample_info_t *si, const Space_Type1 *s, const struct doreadlike_sample *exp, int nexp, bool ellipsis, unsigned *tomatch, int *cursor, dds_instance_handle_t *lastih, int *matchidx) +{ + if (si->instance_handle != *lastih) + { + *lastih = si->instance_handle; + *cursor = -1; + for (int m = 0; m < nexp; m++) + { + if ((*tomatch & (1u << m)) && s->long_1 == exp[m].data.long_1) + { + *cursor = m; + break; + } + } + } + if (*cursor < 0 || *cursor >= nexp) + { + *matchidx = ellipsis ? nexp : -1; + return ellipsis; + } + else if (doreadlike_ismatch (si, s, &exp[*cursor])) + { + *matchidx = *cursor; + *tomatch &= ~(1u << *cursor); + (*cursor)++; + return true; + } + else if (ellipsis) + { + *matchidx = nexp; + return true; + } + else + { + *matchidx = -1; + return false; + } +} + +static void doreadlike (struct oneliner_ctx *ctx, const char *name, dds_return_t (*fn) (dds_entity_t, void **buf, dds_sample_info_t *, size_t, uint32_t)) +{ +#define MAXN 10 + struct doreadlike_sample exp[MAXN]; + int nexp = 0; + bool ellipsis = false; + int exp_nvalid = -1, exp_ninvalid = -1; + int ent; + switch (peektok (&ctx->l, NULL)) + { + default: // no expectations + ellipsis = true; + break; + case '(': // (# valid, # invalid) + nexttok (&ctx->l, NULL); + if (!(nexttok_int (&ctx->l, &exp_nvalid) && nexttok_if (&ctx->l, ',') && nexttok_int (&ctx->l, &exp_ninvalid) && nexttok_if (&ctx->l, ')'))) + error (ctx, "%s: expecting (NINVALID, NVALID)", name); + ellipsis = true; + break; + case '{': + nexttok (&ctx->l, NULL); + if (!nexttok_if (&ctx->l, '}')) + { + do { + if (nexttok_if (&ctx->l, TOK_ELLIPSIS)) { + ellipsis = true; break; + } else if (nexp == MAXN) { + error (ctx, "%s: too many samples specified", name); + } else if (!doreadlike_parse_sample (ctx, &exp[nexp++])) { + error (ctx, "%s: expecting sample", name); + } + } while (nexttok_if (&ctx->l, ',')); + if (!nexttok_if (&ctx->l, '}')) + error (ctx, "%s: expecting '}'", name); + } + break; + } + if ((ent = parse_entity1 (&ctx->l, NULL)) < 0) + error (ctx, "%s: entity required", name); + + for (int i = 0; i < nexp; i++) + { + if (exp[i].wrent >= 0 && (exp[i].wrih = lookup_insthandle (ctx, ent, exp[i].wrent)) == 0) + error (ctx, "%s: instance lookup failed", name); + } + + printf ("entity %"PRId32": %s: ", ctx->es[ent], (fn == dds_take) ? "take" : "read"); + fflush (stdout); + Space_Type1 data[MAXN]; + void *raw[MAXN]; + for (int i = 0; i < MAXN; i++) + raw[i] = &data[i]; + int matchidx[MAXN]; + dds_sample_info_t si[MAXN]; + DDSRT_STATIC_ASSERT (MAXN < CHAR_BIT * sizeof (unsigned)); + const uint32_t maxs = (uint32_t) (sizeof (raw) / sizeof (raw[0])); + const int32_t n = fn (ctx->es[ent], raw, si, maxs, maxs); + if (n < 0) + error_dds (ctx, n, "%s: failed on %"PRId32, name, ctx->es[ent]); + unsigned tomatch = (1u << nexp) - 1; // used to track result entries matched by spec + dds_instance_handle_t lastih = 0; + int cursor = -1; + int count[2] = { 0, 0 }; + bool matchok = true; + printf ("{"); + for (int i = 0; i < n; i++) + { + const Space_Type1 *s = raw[i]; + entname_t wrname; + count[si[i].valid_data]++; + printf ("%s%c%c%c", + (i > 0) ? "," : "", + (si[i].sample_state == DDS_NOT_READ_SAMPLE_STATE) ? 'f' : 's', + (si[i].instance_state == DDS_ALIVE_INSTANCE_STATE) ? 'a' : (si[i].instance_state == DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) ? 'u' : 'd', + (si[i].view_state == DDS_NEW_VIEW_STATE) ? 'n' : 'o'); + if (si[i].valid_data) + printf ("(%"PRId32",%"PRId32",%"PRId32")", s->long_1, s->long_2, s->long_3); + else + printf ("%"PRId32, s->long_1); + if (!wrname_from_pubhandle (ctx, ent, si[i].publication_handle, &wrname)) + error (ctx, "%s: unknown publication handle received", name); + printf ("%s", wrname.n); + print_timestamp (ctx, si[i].source_timestamp); + if (!doreadlike_matchstep (&si[i], s, exp, nexp, ellipsis, &tomatch, &cursor, &lastih, &matchidx[i])) + matchok = false; + } + printf ("}:"); + for (int i = 0; i < n; i++) + printf (" %d", matchidx[i]); + if (tomatch != 0) + { + printf (" (samples missing)"); + matchok = false; + } + printf (" valid %d %d invalid %d %d", count[1], exp_nvalid, count[0], exp_ninvalid); + if (exp_nvalid >= 0 && (count[1] != exp_nvalid)) + matchok = false; + if (exp_ninvalid >= 0 && (count[0] != exp_ninvalid)) + matchok = false; + printf ("\n"); + fflush (stdout); + if (!matchok) + testfail (ctx, "%s: mismatch between actual and expected set\n", name); +#undef MAXN +} + +static void dotake (struct oneliner_ctx *ctx) { doreadlike (ctx, "take", dds_take); } +static void doread (struct oneliner_ctx *ctx) { doreadlike (ctx, "read", dds_read); } + +static void dowritelike (struct oneliner_ctx *ctx, const char *name, bool fail, dds_return_t (*fn) (dds_entity_t wr, const void *sample, dds_time_t ts)) +{ + dds_return_t ret; + dds_time_t ts = dds_time (); + bool valid_data; + int ent; + Space_Type1 sample; + if ((ent = parse_entity (ctx)) < 0) + error (ctx, "%s: expecting entity", name); + if (ctx->es[ent] == 0) + make_entity (ctx, ent, NULL); + if (!parse_sample_value (ctx, &sample, &valid_data, 0)) + error (ctx, "%s: expecting sample value", name); + if (nexttok_if (&ctx->l, TOK_TIMESTAMP)) + ts = ctx->l.v.d; + printf ("entity %"PRId32": %s (%"PRId32",%"PRId32",%"PRId32")", ctx->es[ent], name, sample.long_1, sample.long_2, sample.long_3); + print_timestamp (ctx, ts); + printf ("\n"); + ret = fn (ctx->es[ent], &sample, ts); + if (!fail) + { + if (ret != 0) + error_dds (ctx, ret, "%s: failed", name); + } + else + { + if (ret == 0) + testfail (ctx, "%s: succeeded unexpectedly", name); + else if (ret != DDS_RETCODE_TIMEOUT) + error_dds (ctx, ret, "%s: failed", name); + } +} + +static void dowr (struct oneliner_ctx *ctx) { dowritelike (ctx, "wr", false, dds_write_ts); } +static void dowrfail (struct oneliner_ctx *ctx) { dowritelike (ctx, "wrfail", true, dds_write_ts); } +static void dowrdisp (struct oneliner_ctx *ctx) { dowritelike (ctx, "wrdisp", false, dds_writedispose_ts); } +static void dowrdispfail (struct oneliner_ctx *ctx) { dowritelike (ctx, "wrdispfail", true, dds_writedispose_ts); } +static void dodisp (struct oneliner_ctx *ctx) { dowritelike (ctx, "disp", false, dds_dispose_ts); } +static void dodispfail (struct oneliner_ctx *ctx) { dowritelike (ctx, "dispfail", true, dds_dispose_ts); } +static void dounreg (struct oneliner_ctx *ctx) { dowritelike (ctx, "unreg", false, dds_unregister_instance_ts); } +static void dounregfail (struct oneliner_ctx *ctx) { dowritelike (ctx, "unregfail", true, dds_unregister_instance_ts); } + +static int checkstatus (struct oneliner_ctx *ctx, int ll, int ent, struct oneliner_lex *argl, const void *status) +{ + assert (lldesc[ll].desc != NULL); + const char *d = lldesc[ll].desc; + int field = 0; + const char *sep = "("; + size_t off = 0; + if (nexttok (argl, NULL) != '(') + abort (); + while (*d) + { + const void *p = advance (status, &off, *d); + int i; + switch (*d) + { + case 'n': + if (!nexttok_int (argl, &i) || i < 0) + return setresult (ctx, -1, "checkstatus: field %d expecting non-negative integer", field); + printf ("%s%"PRIu32" %d", sep, *(uint32_t *)p, i); fflush (stdout); + if (*(uint32_t *)p != (uint32_t)i) + return setresult (ctx, 0, "checkstatus: field %d has actual %"PRIu32" expected %d", field, *(uint32_t *)p, i); + break; + case 'c': + if (!nexttok_int (argl, &i)) + return setresult (ctx, -1, "checkstatus: field %d expecting integer", field); + printf ("%s%"PRId32" %d", sep, *(int32_t *)p, i); fflush (stdout); + if (*(int32_t *)p != i) + return setresult (ctx, 0, "checkstatus: field %d has actual %"PRId32" expected %d", field, *(int32_t *)p, i); + break; + case 'P': + if (nexttok (argl, NULL) != TOK_NAME) + return setresult (ctx, -1, "checkstatus: field %d expecting policy name", field); + size_t polidx; + for (polidx = 0; polidx < sizeof (qostab) / sizeof (qostab[0]); polidx++) + if (strcmp (argl->v.n, qostab[polidx].abbrev) == 0) + break; + if (polidx == sizeof (qostab) / sizeof (qostab[0])) + return setresult (ctx, -1, "checkstatus: field %d expecting policy name", field); + printf ("%s%"PRIu32" %"PRIu32, sep, *(uint32_t *)p, (uint32_t) qostab[polidx].id); fflush (stdout); + if (*(uint32_t *)p != (uint32_t) qostab[polidx].id) + return setresult (ctx, 0, "checkstatus: field %d has actual %"PRIu32" expected %d", field, *(uint32_t *)p, (int) qostab[polidx].id); + break; + case 'R': + if (nexttok (argl, NULL) != TOK_NAME) + return setresult (ctx, -1, "checkstatus: field %d expecting reason", field); + if (strcmp (argl->v.n, "i") == 0) i = (int) DDS_REJECTED_BY_INSTANCES_LIMIT; + else if (strcmp (argl->v.n, "s") == 0) i = (int) DDS_REJECTED_BY_SAMPLES_LIMIT; + else if (strcmp (argl->v.n, "spi") == 0) i = (int) DDS_REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT; + else return setresult (ctx, -1, "checkstatus: field %d expecting reason", field); + printf ("%s%d %d", sep, (int) *(dds_sample_rejected_status_kind *)p, i); fflush (stdout); + if (*(dds_sample_rejected_status_kind *)p != (dds_sample_rejected_status_kind) i) + return setresult (ctx, 0, "checkstatus: field %d has actual %d expected %d", field, (int) (*(dds_sample_rejected_status_kind *)p), i); + break; + case 'I': // instance handle is too complicated + break; + case 'E': { + int ent1 = -1; + dds_instance_handle_t esi1 = 0; + if (nexttok_if (argl, '*')) + ent1 = -1; + else if ((ent1 = parse_entity1 (argl, NULL)) < 0) + return setresult (ctx, -1, "checkstatus: field %d expecting * or entity name", field); + else if ((esi1 = lookup_insthandle (ctx, ent, ent1)) == 0) + return setresult (ctx, -1, "checkstatus: field %d instance handle lookup failed", field); + printf ("%s%"PRIx64" %"PRIx64, sep, *(dds_instance_handle_t *)p, esi1); fflush (stdout); + if (ent1 >= 0 && *(dds_instance_handle_t *)p != esi1) + return setresult (ctx, 0, "checkstatus: field %d has actual %"PRIx64" expected %"PRIx64, field, *(dds_instance_handle_t *)p, esi1); + break; + } + default: + return DDS_RETCODE_BAD_PARAMETER; + } + sep = ", "; + if (*d != 'I') + field++; + ++d; + if (*d && *d != 'I' && !nexttok_if (argl, ',')) + return setresult (ctx, -1, "checkstatus: field %d expecting ','", field); + } + printf (")"); + if (!nexttok_if (argl, ')')) + return setresult (ctx, -1, "checkstatus: field %d expecting ')'", field); + assert (off <= lldesc[ll].size); + return 1; +} + +static void checklistener (struct oneliner_ctx *ctx, int ll, int ent, struct oneliner_lex *argl) +{ + bool signalled = true; + uint32_t min_cnt = 1, max_cnt = UINT32_MAX; + uint32_t status; + const int dom = ent / 9; + dds_return_t ret; + printf ("listener %s: check called for entity %"PRId32, lldesc[ll].name, ctx->es[ent]); + fflush (stdout); + if (argl && lldesc[ll].cb_status_off == 0) + { + // those that don't have a status can check the number of invocations + int cnt = -1; + if (!(nexttok_if (argl, '(') && nexttok_int (argl, &cnt) && nexttok_if (argl, ')'))) + error (ctx, "listener %s: expecting (COUNT)", lldesc[ll].name); + if (cnt < 0) + error (ctx, "listener %s: invocation count must be at least 0", lldesc[ll].name); + min_cnt = max_cnt = (uint32_t) cnt; + } + ddsrt_mutex_lock (&ctx->g_mutex); + bool cnt_ok = (ctx->cb[dom].cb_called[lldesc[ll].id] >= min_cnt && ctx->cb[dom].cb_called[lldesc[ll].id] <= max_cnt); + while (ctx->cb[dom].cb_called[lldesc[ll].id] < min_cnt && signalled) + { + signalled = ddsrt_cond_waitfor (&ctx->g_cond, &ctx->g_mutex, DDS_SECS (5)); + cnt_ok = (ctx->cb[dom].cb_called[lldesc[ll].id] >= min_cnt && ctx->cb[dom].cb_called[lldesc[ll].id] <= max_cnt); + } + printf (" cb_called %"PRIu32" (%s)", ctx->cb[dom].cb_called[lldesc[ll].id], cnt_ok ? "ok" : "fail"); + fflush (stdout); + if (!cnt_ok) + { + ddsrt_mutex_unlock (&ctx->g_mutex); + testfail (ctx, "listener %s: not invoked [%"PRIu32",%"PRIu32"] times", lldesc[ll].name, min_cnt, max_cnt); + } + dds_entity_t * const cb_entity = (dds_entity_t *) ((char *) &ctx->cb[dom] + lldesc[ll].cb_entity_off); + printf (" cb_entity %"PRId32" %"PRId32" (%s)", *cb_entity, ctx->es[ent], (*cb_entity == ctx->es[ent]) ? "ok" : "fail"); + fflush (stdout); + if (*cb_entity != ctx->es[ent]) + { + ddsrt_mutex_unlock (&ctx->g_mutex); + testfail (ctx, "listener %s: invoked on %"PRId32" instead of %"PRId32, lldesc[ll].name, *cb_entity, ctx->es[ent]); + } + if (!(ctx->doms[0] && ctx->doms[1])) + { + // FIXME: two domains: listener invocation happens on another thread and we can observe non-0 "change" fields + // they get updated, listener gets invoked, then they get reset -- pretty sure it is allowed by the spec, but + // not quite elegant + if ((ret = check_status_change_fields_are_0 (ll, ctx->es[ent])) <= 0) + { + ddsrt_mutex_unlock (&ctx->g_mutex); + if (ret == 0) + testfail (ctx, "listener %s: status contains non-zero change fields", lldesc[ll].name); + else if (ret < 0) + error_dds (ctx, ret, "listener %s: get entity status failed", lldesc[ll].name); + } + } + if (argl && lldesc[ll].cb_status_off != 0) + { + void *cb_status = (char *) &ctx->cb[dom] + lldesc[ll].cb_status_off; + if (checkstatus (ctx, ll, ent, argl, cb_status) <= 0) + { + ddsrt_mutex_unlock (&ctx->g_mutex); + longjmp (ctx->jb, 1); + } + } + printf ("\n"); + ctx->cb[dom].cb_called[lldesc[ll].id] = 0; + ddsrt_mutex_unlock (&ctx->g_mutex); + if ((ret = dds_get_status_changes (ctx->es[ent], &status)) != 0) + error_dds (ctx, ret, "listener %s: dds_get_status_change on %"PRId32, lldesc[ll].name, ctx->es[ent]); + if ((status & (1u << lldesc[ll].id)) != 0) + testfail (ctx, "listener %s: status mask not cleared", lldesc[ll].name); +} + +static void dochecklistener (struct oneliner_ctx *ctx) +{ + const bool expectclear = nexttok_if (&ctx->l, '!'); + const int ll = parse_listener (ctx); + if (ll < 0) + error (ctx, "check listener: requires listener name"); + else if (expectclear) + { + printf ("listener %s: check not called", lldesc[ll].name); + fflush (stdout); + ddsrt_mutex_lock (&ctx->g_mutex); + bool ret = true; + for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) + { + printf (" cb_called %"PRIu32" %s\n", ctx->cb[i].cb_called[lldesc[ll].id], ctx->cb[i].cb_called[lldesc[ll].id] == 0 ? "ok" : "fail"); + if (ctx->cb[i].cb_called[lldesc[ll].id] != 0) + ret = false; + } + ddsrt_mutex_unlock (&ctx->g_mutex); + if (!ret) + testfail (ctx, "callback %s invoked unexpectedly", lldesc[ll].name); + } + else + { + struct oneliner_lex l1 = ctx->l; + // no whitespace between name and args + const bool have_args = (*ctx->l.inp == '('); + if (have_args) + { + // skip args: we need the entity before we can interpret them + int tok; + while ((tok = nexttok (&ctx->l, NULL)) != EOF && tok != ')') + ; + } + const int ent = parse_entity (ctx); + if (ent < 0) + error (ctx, "check listener: requires an entity"); + if (ctx->es[ent] == 0) + setlistener (ctx, NULL, ll, ent); + checklistener (ctx, ll, ent, have_args ? &l1 : NULL); + } +} + +static void dodelete (struct oneliner_ctx *ctx) +{ + dds_return_t ret; + int ent; + if ((ent = parse_entity (ctx)) < 0) + error (ctx, "delete: requires entity"); + if ((ret = dds_delete (ctx->es[ent])) != 0) + error_dds (ctx, ret, "delete: failed on %"PRId32, ctx->es[ent]); + ctx->es[ent] = 0; +} + +static void dodeaf (struct oneliner_ctx *ctx) +{ + dds_return_t ret; + entname_t name; + int ent; + if ((ent = parse_entity (ctx)) < 0 || (ent % 9) != 0) + error (ctx, "deaf: requires participant"); + printf ("deaf: %s\n", getentname (&name, ent)); + if ((ret = dds_domain_set_deafmute (ctx->es[ent], true, false, DDS_INFINITY)) != 0) + error_dds (ctx, ret, "deaf: dds_domain_set_deafmute failed on %"PRId32, ctx->es[ent]); + // speed up the process by forcing lease expiry + dds_entity *x, *xprime; + if ((ret = dds_entity_pin (ctx->es[ent], &x)) < 0) + error_dds (ctx, ret, "deaf: pin participant failed %"PRId32, ctx->es[ent]); + for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) + { + if (i == ent / 9 || ctx->es[9*i] == 0) + continue; + if ((ret = dds_entity_pin (ctx->es[9*i], &xprime)) < 0) + { + dds_entity_unpin (x); + error_dds (ctx, ret, "deaf: pin counterpart participant failed %"PRId32, ctx->es[9*i]); + } + thread_state_awake (lookup_thread_state (), &x->m_domain->gv); + delete_proxy_participant_by_guid (&x->m_domain->gv, &xprime->m_guid, ddsrt_time_wallclock (), true); + thread_state_asleep (lookup_thread_state ()); + dds_entity_unpin (xprime); + } + dds_entity_unpin (x); +} + +static void dohearing (struct oneliner_ctx *ctx) +{ + dds_return_t ret; + entname_t name; + int ent; + if ((ent = parse_entity (ctx)) < 0 || (ent % 9) != 0) + error (ctx, "hearing: requires participant"); + printf ("hearing: %s\n", getentname (&name, ent)); + if ((ret = dds_domain_set_deafmute (ctx->es[ent], false, false, DDS_INFINITY)) != 0) + error_dds (ctx, ret, "hearing: dds_domain_set_deafmute failed %"PRId32, ctx->es[ent]); + // speed up the process by forcing SPDP publication on the remote + for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) + { + if (i == ent / 9 || ctx->es[9*i] == 0) + continue; + dds_entity *xprime; + struct participant *pp; + if ((ret = dds_entity_pin (ctx->es[9*i], &xprime)) < 0) + error_dds (ctx, ret, "hearing: pin counterpart participant failed %"PRId32, ctx->es[9*i]); + thread_state_awake (lookup_thread_state (), &xprime->m_domain->gv); + if ((pp = entidx_lookup_participant_guid (xprime->m_domain->gv.entity_index, &xprime->m_guid)) != NULL) + resched_xevent_if_earlier (pp->spdp_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic (), DDS_MSECS (100))); + thread_state_asleep (lookup_thread_state ()); + dds_entity_unpin (xprime); + } +} + +static void dosleep (struct oneliner_ctx *ctx) +{ + if (nexttok_dur (&ctx->l, NULL, true) != TOK_DURATION) + error (ctx, "sleep: invalid duration"); + dds_sleepfor (ctx->l.v.d); +} + +static void dispatchcmd (struct oneliner_ctx *ctx) +{ + static const struct { + const char *name; + void (*fn) (struct oneliner_ctx *ct); + } cs[] = { + { "-", dodelete }, + { "?", dochecklistener }, + { "wr", dowr }, + { "wrdisp", dowrdisp }, + { "disp", dodisp }, + { "unreg", dounreg }, + { "wrfail", dowrfail }, + { "wrdispfail", dowrdispfail }, + { "dispfail", dodispfail }, + { "unregfail", dounregfail }, + { "take", dotake }, + { "read", doread }, + { "deaf", dodeaf }, + { "hearing", dohearing }, + { "sleep", dosleep } + }; + size_t i; + if (ctx->l.tok > 0) + { + // convert single-character token to string + ctx->l.v.n[0] = (char) ctx->l.tok; + ctx->l.v.n[1] = 0; + } + for (i = 0; i < sizeof (cs) / sizeof (cs[0]); i++) + if (strcmp (ctx->l.v.n, cs[i].name) == 0) + break; + if (i == sizeof (cs) / sizeof (cs[0])) + error (ctx, "%s: unknown command", ctx->l.v.n); + cs[i].fn (ctx); +} + +static void dosetlistener (struct oneliner_ctx *ctx, int ll) +{ + int ent; + struct oneliner_lex l1 = ctx->l; + // scan past listener names to get at the entity, which we need + // to get the right listener object (and hence argument) + while (parse_listener1 (&ctx->l) >= 0) + ; + if ((ent = parse_entity (ctx)) < 0) + error (ctx, "set listener: entity required"); + setlistener (ctx, &l1, ll, ent); +} + +static void test_oneliner_step1 (struct oneliner_ctx *ctx) +{ + while (peektok (&ctx->l, NULL) != TOK_END) + { + int ent, ll; + if (nexttok_if (&ctx->l, ';')) + ; // skip ;s + else if ((ent = parse_entity (ctx)) >= 0) + make_entity (ctx, ent, NULL); + else if ((ll = parse_listener (ctx)) >= 0) + dosetlistener (ctx, ll); + else if (nexttok (&ctx->l, NULL) == TOK_NAME || ctx->l.tok > 0) + dispatchcmd (ctx); + else + error (ctx, "unexpected token %d", ctx->l.tok); + } +} + +void test_oneliner_init (struct oneliner_ctx *ctx) +{ + dds_qos_t *qos = dds_create_qos (); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100)); + dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + + *ctx = (struct oneliner_ctx) { + .l = { .tref = dds_time () }, + .qos = qos, + .rwqos = dds_create_qos (), + .result = 1, + .cb = { + [0] = { .ctx = ctx, .list = dds_create_listener (&ctx->cb[0]) }, + [1] = { .ctx = ctx, .list = dds_create_listener (&ctx->cb[1]) }, + [2] = { .ctx = ctx, .list = dds_create_listener (&ctx->cb[2]) } + } + }; + + ddsrt_mutex_init (&ctx->g_mutex); + ddsrt_cond_init (&ctx->g_cond); + + create_unique_topic_name ("ddsc_listener_test", ctx->topicname, sizeof (ctx->topicname)); +} + +int test_oneliner_step (struct oneliner_ctx *ctx, const char *ops) +{ + if (ctx->result > 0 && setjmp (ctx->jb) == 0) + { + ctx->l.inp = ops; + test_oneliner_step1 (ctx); + } + return ctx->result; +} + +const char *test_oneliner_message (const struct oneliner_ctx *ctx) +{ + return ctx->msg; +} + +int test_oneliner_fini (struct oneliner_ctx *ctx) +{ + for (size_t i = 0; i < sizeof (ctx->cb) / sizeof (ctx->cb[0]); i++) + dds_delete_listener (ctx->cb[i].list); + dds_delete_qos (ctx->rwqos); + dds_delete_qos ((dds_qos_t *) ctx->qos); + // prevent any listeners from being invoked so we can safely delete the + // mutex and the condition variable -- must do this going down the + // hierarchy, or listeners may remain set through inheritance + dds_return_t ret; + for (size_t i = 0; i < sizeof (ctx->es) / sizeof (ctx->es[0]); i++) + if (ctx->es[i] && (ret = dds_set_listener (ctx->es[i], NULL)) != 0) + setresult (ctx, ret, "terminate: reset listener failed on %"PRId32, ctx->es[i]); + if (ctx->result == 0) + { + printf ("\n"); + for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) + { + for (int j = 3; j <= 5; j++) + { + if (ctx->es[9*i + j]) + { + const char *inp_orig = ctx->l.inp; + entname_t n; + ctx->l.inp = getentname (&n, 9*i + j); + doreadlike (ctx, "read", dds_read); + ctx->l.inp = inp_orig; + } + } + } + } + ddsrt_mutex_destroy (&ctx->g_mutex); + ddsrt_cond_destroy (&ctx->g_cond); + for (size_t i = 0; i < sizeof (ctx->doms) / sizeof (ctx->doms[0]); i++) + if (ctx->doms[i] && (ret = dds_delete (ctx->doms[i])) != 0) + setresult (ctx, ret, "terminate: delete domain on %"PRId32, ctx->doms[i]); + return ctx->result; +} + +int test_oneliner (const char *ops) +{ + struct oneliner_ctx ctx; + printf ("dotest: %s\n", ops); + test_oneliner_init (&ctx); + test_oneliner_step (&ctx, ops); + if (test_oneliner_fini (&ctx) <= 0) + fprintf (stderr, "FAIL: %s\n", test_oneliner_message (&ctx)); + return ctx.result; +} diff --git a/src/core/ddsc/tests/test_oneliner.h b/src/core/ddsc/tests/test_oneliner.h new file mode 100644 index 0000000..1a8b75e --- /dev/null +++ b/src/core/ddsc/tests/test_oneliner.h @@ -0,0 +1,326 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef _TEST_ONELINER_H_ +#define _TEST_ONELINER_H_ + +#include +#include + +#include "dds/dds.h" +#include "dds/ddsrt/sync.h" + +/** @brief run a "test" consisting of a sequence of simplish operations + * + * This operation takes a test description, really a program in a bizarre syntax, and + * executes it. Any failures, be it because of error codes coming out of the Cyclone + * calls or expected values being wrong cause it to fail the test via CU_ASSERT_FATAL. + * While it is doing this, it outputs the test steps to stdout including some actual + * values. An invalid program is mostly reported by calling abort(). It is geared towards + * checking for listener invocations and the effects on statuses. + * + * Entities in play: + * + * - participants: P P' P'' + * - subscribers: R R' R'' + * - publishers: W W' W'' + * - readers: r s t r' s' t' r'' s'' t'' + * - writers: w x y w' x' y' w'' x'' y'' + * + * The unprimed ones exist in domain 0, the primed ones in domain 1 (but configured such + * that it talks to domain 0), and the double-primed ones in domain 2 (again configured such + * that it talks to domain 0) so that network-related listener invocations can be checked + * as well. + * + * The first mention of an entity creates it as well as its ancestors. Implicitly created + * ancestors always have standard QoS and have no listeners. There is one topic that is + * created implicitly when the participant is created. + * + * Standard QoS is: default + reliable (100ms), by-source-timestamp, keep-all. The QoS of + * a reader/writer can be overridden at the first mention of it (i.e., when it is created) + * by appending a list of QoS overrides between parentheses. + * + * A program consists of a sequence of operations separated by whitespace, ';' or '/' + * (there is no meaning to the separators, they exist to allow visual grouping): + * + * PROGRAM ::= (OP (\s+|;)*)* + * + * OP ::= (LISTENER)* ENTITY-NAME[(QOS[,QOS[,QOS...]])] + * + * If entity ENTITY-NAME does not exist: + * creates the entity with the given listeners installed + * QOS can be used to override the standard QoS + * else + * changes the entity's listeners to the specified ones + * (see above for the valid ENTITY-NAMEs) + * + * | -ENTITY-NAME + * + * Deletes the specified entity + * + * | WRITE-LIKE[fail] ENTITY-NAME K[@DT] + * | WRITE-LIKE[fail] ENTITY-NAME (K,X,Y)[@DT] + * + * Writes/disposes/unregisters (K,0,0) (first form) or (K,X,Y). If + * "fail" is appended, the expectation is that it fails with a + * timeout, if @DT is appended, the timestamp is the start time of + * the test +
s rather than the current time; DT is a + * floating-point number + * + * | READ-LIKE ENTITY-NAME + * | READ-LIKE(A,B) ENTITY-NAME + * | READ-LIKE{[S1[,S2[,S3...]][,...]} ENTITY-NAME + * + * Reads/takes at most 10 samples. The second form counts the + * number of valid and invalid samples seen and checks them against + * A and B. + * + * In the third form, the exact result set is given by the sample + * Si, which is a comma-separated list of samples: + * + * [STATE]K[ENTITY-NAME][@DT] + * [STATE](K,X,Y)[ENTITY-NAME][@DT] + * + * The first form is an invalid sample with only the (integer) key + * value K, the second form also specifies the two (integer) + * attribute fields. + * + * STATE specifies allowed sample (f - not-read (fresh), s - read + * (stale)), instance (a - alive, u - no-writers (unregistered) d - + * disposed) and view states (n - new, o - old). If no sample state + * is specified, all sample states are allowed, &c. + * + * ENTITY-NAME is the name of the publishing writer expected in the + * publication_handle. Not specifying a writer means any writer is + * ok. DT is the timestamp in the same manner as the write-like + * operations. Not specifying a timestamp means any timestamp is + * ok. + * + * If the expected set ends up with "..." there may be other samples + * in the result as well. + * + * | ?LISTENER[(ARGS)] ENTITY-NAME + * + * Waits until the specified listener has been invoked on using a flag set by the listener function, resets the flag + * and verifies that neither the entity status bit nor the "change" + * fields in the various statuses were set. + * + * ARGS is used to check the status argument most recently passed to + * the listener: + * + * da(A) verifies that it has been invoked A times + * dor(A) see da + * it(A,B) verifies count and change match A and B, policy + * matches RELIABILITY + * lc(A,B,C,D,E) verifies that alive and not-alive counts match A + * and B, that alive and not-alive changes match C and D + * and that the last handle matches E if an entity name + * (ignored if E = "*") + * ll (A,B) verifies count and change match A and B + * odm (A,B) verifies count and change match A and B, last handle + * is ignored + * oiq (A,B,C) verifies that total count and change match A and B + * and that the mismatching QoS is C (using the same + * abbreviations as used for defining QoS on entity + * creation) + * pm (A,B,C,D,E) verifies that total count and change match A and + * B, that current count and change match C and D and + * that the last handle matches E if an entity name + * (ignored if E = "*") + * rdm see odm + * riq see oiq + * sl (A,B) verifies that total count and change match A and B + * sr (A,B,C) verifies total count and change match A and B, and + * that the reason matches C (one of "s" for samples, + * "i" for instances, "spi" for samples per instance) + * sm see pm + * + * | ?!LISTENER + * + * (Not listener) tests that LISTENER has not been invoked since + * last reset + * + * | sleep D + * + * Delay program execution for D s (D is a floating-point number) + * + * | deaf ENTITY-NAME + * | hearing ENTITY-NAME + * + * Makes the domain wherein the specified entity exists deaf, + * respectively restoring hearing. The entity must be either P or + * P' and both must exist. Plays some tricks to speed up lease + * expiry and reconnection (like forcibly deleting a proxy + * participant or triggering the publication of SPDP packets). + * + * WRITE-LIKE ::= wr write + * | wrdisp write-dispose + * | disp dispose + * | unreg unregister + * + * READ-LIKE ::= read dds_read (so any state) + * | take dds_take (so any state) + * + * LISTENER ::= da data available (acts on a reader) + * | dor data on readers (acts on a subcsriber) + * | it incompatible topic (acts on a topic) + * | lc liveliness changed (acts on a reader) + * | ll liveliness lost (acts on a writer) + * | odm offered deadline missed (acts on a writer) + * | oiq offered incompatible QoS (acts on a writer) + * | pm publication matched (acts on a writer) + * | rdm requested deadline missed (acts on a reader) + * | riq requested incompatible QoS (acts on a reader) + * | sl sample lost (acts on a reader) + * | sr sample rejected (acts on a reader) + * | sm subscription matched (acts on a reader) + * + * QOS ::= ad={y|n} auto-dispose unregistered instances + * | d={v|tl|t|p} durability + * | dl={inf|DT} deadline (infinite or DT seconds) + * | ds=DT/H/RL durability service: cleanup delay, history, + * resource limits + * | do={r|s} by-reception or by-source destination order + * | h={N|all} history keep-last-N or keep-all + * | lb={inf|DT} latency budget + * | ll={a[:DT]|p:DT|w:DT} liveliness (automatic, manual by + * participant, manual by topic) + * | ls={inf|DT} lifespan + * | o={s|x[:N]} ownership shared or exclusive (strength N) + * | p={i|t|g} presentation: instance, coherent-topic or + * coherent-group + * | r={be|r[:DT]} best-effort or reliable (with max blocking time) + * | rl=N[/N[/N]] resource limits (sample, instances, samples per + * instance; "inf" is allowed, ommitted ones are + * unlimited) + * | tp=N transport-priority + * | ud=... user data (with escape sequences and hex/octal + * input allowed) + * + * All entities share the listeners with their global state. Only the latest invocation is visible. + * + * @param[in] ops Program to execute. + * + * @return > 0 success, 0 failure, < 0 invalid input + */ +int test_oneliner (const char *ops); + +union oneliner_tokval { + int i; + int64_t d; + char n[32]; +}; + +struct oneliner_lex { + const char *inp; + dds_time_t tref; + int tok; + union oneliner_tokval v; +}; + +struct oneliner_ctx; + +struct oneliner_cb { + struct oneliner_ctx *ctx; + dds_listener_t *list; + uint32_t cb_called[DDS_STATUS_ID_MAX + 1]; + dds_entity_t cb_topic, cb_writer, cb_reader, cb_subscriber; + dds_inconsistent_topic_status_t cb_inconsistent_topic_status; + dds_liveliness_changed_status_t cb_liveliness_changed_status; + dds_liveliness_lost_status_t cb_liveliness_lost_status; + dds_offered_deadline_missed_status_t cb_offered_deadline_missed_status; + dds_offered_incompatible_qos_status_t cb_offered_incompatible_qos_status; + dds_publication_matched_status_t cb_publication_matched_status; + dds_requested_deadline_missed_status_t cb_requested_deadline_missed_status; + dds_requested_incompatible_qos_status_t cb_requested_incompatible_qos_status; + dds_sample_lost_status_t cb_sample_lost_status; + dds_sample_rejected_status_t cb_sample_rejected_status; + dds_subscription_matched_status_t cb_subscription_matched_status; +}; + +struct oneliner_ctx { + struct oneliner_lex l; + + dds_entity_t es[3 * 9]; + dds_entity_t tps[3]; + dds_entity_t doms[3]; + dds_instance_handle_t esi[3 * 9]; + // built-in topic readers for cross-referencing instance handles + dds_entity_t pubrd[3]; + dds_entity_t subrd[3]; + // topic name used for data + char topicname[100]; + + const dds_qos_t *qos; + dds_qos_t *rwqos; + + int result; + char msg[256]; + + jmp_buf jb; + + ddsrt_mutex_t g_mutex; + ddsrt_cond_t g_cond; + struct oneliner_cb cb[3]; +}; + +/** @brief Initialize a "oneliner test" context + * + * @param[out] ctx context to initialize + */ +void test_oneliner_init (struct oneliner_ctx *ctx); + +/** @brief Run a sequence of operations in an initialized context + * + * If the context indicates a preceding step has failed, this is a + * no-op and the previous result is propagated to the return value. + * + * @param[in,out] ctx context to operate in + * @param[in] ops sequence of operations to execute (@ref test_oneliner) + * + * @return integer indicating success or failure + * + * @retval 1 success + * @retval 0 test failure + * @retval <0 syntax error unexpected error + */ +int test_oneliner_step (struct oneliner_ctx *ctx, const char *ops); + +/** @brief Get a pointer to the error message from a "oneliner test" + * + * If a preceding step has failed, this returns a pointer to a message + * containing some information about the failure. If no error + * occurred, the message is meaningless. + * + * + * @param[in] ctx context to retrieve message from + * + * @return pointer to null-terminated string aliasing a string in ctx + */ +const char *test_oneliner_message (const struct oneliner_ctx *ctx); + +/** @brief Deinitialize a "oneliner test" context + * + * This releases all resources used by the context. + * + * @param[in,out] ctx context to operate in + * + * @return integer indicating success or failure in any of the + * preceding steps. If no steps were taken, the result is success. + * + * @retval 1 success + * @retval 0 test failure + * @retval <0 syntax error unexpected error + */ +int test_oneliner_fini (struct oneliner_ctx *ctx); + +#endif diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 9d2765e..14f59f8 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -696,7 +696,7 @@ int writer_set_notalive (struct writer *wr, bool notify); #define CF_PROXYPP_NO_SPDP (1 << 2) void new_proxy_participant (struct ddsi_domaingv *gv, const struct ddsi_guid *guid, uint32_t bes, const struct ddsi_guid *privileged_pp_guid, struct addrset *as_default, struct addrset *as_meta, const struct ddsi_plist *plist, dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, ddsrt_wctime_t timestamp, seqno_t seq); -int delete_proxy_participant_by_guid (struct ddsi_domaingv *gv, const struct ddsi_guid *guid, ddsrt_wctime_t timestamp, int isimplicit); +DDS_EXPORT int delete_proxy_participant_by_guid (struct ddsi_domaingv *gv, const struct ddsi_guid *guid, ddsrt_wctime_t timestamp, int isimplicit); int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, seqno_t seq, const struct ddsi_plist *datap, ddsrt_wctime_t timestamp); int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t seq, const struct ddsi_plist *datap, ddsrt_wctime_t timestamp);