Add multi-domain version of topic_data test

This is the same test for verifying set_qos on topic_data is reflected
in the DCPSPublication and DCPSSubscription topics as the one that
already exists, but it uses two threads in two domains.  A barrier is
used to ensure the threads in the two processes indeed execute
concurrently, and different topic data sequences are used to increase
the likelihood of detecting leakage from one domain into the other.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-07-19 12:36:23 +02:00 committed by eboasson
parent 5e31a3df90
commit 711026114b
3 changed files with 141 additions and 11 deletions

View file

@ -27,8 +27,8 @@ MPT_Test(qos, ppuserdata, .init=ppud_init, .fini=ppud_fini);
/*
* Checks whether reader/writer user_data QoS changes work.
*/
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", true, 10, RWUD_USERDATA)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", false, 0, RWUD_USERDATA)
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", true, 10, RWUD_USERDATA, NULL)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", false, 0, RWUD_USERDATA, NULL)
MPT_TestProcess(qos, rwuserdata, a, rwud, TEST_A_ARGS);
MPT_TestProcess(qos, rwuserdata, b, rwud, TEST_B_ARGS);
MPT_Test(qos, rwuserdata, .init=ppud_init, .fini=ppud_fini);
@ -38,8 +38,8 @@ MPT_Test(qos, rwuserdata, .init=ppud_init, .fini=ppud_fini);
/*
* Checks whether topic_data QoS changes become visible in reader/writer.
*/
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", true, 10, RWUD_TOPICDATA)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", false, 0, RWUD_TOPICDATA)
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", true, 10, RWUD_TOPICDATA, NULL)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", false, 0, RWUD_TOPICDATA, NULL)
MPT_TestProcess(qos, rwtopicdata, a, rwud, TEST_A_ARGS);
MPT_TestProcess(qos, rwtopicdata, b, rwud, TEST_B_ARGS);
MPT_Test(qos, rwtopicdata, .init=ppud_init, .fini=ppud_fini);
@ -49,10 +49,23 @@ MPT_Test(qos, rwtopicdata, .init=ppud_init, .fini=ppud_fini);
/*
* Checks whether group_data QoS changes become visible in reader/writer.
*/
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", true, 10, RWUD_GROUPDATA)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", false, 0, RWUD_GROUPDATA)
#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", true, 10, RWUD_GROUPDATA, NULL)
#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", false, 0, RWUD_GROUPDATA, NULL)
MPT_TestProcess(qos, rwgroupdata, a, rwud, TEST_A_ARGS);
MPT_TestProcess(qos, rwgroupdata, b, rwud, TEST_B_ARGS);
MPT_Test(qos, rwgroupdata, .init=ppud_init, .fini=ppud_fini);
#undef TEST_A_ARGS
#undef TEST_B_ARGS
/*
* Checks whether topic_data QoS changes become visible in reader/writer,
* but doing so in 2 domains simultaneously -- the specified domain id,
* and the one immediately above that
*/
#define TEST_A_ARGS MPT_ArgValues(3, "rwtopicdataM", true, 10, RWUD_TOPICDATA)
#define TEST_B_ARGS MPT_ArgValues(3, "rwtopicdataM", false, 0, RWUD_TOPICDATA)
MPT_TestProcess(qos, rwtopicdataM, a, rwudM, TEST_A_ARGS);
MPT_TestProcess(qos, rwtopicdataM, b, rwudM, TEST_B_ARGS);
MPT_Test(qos, rwtopicdataM, .init=ppud_init, .fini=ppud_fini);
#undef TEST_A_ARGS
#undef TEST_B_ARGS

View file

@ -18,6 +18,8 @@
#include "dds/dds.h"
#include "dds/ddsrt/time.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsrt/heap.h"
@ -161,16 +163,42 @@ MPT_ProcessEntry (ppud,
#undef prefix
}
static const char *exp_rwud[] = {
"a", "bc", "def", ""
static const char *exp_rwud[2][4] = {
{ "a", "bc", "def", "" },
{ "p", "qr", "stu", "" }
};
struct rwud_barrier {
ddsrt_mutex_t lock;
ddsrt_cond_t cond;
int initcount;
int count;
};
static void barrierwait (struct rwud_barrier *barrier, int id)
{
printf ("%d waiting at barrier\n", id);
fflush (stdout);
ddsrt_mutex_lock (&barrier->lock);
assert (barrier->initcount > 0);
if (barrier->count == 0)
barrier->count = barrier->initcount;
if (--barrier->count == 0)
ddsrt_cond_broadcast (&barrier->cond);
while (barrier->count > 0)
ddsrt_cond_wait (&barrier->cond, &barrier->lock);
ddsrt_mutex_unlock (&barrier->lock);
printf ("%d continuing past barrier\n", id);
fflush (stdout);
}
MPT_ProcessEntry (rwud,
MPT_Args (dds_domainid_t domainid,
const char *topic_name,
bool master,
unsigned ncycles,
enum rwud which))
enum rwud which,
struct rwud_barrier *barrier))
{
bool (*qget) (const dds_qos_t * __restrict qos, void **value, size_t *sz) = 0;
void (*qset) (dds_qos_t * __restrict qos, const void *value, size_t sz) = 0;
@ -247,8 +275,14 @@ MPT_ProcessEntry (rwud,
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc));
}
if (barrier)
{
barrierwait (barrier, id);
}
bool done = false;
bool synced = !master;
const unsigned exp_setindex = (unsigned) domainid % 2;
unsigned exp_index = 0;
unsigned exp_cycle = 0;
dds_instance_handle_t peer = 0;
@ -290,7 +324,7 @@ MPT_ProcessEntry (rwud,
id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)");
MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n",
qname, exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)");
if (++exp_index == sizeof (exp_rwud) / sizeof (exp_rwud[0]))
if (++exp_index == sizeof (exp_rwud[0]) / sizeof (exp_rwud[0][0]))
{
exp_index = 0;
exp_cycle++;
@ -299,7 +333,7 @@ MPT_ProcessEntry (rwud,
if (exp_cycle == ncycles)
done = true;
expud = exp_rwud[exp_index];
expud = exp_rwud[exp_setindex][exp_index];
expusz = strlen (expud);
}
else
@ -335,7 +369,80 @@ MPT_ProcessEntry (rwud,
fflush (stdout);
}
dds_delete_qos (qos);
if (barrier)
{
barrierwait (barrier, id);
}
rc = dds_delete (dp);
MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n");
printf ("=== [Check(%d)] Done\n", id);
}
struct rwudM_thread_arg {
dds_domainid_t domainid;
const char *topic_name;
bool master;
unsigned ncycles;
enum rwud which;
mpt_retval_t retval;
struct rwud_barrier *barrier;
const mpt_data_t *mpt__args__;
};
static uint32_t rwudM_thread (void *varg)
{
struct rwudM_thread_arg *arg = varg;
const mpt_data_t *mpt__args__ = arg->mpt__args__;
mpt_retval_t *mpt__retval__ = &arg->retval;
MPT_ProcessEntryName(rwud) (MPT_ArgValues (arg->domainid, arg->topic_name, arg->master, arg->ncycles, arg->which, arg->barrier));
return 0;
}
MPT_ProcessEntry (rwudM,
MPT_Args (dds_domainid_t domainid,
const char *topic_name,
bool master,
unsigned ncycles,
enum rwud which))
{
dds_return_t ret;
uint32_t dummy;
ddsrt_thread_t thr[2];
ddsrt_threadattr_t attr;
struct rwud_barrier barrier;
struct rwudM_thread_arg a = {
.domainid = domainid,
.topic_name = topic_name,
.master = master,
.ncycles = ncycles,
.which = which,
.barrier = &barrier,
.retval = MPT_SUCCESS,
.mpt__args__ = mpt__args__
};
struct rwudM_thread_arg b;
b = a; ++b.domainid;
ddsrt_mutex_init (&barrier.lock);
ddsrt_cond_init (&barrier.cond);
barrier.initcount = 2;
barrier.count = 0;
ddsrt_threadattr_init (&attr);
ret = ddsrt_thread_create (&thr[0], "a", &attr, &rwudM_thread, &a);
MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to create thread a\n");
ret = ddsrt_thread_create (&thr[1], "b", &attr, &rwudM_thread, &b);
MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to create thread b\n");
ret = ddsrt_thread_join (thr[0], &dummy);
MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to join thread a\n");
ret = ddsrt_thread_join (thr[1], &dummy);
MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to join thread b\n");
/* forward thread failures to process failures */
MPT_ASSERT_EQ (a.retval, MPT_SUCCESS, "thread a failed\n");
MPT_ASSERT_EQ (b.retval, MPT_SUCCESS, "thread b failed\n");
ddsrt_cond_destroy (&barrier.cond);
ddsrt_mutex_destroy (&barrier.lock);
}

View file

@ -32,12 +32,22 @@ enum rwud {
RWUD_TOPICDATA
};
struct rwud_barrier;
MPT_ProcessEntry (ppud,
MPT_Args (dds_domainid_t domainid,
bool master,
unsigned ncycles));
MPT_ProcessEntry (rwud,
MPT_Args (dds_domainid_t domainid,
const char *topic_name,
bool master,
unsigned ncycles,
enum rwud which,
struct rwud_barrier *barrier));
MPT_ProcessEntry (rwudM,
MPT_Args (dds_domainid_t domainid,
const char *topic_name,
bool master,