Liveliness local readers
This commit adds support for liveliness QoS when using local readers. The implementation for (liveliness) expiration of writers used here is similar to that used with proxy writers, and it also supports the three liveliness kinds (1) automatic, which is trivial when using a local reader and writer, (2) manual-by-participant and (3) manual-by-topic. In addition, these changes and fixes are included in this commit: - Fixed a bug in heartbeat handling in the reader: for manual-by- participant writers the lease was not updated on reception of a heartbeat message with liveliness flag set. This is fixed and a test-case is added. - Include the liveliness flag in a heartbeat message to the trace - Trace all lease renewals, including liveliness leases - Replaced liveliness changed state 'twitch' by 2 subsequent calls to the status callback - Added a test for liveliness duration 0 and 1ns (for both local and remote readers) Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
parent
bb76798492
commit
9410753076
9 changed files with 610 additions and 203 deletions
|
@ -254,9 +254,8 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE &&
|
||||
LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE &&
|
||||
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE &&
|
||||
LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < LIVELINESS_CHANGED_TWITCH &&
|
||||
(uint32_t) LIVELINESS_CHANGED_TWITCH < UINT32_MAX);
|
||||
assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_TWITCH);
|
||||
(uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX);
|
||||
assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE);
|
||||
switch ((enum liveliness_changed_data_extra) data->extra)
|
||||
{
|
||||
case LIVELINESS_CHANGED_ADD_ALIVE:
|
||||
|
@ -287,8 +286,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
st->alive_count++;
|
||||
st->alive_count_change++;
|
||||
break;
|
||||
case LIVELINESS_CHANGED_TWITCH:
|
||||
break;
|
||||
}
|
||||
st->last_publication_handle = data->handle;
|
||||
invoke = (lst->on_liveliness_changed != 0);
|
||||
|
|
|
@ -153,10 +153,11 @@ CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
|
|||
#undef MT
|
||||
#undef MP
|
||||
#undef A
|
||||
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
|
||||
|
||||
static void test_pmd_count(dds_liveliness_kind_t kind, uint32_t ldur, double mult, bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t writer;
|
||||
seqno_t start_seqno, end_seqno;
|
||||
|
@ -169,9 +170,9 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
|
|||
dds_time_t t;
|
||||
|
||||
t = dds_time();
|
||||
printf("%d.%06d running test: kind %s, lease duration %d, delay %d\n",
|
||||
printf("%d.%06d running test: kind %s, lease duration %d, delay %d, %s reader\n",
|
||||
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
|
||||
kind == 0 ? "A" : "MP", ldur, (int32_t)(mult * ldur));
|
||||
kind == 0 ? "A" : "MP", ldur, (int32_t)(mult * ldur), remote_reader ? "remote" : "local");
|
||||
|
||||
/* wait for initial PMD to be sent by the participant */
|
||||
while (get_pmd_seqno(g_pub_participant) < 1)
|
||||
|
@ -180,17 +181,18 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
|
|||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
|
||||
/* waitset on reader */
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
|
||||
/* writer */
|
||||
|
@ -221,12 +223,19 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
|
|||
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
|
||||
|
||||
/* cleanup */
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
|
||||
{
|
||||
test_pmd_count(kind, ldur, mult, false);
|
||||
test_pmd_count(kind, ldur, mult, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the expected number of proxy writers expires (set to not-alive)
|
||||
* after a certain delay for various combinations of writers with different
|
||||
|
@ -239,10 +248,11 @@ CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
|
|||
CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
|
||||
CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */
|
||||
};
|
||||
CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120)
|
||||
|
||||
static void test_expire_liveliness_kinds(uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp, bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t *writers;
|
||||
dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp;
|
||||
|
@ -257,19 +267,20 @@ CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man
|
|||
do
|
||||
{
|
||||
tstart = dds_time();
|
||||
printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n",
|
||||
printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n, %s reader",
|
||||
(int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000,
|
||||
ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
|
||||
ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, remote_reader ? "remote" : "local");
|
||||
|
||||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
|
||||
|
@ -281,7 +292,7 @@ CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man
|
|||
CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur));
|
||||
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
|
||||
writers = dds_alloc(wr_cnt * sizeof(dds_entity_t));
|
||||
|
@ -347,7 +358,8 @@ CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man
|
|||
for (n = 0; n < wr_cnt; n++)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
|
||||
dds_free(writers);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
|
||||
|
@ -369,14 +381,21 @@ CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man
|
|||
} while (!test_finished);
|
||||
}
|
||||
|
||||
static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur, dds_entity_t *writer, dds_entity_t topic, dds_entity_t reader)
|
||||
CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120)
|
||||
{
|
||||
test_expire_liveliness_kinds (ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, false);
|
||||
test_expire_liveliness_kinds (ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, true);
|
||||
}
|
||||
|
||||
|
||||
static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur, dds_entity_t *writer, dds_entity_t topic, dds_entity_t reader, bool remote_reader)
|
||||
{
|
||||
dds_entity_t waitset;
|
||||
dds_qos_t *wqos;
|
||||
dds_attach_t triggered;
|
||||
uint32_t status;
|
||||
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
|
||||
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
|
||||
|
@ -424,22 +443,22 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
|
|||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), get_ldur_config(g_pub_participant));
|
||||
|
||||
/* create writers and check pmd interval in publishing participant */
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
|
||||
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
|
||||
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
|
||||
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
|
||||
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
|
||||
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader, true);
|
||||
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
|
||||
|
||||
/* cleanup */
|
||||
|
@ -454,10 +473,10 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
|
|||
/**
|
||||
* Check that the correct lease duration is set in the matched
|
||||
* publications in the readers. */
|
||||
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
|
||||
static void test_lease_duration_pwr(bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t writer;
|
||||
char name[100];
|
||||
|
@ -467,15 +486,18 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
|
|||
uint32_t status;
|
||||
dds_duration_t ldur;
|
||||
|
||||
printf("running test lease_duration_pwr: %s reader\n", remote_reader ? "remote" : "local");
|
||||
|
||||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_ldurpwr", 1, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
|
||||
|
@ -486,7 +508,7 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
|
|||
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
|
||||
|
||||
/* wait for writer to be alive */
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
|
@ -509,11 +531,18 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
|
|||
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
|
||||
{
|
||||
test_lease_duration_pwr(false);
|
||||
test_lease_duration_pwr(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a relative large number of writers with liveliness kinds automatic and
|
||||
* manual-by-participant and with decreasing lease duration, and check that all
|
||||
|
@ -521,10 +550,11 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
|
|||
* is deleted immediately after creating.
|
||||
*/
|
||||
#define MAX_WRITERS 100
|
||||
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini, .timeout = 15)
|
||||
|
||||
static void test_create_delete_writer_stress(bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t writers[MAX_WRITERS];
|
||||
dds_entity_t waitset;
|
||||
|
@ -538,18 +568,21 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
|
|||
Space_Type1 sample = {0, 0, 0};
|
||||
int64_t ldur = 1000;
|
||||
|
||||
printf("running test create_delete_writer_stress: %s reader\n", remote_reader ? "remote" : "local");
|
||||
|
||||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader and waitset */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
|
||||
/* create 1st writer and wait for it to become alive */
|
||||
|
@ -604,42 +637,53 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
|
|||
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini, .timeout = 15)
|
||||
{
|
||||
test_create_delete_writer_stress(false);
|
||||
test_create_delete_writer_stress(true);
|
||||
}
|
||||
#undef MAX_WRITERS
|
||||
|
||||
/**
|
||||
* Check the counts in liveliness_changed_status result.
|
||||
*/
|
||||
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
|
||||
static void test_status_counts(bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t writer;
|
||||
dds_entity_t waitset;
|
||||
dds_qos_t *rqos;
|
||||
dds_qos_t *wqos;
|
||||
dds_attach_t triggered;
|
||||
struct dds_liveliness_changed_status lstatus;
|
||||
struct dds_liveliness_changed_status lcstatus;
|
||||
struct dds_liveliness_lost_status llstatus;
|
||||
struct dds_subscription_matched_status sstatus;
|
||||
char name[100];
|
||||
dds_duration_t ldur = DDS_MSECS(500);
|
||||
Space_Type1 sample = {1, 0, 0};
|
||||
|
||||
printf("running test status_counts: %s reader\n", remote_reader ? "remote" : "local");
|
||||
|
||||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
|
||||
CU_ASSERT_FATAL((waitset = dds_create_waitset(remote_reader ? g_sub_participant : g_pub_participant)) > 0);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
|
||||
|
||||
/* writer */
|
||||
|
@ -647,56 +691,72 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
|
|||
dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur);
|
||||
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
|
||||
dds_delete_qos(wqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(writer, DDS_LIVELINESS_LOST_STATUS), DDS_RETCODE_OK);
|
||||
|
||||
/* wait for writer to be alive */
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
|
||||
|
||||
/* check status counts before proxy writer is expired */
|
||||
dds_get_liveliness_changed_status(reader, &lstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
|
||||
dds_get_liveliness_changed_status(reader, &lcstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lcstatus.alive_count, 1);
|
||||
dds_get_subscription_matched_status(reader, &sstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
|
||||
dds_get_liveliness_lost_status(writer, &llstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(llstatus.total_count, 0);
|
||||
|
||||
/* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
|
||||
dds_sleepfor(ldur + DDS_MSECS(100));
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
|
||||
|
||||
dds_get_liveliness_changed_status(reader, &lstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
|
||||
dds_get_liveliness_changed_status(reader, &lcstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lcstatus.alive_count, 0);
|
||||
dds_get_subscription_matched_status(reader, &sstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
|
||||
dds_get_liveliness_lost_status(writer, &llstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(llstatus.total_count, 1);
|
||||
CU_ASSERT_EQUAL_FATAL(llstatus.total_count_change, 1);
|
||||
|
||||
/* write sample and re-check status counts */
|
||||
dds_write(writer, &sample);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
|
||||
|
||||
dds_get_liveliness_changed_status(reader, &lstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
|
||||
dds_get_liveliness_changed_status(reader, &lcstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(lcstatus.alive_count, 1);
|
||||
dds_get_subscription_matched_status(reader, &sstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
|
||||
dds_get_liveliness_lost_status(writer, &llstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(llstatus.total_count_change, 0);
|
||||
|
||||
/* cleanup */
|
||||
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
|
||||
{
|
||||
test_status_counts(false);
|
||||
test_status_counts(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that dds_assert_liveliness works as expected for liveliness
|
||||
* kinds manual-by-participant and manual-by-topic.
|
||||
*/
|
||||
#define MAX_WRITERS 100
|
||||
CU_TheoryDataPoints(ddsc_liveliness, assert_liveliness) = {
|
||||
CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */
|
||||
CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */
|
||||
CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */
|
||||
CU_DataPoints(uint32_t, 1, 0, 0, 1, 0, 1, 2), /* number of writers with automatic liveliness */
|
||||
CU_DataPoints(uint32_t, 0, 1, 0, 1, 1, 0, 2), /* number of writers with manual-by-participant liveliness */
|
||||
CU_DataPoints(uint32_t, 0, 0, 1, 1, 2, 2, 0), /* number of writers with manual-by-topic liveliness */
|
||||
};
|
||||
CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
|
||||
|
||||
static void test_assert_liveliness(uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp, bool remote_reader)
|
||||
{
|
||||
dds_entity_t pub_topic, sub_topic, reader, writers[MAX_WRITERS];
|
||||
dds_entity_t pub_topic, sub_topic = 0, reader, writers[MAX_WRITERS];
|
||||
dds_qos_t *rqos;
|
||||
struct dds_liveliness_changed_status lstatus;
|
||||
char name[100];
|
||||
|
@ -708,29 +768,30 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp)
|
|||
{
|
||||
wr_cnt = 0;
|
||||
assert(wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS);
|
||||
printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u with ldur %d\n",
|
||||
wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, ldur);
|
||||
printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u with ldur %d, %s reader\n",
|
||||
wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, ldur, remote_reader ? "remote" : "local");
|
||||
|
||||
/* topics */
|
||||
create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name);
|
||||
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
|
||||
|
||||
/* reader */
|
||||
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
|
||||
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
|
||||
CU_ASSERT_FATAL((reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, rqos, NULL)) > 0);
|
||||
dds_delete_qos(rqos);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
|
||||
|
||||
/* writers */
|
||||
for (size_t n = 0; n < wr_cnt_auto; n++)
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader, remote_reader);
|
||||
tstart = dds_time();
|
||||
for (size_t n = 0; n < wr_cnt_man_pp; n++)
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader, remote_reader);
|
||||
for (size_t n = 0; n < wr_cnt_man_tp; n++)
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
|
||||
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader, remote_reader);
|
||||
t = dds_time();
|
||||
if (t - tstart > DDS_MSECS(0.5 * ldur))
|
||||
{
|
||||
|
@ -750,13 +811,13 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp)
|
|||
stopped = 0;
|
||||
do
|
||||
{
|
||||
for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++)
|
||||
dds_assert_liveliness(writers[n]);
|
||||
for (size_t n = wr_cnt_auto; n < wr_cnt; n++)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_assert_liveliness(writers[n]), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
|
||||
stopped += (uint32_t)lstatus.not_alive_count_change;
|
||||
dds_sleepfor(DDS_MSECS(50));
|
||||
} while (dds_time() < tstop);
|
||||
dds_get_liveliness_changed_status(reader, &lstatus);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
|
||||
printf("writers alive with dds_assert_liveliness on all writers: %d, writers stopped: %d\n", lstatus.alive_count, stopped);
|
||||
if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp || stopped != 0)
|
||||
{
|
||||
|
@ -795,7 +856,8 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp)
|
|||
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
|
||||
for (size_t n = 0; n < wr_cnt; n++)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
|
||||
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
|
||||
|
||||
if (!test_finished)
|
||||
|
@ -813,6 +875,12 @@ CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp)
|
|||
}
|
||||
} while (!test_finished);
|
||||
}
|
||||
|
||||
CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
|
||||
{
|
||||
test_assert_liveliness(wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, false);
|
||||
test_assert_liveliness(wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, true);
|
||||
}
|
||||
#undef MAX_WRITERS
|
||||
|
||||
/**
|
||||
|
@ -883,31 +951,47 @@ static void liveliness_changed_listener (dds_entity_t rd, const dds_liveliness_c
|
|||
st->weirdness = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
printf ("liveliness_changed_listener: alive_count_change = 0 && not_alive_count_change = 0\n");
|
||||
st->weirdness = true;
|
||||
}
|
||||
ddsrt_mutex_unlock (&st->lock);
|
||||
}
|
||||
|
||||
static bool get_and_check_status (dds_entity_t reader, dds_entity_t writer_active)
|
||||
#define STATUS_UNSYNCED 0
|
||||
#define STATUS_SYNCED 1
|
||||
#define STATUS_DATA 2
|
||||
static unsigned get_and_check_status (dds_entity_t reader, dds_entity_t writer_active)
|
||||
{
|
||||
struct dds_liveliness_changed_status lstatus;
|
||||
struct dds_subscription_matched_status sstatus;
|
||||
struct dds_publication_matched_status pstatus;
|
||||
uint32_t dstatus;
|
||||
uint32_t result = STATUS_UNSYNCED;
|
||||
dds_return_t rc;
|
||||
rc = dds_get_subscription_matched_status(reader, &sstatus);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
rc = dds_get_liveliness_changed_status(reader, &lstatus);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
rc = dds_take_status(reader, &dstatus, DDS_DATA_AVAILABLE_STATUS);
|
||||
CU_ASSERT_EQUAL_FATAL(rc, DDS_RETCODE_OK);
|
||||
rc = dds_get_publication_matched_status(writer_active, &pstatus);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
CU_ASSERT_FATAL(lstatus.alive_count + lstatus.not_alive_count <= 2);
|
||||
printf ("%d %d %d %d\n", (int)sstatus.current_count, (int)lstatus.alive_count, (int)lstatus.not_alive_count, (int)pstatus.current_count);
|
||||
return (sstatus.current_count == 2 && lstatus.not_alive_count == 2 && pstatus.current_count == 1);
|
||||
printf ("sub %d | alive %d | not-alive %d | pub %d | data %d\n", (int)sstatus.current_count, (int)lstatus.alive_count, (int)lstatus.not_alive_count, (int)pstatus.current_count, dstatus != 0);
|
||||
if (dstatus)
|
||||
result |= STATUS_DATA;
|
||||
if (sstatus.current_count == 2 && lstatus.not_alive_count == 2 && pstatus.current_count == 1)
|
||||
result |= STATUS_SYNCED;
|
||||
return result;
|
||||
}
|
||||
|
||||
static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur)
|
||||
static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader)
|
||||
{
|
||||
const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 100 : 10;
|
||||
const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5;
|
||||
dds_entity_t pub_topic;
|
||||
dds_entity_t sub_topic;
|
||||
dds_entity_t sub_topic = 0;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t writer_active; /* writing */
|
||||
dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */
|
||||
|
@ -937,14 +1021,17 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
create_topic_name("ddsc_liveliness_lease_duration_zero", g_topic_nr++, name, sizeof name);
|
||||
pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, qos, NULL);
|
||||
CU_ASSERT_FATAL(pub_topic > 0);
|
||||
sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, qos, NULL);
|
||||
CU_ASSERT_FATAL(sub_topic > 0);
|
||||
if (remote_reader)
|
||||
{
|
||||
sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, qos, NULL);
|
||||
CU_ASSERT_FATAL(sub_topic > 0);
|
||||
}
|
||||
|
||||
/* reader liveliness is always automatic/infinity */
|
||||
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
|
||||
reader = dds_create_reader(g_sub_participant, sub_topic, qos, NULL);
|
||||
reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL);
|
||||
CU_ASSERT_FATAL(reader > 0);
|
||||
rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS);
|
||||
rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
rc = dds_waitset_attach(waitset, reader, reader);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
|
@ -964,15 +1051,29 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
|
||||
/* wait for writers to be discovered and to have lost their liveliness, and for
|
||||
writer_active to have discovered the reader */
|
||||
while (!get_and_check_status (reader, writer_active))
|
||||
unsigned status = STATUS_UNSYNCED;
|
||||
bool initial_sample_written = false, initial_sample_received = false;
|
||||
do
|
||||
{
|
||||
status = get_and_check_status (reader, writer_active);
|
||||
if (status & STATUS_DATA)
|
||||
initial_sample_received = true;
|
||||
if (status & STATUS_SYNCED && !initial_sample_written)
|
||||
{
|
||||
rc = dds_write(writer_active, &sample);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
initial_sample_written = true;
|
||||
}
|
||||
if (status & STATUS_SYNCED && initial_sample_received)
|
||||
break;
|
||||
|
||||
rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5));
|
||||
if (rc < 1)
|
||||
{
|
||||
get_and_check_status (reader, writer_active);
|
||||
CU_ASSERT_FATAL(rc >= 1);
|
||||
}
|
||||
}
|
||||
} while (1);
|
||||
|
||||
/* switch to using a listener: those allow us to observe all events */
|
||||
listener = dds_create_listener (&listener_state);
|
||||
|
@ -1002,7 +1103,7 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
uint32_t cnt = 0;
|
||||
while (dds_take(reader, raw, &si, 1, 1) == 1)
|
||||
cnt++;
|
||||
CU_ASSERT(cnt == nsamples);
|
||||
CU_ASSERT(cnt == nsamples + 1);
|
||||
}
|
||||
|
||||
/* transition to not alive is not necessarily immediate */
|
||||
|
@ -1028,21 +1129,15 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
|
||||
printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive);
|
||||
CU_ASSERT(listener_state.w0_alive == listener_state.w0_not_alive);
|
||||
uint32_t exp_alive;
|
||||
if (sleep == 0)
|
||||
{
|
||||
/* if not sleeping, it's ok if the transition happens only once */
|
||||
CU_ASSERT(listener_state.w0_alive > 0);
|
||||
}
|
||||
exp_alive = 1; /* if not sleeping, it's ok if the transition happens only once */
|
||||
else if (sleep <= DDS_MSECS(10))
|
||||
{
|
||||
/* if sleeping briefly, expect the a good number of writes to toggle liveliness */
|
||||
CU_ASSERT(listener_state.w0_alive >= nsamples / 3);
|
||||
}
|
||||
exp_alive = nsamples / 3; /* if sleeping briefly, expect the a good number of writes to toggle liveliness */
|
||||
else
|
||||
{
|
||||
/* if sleeping, expect the vast majority (90%) of the writes to toggle liveliness */
|
||||
CU_ASSERT(listener_state.w0_alive >= nsamples - nsamples / 10);
|
||||
}
|
||||
exp_alive = nsamples - nsamples / 5; /* if sleeping, expect the vast majority (80%) of the writes to toggle liveliness */
|
||||
printf("check w0_alive %d >= %d\n", listener_state.w0_alive, exp_alive);
|
||||
CU_ASSERT(listener_state.w0_alive >= exp_alive);
|
||||
ddsrt_mutex_unlock(&listener_state.lock);
|
||||
}
|
||||
|
||||
|
@ -1055,8 +1150,11 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
rc = dds_delete(writer_inactive);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
rc = dds_delete(sub_topic);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
if (remote_reader)
|
||||
{
|
||||
rc = dds_delete(sub_topic);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
}
|
||||
rc = dds_delete(pub_topic);
|
||||
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
|
||||
|
||||
|
@ -1065,20 +1163,26 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
|
|||
|
||||
CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
|
||||
{
|
||||
static const bool remote_rd[] = { false, true };
|
||||
static const dds_duration_t sleep[] = { 0, DDS_MSECS(10), DDS_MSECS(100) };
|
||||
static const dds_liveliness_kind_t lkind[] = { DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_LIVELINESS_MANUAL_BY_TOPIC };
|
||||
static const dds_duration_t ldur[] = { 0, 1 };
|
||||
for (size_t sleep_idx = 0; sleep_idx < sizeof (sleep) / sizeof (sleep[0]); sleep_idx++)
|
||||
for (size_t remote_rd_idx = 0; remote_rd_idx < sizeof (remote_rd) / sizeof (remote_rd[0]); remote_rd_idx++)
|
||||
{
|
||||
for (size_t lkind_idx = 0; lkind_idx < sizeof (lkind) / sizeof (lkind[0]); lkind_idx++)
|
||||
for (size_t sleep_idx = 0; sleep_idx < sizeof (sleep) / sizeof (sleep[0]); sleep_idx++)
|
||||
{
|
||||
for (size_t ldur_idx = 0; ldur_idx < sizeof (ldur) / sizeof (ldur[0]); ldur_idx++)
|
||||
for (size_t lkind_idx = 0; lkind_idx < sizeof (lkind) / sizeof (lkind[0]); lkind_idx++)
|
||||
{
|
||||
dds_duration_t s = sleep[sleep_idx];
|
||||
dds_liveliness_kind_t k = lkind[lkind_idx];
|
||||
dds_duration_t d = ldur[ldur_idx];
|
||||
printf ("lease_duration_zero_or_one: sleep = %"PRId64" lkind = %d ldur = %"PRId64"\n", s, (int) k, d);
|
||||
lease_duration_zero_or_one_impl (s, k, d);
|
||||
for (size_t ldur_idx = 0; ldur_idx < sizeof (ldur) / sizeof (ldur[0]); ldur_idx++)
|
||||
{
|
||||
bool rrd = remote_rd[remote_rd_idx];
|
||||
dds_duration_t s = sleep[sleep_idx];
|
||||
dds_liveliness_kind_t k = lkind[lkind_idx];
|
||||
dds_duration_t d = ldur[ldur_idx];
|
||||
printf ("### lease_duration_zero_or_one: sleep = %"PRId64" lkind = %d ldur = %"PRId64" reader = %s\n", s, (int) k, d, rrd ? "remote" : "local");
|
||||
lease_duration_zero_or_one_impl (s, k, d, rrd);
|
||||
printf ("\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue