diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c
index 9f84ec3..ff19588 100644
--- a/src/core/ddsc/src/dds_rhc_default.c
+++ b/src/core/ddsc/src/dds_rhc_default.c
@@ -2394,7 +2394,7 @@ static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_
ddsrt_atomic_st32 (&cond->m_entity.m_status.m_trigger, trigger);
dds_entity_status_signal (&cond->m_entity, DDS_DATA_AVAILABLE_STATUS);
}
-
+
TRACE ("add_readcondition(%p, %"PRIx32", %"PRIx32", %"PRIx32") => %p qminv %"PRIx32" ; rhc %"PRIu32" conds\n",
(void *) rhc, cond->m_sample_states, cond->m_view_states,
cond->m_instance_states, (void *) cond, cond->m_qminv, rhc->nconds);
diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c
index c5bfe49..a706568 100644
--- a/src/core/ddsc/tests/liveliness.c
+++ b/src/core/ddsc/tests/liveliness.c
@@ -31,8 +31,7 @@
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}127.0.0.10"
-#define DDS_CONFIG_NO_PORT_GAIN_LOG_PUB "<"DDS_PROJECT_NAME">0cyclonedds_liveliness_pub.logfinest"DDS_PROJECT_NAME">"
-#define DDS_CONFIG_NO_PORT_GAIN_LOG_SUB "<"DDS_PROJECT_NAME">0cyclonedds_liveliness_sub.logfinest"DDS_PROJECT_NAME">"
+#define DDS_CONFIG_NO_PORT_GAIN_LOG "<"DDS_PROJECT_NAME">0cyclonedds_liveliness.logfinest"DDS_PROJECT_NAME">"
uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
@@ -130,7 +129,7 @@ CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
};
#undef A
#undef MP
-CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 10)
+CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
@@ -186,7 +185,8 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
start_seqno, end_seqno);
- CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 1 : 0))
+ /* end-start should be mult - 1, but allow 1 pmd sample to be lost */
+ CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0))
if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT)
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
@@ -198,16 +198,12 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
}
/* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */
-#define A DDS_LIVELINESS_AUTOMATIC
-#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */
CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
CU_DataPoints(size_t, 1, 0, 2, 0, 1, 0, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
CU_DataPoints(size_t, 1, 1, 2, 2, 1, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
};
-#undef A
-#undef MP
CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
{
dds_entity_t pub_topic;
@@ -467,4 +463,152 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
-}
\ No newline at end of file
+}
+
+#define MAX_WRITERS 100
+CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini)
+{
+ dds_entity_t pub_topic;
+ dds_entity_t sub_topic;
+ dds_entity_t reader;
+ dds_entity_t writers[MAX_WRITERS];
+ dds_entity_t waitset;
+ dds_qos_t *wqos;
+ struct dds_liveliness_changed_status lstatus;
+ size_t wr_cnt = 0;
+ char name[100];
+ dds_qos_t *rqos;
+ dds_attach_t triggered;
+ uint32_t n, status;
+ Space_Type1 sample = { 0, 0, 0 };
+
+ /* topics */
+ create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name);
+ CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+ CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+
+ /* reader and waitset */
+ CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
+ dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
+ CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
+ dds_delete_qos(rqos);
+ CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+ CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
+
+ /* create 1st writer and wait for it to become alive */
+ CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
+ dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500));
+ CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
+ CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+
+ /* create writers */
+ for (n = 1; n < MAX_WRITERS; n++)
+ {
+ dds_qset_liveliness(wqos, n % 1 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500 - n));
+ CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
+ dds_write (writers[n], &sample);
+ if (n % 3 == 2)
+ dds_delete(writers[n]);
+ }
+ dds_delete_qos(wqos);
+
+ /* wait for all writers to become alive */
+ while (wr_cnt < MAX_WRITERS)
+ {
+ CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK);
+ wr_cnt += (uint32_t)lstatus.alive_count_change;
+ dds_sleepfor (DDS_MSECS(50));
+ }
+ CU_ASSERT_EQUAL_FATAL (wr_cnt, MAX_WRITERS);
+
+ /* cleanup remaining writers */
+ for (n = 0; n < wr_cnt; n++)
+ {
+ if (n % 3 != 2)
+ CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
+ }
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
+}
+#undef MAX_WRITERS
+
+CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
+{
+ dds_entity_t pub_topic;
+ dds_entity_t sub_topic;
+ dds_entity_t reader;
+ dds_entity_t writer;
+ dds_entity_t waitset;
+ dds_qos_t *rqos;
+ dds_qos_t *wqos;
+ dds_attach_t triggered;
+ uint32_t status = 0;
+ struct dds_liveliness_changed_status lstatus;
+ struct dds_subscription_matched_status sstatus;
+ char name[100];
+ dds_duration_t ldur = DDS_MSECS (500);
+ Space_Type1 sample = { 1, 0, 0 };
+
+ /* topics */
+ create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name);
+ CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+ CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+
+ /* reader */
+ CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
+ dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
+ CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
+ dds_delete_qos(rqos);
+ CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+ CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
+
+ /* writer */
+ CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
+ dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur);
+ CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
+ dds_delete_qos(wqos);
+
+ /* wait for writer to be alive */
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
+ CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+
+ /* check status counts before proxy writer is expired */
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
+ dds_get_subscription_matched_status(reader, &sstatus);
+ CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
+
+ /* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
+ dds_sleepfor(ldur + DDS_MSECS(100));
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
+ CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
+ dds_get_subscription_matched_status(reader, &sstatus);
+ CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
+
+ /* write sample and re-check status counts */
+ dds_write (writer, &sample);
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
+ CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
+ dds_get_subscription_matched_status(reader, &sstatus);
+ CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
+
+ /* cleanup */
+ CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
+}
diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c
index 8967abb..f9adc0b 100644
--- a/src/core/ddsi/src/q_entity.c
+++ b/src/core/ddsi/src/q_entity.c
@@ -1509,10 +1509,8 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc
if (rd->status_cb)
{
status_cb_data_t data;
-
data.add = false;
data.handle = pwr->e.iid;
-
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
@@ -3680,7 +3678,8 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant
if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh)) != NULL)
{
dds_duration_t trem = minl->tdur - pwr->lease->tdur;
- nn_etime_t texp = add_duration_to_etime (now_et(), trem >= 0 ? trem : 0);
+ assert (trem >= 0);
+ nn_etime_t texp = add_duration_to_etime (now_et(), trem);
struct lease *lnew = lease_new (texp, minl->tdur, minl->entity);
proxy_participant_replace_minl (proxypp, manbypp, lnew);
lease_register (lnew);
@@ -3986,22 +3985,23 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p
if (refc == 0)
{
assert (proxypp->endpoints == NULL);
+ if (proxypp->owns_lease)
+ {
+ struct lease * minl_auto = ddsrt_atomic_ldvoidp (&proxypp->minl_auto);
+ ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
+ assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL);
+ assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL);
+ assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
+ assert (!compare_guid (&minl_auto->entity->guid, &proxypp->e.guid));
+ lease_free (minl_auto);
+ lease_free (proxypp->lease);
+ }
ddsrt_mutex_unlock (&proxypp->e.lock);
ELOGDISC (proxypp, "unref_proxy_participant("PGUIDFMT"): refc=0, freeing\n", PGUID (proxypp->e.guid));
unref_addrset (proxypp->as_default);
unref_addrset (proxypp->as_meta);
nn_plist_fini (proxypp->plist);
ddsrt_free (proxypp->plist);
- if (proxypp->owns_lease)
- {
- ddsrt_mutex_lock (&proxypp->e.lock);
- ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
- assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL);
- assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL);
- ddsrt_mutex_unlock (&proxypp->e.lock);
- lease_free (ddsrt_atomic_ldvoidp (&proxypp->minl_auto));
- lease_free (proxypp->lease);
- }
entity_common_fini (&proxypp->e);
remove_deleted_participant_guid (proxypp->e.gv->deleted_participants, &proxypp->e.guid, DPG_LOCAL | DPG_REMOTE);
ddsrt_free (proxypp);
@@ -4461,17 +4461,33 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive)
{
ddsrt_avl_iter_t it;
- GVLOGDISC ("proxy_writer_set_alive_locked ("PGUIDFMT") ", PGUID (pwr->e.guid));
assert (pwr->alive != alive);
pwr->alive = alive;
- GVLOGDISC ("- alive=%d\n", pwr->alive);
- if (!pwr->alive)
+ GVLOGDISC (" alive=%d", pwr->alive);
+ if (pwr->alive)
+ {
+ for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
+ {
+ struct reader *rd;
+ if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
+ {
+ status_cb_data_t data;
+ data.add = true;
+ data.handle = pwr->e.iid;
+ data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
+ (rd->status_cb) (rd->status_cb_entity, &data);
+ }
+ }
+ if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
+ proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr);
+ }
+ else
{
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
reader_drop_connection (&m->rd_guid, pwr, false);
+ if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
+ proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
}
- if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
- pwr->alive ? proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
return 0;
}
@@ -4483,7 +4499,7 @@ int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *g
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
- GVLOGDISC ("proxy_writer_set_alive_guid ("PGUIDFMT") - unknown\n", PGUID (*guid));
+ GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
}
ddsrt_mutex_unlock (&gv->lock);
diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c
index f86d3f7..f107855 100644
--- a/src/core/ddsi/src/q_lease.c
+++ b/src/core/ddsi/src/q_lease.c
@@ -272,7 +272,9 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
delete_proxy_participant_by_guid (gv, &g, now(), 1);
break;
case EK_PROXY_WRITER:
+ GVLOGDISC ("proxy_writer_set_alive ("PGUIDFMT")", PGUID (g));
(void) proxy_writer_set_alive_guid (gv, &g, false);
+ GVLOGDISC ("\n");
break;
case EK_PARTICIPANT:
case EK_READER: