From 32c5a59c8fdcd22114d96be0286549a12ea16daa Mon Sep 17 00:00:00 2001 From: Dennis Potman Date: Mon, 11 Nov 2019 14:47:26 +0100 Subject: [PATCH] Additional tests for liveliness QoS and minor refactoring Added unit tests for (1) testing the scenario that a proxy writer writes data after its lease is expired, to check that the status for the pwr is set to alive again and (2) stress-testing the creation and deletetion of writers with decreasing lease duration. In addition I've optimized the locking in unref_proxy_participant a bit and fixed the liveliness changed callback when a writer with expired lease (not-alive) gets alive again. Signed-off-by: Dennis Potman --- src/core/ddsc/src/dds_rhc_default.c | 2 +- src/core/ddsc/tests/liveliness.c | 162 ++++++++++++++++++++++++++-- src/core/ddsi/src/q_entity.c | 54 ++++++---- src/core/ddsi/src/q_lease.c | 2 + 4 files changed, 191 insertions(+), 29 deletions(-) diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 9f84ec3..ff19588 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -2394,7 +2394,7 @@ static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_ ddsrt_atomic_st32 (&cond->m_entity.m_status.m_trigger, trigger); dds_entity_status_signal (&cond->m_entity, DDS_DATA_AVAILABLE_STATUS); } - + TRACE ("add_readcondition(%p, %"PRIx32", %"PRIx32", %"PRIx32") => %p qminv %"PRIx32" ; rhc %"PRIu32" conds\n", (void *) rhc, cond->m_sample_states, cond->m_view_states, cond->m_instance_states, (void *) cond, cond->m_qminv, rhc->nconds); diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index c5bfe49..a706568 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -31,8 +31,7 @@ #define DDS_DOMAINID_PUB 0 #define DDS_DOMAINID_SUB 1 #define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}127.0.0.10" -#define DDS_CONFIG_NO_PORT_GAIN_LOG_PUB "<"DDS_PROJECT_NAME">0cyclonedds_liveliness_pub.logfinest" -#define DDS_CONFIG_NO_PORT_GAIN_LOG_SUB "<"DDS_PROJECT_NAME">0cyclonedds_liveliness_sub.logfinest" +#define DDS_CONFIG_NO_PORT_GAIN_LOG "<"DDS_PROJECT_NAME">0cyclonedds_liveliness.logfinest" uint32_t g_topic_nr = 0; static dds_entity_t g_pub_domain = 0; @@ -130,7 +129,7 @@ CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = { }; #undef A #undef MP -CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 10) +CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) { dds_entity_t pub_topic; dds_entity_t sub_topic; @@ -186,7 +185,8 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, start_seqno, end_seqno); - CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 1 : 0)) + /* end-start should be mult - 1, but allow 1 pmd sample to be lost */ + CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0)) if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT) CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult) @@ -198,16 +198,12 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin } /* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */ -#define A DDS_LIVELINESS_AUTOMATIC -#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = { CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */ CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */ CU_DataPoints(size_t, 1, 0, 2, 0, 1, 0, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */ CU_DataPoints(size_t, 1, 1, 2, 2, 1, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */ }; -#undef A -#undef MP CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60) { dds_entity_t pub_topic; @@ -467,4 +463,152 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); -} \ No newline at end of file +} + +#define MAX_WRITERS 100 +CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini) +{ + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writers[MAX_WRITERS]; + dds_entity_t waitset; + dds_qos_t *wqos; + struct dds_liveliness_changed_status lstatus; + size_t wr_cnt = 0; + char name[100]; + dds_qos_t *rqos; + dds_attach_t triggered; + uint32_t n, status; + Space_Type1 sample = { 0, 0, 0 }; + + /* topics */ + create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + + /* reader and waitset */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + + /* create 1st writer and wait for it to become alive */ + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500)); + CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + + /* create writers */ + for (n = 1; n < MAX_WRITERS; n++) + { + dds_qset_liveliness(wqos, n % 1 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500 - n)); + CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + dds_write (writers[n], &sample); + if (n % 3 == 2) + dds_delete(writers[n]); + } + dds_delete_qos(wqos); + + /* wait for all writers to become alive */ + while (wr_cnt < MAX_WRITERS) + { + CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK); + wr_cnt += (uint32_t)lstatus.alive_count_change; + dds_sleepfor (DDS_MSECS(50)); + } + CU_ASSERT_EQUAL_FATAL (wr_cnt, MAX_WRITERS); + + /* cleanup remaining writers */ + for (n = 0; n < wr_cnt; n++) + { + if (n % 3 != 2) + CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); + } + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); +} +#undef MAX_WRITERS + +CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini) +{ + dds_entity_t pub_topic; + dds_entity_t sub_topic; + dds_entity_t reader; + dds_entity_t writer; + dds_entity_t waitset; + dds_qos_t *rqos; + dds_qos_t *wqos; + dds_attach_t triggered; + uint32_t status = 0; + struct dds_liveliness_changed_status lstatus; + struct dds_subscription_matched_status sstatus; + char name[100]; + dds_duration_t ldur = DDS_MSECS (500); + Space_Type1 sample = { 1, 0, 0 }; + + /* topics */ + create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name); + CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); + + /* reader */ + CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL); + dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); + CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0); + dds_delete_qos(rqos); + CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); + CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); + + /* writer */ + CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL); + dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur); + CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); + dds_delete_qos(wqos); + + /* wait for writer to be alive */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + + /* check status counts before proxy writer is expired */ + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + + /* sleep for more than lease duration, writer should be set not-alive but subscription still matched */ + dds_sleepfor(ldur + DDS_MSECS(100)); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + + /* write sample and re-check status counts */ + dds_write (writer, &sample); + CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); + CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); + + dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); + dds_get_subscription_matched_status(reader, &sstatus); + CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1); + + /* cleanup */ + CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); + CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); +} diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 8967abb..f9adc0b 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1509,10 +1509,8 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc if (rd->status_cb) { status_cb_data_t data; - data.add = false; data.handle = pwr->e.iid; - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; (rd->status_cb) (rd->status_cb_entity, &data); @@ -3680,7 +3678,8 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh)) != NULL) { dds_duration_t trem = minl->tdur - pwr->lease->tdur; - nn_etime_t texp = add_duration_to_etime (now_et(), trem >= 0 ? trem : 0); + assert (trem >= 0); + nn_etime_t texp = add_duration_to_etime (now_et(), trem); struct lease *lnew = lease_new (texp, minl->tdur, minl->entity); proxy_participant_replace_minl (proxypp, manbypp, lnew); lease_register (lnew); @@ -3986,22 +3985,23 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p if (refc == 0) { assert (proxypp->endpoints == NULL); + if (proxypp->owns_lease) + { + struct lease * minl_auto = ddsrt_atomic_ldvoidp (&proxypp->minl_auto); + ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease); + assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL); + assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL); + assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL); + assert (!compare_guid (&minl_auto->entity->guid, &proxypp->e.guid)); + lease_free (minl_auto); + lease_free (proxypp->lease); + } ddsrt_mutex_unlock (&proxypp->e.lock); ELOGDISC (proxypp, "unref_proxy_participant("PGUIDFMT"): refc=0, freeing\n", PGUID (proxypp->e.guid)); unref_addrset (proxypp->as_default); unref_addrset (proxypp->as_meta); nn_plist_fini (proxypp->plist); ddsrt_free (proxypp->plist); - if (proxypp->owns_lease) - { - ddsrt_mutex_lock (&proxypp->e.lock); - ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease); - assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL); - assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL); - ddsrt_mutex_unlock (&proxypp->e.lock); - lease_free (ddsrt_atomic_ldvoidp (&proxypp->minl_auto)); - lease_free (proxypp->lease); - } entity_common_fini (&proxypp->e); remove_deleted_participant_guid (proxypp->e.gv->deleted_participants, &proxypp->e.guid, DPG_LOCAL | DPG_REMOTE); ddsrt_free (proxypp); @@ -4461,17 +4461,33 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_ int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive) { ddsrt_avl_iter_t it; - GVLOGDISC ("proxy_writer_set_alive_locked ("PGUIDFMT") ", PGUID (pwr->e.guid)); assert (pwr->alive != alive); pwr->alive = alive; - GVLOGDISC ("- alive=%d\n", pwr->alive); - if (!pwr->alive) + GVLOGDISC (" alive=%d", pwr->alive); + if (pwr->alive) + { + for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) + { + struct reader *rd; + if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL) + { + status_cb_data_t data; + data.add = true; + data.handle = pwr->e.iid; + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + } + } + if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) + proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr); + } + else { for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) reader_drop_connection (&m->rd_guid, pwr, false); + if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) + proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr); } - if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) - pwr->alive ? proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr); return 0; } @@ -4483,7 +4499,7 @@ int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *g if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL) { ddsrt_mutex_unlock (&gv->lock); - GVLOGDISC ("proxy_writer_set_alive_guid ("PGUIDFMT") - unknown\n", PGUID (*guid)); + GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*guid)); return DDS_RETCODE_BAD_PARAMETER; } ddsrt_mutex_unlock (&gv->lock); diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c index f86d3f7..f107855 100644 --- a/src/core/ddsi/src/q_lease.c +++ b/src/core/ddsi/src/q_lease.c @@ -272,7 +272,9 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow delete_proxy_participant_by_guid (gv, &g, now(), 1); break; case EK_PROXY_WRITER: + GVLOGDISC ("proxy_writer_set_alive ("PGUIDFMT")", PGUID (g)); (void) proxy_writer_set_alive_guid (gv, &g, false); + GVLOGDISC ("\n"); break; case EK_PARTICIPANT: case EK_READER: