diff --git a/src/mpt/tests/qos/ppuserdata.c b/src/mpt/tests/qos/ppuserdata.c index 0b319c7..4951311 100644 --- a/src/mpt/tests/qos/ppuserdata.c +++ b/src/mpt/tests/qos/ppuserdata.c @@ -27,8 +27,8 @@ MPT_Test(qos, ppuserdata, .init=ppud_init, .fini=ppud_fini); /* * Checks whether reader/writer user_data QoS changes work. */ -#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", true, 10, RWUD_USERDATA) -#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", false, 0, RWUD_USERDATA) +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", true, 10, RWUD_USERDATA, NULL) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", false, 0, RWUD_USERDATA, NULL) MPT_TestProcess(qos, rwuserdata, a, rwud, TEST_A_ARGS); MPT_TestProcess(qos, rwuserdata, b, rwud, TEST_B_ARGS); MPT_Test(qos, rwuserdata, .init=ppud_init, .fini=ppud_fini); @@ -38,8 +38,8 @@ MPT_Test(qos, rwuserdata, .init=ppud_init, .fini=ppud_fini); /* * Checks whether topic_data QoS changes become visible in reader/writer. */ -#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", true, 10, RWUD_TOPICDATA) -#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", false, 0, RWUD_TOPICDATA) +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", true, 10, RWUD_TOPICDATA, NULL) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", false, 0, RWUD_TOPICDATA, NULL) MPT_TestProcess(qos, rwtopicdata, a, rwud, TEST_A_ARGS); MPT_TestProcess(qos, rwtopicdata, b, rwud, TEST_B_ARGS); MPT_Test(qos, rwtopicdata, .init=ppud_init, .fini=ppud_fini); @@ -49,10 +49,23 @@ MPT_Test(qos, rwtopicdata, .init=ppud_init, .fini=ppud_fini); /* * Checks whether group_data QoS changes become visible in reader/writer. */ -#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", true, 10, RWUD_GROUPDATA) -#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", false, 0, RWUD_GROUPDATA) +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", true, 10, RWUD_GROUPDATA, NULL) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", false, 0, RWUD_GROUPDATA, NULL) MPT_TestProcess(qos, rwgroupdata, a, rwud, TEST_A_ARGS); MPT_TestProcess(qos, rwgroupdata, b, rwud, TEST_B_ARGS); MPT_Test(qos, rwgroupdata, .init=ppud_init, .fini=ppud_fini); #undef TEST_A_ARGS #undef TEST_B_ARGS + +/* + * Checks whether topic_data QoS changes become visible in reader/writer, + * but doing so in 2 domains simultaneously -- the specified domain id, + * and the one immediately above that + */ +#define TEST_A_ARGS MPT_ArgValues(3, "rwtopicdataM", true, 10, RWUD_TOPICDATA) +#define TEST_B_ARGS MPT_ArgValues(3, "rwtopicdataM", false, 0, RWUD_TOPICDATA) +MPT_TestProcess(qos, rwtopicdataM, a, rwudM, TEST_A_ARGS); +MPT_TestProcess(qos, rwtopicdataM, b, rwudM, TEST_B_ARGS); +MPT_Test(qos, rwtopicdataM, .init=ppud_init, .fini=ppud_fini); +#undef TEST_A_ARGS +#undef TEST_B_ARGS diff --git a/src/mpt/tests/qos/procs/ppud.c b/src/mpt/tests/qos/procs/ppud.c index 31c8da4..25ae30a 100644 --- a/src/mpt/tests/qos/procs/ppud.c +++ b/src/mpt/tests/qos/procs/ppud.c @@ -18,6 +18,8 @@ #include "dds/dds.h" #include "dds/ddsrt/time.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/sync.h" #include "dds/ddsrt/process.h" #include "dds/ddsrt/sockets.h" #include "dds/ddsrt/heap.h" @@ -161,16 +163,42 @@ MPT_ProcessEntry (ppud, #undef prefix } -static const char *exp_rwud[] = { - "a", "bc", "def", "" +static const char *exp_rwud[2][4] = { + { "a", "bc", "def", "" }, + { "p", "qr", "stu", "" } }; +struct rwud_barrier { + ddsrt_mutex_t lock; + ddsrt_cond_t cond; + int initcount; + int count; +}; + +static void barrierwait (struct rwud_barrier *barrier, int id) +{ + printf ("%d waiting at barrier\n", id); + fflush (stdout); + ddsrt_mutex_lock (&barrier->lock); + assert (barrier->initcount > 0); + if (barrier->count == 0) + barrier->count = barrier->initcount; + if (--barrier->count == 0) + ddsrt_cond_broadcast (&barrier->cond); + while (barrier->count > 0) + ddsrt_cond_wait (&barrier->cond, &barrier->lock); + ddsrt_mutex_unlock (&barrier->lock); + printf ("%d continuing past barrier\n", id); + fflush (stdout); +} + MPT_ProcessEntry (rwud, MPT_Args (dds_domainid_t domainid, const char *topic_name, bool master, unsigned ncycles, - enum rwud which)) + enum rwud which, + struct rwud_barrier *barrier)) { bool (*qget) (const dds_qos_t * __restrict qos, void **value, size_t *sz) = 0; void (*qset) (dds_qos_t * __restrict qos, const void *value, size_t sz) = 0; @@ -247,8 +275,14 @@ MPT_ProcessEntry (rwud, MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); } + if (barrier) + { + barrierwait (barrier, id); + } + bool done = false; bool synced = !master; + const unsigned exp_setindex = (unsigned) domainid % 2; unsigned exp_index = 0; unsigned exp_cycle = 0; dds_instance_handle_t peer = 0; @@ -290,7 +324,7 @@ MPT_ProcessEntry (rwud, id, exp_index, expusz, expud, usz, ud ? (char *) ud : "(null)"); MPT_ASSERT (eq, "%s mismatch: expected %u %zu/%s received %zu/%s\n", qname, exp_index, expusz, expud ? expud : "(null)", usz, ud ? (char *) ud : "(null)"); - if (++exp_index == sizeof (exp_rwud) / sizeof (exp_rwud[0])) + if (++exp_index == sizeof (exp_rwud[0]) / sizeof (exp_rwud[0][0])) { exp_index = 0; exp_cycle++; @@ -299,7 +333,7 @@ MPT_ProcessEntry (rwud, if (exp_cycle == ncycles) done = true; - expud = exp_rwud[exp_index]; + expud = exp_rwud[exp_setindex][exp_index]; expusz = strlen (expud); } else @@ -335,7 +369,80 @@ MPT_ProcessEntry (rwud, fflush (stdout); } dds_delete_qos (qos); + + if (barrier) + { + barrierwait (barrier, id); + } + rc = dds_delete (dp); MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); printf ("=== [Check(%d)] Done\n", id); } + +struct rwudM_thread_arg { + dds_domainid_t domainid; + const char *topic_name; + bool master; + unsigned ncycles; + enum rwud which; + mpt_retval_t retval; + struct rwud_barrier *barrier; + const mpt_data_t *mpt__args__; +}; + +static uint32_t rwudM_thread (void *varg) +{ + struct rwudM_thread_arg *arg = varg; + const mpt_data_t *mpt__args__ = arg->mpt__args__; + mpt_retval_t *mpt__retval__ = &arg->retval; + MPT_ProcessEntryName(rwud) (MPT_ArgValues (arg->domainid, arg->topic_name, arg->master, arg->ncycles, arg->which, arg->barrier)); + return 0; +} + +MPT_ProcessEntry (rwudM, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool master, + unsigned ncycles, + enum rwud which)) +{ + dds_return_t ret; + uint32_t dummy; + ddsrt_thread_t thr[2]; + ddsrt_threadattr_t attr; + struct rwud_barrier barrier; + struct rwudM_thread_arg a = { + .domainid = domainid, + .topic_name = topic_name, + .master = master, + .ncycles = ncycles, + .which = which, + .barrier = &barrier, + .retval = MPT_SUCCESS, + .mpt__args__ = mpt__args__ + }; + struct rwudM_thread_arg b; + b = a; ++b.domainid; + + ddsrt_mutex_init (&barrier.lock); + ddsrt_cond_init (&barrier.cond); + barrier.initcount = 2; + barrier.count = 0; + + ddsrt_threadattr_init (&attr); + ret = ddsrt_thread_create (&thr[0], "a", &attr, &rwudM_thread, &a); + MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to create thread a\n"); + ret = ddsrt_thread_create (&thr[1], "b", &attr, &rwudM_thread, &b); + MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to create thread b\n"); + ret = ddsrt_thread_join (thr[0], &dummy); + MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to join thread a\n"); + ret = ddsrt_thread_join (thr[1], &dummy); + MPT_ASSERT_FATAL_EQ (ret, DDS_RETCODE_OK, "failed to join thread b\n"); + /* forward thread failures to process failures */ + MPT_ASSERT_EQ (a.retval, MPT_SUCCESS, "thread a failed\n"); + MPT_ASSERT_EQ (b.retval, MPT_SUCCESS, "thread b failed\n"); + + ddsrt_cond_destroy (&barrier.cond); + ddsrt_mutex_destroy (&barrier.lock); +} diff --git a/src/mpt/tests/qos/procs/ppud.h b/src/mpt/tests/qos/procs/ppud.h index 8f8e012..598cc59 100644 --- a/src/mpt/tests/qos/procs/ppud.h +++ b/src/mpt/tests/qos/procs/ppud.h @@ -32,12 +32,22 @@ enum rwud { RWUD_TOPICDATA }; +struct rwud_barrier; + MPT_ProcessEntry (ppud, MPT_Args (dds_domainid_t domainid, bool master, unsigned ncycles)); MPT_ProcessEntry (rwud, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool master, + unsigned ncycles, + enum rwud which, + struct rwud_barrier *barrier)); + +MPT_ProcessEntry (rwudM, MPT_Args (dds_domainid_t domainid, const char *topic_name, bool master,