Liveliness QoS implementation for kind manual_by_topic

This commit adds the implementation for the liveliness kind manual-by-topic.
With these changes, the api function dds_assert_liveliness now accepts a
writer as entity. Asserting liveliness on a writer triggers sending a
heartbeat message with the liveliness flag set.

The code for handling reception of a heartbeat message checks for this flag and
if set the lease for the proxy writer is renewed (and the shortest manual-by-participant
lease on the proxy participant as well, because the message also indicates that the
remote participant is alive). Receiving data (handle_regular) also renews the
lease on the proxy writer in case it has the manual-by-topic liveliness kind.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>

Refactored locking for pwr->alive so that locking order is consistent (locking
pwr->c.proxypp->e.lock while holding pwr->e.lock is the expected order). And
processed other review comments: removed lock for ephash_lookup, added
additional comments, set pwr->lease to null if not initialised.

Because of intermittent timing issues with liveliness expiry test in Travis, I've
increase the time-out and retry limit for this test.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>

Check that proxy writer is still alive (could be not-alive due to deleting) in code path for proxy writer's lease expiry

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>

Some additional refactoring in locking for pwr->alive for liveliness qos, moved lease free to gc_delete_pwr, refactored the set pwr alive/not alive functions and some minor changes in liveliness tests

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>

Fixed building liveliness tests on Windows and some cleaning up in liveliness test code

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2019-11-13 16:32:21 +01:00 committed by Erik Boasson
parent 32c5a59c8f
commit 476507fd5d
9 changed files with 409 additions and 170 deletions

View file

@ -24,6 +24,7 @@
#include "dds/version.h" #include "dds/version.h"
#include "dds/ddsi/ddsi_pmd.h" #include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/q_xqos.h" #include "dds/ddsi/q_xqos.h"
#include "dds/ddsi/q_transmit.h"
extern inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink); extern inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink);
extern inline bool dds_entity_is_enabled (const dds_entity *e); extern inline bool dds_entity_is_enabled (const dds_entity *e);
@ -1387,9 +1388,9 @@ dds_return_t dds_generic_unimplemented_operation (dds_entity_t handle, dds_entit
dds_return_t dds_assert_liveliness (dds_entity_t entity) dds_return_t dds_assert_liveliness (dds_entity_t entity)
{ {
dds_return_t rc; dds_return_t rc;
dds_entity *e; dds_entity *e, *ewr;
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc; return rc;
switch (dds_entity_kind (e)) switch (dds_entity_kind (e))
{ {
@ -1398,8 +1399,11 @@ dds_return_t dds_assert_liveliness (dds_entity_t entity)
break; break;
} }
case DDS_KIND_WRITER: { case DDS_KIND_WRITER: {
/* FIXME: implement liveliness manual-by-topic */ if ((rc = dds_entity_lock (entity, DDS_KIND_WRITER, &ewr)) != DDS_RETCODE_OK)
rc = DDS_RETCODE_UNSUPPORTED; return rc;
if ((rc = write_hb_liveliness (&e->m_domain->gv, &e->m_guid, ((struct dds_writer *)ewr)->m_xp)) != DDS_RETCODE_OK)
return rc;
dds_entity_unlock (e);
break; break;
} }
default: { default: {
@ -1407,6 +1411,6 @@ dds_return_t dds_assert_liveliness (dds_entity_t entity)
break; break;
} }
} }
dds_entity_unlock (e); dds_entity_unpin (e);
return rc; return rc;
} }

View file

@ -30,8 +30,8 @@
#define DDS_DOMAINID_PUB 0 #define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1 #define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<General><NetworkInterfaceAddress>127.0.0.1</NetworkInterfaceAddress></General><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>" #define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">" #define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_liveliness_tests.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
uint32_t g_topic_nr = 0; uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0; static dds_entity_t g_pub_domain = 0;
@ -44,7 +44,7 @@ static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size) static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size)
{ {
/* Get semi random g_topic name. */ /* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid(); ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid(); ddsrt_tid_t tid = ddsrt_gettid();
(void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid); (void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
@ -53,10 +53,15 @@ static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size
static void liveliness_init(void) static void liveliness_init(void)
{ {
char *conf = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, UINT32_MAX); /* Domains for pub and sub use a different domain id, but the portgain setting
g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf); * in configuration is 0, so that both domains will map to the same port number.
g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf); * This allows to create two domains in a single test process. */
dds_free(conf); char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub);
g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub);
dds_free(conf_pub);
dds_free(conf_sub);
g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_pub_participant > 0); CU_ASSERT_FATAL(g_pub_participant > 0);
@ -79,6 +84,11 @@ static void liveliness_fini(void)
dds_delete(g_pub_domain); dds_delete(g_pub_domain);
} }
/**
* Gets the current PMD sequence number for the participant. This
* can be used to count the number of PMD messages that is sent by
* the participant.
*/
static seqno_t get_pmd_seqno(dds_entity_t participant) static seqno_t get_pmd_seqno(dds_entity_t participant)
{ {
seqno_t seqno; seqno_t seqno;
@ -96,6 +106,9 @@ static seqno_t get_pmd_seqno(dds_entity_t participant)
return seqno; return seqno;
} }
/**
* Gets the current PMD interval for the participant
*/
static dds_duration_t get_pmd_interval(dds_entity_t participant) static dds_duration_t get_pmd_interval(dds_entity_t participant)
{ {
dds_duration_t intv; dds_duration_t intv;
@ -110,6 +123,9 @@ static dds_duration_t get_pmd_interval(dds_entity_t participant)
return intv; return intv;
} }
/**
* Gets the current lease duration for the participant
*/
static dds_duration_t get_ldur_config(dds_entity_t participant) static dds_duration_t get_ldur_config(dds_entity_t participant)
{ {
struct dds_entity *pp_entity; struct dds_entity *pp_entity;
@ -120,15 +136,22 @@ static dds_duration_t get_ldur_config(dds_entity_t participant)
return ldur; return ldur;
} }
/**
* Test that the correct number of PMD messages is sent for
* the various liveliness kinds.
*/
#define A DDS_LIVELINESS_AUTOMATIC #define A DDS_LIVELINESS_AUTOMATIC
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT #define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
#define MT DDS_LIVELINESS_MANUAL_BY_TOPIC
CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = { CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
CU_DataPoints(dds_liveliness_kind_t, A, A, MP), /* liveliness kind */ CU_DataPoints(dds_liveliness_kind_t, A, A, MP, MT), /* liveliness kind */
CU_DataPoints(uint32_t, 200, 200, 200), /* lease duration */ CU_DataPoints(uint32_t, 200, 200, 200, 200), /* lease duration */
CU_DataPoints(double, 5, 10, 5), /* delay (n times lease duration) */ CU_DataPoints(double, 5, 10, 5, 5), /* delay (n times lease duration) */
}; };
#undef A #undef MT
#undef MP #undef MP
#undef A
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{ {
dds_entity_t pub_topic; dds_entity_t pub_topic;
@ -150,7 +173,7 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
kind == 0 ? "A" : "MP", ldur, mult); kind == 0 ? "A" : "MP", ldur, mult);
/* topics */ /* topics */
create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name); create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
@ -187,7 +210,7 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
/* end-start should be mult - 1, but allow 1 pmd sample to be lost */ /* end-start should be mult - 1, but allow 1 pmd sample to be lost */
CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0)) CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 2 : 0))
if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT) if (kind != DDS_LIVELINESS_AUTOMATIC)
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult) CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
/* cleanup */ /* cleanup */
@ -197,39 +220,42 @@ CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_livelin
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
} }
/* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */ /**
* Test that the expected number of proxy writers expires (set to not-alive)
* after a certain delay for various combinations of writers with different
* liveliness kinds.
*/
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = { CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */ CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration for initial test run (increased for each retry when test fails) */
CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */ CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
CU_DataPoints(size_t, 1, 0, 2, 0, 1, 0, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */ CU_DataPoints(uint32_t, 1, 0, 2, 0, 1, 0, 0, 1, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
CU_DataPoints(size_t, 1, 1, 2, 2, 1, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */ CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */
}; };
CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60) CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120)
{ {
dds_entity_t pub_topic; dds_entity_t pub_topic;
dds_entity_t sub_topic; dds_entity_t sub_topic;
dds_entity_t reader; dds_entity_t reader;
dds_entity_t *writers; dds_entity_t *writers;
dds_qos_t *rqos, *wqos_auto, *wqos_man_pp; dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp;
dds_entity_t waitset; dds_entity_t waitset;
dds_attach_t triggered; dds_attach_t triggered;
struct dds_liveliness_changed_status lstatus; struct dds_liveliness_changed_status lstatus;
uint32_t status; uint32_t status, n, run = 1, wr_cnt = wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp;
size_t n, run = 1;
char name[100]; char name[100];
size_t wr_cnt = wr_cnt_auto + wr_cnt_man_pp;
dds_time_t tstart, t; dds_time_t tstart, t;
bool test_finished = false; bool test_finished = false;
do do
{ {
tstart = dds_time(); tstart = dds_time();
printf("%d.%06d running test: lease duration %d, delay %f, auto/manual-by-participant %zu/%zu\n", printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n",
(int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000, (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000,
ldur, mult, wr_cnt_auto, wr_cnt_man_pp); ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
/* topics */ /* topics */
create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name); create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0); CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
@ -245,6 +271,8 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur)); dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL); CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur)); dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0); CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
@ -252,17 +280,20 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
writers = dds_alloc(wr_cnt * sizeof(dds_entity_t)); writers = dds_alloc(wr_cnt * sizeof(dds_entity_t));
for (n = 0; n < wr_cnt; n++) for (n = 0; n < wr_cnt; n++)
{ {
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, n < wr_cnt_auto ? wqos_auto : wqos_man_pp, NULL)) > 0); dds_qos_t *wqos;
wqos = n < wr_cnt_auto ? wqos_auto : (n < (wr_cnt_auto + wr_cnt_man_pp) ? wqos_man_pp : wqos_man_tp);
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
} }
dds_delete_qos(wqos_auto); dds_delete_qos(wqos_auto);
dds_delete_qos(wqos_man_pp); dds_delete_qos(wqos_man_pp);
dds_delete_qos(wqos_man_tp);
t = dds_time(); t = dds_time();
if (t - tstart > DDS_MSECS(0.5 * ldur)) if (t - tstart > DDS_MSECS(0.5 * ldur))
{ {
ldur *= 10; ldur *= 10 / (run + 1);
printf("%d.%06d failed to create writers in time\n", printf("%d.%06d failed to create writers in time\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
} }
@ -274,7 +305,7 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt); CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt);
dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur)); dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur));
size_t stopped = 0; uint32_t stopped = 0;
do do
{ {
dds_duration_t w = tstop - dds_time(); dds_duration_t w = tstop - dds_time();
@ -283,13 +314,13 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
stopped += (uint32_t)lstatus.not_alive_count_change; stopped += (uint32_t)lstatus.not_alive_count_change;
} while (dds_time() < tstop); } while (dds_time() < tstop);
t = dds_time(); t = dds_time();
printf("%d.%06d writers stopped: %zu\n", printf("%d.%06d writers stopped: %u\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped); (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped);
size_t exp_stopped = mult < 1 ? 0 : wr_cnt_man_pp; size_t exp_stopped = mult < 1 ? 0 : (wr_cnt_man_pp + wr_cnt_man_tp);
if (stopped != exp_stopped) if (stopped != exp_stopped)
{ {
ldur *= 10; ldur *= 10 / (run + 1);
printf("%d.%06d incorrect number of stopped writers\n", printf("%d.%06d incorrect number of stopped writers\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000); (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
} }
@ -315,7 +346,7 @@ CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp)
if (!test_finished) if (!test_finished)
{ {
if (run++ > 2) if (++run > 3)
{ {
printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000); printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000);
CU_FAIL_FATAL("Run limit reached"); CU_FAIL_FATAL("Run limit reached");
@ -354,6 +385,10 @@ static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
} }
/**
* Test that the correct PMD interval is set for the participant
* based on the lease duration of the writers.
*/
#define MAX_WRITERS 10 #define MAX_WRITERS 10
CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini) CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini)
{ {
@ -361,7 +396,7 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
dds_entity_t sub_topic; dds_entity_t sub_topic;
dds_entity_t reader; dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS]; dds_entity_t writers[MAX_WRITERS];
size_t wr_cnt = 0; uint32_t wr_cnt = 0;
char name[100]; char name[100];
dds_qos_t *rqos; dds_qos_t *rqos;
uint32_t n; uint32_t n;
@ -397,6 +432,9 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader); add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500)); CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
/* cleanup */ /* cleanup */
for (n = 0; n < wr_cnt; n++) for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
@ -406,6 +444,9 @@ CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveli
} }
#undef MAX_WRITERS #undef MAX_WRITERS
/**
* Check that the correct lease duration is set in the matched
* publications in the readers. */
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini) CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
{ {
dds_entity_t pub_topic; dds_entity_t pub_topic;
@ -465,6 +506,12 @@ CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = li
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
} }
/**
* Create a relative large number of writers with liveliness kinds automatic and
* manual-by-participant and with decreasing lease duration, and check that all
* writers become alive. During the writer creation loop, every third writer
* is deleted immediately after creating.
*/
#define MAX_WRITERS 100 #define MAX_WRITERS 100
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini) CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini)
{ {
@ -475,11 +522,11 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
dds_entity_t waitset; dds_entity_t waitset;
dds_qos_t *wqos; dds_qos_t *wqos;
struct dds_liveliness_changed_status lstatus; struct dds_liveliness_changed_status lstatus;
size_t wr_cnt = 0; uint32_t wr_cnt = 0;
char name[100]; char name[100];
dds_qos_t *rqos; dds_qos_t *rqos;
dds_attach_t triggered; dds_attach_t triggered;
uint32_t n, status; uint32_t n;
Space_Type1 sample = { 0, 0, 0 }; Space_Type1 sample = { 0, 0, 0 };
/* topics */ /* topics */
@ -501,12 +548,11 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500)); dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500));
CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1); CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* create writers */ /* create writers */
for (n = 1; n < MAX_WRITERS; n++) for (n = 1; n < MAX_WRITERS; n++)
{ {
dds_qset_liveliness(wqos, n % 1 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500 - n)); dds_qset_liveliness(wqos, n % 2 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (n % 3 ? 500 + n : 500 - n));
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0); CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_write (writers[n], &sample); dds_write (writers[n], &sample);
if (n % 3 == 2) if (n % 3 == 2)
@ -537,6 +583,9 @@ CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .
} }
#undef MAX_WRITERS #undef MAX_WRITERS
/**
* Check the counts in liveliness_changed_status result.
*/
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini) CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
{ {
dds_entity_t pub_topic; dds_entity_t pub_topic;
@ -547,7 +596,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
dds_qos_t *rqos; dds_qos_t *rqos;
dds_qos_t *wqos; dds_qos_t *wqos;
dds_attach_t triggered; dds_attach_t triggered;
uint32_t status = 0;
struct dds_liveliness_changed_status lstatus; struct dds_liveliness_changed_status lstatus;
struct dds_subscription_matched_status sstatus; struct dds_subscription_matched_status sstatus;
char name[100]; char name[100];
@ -576,7 +624,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* wait for writer to be alive */ /* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check status counts before proxy writer is expired */ /* check status counts before proxy writer is expired */
dds_get_liveliness_changed_status(reader, &lstatus); dds_get_liveliness_changed_status(reader, &lstatus);
@ -587,7 +634,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* sleep for more than lease duration, writer should be set not-alive but subscription still matched */ /* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
dds_sleepfor(ldur + DDS_MSECS(100)); dds_sleepfor(ldur + DDS_MSECS(100));
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
dds_get_liveliness_changed_status(reader, &lstatus); dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0); CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
@ -597,7 +643,6 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
/* write sample and re-check status counts */ /* write sample and re-check status counts */
dds_write (writer, &sample); dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1); CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
dds_get_liveliness_changed_status(reader, &lstatus); dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1); CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
@ -612,3 +657,96 @@ CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = livelin
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK); CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
} }
/**
* Test that dds_assert_liveliness works as expected for liveliness
* kinds manual-by-participant and manual-by-topic.
*/
#define MAX_WRITERS 100
CU_TheoryDataPoints(ddsc_liveliness, assert_liveliness) = {
CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */
CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */
CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */
};
CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout=30)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS];
dds_qos_t *rqos;
struct dds_liveliness_changed_status lstatus;
char name[100];
dds_duration_t ldur = DDS_MSECS (300);
uint32_t wr_cnt = 0;
dds_time_t tstop;
uint32_t stopped;
assert (wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS);
printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u\n", wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
/* topics */
create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writers */
for (size_t n = 0; n < wr_cnt_auto; n++)
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, ldur, &writers[wr_cnt++], pub_topic, reader);
for (size_t n = 0; n < wr_cnt_man_pp; n++)
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur, &writers[wr_cnt++], pub_topic, reader);
for (size_t n = 0; n < wr_cnt_man_tp; n++)
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, ldur, &writers[wr_cnt++], pub_topic, reader);
/* check status counts before proxy writer is expired */
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp);
/* delay for more than lease duration and assert liveliness on writers:
all writers (including man-by-pp) should be kept alive */
tstop = dds_time() + 4 * ldur / 3;
stopped = 0;
do
{
for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++)
dds_assert_liveliness (writers[n]);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
dds_sleepfor (DDS_MSECS(50));
} while (dds_time() < tstop);
CU_ASSERT_EQUAL_FATAL(stopped, 0);
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp);
/* delay for more than lease duration and assert liveliness on participant:
writers with liveliness man-by-pp should be kept alive, man-by-topic writers
should stop */
tstop = dds_time() + 4 * ldur / 3;
stopped = 0;
do
{
dds_assert_liveliness (g_pub_participant);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
dds_sleepfor (DDS_MSECS(50));
} while (dds_time() < tstop);
CU_ASSERT_EQUAL_FATAL(stopped, wr_cnt_man_tp);
dds_get_liveliness_changed_status(reader, &lstatus);
printf("writers alive_count: %d\n", lstatus.alive_count);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp);
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
for (size_t n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
}
#undef MAX_WRITERS

View file

@ -323,6 +323,7 @@ struct proxy_participant
unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */ unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */
unsigned minimal_bes_mode: 1; unsigned minimal_bes_mode: 1;
unsigned lease_expired: 1; unsigned lease_expired: 1;
unsigned deleting: 1;
unsigned proxypp_have_spdp: 1; unsigned proxypp_have_spdp: 1;
unsigned proxypp_have_cm: 1; unsigned proxypp_have_cm: 1;
unsigned owns_lease: 1; unsigned owns_lease: 1;
@ -369,7 +370,7 @@ struct proxy_writer {
unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */ unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */ unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */ unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired) */ unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired); field may be modified only when holding both pwr->e.lock and pwr->c.proxypp->e.lock */
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */ unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif #endif
@ -645,14 +646,14 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
reader or writer. Actual deletion is scheduled in the future, when reader or writer. Actual deletion is scheduled in the future, when
no outstanding references may still exist (determined by checking no outstanding references may still exist (determined by checking
thread progress, &c.). */ thread progress, &c.). */
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked); int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive); int proxy_writer_set_alive (struct proxy_writer *pwr);
int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive); int proxy_writer_set_notalive (struct proxy_writer *pwr);
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
@ -661,6 +662,8 @@ void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid,
rebuild them all (which only makes sense after previously having emptied them all). */ rebuild them all (which only makes sense after previously having emptied them all). */
void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild); void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild);
void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch);
void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok); void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok);
struct ddsi_writer_info; struct ddsi_writer_info;

View file

@ -42,7 +42,8 @@ int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *x
/* When calling the following functions, wr->lock must be held */ /* When calling the following functions, wr->lock must be held */
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew); dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew); int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync); void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync);
dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -1357,13 +1357,9 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data
} }
GVLOGDISC (" "PGUIDFMT, PGUID (datap->endpoint_guid)); GVLOGDISC (" "PGUIDFMT, PGUID (datap->endpoint_guid));
if (is_writer_entityid (datap->endpoint_guid.entityid)) if (is_writer_entityid (datap->endpoint_guid.entityid))
{ res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0);
res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0, false);
}
else else
{
res = delete_proxy_reader (gv, &datap->endpoint_guid, timestamp, 0); res = delete_proxy_reader (gv, &datap->endpoint_guid, timestamp, 0);
}
GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete"); GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete");
} }

View file

@ -1488,7 +1488,7 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
} }
} }
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch) void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch)
{ {
struct reader *rd; struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL) if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
@ -3619,16 +3619,20 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew) static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew)
{ {
/* By loading/storing the pointer atomically, we ensure we always
read a valid (or once valid) lease. By delaying freeing the lease
through the garbage collector, we ensure whatever lease update
occurs in parallel completes before the memory is released. */
const nn_etime_t never = { T_NEVER }; const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease); struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
struct lease *lease_old = (struct lease *) ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto); struct lease *lease_old = ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
lease_renew (lease_old, never); lease_renew (lease_old, never); /* ensures lease will not expire while it is replaced */
gcreq->arg = (void *) lease_old; gcreq->arg = lease_old;
gcreq_enqueue (gcreq); gcreq_enqueue (gcreq);
ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, (void *) lnew); ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, lnew);
} }
static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp, const struct proxy_writer * pwr) static void proxy_participant_add_pwr_lease_locked (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
{ {
struct lease *minl_prev; struct lease *minl_prev;
struct lease *minl_new; struct lease *minl_new;
@ -3637,7 +3641,6 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
assert (pwr->lease != NULL); assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT); manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
ddsrt_mutex_lock (&proxypp->e.lock);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto; lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl_prev = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh); minl_prev = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_insert (&lease_fhdef_proxypp, lh, pwr->lease); ddsrt_fibheap_insert (&lease_fhdef_proxypp, lh, pwr->lease);
@ -3647,10 +3650,11 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
{ {
nn_etime_t texp = add_duration_to_etime (now_et (), minl_new->tdur); nn_etime_t texp = add_duration_to_etime (now_et (), minl_new->tdur);
struct lease *lnew = lease_new (texp, minl_new->tdur, minl_new->entity); struct lease *lnew = lease_new (texp, minl_new->tdur, minl_new->entity);
if (manbypp && minl_prev == NULL) if (minl_prev == NULL)
{ {
assert (manbypp);
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL); assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
ddsrt_atomic_stvoidp (&proxypp->minl_man, (void *) lnew); ddsrt_atomic_stvoidp (&proxypp->minl_man, lnew);
} }
else else
{ {
@ -3658,7 +3662,6 @@ static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp,
} }
lease_register (lnew); lease_register (lnew);
} }
ddsrt_mutex_unlock (&proxypp->e.lock);
} }
static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant * proxypp, struct proxy_writer * pwr) static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant * proxypp, struct proxy_writer * pwr)
@ -3684,25 +3687,14 @@ static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant
proxy_participant_replace_minl (proxypp, manbypp, lnew); proxy_participant_replace_minl (proxypp, manbypp, lnew);
lease_register (lnew); lease_register (lnew);
} }
else if (manbypp)
{
proxy_participant_replace_minl (proxypp, manbypp, NULL);
}
else else
{ {
/* minl should not be null for leaseheap_auto because proxypp's lease is in */ assert (manbypp);
assert (false); proxy_participant_replace_minl (proxypp, manbypp, NULL);
} }
} }
} }
static void proxy_participant_remove_pwr_lease (struct proxy_participant * proxypp, struct proxy_writer * pwr)
{
ddsrt_mutex_lock (&proxypp->e.lock);
proxy_participant_remove_pwr_lease_locked (proxypp, pwr);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
void new_proxy_participant void new_proxy_participant
( (
struct q_globals *gv, struct q_globals *gv,
@ -3736,6 +3728,7 @@ void new_proxy_participant
entity_common_init (&proxypp->e, gv, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false); entity_common_init (&proxypp->e, gv, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false);
proxypp->refc = 1; proxypp->refc = 1;
proxypp->lease_expired = 0; proxypp->lease_expired = 0;
proxypp->deleting = 0;
proxypp->vendor = vendor; proxypp->vendor = vendor;
proxypp->bes = bes; proxypp->bes = bes;
proxypp->prismtech_bes = prismtech_bes; proxypp->prismtech_bes = prismtech_bes;
@ -3789,8 +3782,11 @@ void new_proxy_participant
and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */ and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */
ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease); ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
/* Set the shortest lease for auto liveliness: clone proxypp's lease (as there are no pwr's /* Set the shortest lease for auto liveliness: clone proxypp's lease and store the clone in
at this point, this is the shortest lease) */ proxypp->minl_auto. As there are no pwr's at this point, the proxy pp's lease is the
shortest lease. When a pwr with a shorter is added, the lease in minl_auto is replaced
by the lease from the proxy writer in proxy_participant_add_pwr_lease_locked. This old shortest
lease is freed, so that's why we need a clone and not the proxypp's lease in the heap. */
ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease)); ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease));
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL); ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
} }
@ -3948,9 +3944,14 @@ int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t s
return 0; return 0;
} }
static void ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c) static int ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
{ {
ddsrt_mutex_lock (&proxypp->e.lock); ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->deleting)
{
ddsrt_mutex_unlock (&proxypp->e.lock);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
c->proxypp = proxypp; c->proxypp = proxypp;
proxypp->refc++; proxypp->refc++;
@ -3962,6 +3963,8 @@ static void ref_proxy_participant (struct proxy_participant *proxypp, struct pro
} }
proxypp->endpoints = c; proxypp->endpoints = c;
ddsrt_mutex_unlock (&proxypp->e.lock); ddsrt_mutex_unlock (&proxypp->e.lock);
return DDS_RETCODE_OK;
} }
static void unref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c) static void unref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
@ -4070,8 +4073,9 @@ static void delete_or_detach_dependent_pp (struct proxy_participant *p, struct p
static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp, int isimplicit) static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp, int isimplicit)
{ {
struct proxy_endpoint_common * c; ddsi_entityid_t *eps;
int ret; ddsi_guid_t ep_guid;
uint32_t ep_count = 0;
/* if any proxy participants depend on this participant, delete them */ /* if any proxy participants depend on this participant, delete them */
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting dependent proxy participants\n", PGUID (proxypp->e.guid)); ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting dependent proxy participants\n", PGUID (proxypp->e.guid));
@ -4084,31 +4088,38 @@ static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp
ephash_enum_proxy_participant_fini (&est); ephash_enum_proxy_participant_fini (&est);
} }
/* delete_proxy_{reader,writer} merely schedules the actual delete
operation, so we can hold the lock -- at least, for now. */
ddsrt_mutex_lock (&proxypp->e.lock); ddsrt_mutex_lock (&proxypp->e.lock);
proxypp->deleting = 1;
if (isimplicit) if (isimplicit)
proxypp->lease_expired = 1; proxypp->lease_expired = 1;
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid)); /* Get snapshot of endpoints list so that we can release proxypp->e.lock
c = proxypp->endpoints; Pwrs/prds may be deleted during the iteration over the entities,
while (c) but resolving the guid will fail for these entities and the our
call to delete_proxy_writer/reader returns. */
{ {
struct entity_common *e = entity_common_from_proxy_endpoint_common (c); eps = ddsrt_malloc (proxypp->refc * sizeof(ddsi_entityid_t));
if (is_writer_entityid (e->guid.entityid)) struct proxy_endpoint_common *cep = proxypp->endpoints;
while (cep)
{ {
ret = delete_proxy_writer (proxypp->e.gv, &e->guid, timestamp, isimplicit, true); const struct entity_common *entc = entity_common_from_proxy_endpoint_common (cep);
eps[ep_count++] = entc->guid.entityid;
cep = cep->next_ep;
} }
else
{
ret = delete_proxy_reader (proxypp->e.gv, &e->guid, timestamp, isimplicit);
}
(void) ret;
c = c->next_ep;
} }
ddsrt_mutex_unlock (&proxypp->e.lock); ddsrt_mutex_unlock (&proxypp->e.lock);
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid));
ep_guid.prefix = proxypp->e.guid.prefix;
for (uint32_t n = 0; n < ep_count; n++)
{
ep_guid.entityid = eps[n];
if (is_writer_entityid (ep_guid.entityid))
delete_proxy_writer (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
else
delete_proxy_reader (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
}
ddsrt_free (eps);
gcreq_proxy_participant (proxypp); gcreq_proxy_participant (proxypp);
} }
@ -4185,9 +4196,10 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_g
/* PROXY-ENDPOINT --------------------------------------------------- */ /* PROXY-ENDPOINT --------------------------------------------------- */
static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) static int proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
{ {
const char *name; const char *name;
int ret;
if (is_builtin_entityid (guid->entityid, proxypp->vendor)) if (is_builtin_entityid (guid->entityid, proxypp->vendor))
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == 0); assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == 0);
@ -4206,7 +4218,16 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
else else
memset (&c->group_guid, 0, sizeof (c->group_guid)); memset (&c->group_guid, 0, sizeof (c->group_guid));
ref_proxy_participant (proxypp, c); if ((ret = ref_proxy_participant (proxypp, c)) != DDS_RETCODE_OK)
{
nn_xqos_fini (c->xqos);
ddsrt_free (c->xqos);
unref_addrset (c->as);
entity_common_fini (e);
return ret;
}
return DDS_RETCODE_OK;
} }
static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c) static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c)
@ -4226,6 +4247,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_writer *pwr; struct proxy_writer *pwr;
int isreliable; int isreliable;
nn_mtime_t tnow = now_mt (); nn_mtime_t tnow = now_mt ();
int ret;
assert (is_writer_entityid (guid->entityid)); assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (gv->guid_hash, guid) == NULL); assert (ephash_lookup_proxy_writer_guid (gv->guid_hash, guid) == NULL);
@ -4237,7 +4259,11 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
} }
pwr = ddsrt_malloc (sizeof (*pwr)); pwr = ddsrt_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist); if ((ret = proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
{
ddsrt_free (pwr);
return ret;
}
ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers); ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0; pwr->n_reliable_readers = 0;
@ -4275,7 +4301,19 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
nn_etime_t texpire = add_duration_to_etime (now_et (), pwr->c.xqos->liveliness.lease_duration); nn_etime_t texpire = add_duration_to_etime (now_et (), pwr->c.xqos->liveliness.lease_duration);
pwr->lease = lease_new (texpire, pwr->c.xqos->liveliness.lease_duration, &pwr->e); pwr->lease = lease_new (texpire, pwr->c.xqos->liveliness.lease_duration, &pwr->e);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease (proxypp, pwr); {
ddsrt_mutex_lock (&proxypp->e.lock);
proxy_participant_add_pwr_lease_locked (proxypp, pwr);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
else
{
lease_register (pwr->lease);
}
}
else
{
pwr->lease = NULL;
} }
if (isreliable) if (isreliable)
@ -4419,17 +4457,22 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
free_pwr_rd_match (m); free_pwr_rd_match (m);
} }
local_reader_ary_fini (&pwr->rdary); local_reader_ary_fini (&pwr->rdary);
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
lease_free (pwr->lease);
proxy_endpoint_common_fini (&pwr->e, &pwr->c); proxy_endpoint_common_fini (&pwr->e, &pwr->c);
nn_defrag_free (pwr->defrag); nn_defrag_free (pwr->defrag);
nn_reorder_free (pwr->reorder); nn_reorder_free (pwr->reorder);
ddsrt_free (pwr); ddsrt_free (pwr);
} }
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked) /* First stage in deleting the proxy writer. In this function the pwr and its member pointers
will remain valid. The real cleaning-up is done async in gc_delete_proxy_writer. */
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit)
{ {
struct proxy_writer *pwr; struct proxy_writer *pwr;
(void)isimplicit; DDSRT_UNUSED_ARG (isimplicit);
GVLOGDISC ("delete_proxy_writer ("PGUIDFMT") ", PGUID (*guid)); GVLOGDISC ("delete_proxy_writer ("PGUIDFMT") ", PGUID (*guid));
ddsrt_mutex_lock (&gv->lock); ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL) if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{ {
@ -4446,66 +4489,60 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false); builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr); ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
ddsrt_mutex_unlock (&gv->lock); ddsrt_mutex_unlock (&gv->lock);
if (proxy_writer_set_notalive (pwr) != DDS_RETCODE_OK)
GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid));
/* Set lease expiry for this pwr to never so that the pwr will not be set
to alive again while it is scheduled for being deleted. */
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER) if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{ lease_renew (pwr->lease, (nn_etime_t){ T_NEVER });
ddsrt_mutex_lock (&pwr->e.lock);
if (pwr->alive && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxypp_locked ? proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->e.lock);
lease_free (pwr->lease);
}
gcreq_proxy_writer (pwr); gcreq_proxy_writer (pwr);
return 0; return DDS_RETCODE_OK;
} }
int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive) int proxy_writer_set_alive (struct proxy_writer *pwr)
{ {
/* Caller has pwr->e.lock, so we can safely read pwr->alive. For updating
* this field this function is also taking pwr->c.proxypp->e.lock */
ddsrt_avl_iter_t it; ddsrt_avl_iter_t it;
assert (pwr->alive != alive);
pwr->alive = alive;
GVLOGDISC (" alive=%d", pwr->alive);
if (pwr->alive) if (pwr->alive)
return DDS_RETCODE_PRECONDITION_NOT_MET;
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
pwr->alive = true;
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{ {
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
{ {
struct reader *rd; status_cb_data_t data;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL) data.add = true;
{ data.handle = pwr->e.iid;
status_cb_data_t data; data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
data.add = true; (rd->status_cb) (rd->status_cb_entity, &data);
data.handle = pwr->e.iid;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
} }
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr);
} }
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
return DDS_RETCODE_OK;
}
int proxy_writer_set_notalive (struct proxy_writer *pwr)
{
/* Caller should not have taken pwr->e.lock and pwr->c.proxypp->e.lock;
* this function takes both locks to update pwr->alive value */
int ret = DDS_RETCODE_OK;
ddsrt_mutex_lock (&pwr->e.lock);
if (!pwr->alive)
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
else else
{ {
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
reader_drop_connection (&m->rd_guid, pwr, false); pwr->alive = false;
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr); proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
} }
return 0;
}
int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive)
{
struct proxy_writer *pwr;
int ret;
ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
}
ddsrt_mutex_unlock (&gv->lock);
ddsrt_mutex_lock (&pwr->e.lock);
ret = proxy_writer_set_alive_locked (gv, pwr, alive);
ddsrt_mutex_unlock (&pwr->e.lock); ddsrt_mutex_unlock (&pwr->e.lock);
return ret; return ret;
} }
@ -4521,6 +4558,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_participant *proxypp; struct proxy_participant *proxypp;
struct proxy_reader *prd; struct proxy_reader *prd;
nn_mtime_t tnow = now_mt (); nn_mtime_t tnow = now_mt ();
int ret;
assert (!is_writer_entityid (guid->entityid)); assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (gv->guid_hash, guid) == NULL); assert (ephash_lookup_proxy_reader_guid (gv->guid_hash, guid) == NULL);
@ -4532,7 +4570,11 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
} }
prd = ddsrt_malloc (sizeof (*prd)); prd = ddsrt_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist); if ((ret = proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
{
ddsrt_free (prd);
return ret;
}
prd->deleting = 0; prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
@ -4549,7 +4591,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
ddsrt_mutex_unlock (&prd->e.lock); ddsrt_mutex_unlock (&prd->e.lock);
match_proxy_reader_with_writers (prd, tnow); match_proxy_reader_with_writers (prd, tnow);
return 0; return DDS_RETCODE_OK;
} }
static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *prd) static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *prd)

View file

@ -140,7 +140,6 @@ void lease_free (struct lease *l)
void lease_renew (struct lease *l, nn_etime_t tnowE) void lease_renew (struct lease *l, nn_etime_t tnowE)
{ {
struct q_globals const * gv;
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur); nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
/* do not touch tend if moving forward or if already expired */ /* do not touch tend if moving forward or if already expired */
@ -151,7 +150,11 @@ void lease_renew (struct lease *l, nn_etime_t tnowE)
return; return;
} while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v)); } while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v));
gv = l->entity->gv; /* Only at this point we can assume that gv can be recovered from the entity in the
* lease (i.e. the entity still exists). In cases where dereferencing l->entity->gv
* is not safe (e.g. the deletion of entities), the early out in the loop above
* will be the case because tend is set to T_NEVER. */
struct q_globals const * gv = l->entity->gv;
if (gv->logconfig.c.mask & DDS_LC_TRACE) if (gv->logconfig.c.mask & DDS_LC_TRACE)
{ {
int32_t tsec, tusec; int32_t tsec, tusec;
@ -272,10 +275,27 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
delete_proxy_participant_by_guid (gv, &g, now(), 1); delete_proxy_participant_by_guid (gv, &g, now(), 1);
break; break;
case EK_PROXY_WRITER: case EK_PROXY_WRITER:
GVLOGDISC ("proxy_writer_set_alive ("PGUIDFMT")", PGUID (g)); {
(void) proxy_writer_set_alive_guid (gv, &g, false); struct proxy_writer *pwr;
ddsrt_avl_iter_t it;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &g)) == NULL)
{
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (g));
ddsrt_mutex_lock (&gv->leaseheap_lock);
continue;
}
GVLOGDISC ("proxy_writer_set_notalive ("PGUIDFMT")", PGUID (g));
if (proxy_writer_set_notalive (pwr) == DDS_RETCODE_PRECONDITION_NOT_MET)
{
GVLOGDISC (" pwr was not alive");
ddsrt_mutex_lock (&gv->leaseheap_lock);
continue;
}
GVLOGDISC ("\n"); GVLOGDISC ("\n");
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
reader_drop_connection (&m->rd_guid, pwr, false);
break; break;
}
case EK_PARTICIPANT: case EK_PARTICIPANT:
case EK_READER: case EK_READER:
case EK_WRITER: case EK_WRITER:
@ -283,7 +303,6 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
assert (false); assert (false);
break; break;
} }
ddsrt_mutex_lock (&gv->leaseheap_lock); ddsrt_mutex_lock (&gv->leaseheap_lock);
} }

View file

@ -548,7 +548,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
} }
/* Send a Heartbeat just to this peer */ /* Send a Heartbeat just to this peer */
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0); add_Heartbeat (m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n", ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid)); PGUID (wr->e.guid), PGUID (prd->e.guid));
qxev_msg (wr->evq, m); qxev_msg (wr->evq, m);
@ -1131,12 +1131,18 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst)); RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1; return 1;
} }
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst)); RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst));
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
if (msg->smhdr.flags & HEARTBEAT_FLAG_LIVELINESS &&
pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC &&
pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{
struct lease *lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man);
if (lease != NULL)
lease_renew (lease, tnow);
lease_renew (pwr->lease, tnow);
}
if (pwr->n_reliable_readers == 0) if (pwr->n_reliable_readers == 0)
{ {
RSTTRACE (PGUIDFMT" -> "PGUIDFMT" no-reliable-readers)", PGUID (src), PGUID (dst)); RSTTRACE (PGUIDFMT" -> "PGUIDFMT" no-reliable-readers)", PGUID (src), PGUID (dst));
@ -2087,7 +2093,8 @@ static void clean_defrag (struct proxy_writer *pwr)
nn_defrag_notegap (pwr->defrag, 1, seq); nn_defrag_notegap (pwr->defrag, 1, seq);
} }
static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease) static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo,
uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
{ {
struct proxy_writer *pwr; struct proxy_writer *pwr;
struct nn_rsample *rsample; struct nn_rsample *rsample;
@ -2114,13 +2121,11 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
/* Shouldn't lock the full writer, but will do so for now */ /* Shouldn't lock the full writer, but will do so for now */
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
/* FIXME: implement liveliness manual-by-topic */ if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC && pwr->c.xqos->liveliness.lease_duration != T_NEVER)
/* if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
lease_renew (pwr->lease, tnow); lease_renew (pwr->lease, tnow);
*/
if (!pwr->alive) if (!pwr->alive)
proxy_writer_set_alive_locked (pwr->e.gv, pwr, true); proxy_writer_set_alive (pwr);
/* Don't accept data when reliable readers exist and we haven't yet seen /* Don't accept data when reliable readers exist and we haven't yet seen
a heartbeat telling us what the "current" sequence number of the writer a heartbeat telling us what the "current" sequence number of the writer

View file

@ -194,7 +194,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id); nn_xmsg_setencoderid (msg, wr->partition_id);
#endif #endif
add_Heartbeat (msg, wr, whcst, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync); add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync);
} }
else else
{ {
@ -215,7 +215,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id); nn_xmsg_setencoderid (msg, wr->partition_id);
#endif #endif
add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync); add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync);
} }
writer_hbcontrol_note_hb (wr, tnow, hbansreq); writer_hbcontrol_note_hb (wr, tnow, hbansreq);
@ -313,7 +313,7 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
return msg; return msg;
} }
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync) void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync)
{ {
struct q_globals const * const gv = wr->e.gv; struct q_globals const * const gv = wr->e.gv;
struct nn_xmsg_marker sm_marker; struct nn_xmsg_marker sm_marker;
@ -324,6 +324,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
assert (wr->reliable); assert (wr->reliable);
assert (hbansreq >= 0); assert (hbansreq >= 0);
assert (hbliveliness >= 0);
if (gv->config.meas_hb_to_ack_latency) if (gv->config.meas_hb_to_ack_latency)
{ {
@ -337,6 +338,8 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
if (!hbansreq) if (!hbansreq)
hb->smhdr.flags |= HEARTBEAT_FLAG_FINAL; hb->smhdr.flags |= HEARTBEAT_FLAG_FINAL;
if (hbliveliness)
hb->smhdr.flags |= HEARTBEAT_FLAG_LIVELINESS;
hb->readerId = nn_hton_entityid (dst); hb->readerId = nn_hton_entityid (dst);
hb->writerId = nn_hton_entityid (wr->e.guid.entityid); hb->writerId = nn_hton_entityid (wr->e.guid.entityid);
@ -666,6 +669,34 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
nn_xmsg_submsg_setnext (*pmsg, sm_marker); nn_xmsg_submsg_setnext (*pmsg, sm_marker);
} }
dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp)
{
struct nn_xmsg *msg = NULL;
struct whc_state whcst;
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct writer *wr = ephash_lookup_writer_guid (gv->guid_hash, wr_guid);
if (wr == NULL)
{
GVTRACE ("write_hb_liveliness("PGUIDFMT") - writer not found\n", PGUID (*wr_guid));
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
return DDS_RETCODE_OUT_OF_RESOURCES;
ddsrt_mutex_lock (&wr->e.lock);
nn_xmsg_setdstN (msg, wr->as, wr->as_group);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
whc_get_state (wr->whc, &whcst);
add_Heartbeat (msg, wr, &whcst, 0, 1, to_entityid (NN_ENTITYID_UNKNOWN), 1);
ddsrt_mutex_unlock (&wr->e.lock);
nn_xpack_addmsg (xp, msg, 0);
nn_xpack_send (xp, true);
thread_state_asleep (ts1);
return DDS_RETCODE_OK;
}
#if 0 #if 0
static int must_skip_frag (const char *frags_to_skip, unsigned frag) static int must_skip_frag (const char *frags_to_skip, unsigned frag)
{ {