diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c
index 724948a..4e6a225 100644
--- a/src/core/ddsc/src/dds_entity.c
+++ b/src/core/ddsc/src/dds_entity.c
@@ -24,6 +24,7 @@
#include "dds/version.h"
#include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/q_xqos.h"
+#include "dds/ddsi/q_transmit.h"
extern inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink);
extern inline bool dds_entity_is_enabled (const dds_entity *e);
@@ -1387,9 +1388,9 @@ dds_return_t dds_generic_unimplemented_operation (dds_entity_t handle, dds_entit
dds_return_t dds_assert_liveliness (dds_entity_t entity)
{
dds_return_t rc;
- dds_entity *e;
+ dds_entity *e, *ewr;
- if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
+ if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
switch (dds_entity_kind (e))
{
@@ -1398,8 +1399,11 @@ dds_return_t dds_assert_liveliness (dds_entity_t entity)
break;
}
case DDS_KIND_WRITER: {
- /* FIXME: implement liveliness manual-by-topic */
- rc = DDS_RETCODE_UNSUPPORTED;
+      if ((rc = dds_entity_lock (entity, DDS_KIND_WRITER, &ewr)) != DDS_RETCODE_OK)
+        break;
+      rc = write_hb_liveliness (&e->m_domain->gv, &e->m_guid, ((struct dds_writer *)ewr)->m_xp);
+      /* unlock (and unpin) the writer; the handle pinned above is unpinned below */
+      dds_entity_unlock (ewr);
break;
}
default: {
@@ -1407,6 +1411,6 @@ dds_return_t dds_assert_liveliness (dds_entity_t entity)
break;
}
}
- dds_entity_unlock (e);
+ dds_entity_unpin (e);
return rc;
}
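
Note (reviewer sketch): the hunk above turns dds_assert_liveliness into a working call for manual-by-topic writers. The minimal application-level sketch below shows how it is meant to be used; it assumes the Space_Type1 type generated for the test suite (Space.h) and omits error handling, so it is an illustration rather than part of the patch.

#include "dds/dds.h"
#include "Space.h"   /* assumption: generated test type, as used by the liveliness tests */

int main (void)
{
  dds_entity_t pp = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
  dds_entity_t tp = dds_create_topic (pp, &Space_Type1_desc, "liveliness_example", NULL, NULL);

  /* writer with manual-by-topic liveliness and a 1s lease */
  dds_qos_t *qos = dds_create_qos ();
  dds_qset_liveliness (qos, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_SECS (1));
  dds_entity_t wr = dds_create_writer (pp, tp, qos, NULL);
  dds_delete_qos (qos);

  /* without writing data, the application must assert liveliness on the writer
     before the lease expires; with this patch that sends a heartbeat with the
     liveliness flag set instead of returning DDS_RETCODE_UNSUPPORTED */
  for (int i = 0; i < 10; i++)
  {
    (void) dds_assert_liveliness (wr);
    dds_sleepfor (DDS_MSECS (500));
  }

  dds_delete (pp); /* recursively deletes the topic and writer as well */
  return 0;
}

Asserting liveliness on the participant instead would renew automatic and manual-by-participant leases, but not a manual-by-topic writer's lease, which is exactly what the new assert_liveliness test in the test file below checks.
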
diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c
index a706568..7063410 100644
--- a/src/core/ddsc/tests/liveliness.c
+++ b/src/core/ddsc/tests/liveliness.c
@@ -30,8 +30,8 @@
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
-#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<General><NetworkInterfaceAddress>127.0.0.1</NetworkInterfaceAddress></General><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
-#define DDS_CONFIG_NO_PORT_GAIN_LOG "<"DDS_PROJECT_NAME"><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness.log</OutputFile><Verbosity>finest</Verbosity></Tracing></"DDS_PROJECT_NAME">"
+#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
+#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_liveliness_tests.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
@@ -44,7 +44,7 @@ static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size)
{
- /* Get semi random g_topic name. */
+ /* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
@@ -53,10 +53,15 @@ static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size
static void liveliness_init(void)
{
- char *conf = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, UINT32_MAX);
- g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf);
- g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf);
- dds_free(conf);
+  /* The pub and sub domains use different domain ids, but the port gain in the
+   * configuration is 0, so both domains map onto the same port numbers. This
+   * allows two domains to be created within a single test process. */
+ char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
+ char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
+ g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub);
+ g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub);
+ dds_free(conf_pub);
+ dds_free(conf_sub);
g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_pub_participant > 0);
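
Note (reviewer sketch): the two-domain setup used by liveliness_init above needs only public API. A condensed sketch follows, under the assumption that dds_create_domain accepts the same <Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery> fragment as the macro above; it is illustrative, not part of the patch.

#include "dds/dds.h"

int main (void)
{
  /* with DomainGain 0 both domain ids map onto the same port numbers, so the
     participants in the two domains can discover each other in one process */
  const char *cfg = "<Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>";
  dds_entity_t dom_pub = dds_create_domain (0, cfg);
  dds_entity_t dom_sub = dds_create_domain (1, cfg);
  dds_entity_t pp_pub = dds_create_participant (0, NULL, NULL);
  dds_entity_t pp_sub = dds_create_participant (1, NULL, NULL);
  /* create matching readers/writers in both participants here */
  dds_delete (pp_pub);
  dds_delete (pp_sub);
  dds_delete (dom_pub);
  dds_delete (dom_sub);
  return 0;
}
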
@@ -79,6 +84,11 @@ static void liveliness_fini(void)
dds_delete(g_pub_domain);
}
+/**
+ * Gets the current PMD sequence number for the participant. This
+ * can be used to count the number of PMD messages sent by
+ * the participant.
+ */
static seqno_t get_pmd_seqno(dds_entity_t participant)
{
seqno_t seqno;
@@ -96,6 +106,9 @@ static seqno_t get_pmd_seqno(dds_entity_t participant)
return seqno;
}
+/**
+ * Gets the current PMD interval for the participant
+ */
static dds_duration_t get_pmd_interval(dds_entity_t participant)
{
dds_duration_t intv;
@@ -110,6 +123,9 @@ static dds_duration_t get_pmd_interval(dds_entity_t participant)
return intv;
}
+/**
+ * Gets the current lease duration for the participant
+ */
static dds_duration_t get_ldur_config(dds_entity_t participant)
{
struct dds_entity *pp_entity;
@@ -120,15 +136,22 @@ static dds_duration_t get_ldur_config(dds_entity_t participant)
return ldur;
}
+
+/**
+ * Test that the correct number of PMD messages is sent for
+ * the various liveliness kinds.
+ * liveliness kinds.
+ */
#define A DDS_LIVELINESS_AUTOMATIC
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
+#define MT DDS_LIVELINESS_MANUAL_BY_TOPIC
CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
- CU_DataPoints(dds_liveliness_kind_t, A, A, MP), /* liveliness kind */
- CU_DataPoints(uint32_t, 200, 200, 200), /* lease duration */
- CU_DataPoints(double, 5, 10, 5), /* delay (n times lease duration) */
+ CU_DataPoints(dds_liveliness_kind_t, A, A, MP, MT), /* liveliness kind */
+ CU_DataPoints(uint32_t, 200, 200, 200, 200), /* lease duration */
+ CU_DataPoints(double, 5, 10, 5, 5), /* delay (n times lease duration) */
};
-#undef A
+#undef MT
#undef MP
+#undef A
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{
dds_entity_t pub_topic;
@@ -150,7 +173,7 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
kind == 0 ? "A" : "MP", ldur, mult);
/* topics */
- create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name);
+ create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
@@ -187,7 +210,7 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
/* end-start should be mult - 1, but allow 1 pmd sample to be lost */
CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0))
- if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT)
+ if (kind != DDS_LIVELINESS_AUTOMATIC)
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
/* cleanup */
@@ -197,39 +220,42 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
-/* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */
+/**
+ * Test that the expected number of proxy writers expires (set to not-alive)
+ * Test that the expected number of proxy writers expire (i.e. are set to
+ * not-alive) after a certain delay, for various combinations of writers
+ * with different liveliness kinds.
+ */
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
- CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */
- CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
- CU_DataPoints(size_t, 1, 0, 2, 0, 1, 0, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
- CU_DataPoints(size_t, 1, 1, 2, 2, 1, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
+ CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration for initial test run (increased for each retry when test fails) */
+ CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
+ CU_DataPoints(uint32_t, 1, 0, 2, 0, 1, 0, 0, 1, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
+ CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
+ CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */
};
-CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
+CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t *writers;
- dds_qos_t *rqos, *wqos_auto, *wqos_man_pp;
+ dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp;
dds_entity_t waitset;
dds_attach_t triggered;
struct dds_liveliness_changed_status lstatus;
- uint32_t status;
- size_t n, run = 1;
+ uint32_t status, n, run = 1, wr_cnt = wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp;
char name[100];
- size_t wr_cnt = wr_cnt_auto + wr_cnt_man_pp;
dds_time_t tstart, t;
bool test_finished = false;
do
{
tstart = dds_time();
- printf("%d.%06d running test: lease duration %d, delay %f, auto/manual-by-participant %zu/%zu\n",
+ printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n",
(int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000,
- ldur, mult, wr_cnt_auto, wr_cnt_man_pp);
+ ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
/* topics */
- create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name);
+ create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
@@ -245,6 +271,8 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur));
+ CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL);
+ dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
@@ -252,17 +280,20 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
writers = dds_alloc(wr_cnt * sizeof(dds_entity_t));
for (n = 0; n < wr_cnt; n++)
{
- CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, n < wr_cnt_auto ? wqos_auto : wqos_man_pp, NULL)) > 0);
+ dds_qos_t *wqos;
+ wqos = n < wr_cnt_auto ? wqos_auto : (n < (wr_cnt_auto + wr_cnt_man_pp) ? wqos_man_pp : wqos_man_tp);
+ CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
}
dds_delete_qos(wqos_auto);
dds_delete_qos(wqos_man_pp);
+ dds_delete_qos(wqos_man_tp);
t = dds_time();
if (t - tstart > DDS_MSECS(0.5 * ldur))
{
- ldur *= 10;
+ ldur *= 10 / (run + 1);
printf("%d.%06d failed to create writers in time\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
@@ -274,7 +305,7 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt);
dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur));
- size_t stopped = 0;
+ uint32_t stopped = 0;
do
{
dds_duration_t w = tstop - dds_time();
@@ -283,13 +314,13 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
stopped += (uint32_t)lstatus.not_alive_count_change;
} while (dds_time() < tstop);
t = dds_time();
- printf("%d.%06d writers stopped: %zu\n",
+ printf("%d.%06d writers stopped: %u\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped);
- size_t exp_stopped = mult < 1 ? 0 : wr_cnt_man_pp;
+ size_t exp_stopped = mult < 1 ? 0 : (wr_cnt_man_pp + wr_cnt_man_tp);
if (stopped != exp_stopped)
{
- ldur *= 10;
+ ldur *= 10 / (run + 1);
printf("%d.%06d incorrect number of stopped writers\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
@@ -315,7 +346,7 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
if (!test_finished)
{
- if (run++ > 2)
+ if (++run > 3)
{
printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000);
CU_FAIL_FATAL("Run limit reached");
@@ -354,6 +385,10 @@ static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
}
+/**
+ * Test that the correct PMD interval is set for the participant
+ * based on the lease duration of the writers.
+ * based on the lease duration of the writers.
+ */
#define MAX_WRITERS 10
CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini)
{
@@ -361,7 +396,7 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS];
- size_t wr_cnt = 0;
+ uint32_t wr_cnt = 0;
char name[100];
dds_qos_t *rqos;
uint32_t n;
@@ -397,6 +432,9 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
+ add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
+ CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
+
/* cleanup */
for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
@@ -406,6 +444,9 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
}
#undef MAX_WRITERS
+/**
+ * Check that the correct lease duration is set in the matched
+ * publications in the readers. */
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
@@ -465,6 +506,12 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
+/**
+ * Create a relatively large number of writers with automatic and manual-by-participant
+ * liveliness kinds and with varying lease durations, and check that all writers
+ * become alive. During the writer creation loop, every third writer is deleted
+ * immediately after creation.
+ */
#define MAX_WRITERS 100
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini)
{
@@ -475,11 +522,11 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
dds_entity_t waitset;
dds_qos_t *wqos;
struct dds_liveliness_changed_status lstatus;
- size_t wr_cnt = 0;
+ uint32_t wr_cnt = 0;
char name[100];
dds_qos_t *rqos;
dds_attach_t triggered;
- uint32_t n, status;
+ uint32_t n;
Space_Type1 sample = { 0, 0, 0 };
/* topics */
@@ -501,12 +548,11 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500));
CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
- CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* create writers */
for (n = 1; n < MAX_WRITERS; n++)
{
- dds_qset_liveliness(wqos, n % 1 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500 - n));
+ dds_qset_liveliness(wqos, n % 2 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (n % 3 ? 500 + n : 500 - n));
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_write (writers[n], &sample);
if (n % 3 == 2)
@@ -537,6 +583,9 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
}
#undef MAX_WRITERS
+/**
+ * Check the counts in the liveliness_changed_status result.
+ */
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
@@ -547,7 +596,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
dds_qos_t *rqos;
dds_qos_t *wqos;
dds_attach_t triggered;
- uint32_t status = 0;
struct dds_liveliness_changed_status lstatus;
struct dds_subscription_matched_status sstatus;
char name[100];
@@ -576,7 +624,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
- CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check status counts before proxy writer is expired */
dds_get_liveliness_changed_status(reader, &lstatus);
@@ -587,7 +634,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
dds_sleepfor(ldur + DDS_MSECS(100));
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
- CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
@@ -597,7 +643,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* write sample and re-check status counts */
dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
- CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
@@ -612,3 +657,96 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
}
+
+/**
+ * Test that dds_assert_liveliness works as expected for liveliness
+ * kinds manual-by-participant and manual-by-topic.
+ */
+#define MAX_WRITERS 100
+CU_TheoryDataPoints(ddsc_liveliness, assert_liveliness) = {
+ CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */
+ CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */
+ CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */
+};
+CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout=30)
+{
+ dds_entity_t pub_topic;
+ dds_entity_t sub_topic;
+ dds_entity_t reader;
+ dds_entity_t writers[MAX_WRITERS];
+ dds_qos_t *rqos;
+ struct dds_liveliness_changed_status lstatus;
+ char name[100];
+ dds_duration_t ldur = DDS_MSECS (300);
+ uint32_t wr_cnt = 0;
+ dds_time_t tstop;
+ uint32_t stopped;
+
+ assert (wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS);
+ printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u\n", wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
+
+ /* topics */
+ create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name);
+ CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+ CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
+
+ /* reader */
+ CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
+ dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
+ CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
+ dds_delete_qos(rqos);
+ CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
+
+ /* writers */
+ for (size_t n = 0; n < wr_cnt_auto; n++)
+ add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, ldur, &writers[wr_cnt++], pub_topic, reader);
+ for (size_t n = 0; n < wr_cnt_man_pp; n++)
+ add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur, &writers[wr_cnt++], pub_topic, reader);
+ for (size_t n = 0; n < wr_cnt_man_tp; n++)
+ add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur, &writers[wr_cnt++], pub_topic, reader);
+
+ /* check status counts before proxy writer is expired */
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp);
+
+  /* delay for more than the lease duration while asserting liveliness on the
+     man-by-topic writers: all writers (including the man-by-pp ones) should be kept alive */
+ tstop = dds_time() + 4 * ldur / 3;
+ stopped = 0;
+ do
+ {
+ for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++)
+ dds_assert_liveliness (writers[n]);
+ CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
+ stopped += (uint32_t)lstatus.not_alive_count_change;
+ dds_sleepfor (DDS_MSECS(50));
+ } while (dds_time() < tstop);
+ CU_ASSERT_EQUAL_FATAL(stopped, 0);
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp);
+
+  /* delay for more than the lease duration while asserting liveliness on the participant:
+     writers with man-by-pp liveliness should be kept alive, while man-by-topic writers
+     should expire */
+ tstop = dds_time() + 4 * ldur / 3;
+ stopped = 0;
+ do
+ {
+ dds_assert_liveliness (g_pub_participant);
+ CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
+ stopped += (uint32_t)lstatus.not_alive_count_change;
+ dds_sleepfor (DDS_MSECS(50));
+ } while (dds_time() < tstop);
+ CU_ASSERT_EQUAL_FATAL(stopped, wr_cnt_man_tp);
+ dds_get_liveliness_changed_status(reader, &lstatus);
+ printf("writers alive_count: %d\n", lstatus.alive_count);
+ CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp);
+
+ /* cleanup */
+ CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
+ for (size_t n = 0; n < wr_cnt; n++)
+ CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
+ CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
+}
+#undef MAX_WRITERS
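
Note (reviewer sketch): the theories above all follow the same reader-side pattern of accumulating not_alive_count_change to count writers whose liveliness expired. A condensed sketch of that pattern using only public API is shown below; the helper name is illustrative and the snippet is not part of the patch.

#include <stdint.h>
#include "dds/dds.h"

/* Polls the reader's liveliness-changed status until 'duration' has passed and
   returns how many alive-to-not-alive transitions were observed in that window. */
uint32_t count_not_alive_transitions (dds_entity_t reader, dds_duration_t duration)
{
  uint32_t stopped = 0;
  const dds_time_t tstop = dds_time () + duration;
  do {
    dds_liveliness_changed_status_t st;
    if (dds_get_liveliness_changed_status (reader, &st) == DDS_RETCODE_OK)
      stopped += (uint32_t) st.not_alive_count_change; /* the _change counters reset on each read */
    dds_sleepfor (DDS_MSECS (50));
  } while (dds_time () < tstop);
  return stopped;
}

After sleeping past the lease duration without asserting liveliness, the returned count should equal the number of manual-by-participant plus manual-by-topic writers, which is what expire_liveliness_kinds and assert_liveliness verify.
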
diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h
index b6b0877..12e62dd 100644
--- a/src/core/ddsi/include/dds/ddsi/q_entity.h
+++ b/src/core/ddsi/include/dds/ddsi/q_entity.h
@@ -323,6 +323,7 @@ struct proxy_participant
unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */
unsigned minimal_bes_mode: 1;
unsigned lease_expired: 1;
+ unsigned deleting: 1;
unsigned proxypp_have_spdp: 1;
unsigned proxypp_have_cm: 1;
unsigned owns_lease: 1;
@@ -369,7 +370,7 @@ struct proxy_writer {
unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
- unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired) */
+ unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired); field may be modified only when holding both pwr->e.lock and pwr->c.proxypp->e.lock */
#ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif
@@ -645,14 +646,14 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
reader or writer. Actual deletion is scheduled in the future, when
no outstanding references may still exist (determined by checking
thread progress, &c.). */
-int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked);
+int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
-int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive);
-int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive);
+int proxy_writer_set_alive (struct proxy_writer *pwr);
+int proxy_writer_set_notalive (struct proxy_writer *pwr);
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
@@ -661,6 +662,8 @@ void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid,
rebuild them all (which only makes sense after previously having emptied them all). */
void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild);
+void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch);
+
void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok);
struct ddsi_writer_info;
diff --git a/src/core/ddsi/include/dds/ddsi/q_transmit.h b/src/core/ddsi/include/dds/ddsi/q_transmit.h
index c7899e6..b88b999 100644
--- a/src/core/ddsi/include/dds/ddsi/q_transmit.h
+++ b/src/core/ddsi/include/dds/ddsi/q_transmit.h
@@ -42,7 +42,8 @@ int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *x
/* When calling the following functions, wr->lock must be held */
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
-void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync);
+void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync);
+dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp);
#if defined (__cplusplus)
}
diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c
index b53b35b..f9f13c2 100644
--- a/src/core/ddsi/src/q_ddsi_discovery.c
+++ b/src/core/ddsi/src/q_ddsi_discovery.c
@@ -1357,13 +1357,9 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data
}
GVLOGDISC (" "PGUIDFMT, PGUID (datap->endpoint_guid));
if (is_writer_entityid (datap->endpoint_guid.entityid))
- {
- res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0, false);
- }
+ res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0);
else
- {
res = delete_proxy_reader (gv, &datap->endpoint_guid, timestamp, 0);
- }
GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete");
}
diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c
index f9adc0b..475d690 100644
--- a/src/core/ddsi/src/q_entity.c
+++ b/src/core/ddsi/src/q_entity.c
@@ -1488,7 +1488,7 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
}
}
-static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch)
+void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
@@ -3619,16 +3619,20 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew)
{
+ /* By loading/storing the pointer atomically, we ensure we always
+ read a valid (or once valid) lease. By delaying freeing the lease
+ through the garbage collector, we ensure whatever lease update
+ occurs in parallel completes before the memory is released. */
const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
- struct lease *lease_old = (struct lease *) ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
- lease_renew (lease_old, never);
- gcreq->arg = (void *) lease_old;
+ struct lease *lease_old = ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
+ lease_renew (lease_old, never); /* ensures lease will not expire while it is replaced */
+ gcreq->arg = lease_old;
gcreq_enqueue (gcreq);
- ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, (void *) lnew);
+ ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, lnew);
}
-static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
+static void proxy_participant_add_pwr_lease_locked (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
{
struct lease *minl_prev;
struct lease *minl_new;
@@ -3637,7 +3641,6 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
- ddsrt_mutex_lock (&proxypp->e.lock);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl_prev = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_insert (&lease_fhdef_proxypp, lh, pwr->lease);
@@ -3647,10 +3650,11 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
{
nn_etime_t texp = add_duration_to_etime (now_et (), minl_new->tdur);
struct lease *lnew = lease_new (texp, minl_new->tdur, minl_new->entity);
- if (manbypp && minl_prev == NULL)
+ if (minl_prev == NULL)
{
+ assert (manbypp);
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
- ddsrt_atomic_stvoidp (&proxypp->minl_man, (void *) lnew);
+ ddsrt_atomic_stvoidp (&proxypp->minl_man, lnew);
}
else
{
@@ -3658,7 +3662,6 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
}
lease_register (lnew);
}
- ddsrt_mutex_unlock (&proxypp->e.lock);
}
static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant * proxypp, struct proxy_writer * pwr)
@@ -3684,25 +3687,14 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant
proxy_participant_replace_minl (proxypp, manbypp, lnew);
lease_register (lnew);
}
- else if (manbypp)
- {
- proxy_participant_replace_minl (proxypp, manbypp, NULL);
- }
else
{
- /* minl should not be null for leaseheap_auto because proxypp's lease is in */
- assert (false);
+ assert (manbypp);
+ proxy_participant_replace_minl (proxypp, manbypp, NULL);
}
}
}
-static void proxy_participant_remove_pwr_lease (struct proxy_participant * proxypp, struct proxy_writer * pwr)
-{
- ddsrt_mutex_lock (&proxypp->e.lock);
- proxy_participant_remove_pwr_lease_locked (proxypp, pwr);
- ddsrt_mutex_unlock (&proxypp->e.lock);
-}
-
void new_proxy_participant
(
struct q_globals *gv,
@@ -3736,6 +3728,7 @@ void new_proxy_participant
entity_common_init (&proxypp->e, gv, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false);
proxypp->refc = 1;
proxypp->lease_expired = 0;
+ proxypp->deleting = 0;
proxypp->vendor = vendor;
proxypp->bes = bes;
proxypp->prismtech_bes = prismtech_bes;
@@ -3789,8 +3782,11 @@ void new_proxy_participant
and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */
ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
- /* Set the shortest lease for auto liveliness: clone proxypp's lease (as there are no pwr's
- at this point, this is the shortest lease) */
+  /* Set the shortest lease for auto liveliness: clone proxypp's lease and store the clone in
+     proxypp->minl_auto. As there are no pwr's at this point, the proxy pp's lease is the
+     shortest lease. When a pwr with a shorter lease duration is added, the lease in minl_auto
+     is replaced by a clone of that pwr's lease in proxy_participant_add_pwr_lease_locked and the
+     old shortest lease is freed, which is why a clone (and not proxypp's own lease from the heap)
+     must be stored here. */
ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease));
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
}
@@ -3948,9 +3944,14 @@ int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t s
return 0;
}
-static void ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
+static int ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
{
ddsrt_mutex_lock (&proxypp->e.lock);
+ if (proxypp->deleting)
+ {
+ ddsrt_mutex_unlock (&proxypp->e.lock);
+ return DDS_RETCODE_PRECONDITION_NOT_MET;
+ }
c->proxypp = proxypp;
proxypp->refc++;
@@ -3962,6 +3963,8 @@ static void ref_proxy_participant (struct proxy_participant *proxypp, struct pro
}
proxypp->endpoints = c;
ddsrt_mutex_unlock (&proxypp->e.lock);
+
+ return DDS_RETCODE_OK;
}
static void unref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
@@ -4070,8 +4073,9 @@ static void delete_or_detach_dependent_pp (struct proxy_participant *p, struct p
static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp, int isimplicit)
{
- struct proxy_endpoint_common * c;
- int ret;
+ ddsi_entityid_t *eps;
+ ddsi_guid_t ep_guid;
+ uint32_t ep_count = 0;
/* if any proxy participants depend on this participant, delete them */
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting dependent proxy participants\n", PGUID (proxypp->e.guid));
@@ -4084,31 +4088,38 @@ static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp
ephash_enum_proxy_participant_fini (&est);
}
- /* delete_proxy_{reader,writer} merely schedules the actual delete
- operation, so we can hold the lock -- at least, for now. */
-
ddsrt_mutex_lock (&proxypp->e.lock);
+ proxypp->deleting = 1;
if (isimplicit)
proxypp->lease_expired = 1;
- ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid));
- c = proxypp->endpoints;
- while (c)
+  /* Take a snapshot of the endpoint ids so that proxypp->e.lock can be released.
+     Pwrs/prds may be deleted while iterating over this snapshot, but in that case
+     resolving the guid fails and the call to delete_proxy_writer/reader simply
+     returns without doing anything. */
{
- struct entity_common *e = entity_common_from_proxy_endpoint_common (c);
- if (is_writer_entityid (e->guid.entityid))
+ eps = ddsrt_malloc (proxypp->refc * sizeof(ddsi_entityid_t));
+ struct proxy_endpoint_common *cep = proxypp->endpoints;
+ while (cep)
{
- ret = delete_proxy_writer (proxypp->e.gv, &e->guid, timestamp, isimplicit, true);
+ const struct entity_common *entc = entity_common_from_proxy_endpoint_common (cep);
+ eps[ep_count++] = entc->guid.entityid;
+ cep = cep->next_ep;
}
- else
- {
- ret = delete_proxy_reader (proxypp->e.gv, &e->guid, timestamp, isimplicit);
- }
- (void) ret;
- c = c->next_ep;
}
ddsrt_mutex_unlock (&proxypp->e.lock);
+ ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid));
+ ep_guid.prefix = proxypp->e.guid.prefix;
+ for (uint32_t n = 0; n < ep_count; n++)
+ {
+ ep_guid.entityid = eps[n];
+ if (is_writer_entityid (ep_guid.entityid))
+ delete_proxy_writer (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
+ else
+ delete_proxy_reader (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
+ }
+ ddsrt_free (eps);
gcreq_proxy_participant (proxypp);
}
@@ -4185,9 +4196,10 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_g
/* PROXY-ENDPOINT --------------------------------------------------- */
-static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
+static int proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
{
const char *name;
+ int ret;
if (is_builtin_entityid (guid->entityid, proxypp->vendor))
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == 0);
@@ -4206,7 +4218,16 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
else
memset (&c->group_guid, 0, sizeof (c->group_guid));
- ref_proxy_participant (proxypp, c);
+ if ((ret = ref_proxy_participant (proxypp, c)) != DDS_RETCODE_OK)
+ {
+ nn_xqos_fini (c->xqos);
+ ddsrt_free (c->xqos);
+ unref_addrset (c->as);
+ entity_common_fini (e);
+ return ret;
+ }
+
+ return DDS_RETCODE_OK;
}
static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c)
@@ -4226,6 +4247,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_writer *pwr;
int isreliable;
nn_mtime_t tnow = now_mt ();
+ int ret;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (gv->guid_hash, guid) == NULL);
@@ -4237,7 +4259,11 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
}
pwr = ddsrt_malloc (sizeof (*pwr));
- proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist);
+ if ((ret = proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
+ {
+ ddsrt_free (pwr);
+ return ret;
+ }
ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0;
@@ -4275,7 +4301,19 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
nn_etime_t texpire = add_duration_to_etime (now_et (), pwr->c.xqos->liveliness.lease_duration);
pwr->lease = lease_new (texpire, pwr->c.xqos->liveliness.lease_duration, &pwr->e);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
- proxy_participant_add_pwr_lease (proxypp, pwr);
+ {
+ ddsrt_mutex_lock (&proxypp->e.lock);
+ proxy_participant_add_pwr_lease_locked (proxypp, pwr);
+ ddsrt_mutex_unlock (&proxypp->e.lock);
+ }
+ else
+ {
+ lease_register (pwr->lease);
+ }
+ }
+ else
+ {
+ pwr->lease = NULL;
}
if (isreliable)
@@ -4419,17 +4457,22 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
free_pwr_rd_match (m);
}
local_reader_ary_fini (&pwr->rdary);
+ if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
+ lease_free (pwr->lease);
proxy_endpoint_common_fini (&pwr->e, &pwr->c);
nn_defrag_free (pwr->defrag);
nn_reorder_free (pwr->reorder);
ddsrt_free (pwr);
}
-int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked)
+/* First stage in deleting the proxy writer. In this function the pwr and its member
+   pointers remain valid. The real clean-up is done asynchronously in gc_delete_proxy_writer. */
+int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit)
{
struct proxy_writer *pwr;
- (void)isimplicit;
+ DDSRT_UNUSED_ARG (isimplicit);
GVLOGDISC ("delete_proxy_writer ("PGUIDFMT") ", PGUID (*guid));
+
ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{
@@ -4446,66 +4489,60 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
ddsrt_mutex_unlock (&gv->lock);
+ if (proxy_writer_set_notalive (pwr) != DDS_RETCODE_OK)
+ GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid));
+  /* Set the lease expiry for this pwr to never so that the pwr cannot be set
+     back to alive while it is scheduled for deletion. */
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
- {
- ddsrt_mutex_lock (&pwr->e.lock);
- if (pwr->alive && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
- proxypp_locked ? proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
- ddsrt_mutex_unlock (&pwr->e.lock);
- lease_free (pwr->lease);
- }
+ lease_renew (pwr->lease, (nn_etime_t){ T_NEVER });
gcreq_proxy_writer (pwr);
- return 0;
+ return DDS_RETCODE_OK;
}
-int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive)
+int proxy_writer_set_alive (struct proxy_writer *pwr)
{
+  /* Caller must hold pwr->e.lock, so pwr->alive can be read safely; to update
+   * the field this function additionally takes pwr->c.proxypp->e.lock */
ddsrt_avl_iter_t it;
- assert (pwr->alive != alive);
- pwr->alive = alive;
- GVLOGDISC (" alive=%d", pwr->alive);
if (pwr->alive)
+ return DDS_RETCODE_PRECONDITION_NOT_MET;
+ ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
+ pwr->alive = true;
+
+ for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{
- for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
+ struct reader *rd;
+ if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
{
- struct reader *rd;
- if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
- {
- status_cb_data_t data;
- data.add = true;
- data.handle = pwr->e.iid;
- data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
- (rd->status_cb) (rd->status_cb_entity, &data);
- }
+ status_cb_data_t data;
+ data.add = true;
+ data.handle = pwr->e.iid;
+ data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
+ (rd->status_cb) (rd->status_cb_entity, &data);
}
- if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
- proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr);
}
+ if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
+ proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr);
+ ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
+ return DDS_RETCODE_OK;
+}
+
+int proxy_writer_set_notalive (struct proxy_writer *pwr)
+{
+  /* Caller must not hold pwr->e.lock or pwr->c.proxypp->e.lock; this function
+   * takes both locks to update the pwr->alive flag */
+ int ret = DDS_RETCODE_OK;
+ ddsrt_mutex_lock (&pwr->e.lock);
+ if (!pwr->alive)
+ ret = DDS_RETCODE_PRECONDITION_NOT_MET;
else
{
- for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
- reader_drop_connection (&m->rd_guid, pwr, false);
+ ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
+ pwr->alive = false;
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
- proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
+ proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr);
+ ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
}
- return 0;
-}
-
-int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive)
-{
- struct proxy_writer *pwr;
- int ret;
- ddsrt_mutex_lock (&gv->lock);
- if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
- {
- ddsrt_mutex_unlock (&gv->lock);
- GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*guid));
- return DDS_RETCODE_BAD_PARAMETER;
- }
- ddsrt_mutex_unlock (&gv->lock);
-
- ddsrt_mutex_lock (&pwr->e.lock);
- ret = proxy_writer_set_alive_locked (gv, pwr, alive);
ddsrt_mutex_unlock (&pwr->e.lock);
return ret;
}
@@ -4521,6 +4558,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_participant *proxypp;
struct proxy_reader *prd;
nn_mtime_t tnow = now_mt ();
+ int ret;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (gv->guid_hash, guid) == NULL);
@@ -4532,7 +4570,11 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
}
prd = ddsrt_malloc (sizeof (*prd));
- proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist);
+ if ((ret = proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
+ {
+ ddsrt_free (prd);
+ return ret;
+ }
prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM
@@ -4549,7 +4591,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
ddsrt_mutex_unlock (&prd->e.lock);
match_proxy_reader_with_writers (prd, tnow);
- return 0;
+ return DDS_RETCODE_OK;
}
static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *prd)
diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c
index f107855..0db530c 100644
--- a/src/core/ddsi/src/q_lease.c
+++ b/src/core/ddsi/src/q_lease.c
@@ -140,7 +140,6 @@ void lease_free (struct lease *l)
void lease_renew (struct lease *l, nn_etime_t tnowE)
{
- struct q_globals const * gv;
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
/* do not touch tend if moving forward or if already expired */
@@ -151,7 +150,11 @@ void lease_renew (struct lease *l, nn_etime_t tnowE)
return;
} while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v));
- gv = l->entity->gv;
+  /* Only at this point can we assume that gv can still be recovered from the entity in
+   * the lease (i.e. the entity still exists). In cases where dereferencing l->entity->gv
+   * is not safe (e.g. while entities are being deleted), tend has been set to T_NEVER and
+   * the early-out in the loop above is taken before we get here. */
+ struct q_globals const * gv = l->entity->gv;
if (gv->logconfig.c.mask & DDS_LC_TRACE)
{
int32_t tsec, tusec;
@@ -272,10 +275,27 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
delete_proxy_participant_by_guid (gv, &g, now(), 1);
break;
case EK_PROXY_WRITER:
- GVLOGDISC ("proxy_writer_set_alive ("PGUIDFMT")", PGUID (g));
- (void) proxy_writer_set_alive_guid (gv, &g, false);
+ {
+ struct proxy_writer *pwr;
+ ddsrt_avl_iter_t it;
+ if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &g)) == NULL)
+ {
+ GVLOGDISC (" "PGUIDFMT"?\n", PGUID (g));
+ ddsrt_mutex_lock (&gv->leaseheap_lock);
+ continue;
+ }
+ GVLOGDISC ("proxy_writer_set_notalive ("PGUIDFMT")", PGUID (g));
+ if (proxy_writer_set_notalive (pwr) == DDS_RETCODE_PRECONDITION_NOT_MET)
+ {
+ GVLOGDISC (" pwr was not alive");
+ ddsrt_mutex_lock (&gv->leaseheap_lock);
+ continue;
+ }
GVLOGDISC ("\n");
+ for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
+ reader_drop_connection (&m->rd_guid, pwr, false);
break;
+ }
case EK_PARTICIPANT:
case EK_READER:
case EK_WRITER:
@@ -283,7 +303,6 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
assert (false);
break;
}
-
ddsrt_mutex_lock (&gv->leaseheap_lock);
}
diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c
index 4bea8e8..b026022 100644
--- a/src/core/ddsi/src/q_receive.c
+++ b/src/core/ddsi/src/q_receive.c
@@ -548,7 +548,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
}
/* Send a Heartbeat just to this peer */
- add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0);
+ add_Heartbeat (m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid));
qxev_msg (wr->evq, m);
@@ -1131,12 +1131,18 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
}
-
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst));
-
ddsrt_mutex_lock (&pwr->e.lock);
-
+ if (msg->smhdr.flags & HEARTBEAT_FLAG_LIVELINESS &&
+ pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC &&
+ pwr->c.xqos->liveliness.lease_duration != T_NEVER)
+ {
+ struct lease *lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man);
+ if (lease != NULL)
+ lease_renew (lease, tnow);
+ lease_renew (pwr->lease, tnow);
+ }
if (pwr->n_reliable_readers == 0)
{
RSTTRACE (PGUIDFMT" -> "PGUIDFMT" no-reliable-readers)", PGUID (src), PGUID (dst));
@@ -2087,7 +2093,8 @@ static void clean_defrag (struct proxy_writer *pwr)
nn_defrag_notegap (pwr->defrag, 1, seq);
}
-static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
+static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo,
+ uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
{
struct proxy_writer *pwr;
struct nn_rsample *rsample;
@@ -2114,13 +2121,11 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
/* Shouldn't lock the full writer, but will do so for now */
ddsrt_mutex_lock (&pwr->e.lock);
- /* FIXME: implement liveliness manual-by-topic */
- /* if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
+ if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC && pwr->c.xqos->liveliness.lease_duration != T_NEVER)
lease_renew (pwr->lease, tnow);
- */
if (!pwr->alive)
- proxy_writer_set_alive_locked (pwr->e.gv, pwr, true);
+ proxy_writer_set_alive (pwr);
/* Don't accept data when reliable readers exist and we haven't yet seen
a heartbeat telling us what the "current" sequence number of the writer
diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c
index 7afd1fe..6d2a69a 100644
--- a/src/core/ddsi/src/q_transmit.c
+++ b/src/core/ddsi/src/q_transmit.c
@@ -194,7 +194,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
- add_Heartbeat (msg, wr, whcst, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync);
+ add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync);
}
else
{
@@ -215,7 +215,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
- add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync);
+ add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync);
}
writer_hbcontrol_note_hb (wr, tnow, hbansreq);
@@ -313,7 +313,7 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
return msg;
}
-void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync)
+void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync)
{
struct q_globals const * const gv = wr->e.gv;
struct nn_xmsg_marker sm_marker;
@@ -324,6 +324,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
assert (wr->reliable);
assert (hbansreq >= 0);
+ assert (hbliveliness >= 0);
if (gv->config.meas_hb_to_ack_latency)
{
@@ -337,6 +338,8 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
if (!hbansreq)
hb->smhdr.flags |= HEARTBEAT_FLAG_FINAL;
+ if (hbliveliness)
+ hb->smhdr.flags |= HEARTBEAT_FLAG_LIVELINESS;
hb->readerId = nn_hton_entityid (dst);
hb->writerId = nn_hton_entityid (wr->e.guid.entityid);
@@ -666,6 +669,34 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
nn_xmsg_submsg_setnext (*pmsg, sm_marker);
}
+dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp)
+{
+ struct nn_xmsg *msg = NULL;
+ struct whc_state whcst;
+ struct thread_state1 * const ts1 = lookup_thread_state ();
+ thread_state_awake (ts1, gv);
+ struct writer *wr = ephash_lookup_writer_guid (gv->guid_hash, wr_guid);
+ if (wr == NULL)
+ {
+ GVTRACE ("write_hb_liveliness("PGUIDFMT") - writer not found\n", PGUID (*wr_guid));
+ return DDS_RETCODE_PRECONDITION_NOT_MET;
+ }
+ if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
+ return DDS_RETCODE_OUT_OF_RESOURCES;
+ ddsrt_mutex_lock (&wr->e.lock);
+ nn_xmsg_setdstN (msg, wr->as, wr->as_group);
+#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
+ nn_xmsg_setencoderid (msg, wr->partition_id);
+#endif
+ whc_get_state (wr->whc, &whcst);
+ add_Heartbeat (msg, wr, &whcst, 0, 1, to_entityid (NN_ENTITYID_UNKNOWN), 1);
+ ddsrt_mutex_unlock (&wr->e.lock);
+ nn_xpack_addmsg (xp, msg, 0);
+ nn_xpack_send (xp, true);
+ thread_state_asleep (ts1);
+ return DDS_RETCODE_OK;
+}
+
#if 0
static int must_skip_frag (const char *frags_to_skip, unsigned frag)
{
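
Note (reviewer sketch): write_hb_liveliness and the handle_Heartbeat change rely on the liveliness flag of the RTPS HEARTBEAT submessage. For reference, the flag bits as laid out on the wire are shown below; the names are illustrative, and the actual constants used by the patch come from the DDSI protocol headers.

/* RTPS HEARTBEAT submessage flag bits (illustrative definitions) */
#define EXAMPLE_HEARTBEAT_FLAG_ENDIANNESS 0x01u /* E: submessage encoded little-endian */
#define EXAMPLE_HEARTBEAT_FLAG_FINAL      0x02u /* F: readers need not respond with an ACKNACK */
#define EXAMPLE_HEARTBEAT_FLAG_LIVELINESS 0x04u /* L: also asserts manual-by-topic liveliness of the writer */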