diff --git a/src/mpt/tests/qos/procs/ppud.c b/src/mpt/tests/qos/procs/ppud.c index 9978778..31c8da4 100644 --- a/src/mpt/tests/qos/procs/ppud.c +++ b/src/mpt/tests/qos/procs/ppud.c @@ -28,25 +28,28 @@ void ppud_init (void) { } void ppud_fini (void) { } -static const char *exp_ud[] = { - "a", "bc", "def", "" -}; - MPT_ProcessEntry (ppud, MPT_Args (dds_domainid_t domainid, - bool active, + bool master, unsigned ncycles)) { +#define prefix "ppuserdata:" + static const char *exp_ud[] = { + prefix "a", prefix "bc", prefix "def", prefix "" + }; + const char *expud = master ? prefix "X" : prefix; + size_t expusz = strlen (expud); dds_entity_t dp, rd, ws; dds_instance_handle_t dpih; dds_return_t rc; dds_qos_t *qos; int id = (int) ddsrt_getpid (); - printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + printf ("=== [Check(%d)] master=%d ncycles=%u Start(%d) ...\n", id, master, ncycles, (int) domainid); qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dds_qset_userdata (qos, expud, expusz); dp = dds_create_participant (domainid, qos, NULL); MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); rc = dds_get_instance_handle (dp, &dpih); @@ -61,7 +64,7 @@ MPT_ProcessEntry (ppud, MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach reader to waitset: %s\n", dds_strretcode (rc)); bool done = false; - bool first = true; + bool synced = !master; unsigned exp_index = 0; unsigned exp_cycle = 0; while (!done) @@ -84,83 +87,88 @@ MPT_ProcessEntry (ppud, void *ud = NULL; size_t usz = 0; if (!dds_qget_userdata (sample->qos, &ud, &usz)) - printf ("%d: user data not set in QoS\n", id); - if (first && usz == 0) + MPT_ASSERT (0, "%d: user data not set in QoS\n", id); + if (ud == NULL || strncmp (ud, prefix, sizeof (prefix) - 1) != 0) { - dds_qset_userdata (qos, "X", 1); - rc = dds_set_qos (dp, qos); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + /* presumably another process */ + } + else if (!synced && strcmp (ud, expud) != 0) + { + /* slave hasn't discovered us yet */ + } + else if (synced && master && strcmp (ud, prefix "X") == 0 && exp_index == 1 && exp_cycle == 0) + { + /* FIXME: don't want no stutter of the initial sample ... */ } else { - const char *exp = exp_ud[exp_index]; - if (first && strcmp (ud, "X") == 0) - exp = "X"; - const size_t expsz = strlen (exp); - bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0)); - //printf ("%d: expected %u %zu/%s received %zu/%s\n", - // id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); - MPT_ASSERT (eq, "User data mismatch: expected %u %zu/%s received %zu/%s\n", - exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); - if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + synced = true; + if (master) { - exp_index = 0; - exp_cycle++; - } + bool eq = (usz == expusz && (usz == 0 || memcmp (ud, expud, usz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)"); + MPT_ASSERT (eq, "user data mismatch: expected %u %zu/%s received %zu/%s\n", + exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)"); + if (++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + { + exp_index = 0; + exp_cycle++; + } - if (active && exp_cycle == ncycles) - done = true; + if (exp_cycle == ncycles) + done = true; + + expud = exp_ud[exp_index]; + expusz = strlen (expud); + } else { - const void *newud; - size_t newusz; - if (!active) - { - /* Set user data to the same value in response */ - newud = ud; newusz = usz; - dds_qset_userdata (qos, ud, usz); - } - else /* Set next agreed value */ - { - newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]); - dds_qset_userdata (qos, newud, newusz); - } - - rc = dds_set_qos (dp, qos); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); - - dds_qos_t *chk = dds_create_qos (); - rc = dds_get_qos (dp, chk); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); - - void *chkud = NULL; - size_t chkusz = 0; - if (!dds_qget_userdata (chk, &chkud, &chkusz)) - MPT_ASSERT (0, "Check QoS: no user data present\n"); - MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0), - "Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n", - chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)"); - dds_free (chkud); - dds_delete_qos (chk); - first = false; + printf ("%d: slave: received %zu/%s\n", id, usz, ud ? (char *) ud : "(null)"); + expud = ud; + expusz = usz; } + + dds_qset_userdata (qos, expud, expusz); + rc = dds_set_qos (dp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (dp, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chkud = NULL; + size_t chkusz = 0; + if (!dds_qget_userdata (chk, &chkud, &chkusz)) + MPT_ASSERT (0, "Check QoS: no user data present\n"); + MPT_ASSERT (chkusz == expusz && (expusz == 0 || memcmp (chkud, expud, expusz) == 0), + "Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n", + chkusz, chkud ? (char *) chkud : "(null)", expusz, expud ? (char *) expud : "(null)"); + dds_free (chkud); + dds_delete_qos (chk); } dds_free (ud); } } MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); dds_return_loan (rd, &raw, 1); + fflush (stdout); } dds_delete_qos (qos); rc = dds_delete (dp); MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); printf ("=== [Check(%d)] Done\n", id); +#undef prefix } +static const char *exp_rwud[] = { + "a", "bc", "def", "" +}; + MPT_ProcessEntry (rwud, MPT_Args (dds_domainid_t domainid, const char *topic_name, - bool active, + bool master, unsigned ncycles, enum rwud which)) { @@ -168,12 +176,15 @@ MPT_ProcessEntry (rwud, void (*qset) (dds_qos_t * __restrict qos, const void *value, size_t sz) = 0; const char *qname = "UNDEFINED"; - dds_entity_t dp, tp, ep, rdep, qent = 0, ws; + dds_entity_t dp, tp, ep, grp, rdep, qent = 0, ws; dds_return_t rc; dds_qos_t *qos; int id = (int) ddsrt_getpid (); - printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + const char *expud = master ? "X" : ""; + size_t expusz = strlen (expud); + + printf ("=== [Check(%d)] master=%d ncycles=%u Start(%d) ...\n", id, master, ncycles, (int) domainid); qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); @@ -181,18 +192,22 @@ MPT_ProcessEntry (rwud, MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); - if (active) + if (master) { rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL); MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep)); - ep = dds_create_writer (dp, tp, qos, NULL); + grp = dds_create_publisher (dp, qos, NULL); + MPT_ASSERT_FATAL_GT (grp, 0, "Could not create publisher: %s\n", dds_strretcode (grp)); + ep = dds_create_writer (grp, tp, qos, NULL); MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep)); } else { rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL); MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep)); - ep = dds_create_reader (dp, tp, qos, NULL); + grp = dds_create_subscriber (dp, qos, NULL); + MPT_ASSERT_FATAL_GT (grp, 0, "Could not create subscriber: %s\n", dds_strretcode (grp)); + ep = dds_create_reader (grp, tp, qos, NULL); MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep)); } rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS); @@ -214,7 +229,7 @@ MPT_ProcessEntry (rwud, qget = dds_qget_groupdata; qset = dds_qset_groupdata; qname = "group data"; - qent = dds_get_parent (ep); + qent = grp; MPT_ASSERT_FATAL_GT (qent, 0, "Could not get pub/sub from wr/rd: %s\n", dds_strretcode (qent)); break; case RWUD_TOPICDATA: @@ -225,10 +240,18 @@ MPT_ProcessEntry (rwud, break; } + if (master) + { + qset (qos, expud, expusz); + rc = dds_set_qos (qent, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + } + bool done = false; - bool first = true; + bool synced = !master; unsigned exp_index = 0; unsigned exp_cycle = 0; + dds_instance_handle_t peer = 0; while (!done) { rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); @@ -240,81 +263,76 @@ MPT_ProcessEntry (rwud, while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1) { const dds_builtintopic_endpoint_t *sample = raw; - if (si.instance_state != DDS_IST_ALIVE) + if (si.instance_state != DDS_IST_ALIVE && si.instance_handle == peer) done = true; - else if (!si.valid_data || strcmp (sample->topic_name, topic_name) != 0) + else if (!si.valid_data) + continue; + else if ((peer && si.instance_handle != peer) || strcmp (sample->topic_name, topic_name) != 0) continue; else { void *ud = NULL; size_t usz = 0; if (!qget (sample->qos, &ud, &usz)) - printf ("%d: group data not set in QoS\n", id); - if (first && usz == 0) + MPT_ASSERT (0, "%d: %s not set in QoS\n", id, qname); + else if (!synced && (ud == NULL || strcmp (ud, expud) != 0)) { - qset (qos, "X", 1); - rc = dds_set_qos (qent, qos); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + /* slave hasn't discovered us yet */ } else { - const char *exp = exp_ud[exp_index]; - if (first && strcmp (ud, "X") == 0) - exp = "X"; - const size_t expsz = first ? 1 : strlen (exp); - bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0)); - //printf ("%d: expected %u %zu/%s received %zu/%s\n", - // id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); - MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n", - qname, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); - if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + peer = si.instance_handle; + synced = true; + if (master) { - exp_index = 0; - exp_cycle++; - } + bool eq = (usz == expusz && (usz == 0 || memcmp (ud, expud, usz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)"); + MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n", + qname, exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)"); + if (++exp_index == sizeof (exp_rwud) / sizeof (exp_rwud[0])) + { + exp_index = 0; + exp_cycle++; + } - if (active && exp_cycle == ncycles) - done = true; + if (exp_cycle == ncycles) + done = true; + + expud = exp_rwud[exp_index]; + expusz = strlen (expud); + } else { - const void *newud; - size_t newusz; - if (!active) - { - /* Set group data to the same value in response */ - newud = ud; newusz = usz; - qset (qos, ud, usz); - } - else /* Set next agreed value */ - { - newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]); - qset (qos, newud, newusz); - } - - rc = dds_set_qos (qent, qos); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); - - dds_qos_t *chk = dds_create_qos (); - rc = dds_get_qos (ep, chk); - MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); - - void *chkud = NULL; - size_t chkusz = 0; - if (!qget (chk, &chkud, &chkusz)) - MPT_ASSERT (0, "Check QoS: no %s present\n", qname); - MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0), - "Retrieved %s differs from group data just set (%zu/%s vs %zu/%s)\n", qname, - chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)"); - dds_free (chkud); - dds_delete_qos (chk); - first = false; + printf ("%d: slave: received %zu/%s\n", id, usz, ud ? (char *) ud : "(null)"); + expud = ud; + expusz = usz; } + + qset (qos, expud, expusz); + rc = dds_set_qos (qent, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (ep, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chkud = NULL; + size_t chkusz = 0; + if (!qget (chk, &chkud, &chkusz)) + MPT_ASSERT (0, "Check QoS: no %s present\n", qname); + MPT_ASSERT (chkusz == expusz && (expusz == 0 || memcmp (chkud, expud, expusz) == 0), + "Retrieved %s differs from group data just set (%zu/%s vs %zu/%s)\n", qname, + chkusz, chkud ? (char *) chkud : "(null)", expusz, expud ? (char *) expud : "(null)"); + dds_free (chkud); + dds_delete_qos (chk); } dds_free (ud); } } MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); dds_return_loan (rdep, &raw, 1); + fflush (stdout); } dds_delete_qos (qos); rc = dds_delete (dp); diff --git a/src/mpt/tests/qos/procs/ppud.h b/src/mpt/tests/qos/procs/ppud.h index f94ca48..8f8e012 100644 --- a/src/mpt/tests/qos/procs/ppud.h +++ b/src/mpt/tests/qos/procs/ppud.h @@ -34,13 +34,13 @@ enum rwud { MPT_ProcessEntry (ppud, MPT_Args (dds_domainid_t domainid, - bool active, + bool master, unsigned ncycles)); MPT_ProcessEntry (rwud, MPT_Args (dds_domainid_t domainid, const char *topic_name, - bool active, + bool master, unsigned ncycles, enum rwud which)); diff --git a/src/mpt/tests/qos/procs/rw.c b/src/mpt/tests/qos/procs/rw.c index 0a3ec32..7dbec51 100644 --- a/src/mpt/tests/qos/procs/rw.c +++ b/src/mpt/tests/qos/procs/rw.c @@ -182,6 +182,9 @@ static bool writer_qos_eq_h (const dds_qos_t *a, dds_entity_t ent) return delta == 0; } +#define UD_QMPUB "qosmatch_publisher" +#define UD_QMPUBDONE UD_QMPUB ":ok" + MPT_ProcessEntry (rw_publisher, MPT_Args (dds_domainid_t domainid, const char *topic_name)) @@ -192,16 +195,18 @@ MPT_ProcessEntry (rw_publisher, dds_entity_t wr[NPUB][NWR_PUB]; bool chk[NPUB][NWR_PUB] = { { false } }; dds_return_t rc; - dds_qos_t *qos; + dds_qos_t *qos, *ppqos; int id = (int) ddsrt_getpid (); printf ("=== [Publisher(%d)] Start(%d) ...\n", id, (int) domainid); - qos = dds_create_qos (); - setqos (qos, 0, false, true); - dp = dds_create_participant (domainid, NULL, NULL); + ppqos = dds_create_qos (); + dds_qset_userdata (ppqos, UD_QMPUB, sizeof (UD_QMPUB) - 1); + dp = dds_create_participant (domainid, ppqos, NULL); MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + qos = dds_create_qos (); + setqos (qos, 0, false, true); tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); @@ -269,7 +274,13 @@ MPT_ProcessEntry (rw_publisher, dds_sleepfor (DDS_MSECS (100)); } + dds_qset_userdata (ppqos, UD_QMPUBDONE, sizeof (UD_QMPUBDONE) - 1); + rc = dds_set_qos (dp, ppqos); + MPT_ASSERT_FATAL_EQ (rc, DDS_RETCODE_OK, "failed to participant QoS: %s\n", dds_strretcode (rc)); + /* Wait until subscribers terminate */ + printf ("wait for subscribers to terminate\n"); + fflush (stdout); while (true) { for (size_t i = 0; i < NPUB; i++) @@ -281,7 +292,11 @@ MPT_ProcessEntry (rw_publisher, MPT_ASSERT_FATAL_EQ (rc, DDS_RETCODE_OK, "dds_get_matched_publication_status failed for writer %zu %zu: %s\n", i, j, dds_strretcode (rc)); if (st.current_count) + { + printf ("%zu %zu: %d\n", i, j, (int) st.current_count); + fflush (stdout); goto have_matches; + } } } break; @@ -290,16 +305,43 @@ MPT_ProcessEntry (rw_publisher, } dds_delete_qos (qos); + dds_delete_qos (ppqos); rc = dds_delete (dp); MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); printf ("=== [Publisher(%d)] Done\n", id); } +static void wait_for_done (dds_entity_t rd, const char *userdata) +{ + int32_t n; + void *raw = NULL; + dds_sample_info_t si; + bool done = false; + while (!done) + { + while (!done && (n = dds_take (rd, &raw, &si, 1, 1)) == 1) + { + const dds_builtintopic_participant_t *sample = raw; + void *ud = NULL; + size_t usz = 0; + if (!si.valid_data || !dds_qget_userdata (sample->qos, &ud, &usz)) + continue; + if (ud && strcmp (ud, userdata) == 0) + done = true; + dds_free (ud); + dds_return_loan (rd, &raw, 1); + } + + if (!done) + dds_sleepfor (DDS_MSECS (100)); + } +} + MPT_ProcessEntry (rw_subscriber, MPT_Args (dds_domainid_t domainid, const char *topic_name)) { - dds_entity_t dp; + dds_entity_t dp, pprd; dds_entity_t tp; dds_entity_t sub[NPUB]; dds_entity_t rd[NPUB][NWR_PUB]; @@ -310,11 +352,13 @@ MPT_ProcessEntry (rw_subscriber, printf ("=== [Subscriber(%d)] Start(%d) ...\n", id, (int) domainid); - qos = dds_create_qos (); - setqos (qos, 0, true, true); dp = dds_create_participant (domainid, NULL, NULL); MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + pprd = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL); + MPT_ASSERT_FATAL_GT (pprd, 0, "Could not create DCPSParticipant reader: %s\n", dds_strretcode (pprd)); + qos = dds_create_qos (); + setqos (qos, 0, true, true); tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); @@ -382,6 +426,9 @@ MPT_ProcessEntry (rw_subscriber, dds_sleepfor (DDS_MSECS (100)); } + printf ("wait for publisher to have completed its checks\n"); + wait_for_done (pprd, UD_QMPUBDONE); + dds_delete_qos (qos); rc = dds_delete (dp); MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");