diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index cf876aa..32c5b3a 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -44,44 +44,44 @@ static dds_entity_t g_sub_subscriber = 0; static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size) { - /* Get unique g_topic name. */ - ddsrt_pid_t pid = ddsrt_getpid(); - ddsrt_tid_t tid = ddsrt_gettid(); - (void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid); - return name; + /* Get unique g_topic name. */ + ddsrt_pid_t pid = ddsrt_getpid(); + ddsrt_tid_t tid = ddsrt_gettid(); + (void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid); + return name; } static void liveliness_init(void) { - /* Domains for pub and sub use a different domain id, but the portgain setting + /* Domains for pub and sub use a different domain id, but the portgain setting * in configuration is 0, so that both domains will map to the same port number. * This allows to create two domains in a single test process. */ - char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB); - char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB); - g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub); - g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub); - dds_free(conf_pub); - dds_free(conf_sub); + char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB); + char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB); + g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub); + g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub); + dds_free(conf_pub); + dds_free(conf_sub); - g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); - CU_ASSERT_FATAL(g_pub_participant > 0); - g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL); - CU_ASSERT_FATAL(g_sub_participant > 0); + g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); + CU_ASSERT_FATAL(g_pub_participant > 0); + g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL); + CU_ASSERT_FATAL(g_sub_participant > 0); - g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL); - CU_ASSERT_FATAL(g_pub_publisher > 0); - g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL); - CU_ASSERT_FATAL(g_sub_subscriber > 0); + g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL); + CU_ASSERT_FATAL(g_pub_publisher > 0); + g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL); + CU_ASSERT_FATAL(g_sub_subscriber > 0); } static void liveliness_fini(void) { - dds_delete(g_sub_subscriber); - dds_delete(g_pub_publisher); - dds_delete(g_sub_participant); - dds_delete(g_pub_participant); - dds_delete(g_sub_domain); - dds_delete(g_pub_domain); + dds_delete(g_sub_subscriber); + dds_delete(g_pub_publisher); + dds_delete(g_sub_participant); + dds_delete(g_pub_participant); + dds_delete(g_sub_domain); + dds_delete(g_pub_domain); } /** @@ -91,20 +91,20 @@ static void liveliness_fini(void) */ static seqno_t get_pmd_seqno(dds_entity_t participant) { - seqno_t seqno; - struct dds_entity *pp_entity; - struct participant *pp; - struct writer *wr; - CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); - thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv); - pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid); - wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER); - CU_ASSERT_FATAL(wr != NULL); - assert(wr != NULL); /* for Clang's static analyzer */ - seqno = wr->seq; - thread_state_asleep(lookup_thread_state()); - dds_entity_unpin(pp_entity); - return seqno; + seqno_t seqno; + struct dds_entity *pp_entity; + struct participant *pp; + struct writer *wr; + CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); + thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv); + pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid); + wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER); + CU_ASSERT_FATAL(wr != NULL); + assert(wr != NULL); /* for Clang's static analyzer */ + seqno = wr->seq; + thread_state_asleep(lookup_thread_state()); + dds_entity_unpin(pp_entity); + return seqno; } /** @@ -112,16 +112,16 @@ static seqno_t get_pmd_seqno(dds_entity_t participant) */ static dds_duration_t get_pmd_interval(dds_entity_t participant) { - dds_duration_t intv; - struct dds_entity *pp_entity; - struct participant *pp; - CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); - thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv); - pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid); - intv = pp_get_pmd_interval(pp); - thread_state_asleep(lookup_thread_state()); - dds_entity_unpin(pp_entity); - return intv; + dds_duration_t intv; + struct dds_entity *pp_entity; + struct participant *pp; + CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); + thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv); + pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid); + intv = pp_get_pmd_interval(pp); + thread_state_asleep(lookup_thread_state()); + dds_entity_unpin(pp_entity); + return intv; } /** @@ -129,15 +129,14 @@ static dds_duration_t get_pmd_interval(dds_entity_t participant) */ static dds_duration_t get_ldur_config(dds_entity_t participant) { - struct dds_entity *pp_entity; - dds_duration_t ldur; - CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); - ldur = (dds_duration_t)pp_entity->m_domain->gv.config.lease_duration; - dds_entity_unpin(pp_entity); - return ldur; + struct dds_entity *pp_entity; + dds_duration_t ldur; + CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0); + ldur = (dds_duration_t)pp_entity->m_domain->gv.config.lease_duration; + dds_entity_unpin(pp_entity); + return ldur; } - /** * Test that the correct number of PMD messages is sent for * the various liveliness kinds. @@ -146,85 +145,85 @@ static dds_duration_t get_ldur_config(dds_entity_t participant) #define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT #define MT DDS_LIVELINESS_MANUAL_BY_TOPIC CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = { - CU_DataPoints(dds_liveliness_kind_t, A, A, MP, MT), /* liveliness kind */ - CU_DataPoints(uint32_t, 200, 500, 100, 100), /* lease duration */ - CU_DataPoints(double, 10, 5, 5, 5), /* delay (n times lease duration) */ + CU_DataPoints(dds_liveliness_kind_t, A, A, MP, MT), /* liveliness kind */ + CU_DataPoints(uint32_t, 200, 500, 100, 100), /* lease duration */ + CU_DataPoints(double, 10, 5, 5, 5), /* delay (n times lease duration) */ }; #undef MT #undef MP #undef A CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t writer; - seqno_t start_seqno, end_seqno; - dds_qos_t *rqos; - dds_qos_t *wqos; - dds_entity_t waitset; - dds_attach_t triggered; - uint32_t status; - char name[100]; - dds_time_t t; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writer; + seqno_t start_seqno, end_seqno; + dds_qos_t *rqos; + dds_qos_t *wqos; + dds_entity_t waitset; + dds_attach_t triggered; + uint32_t status; + char name[100]; + dds_time_t t; - t = dds_time(); - printf("%d.%06d running test: kind %s, lease duration %d, delay %d\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, - kind == 0 ? "A" : "MP", ldur, (int32_t)(mult * ldur)); + t = dds_time(); + printf("%d.%06d running test: kind %s, lease duration %d, delay %d\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, + kind == 0 ? "A" : "MP", ldur, (int32_t)(mult * ldur)); - /* wait for initial PMD to be sent by the participant */ - while (get_pmd_seqno(g_pub_participant) < 1) - dds_sleepfor (DDS_MSECS(50)); + /* wait for initial PMD to be sent by the participant */ + while (get_pmd_seqno(g_pub_participant) < 1) + dds_sleepfor(DDS_MSECS(50)); - /* topics */ - create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* waitset on reader */ - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + /* waitset on reader */ + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - /* writer */ - CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos, kind, DDS_MSECS(ldur)); - CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - dds_delete_qos(wqos); + /* writer */ + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, kind, DDS_MSECS(ldur)); + CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + dds_delete_qos(wqos); - /* wait for writer to be alive */ - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(1)), 1); - CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* wait for writer to be alive */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(1)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* check no of PMD messages sent */ - start_seqno = get_pmd_seqno(g_pub_participant); - dds_sleepfor(DDS_MSECS((dds_duration_t)(mult * ldur))); - end_seqno = get_pmd_seqno(g_pub_participant); + /* check no of PMD messages sent */ + start_seqno = get_pmd_seqno(g_pub_participant); + dds_sleepfor(DDS_MSECS((dds_duration_t)(mult * ldur))); + end_seqno = get_pmd_seqno(g_pub_participant); - t = dds_time(); - printf("%d.%06d PMD sequence no: start %" PRId64 " -> end %" PRId64 "\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, - start_seqno, end_seqno); + t = dds_time(); + printf("%d.%06d PMD sequence no: start %" PRId64 " -> end %" PRId64 "\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, + start_seqno, end_seqno); - /* End-start should be mult - 1 under ideal circumstances, but consider the test successful + /* End-start should be mult - 1 under ideal circumstances, but consider the test successful when at least 50% of the expected PMD's was sent. This checks that the frequency for sending PMDs was increased when the writer was added. */ - CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? (50 * (mult - 1)) / 100 : 0)) - if (kind != DDS_LIVELINESS_AUTOMATIC) - CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult) + CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? (50 * (mult - 1)) / 100 : 0)) + if (kind != DDS_LIVELINESS_AUTOMATIC) + CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult) - /* cleanup */ - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + /* cleanup */ + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); } /** @@ -233,163 +232,163 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin * liveliness kinds. */ CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = { - CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration for initial test run (increased for each retry when test fails) */ - CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */ - CU_DataPoints(uint32_t, 1, 0, 2, 0, 1, 0, 0, 1, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */ - CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */ - CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */ + CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration for initial test run (increased for each retry when test fails) */ + CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */ + CU_DataPoints(uint32_t, 1, 0, 2, 0, 1, 0, 0, 1, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */ + CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */ + CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */ }; CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t *writers; - dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp; - dds_entity_t waitset; - dds_attach_t triggered; - struct dds_liveliness_changed_status lstatus; - uint32_t status, n, run = 1, wr_cnt = wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp; - char name[100]; - dds_time_t tstart, t; - bool test_finished = false; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t *writers; + dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp; + dds_entity_t waitset; + dds_attach_t triggered; + struct dds_liveliness_changed_status lstatus; + uint32_t status, n, run = 1, wr_cnt = wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp; + char name[100]; + dds_time_t tstart, t; + bool test_finished = false; - do - { - tstart = dds_time(); - printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n", - (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000, - ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp); + do + { + tstart = dds_time(); + printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n", + (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000, + ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp); - /* topics */ - create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* writers */ - CU_ASSERT_FATAL((wqos_auto = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur)); - CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur)); - CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur)); + /* writers */ + CU_ASSERT_FATAL((wqos_auto = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur)); + CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur)); + CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur)); - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - writers = dds_alloc(wr_cnt * sizeof(dds_entity_t)); - for (n = 0; n < wr_cnt; n++) - { - dds_qos_t *wqos; - wqos = n < wr_cnt_auto ? wqos_auto : (n < (wr_cnt_auto + wr_cnt_man_pp) ? wqos_man_pp : wqos_man_tp); - CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); - CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - } - dds_delete_qos(wqos_auto); - dds_delete_qos(wqos_man_pp); - dds_delete_qos(wqos_man_tp); + writers = dds_alloc(wr_cnt * sizeof(dds_entity_t)); + for (n = 0; n < wr_cnt; n++) + { + dds_qos_t *wqos; + wqos = n < wr_cnt_auto ? wqos_auto : (n < (wr_cnt_auto + wr_cnt_man_pp) ? wqos_man_pp : wqos_man_tp); + CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + } + dds_delete_qos(wqos_auto); + dds_delete_qos(wqos_man_pp); + dds_delete_qos(wqos_man_tp); - t = dds_time(); - if (t - tstart > DDS_MSECS(0.5 * ldur)) - { - ldur *= 10 / (run + 1); - printf("%d.%06d failed to create writers in time\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); - } - else - { - /* check alive count before proxy writers are expired */ - dds_get_liveliness_changed_status(reader, &lstatus); - printf("%d.%06d writers alive: %d\n", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, lstatus.alive_count); - CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt); + t = dds_time(); + if (t - tstart > DDS_MSECS(0.5 * ldur)) + { + ldur *= 10 / (run + 1); + printf("%d.%06d failed to create writers in time\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); + } + else + { + /* check alive count before proxy writers are expired */ + dds_get_liveliness_changed_status(reader, &lstatus); + printf("%d.%06d writers alive: %d\n", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, lstatus.alive_count); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt); - dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur)); - uint32_t stopped = 0; - do - { - dds_duration_t w = tstop - dds_time(); - CU_ASSERT_FATAL((dds_waitset_wait(waitset, &triggered, 1, w > 0 ? w : 0)) >= 0); - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); - stopped += (uint32_t)lstatus.not_alive_count_change; - } while (dds_time() < tstop); - t = dds_time(); - printf("%d.%06d writers stopped: %u\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped); + dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur)); + uint32_t stopped = 0; + do + { + dds_duration_t w = tstop - dds_time(); + CU_ASSERT_FATAL((dds_waitset_wait(waitset, &triggered, 1, w > 0 ? w : 0)) >= 0); + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + stopped += (uint32_t)lstatus.not_alive_count_change; + } while (dds_time() < tstop); + t = dds_time(); + printf("%d.%06d writers stopped: %u\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped); - size_t exp_stopped = mult < 1 ? 0 : (wr_cnt_man_pp + wr_cnt_man_tp); - if (stopped != exp_stopped) - { - ldur *= 10 / (run + 1); - printf("%d.%06d incorrect number of stopped writers\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); - } - else - { - /* check alive count */ - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); - CU_ASSERT_EQUAL(lstatus.alive_count, mult < 1 ? wr_cnt : wr_cnt_auto); - test_finished = true; - } - } + size_t exp_stopped = mult < 1 ? 0 : (wr_cnt_man_pp + wr_cnt_man_tp); + if (stopped != exp_stopped) + { + ldur *= 10 / (run + 1); + printf("%d.%06d incorrect number of stopped writers\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); + } + else + { + /* check alive count */ + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + CU_ASSERT_EQUAL(lstatus.alive_count, mult < 1 ? wr_cnt : wr_cnt_auto); + test_finished = true; + } + } - /* cleanup */ - CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + /* cleanup */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); - for (n = 0; n < wr_cnt; n++) - CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); - dds_free(writers); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + for (n = 0; n < wr_cnt; n++) + CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); + dds_free(writers); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); - if (!test_finished) - { - if (++run > 3) - { - printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000); - CU_FAIL_FATAL("Run limit reached"); - test_finished = true; - continue; - } - else - { - printf("%d.%06d restarting test with ldur %d\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, ldur); - } - } - } while (!test_finished); + if (!test_finished) + { + if (++run > 3) + { + printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000); + CU_FAIL_FATAL("Run limit reached"); + test_finished = true; + continue; + } + else + { + printf("%d.%06d restarting test with ldur %d\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, ldur); + } + } + } while (!test_finished); } static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur, dds_entity_t *writer, dds_entity_t topic, dds_entity_t reader) { - dds_entity_t waitset; - dds_qos_t *wqos; - dds_attach_t triggered; - uint32_t status; + dds_entity_t waitset; + dds_qos_t *wqos; + dds_attach_t triggered; + uint32_t status; - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos, kind, ldur); - CU_ASSERT_FATAL((*writer = dds_create_writer(g_pub_participant, topic, wqos, NULL)) > 0); - dds_delete_qos(wqos); + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, kind, ldur); + CU_ASSERT_FATAL((*writer = dds_create_writer(g_pub_participant, topic, wqos, NULL)) > 0); + dds_delete_qos(wqos); - /* wait for writer to be alive */ - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); - CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* wait for writer to be alive */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); } /** @@ -399,55 +398,55 @@ static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur #define MAX_WRITERS 10 CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t writers[MAX_WRITERS]; - uint32_t wr_cnt = 0; - char name[100]; - dds_qos_t *rqos; - uint32_t n; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writers[MAX_WRITERS]; + uint32_t wr_cnt = 0; + char name[100]; + dds_qos_t *rqos; + uint32_t n; - /* topics */ - create_topic_name("ddsc_liveliness_ldur", 1, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_ldur", 1, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader and waitset */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* reader and waitset */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* check if pmd defaults to configured duration */ - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), get_ldur_config(g_pub_participant)); + /* check if pmd defaults to configured duration */ + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), get_ldur_config(g_pub_participant)); - /* create writers and check pmd interval in publishing participant */ - add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); + /* create writers and check pmd interval in publishing participant */ + add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); - add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); + add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); - add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); + add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000)); - add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); + add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); - add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); + add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); - add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader); - CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); + add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader); + CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); - /* cleanup */ - for (n = 0; n < wr_cnt; n++) - CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + /* cleanup */ + for (n = 0; n < wr_cnt; n++) + CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); } #undef MAX_WRITERS @@ -456,62 +455,62 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli * publications in the readers. */ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t writer; - char name[100]; - dds_qos_t *rqos, *wqos; - dds_entity_t waitset; - dds_attach_t triggered; - uint32_t status; - dds_duration_t ldur; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writer; + char name[100]; + dds_qos_t *rqos, *wqos; + dds_entity_t waitset; + dds_attach_t triggered; + uint32_t status; + dds_duration_t ldur; - /* topics */ - create_topic_name("ddsc_liveliness_ldurpwr", 1, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_ldurpwr", 1, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* writer */ - ldur = 1000; - CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur)); - CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + /* writer */ + ldur = 1000; + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur)); + CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - /* wait for writer to be alive */ - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); - CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* wait for writer to be alive */ + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* check pwr lease duration in matched publication */ - dds_instance_handle_t wrs[1]; - CU_ASSERT_EQUAL_FATAL(dds_get_matched_publications(reader, wrs, 1), 1); - dds_builtintopic_endpoint_t *ep; - ep = dds_get_matched_publication_data(reader, wrs[0]); - CU_ASSERT_FATAL(ep != NULL); - assert(ep != NULL); /* for Clang's static analyzer */ - CU_ASSERT_EQUAL_FATAL(ep->qos->liveliness.lease_duration, DDS_MSECS(ldur)); - dds_delete_qos(ep->qos); - dds_free(ep->topic_name); - dds_free(ep->type_name); - dds_free(ep); + /* check pwr lease duration in matched publication */ + dds_instance_handle_t wrs[1]; + CU_ASSERT_EQUAL_FATAL(dds_get_matched_publications(reader, wrs, 1), 1); + dds_builtintopic_endpoint_t *ep; + ep = dds_get_matched_publication_data(reader, wrs[0]); + CU_ASSERT_FATAL(ep != NULL); + assert(ep != NULL); /* for Clang's static analyzer */ + CU_ASSERT_EQUAL_FATAL(ep->qos->liveliness.lease_duration, DDS_MSECS(ldur)); + dds_delete_qos(ep->qos); + dds_free(ep->topic_name); + dds_free(ep->type_name); + dds_free(ep); - /* cleanup */ - dds_delete_qos(wqos); - CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + /* cleanup */ + dds_delete_qos(wqos); + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); } /** @@ -521,87 +520,91 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li * is deleted immediately after creating. */ #define MAX_WRITERS 100 -CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) +CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini, .timeout = 15) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t writers[MAX_WRITERS]; - dds_entity_t waitset; - dds_qos_t *wqos; - struct dds_liveliness_changed_status lstatus; - uint32_t alive_writers_auto = 0, alive_writers_man = 0; - char name[100]; - dds_qos_t *rqos; - dds_attach_t triggered; - uint32_t n; - Space_Type1 sample = { 0, 0, 0 }; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writers[MAX_WRITERS]; + dds_entity_t waitset; + dds_qos_t *wqos; + struct dds_liveliness_changed_status lstatus; + uint32_t alive_writers_auto = 0, alive_writers_man = 0; + char name[100]; + dds_qos_t *rqos; + dds_attach_t triggered; + uint32_t n; + Space_Type1 sample = {0, 0, 0}; + int64_t ldur = 1000; - /* topics */ - create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader and waitset */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + /* reader and waitset */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - /* create 1st writer and wait for it to become alive */ - CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500)); - CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); - alive_writers_man++; + /* create 1st writer and wait for it to become alive */ + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur)); + CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); + alive_writers_man++; - /* create writers */ - for (n = 1; n < MAX_WRITERS; n++) - { - dds_qset_liveliness(wqos, n % 2 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (n % 3 ? 500 + n : 500 - n)); - CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - dds_write (writers[n], &sample); - if (n % 3 == 2) - dds_delete(writers[n]); - else if (n % 2) - alive_writers_auto++; - else - alive_writers_man++; - } - dds_delete_qos(wqos); - printf("alive_writers_auto: %d, alive_writers_man: %d\n", alive_writers_auto, alive_writers_man); + /* create writers */ + for (n = 1; n < MAX_WRITERS; n++) + { + dds_qset_liveliness(wqos, n % 2 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(n % 3 ? ldur + n : ldur - n) + ((n % 3) == 2 ? 1 : 0)); + CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + dds_write(writers[n], &sample); + if (n % 3 == 2) + dds_delete(writers[n]); + else if (n % 2) + alive_writers_auto++; + else + alive_writers_man++; + } + dds_delete_qos(wqos); + printf("alive_writers_auto: %d, alive_writers_man: %d\n", alive_writers_auto, alive_writers_man); - /* wait for auto liveliness writers to become alive and manual-by-pp writers to become not-alive */ - do - { - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK); - printf("alive: %d, not-alive: %d\n", lstatus.alive_count, lstatus.not_alive_count); - dds_sleepfor (DDS_MSECS(50)); - } - while (lstatus.alive_count != alive_writers_auto || lstatus.not_alive_count != alive_writers_man); + /* wait for auto liveliness writers to become alive and manual-by-pp writers to become not-alive */ + do + { + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + printf("alive: %d, not-alive: %d\n", lstatus.alive_count, lstatus.not_alive_count); + dds_sleepfor(DDS_MSECS(50)); + } while (lstatus.alive_count != alive_writers_auto || lstatus.not_alive_count != alive_writers_man); - /* cleanup remaining writers */ - for (n = 0; n < MAX_WRITERS; n++) - { - if (n % 3 != 2) - CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); - } - /* wait for alive_count and not_alive_count to become 0 */ - do - { - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK); - printf("alive: %d, not: %d\n", lstatus.alive_count, lstatus.not_alive_count); - dds_sleepfor (DDS_MSECS(50)); - } - while (lstatus.alive_count > 0 || lstatus.not_alive_count > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + /* check that counts are stable after a delay */ + dds_sleepfor(DDS_MSECS(ldur / 2)); + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + CU_ASSERT_FATAL(lstatus.alive_count == alive_writers_auto && lstatus.not_alive_count == alive_writers_man); + + /* cleanup remaining writers */ + for (n = 0; n < MAX_WRITERS; n++) + { + if (n % 3 != 2) + CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); + } + /* wait for alive_count and not_alive_count to become 0 */ + do + { + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + printf("alive: %d, not: %d\n", lstatus.alive_count, lstatus.not_alive_count); + dds_sleepfor(DDS_MSECS(ldur / 10)); + } while (lstatus.alive_count > 0 || lstatus.not_alive_count > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); } #undef MAX_WRITERS @@ -610,74 +613,74 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, . */ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini) { - dds_entity_t pub_topic; - dds_entity_t sub_topic; - dds_entity_t reader; - dds_entity_t writer; - dds_entity_t waitset; - dds_qos_t *rqos; - dds_qos_t *wqos; - dds_attach_t triggered; - struct dds_liveliness_changed_status lstatus; - struct dds_subscription_matched_status sstatus; - char name[100]; - dds_duration_t ldur = DDS_MSECS (500); - Space_Type1 sample = { 1, 0, 0 }; + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writer; + dds_entity_t waitset; + dds_qos_t *rqos; + dds_qos_t *wqos; + dds_attach_t triggered; + struct dds_liveliness_changed_status lstatus; + struct dds_subscription_matched_status sstatus; + char name[100]; + dds_duration_t ldur = DDS_MSECS(500); + Space_Type1 sample = {1, 0, 0}; - /* topics */ - create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); - CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); - /* writer */ - CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); - dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur); - CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); - dds_delete_qos(wqos); + /* writer */ + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur); + CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + dds_delete_qos(wqos); - /* wait for writer to be alive */ - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + /* wait for writer to be alive */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); - /* check status counts before proxy writer is expired */ - dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); - dds_get_subscription_matched_status(reader, &sstatus); - CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + /* check status counts before proxy writer is expired */ + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); - /* sleep for more than lease duration, writer should be set not-alive but subscription still matched */ - dds_sleepfor(ldur + DDS_MSECS(100)); - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + /* sleep for more than lease duration, writer should be set not-alive but subscription still matched */ + dds_sleepfor(ldur + DDS_MSECS(100)); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); - dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0); - dds_get_subscription_matched_status(reader, &sstatus); - CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); - /* write sample and re-check status counts */ - dds_write (writer, &sample); - CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + /* write sample and re-check status counts */ + dds_write(writer, &sample); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); - dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); - dds_get_subscription_matched_status(reader, &sstatus); - CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); - /* cleanup */ - CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + /* cleanup */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); } /** @@ -686,127 +689,127 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin */ #define MAX_WRITERS 100 CU_TheoryDataPoints(ddsc_liveliness, assert_liveliness) = { - CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */ - CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */ - CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */ + CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */ + CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */ + CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */ }; -CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout=60) +CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60) { - dds_entity_t pub_topic, sub_topic, reader, writers[MAX_WRITERS]; - dds_qos_t *rqos; - struct dds_liveliness_changed_status lstatus; - char name[100]; - uint32_t ldur = 100, wr_cnt, run = 1, stopped; - dds_time_t tstart, tstop, t; - bool test_finished = false; + dds_entity_t pub_topic, sub_topic, reader, writers[MAX_WRITERS]; + dds_qos_t *rqos; + struct dds_liveliness_changed_status lstatus; + char name[100]; + uint32_t ldur = 100, wr_cnt, run = 1, stopped; + dds_time_t tstart, tstop, t; + bool test_finished = false; - do - { - wr_cnt = 0; - assert (wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS); - printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u with ldur %d\n", - wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, ldur); + do + { + wr_cnt = 0; + assert(wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS); + printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u with ldur %d\n", + wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, ldur); - /* topics */ - create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name); - CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + /* topics */ + create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); - /* reader */ - CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); - dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); - dds_delete_qos(rqos); - CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); - /* writers */ - for (size_t n = 0; n < wr_cnt_auto; n++) - add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); - tstart = dds_time(); - for (size_t n = 0; n < wr_cnt_man_pp; n++) - add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); - for (size_t n = 0; n < wr_cnt_man_tp; n++) - add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); - t = dds_time(); - if (t - tstart > DDS_MSECS(0.5 * ldur)) - { - ldur *= 10 / (run + 1); - printf("%d.%06d failed to create writers with non-automatic liveliness kind in time\n", - (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); - } - else - { - /* check status counts before proxy writer is expired */ - dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp); + /* writers */ + for (size_t n = 0; n < wr_cnt_auto; n++) + add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); + tstart = dds_time(); + for (size_t n = 0; n < wr_cnt_man_pp; n++) + add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); + for (size_t n = 0; n < wr_cnt_man_tp; n++) + add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader); + t = dds_time(); + if (t - tstart > DDS_MSECS(0.5 * ldur)) + { + ldur *= 10 / (run + 1); + printf("%d.%06d failed to create writers with non-automatic liveliness kind in time\n", + (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); + } + else + { + /* check status counts before proxy writer is expired */ + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp); - /* delay for more than lease duration and assert liveliness on writers: + /* delay for more than lease duration and assert liveliness on writers: all writers (including man-by-pp) should be kept alive */ - tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3; - stopped = 0; - do - { - for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++) - dds_assert_liveliness (writers[n]); - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); - stopped += (uint32_t)lstatus.not_alive_count_change; - dds_sleepfor (DDS_MSECS(50)); - } while (dds_time() < tstop); - dds_get_liveliness_changed_status(reader, &lstatus); - printf("writers alive with dds_assert_liveliness on all writers: %d, writers stopped: %d\n", lstatus.alive_count, stopped); - if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp || stopped != 0) - { - ldur *= 10 / (run + 1); - printf("incorrect number of writers alive or stopped writers\n"); - } - else - { - /* delay for more than lease duration and assert liveliness on participant: + tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3; + stopped = 0; + do + { + for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++) + dds_assert_liveliness(writers[n]); + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + stopped += (uint32_t)lstatus.not_alive_count_change; + dds_sleepfor(DDS_MSECS(50)); + } while (dds_time() < tstop); + dds_get_liveliness_changed_status(reader, &lstatus); + printf("writers alive with dds_assert_liveliness on all writers: %d, writers stopped: %d\n", lstatus.alive_count, stopped); + if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp || stopped != 0) + { + ldur *= 10 / (run + 1); + printf("incorrect number of writers alive or stopped writers\n"); + } + else + { + /* delay for more than lease duration and assert liveliness on participant: writers with liveliness man-by-pp should be kept alive, man-by-topic writers should stop */ - tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3; - stopped = 0; - do - { - dds_assert_liveliness (g_pub_participant); - CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); - stopped += (uint32_t)lstatus.not_alive_count_change; - dds_sleepfor (DDS_MSECS(50)); - } while (dds_time() < tstop); - dds_get_liveliness_changed_status(reader, &lstatus); - printf("writers alive with dds_assert_liveliness on participant: %d, writers stopped: %d\n", lstatus.alive_count, stopped); - if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp || stopped != wr_cnt_man_tp) - { - ldur *= 10 / (run + 1); - printf("incorrect number of writers alive or stopped writers\n"); - } - else - { - test_finished = true; - } - } - } + tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3; + stopped = 0; + do + { + dds_assert_liveliness(g_pub_participant); + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK); + stopped += (uint32_t)lstatus.not_alive_count_change; + dds_sleepfor(DDS_MSECS(50)); + } while (dds_time() < tstop); + dds_get_liveliness_changed_status(reader, &lstatus); + printf("writers alive with dds_assert_liveliness on participant: %d, writers stopped: %d\n", lstatus.alive_count, stopped); + if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp || stopped != wr_cnt_man_tp) + { + ldur *= 10 / (run + 1); + printf("incorrect number of writers alive or stopped writers\n"); + } + else + { + test_finished = true; + } + } + } - /* cleanup */ - CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); - for (size_t n = 0; n < wr_cnt; n++) - CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); - CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); + /* cleanup */ + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + for (size_t n = 0; n < wr_cnt; n++) + CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); - if (!test_finished) - { - if (++run > 3) - { - CU_FAIL_FATAL("Run limit reached"); - test_finished = true; - continue; - } - else - { - printf("restarting test with ldur %d\n", ldur); - } - } - } while (!test_finished); + if (!test_finished) + { + if (++run > 3) + { + CU_FAIL_FATAL("Run limit reached"); + test_finished = true; + continue; + } + else + { + printf("restarting test with ldur %d\n", ldur); + } + } + } while (!test_finished); } #undef MAX_WRITERS diff --git a/src/core/ddsi/include/dds/ddsi/q_lease.h b/src/core/ddsi/include/dds/ddsi/q_lease.h index d8f2964..51a00cb 100644 --- a/src/core/ddsi/include/dds/ddsi/q_lease.h +++ b/src/core/ddsi/include/dds/ddsi/q_lease.h @@ -42,6 +42,7 @@ void lease_management_term (struct q_globals *gv); struct lease *lease_new (nn_etime_t texpire, int64_t tdur, struct entity_common *e); struct lease *lease_clone (const struct lease *l); void lease_register (struct lease *l); +void lease_unregister (struct lease *l); void lease_free (struct lease *l); void lease_renew (struct lease *l, nn_etime_t tnow); void lease_set_expiry (struct lease *l, nn_etime_t when); diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index f9f13c2..0e72f6c 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -610,13 +610,15 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_ else if (existing_entity->kind == EK_PROXY_PARTICIPANT) { struct proxy_participant *proxypp = (struct proxy_participant *) existing_entity; + struct lease *lease; int interesting = 0; RSTTRACE ("SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid)); /* SPDP processing is so different from normal processing that we are even skipping the automatic lease renewal. Note that proxy writers that are not alive are not set alive here. This is done only when data is received from a particular pwr (in handle_regular) */ - lease_renew (ddsrt_atomic_ldvoidp (&proxypp->minl_auto), now_et ()); + if ((lease = ddsrt_atomic_ldvoidp (&proxypp->minl_auto)) != NULL) + lease_renew (lease, now_et ()); ddsrt_mutex_lock (&proxypp->e.lock); if (proxypp->implicitly_created || seq > proxypp->seq) { diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index b15b807..bb1430c 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3662,51 +3662,67 @@ static void gc_proxy_participant_lease (struct gcreq *gcreq) gcreq_free (gcreq); } -void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease) -{ - /* Lease renewal is done by the receive thread without locking the - proxy participant (and I'd like to keep it that way), but that - means we must guarantee that the lease pointer remains valid once - loaded. - - By loading/storing the pointer atomically, we ensure we always - read a valid (or once valid) value, by delaying the freeing - through the garbage collector, we ensure whatever lease update - occurs in parallel completes before the memory is released. - - The lease_renew(never) call ensures the lease will never expire - while we are messing with it. */ - ddsrt_mutex_lock (&proxypp->e.lock); - if (proxypp->owns_lease) - { - const nn_etime_t never = { T_NEVER }; - struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease); - struct lease *oldlease = proxypp->lease; - lease_renew (oldlease, never); - gcreq->arg = oldlease; - gcreq_enqueue (gcreq); - proxypp->owns_lease = 0; - } - proxypp->lease = newlease; - /* FIXME: replace proxypp lease in leaseheap_auto? */ - ddsrt_mutex_unlock (&proxypp->e.lock); -} - static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew) { /* By loading/storing the pointer atomically, we ensure we always read a valid (or once valid) lease. By delaying freeing the lease through the garbage collector, we ensure whatever lease update occurs in parallel completes before the memory is released. */ - const nn_etime_t never = { T_NEVER }; struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease); struct lease *lease_old = ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto); - lease_renew (lease_old, never); /* ensures lease will not expire while it is replaced */ + lease_unregister (lease_old); /* ensures lease will not expire while it is replaced */ gcreq->arg = lease_old; gcreq_enqueue (gcreq); ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, lnew); } +void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease) +{ + ddsrt_mutex_lock (&proxypp->e.lock); + if (proxypp->owns_lease) + { + struct lease *minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto); + ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease); + if (minl == proxypp->lease) + { + if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto)) != NULL) + { + dds_duration_t trem = minl->tdur - proxypp->lease->tdur; + assert (trem >= 0); + nn_etime_t texp = add_duration_to_etime (now_et(), trem); + struct lease *lnew = lease_new (texp, minl->tdur, minl->entity); + proxy_participant_replace_minl (proxypp, false, lnew); + lease_register (lnew); + } + else + { + proxy_participant_replace_minl (proxypp, false, NULL); + } + } + + /* Lease renewal is done by the receive thread without locking the + proxy participant (and I'd like to keep it that way), but that + means we must guarantee that the lease pointer remains valid once + loaded. + + By loading/storing the pointer atomically, we ensure we always + read a valid (or once valid) value, by delaying the freeing + through the garbage collector, we ensure whatever lease update + occurs in parallel completes before the memory is released. + + The lease_unregister call ensures the lease will never expire + while we are messing with it. */ + struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease); + lease_unregister (proxypp->lease); + gcreq->arg = proxypp->lease; + gcreq_enqueue (gcreq); + proxypp->owns_lease = 0; + } + proxypp->lease = newlease; + + ddsrt_mutex_unlock (&proxypp->e.lock); +} + static void proxy_participant_add_pwr_lease_locked (struct proxy_participant * proxypp, const struct proxy_writer * pwr) { struct lease *minl_prev; @@ -3764,7 +3780,6 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant } else { - assert (manbypp); proxy_participant_replace_minl (proxypp, manbypp, NULL); } } @@ -3829,10 +3844,16 @@ void new_proxy_participant { struct proxy_participant *privpp; privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid); + + ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto); + ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man); + ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL); + if (privpp != NULL && privpp->is_ddsi2_pp) { proxypp->lease = privpp->lease; proxypp->owns_lease = 0; + ddsrt_atomic_stvoidp (&proxypp->minl_auto, NULL); } else { @@ -3849,10 +3870,6 @@ void new_proxy_participant proxypp->lease = lease_new (texp, dur, &proxypp->e); proxypp->owns_lease = 1; - /* Init heap for leases */ - ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto); - ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man); - /* Add the proxypp lease to heap so that monitoring liveliness will include this lease and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */ ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease); @@ -3863,7 +3880,6 @@ void new_proxy_participant by the lease from the proxy writer in proxy_participant_add_pwr_lease_locked. This old shortest lease is freed, so that's why we need a clone and not the proxypp's lease in the heap. */ ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease)); - ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL); } } @@ -4071,6 +4087,7 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL); assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL); assert (!compare_guid (&minl_auto->entity->guid, &proxypp->e.guid)); + lease_unregister (minl_auto); lease_free (minl_auto); lease_free (proxypp->lease); } @@ -4556,6 +4573,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_ GVLOGDISC ("- unknown\n"); return DDS_RETCODE_BAD_PARAMETER; } + /* Set "deleting" flag in particular for Lite, to signal to the receive path it can't trust rdary[] anymore, which is because removing the proxy writer from the hash table will prevent the readers from looking up the proxy writer, and consequently @@ -4565,6 +4583,9 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_ builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false); ephash_remove_proxy_writer_guid (gv->guid_hash, pwr); ddsrt_mutex_unlock (&gv->lock); + if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && + pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC) + lease_unregister (pwr->lease); if (proxy_writer_set_notalive (pwr, false) != DDS_RETCODE_OK) GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid)); gcreq_proxy_writer (pwr); diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c index ccd0306..12f36e8 100644 --- a/src/core/ddsi/src/q_lease.c +++ b/src/core/ddsi/src/q_lease.c @@ -121,10 +121,10 @@ void lease_register (struct lease *l) /* FIXME: make lease admin struct */ force_lease_check (gv->gcreq_queue); } -void lease_free (struct lease *l) +void lease_unregister (struct lease *l) { struct q_globals * const gv = l->entity->gv; - GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid)); + GVTRACE ("lease_unregister(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid)); ddsrt_mutex_lock (&gv->leaseheap_lock); if (l->tsched.v != TSCHED_NOT_ON_HEAP) { @@ -132,12 +132,18 @@ void lease_free (struct lease *l) l->tsched.v = TSCHED_NOT_ON_HEAP; } ddsrt_mutex_unlock (&gv->leaseheap_lock); - ddsrt_free (l); /* see lease_register() */ force_lease_check (gv->gcreq_queue); } +void lease_free (struct lease *l) +{ + struct q_globals * const gv = l->entity->gv; + GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid)); + ddsrt_free (l); +} + void lease_renew (struct lease *l, nn_etime_t tnowE) { nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur); diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index b20dd06..58bb2de 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -617,6 +617,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac struct proxy_reader *prd; struct wr_prd_match *rn; struct writer *wr; + struct lease *lease; ddsi_guid_t src, dst; seqno_t seqbase; seqno_t seq_xmit; @@ -668,7 +669,8 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac return 1; } - lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); if (!wr->reliable) /* note: reliability can't be changed */ { RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer!)", PGUID (src), PGUID (dst)); @@ -1111,6 +1113,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct const seqno_t lastseq = fromSN (msg->lastSN); struct handle_Heartbeat_helper_arg arg; struct proxy_writer *pwr; + struct lease *lease; ddsi_guid_t src, dst; src.prefix = rst->src_guid_prefix; @@ -1131,15 +1134,15 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst)); return 1; } - lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst)); ddsrt_mutex_lock (&pwr->e.lock); if (msg->smhdr.flags & HEARTBEAT_FLAG_LIVELINESS && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC && pwr->c.xqos->liveliness.lease_duration != T_NEVER) { - struct lease *lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man); - if (lease != NULL) + if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL) lease_renew (lease, tnow); lease_renew (pwr->lease, tnow); } @@ -1253,6 +1256,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime const nn_fragment_number_t fragnum = msg->lastFragmentNum - 1; /* we do 0-based */ ddsi_guid_t src, dst; struct proxy_writer *pwr; + struct lease *lease; src.prefix = rst->src_guid_prefix; src.entityid = msg->writerId; @@ -1272,7 +1276,8 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime return 1; } - lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst)); ddsrt_mutex_lock (&pwr->e.lock); @@ -1361,6 +1366,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N struct proxy_reader *prd; struct wr_prd_match *rn; struct writer *wr; + struct lease *lease; struct whc_borrowed_sample sample; ddsi_guid_t src, dst; nn_count_t *countp; @@ -1397,7 +1403,8 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N return 1; } - lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); if (!wr->reliable) /* note: reliability can't be changed */ { RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer)", PGUID (src), PGUID (dst)); @@ -1609,6 +1616,7 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm struct proxy_writer *pwr; struct pwr_rd_match *wn; + struct lease *lease; ddsi_guid_t src, dst; seqno_t gapstart, listbase; int32_t last_included_rel; @@ -1642,7 +1650,8 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm return 1; } - lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); ddsrt_mutex_lock (&pwr->e.lock); if ((wn = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst)) == NULL) { @@ -2119,7 +2128,8 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct so check whether it is actually in manual-by-topic mode before renewing it. As pwr->lease is set once (during entity creation) we can read it outside the lock, keeping all the lease renewals together. */ - lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); + if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto)) != NULL) + lease_renew (lease, tnow); if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease) lease_renew (lease, tnow); if (pwr->lease && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)