Make qos MPTs more robust
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
3afce30c37
commit
d494ba4eda
3 changed files with 193 additions and 128 deletions
|
@ -28,25 +28,28 @@
|
||||||
void ppud_init (void) { }
|
void ppud_init (void) { }
|
||||||
void ppud_fini (void) { }
|
void ppud_fini (void) { }
|
||||||
|
|
||||||
static const char *exp_ud[] = {
|
|
||||||
"a", "bc", "def", ""
|
|
||||||
};
|
|
||||||
|
|
||||||
MPT_ProcessEntry (ppud,
|
MPT_ProcessEntry (ppud,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
bool active,
|
bool master,
|
||||||
unsigned ncycles))
|
unsigned ncycles))
|
||||||
{
|
{
|
||||||
|
#define prefix "ppuserdata:"
|
||||||
|
static const char *exp_ud[] = {
|
||||||
|
prefix "a", prefix "bc", prefix "def", prefix ""
|
||||||
|
};
|
||||||
|
const char *expud = master ? prefix "X" : prefix;
|
||||||
|
size_t expusz = strlen (expud);
|
||||||
dds_entity_t dp, rd, ws;
|
dds_entity_t dp, rd, ws;
|
||||||
dds_instance_handle_t dpih;
|
dds_instance_handle_t dpih;
|
||||||
dds_return_t rc;
|
dds_return_t rc;
|
||||||
dds_qos_t *qos;
|
dds_qos_t *qos;
|
||||||
int id = (int) ddsrt_getpid ();
|
int id = (int) ddsrt_getpid ();
|
||||||
|
|
||||||
printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid);
|
printf ("=== [Check(%d)] master=%d ncycles=%u Start(%d) ...\n", id, master, ncycles, (int) domainid);
|
||||||
|
|
||||||
qos = dds_create_qos ();
|
qos = dds_create_qos ();
|
||||||
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
|
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
|
||||||
|
dds_qset_userdata (qos, expud, expusz);
|
||||||
dp = dds_create_participant (domainid, qos, NULL);
|
dp = dds_create_participant (domainid, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
||||||
rc = dds_get_instance_handle (dp, &dpih);
|
rc = dds_get_instance_handle (dp, &dpih);
|
||||||
|
@ -61,7 +64,7 @@ MPT_ProcessEntry (ppud,
|
||||||
MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach reader to waitset: %s\n", dds_strretcode (rc));
|
MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach reader to waitset: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
bool done = false;
|
bool done = false;
|
||||||
bool first = true;
|
bool synced = !master;
|
||||||
unsigned exp_index = 0;
|
unsigned exp_index = 0;
|
||||||
unsigned exp_cycle = 0;
|
unsigned exp_cycle = 0;
|
||||||
while (!done)
|
while (!done)
|
||||||
|
@ -84,83 +87,88 @@ MPT_ProcessEntry (ppud,
|
||||||
void *ud = NULL;
|
void *ud = NULL;
|
||||||
size_t usz = 0;
|
size_t usz = 0;
|
||||||
if (!dds_qget_userdata (sample->qos, &ud, &usz))
|
if (!dds_qget_userdata (sample->qos, &ud, &usz))
|
||||||
printf ("%d: user data not set in QoS\n", id);
|
MPT_ASSERT (0, "%d: user data not set in QoS\n", id);
|
||||||
if (first && usz == 0)
|
if (ud == NULL || strncmp (ud, prefix, sizeof (prefix) - 1) != 0)
|
||||||
{
|
{
|
||||||
dds_qset_userdata (qos, "X", 1);
|
/* presumably another process */
|
||||||
rc = dds_set_qos (dp, qos);
|
}
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
else if (!synced && strcmp (ud, expud) != 0)
|
||||||
|
{
|
||||||
|
/* slave hasn't discovered us yet */
|
||||||
|
}
|
||||||
|
else if (synced && master && strcmp (ud, prefix "X") == 0 && exp_index == 1 && exp_cycle == 0)
|
||||||
|
{
|
||||||
|
/* FIXME: don't want no stutter of the initial sample ... */
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const char *exp = exp_ud[exp_index];
|
synced = true;
|
||||||
if (first && strcmp (ud, "X") == 0)
|
if (master)
|
||||||
exp = "X";
|
|
||||||
const size_t expsz = strlen (exp);
|
|
||||||
bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0));
|
|
||||||
//printf ("%d: expected %u %zu/%s received %zu/%s\n",
|
|
||||||
// id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)");
|
|
||||||
MPT_ASSERT (eq, "User data mismatch: expected %u %zu/%s received %zu/%s\n",
|
|
||||||
exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)");
|
|
||||||
if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0]))
|
|
||||||
{
|
{
|
||||||
exp_index = 0;
|
bool eq = (usz == expusz && (usz == 0 || memcmp (ud, expud, usz) == 0));
|
||||||
exp_cycle++;
|
printf ("%d: expected %u %zu/%s received %zu/%s\n",
|
||||||
}
|
id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)");
|
||||||
|
MPT_ASSERT (eq, "user data mismatch: expected %u %zu/%s received %zu/%s\n",
|
||||||
|
exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)");
|
||||||
|
if (++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0]))
|
||||||
|
{
|
||||||
|
exp_index = 0;
|
||||||
|
exp_cycle++;
|
||||||
|
}
|
||||||
|
|
||||||
if (active && exp_cycle == ncycles)
|
if (exp_cycle == ncycles)
|
||||||
done = true;
|
done = true;
|
||||||
|
|
||||||
|
expud = exp_ud[exp_index];
|
||||||
|
expusz = strlen (expud);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const void *newud;
|
printf ("%d: slave: received %zu/%s\n", id, usz, ud ? (char *) ud : "(null)");
|
||||||
size_t newusz;
|
expud = ud;
|
||||||
if (!active)
|
expusz = usz;
|
||||||
{
|
|
||||||
/* Set user data to the same value in response */
|
|
||||||
newud = ud; newusz = usz;
|
|
||||||
dds_qset_userdata (qos, ud, usz);
|
|
||||||
}
|
|
||||||
else /* Set next agreed value */
|
|
||||||
{
|
|
||||||
newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]);
|
|
||||||
dds_qset_userdata (qos, newud, newusz);
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = dds_set_qos (dp, qos);
|
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
|
||||||
|
|
||||||
dds_qos_t *chk = dds_create_qos ();
|
|
||||||
rc = dds_get_qos (dp, chk);
|
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc));
|
|
||||||
|
|
||||||
void *chkud = NULL;
|
|
||||||
size_t chkusz = 0;
|
|
||||||
if (!dds_qget_userdata (chk, &chkud, &chkusz))
|
|
||||||
MPT_ASSERT (0, "Check QoS: no user data present\n");
|
|
||||||
MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0),
|
|
||||||
"Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n",
|
|
||||||
chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)");
|
|
||||||
dds_free (chkud);
|
|
||||||
dds_delete_qos (chk);
|
|
||||||
first = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dds_qset_userdata (qos, expud, expusz);
|
||||||
|
rc = dds_set_qos (dp, qos);
|
||||||
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
|
dds_qos_t *chk = dds_create_qos ();
|
||||||
|
rc = dds_get_qos (dp, chk);
|
||||||
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
|
void *chkud = NULL;
|
||||||
|
size_t chkusz = 0;
|
||||||
|
if (!dds_qget_userdata (chk, &chkud, &chkusz))
|
||||||
|
MPT_ASSERT (0, "Check QoS: no user data present\n");
|
||||||
|
MPT_ASSERT (chkusz == expusz && (expusz == 0 || memcmp (chkud, expud, expusz) == 0),
|
||||||
|
"Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n",
|
||||||
|
chkusz, chkud ? (char *) chkud : "(null)", expusz, expud ? (char *) expud : "(null)");
|
||||||
|
dds_free (chkud);
|
||||||
|
dds_delete_qos (chk);
|
||||||
}
|
}
|
||||||
dds_free (ud);
|
dds_free (ud);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n));
|
MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n));
|
||||||
dds_return_loan (rd, &raw, 1);
|
dds_return_loan (rd, &raw, 1);
|
||||||
|
fflush (stdout);
|
||||||
}
|
}
|
||||||
dds_delete_qos (qos);
|
dds_delete_qos (qos);
|
||||||
rc = dds_delete (dp);
|
rc = dds_delete (dp);
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
||||||
printf ("=== [Check(%d)] Done\n", id);
|
printf ("=== [Check(%d)] Done\n", id);
|
||||||
|
#undef prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char *exp_rwud[] = {
|
||||||
|
"a", "bc", "def", ""
|
||||||
|
};
|
||||||
|
|
||||||
MPT_ProcessEntry (rwud,
|
MPT_ProcessEntry (rwud,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
const char *topic_name,
|
const char *topic_name,
|
||||||
bool active,
|
bool master,
|
||||||
unsigned ncycles,
|
unsigned ncycles,
|
||||||
enum rwud which))
|
enum rwud which))
|
||||||
{
|
{
|
||||||
|
@ -168,12 +176,15 @@ MPT_ProcessEntry (rwud,
|
||||||
void (*qset) (dds_qos_t * __restrict qos, const void *value, size_t sz) = 0;
|
void (*qset) (dds_qos_t * __restrict qos, const void *value, size_t sz) = 0;
|
||||||
const char *qname = "UNDEFINED";
|
const char *qname = "UNDEFINED";
|
||||||
|
|
||||||
dds_entity_t dp, tp, ep, rdep, qent = 0, ws;
|
dds_entity_t dp, tp, ep, grp, rdep, qent = 0, ws;
|
||||||
dds_return_t rc;
|
dds_return_t rc;
|
||||||
dds_qos_t *qos;
|
dds_qos_t *qos;
|
||||||
int id = (int) ddsrt_getpid ();
|
int id = (int) ddsrt_getpid ();
|
||||||
|
|
||||||
printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid);
|
const char *expud = master ? "X" : "";
|
||||||
|
size_t expusz = strlen (expud);
|
||||||
|
|
||||||
|
printf ("=== [Check(%d)] master=%d ncycles=%u Start(%d) ...\n", id, master, ncycles, (int) domainid);
|
||||||
|
|
||||||
qos = dds_create_qos ();
|
qos = dds_create_qos ();
|
||||||
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
|
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
|
||||||
|
@ -181,18 +192,22 @@ MPT_ProcessEntry (rwud,
|
||||||
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
||||||
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
||||||
if (active)
|
if (master)
|
||||||
{
|
{
|
||||||
rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL);
|
rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep));
|
MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep));
|
||||||
ep = dds_create_writer (dp, tp, qos, NULL);
|
grp = dds_create_publisher (dp, qos, NULL);
|
||||||
|
MPT_ASSERT_FATAL_GT (grp, 0, "Could not create publisher: %s\n", dds_strretcode (grp));
|
||||||
|
ep = dds_create_writer (grp, tp, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep));
|
MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL);
|
rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep));
|
MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep));
|
||||||
ep = dds_create_reader (dp, tp, qos, NULL);
|
grp = dds_create_subscriber (dp, qos, NULL);
|
||||||
|
MPT_ASSERT_FATAL_GT (grp, 0, "Could not create subscriber: %s\n", dds_strretcode (grp));
|
||||||
|
ep = dds_create_reader (grp, tp, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep));
|
MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep));
|
||||||
}
|
}
|
||||||
rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS);
|
rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS);
|
||||||
|
@ -214,7 +229,7 @@ MPT_ProcessEntry (rwud,
|
||||||
qget = dds_qget_groupdata;
|
qget = dds_qget_groupdata;
|
||||||
qset = dds_qset_groupdata;
|
qset = dds_qset_groupdata;
|
||||||
qname = "group data";
|
qname = "group data";
|
||||||
qent = dds_get_parent (ep);
|
qent = grp;
|
||||||
MPT_ASSERT_FATAL_GT (qent, 0, "Could not get pub/sub from wr/rd: %s\n", dds_strretcode (qent));
|
MPT_ASSERT_FATAL_GT (qent, 0, "Could not get pub/sub from wr/rd: %s\n", dds_strretcode (qent));
|
||||||
break;
|
break;
|
||||||
case RWUD_TOPICDATA:
|
case RWUD_TOPICDATA:
|
||||||
|
@ -225,10 +240,18 @@ MPT_ProcessEntry (rwud,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (master)
|
||||||
|
{
|
||||||
|
qset (qos, expud, expusz);
|
||||||
|
rc = dds_set_qos (qent, qos);
|
||||||
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
||||||
|
}
|
||||||
|
|
||||||
bool done = false;
|
bool done = false;
|
||||||
bool first = true;
|
bool synced = !master;
|
||||||
unsigned exp_index = 0;
|
unsigned exp_index = 0;
|
||||||
unsigned exp_cycle = 0;
|
unsigned exp_cycle = 0;
|
||||||
|
dds_instance_handle_t peer = 0;
|
||||||
while (!done)
|
while (!done)
|
||||||
{
|
{
|
||||||
rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY);
|
rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY);
|
||||||
|
@ -240,81 +263,76 @@ MPT_ProcessEntry (rwud,
|
||||||
while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1)
|
while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1)
|
||||||
{
|
{
|
||||||
const dds_builtintopic_endpoint_t *sample = raw;
|
const dds_builtintopic_endpoint_t *sample = raw;
|
||||||
if (si.instance_state != DDS_IST_ALIVE)
|
if (si.instance_state != DDS_IST_ALIVE && si.instance_handle == peer)
|
||||||
done = true;
|
done = true;
|
||||||
else if (!si.valid_data || strcmp (sample->topic_name, topic_name) != 0)
|
else if (!si.valid_data)
|
||||||
|
continue;
|
||||||
|
else if ((peer && si.instance_handle != peer) || strcmp (sample->topic_name, topic_name) != 0)
|
||||||
continue;
|
continue;
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
void *ud = NULL;
|
void *ud = NULL;
|
||||||
size_t usz = 0;
|
size_t usz = 0;
|
||||||
if (!qget (sample->qos, &ud, &usz))
|
if (!qget (sample->qos, &ud, &usz))
|
||||||
printf ("%d: group data not set in QoS\n", id);
|
MPT_ASSERT (0, "%d: %s not set in QoS\n", id, qname);
|
||||||
if (first && usz == 0)
|
else if (!synced && (ud == NULL || strcmp (ud, expud) != 0))
|
||||||
{
|
{
|
||||||
qset (qos, "X", 1);
|
/* slave hasn't discovered us yet */
|
||||||
rc = dds_set_qos (qent, qos);
|
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const char *exp = exp_ud[exp_index];
|
peer = si.instance_handle;
|
||||||
if (first && strcmp (ud, "X") == 0)
|
synced = true;
|
||||||
exp = "X";
|
if (master)
|
||||||
const size_t expsz = first ? 1 : strlen (exp);
|
|
||||||
bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0));
|
|
||||||
//printf ("%d: expected %u %zu/%s received %zu/%s\n",
|
|
||||||
// id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)");
|
|
||||||
MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n",
|
|
||||||
qname, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)");
|
|
||||||
if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0]))
|
|
||||||
{
|
{
|
||||||
exp_index = 0;
|
bool eq = (usz == expusz && (usz == 0 || memcmp (ud, expud, usz) == 0));
|
||||||
exp_cycle++;
|
printf ("%d: expected %u %zu/%s received %zu/%s\n",
|
||||||
}
|
id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)");
|
||||||
|
MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n",
|
||||||
|
qname, exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)");
|
||||||
|
if (++exp_index == sizeof (exp_rwud) / sizeof (exp_rwud[0]))
|
||||||
|
{
|
||||||
|
exp_index = 0;
|
||||||
|
exp_cycle++;
|
||||||
|
}
|
||||||
|
|
||||||
if (active && exp_cycle == ncycles)
|
if (exp_cycle == ncycles)
|
||||||
done = true;
|
done = true;
|
||||||
|
|
||||||
|
expud = exp_rwud[exp_index];
|
||||||
|
expusz = strlen (expud);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const void *newud;
|
printf ("%d: slave: received %zu/%s\n", id, usz, ud ? (char *) ud : "(null)");
|
||||||
size_t newusz;
|
expud = ud;
|
||||||
if (!active)
|
expusz = usz;
|
||||||
{
|
|
||||||
/* Set group data to the same value in response */
|
|
||||||
newud = ud; newusz = usz;
|
|
||||||
qset (qos, ud, usz);
|
|
||||||
}
|
|
||||||
else /* Set next agreed value */
|
|
||||||
{
|
|
||||||
newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]);
|
|
||||||
qset (qos, newud, newusz);
|
|
||||||
}
|
|
||||||
|
|
||||||
rc = dds_set_qos (qent, qos);
|
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
|
||||||
|
|
||||||
dds_qos_t *chk = dds_create_qos ();
|
|
||||||
rc = dds_get_qos (ep, chk);
|
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc));
|
|
||||||
|
|
||||||
void *chkud = NULL;
|
|
||||||
size_t chkusz = 0;
|
|
||||||
if (!qget (chk, &chkud, &chkusz))
|
|
||||||
MPT_ASSERT (0, "Check QoS: no %s present\n", qname);
|
|
||||||
MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0),
|
|
||||||
"Retrieved %s differs from group data just set (%zu/%s vs %zu/%s)\n", qname,
|
|
||||||
chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)");
|
|
||||||
dds_free (chkud);
|
|
||||||
dds_delete_qos (chk);
|
|
||||||
first = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qset (qos, expud, expusz);
|
||||||
|
rc = dds_set_qos (qent, qos);
|
||||||
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
|
dds_qos_t *chk = dds_create_qos ();
|
||||||
|
rc = dds_get_qos (ep, chk);
|
||||||
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
|
void *chkud = NULL;
|
||||||
|
size_t chkusz = 0;
|
||||||
|
if (!qget (chk, &chkud, &chkusz))
|
||||||
|
MPT_ASSERT (0, "Check QoS: no %s present\n", qname);
|
||||||
|
MPT_ASSERT (chkusz == expusz && (expusz == 0 || memcmp (chkud, expud, expusz) == 0),
|
||||||
|
"Retrieved %s differs from group data just set (%zu/%s vs %zu/%s)\n", qname,
|
||||||
|
chkusz, chkud ? (char *) chkud : "(null)", expusz, expud ? (char *) expud : "(null)");
|
||||||
|
dds_free (chkud);
|
||||||
|
dds_delete_qos (chk);
|
||||||
}
|
}
|
||||||
dds_free (ud);
|
dds_free (ud);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n));
|
MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n));
|
||||||
dds_return_loan (rdep, &raw, 1);
|
dds_return_loan (rdep, &raw, 1);
|
||||||
|
fflush (stdout);
|
||||||
}
|
}
|
||||||
dds_delete_qos (qos);
|
dds_delete_qos (qos);
|
||||||
rc = dds_delete (dp);
|
rc = dds_delete (dp);
|
||||||
|
|
|
@ -34,13 +34,13 @@ enum rwud {
|
||||||
|
|
||||||
MPT_ProcessEntry (ppud,
|
MPT_ProcessEntry (ppud,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
bool active,
|
bool master,
|
||||||
unsigned ncycles));
|
unsigned ncycles));
|
||||||
|
|
||||||
MPT_ProcessEntry (rwud,
|
MPT_ProcessEntry (rwud,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
const char *topic_name,
|
const char *topic_name,
|
||||||
bool active,
|
bool master,
|
||||||
unsigned ncycles,
|
unsigned ncycles,
|
||||||
enum rwud which));
|
enum rwud which));
|
||||||
|
|
||||||
|
|
|
@ -182,6 +182,9 @@ static bool writer_qos_eq_h (const dds_qos_t *a, dds_entity_t ent)
|
||||||
return delta == 0;
|
return delta == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define UD_QMPUB "qosmatch_publisher"
|
||||||
|
#define UD_QMPUBDONE UD_QMPUB ":ok"
|
||||||
|
|
||||||
MPT_ProcessEntry (rw_publisher,
|
MPT_ProcessEntry (rw_publisher,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
const char *topic_name))
|
const char *topic_name))
|
||||||
|
@ -192,16 +195,18 @@ MPT_ProcessEntry (rw_publisher,
|
||||||
dds_entity_t wr[NPUB][NWR_PUB];
|
dds_entity_t wr[NPUB][NWR_PUB];
|
||||||
bool chk[NPUB][NWR_PUB] = { { false } };
|
bool chk[NPUB][NWR_PUB] = { { false } };
|
||||||
dds_return_t rc;
|
dds_return_t rc;
|
||||||
dds_qos_t *qos;
|
dds_qos_t *qos, *ppqos;
|
||||||
int id = (int) ddsrt_getpid ();
|
int id = (int) ddsrt_getpid ();
|
||||||
|
|
||||||
printf ("=== [Publisher(%d)] Start(%d) ...\n", id, (int) domainid);
|
printf ("=== [Publisher(%d)] Start(%d) ...\n", id, (int) domainid);
|
||||||
|
|
||||||
qos = dds_create_qos ();
|
ppqos = dds_create_qos ();
|
||||||
setqos (qos, 0, false, true);
|
dds_qset_userdata (ppqos, UD_QMPUB, sizeof (UD_QMPUB) - 1);
|
||||||
dp = dds_create_participant (domainid, NULL, NULL);
|
dp = dds_create_participant (domainid, ppqos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
||||||
|
|
||||||
|
qos = dds_create_qos ();
|
||||||
|
setqos (qos, 0, false, true);
|
||||||
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
||||||
|
|
||||||
|
@ -269,7 +274,13 @@ MPT_ProcessEntry (rw_publisher,
|
||||||
dds_sleepfor (DDS_MSECS (100));
|
dds_sleepfor (DDS_MSECS (100));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dds_qset_userdata (ppqos, UD_QMPUBDONE, sizeof (UD_QMPUBDONE) - 1);
|
||||||
|
rc = dds_set_qos (dp, ppqos);
|
||||||
|
MPT_ASSERT_FATAL_EQ (rc, DDS_RETCODE_OK, "failed to participant QoS: %s\n", dds_strretcode (rc));
|
||||||
|
|
||||||
/* Wait until subscribers terminate */
|
/* Wait until subscribers terminate */
|
||||||
|
printf ("wait for subscribers to terminate\n");
|
||||||
|
fflush (stdout);
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < NPUB; i++)
|
for (size_t i = 0; i < NPUB; i++)
|
||||||
|
@ -281,7 +292,11 @@ MPT_ProcessEntry (rw_publisher,
|
||||||
MPT_ASSERT_FATAL_EQ (rc, DDS_RETCODE_OK, "dds_get_matched_publication_status failed for writer %zu %zu: %s\n",
|
MPT_ASSERT_FATAL_EQ (rc, DDS_RETCODE_OK, "dds_get_matched_publication_status failed for writer %zu %zu: %s\n",
|
||||||
i, j, dds_strretcode (rc));
|
i, j, dds_strretcode (rc));
|
||||||
if (st.current_count)
|
if (st.current_count)
|
||||||
|
{
|
||||||
|
printf ("%zu %zu: %d\n", i, j, (int) st.current_count);
|
||||||
|
fflush (stdout);
|
||||||
goto have_matches;
|
goto have_matches;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -290,16 +305,43 @@ MPT_ProcessEntry (rw_publisher,
|
||||||
}
|
}
|
||||||
|
|
||||||
dds_delete_qos (qos);
|
dds_delete_qos (qos);
|
||||||
|
dds_delete_qos (ppqos);
|
||||||
rc = dds_delete (dp);
|
rc = dds_delete (dp);
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
||||||
printf ("=== [Publisher(%d)] Done\n", id);
|
printf ("=== [Publisher(%d)] Done\n", id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void wait_for_done (dds_entity_t rd, const char *userdata)
|
||||||
|
{
|
||||||
|
int32_t n;
|
||||||
|
void *raw = NULL;
|
||||||
|
dds_sample_info_t si;
|
||||||
|
bool done = false;
|
||||||
|
while (!done)
|
||||||
|
{
|
||||||
|
while (!done && (n = dds_take (rd, &raw, &si, 1, 1)) == 1)
|
||||||
|
{
|
||||||
|
const dds_builtintopic_participant_t *sample = raw;
|
||||||
|
void *ud = NULL;
|
||||||
|
size_t usz = 0;
|
||||||
|
if (!si.valid_data || !dds_qget_userdata (sample->qos, &ud, &usz))
|
||||||
|
continue;
|
||||||
|
if (ud && strcmp (ud, userdata) == 0)
|
||||||
|
done = true;
|
||||||
|
dds_free (ud);
|
||||||
|
dds_return_loan (rd, &raw, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!done)
|
||||||
|
dds_sleepfor (DDS_MSECS (100));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
MPT_ProcessEntry (rw_subscriber,
|
MPT_ProcessEntry (rw_subscriber,
|
||||||
MPT_Args (dds_domainid_t domainid,
|
MPT_Args (dds_domainid_t domainid,
|
||||||
const char *topic_name))
|
const char *topic_name))
|
||||||
{
|
{
|
||||||
dds_entity_t dp;
|
dds_entity_t dp, pprd;
|
||||||
dds_entity_t tp;
|
dds_entity_t tp;
|
||||||
dds_entity_t sub[NPUB];
|
dds_entity_t sub[NPUB];
|
||||||
dds_entity_t rd[NPUB][NWR_PUB];
|
dds_entity_t rd[NPUB][NWR_PUB];
|
||||||
|
@ -310,11 +352,13 @@ MPT_ProcessEntry (rw_subscriber,
|
||||||
|
|
||||||
printf ("=== [Subscriber(%d)] Start(%d) ...\n", id, (int) domainid);
|
printf ("=== [Subscriber(%d)] Start(%d) ...\n", id, (int) domainid);
|
||||||
|
|
||||||
qos = dds_create_qos ();
|
|
||||||
setqos (qos, 0, true, true);
|
|
||||||
dp = dds_create_participant (domainid, NULL, NULL);
|
dp = dds_create_participant (domainid, NULL, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp));
|
||||||
|
pprd = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL);
|
||||||
|
MPT_ASSERT_FATAL_GT (pprd, 0, "Could not create DCPSParticipant reader: %s\n", dds_strretcode (pprd));
|
||||||
|
|
||||||
|
qos = dds_create_qos ();
|
||||||
|
setqos (qos, 0, true, true);
|
||||||
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL);
|
||||||
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp));
|
||||||
|
|
||||||
|
@ -382,6 +426,9 @@ MPT_ProcessEntry (rw_subscriber,
|
||||||
dds_sleepfor (DDS_MSECS (100));
|
dds_sleepfor (DDS_MSECS (100));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
printf ("wait for publisher to have completed its checks\n");
|
||||||
|
wait_for_done (pprd, UD_QMPUBDONE);
|
||||||
|
|
||||||
dds_delete_qos (qos);
|
dds_delete_qos (qos);
|
||||||
rc = dds_delete (dp);
|
rc = dds_delete (dp);
|
||||||
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue