Merge pull request #342 from eboasson/liveliness

Liveliness QoS implementation
This commit is contained in:
eboasson 2019-11-25 19:13:43 +01:00 committed by GitHub
commit da17a9f5d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 1787 additions and 331 deletions

View file

@ -3260,6 +3260,29 @@ dds_get_matched_publication_data (
dds_entity_t reader,
dds_instance_handle_t ih);
/**
* @brief This operation manually asserts the liveliness of a writer
* or domain participant.
*
* This operation manually asserts the liveliness of a writer
* or domain participant. This is used in combination with the Liveliness
* QoS policy to indicate that the entity remains active. This operation need
* only be used if the liveliness kind in the QoS is either
* DDS_LIVELINESS_MANUAL_BY_PARTICIPANT or DDS_LIVELINESS_MANUAL_BY_TOPIC.
*
* @param[in] entity A domain participant or writer
*
* @returns A dds_return_t indicating success or failure.
*
* @retval DDS_RETCODE_OK
* The operation was successful.
* @retval DDS_RETCODE_ILLEGAL_OPERATION
* The operation is invoked on an inappropriate object.
*/
DDS_EXPORT dds_return_t
dds_assert_liveliness (
dds_entity_t entity);
#if defined (__cplusplus)
}
#endif

View file

@ -22,7 +22,9 @@
#include "dds__qos.h"
#include "dds__topic.h"
#include "dds/version.h"
#include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/q_xqos.h"
#include "dds/ddsi/q_transmit.h"
extern inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink);
extern inline bool dds_entity_is_enabled (const dds_entity *e);
@ -1383,3 +1385,32 @@ dds_return_t dds_generic_unimplemented_operation (dds_entity_t handle, dds_entit
return dds_generic_unimplemented_operation_manykinds (handle, 1, &kind);
}
dds_return_t dds_assert_liveliness (dds_entity_t entity)
{
dds_return_t rc;
dds_entity *e, *ewr;
if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
switch (dds_entity_kind (e))
{
case DDS_KIND_PARTICIPANT: {
write_pmd_message_guid (&e->m_domain->gv, &e->m_guid, PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
break;
}
case DDS_KIND_WRITER: {
if ((rc = dds_entity_lock (entity, DDS_KIND_WRITER, &ewr)) != DDS_RETCODE_OK)
return rc;
if ((rc = write_hb_liveliness (&e->m_domain->gv, &e->m_guid, ((struct dds_writer *)ewr)->m_xp)) != DDS_RETCODE_OK)
return rc;
dds_entity_unlock (e);
break;
}
default: {
rc = DDS_RETCODE_ILLEGAL_OPERATION;
break;
}
}
dds_entity_unpin (e);
return rc;
}

View file

@ -277,7 +277,6 @@ int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_h
uint32_t cf, cf1;
/* Assume success; bail out if the object turns out to be in the process of
being deleted */
rc = DDS_RETCODE_OK;
do {
cf = ddsrt_atomic_ld32 (&(*link)->cnt_flags);

View file

@ -233,16 +233,47 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
}
case DDS_LIVELINESS_CHANGED_STATUS_ID: {
struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status;
if (data->add) {
st->alive_count++;
st->alive_count_change++;
if (st->not_alive_count > 0) {
DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 &&
LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE &&
LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE &&
LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE &&
LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE &&
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE &&
LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < LIVELINESS_CHANGED_TWITCH &&
(uint32_t) LIVELINESS_CHANGED_TWITCH < UINT32_MAX);
assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_TWITCH);
switch ((enum liveliness_changed_data_extra) data->extra)
{
case LIVELINESS_CHANGED_ADD_ALIVE:
st->alive_count++;
st->alive_count_change++;
break;
case LIVELINESS_CHANGED_ADD_NOT_ALIVE:
st->not_alive_count++;
st->not_alive_count_change++;
break;
case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE:
st->not_alive_count--;
}
} else {
st->alive_count--;
st->not_alive_count++;
st->not_alive_count_change++;
st->not_alive_count_change--;
break;
case LIVELINESS_CHANGED_REMOVE_ALIVE:
st->alive_count--;
st->alive_count_change--;
break;
case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE:
st->alive_count--;
st->alive_count_change--;
st->not_alive_count++;
st->not_alive_count_change++;
break;
case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE:
st->not_alive_count--;
st->not_alive_count_change--;
st->alive_count++;
st->alive_count_change++;
break;
case LIVELINESS_CHANGED_TWITCH:
break;
}
st->last_publication_handle = data->handle;
invoke = (lst->on_liveliness_changed != 0);

View file

@ -2394,7 +2394,7 @@ static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_
ddsrt_atomic_st32 (&cond->m_entity.m_status.m_trigger, trigger);
dds_entity_status_signal (&cond->m_entity, DDS_DATA_AVAILABLE_STATUS);
}
TRACE ("add_readcondition(%p, %"PRIx32", %"PRIx32", %"PRIx32") => %p qminv %"PRIx32" ; rhc %"PRIu32" conds\n",
(void *) rhc, cond->m_sample_states, cond->m_view_states,
cond->m_instance_states, (void *) cond, cond->m_qminv, rhc->nconds);

View file

@ -28,6 +28,7 @@ set(ddsc_test_sources
"err.c"
"instance_get_key.c"
"listener.c"
"liveliness.c"
"participant.c"
"publisher.c"
"qos.c"
@ -55,7 +56,9 @@ set(ddsc_test_sources
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
target_include_directories(
cunit_ddsc PRIVATE
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src/include/>")
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src/include/>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsc/src>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsi/include>")
target_link_libraries(cunit_ddsc PRIVATE RoundTrip Space TypesArrayKey ddsc)
# Setup environment for config-tests

View file

@ -360,9 +360,9 @@ CU_Test(ddsc_entity, liveliness_changed, .init=init_entity_status, .fini=fini_en
ret = dds_get_liveliness_changed_status (rea, &liveliness_changed);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 1);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change,1);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, -1);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change,0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.last_publication_handle, writer_i_hdl);
/* Second call should reset the changed count. */
@ -370,7 +370,7 @@ CU_Test(ddsc_entity, liveliness_changed, .init=init_entity_status, .fini=fini_en
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 1);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change,0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.last_publication_handle, writer_i_hdl);
}

View file

@ -1196,9 +1196,9 @@ CU_Test(ddsc_listener, liveliness_changed, .init=init_triggering_test, .fini=fin
CU_ASSERT_EQUAL_FATAL(triggered & DDS_LIVELINESS_CHANGED_STATUS, DDS_LIVELINESS_CHANGED_STATUS);
CU_ASSERT_EQUAL_FATAL(cb_reader, g_reader);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count, 0);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count, 1);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count_change, 1);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.alive_count_change, -1);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count, 0);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.not_alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(cb_liveliness_changed_status.last_publication_handle, writer_hdl);
/* The listener should have reset the count_change. */
@ -1206,7 +1206,7 @@ CU_Test(ddsc_listener, liveliness_changed, .init=init_triggering_test, .fini=fin
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 1);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.not_alive_count_change, 0);
CU_ASSERT_EQUAL_FATAL(liveliness_changed.last_publication_handle, writer_hdl);
}

View file

@ -0,0 +1,812 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "config_env.h"
#include "dds/version.h"
#include "dds__entity.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/time.h"
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_liveliness_tests.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
static dds_entity_t g_pub_participant = 0;
static dds_entity_t g_pub_publisher = 0;
static dds_entity_t g_sub_domain = 0;
static dds_entity_t g_sub_participant = 0;
static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size)
{
/* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
return name;
}
static void liveliness_init(void)
{
/* Domains for pub and sub use a different domain id, but the portgain setting
* in configuration is 0, so that both domains will map to the same port number.
* This allows to create two domains in a single test process. */
char *conf_pub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf_pub);
g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf_sub);
dds_free(conf_pub);
dds_free(conf_sub);
g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_pub_participant > 0);
g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL(g_sub_participant > 0);
g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL);
CU_ASSERT_FATAL(g_pub_publisher > 0);
g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL);
CU_ASSERT_FATAL(g_sub_subscriber > 0);
}
static void liveliness_fini(void)
{
dds_delete(g_sub_subscriber);
dds_delete(g_pub_publisher);
dds_delete(g_sub_participant);
dds_delete(g_pub_participant);
dds_delete(g_sub_domain);
dds_delete(g_pub_domain);
}
/**
* Gets the current PMD sequence number for the participant. This
* can be used to count the number of PMD messages that is sent by
* the participant.
*/
static seqno_t get_pmd_seqno(dds_entity_t participant)
{
seqno_t seqno;
struct dds_entity *pp_entity;
struct participant *pp;
struct writer *wr;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
seqno = wr->seq;
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);
return seqno;
}
/**
* Gets the current PMD interval for the participant
*/
static dds_duration_t get_pmd_interval(dds_entity_t participant)
{
dds_duration_t intv;
struct dds_entity *pp_entity;
struct participant *pp;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
intv = pp_get_pmd_interval(pp);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);
return intv;
}
/**
* Gets the current lease duration for the participant
*/
static dds_duration_t get_ldur_config(dds_entity_t participant)
{
struct dds_entity *pp_entity;
dds_duration_t ldur;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
ldur = (dds_duration_t)pp_entity->m_domain->gv.config.lease_duration;
dds_entity_unpin(pp_entity);
return ldur;
}
/**
* Test that the correct number of PMD messages is sent for
* the various liveliness kinds.
*/
#define A DDS_LIVELINESS_AUTOMATIC
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
#define MT DDS_LIVELINESS_MANUAL_BY_TOPIC
CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
CU_DataPoints(dds_liveliness_kind_t, A, A, MP, MT), /* liveliness kind */
CU_DataPoints(uint32_t, 200, 500, 100, 100), /* lease duration */
CU_DataPoints(double, 10, 5, 5, 5), /* delay (n times lease duration) */
};
#undef MT
#undef MP
#undef A
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
seqno_t start_seqno, end_seqno;
dds_qos_t *rqos;
dds_qos_t *wqos;
dds_entity_t waitset;
dds_attach_t triggered;
uint32_t status;
char name[100];
dds_time_t t;
t = dds_time();
printf("%d.%06d running test: kind %s, lease duration %d, delay %d\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
kind == 0 ? "A" : "MP", ldur, (int32_t)(mult * ldur));
/* wait for initial PMD to be sent by the participant */
while (get_pmd_seqno(g_pub_participant) < 1)
dds_sleepfor (DDS_MSECS(50));
/* topics */
create_topic_name("ddsc_liveliness_pmd_count", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* waitset on reader */
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
/* writer */
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, kind, DDS_MSECS(ldur));
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_delete_qos(wqos);
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(1)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check no of PMD messages sent */
start_seqno = get_pmd_seqno(g_pub_participant);
dds_sleepfor(DDS_MSECS((dds_duration_t)(mult * ldur)));
end_seqno = get_pmd_seqno(g_pub_participant);
t = dds_time();
printf("%d.%06d PMD sequence no: start %" PRId64 " -> end %" PRId64 "\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
start_seqno, end_seqno);
/* End-start should be mult - 1 under ideal circumstances, but consider the test successful
when at least 50% of the expected PMD's was sent. This checks that the frequency for sending
PMDs was increased when the writer was added. */
CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? (50 * (mult - 1)) / 100 : 0))
if (kind != DDS_LIVELINESS_AUTOMATIC)
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
/**
* Test that the expected number of proxy writers expires (set to not-alive)
* after a certain delay for various combinations of writers with different
* liveliness kinds.
*/
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration for initial test run (increased for each retry when test fails) */
CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
CU_DataPoints(uint32_t, 1, 0, 2, 0, 1, 0, 0, 1, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
CU_DataPoints(uint32_t, 1, 1, 2, 2, 0, 0, 0, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
CU_DataPoints(uint32_t, 1, 1, 2, 2, 1, 1, 1, 1, 0, 1, 1, 2, 5, 0, 10), /* number of writers with manual-by-topic liveliness */
};
CU_Theory((uint32_t ldur, double mult, uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 120)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t *writers;
dds_qos_t *rqos, *wqos_auto, *wqos_man_pp, *wqos_man_tp;
dds_entity_t waitset;
dds_attach_t triggered;
struct dds_liveliness_changed_status lstatus;
uint32_t status, n, run = 1, wr_cnt = wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp;
char name[100];
dds_time_t tstart, t;
bool test_finished = false;
do
{
tstart = dds_time();
printf("%d.%06d running test: lease duration %d, delay %f, auto/man-by-part/man-by-topic %u/%u/%u\n",
(int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000,
ldur, mult, wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp);
/* topics */
create_topic_name("ddsc_liveliness_expire_kinds", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writers */
CU_ASSERT_FATAL((wqos_auto = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_tp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_tp, DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
writers = dds_alloc(wr_cnt * sizeof(dds_entity_t));
for (n = 0; n < wr_cnt; n++)
{
dds_qos_t *wqos;
wqos = n < wr_cnt_auto ? wqos_auto : (n < (wr_cnt_auto + wr_cnt_man_pp) ? wqos_man_pp : wqos_man_tp);
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
}
dds_delete_qos(wqos_auto);
dds_delete_qos(wqos_man_pp);
dds_delete_qos(wqos_man_tp);
t = dds_time();
if (t - tstart > DDS_MSECS(0.5 * ldur))
{
ldur *= 10 / (run + 1);
printf("%d.%06d failed to create writers in time\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
else
{
/* check alive count before proxy writers are expired */
dds_get_liveliness_changed_status(reader, &lstatus);
printf("%d.%06d writers alive: %d\n", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, lstatus.alive_count);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt);
dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur));
uint32_t stopped = 0;
do
{
dds_duration_t w = tstop - dds_time();
CU_ASSERT_FATAL((dds_waitset_wait(waitset, &triggered, 1, w > 0 ? w : 0)) >= 0);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
} while (dds_time() < tstop);
t = dds_time();
printf("%d.%06d writers stopped: %u\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped);
size_t exp_stopped = mult < 1 ? 0 : (wr_cnt_man_pp + wr_cnt_man_tp);
if (stopped != exp_stopped)
{
ldur *= 10 / (run + 1);
printf("%d.%06d incorrect number of stopped writers\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
else
{
/* check alive count */
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
CU_ASSERT_EQUAL(lstatus.alive_count, mult < 1 ? wr_cnt : wr_cnt_auto);
test_finished = true;
}
}
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
dds_free(writers);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
if (!test_finished)
{
if (++run > 3)
{
printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000);
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
continue;
}
else
{
printf("%d.%06d restarting test with ldur %d\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, ldur);
}
}
} while (!test_finished);
}
static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur, dds_entity_t *writer, dds_entity_t topic, dds_entity_t reader)
{
dds_entity_t waitset;
dds_qos_t *wqos;
dds_attach_t triggered;
uint32_t status;
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, kind, ldur);
CU_ASSERT_FATAL((*writer = dds_create_writer(g_pub_participant, topic, wqos, NULL)) > 0);
dds_delete_qos(wqos);
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
}
/**
* Test that the correct PMD interval is set for the participant
* based on the lease duration of the writers.
*/
#define MAX_WRITERS 10
CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS];
uint32_t wr_cnt = 0;
char name[100];
dds_qos_t *rqos;
uint32_t n;
/* topics */
create_topic_name("ddsc_liveliness_ldur", 1, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader and waitset */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check if pmd defaults to configured duration */
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), get_ldur_config(g_pub_participant));
/* create writers and check pmd interval in publishing participant */
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
/* cleanup */
for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
#undef MAX_WRITERS
/**
* Check that the correct lease duration is set in the matched
* publications in the readers. */
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
char name[100];
dds_qos_t *rqos, *wqos;
dds_entity_t waitset;
dds_attach_t triggered;
uint32_t status;
dds_duration_t ldur;
/* topics */
create_topic_name("ddsc_liveliness_ldurpwr", 1, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writer */
ldur = 1000;
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
/* wait for writer to be alive */
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check pwr lease duration in matched publication */
dds_instance_handle_t wrs[1];
CU_ASSERT_EQUAL_FATAL(dds_get_matched_publications(reader, wrs, 1), 1);
dds_builtintopic_endpoint_t *ep;
ep = dds_get_matched_publication_data(reader, wrs[0]);
CU_ASSERT_FATAL(ep != NULL);
assert(ep != NULL); /* for Clang's static analyzer */
CU_ASSERT_EQUAL_FATAL(ep->qos->liveliness.lease_duration, DDS_MSECS(ldur));
dds_delete_qos(ep->qos);
dds_free(ep->topic_name);
dds_free(ep->type_name);
dds_free(ep);
/* cleanup */
dds_delete_qos(wqos);
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
/**
* Create a relative large number of writers with liveliness kinds automatic and
* manual-by-participant and with decreasing lease duration, and check that all
* writers become alive. During the writer creation loop, every third writer
* is deleted immediately after creating.
*/
#define MAX_WRITERS 100
CU_Test(ddsc_liveliness, create_delete_writer_stress, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS];
dds_entity_t waitset;
dds_qos_t *wqos;
struct dds_liveliness_changed_status lstatus;
uint32_t alive_writers_auto = 0, alive_writers_man = 0;
char name[100];
dds_qos_t *rqos;
dds_attach_t triggered;
uint32_t n;
Space_Type1 sample = { 0, 0, 0 };
/* topics */
create_topic_name("ddsc_liveliness_wr_stress", 1, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader and waitset */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
/* create 1st writer and wait for it to become alive */
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (500));
CU_ASSERT_FATAL((writers[0] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
alive_writers_man++;
/* create writers */
for (n = 1; n < MAX_WRITERS; n++)
{
dds_qset_liveliness(wqos, n % 2 ? DDS_LIVELINESS_AUTOMATIC : DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS (n % 3 ? 500 + n : 500 - n));
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_write (writers[n], &sample);
if (n % 3 == 2)
dds_delete(writers[n]);
else if (n % 2)
alive_writers_auto++;
else
alive_writers_man++;
}
dds_delete_qos(wqos);
printf("alive_writers_auto: %d, alive_writers_man: %d\n", alive_writers_auto, alive_writers_man);
/* wait for auto liveliness writers to become alive and manual-by-pp writers to become not-alive */
do
{
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK);
printf("alive: %d, not-alive: %d\n", lstatus.alive_count, lstatus.not_alive_count);
dds_sleepfor (DDS_MSECS(50));
}
while (lstatus.alive_count != alive_writers_auto || lstatus.not_alive_count != alive_writers_man);
/* cleanup remaining writers */
for (n = 0; n < MAX_WRITERS; n++)
{
if (n % 3 != 2)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
}
/* wait for alive_count and not_alive_count to become 0 */
do
{
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status (reader, &lstatus), DDS_RETCODE_OK);
printf("alive: %d, not: %d\n", lstatus.alive_count, lstatus.not_alive_count);
dds_sleepfor (DDS_MSECS(50));
}
while (lstatus.alive_count > 0 || lstatus.not_alive_count > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
}
#undef MAX_WRITERS
/**
* Check the counts in liveliness_changed_status result.
*/
CU_Test(ddsc_liveliness, status_counts, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
dds_entity_t waitset;
dds_qos_t *rqos;
dds_qos_t *wqos;
dds_attach_t triggered;
struct dds_liveliness_changed_status lstatus;
struct dds_subscription_matched_status sstatus;
char name[100];
dds_duration_t ldur = DDS_MSECS (500);
Space_Type1 sample = { 1, 0, 0 };
/* topics */
create_topic_name("ddsc_liveliness_status_counts", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
/* writer */
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, ldur);
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_delete_qos(wqos);
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
/* check status counts before proxy writer is expired */
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
dds_get_subscription_matched_status(reader, &sstatus);
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
/* sleep for more than lease duration, writer should be set not-alive but subscription still matched */
dds_sleepfor(ldur + DDS_MSECS(100));
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 0);
dds_get_subscription_matched_status(reader, &sstatus);
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
/* write sample and re-check status counts */
dds_write (writer, &sample);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, 1);
dds_get_subscription_matched_status(reader, &sstatus);
CU_ASSERT_EQUAL_FATAL(sstatus.current_count, 1);
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
}
/**
* Test that dds_assert_liveliness works as expected for liveliness
* kinds manual-by-participant and manual-by-topic.
*/
#define MAX_WRITERS 100
CU_TheoryDataPoints(ddsc_liveliness, assert_liveliness) = {
CU_DataPoints(uint32_t, 1, 0, 0, 1), /* number of writers with automatic liveliness */
CU_DataPoints(uint32_t, 1, 1, 0, 0), /* number of writers with manual-by-participant liveliness */
CU_DataPoints(uint32_t, 1, 1, 1, 2), /* number of writers with manual-by-topic liveliness */
};
CU_Theory((uint32_t wr_cnt_auto, uint32_t wr_cnt_man_pp, uint32_t wr_cnt_man_tp), ddsc_liveliness, assert_liveliness, .init = liveliness_init, .fini = liveliness_fini, .timeout=60)
{
dds_entity_t pub_topic, sub_topic, reader, writers[MAX_WRITERS];
dds_qos_t *rqos;
struct dds_liveliness_changed_status lstatus;
char name[100];
uint32_t ldur = 100, wr_cnt, run = 1, stopped;
dds_time_t tstart, tstop, t;
bool test_finished = false;
do
{
wr_cnt = 0;
assert (wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp < MAX_WRITERS);
printf("running test assert_liveliness: auto/man-by-part/man-by-topic %u/%u/%u with ldur %d\n",
wr_cnt_auto, wr_cnt_man_pp, wr_cnt_man_tp, ldur);
/* topics */
create_topic_name("ddsc_liveliness_assert", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writers */
for (size_t n = 0; n < wr_cnt_auto; n++)
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
tstart = dds_time();
for (size_t n = 0; n < wr_cnt_man_pp; n++)
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
for (size_t n = 0; n < wr_cnt_man_tp; n++)
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_TOPIC, DDS_MSECS(ldur), &writers[wr_cnt++], pub_topic, reader);
t = dds_time();
if (t - tstart > DDS_MSECS(0.5 * ldur))
{
ldur *= 10 / (run + 1);
printf("%d.%06d failed to create writers with non-automatic liveliness kind in time\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
else
{
/* check status counts before proxy writer is expired */
dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp);
/* delay for more than lease duration and assert liveliness on writers:
all writers (including man-by-pp) should be kept alive */
tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3;
stopped = 0;
do
{
for (size_t n = wr_cnt - wr_cnt_man_tp; n < wr_cnt; n++)
dds_assert_liveliness (writers[n]);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
dds_sleepfor (DDS_MSECS(50));
} while (dds_time() < tstop);
dds_get_liveliness_changed_status(reader, &lstatus);
printf("writers alive with dds_assert_liveliness on all writers: %d, writers stopped: %d\n", lstatus.alive_count, stopped);
if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp + wr_cnt_man_tp || stopped != 0)
{
ldur *= 10 / (run + 1);
printf("incorrect number of writers alive or stopped writers\n");
}
else
{
/* delay for more than lease duration and assert liveliness on participant:
writers with liveliness man-by-pp should be kept alive, man-by-topic writers
should stop */
tstop = dds_time() + 4 * DDS_MSECS(ldur) / 3;
stopped = 0;
do
{
dds_assert_liveliness (g_pub_participant);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
dds_sleepfor (DDS_MSECS(50));
} while (dds_time() < tstop);
dds_get_liveliness_changed_status(reader, &lstatus);
printf("writers alive with dds_assert_liveliness on participant: %d, writers stopped: %d\n", lstatus.alive_count, stopped);
if (lstatus.alive_count != wr_cnt_auto + wr_cnt_man_pp || stopped != wr_cnt_man_tp)
{
ldur *= 10 / (run + 1);
printf("incorrect number of writers alive or stopped writers\n");
}
else
{
test_finished = true;
}
}
}
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
for (size_t n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
if (!test_finished)
{
if (++run > 3)
{
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
continue;
}
else
{
printf("restarting test with ldur %d\n", ldur);
}
}
} while (!test_finished);
}
#undef MAX_WRITERS

View file

@ -28,6 +28,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_vendor.c
ddsi_threadmon.c
ddsi_rhc.c
ddsi_pmd.c
q_addrset.c
q_bitset_inlines.c
q_bswap.c

View file

@ -0,0 +1,35 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_PMD_H
#define DDSI_PMD_H
#include "dds/ddsi/q_time.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct q_globals;
struct thread_state1;
struct ddsi_guid;
struct nn_xpack;
struct participant;
struct receiver_state;
void write_pmd_message_guid (struct q_globals * const gv, struct ddsi_guid *pp_guid, unsigned pmd_kind);
void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind);
void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len);
#if defined (__cplusplus)
}
#endif
#endif /* DDSI_PMD_H */

View file

@ -12,8 +12,10 @@
#ifndef Q_ENTITY_H
#define Q_ENTITY_H
#include "dds/export.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_protocol.h"
@ -46,14 +48,25 @@ struct proxy_group;
struct proxy_endpoint_common;
typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg);
/* Liveliness changed is more complicated than just add/remove. Encode the event
in status_cb_data_t::extra and ignore status_cb_data_t::add */
enum liveliness_changed_data_extra {
LIVELINESS_CHANGED_ADD_ALIVE,
LIVELINESS_CHANGED_ADD_NOT_ALIVE,
LIVELINESS_CHANGED_REMOVE_NOT_ALIVE,
LIVELINESS_CHANGED_REMOVE_ALIVE,
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE,
LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE,
LIVELINESS_CHANGED_TWITCH
};
typedef struct status_cb_data
{
int raw_status_id;
uint32_t extra;
uint64_t handle;
bool add;
}
status_cb_data_t;
} status_cb_data_t;
typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data);
@ -65,6 +78,8 @@ struct prd_wr_match {
struct rd_pwr_match {
ddsrt_avl_node_t avlnode;
ddsi_guid_t pwr_guid;
unsigned pwr_alive: 1; /* tracks pwr's alive state */
uint32_t pwr_alive_vclock; /* used to ensure progress */
#ifdef DDSI_INCLUDE_SSM
nn_locator_t ssm_mc_loc;
nn_locator_t ssm_src_loc;
@ -184,6 +199,7 @@ struct participant
int32_t user_refc; /* number of non-built-in endpoints in this participant [refc_lock] */
int32_t builtin_refc; /* number of built-in endpoints in this participant [refc_lock] */
int builtins_deleted; /* whether deletion of built-in endpoints has been initiated [refc_lock] */
ddsrt_fibheap_t ldur_auto_wr; /* Heap that contains lease duration for writers with automatic liveliness in this participant */
};
struct endpoint_common {
@ -205,6 +221,11 @@ enum writer_state {
typedef ddsrt_atomic_uint64_t seq_xmit_t;
struct ldur_fhnode {
ddsrt_fibheap_node_t heapnode;
dds_duration_t ldur;
};
struct writer
{
struct entity_common e;
@ -234,7 +255,7 @@ struct writer
struct addrset *as; /* set of addresses to publish to */
struct addrset *as_group; /* alternate case, used for SPDP, when using Cloud with multiple bootstrap locators */
struct xevent *heartbeat_xevent; /* timed event for "periodically" publishing heartbeats when unack'd data present, NULL <=> unreliable */
dds_duration_t lease_duration;
struct ldur_fhnode *lease_duration; /* fibheap node to keep lease duration for this writer, NULL in case of automatic liveliness with inifite duration */
struct whc *whc; /* WHC tracking history, T-L durability service history + samples by sequence number for retransmit */
uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */
nn_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */
@ -300,7 +321,11 @@ struct proxy_participant
unsigned prismtech_bes; /* prismtech-specific extension of built-in endpoints set */
ddsi_guid_t privileged_pp_guid; /* if this PP depends on another PP for its SEDP writing */
struct nn_plist *plist; /* settings/QoS for this participant */
ddsrt_atomic_voidp_t lease; /* lease object for this participant, for automatic leases */
ddsrt_atomic_voidp_t minl_auto; /* lease object for shortest automatic liveliness pwr's lease (includes this proxypp's lease) */
ddsrt_fibheap_t leaseheap_auto; /* keeps leases for this proxypp and leases for pwrs (with liveliness automatic) */
ddsrt_atomic_voidp_t minl_man; /* lease object for shortest manual-by-participant liveliness pwr's lease */
ddsrt_fibheap_t leaseheap_man; /* keeps leases for this proxypp and leases for pwrs (with liveliness manual-by-participant) */
struct lease *lease; /* lease for this proxypp */
struct addrset *as_default; /* default address set to use for user data traffic */
struct addrset *as_meta; /* default address set to use for discovery traffic */
struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */
@ -311,6 +336,7 @@ struct proxy_participant
unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */
unsigned minimal_bes_mode: 1;
unsigned lease_expired: 1;
unsigned deleting: 1;
unsigned proxypp_have_spdp: 1;
unsigned proxypp_have_cm: 1;
unsigned owns_lease: 1;
@ -357,9 +383,11 @@ struct proxy_writer {
unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired); field may be modified only when holding both pwr->e.lock and pwr->c.proxypp->e.lock */
#ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif
uint32_t alive_vclock; /* virtual clock counting transitions between alive/not-alive */
struct nn_defrag *defrag; /* defragmenter for this proxy writer; FIXME: perhaps shouldn't be for historical data */
struct nn_reorder *reorder; /* message reordering for this proxy writer, out-of-sync readers can have their own, see pwr_rd_match */
struct nn_dqueue *dqueue; /* delivery queue for asynchronous delivery (historical data is always delivered asynchronously) */
@ -367,6 +395,7 @@ struct proxy_writer {
struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */
ddsi2direct_directread_cb_t ddsi2direct_cb;
void *ddsi2direct_cbarg;
struct lease *lease;
};
struct proxy_reader {
@ -537,10 +566,14 @@ dds_return_t delete_participant (struct q_globals *gv, const struct ddsi_guid *p
void update_participant_plist (struct participant *pp, const struct nn_plist *plist);
uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_guid *guid);
/* Gets the interval for PMD messages, which is the minimal lease duration for writers
with auto liveliness in this participant, or the participants lease duration if shorter */
DDS_EXPORT dds_duration_t pp_get_pmd_interval(struct participant *pp);
/* To obtain the builtin writer to be used for publishing SPDP, SEDP,
PMD stuff for PP and its endpoints, given the entityid. If PP has
its own writer, use it; else use the privileged participant. */
struct writer *get_builtin_writer (const struct participant *pp, unsigned entityid);
DDS_EXPORT struct writer *get_builtin_writer (const struct participant *pp, unsigned entityid);
/* To create a new DDSI writer or reader belonging to participant with
GUID "ppguid". May return NULL if participant unknown or
@ -612,6 +645,7 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
void purge_proxy_participants (struct q_globals *gv, const nn_locator_t *loc, bool delete_from_as_disc);
/* To create a new proxy writer or reader; the proxy participant is
determined from the GUID and must exist. */
int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, const struct ddsi_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq);
@ -632,6 +666,10 @@ int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_
void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify);
int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify);
void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify);
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);

View file

@ -36,7 +36,7 @@ struct ddsi_guid;
EK_PROXY_READER
};
#define EK_NKINDS ((int) EK_PROXY_READER + 1)
struct ephash_enum
{
struct ddsrt_chh_iter it;
@ -80,15 +80,15 @@ void ephash_remove_reader_guid (struct ephash *eh, struct reader *rd);
void ephash_remove_proxy_writer_guid (struct ephash *eh, struct proxy_writer *pwr);
void ephash_remove_proxy_reader_guid (struct ephash *eh, struct proxy_reader *prd);
void *ephash_lookup_guid_untyped (const struct ephash *eh, const struct ddsi_guid *guid);
void *ephash_lookup_guid (const struct ephash *eh, const struct ddsi_guid *guid, enum entity_kind kind);
DDS_EXPORT void *ephash_lookup_guid_untyped (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT void *ephash_lookup_guid (const struct ephash *eh, const struct ddsi_guid *guid, enum entity_kind kind);
struct participant *ephash_lookup_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct writer *ephash_lookup_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct reader *ephash_lookup_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct participant *ephash_lookup_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct writer *ephash_lookup_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct reader *ephash_lookup_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
/* Enumeration of entries in the hash table:

View file

@ -12,6 +12,9 @@
#ifndef Q_LEASE_H
#define Q_LEASE_H
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/time.h"
#include "dds/ddsi/q_time.h"
#if defined (__cplusplus)
@ -20,21 +23,30 @@ extern "C" {
struct receiver_state;
struct participant;
struct lease;
struct entity_common;
struct q_globals; /* FIXME: make a special for the lease admin */
struct lease {
ddsrt_fibheap_node_t heapnode;
ddsrt_fibheap_node_t pp_heapnode;
nn_etime_t tsched; /* access guarded by leaseheap_lock */
ddsrt_atomic_uint64_t tend; /* really an nn_etime_t */
dds_duration_t tdur; /* constant (renew depends on it) */
struct entity_common *entity; /* constant */
};
int compare_lease_tsched (const void *va, const void *vb);
int compare_lease_tdur (const void *va, const void *vb);
void lease_management_init (struct q_globals *gv);
void lease_management_term (struct q_globals *gv);
struct lease *lease_new (nn_etime_t texpire, int64_t tdur, struct entity_common *e);
struct lease *lease_clone (const struct lease *l);
void lease_register (struct lease *l);
void lease_free (struct lease *l);
void lease_renew (struct lease *l, nn_etime_t tnow);
void lease_set_expiry (struct lease *l, nn_etime_t when);
int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow);
void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len);
#if defined (__cplusplus)
}
#endif

View file

@ -42,7 +42,8 @@ int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *x
/* When calling the following functions, wr->lock must be held */
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync);
dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp);
#if defined (__cplusplus)
}

View file

@ -0,0 +1,160 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stdlib.h>
#include <string.h>
#include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_log.h"
#include "dds/ddsi/q_misc.h"
#include "dds/ddsi/q_protocol.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/sysdeps.h"
static void debug_print_rawdata (const struct q_globals *gv, const char *msg, const void *data, size_t len)
{
const unsigned char *c = data;
size_t i;
GVTRACE ("%s<", msg);
for (i = 0; i < len; i++)
{
if (32 < c[i] && c[i] <= 127)
GVTRACE ("%s%c", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
else
GVTRACE ("%s\\x%02x", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
}
GVTRACE (">");
}
void write_pmd_message_guid (struct q_globals * const gv, struct ddsi_guid *pp_guid, unsigned pmd_kind)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct participant *pp = ephash_lookup_participant_guid (gv->guid_hash, pp_guid);
if (pp == NULL)
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (*pp_guid));
else
write_pmd_message (ts1, NULL, pp, pmd_kind);
thread_state_asleep (ts1);
}
void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
{
#define PMD_DATA_LENGTH 1
struct q_globals * const gv = pp->e.gv;
struct writer *wr;
union {
ParticipantMessageData_t pmd;
char pad[offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH];
} u;
struct ddsi_serdata *serdata;
struct ddsi_tkmap_instance *tk;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER)) == NULL)
{
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (pp->e.guid));
return;
}
u.pmd.participantGuidPrefix = nn_hton_guid_prefix (pp->e.guid.prefix);
u.pmd.kind = ddsrt_toBE4u (pmd_kind);
u.pmd.length = PMD_DATA_LENGTH;
memset (u.pmd.value, 0, u.pmd.length);
struct ddsi_rawcdr_sample raw = {
.blob = &u,
.size = offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH,
.key = &u.pmd,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw);
serdata->timestamp = now ();
tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata);
write_sample_nogc (ts1, xp, wr, serdata, tk);
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
#undef PMD_DATA_LENGTH
}
void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len)
{
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
const int bswap = (data->identifier == CDR_LE) ^ (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN);
struct proxy_participant *proxypp;
ddsi_guid_t ppguid;
struct lease *l;
RSTTRACE (" PMD ST%x", statusinfo);
if (data->identifier != CDR_LE && data->identifier != CDR_BE)
{
RSTTRACE (" PMD data->identifier %u !?\n", ntohs (data->identifier));
return;
}
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
{
case 0:
if (offsetof (ParticipantMessageData_t, value) > len - sizeof (struct CDRHeader))
debug_print_rawdata (rst->gv, " SHORT1", data, len);
else
{
const ParticipantMessageData_t *pmd = (ParticipantMessageData_t *) (data + 1);
ddsi_guid_prefix_t p = nn_ntoh_guid_prefix (pmd->participantGuidPrefix);
uint32_t kind = ntohl (pmd->kind);
uint32_t length = bswap ? ddsrt_bswap4u (pmd->length) : pmd->length;
RSTTRACE (" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %u data %u", p.u[0], p.u[1], p.u[2], kind, length);
if (len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value) < length)
debug_print_rawdata (rst->gv, " SHORT2", pmd->value, len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value));
else
debug_print_rawdata (rst->gv, "", pmd->value, length);
ppguid.prefix = p;
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
else if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE &&
(l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)
{
/* Renew lease for entity with shortest manual-by-participant lease */
lease_renew (l, now_et ());
}
}
break;
case NN_STATUSINFO_DISPOSE:
case NN_STATUSINFO_UNREGISTER:
case NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER:
/* Serialized key; BE or LE doesn't matter as both fields are
defined as octets. */
if (len < sizeof (struct CDRHeader) + sizeof (ddsi_guid_prefix_t))
debug_print_rawdata (rst->gv, " SHORT3", data, len);
else
{
ppguid.prefix = nn_ntoh_guid_prefix (*((ddsi_guid_prefix_t *) (data + 1)));
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if (delete_proxy_participant_by_guid (rst->gv, &ppguid, timestamp, 0) < 0)
RSTTRACE (" unknown");
else
RSTTRACE (" delete");
}
break;
}
RSTTRACE ("\n");
}

View file

@ -43,6 +43,7 @@
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/q_feature_check.h"
#include "dds/ddsi/ddsi_pmd.h"
static int get_locator (const struct q_globals *gv, nn_locator_t *loc, const nn_locators_t *locs, int uc_same_subnet)
{
@ -486,10 +487,8 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
{
struct ephash_enum_proxy_participant it;
struct proxy_participant *pp, *d2pp;
struct lease *d2pp_lease;
if ((d2pp = ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid)) == NULL)
return;
d2pp_lease = ddsrt_atomic_ldvoidp (&d2pp->lease);
ephash_enum_proxy_participant_init (&it, gv->guid_hash);
while ((pp = ephash_enum_proxy_participant_next (&it)) != NULL)
{
@ -499,7 +498,7 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
ddsrt_mutex_lock (&pp->e.lock);
pp->privileged_pp_guid = *ddsi2guid;
ddsrt_mutex_unlock (&pp->e.lock);
proxy_participant_reassign_lease (pp, d2pp_lease);
proxy_participant_reassign_lease (pp, d2pp->lease);
GVTRACE ("\n");
if (ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid) == NULL)
@ -614,9 +613,10 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
int interesting = 0;
RSTTRACE ("SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid));
/* SPDP processing is so different from normal processing that we are
even skipping the automatic lease renewal. Therefore do it regardless
of gv.config.arrival_of_data_asserts_pp_and_ep_liveliness. */
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->lease), now_et ());
even skipping the automatic lease renewal. Note that proxy writers
that are not alive are not set alive here. This is done only when
data is received from a particular pwr (in handle_regular) */
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->minl_auto), now_et ());
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->implicitly_created || seq > proxypp->seq)
{
@ -1247,7 +1247,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
GVLOGDISC (" "PGUIDFMT" attach-to-DS "PGUIDFMT, PGUID(pp->e.guid), PGUIDPREFIX(*src_guid_prefix), pp->privileged_pp_guid.entityid.u);
ddsrt_mutex_lock (&pp->e.lock);
pp->privileged_pp_guid.prefix = *src_guid_prefix;
lease_set_expiry(ddsrt_atomic_ldvoidp(&pp->lease), never);
lease_set_expiry(pp->lease, never);
ddsrt_mutex_unlock (&pp->e.lock);
}
GVLOGDISC ("\n");
@ -1357,13 +1357,9 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data
}
GVLOGDISC (" "PGUIDFMT, PGUID (datap->endpoint_guid));
if (is_writer_entityid (datap->endpoint_guid.entityid))
{
res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0);
}
else
{
res = delete_proxy_reader (gv, &datap->endpoint_guid, timestamp, 0);
}
GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete");
}
@ -1768,7 +1764,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
handle_SEDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz);
break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
handle_PMD (sampleinfo->rst, timestamp, statusinfo, datap, datasz);
handle_pmd_message (sampleinfo->rst, timestamp, statusinfo, datap, datasz);
break;
case NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER:
handle_SEDP_CM (sampleinfo->rst, srcguid.entityid, timestamp, statusinfo, datap, datasz);

View file

@ -13,6 +13,7 @@
#include <string.h>
#include <stddef.h>
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/log.h"
#include "dds/ddsrt/sockets.h"
@ -63,6 +64,11 @@ struct deleted_participants_admin {
int64_t delay;
};
struct proxy_writer_alive_state {
bool alive;
uint32_t vclock;
};
static int compare_guid (const void *va, const void *vb);
static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vright);
@ -403,6 +409,16 @@ static void remove_deleted_participant_guid (struct deleted_participants_admin *
/* PARTICIPANT ------------------------------------------------------ */
static int compare_ldur (const void *va, const void *vb)
{
const struct ldur_fhnode *a = va;
const struct ldur_fhnode *b = vb;
return (a->ldur == b->ldur) ? 0 : (a->ldur < b->ldur) ? -1 : 1;
}
/* used in participant for keeping writer liveliness renewal */
const ddsrt_fibheap_def_t ldur_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct ldur_fhnode, heapnode), compare_ldur);
static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, const dds_qos_t *xqos, nn_wctime_t timestamp)
{
uint64_t mask;
@ -515,6 +531,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
ddsrt_mutex_init (&pp->refc_lock);
inverse_uint32_set_init(&pp->avail_entityids.x, 1, UINT32_MAX / NN_ENTITYID_ALLOCSTEP);
pp->lease_duration = gv->config.lease_duration;
ddsrt_fibheap_init (&ldur_fhdef, &pp->ldur_auto_wr);
pp->plist = ddsrt_malloc (sizeof (*pp->plist));
nn_plist_copy (pp->plist, plist);
nn_plist_mergein_missing (pp->plist, &gv->default_local_plist_pp, ~(uint64_t)0, ~(uint64_t)0);
@ -1003,6 +1020,19 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
return ephash_lookup_writer_guid (pp->e.gv->guid_hash, &bwr_guid);
}
dds_duration_t pp_get_pmd_interval (struct participant *pp)
{
struct ldur_fhnode *ldur_node;
dds_duration_t intv;
ddsrt_mutex_lock (&pp->e.lock);
ldur_node = ddsrt_fibheap_min (&ldur_fhdef, &pp->ldur_auto_wr);
intv = (ldur_node != NULL) ? ldur_node->ldur : T_NEVER;
if (pp->lease_duration < intv)
intv = pp->lease_duration;
ddsrt_mutex_unlock (&pp->e.lock);
return intv;
}
/* WRITER/READER/PROXY-WRITER/PROXY-READER CONNECTION ---------------
These are all located in a separate section because they are so
@ -1406,6 +1436,19 @@ static void free_wr_rd_match (struct wr_rd_match *m)
if (m) ddsrt_free (m);
}
static void proxy_writer_get_alive_state_locked (struct proxy_writer *pwr, struct proxy_writer_alive_state *st)
{
st->alive = pwr->alive;
st->vclock = pwr->alive_vclock;
}
static void proxy_writer_get_alive_state (struct proxy_writer *pwr, struct proxy_writer_alive_state *st)
{
ddsrt_mutex_lock (&pwr->e.lock);
proxy_writer_get_alive_state_locked (pwr, st);
ddsrt_mutex_unlock (&pwr->e.lock);
}
static void writer_drop_connection (const struct ddsi_guid *wr_guid, const struct proxy_reader *prd)
{
struct writer *wr;
@ -1463,6 +1506,55 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
}
}
static void reader_update_notify_pwr_alive_state (struct reader *rd, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state)
{
struct rd_pwr_match *m;
bool notify = false;
int delta = 0; /* -1: alive -> not_alive; 0: unchanged; 1: not_alive -> alive */
ddsrt_mutex_lock (&rd->e.lock);
if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL)
{
if ((int32_t) (alive_state->vclock - m->pwr_alive_vclock) > 0)
{
delta = (int) alive_state->alive - (int) m->pwr_alive;
notify = true;
m->pwr_alive = alive_state->alive;
m->pwr_alive_vclock = alive_state->vclock;
}
}
ddsrt_mutex_unlock (&rd->e.lock);
if (delta < 0 && rd->rhc)
{
struct ddsi_writer_info wrinfo;
ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos);
ddsi_rhc_unregister_wr (rd->rhc, &wrinfo);
}
/* Liveliness changed events can race each other and can, potentially, be delivered
in a different order. */
if (notify && rd->status_cb)
{
status_cb_data_t data;
data.handle = pwr->e.iid;
if (delta == 0)
data.extra = (uint32_t) LIVELINESS_CHANGED_TWITCH;
else if (delta < 0)
data.extra = (uint32_t) LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE;
else
data.extra = (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
static void reader_update_notify_pwr_alive_state_guid (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
reader_update_notify_pwr_alive_state (rd, pwr, alive_state);
}
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr)
{
struct reader *rd;
@ -1484,9 +1576,9 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc
if (rd->status_cb)
{
status_cb_data_t data;
data.add = false;
data.handle = pwr->e.iid;
data.add = false;
data.extra = (uint32_t) (m->pwr_alive ? LIVELINESS_CHANGED_REMOVE_ALIVE : LIVELINESS_CHANGED_REMOVE_NOT_ALIVE);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
@ -1521,9 +1613,9 @@ static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const
if (rd->status_cb)
{
status_cb_data_t data;
data.add = false;
data.handle = wr->e.iid;
data.add = false;
data.extra = (uint32_t) LIVELINESS_CHANGED_REMOVE_ALIVE;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
@ -1762,12 +1854,14 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
}
}
static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count)
static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count, const struct proxy_writer_alive_state *alive_state)
{
struct rd_pwr_match *m = ddsrt_malloc (sizeof (*m));
ddsrt_avl_ipath_t path;
m->pwr_guid = pwr->e.guid;
m->pwr_alive = alive_state->alive;
m->pwr_alive_vclock = alive_state->vclock;
ddsrt_mutex_lock (&rd->e.lock);
@ -1821,9 +1915,14 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr,
if (rd->status_cb)
{
status_cb_data_t data;
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
data.add = true;
data.handle = pwr->e.iid;
data.add = true;
data.extra = (uint32_t) (alive_state->alive ? LIVELINESS_CHANGED_ADD_ALIVE : LIVELINESS_CHANGED_ADD_NOT_ALIVE);
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
@ -1855,14 +1954,15 @@ static void reader_add_local_connection (struct reader *rd, struct writer *wr)
if (rd->status_cb)
{
status_cb_data_t data;
data.add = true;
data.handle = wr->e.iid;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
data.add = true;
data.extra = (uint32_t) LIVELINESS_CHANGED_ADD_ALIVE;
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
}
@ -1980,16 +2080,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
ELOGDISC (pwr, "\n");
if (rd->status_cb)
{
status_cb_data_t data;
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
data.add = true;
data.handle = pwr->e.iid;
(rd->status_cb) (rd->status_cb_entity, &data);
}
return;
already_matched:
@ -2161,6 +2251,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
const int isb1 = (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0);
dds_qos_policy_id_t reason;
nn_count_t init_count;
struct proxy_writer_alive_state alive_state;
if (isb0 != isb1)
return;
if (rd->e.onlylocal)
@ -2170,8 +2261,18 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
reader_qos_mismatch (rd, reason);
return;
}
reader_add_connection (rd, pwr, &init_count);
/* Initialze the reader's tracking information for the writer liveliness state to something
sensible, but that may be outdated by the time the reader gets added to the writer's list
of matching readers. */
proxy_writer_get_alive_state (pwr, &alive_state);
reader_add_connection (rd, pwr, &init_count, &alive_state);
proxy_writer_add_connection (pwr, rd, tnow, init_count);
/* Once everything is set up: update with the latest state, any updates to the alive state
happening in parallel will cause this to become a no-op. */
proxy_writer_get_alive_state (pwr, &alive_state);
reader_update_notify_pwr_alive_state (rd, pwr, &alive_state);
}
static bool ignore_local_p (const ddsi_guid_t *guid1, const ddsi_guid_t *guid2, const struct dds_qos *xqos1, const struct dds_qos *xqos2)
@ -2857,13 +2958,17 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
{
wr->heartbeat_xevent = NULL;
}
assert (wr->xqos->present & QP_LIVELINESS);
if (wr->xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC || wr->xqos->liveliness.lease_duration != T_NEVER)
if (wr->xqos->liveliness.kind == DDS_LIVELINESS_AUTOMATIC && wr->xqos->liveliness.lease_duration != T_NEVER)
{
ELOGDISC (wr, "writer "PGUIDFMT": incorrectly treating it as of automatic liveliness kind with lease duration = inf (%d, %"PRId64")\n",
PGUID (wr->e.guid), (int) wr->xqos->liveliness.kind, wr->xqos->liveliness.lease_duration);
wr->lease_duration = ddsrt_malloc (sizeof(*wr->lease_duration));
wr->lease_duration->ldur = wr->xqos->liveliness.lease_duration;
}
else
{
wr->lease_duration = NULL;
}
wr->lease_duration = T_NEVER; /* FIXME */
wr->whc = whc;
if (wr->xqos->history.kind == DDS_HISTORY_KEEP_LAST)
@ -2930,10 +3035,19 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct ddsi_g
match_writer_with_local_readers (wr, tnow);
sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER)
if (wr->lease_duration != NULL)
{
nn_mtime_t tsched = { 0 };
(void) resched_xevent_if_earlier (pp->pmd_update_xevent, tsched);
assert (wr->lease_duration->ldur != T_NEVER);
assert (wr->xqos->liveliness.kind == DDS_LIVELINESS_AUTOMATIC);
assert (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE));
/* Store writer lease duration in participant's heap in case of automatic liveliness */
ddsrt_mutex_lock (&pp->e.lock);
ddsrt_fibheap_insert (&ldur_fhdef, &pp->ldur_auto_wr, wr->lease_duration);
ddsrt_mutex_unlock (&pp->e.lock);
/* Trigger pmd update */
(void) resched_xevent_if_earlier (pp->pmd_update_xevent, now_mt ());
}
return 0;
@ -3026,6 +3140,11 @@ static void gc_delete_writer (struct gcreq *gcreq)
reader_drop_local_connection (&m->rd_guid, wr);
free_wr_rd_match (m);
}
if (wr->lease_duration != NULL)
{
assert (wr->lease_duration->ldur == DDS_DURATION_INVALID);
ddsrt_free (wr->lease_duration);
}
/* Do last gasp on SEDP and free writer. */
if (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE))
@ -3106,6 +3225,13 @@ dds_return_t delete_writer_nolinger_locked (struct writer *wr)
local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr->e.gv->guid_hash, wr);
writer_set_state (wr, WRST_DELETING);
if (wr->lease_duration != NULL) {
ddsrt_mutex_lock (&wr->c.pp->e.lock);
ddsrt_fibheap_delete (&ldur_fhdef, &wr->c.pp->ldur_auto_wr, wr->lease_duration);
ddsrt_mutex_unlock (&wr->c.pp->e.lock);
wr->lease_duration->ldur = DDS_DURATION_INVALID;
resched_xevent_if_earlier (wr->c.pp->pmd_update_xevent, now_mt ());
}
gcreq_writer (wr);
return 0;
}
@ -3352,11 +3478,6 @@ static dds_return_t new_reader_guid
ddsi_rhc_set_qos (rd->rhc, rd->xqos);
}
assert (rd->xqos->present & QP_LIVELINESS);
if (rd->xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC || rd->xqos->liveliness.lease_duration != T_NEVER)
{
ELOGDISC (rd, "reader "PGUIDFMT": incorrectly treating it as of automatic liveliness kind with lease duration = inf (%d, %"PRId64")\n",
PGUID (rd->e.guid), (int) rd->xqos->liveliness.kind, rd->xqos->liveliness.lease_duration);
}
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
rd->as = new_addrset ();
@ -3533,6 +3654,8 @@ void update_reader_qos (struct reader *rd, const dds_qos_t *xqos)
}
/* PROXY-PARTICIPANT ------------------------------------------------ */
const ddsrt_fibheap_def_t lease_fhdef_proxypp = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct lease, pp_heapnode), compare_lease_tdur);
static void gc_proxy_participant_lease (struct gcreq *gcreq)
{
lease_free (gcreq->arg);
@ -3558,16 +3681,95 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
{
const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
struct lease *oldlease = ddsrt_atomic_ldvoidp (&proxypp->lease);
struct lease *oldlease = proxypp->lease;
lease_renew (oldlease, never);
gcreq->arg = oldlease;
gcreq_enqueue (gcreq);
proxypp->owns_lease = 0;
}
ddsrt_atomic_stvoidp (&proxypp->lease, newlease);
proxypp->lease = newlease;
/* FIXME: replace proxypp lease in leaseheap_auto? */
ddsrt_mutex_unlock (&proxypp->e.lock);
}
static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew)
{
/* By loading/storing the pointer atomically, we ensure we always
read a valid (or once valid) lease. By delaying freeing the lease
through the garbage collector, we ensure whatever lease update
occurs in parallel completes before the memory is released. */
const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
struct lease *lease_old = ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
lease_renew (lease_old, never); /* ensures lease will not expire while it is replaced */
gcreq->arg = lease_old;
gcreq_enqueue (gcreq);
ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, lnew);
}
static void proxy_participant_add_pwr_lease_locked (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
{
struct lease *minl_prev;
struct lease *minl_new;
ddsrt_fibheap_t *lh;
bool manbypp;
assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl_prev = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_insert (&lease_fhdef_proxypp, lh, pwr->lease);
minl_new = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
/* if inserted lease is new shortest lease */
if (proxypp->owns_lease && minl_prev != minl_new)
{
nn_etime_t texp = add_duration_to_etime (now_et (), minl_new->tdur);
struct lease *lnew = lease_new (texp, minl_new->tdur, minl_new->entity);
if (minl_prev == NULL)
{
assert (manbypp);
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
ddsrt_atomic_stvoidp (&proxypp->minl_man, lnew);
}
else
{
proxy_participant_replace_minl (proxypp, manbypp, lnew);
}
lease_register (lnew);
}
}
static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant * proxypp, struct proxy_writer * pwr)
{
struct lease *minl;
bool manbypp;
ddsrt_fibheap_t *lh;
assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_delete (&lease_fhdef_proxypp, lh, pwr->lease);
/* if pwr with min lease is removed: update proxypp lease to use new minimal duration */
if (proxypp->owns_lease && pwr->lease == minl)
{
if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh)) != NULL)
{
dds_duration_t trem = minl->tdur - pwr->lease->tdur;
assert (trem >= 0);
nn_etime_t texp = add_duration_to_etime (now_et(), trem);
struct lease *lnew = lease_new (texp, minl->tdur, minl->entity);
proxy_participant_replace_minl (proxypp, manbypp, lnew);
lease_register (lnew);
}
else
{
assert (manbypp);
proxy_participant_replace_minl (proxypp, manbypp, NULL);
}
}
}
void new_proxy_participant
(
struct q_globals *gv,
@ -3601,6 +3803,7 @@ void new_proxy_participant
entity_common_init (&proxypp->e, gv, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false);
proxypp->refc = 1;
proxypp->lease_expired = 0;
proxypp->deleting = 0;
proxypp->vendor = vendor;
proxypp->bes = bes;
proxypp->prismtech_bes = prismtech_bes;
@ -3628,16 +3831,39 @@ void new_proxy_participant
privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid);
if (privpp != NULL && privpp->is_ddsi2_pp)
{
ddsrt_atomic_stvoidp (&proxypp->lease, ddsrt_atomic_ldvoidp (&privpp->lease));
proxypp->lease = privpp->lease;
proxypp->owns_lease = 0;
}
else
{
/* Lease duration is meaningless when the lease never expires, but when proxy participants are created implicitly because of endpoint discovery from a cloud service, we do want the lease to expire eventually when the cloud discovery service disappears and never reappears. The normal data path renews the lease, so if the lease expiry is changed after the DS disappears but data continues to flow (even if it is only a single sample) the proxy participant would immediately go back to a non-expiring lease with no further triggers for deleting it. Instead, we take tlease_dur == NEVER as a special value meaning a lease that doesn't expire now and that has a "reasonable" lease duration. That way the lease renewal in the data path is fine, and we only need to do something special in SEDP handling. */
/* Lease duration is meaningless when the lease never expires, but when proxy participants are
created implicitly because of endpoint discovery from a cloud service, we do want the lease to expire
eventually when the cloud discovery service disappears and never reappears. The normal data path renews
the lease, so if the lease expiry is changed after the DS disappears but data continues to flow (even if
it is only a single sample) the proxy participant would immediately go back to a non-expiring lease with
no further triggers for deleting it. Instead, we take tlease_dur == NEVER as a special value meaning a
lease that doesn't expire now and that has a "reasonable" lease duration. That way the lease renewal in
the data path is fine, and we only need to do something special in SEDP handling. */
nn_etime_t texp = add_duration_to_etime (now_et(), tlease_dur);
dds_duration_t dur = (tlease_dur == T_NEVER) ? gv->config.lease_duration : tlease_dur;
ddsrt_atomic_stvoidp (&proxypp->lease, lease_new (texp, dur, &proxypp->e));
proxypp->lease = lease_new (texp, dur, &proxypp->e);
proxypp->owns_lease = 1;
/* Init heap for leases */
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man);
/* Add the proxypp lease to heap so that monitoring liveliness will include this lease
and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */
ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
/* Set the shortest lease for auto liveliness: clone proxypp's lease and store the clone in
proxypp->minl_auto. As there are no pwr's at this point, the proxy pp's lease is the
shortest lease. When a pwr with a shorter is added, the lease in minl_auto is replaced
by the lease from the proxy writer in proxy_participant_add_pwr_lease_locked. This old shortest
lease is freed, so that's why we need a clone and not the proxypp's lease in the heap. */
ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease));
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
}
}
@ -3746,15 +3972,12 @@ void new_proxy_participant
nn_plist_fini (&plist_rd);
}
/* Register lease, but be careful not to accidentally re-register
DDSI2's lease, as we may have become dependent on DDSI2 any time
after ephash_insert_proxy_participant_guid even if
privileged_pp_guid was NULL originally */
/* Register lease for auto liveliness, but be careful not to accidentally re-register
DDSI2's lease, as we may have become dependent on DDSI2 any time after
ephash_insert_proxy_participant_guid even if privileged_pp_guid was NULL originally */
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->owns_lease)
lease_register (ddsrt_atomic_ldvoidp (&proxypp->lease));
lease_register (ddsrt_atomic_ldvoidp (&proxypp->minl_auto));
builtintopic_write (gv->builtin_topic_interface, &proxypp->e, timestamp, true);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
@ -3796,9 +4019,14 @@ int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t s
return 0;
}
static void ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
static int ref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
{
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->deleting)
{
ddsrt_mutex_unlock (&proxypp->e.lock);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
c->proxypp = proxypp;
proxypp->refc++;
@ -3810,6 +4038,8 @@ static void ref_proxy_participant (struct proxy_participant *proxypp, struct pro
}
proxypp->endpoints = c;
ddsrt_mutex_unlock (&proxypp->e.lock);
return DDS_RETCODE_OK;
}
static void unref_proxy_participant (struct proxy_participant *proxypp, struct proxy_endpoint_common *c)
@ -3833,14 +4063,23 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p
if (refc == 0)
{
assert (proxypp->endpoints == NULL);
if (proxypp->owns_lease)
{
struct lease * minl_auto = ddsrt_atomic_ldvoidp (&proxypp->minl_auto);
ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL);
assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL);
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
assert (!compare_guid (&minl_auto->entity->guid, &proxypp->e.guid));
lease_free (minl_auto);
lease_free (proxypp->lease);
}
ddsrt_mutex_unlock (&proxypp->e.lock);
ELOGDISC (proxypp, "unref_proxy_participant("PGUIDFMT"): refc=0, freeing\n", PGUID (proxypp->e.guid));
unref_addrset (proxypp->as_default);
unref_addrset (proxypp->as_meta);
nn_plist_fini (proxypp->plist);
ddsrt_free (proxypp->plist);
if (proxypp->owns_lease)
lease_free (ddsrt_atomic_ldvoidp (&proxypp->lease));
entity_common_fini (&proxypp->e);
remove_deleted_participant_guid (proxypp->e.gv->deleted_participants, &proxypp->e.guid, DPG_LOCAL | DPG_REMOTE);
ddsrt_free (proxypp);
@ -3901,15 +4140,17 @@ static void delete_or_detach_dependent_pp (struct proxy_participant *p, struct p
/* Clear dependency (but don't touch entity id, which must be 0x1c1) and set the lease ticking */
ELOGDISC (p, PGUIDFMT" detach-from-DS "PGUIDFMT"\n", PGUID(p->e.guid), PGUID(proxypp->e.guid));
memset (&p->privileged_pp_guid.prefix, 0, sizeof (p->privileged_pp_guid.prefix));
lease_set_expiry (ddsrt_atomic_ldvoidp (&p->lease), texp);
lease_set_expiry (p->lease, texp);
/* FIXME: replace in p->leaseheap_auto and get new minl_auto */
ddsrt_mutex_unlock (&p->e.lock);
}
}
static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp, int isimplicit)
{
struct proxy_endpoint_common * c;
int ret;
ddsi_entityid_t *eps;
ddsi_guid_t ep_guid;
uint32_t ep_count = 0;
/* if any proxy participants depend on this participant, delete them */
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting dependent proxy participants\n", PGUID (proxypp->e.guid));
@ -3922,31 +4163,38 @@ static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp
ephash_enum_proxy_participant_fini (&est);
}
/* delete_proxy_{reader,writer} merely schedules the actual delete
operation, so we can hold the lock -- at least, for now. */
ddsrt_mutex_lock (&proxypp->e.lock);
proxypp->deleting = 1;
if (isimplicit)
proxypp->lease_expired = 1;
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid));
c = proxypp->endpoints;
while (c)
/* Get snapshot of endpoints list so that we can release proxypp->e.lock
Pwrs/prds may be deleted during the iteration over the entities,
but resolving the guid will fail for these entities and the our
call to delete_proxy_writer/reader returns. */
{
struct entity_common *e = entity_common_from_proxy_endpoint_common (c);
if (is_writer_entityid (e->guid.entityid))
eps = ddsrt_malloc (proxypp->refc * sizeof(ddsi_entityid_t));
struct proxy_endpoint_common *cep = proxypp->endpoints;
while (cep)
{
ret = delete_proxy_writer (proxypp->e.gv, &e->guid, timestamp, isimplicit);
const struct entity_common *entc = entity_common_from_proxy_endpoint_common (cep);
eps[ep_count++] = entc->guid.entityid;
cep = cep->next_ep;
}
else
{
ret = delete_proxy_reader (proxypp->e.gv, &e->guid, timestamp, isimplicit);
}
(void) ret;
c = c->next_ep;
}
ddsrt_mutex_unlock (&proxypp->e.lock);
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting endpoints\n", PGUID (proxypp->e.guid));
ep_guid.prefix = proxypp->e.guid.prefix;
for (uint32_t n = 0; n < ep_count; n++)
{
ep_guid.entityid = eps[n];
if (is_writer_entityid (ep_guid.entityid))
delete_proxy_writer (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
else
delete_proxy_reader (proxypp->e.gv, &ep_guid, timestamp, isimplicit);
}
ddsrt_free (eps);
gcreq_proxy_participant (proxypp);
}
@ -4023,9 +4271,10 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_g
/* PROXY-ENDPOINT --------------------------------------------------- */
static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
static int proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct ddsi_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
{
const char *name;
int ret;
if (is_builtin_entityid (guid->entityid, proxypp->vendor))
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == 0);
@ -4044,7 +4293,16 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
else
memset (&c->group_guid, 0, sizeof (c->group_guid));
ref_proxy_participant (proxypp, c);
if ((ret = ref_proxy_participant (proxypp, c)) != DDS_RETCODE_OK)
{
nn_xqos_fini (c->xqos);
ddsrt_free (c->xqos);
unref_addrset (c->as);
entity_common_fini (e);
return ret;
}
return DDS_RETCODE_OK;
}
static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c)
@ -4064,6 +4322,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_writer *pwr;
int isreliable;
nn_mtime_t tnow = now_mt ();
int ret;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (gv->guid_hash, guid) == NULL);
@ -4075,7 +4334,11 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
}
pwr = ddsrt_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist);
if ((ret = proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
{
ddsrt_free (pwr);
return ret;
}
ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0;
@ -4084,6 +4347,8 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
pwr->last_fragnum = ~0u;
pwr->nackfragcount = 0;
pwr->last_fragnum_reset = 0;
pwr->alive = 1;
pwr->alive_vclock = 0;
ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1);
if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) {
/* The DDSI built-in proxy writers always deliver
@ -4107,17 +4372,25 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
#endif
assert (pwr->c.xqos->present & QP_LIVELINESS);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC)
GVLOGDISC (" FIXME: only AUTOMATIC liveliness supported");
#if 0
pwr->tlease_dur = nn_from_ddsi_duration (pwr->c.xqos->liveliness.lease_duration);
if (pwr->tlease_dur == 0)
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{
GVLOGDISC (" FIXME: treating lease_duration=0 as inf");
pwr->tlease_dur = T_NEVER;
nn_etime_t texpire = add_duration_to_etime (now_et (), pwr->c.xqos->liveliness.lease_duration);
pwr->lease = lease_new (texpire, pwr->c.xqos->liveliness.lease_duration, &pwr->e);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
{
ddsrt_mutex_lock (&proxypp->e.lock);
proxy_participant_add_pwr_lease_locked (proxypp, pwr);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
else
{
lease_register (pwr->lease);
}
}
else
{
pwr->lease = NULL;
}
pwr->tlease_end = add_duration_to_wctime (tnow, pwr->tlease_dur);
#endif
if (isreliable)
{
@ -4251,7 +4524,6 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
struct proxy_writer *pwr = gcreq->arg;
ELOGDISC (pwr, "gc_delete_proxy_writer(%p, "PGUIDFMT")\n", (void *) gcreq, PGUID (pwr->e.guid));
gcreq_free (gcreq);
while (!ddsrt_avl_is_empty (&pwr->readers))
{
struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
@ -4261,17 +4533,22 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
free_pwr_rd_match (m);
}
local_reader_ary_fini (&pwr->rdary);
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
lease_free (pwr->lease);
proxy_endpoint_common_fini (&pwr->e, &pwr->c);
nn_defrag_free (pwr->defrag);
nn_reorder_free (pwr->reorder);
ddsrt_free (pwr);
}
/* First stage in deleting the proxy writer. In this function the pwr and its member pointers
will remain valid. The real cleaning-up is done async in gc_delete_proxy_writer. */
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit)
{
struct proxy_writer *pwr;
(void)isimplicit;
DDSRT_UNUSED_ARG (isimplicit);
GVLOGDISC ("delete_proxy_writer ("PGUIDFMT") ", PGUID (*guid));
ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{
@ -4288,8 +4565,93 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
ddsrt_mutex_unlock (&gv->lock);
if (proxy_writer_set_notalive (pwr, false) != DDS_RETCODE_OK)
GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid));
gcreq_proxy_writer (pwr);
return 0;
return DDS_RETCODE_OK;
}
static void proxy_writer_notify_liveliness_change_may_unlock (struct proxy_writer *pwr)
{
struct proxy_writer_alive_state alive_state;
proxy_writer_get_alive_state_locked (pwr, &alive_state);
struct ddsi_guid rdguid;
struct pwr_rd_match *m;
memset (&rdguid, 0, sizeof (rdguid));
while (pwr->alive_vclock == alive_state.vclock &&
(m = ddsrt_avl_lookup_succ (&pwr_readers_treedef, &pwr->readers, &rdguid)) != NULL)
{
rdguid = m->rd_guid;
ddsrt_mutex_unlock (&pwr->e.lock);
/* unlocking pwr means alive state may have changed already; we break out of the loop once we
detect this but there for the reader in the current iteration, anything is possible */
reader_update_notify_pwr_alive_state_guid (&rdguid, pwr, &alive_state);
ddsrt_mutex_lock (&pwr->e.lock);
}
}
void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify)
{
/* Caller has pwr->e.lock, so we can safely read pwr->alive. Updating pwr->alive requires
also taking pwr->c.proxypp->e.lock because pwr->alive <=> (pwr->lease in proxypp's lease
heap). */
assert (!pwr->alive);
/* check that proxy writer still exists (when deleting it is removed from guid hash) */
if (ephash_lookup_proxy_writer_guid (pwr->e.gv->guid_hash, &pwr->e.guid) == NULL)
{
ELOGDISC (pwr, "proxy_writer_set_alive_may_unlock("PGUIDFMT") - not in guid_hash, pwr deleting\n", PGUID (pwr->e.guid));
return;
}
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
pwr->alive = true;
pwr->alive_vclock++;
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
if (notify)
proxy_writer_notify_liveliness_change_may_unlock (pwr);
}
int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify)
{
/* Caller should not have taken pwr->e.lock and pwr->c.proxypp->e.lock;
* this function takes both locks to update pwr->alive value */
ddsrt_mutex_lock (&pwr->e.lock);
if (!pwr->alive)
{
ddsrt_mutex_unlock (&pwr->e.lock);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
ddsrt_mutex_lock (&pwr->c.proxypp->e.lock);
pwr->alive = false;
pwr->alive_vclock++;
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock);
if (notify)
proxy_writer_notify_liveliness_change_may_unlock (pwr);
ddsrt_mutex_unlock (&pwr->e.lock);
return DDS_RETCODE_OK;
}
void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify)
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, pwrguid)) == NULL)
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*pwrguid));
else
{
GVLOGDISC ("proxy_writer_set_notalive_guid ("PGUIDFMT")", PGUID (*pwrguid));
if (proxy_writer_set_notalive (pwr, notify) == DDS_RETCODE_PRECONDITION_NOT_MET)
GVLOGDISC (" pwr was not alive");
GVLOGDISC ("\n");
}
}
/* PROXY-READER ----------------------------------------------------- */
@ -4303,6 +4665,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
struct proxy_participant *proxypp;
struct proxy_reader *prd;
nn_mtime_t tnow = now_mt ();
int ret;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (gv->guid_hash, guid) == NULL);
@ -4314,7 +4677,11 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
}
prd = ddsrt_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist);
if ((ret = proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist)) != DDS_RETCODE_OK)
{
ddsrt_free (prd);
return ret;
}
prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM
@ -4331,7 +4698,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
ddsrt_mutex_unlock (&prd->e.lock);
match_proxy_reader_with_writers (prd, tnow);
return 0;
return DDS_RETCODE_OK;
}
static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *prd)

View file

@ -43,30 +43,27 @@
!= 0 -- and note that it had better be 2's complement machine! */
#define TSCHED_NOT_ON_HEAP INT64_MIN
struct lease {
ddsrt_fibheap_node_t heapnode;
nn_etime_t tsched; /* access guarded by leaseheap_lock */
ddsrt_atomic_uint64_t tend; /* really an nn_etime_t */
dds_duration_t tdur; /* constant (renew depends on it) */
struct entity_common *entity; /* constant */
};
static int compare_lease_tsched (const void *va, const void *vb);
static const ddsrt_fibheap_def_t lease_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct lease, heapnode), compare_lease_tsched);
const ddsrt_fibheap_def_t lease_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER (offsetof (struct lease, heapnode), compare_lease_tsched);
static void force_lease_check (struct gcreq_queue *gcreq_queue)
{
gcreq_enqueue (gcreq_new (gcreq_queue, gcreq_free));
}
static int compare_lease_tsched (const void *va, const void *vb)
int compare_lease_tsched (const void *va, const void *vb)
{
const struct lease *a = va;
const struct lease *b = vb;
return (a->tsched.v == b->tsched.v) ? 0 : (a->tsched.v < b->tsched.v) ? -1 : 1;
}
int compare_lease_tdur (const void *va, const void *vb)
{
const struct lease *a = va;
const struct lease *b = vb;
return (a->tdur == b->tdur) ? 0 : (a->tdur < b->tdur) ? -1 : 1;
}
void lease_management_init (struct q_globals *gv)
{
ddsrt_mutex_init (&gv->leaseheap_lock);
@ -92,6 +89,20 @@ struct lease *lease_new (nn_etime_t texpire, dds_duration_t tdur, struct entity_
return l;
}
/**
* Returns a clone of the provided lease. Note that this function does not use
* locking and should therefore only be called from a context where lease 'l'
* cannot be changed by another thread during the function call.
*/
struct lease *lease_clone (const struct lease *l)
{
nn_etime_t texp;
dds_duration_t tdur;
texp.v = (int64_t) ddsrt_atomic_ld64 (&l->tend);
tdur = l->tdur;
return lease_new (texp, tdur, l->entity);
}
void lease_register (struct lease *l) /* FIXME: make lease admin struct */
{
struct q_globals * const gv = l->entity->gv;
@ -116,7 +127,10 @@ void lease_free (struct lease *l)
GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
ddsrt_mutex_lock (&gv->leaseheap_lock);
if (l->tsched.v != TSCHED_NOT_ON_HEAP)
{
ddsrt_fibheap_delete (&lease_fhdef, &gv->leaseheap, l);
l->tsched.v = TSCHED_NOT_ON_HEAP;
}
ddsrt_mutex_unlock (&gv->leaseheap_lock);
ddsrt_free (l);
@ -126,7 +140,6 @@ void lease_free (struct lease *l)
void lease_renew (struct lease *l, nn_etime_t tnowE)
{
struct q_globals const * const gv = l->entity->gv;
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
/* do not touch tend if moving forward or if already expired */
@ -137,6 +150,11 @@ void lease_renew (struct lease *l, nn_etime_t tnowE)
return;
} while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v));
/* Only at this point we can assume that gv can be recovered from the entity in the
* lease (i.e. the entity still exists). In cases where dereferencing l->entity->gv
* is not safe (e.g. the deletion of entities), the early out in the loop above
* will be the case because tend is set to T_NEVER. */
struct q_globals const * gv = l->entity->gv;
if (gv->logconfig.c.mask & DDS_LC_TRACE)
{
int32_t tsec, tusec;
@ -253,26 +271,19 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
switch (k)
{
case EK_PARTICIPANT:
delete_participant (gv, &g);
break;
case EK_PROXY_PARTICIPANT:
delete_proxy_participant_by_guid (gv, &g, now(), 1);
break;
case EK_WRITER:
delete_writer_nolinger (gv, &g);
break;
case EK_PROXY_WRITER:
(void) delete_proxy_writer (gv, &g, now(), 1);
proxy_writer_set_notalive_guid (gv, &g, true);
break;
case EK_PARTICIPANT:
case EK_READER:
delete_reader (gv, &g);
break;
case EK_WRITER:
case EK_PROXY_READER:
(void) delete_proxy_reader (gv, &g, now(), 1);
assert (false);
break;
}
ddsrt_mutex_lock (&gv->leaseheap_lock);
}
@ -281,83 +292,3 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
return delay;
}
/******/
static void debug_print_rawdata (const struct q_globals *gv, const char *msg, const void *data, size_t len)
{
const unsigned char *c = data;
size_t i;
GVTRACE ("%s<", msg);
for (i = 0; i < len; i++)
{
if (32 < c[i] && c[i] <= 127)
GVTRACE ("%s%c", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
else
GVTRACE ("%s\\x%02x", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
}
GVTRACE (">");
}
void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len)
{
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
const int bswap = (data->identifier == CDR_LE) ^ (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN);
struct proxy_participant *pp;
ddsi_guid_t ppguid;
RSTTRACE (" PMD ST%x", statusinfo);
if (data->identifier != CDR_LE && data->identifier != CDR_BE)
{
RSTTRACE (" PMD data->identifier %u !?\n", ntohs (data->identifier));
return;
}
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
{
case 0:
if (offsetof (ParticipantMessageData_t, value) > len - sizeof (struct CDRHeader))
debug_print_rawdata (rst->gv, " SHORT1", data, len);
else
{
const ParticipantMessageData_t *pmd = (ParticipantMessageData_t *) (data + 1);
ddsi_guid_prefix_t p = nn_ntoh_guid_prefix (pmd->participantGuidPrefix);
uint32_t kind = ntohl (pmd->kind);
uint32_t length = bswap ? ddsrt_bswap4u (pmd->length) : pmd->length;
RSTTRACE (" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %u data %u", p.u[0], p.u[1], p.u[2], kind, length);
if (len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value) < length)
debug_print_rawdata (rst->gv, " SHORT2", pmd->value, len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value));
else
debug_print_rawdata (rst->gv, "", pmd->value, length);
ppguid.prefix = p;
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((pp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
else
{
/* Renew lease if arrival of this message didn't already do so, also renew the lease
of the virtual participant used for DS-discovered endpoints */
#if 0 // FIXME: superfluous ... receipt of the message already did it */
lease_renew (ddsrt_atomic_ldvoidp (&pp->lease), now_et ());
#endif
}
}
break;
case NN_STATUSINFO_DISPOSE:
case NN_STATUSINFO_UNREGISTER:
case NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER:
/* Serialized key; BE or LE doesn't matter as both fields are
defined as octets. */
if (len < sizeof (struct CDRHeader) + sizeof (ddsi_guid_prefix_t))
debug_print_rawdata (rst->gv, " SHORT3", data, len);
else
{
ppguid.prefix = nn_ntoh_guid_prefix (*((ddsi_guid_prefix_t *) (data + 1)));
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if (delete_proxy_participant_by_guid (rst->gv, &ppguid, timestamp, 0) < 0)
RSTTRACE (" unknown");
else
RSTTRACE (" delete");
}
break;
}
RSTTRACE ("\n");
}

View file

@ -548,7 +548,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
}
/* Send a Heartbeat just to this peer */
add_Heartbeat (m, wr, whcst, hbansreq, prd->e.guid.entityid, 0);
add_Heartbeat (m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid));
qxev_msg (wr->evq, m);
@ -668,9 +668,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
if (!wr->reliable) /* note: reliability can't be changed */
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer!)", PGUID (src), PGUID (dst));
@ -1133,15 +1131,18 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC,
BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst));
ddsrt_mutex_lock (&pwr->e.lock);
if (msg->smhdr.flags & HEARTBEAT_FLAG_LIVELINESS &&
pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC &&
pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{
struct lease *lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man);
if (lease != NULL)
lease_renew (lease, tnow);
lease_renew (pwr->lease, tnow);
}
if (pwr->n_reliable_readers == 0)
{
RSTTRACE (PGUIDFMT" -> "PGUIDFMT" no-reliable-readers)", PGUID (src), PGUID (dst));
@ -1271,9 +1272,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst));
ddsrt_mutex_lock (&pwr->e.lock);
@ -1398,9 +1397,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
if (!wr->reliable) /* note: reliability can't be changed */
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer)", PGUID (src), PGUID (dst));
@ -1645,9 +1642,7 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
ddsrt_mutex_lock (&pwr->e.lock);
if ((wn = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst)) == NULL)
{
@ -2098,11 +2093,13 @@ static void clean_defrag (struct proxy_writer *pwr)
nn_defrag_notegap (pwr->defrag, 1, seq);
}
static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup)
static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo,
uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
{
struct proxy_writer *pwr;
struct nn_rsample *rsample;
ddsi_guid_t dst;
struct lease *lease;
dst.prefix = rst->dst_guid_prefix;
dst.entityid = msg->readerId;
@ -2117,14 +2114,27 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
return;
}
/* liveliness is still only implemented partially (with all set to
AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy
participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
/* Proxy participant's "automatic" lease has to be renewed always, manual-by-participant one only
for data published by the application. If pwr->lease exists, it is in some manual lease mode,
so check whether it is actually in manual-by-topic mode before renewing it. As pwr->lease is
set once (during entity creation) we can read it outside the lock, keeping all the lease
renewals together. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease)
lease_renew (lease, tnow);
if (pwr->lease && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
lease_renew (pwr->lease, tnow);
/* Shouldn't lock the full writer, but will do so for now */
ddsrt_mutex_lock (&pwr->e.lock);
/* A change in transition from not-alive to alive is relatively complicated
and may involve temporarily unlocking the proxy writer during the process
(to avoid unnecessarily holding pwr->e.lock while invoking listeners on
the reader) */
if (!pwr->alive)
proxy_writer_set_alive_may_unlock (pwr, true);
/* Don't accept data when reliable readers exist and we haven't yet seen
a heartbeat telling us what the "current" sequence number of the writer
is. If no reliable readers are present, we can't request a heartbeat and
@ -2194,7 +2204,10 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
/* Enqueue or deliver with pwr->e.lock held: to ensure no other
receive thread's data gets interleaved -- arguably delivery
needn't be exactly in-order, which would allow us to do this
without pwr->e.lock held. */
without pwr->e.lock held.
Note that PMD is also handled here, but the pwr for PMD does not
use no synchronous delivery, so deliver_user_data_synchronously
(which asserts pwr is not built-in) is not used for PMD handling. */
if (pwr->deliver_synchronously)
{
/* FIXME: just in case the synchronous delivery runs into a delay caused
@ -2367,24 +2380,32 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r
unsigned submsg_offset, payload_offset;
submsg_offset = (unsigned) ((unsigned char *) msg - NN_RMSG_PAYLOAD (rmsg));
if (datap)
{
payload_offset = (unsigned) ((unsigned char *) datap - NN_RMSG_PAYLOAD (rmsg));
}
else
{
payload_offset = submsg_offset + (unsigned) size;
}
rdata = nn_rdata_new (rmsg, 0, sampleinfo->size, submsg_offset, payload_offset);
if (msg->x.writerId.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
/* SPDP needs special treatment: there are no proxy writers for it
and we accept data from unknown sources */
if ((msg->x.writerId.u & NN_ENTITYID_SOURCE_MASK) == NN_ENTITYID_SOURCE_BUILTIN)
{
handle_SPDP (sampleinfo, rdata);
bool renew_manbypp_lease = true;
switch (msg->x.writerId.u)
{
case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER:
/* SPDP needs special treatment: there are no proxy writers for it and we accept data from unknown sources */
handle_SPDP (sampleinfo, rdata);
break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
/* Handle PMD as a regular message, but without renewing the leases on proxypp */
renew_manbypp_lease = false;
/* fall through */
default:
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, renew_manbypp_lease);
}
}
else
{
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup);
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, true);
}
}
RSTTRACE (")");
@ -2411,12 +2432,19 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
struct nn_rdata *rdata;
unsigned submsg_offset, payload_offset;
uint32_t begin, endp1;
if (msg->x.writerId.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
bool renew_manbypp_lease = true;
if ((msg->x.writerId.u & NN_ENTITYID_SOURCE_MASK) == NN_ENTITYID_SOURCE_BUILTIN)
{
DDS_CWARNING (&rst->gv->logconfig, "DATAFRAG("PGUIDFMT" #%"PRId64" -> "PGUIDFMT") - fragmented builtin data not yet supported\n",
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, fromSN (msg->x.writerSN),
PGUIDPREFIX (rst->dst_guid_prefix), msg->x.readerId.u);
return 1;
switch (msg->x.writerId.u)
{
case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER:
DDS_CWARNING (&rst->gv->logconfig, "DATAFRAG("PGUIDFMT" #%"PRId64" -> "PGUIDFMT") - fragmented builtin data not yet supported\n",
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, fromSN (msg->x.writerSN),
PGUIDPREFIX (rst->dst_guid_prefix), msg->x.readerId.u);
return 1;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
renew_manbypp_lease = false;
}
}
submsg_offset = (unsigned) ((unsigned char *) msg - NN_RMSG_PAYLOAD (rmsg));
@ -2457,7 +2485,7 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
wrong, it'll simply generate a request for retransmitting a
non-existent fragment. The other side SHOULD be capable of
dealing with that. */
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup);
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup, renew_manbypp_lease);
}
RSTTRACE (")");
return 1;

View file

@ -194,7 +194,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
add_Heartbeat (msg, wr, whcst, hbansreq, to_entityid (NN_ENTITYID_UNKNOWN), issync);
add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync);
}
else
{
@ -215,7 +215,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync);
add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync);
}
writer_hbcontrol_note_hb (wr, tnow, hbansreq);
@ -313,7 +313,7 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
return msg;
}
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync)
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync)
{
struct q_globals const * const gv = wr->e.gv;
struct nn_xmsg_marker sm_marker;
@ -324,6 +324,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
assert (wr->reliable);
assert (hbansreq >= 0);
assert (hbliveliness >= 0);
if (gv->config.meas_hb_to_ack_latency)
{
@ -337,6 +338,8 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
if (!hbansreq)
hb->smhdr.flags |= HEARTBEAT_FLAG_FINAL;
if (hbliveliness)
hb->smhdr.flags |= HEARTBEAT_FLAG_LIVELINESS;
hb->readerId = nn_hton_entityid (dst);
hb->writerId = nn_hton_entityid (wr->e.guid.entityid);
@ -666,6 +669,34 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
nn_xmsg_submsg_setnext (*pmsg, sm_marker);
}
dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp)
{
struct nn_xmsg *msg = NULL;
struct whc_state whcst;
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct writer *wr = ephash_lookup_writer_guid (gv->guid_hash, wr_guid);
if (wr == NULL)
{
GVTRACE ("write_hb_liveliness("PGUIDFMT") - writer not found\n", PGUID (*wr_guid));
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
return DDS_RETCODE_OUT_OF_RESOURCES;
ddsrt_mutex_lock (&wr->e.lock);
nn_xmsg_setdstN (msg, wr->as, wr->as_group);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
whc_get_state (wr->whc, &whcst);
add_Heartbeat (msg, wr, &whcst, 0, 1, to_entityid (NN_ENTITYID_UNKNOWN), 1);
ddsrt_mutex_unlock (&wr->e.lock);
nn_xpack_addmsg (xp, msg, 0);
nn_xpack_send (xp, true);
thread_state_asleep (ts1);
return DDS_RETCODE_OK;
}
#if 0
static int must_skip_frag (const char *frags_to_skip, unsigned frag)
{

View file

@ -37,9 +37,11 @@
#include "dds/ddsi/q_bitset.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_pmd.h"
#include "dds__whc.h"
#include "dds/ddsi/sysdeps.h"
@ -1090,44 +1092,6 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
}
}
static void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
{
#define PMD_DATA_LENGTH 1
struct q_globals * const gv = pp->e.gv;
struct writer *wr;
union {
ParticipantMessageData_t pmd;
char pad[offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH];
} u;
struct ddsi_serdata *serdata;
struct ddsi_tkmap_instance *tk;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER)) == NULL)
{
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (pp->e.guid));
return;
}
u.pmd.participantGuidPrefix = nn_hton_guid_prefix (pp->e.guid.prefix);
u.pmd.kind = ddsrt_toBE4u (pmd_kind);
u.pmd.length = PMD_DATA_LENGTH;
memset (u.pmd.value, 0, u.pmd.length);
struct ddsi_rawcdr_sample raw = {
.blob = &u,
.size = offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH,
.key = &u.pmd,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw);
serdata->timestamp = now ();
tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata);
write_sample_nogc (ts1, xp, wr, serdata, tk);
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
#undef PMD_DATA_LENGTH
}
static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow)
{
struct q_globals * const gv = ev->evq->gv;
@ -1142,16 +1106,7 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
write_pmd_message (ts1, xp, pp, PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
/* QoS changes can't change lease durations. So the only thing that
could cause trouble here is that the addition or removal of a
writer cause the interval to change for this participant. If we
lock pp for reading out the lease duration we are guaranteed a
consistent value (can't assume 64-bit atomic reads on all support
platforms!) */
ddsrt_mutex_lock (&pp->e.lock);
intv = pp->lease_duration;
/* FIXME: need to use smallest liveliness duration of all automatic-liveliness writers */
intv = pp_get_pmd_interval (pp);
if (intv == T_NEVER)
{
tnext.v = T_NEVER;
@ -1169,7 +1124,6 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
}
(void) resched_xevent_if_earlier (ev, tnext);
ddsrt_mutex_unlock (&pp->e.lock);
}
static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, UNUSED_ARG (nn_mtime_t tnow))

View file

@ -59,6 +59,9 @@ typedef int64_t dds_duration_t;
/** @name Infinite timeout for relative time */
#define DDS_INFINITY ((dds_duration_t) INT64_MAX)
/** @name Invalid duration value */
#define DDS_DURATION_INVALID ((dds_duration_t) INT64_MIN)
/** @name Macro definition for time conversion to nanoseconds
@{**/
#define DDS_SECS(n) ((n) * DDS_NSECS_IN_SEC)