Liveliness QoS implementation for auto and manual-by-participant

This commit adds support for the liveliness QoS for the liveliness
kinds automatic and manual-by-participant. It also implements the
lease_duration from this QoS, which was ignored until now. In the
api the function dds_assert_liveliness is added to assert liveliness
on a participant, which can be used when using liveliness kind
manual-by-participant.

Liveliness kind manual-by-topic is not yet supported, this will be
added in a later commit.

* Proxy participants now have 2 fibheaps to keep leases: one for leases
of pwrs with automatic liveliness and one for leases of the pwrs with
manual-by-participant liveliness (both protected by the proxypp lock).
The minl_auto and minl_man members represent the shortest lease from
these fibheaps and these leases are renewed when receiving data.
Replacing the minl_ leases is now done by replacing the lease object
(atomic ptr) with delayed deletion of the old lease using the gc.

* Proxy writers are set not-alive when the lease expired, and reset to
alive then data is received. When data is received by a pwr, the other
pwrs in the proxypp might also be set alive. I think the specification
is not clear at this point, and for now I have not implemented this

* I refactored out the counter for man-by-pp proxy writers and improved
locking when updating the min-leases on the proxy participant, so I
think this fixes the race conditions.

Some additional tests are required, e.g. to test the not-alive->alive
transition for pwrs. I will add these in short term, as well as the
implementation of the manual-by-topic liveliness kind.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
This commit is contained in:
Dennis Potman 2019-10-09 10:28:19 +02:00 committed by Erik Boasson
parent 482e1cd006
commit 3822f42eff
16 changed files with 1116 additions and 259 deletions

View file

@ -3260,6 +3260,29 @@ dds_get_matched_publication_data (
dds_entity_t reader,
dds_instance_handle_t ih);
/**
* @brief This operation manually asserts the liveliness of a writer
* or domain participant.
*
* This operation manually asserts the liveliness of a writer
* or domain participant. This is used in combination with the Liveliness
* QoS policy to indicate that the entity remains active. This operation need
* only be used if the liveliness kind in the QoS is either
* DDS_LIVELINESS_MANUAL_BY_PARTICIPANT or DDS_LIVELINESS_MANUAL_BY_TOPIC.
*
* @param[in] entity A domain participant or writer
*
* @returns A dds_return_t indicating success or failure.
*
* @retval DDS_RETCODE_OK
* The operation was successful.
* @retval DDS_RETCODE_ILLEGAL_OPERATION
* The operation is invoked on an inappropriate object.
*/
DDS_EXPORT dds_return_t
dds_assert_liveliness (
dds_entity_t entity);
#if defined (__cplusplus)
}
#endif

View file

@ -22,6 +22,7 @@
#include "dds__qos.h"
#include "dds__topic.h"
#include "dds/version.h"
#include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/q_xqos.h"
extern inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink);
@ -1383,3 +1384,29 @@ dds_return_t dds_generic_unimplemented_operation (dds_entity_t handle, dds_entit
return dds_generic_unimplemented_operation_manykinds (handle, 1, &kind);
}
dds_return_t dds_assert_liveliness (dds_entity_t entity)
{
dds_return_t rc;
dds_entity *e;
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
return rc;
switch (dds_entity_kind (e))
{
case DDS_KIND_PARTICIPANT: {
write_pmd_message_guid (&e->m_domain->gv, &e->m_guid, PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE);
break;
}
case DDS_KIND_WRITER: {
/* FIXME: implement liveliness manual-by-topic */
rc = DDS_RETCODE_UNSUPPORTED;
break;
}
default: {
rc = DDS_RETCODE_ILLEGAL_OPERATION;
break;
}
}
dds_entity_unlock (e);
return rc;
}

View file

@ -28,6 +28,7 @@ set(ddsc_test_sources
"err.c"
"instance_get_key.c"
"listener.c"
"liveliness.c"
"participant.c"
"publisher.c"
"qos.c"
@ -55,7 +56,9 @@ set(ddsc_test_sources
add_cunit_executable(cunit_ddsc ${ddsc_test_sources})
target_include_directories(
cunit_ddsc PRIVATE
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src/include/>")
"$<BUILD_INTERFACE:${CMAKE_BINARY_DIR}/src/include/>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsc/src>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsi/include>")
target_link_libraries(cunit_ddsc PRIVATE RoundTrip Space TypesArrayKey ddsc)
# Setup environment for config-tests

View file

@ -0,0 +1,470 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "config_env.h"
#include "dds/version.h"
#include "dds__entity.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/time.h"
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<General><NetworkInterfaceAddress>127.0.0.1</NetworkInterfaceAddress></General><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG_PUB "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness_pub.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">"
#define DDS_CONFIG_NO_PORT_GAIN_LOG_SUB "<"DDS_PROJECT_NAME"><Domain><Discovery><Ports><DomainGain>0</DomainGain></Ports></Discovery><Tracing><OutputFile>cyclonedds_liveliness_sub.log</OutputFile><Verbosity>finest</Verbosity></Tracing></Domain></"DDS_PROJECT_NAME">"
uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
static dds_entity_t g_pub_participant = 0;
static dds_entity_t g_pub_publisher = 0;
static dds_entity_t g_sub_domain = 0;
static dds_entity_t g_sub_participant = 0;
static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name(const char *prefix, uint32_t nr, char *name, size_t size)
{
/* Get semi random g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void)snprintf(name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
return name;
}
static void liveliness_init(void)
{
char *conf = ddsrt_expand_envvars(DDS_CONFIG_NO_PORT_GAIN, UINT32_MAX);
g_pub_domain = dds_create_domain(DDS_DOMAINID_PUB, conf);
g_sub_domain = dds_create_domain(DDS_DOMAINID_SUB, conf);
dds_free(conf);
g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL(g_pub_participant > 0);
g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL(g_sub_participant > 0);
g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL);
CU_ASSERT_FATAL(g_pub_publisher > 0);
g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL);
CU_ASSERT_FATAL(g_sub_subscriber > 0);
}
static void liveliness_fini(void)
{
dds_delete(g_sub_subscriber);
dds_delete(g_pub_publisher);
dds_delete(g_sub_participant);
dds_delete(g_pub_participant);
dds_delete(g_sub_domain);
dds_delete(g_pub_domain);
}
static seqno_t get_pmd_seqno(dds_entity_t participant)
{
seqno_t seqno;
struct dds_entity *pp_entity;
struct participant *pp;
struct writer *wr;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
CU_ASSERT_FATAL(wr != NULL);
seqno = wr->seq;
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);
return seqno;
}
static dds_duration_t get_pmd_interval(dds_entity_t participant)
{
dds_duration_t intv;
struct dds_entity *pp_entity;
struct participant *pp;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
intv = pp_get_pmd_interval(pp);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);
return intv;
}
static dds_duration_t get_ldur_config(dds_entity_t participant)
{
struct dds_entity *pp_entity;
dds_duration_t ldur;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
ldur = (dds_duration_t)pp_entity->m_domain->gv.config.lease_duration;
dds_entity_unpin(pp_entity);
return ldur;
}
#define A DDS_LIVELINESS_AUTOMATIC
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
CU_TheoryDataPoints(ddsc_liveliness, pmd_count) = {
CU_DataPoints(dds_liveliness_kind_t, A, A, MP), /* liveliness kind */
CU_DataPoints(uint32_t, 200, 200, 200), /* lease duration */
CU_DataPoints(double, 5, 10, 5), /* delay (n times lease duration) */
};
#undef A
#undef MP
CU_Theory((dds_liveliness_kind_t kind, uint32_t ldur, double mult), ddsc_liveliness, pmd_count, .init = liveliness_init, .fini = liveliness_fini, .timeout = 10)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
seqno_t start_seqno, end_seqno;
dds_qos_t *rqos;
dds_qos_t *wqos;
dds_entity_t waitset;
dds_attach_t triggered;
uint32_t status;
char name[100];
dds_time_t t;
t = dds_time();
printf("%d.%06d running test: kind %s, lease duration %d, delay %f\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
kind == 0 ? "A" : "MP", ldur, mult);
/* topics */
create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* waitset on reader */
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
/* writer */
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, kind, DDS_MSECS(ldur));
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
dds_delete_qos(wqos);
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(1)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check no of PMD messages sent */
start_seqno = get_pmd_seqno(g_pub_participant);
dds_sleepfor(DDS_MSECS((dds_duration_t)(mult * ldur)));
end_seqno = get_pmd_seqno(g_pub_participant);
t = dds_time();
printf("%d.%06d PMD sequence no: start %" PRId64 " -> end %" PRId64 "\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000,
start_seqno, end_seqno);
CU_ASSERT(end_seqno - start_seqno >= (kind == DDS_LIVELINESS_AUTOMATIC ? mult - 1 : 0))
if (kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT)
CU_ASSERT(get_pmd_seqno(g_pub_participant) - start_seqno < mult)
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
/* FIXME: add DDS_LIVELINESS_MANUAL_BY_TOPIC */
#define A DDS_LIVELINESS_AUTOMATIC
#define MP DDS_LIVELINESS_MANUAL_BY_PARTICIPANT
CU_TheoryDataPoints(ddsc_liveliness, expire_liveliness_kinds) = {
CU_DataPoints(uint32_t, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 200), /* lease duration */
CU_DataPoints(double, 0.3, 0.3, 0.3, 0.3, 2, 2, 2, 2, 2, 2, 2, 2, 2), /* delay (n times lease duration) */
CU_DataPoints(size_t, 1, 0, 2, 0, 1, 0, 1, 2, 0, 5, 0, 15, 15), /* number of writers with automatic liveliness */
CU_DataPoints(size_t, 1, 1, 2, 2, 1, 1, 0, 2, 2, 5, 10, 0, 15), /* number of writers with manual-by-participant liveliness */
};
#undef A
#undef MP
CU_Theory((uint32_t ldur, double mult, size_t wr_cnt_auto, size_t wr_cnt_man_pp), ddsc_liveliness, expire_liveliness_kinds, .init = liveliness_init, .fini = liveliness_fini, .timeout = 60)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t *writers;
dds_qos_t *rqos, *wqos_auto, *wqos_man_pp;
dds_entity_t waitset;
dds_attach_t triggered;
struct dds_liveliness_changed_status lstatus;
uint32_t status;
size_t n, run = 1;
char name[100];
size_t wr_cnt = wr_cnt_auto + wr_cnt_man_pp;
dds_time_t tstart, t;
bool test_finished = false;
do
{
tstart = dds_time();
printf("%d.%06d running test: lease duration %d, delay %f, auto/manual-by-participant %zu/%zu\n",
(int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000,
ldur, mult, wr_cnt_auto, wr_cnt_man_pp);
/* topics */
create_topic_name("ddsc_liveliness_test", g_topic_nr++, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writers */
CU_ASSERT_FATAL((wqos_auto = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_auto, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((wqos_man_pp = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos_man_pp, DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(ldur));
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
writers = dds_alloc(wr_cnt * sizeof(dds_entity_t));
for (n = 0; n < wr_cnt; n++)
{
CU_ASSERT_FATAL((writers[n] = dds_create_writer(g_pub_participant, pub_topic, n < wr_cnt_auto ? wqos_auto : wqos_man_pp, NULL)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_SECS(5)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
}
dds_delete_qos(wqos_auto);
dds_delete_qos(wqos_man_pp);
t = dds_time();
if (t - tstart > DDS_MSECS(0.5 * ldur))
{
ldur *= 10;
printf("%d.%06d failed to create writers in time\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
else
{
/* check alive count before proxy writers are expired */
dds_get_liveliness_changed_status(reader, &lstatus);
printf("%d.%06d writers alive: %d\n", (int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, lstatus.alive_count);
CU_ASSERT_EQUAL_FATAL(lstatus.alive_count, wr_cnt);
dds_time_t tstop = tstart + DDS_MSECS((dds_duration_t)(mult * ldur));
size_t stopped = 0;
do
{
dds_duration_t w = tstop - dds_time();
CU_ASSERT_FATAL((dds_waitset_wait(waitset, &triggered, 1, w > 0 ? w : 0)) >= 0);
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
stopped += (uint32_t)lstatus.not_alive_count_change;
} while (dds_time() < tstop);
t = dds_time();
printf("%d.%06d writers stopped: %zu\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, stopped);
size_t exp_stopped = mult < 1 ? 0 : wr_cnt_man_pp;
if (stopped != exp_stopped)
{
ldur *= 10;
printf("%d.%06d incorrect number of stopped writers\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000);
}
else
{
/* check alive count */
CU_ASSERT_EQUAL_FATAL(dds_get_liveliness_changed_status(reader, &lstatus), DDS_RETCODE_OK);
CU_ASSERT_EQUAL(lstatus.alive_count, mult < 1 ? wr_cnt : wr_cnt_auto);
test_finished = true;
}
}
/* cleanup */
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
dds_free(writers);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
if (!test_finished)
{
if (run++ > 2)
{
printf("%d.%06d run limit reached, test failed\n", (int32_t)(tstart / DDS_NSECS_IN_SEC), (int32_t)(tstart % DDS_NSECS_IN_SEC) / 1000);
CU_FAIL_FATAL("Run limit reached");
test_finished = true;
continue;
}
else
{
printf("%d.%06d restarting test with ldur %d\n",
(int32_t)(t / DDS_NSECS_IN_SEC), (int32_t)(t % DDS_NSECS_IN_SEC) / 1000, ldur);
}
}
} while (!test_finished);
}
static void add_and_check_writer(dds_liveliness_kind_t kind, dds_duration_t ldur, dds_entity_t *writer, dds_entity_t topic, dds_entity_t reader)
{
dds_entity_t waitset;
dds_qos_t *wqos;
dds_attach_t triggered;
uint32_t status;
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, kind, ldur);
CU_ASSERT_FATAL((*writer = dds_create_writer(g_pub_participant, topic, wqos, NULL)) > 0);
dds_delete_qos(wqos);
/* wait for writer to be alive */
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
}
#define MAX_WRITERS 10
CU_Test(ddsc_liveliness, lease_duration, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writers[MAX_WRITERS];
size_t wr_cnt = 0;
char name[100];
dds_qos_t *rqos;
uint32_t n;
/* topics */
create_topic_name("ddsc_liveliness_ldur", 1, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader and waitset */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check if pmd defaults to configured duration */
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), get_ldur_config(g_pub_participant));
/* create writers and check pmd interval in publishing participant */
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(1000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(2000), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(1000));
add_and_check_writer(DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(500), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
add_and_check_writer(DDS_LIVELINESS_MANUAL_BY_PARTICIPANT, DDS_MSECS(100), &writers[wr_cnt++], pub_topic, reader);
CU_ASSERT_EQUAL_FATAL(get_pmd_interval(g_pub_participant), DDS_MSECS(500));
/* cleanup */
for (n = 0; n < wr_cnt; n++)
CU_ASSERT_EQUAL_FATAL(dds_delete(writers[n]), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}
#undef MAX_WRITERS
CU_Test(ddsc_liveliness, lease_duration_pwr, .init = liveliness_init, .fini = liveliness_fini)
{
dds_entity_t pub_topic;
dds_entity_t sub_topic;
dds_entity_t reader;
dds_entity_t writer;
char name[100];
dds_qos_t *rqos, *wqos;
dds_entity_t waitset;
dds_attach_t triggered;
uint32_t status;
dds_duration_t ldur;
/* topics */
create_topic_name("ddsc_liveliness_ldurpwr", 1, name, sizeof name);
CU_ASSERT_FATAL((pub_topic = dds_create_topic(g_pub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
CU_ASSERT_FATAL((sub_topic = dds_create_topic(g_sub_participant, &Space_Type1_desc, name, NULL, NULL)) > 0);
/* reader */
CU_ASSERT_FATAL((rqos = dds_create_qos()) != NULL);
dds_qset_liveliness(rqos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
CU_ASSERT_FATAL((reader = dds_create_reader(g_sub_participant, sub_topic, rqos, NULL)) > 0);
dds_delete_qos(rqos);
CU_ASSERT_EQUAL_FATAL(dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* writer */
ldur = 1000;
CU_ASSERT_FATAL((wqos = dds_create_qos()) != NULL);
dds_qset_liveliness(wqos, DDS_LIVELINESS_AUTOMATIC, DDS_MSECS(ldur));
CU_ASSERT_FATAL((writer = dds_create_writer(g_pub_participant, pub_topic, wqos, NULL)) > 0);
/* wait for writer to be alive */
CU_ASSERT_FATAL((waitset = dds_create_waitset(g_sub_participant)) > 0);
CU_ASSERT_EQUAL_FATAL(dds_waitset_attach(waitset, reader, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_waitset_wait(waitset, &triggered, 1, DDS_MSECS(1000)), 1);
CU_ASSERT_EQUAL_FATAL(dds_take_status(reader, &status, DDS_LIVELINESS_CHANGED_STATUS), DDS_RETCODE_OK);
/* check pwr lease duration in matched publication */
dds_instance_handle_t wrs[1];
CU_ASSERT_EQUAL_FATAL(dds_get_matched_publications(reader, wrs, 1), 1);
dds_builtintopic_endpoint_t *ep;
ep = dds_get_matched_publication_data(reader, wrs[0]);
CU_ASSERT_FATAL(ep != NULL);
CU_ASSERT_EQUAL_FATAL(ep->qos->liveliness.lease_duration, DDS_MSECS(ldur));
dds_delete_qos(ep->qos);
dds_free(ep->topic_name);
dds_free(ep->type_name);
dds_free(ep);
/* cleanup */
dds_delete_qos(wqos);
CU_ASSERT_EQUAL_FATAL(dds_waitset_detach(waitset, reader), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(waitset), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(writer), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(sub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(pub_topic), DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(dds_delete(reader), DDS_RETCODE_OK);
}

View file

@ -28,6 +28,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_vendor.c
ddsi_threadmon.c
ddsi_rhc.c
ddsi_pmd.c
q_addrset.c
q_bitset_inlines.c
q_bswap.c

View file

@ -0,0 +1,35 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_PMD_H
#define DDSI_PMD_H
#include "dds/ddsi/q_time.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct q_globals;
struct thread_state1;
struct ddsi_guid;
struct nn_xpack;
struct participant;
struct receiver_state;
void write_pmd_message_guid (struct q_globals * const gv, struct ddsi_guid *pp_guid, unsigned pmd_kind);
void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind);
void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len);
#if defined (__cplusplus)
}
#endif
#endif /* DDSI_PMD_H */

View file

@ -12,8 +12,10 @@
#ifndef Q_ENTITY_H
#define Q_ENTITY_H
#include "dds/export.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_protocol.h"
@ -184,6 +186,7 @@ struct participant
int32_t user_refc; /* number of non-built-in endpoints in this participant [refc_lock] */
int32_t builtin_refc; /* number of built-in endpoints in this participant [refc_lock] */
int builtins_deleted; /* whether deletion of built-in endpoints has been initiated [refc_lock] */
ddsrt_fibheap_t ldur_auto_wr; /* Heap that contains lease duration for writers with automatic liveliness in this participant */
};
struct endpoint_common {
@ -205,6 +208,11 @@ enum writer_state {
typedef ddsrt_atomic_uint64_t seq_xmit_t;
struct ldur_fhnode {
ddsrt_fibheap_node_t heapnode;
dds_duration_t ldur;
};
struct writer
{
struct entity_common e;
@ -234,7 +242,7 @@ struct writer
struct addrset *as; /* set of addresses to publish to */
struct addrset *as_group; /* alternate case, used for SPDP, when using Cloud with multiple bootstrap locators */
struct xevent *heartbeat_xevent; /* timed event for "periodically" publishing heartbeats when unack'd data present, NULL <=> unreliable */
dds_duration_t lease_duration;
struct ldur_fhnode *lease_duration; /* fibheap node to keep lease duration for this writer, NULL in case of automatic liveliness with inifite duration */
struct whc *whc; /* WHC tracking history, T-L durability service history + samples by sequence number for retransmit */
uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */
nn_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */
@ -300,7 +308,11 @@ struct proxy_participant
unsigned prismtech_bes; /* prismtech-specific extension of built-in endpoints set */
ddsi_guid_t privileged_pp_guid; /* if this PP depends on another PP for its SEDP writing */
struct nn_plist *plist; /* settings/QoS for this participant */
ddsrt_atomic_voidp_t lease; /* lease object for this participant, for automatic leases */
ddsrt_atomic_voidp_t minl_auto; /* lease object for shortest automatic liveliness pwr's lease (includes this proxypp's lease) */
ddsrt_fibheap_t leaseheap_auto; /* keeps leases for this proxypp and leases for pwrs (with liveliness automatic) */
ddsrt_atomic_voidp_t minl_man; /* lease object for shortest manual-by-participant liveliness pwr's lease */
ddsrt_fibheap_t leaseheap_man; /* keeps leases for this proxypp and leases for pwrs (with liveliness manual-by-participant) */
struct lease *lease; /* lease for this proxypp */
struct addrset *as_default; /* default address set to use for user data traffic */
struct addrset *as_meta; /* default address set to use for discovery traffic */
struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */
@ -357,6 +369,7 @@ struct proxy_writer {
unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
unsigned alive: 1; /* iff 1, the proxy writer is alive (lease for this proxy writer is not expired) */
#ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif
@ -367,6 +380,7 @@ struct proxy_writer {
struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */
ddsi2direct_directread_cb_t ddsi2direct_cb;
void *ddsi2direct_cbarg;
struct lease *lease;
};
struct proxy_reader {
@ -537,10 +551,14 @@ dds_return_t delete_participant (struct q_globals *gv, const struct ddsi_guid *p
void update_participant_plist (struct participant *pp, const struct nn_plist *plist);
uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_guid *guid);
/* Gets the interval for PMD messages, which is the minimal lease duration for writers
with auto liveliness in this participant, or the participants lease duration if shorter */
DDS_EXPORT dds_duration_t pp_get_pmd_interval(struct participant *pp);
/* To obtain the builtin writer to be used for publishing SPDP, SEDP,
PMD stuff for PP and its endpoints, given the entityid. If PP has
its own writer, use it; else use the privileged participant. */
struct writer *get_builtin_writer (const struct participant *pp, unsigned entityid);
DDS_EXPORT struct writer *get_builtin_writer (const struct participant *pp, unsigned entityid);
/* To create a new DDSI writer or reader belonging to participant with
GUID "ppguid". May return NULL if participant unknown or
@ -612,6 +630,7 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
void purge_proxy_participants (struct q_globals *gv, const nn_locator_t *loc, bool delete_from_as_disc);
/* To create a new proxy writer or reader; the proxy participant is
determined from the GUID and must exist. */
int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, const struct ddsi_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq);
@ -626,12 +645,15 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
reader or writer. Actual deletion is scheduled in the future, when
no outstanding references may still exist (determined by checking
thread progress, &c.). */
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked);
int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive);
int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive);
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);

View file

@ -36,7 +36,7 @@ struct ddsi_guid;
EK_PROXY_READER
};
#define EK_NKINDS ((int) EK_PROXY_READER + 1)
struct ephash_enum
{
struct ddsrt_chh_iter it;
@ -80,15 +80,15 @@ void ephash_remove_reader_guid (struct ephash *eh, struct reader *rd);
void ephash_remove_proxy_writer_guid (struct ephash *eh, struct proxy_writer *pwr);
void ephash_remove_proxy_reader_guid (struct ephash *eh, struct proxy_reader *prd);
void *ephash_lookup_guid_untyped (const struct ephash *eh, const struct ddsi_guid *guid);
void *ephash_lookup_guid (const struct ephash *eh, const struct ddsi_guid *guid, enum entity_kind kind);
DDS_EXPORT void *ephash_lookup_guid_untyped (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT void *ephash_lookup_guid (const struct ephash *eh, const struct ddsi_guid *guid, enum entity_kind kind);
struct participant *ephash_lookup_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct writer *ephash_lookup_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct reader *ephash_lookup_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct participant *ephash_lookup_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct writer *ephash_lookup_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct reader *ephash_lookup_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
/* Enumeration of entries in the hash table:

View file

@ -12,6 +12,9 @@
#ifndef Q_LEASE_H
#define Q_LEASE_H
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/time.h"
#include "dds/ddsi/q_time.h"
#if defined (__cplusplus)
@ -20,21 +23,30 @@ extern "C" {
struct receiver_state;
struct participant;
struct lease;
struct entity_common;
struct q_globals; /* FIXME: make a special for the lease admin */
struct lease {
ddsrt_fibheap_node_t heapnode;
ddsrt_fibheap_node_t pp_heapnode;
nn_etime_t tsched; /* access guarded by leaseheap_lock */
ddsrt_atomic_uint64_t tend; /* really an nn_etime_t */
dds_duration_t tdur; /* constant (renew depends on it) */
struct entity_common *entity; /* constant */
};
int compare_lease_tsched (const void *va, const void *vb);
int compare_lease_tdur (const void *va, const void *vb);
void lease_management_init (struct q_globals *gv);
void lease_management_term (struct q_globals *gv);
struct lease *lease_new (nn_etime_t texpire, int64_t tdur, struct entity_common *e);
struct lease *lease_clone (const struct lease *l);
void lease_register (struct lease *l);
void lease_free (struct lease *l);
void lease_renew (struct lease *l, nn_etime_t tnow);
void lease_set_expiry (struct lease *l, nn_etime_t when);
int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow);
void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len);
#if defined (__cplusplus)
}
#endif

View file

@ -0,0 +1,161 @@
/*
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stdlib.h>
#include <string.h>
#include "dds/ddsi/ddsi_pmd.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_log.h"
#include "dds/ddsi/q_misc.h"
#include "dds/ddsi/q_protocol.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/sysdeps.h"
static void debug_print_rawdata (const struct q_globals *gv, const char *msg, const void *data, size_t len)
{
const unsigned char *c = data;
size_t i;
GVTRACE ("%s<", msg);
for (i = 0; i < len; i++)
{
if (32 < c[i] && c[i] <= 127)
GVTRACE ("%s%c", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
else
GVTRACE ("%s\\x%02x", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
}
GVTRACE (">");
}
void write_pmd_message_guid (struct q_globals * const gv, struct ddsi_guid *pp_guid, unsigned pmd_kind)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct participant *pp = ephash_lookup_participant_guid (gv->guid_hash, pp_guid);
if (pp == NULL)
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (*pp_guid));
else
write_pmd_message (ts1, NULL, pp, pmd_kind);
thread_state_asleep (ts1);
}
void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
{
#define PMD_DATA_LENGTH 1
struct q_globals * const gv = pp->e.gv;
struct writer *wr;
union {
ParticipantMessageData_t pmd;
char pad[offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH];
} u;
struct ddsi_serdata *serdata;
struct ddsi_tkmap_instance *tk;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER)) == NULL)
{
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (pp->e.guid));
return;
}
u.pmd.participantGuidPrefix = nn_hton_guid_prefix (pp->e.guid.prefix);
u.pmd.kind = ddsrt_toBE4u (pmd_kind);
u.pmd.length = PMD_DATA_LENGTH;
memset (u.pmd.value, 0, u.pmd.length);
struct ddsi_rawcdr_sample raw = {
.blob = &u,
.size = offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH,
.key = &u.pmd,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw);
serdata->timestamp = now ();
tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata);
write_sample_nogc (ts1, xp, wr, serdata, tk);
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
#undef PMD_DATA_LENGTH
}
void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len)
{
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
const int bswap = (data->identifier == CDR_LE) ^ (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN);
struct proxy_participant *proxypp;
ddsi_guid_t ppguid;
struct lease *l;
RSTTRACE (" PMD ST%x", statusinfo);
if (data->identifier != CDR_LE && data->identifier != CDR_BE)
{
RSTTRACE (" PMD data->identifier %u !?\n", ntohs (data->identifier));
return;
}
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
{
case 0:
if (offsetof (ParticipantMessageData_t, value) > len - sizeof (struct CDRHeader))
debug_print_rawdata (rst->gv, " SHORT1", data, len);
else
{
const ParticipantMessageData_t *pmd = (ParticipantMessageData_t *) (data + 1);
ddsi_guid_prefix_t p = nn_ntoh_guid_prefix (pmd->participantGuidPrefix);
uint32_t kind = ntohl (pmd->kind);
uint32_t length = bswap ? ddsrt_bswap4u (pmd->length) : pmd->length;
RSTTRACE (" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %u data %u", p.u[0], p.u[1], p.u[2], kind, length);
if (len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value) < length)
debug_print_rawdata (rst->gv, " SHORT2", pmd->value, len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value));
else
debug_print_rawdata (rst->gv, "", pmd->value, length);
ppguid.prefix = p;
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE &&
(l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)
{
/* Renew lease for entity with shortest manual-by-participant lease */
lease_renew (l, now_et ());
}
}
break;
case NN_STATUSINFO_DISPOSE:
case NN_STATUSINFO_UNREGISTER:
case NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER:
/* Serialized key; BE or LE doesn't matter as both fields are
defined as octets. */
if (len < sizeof (struct CDRHeader) + sizeof (ddsi_guid_prefix_t))
debug_print_rawdata (rst->gv, " SHORT3", data, len);
else
{
ppguid.prefix = nn_ntoh_guid_prefix (*((ddsi_guid_prefix_t *) (data + 1)));
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if (delete_proxy_participant_by_guid (rst->gv, &ppguid, timestamp, 0) < 0)
RSTTRACE (" unknown");
else
RSTTRACE (" delete");
}
break;
}
RSTTRACE ("\n");
}

View file

@ -43,6 +43,7 @@
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/q_feature_check.h"
#include "dds/ddsi/ddsi_pmd.h"
static int get_locator (const struct q_globals *gv, nn_locator_t *loc, const nn_locators_t *locs, int uc_same_subnet)
{
@ -486,10 +487,8 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
{
struct ephash_enum_proxy_participant it;
struct proxy_participant *pp, *d2pp;
struct lease *d2pp_lease;
if ((d2pp = ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid)) == NULL)
return;
d2pp_lease = ddsrt_atomic_ldvoidp (&d2pp->lease);
ephash_enum_proxy_participant_init (&it, gv->guid_hash);
while ((pp = ephash_enum_proxy_participant_next (&it)) != NULL)
{
@ -499,7 +498,7 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
ddsrt_mutex_lock (&pp->e.lock);
pp->privileged_pp_guid = *ddsi2guid;
ddsrt_mutex_unlock (&pp->e.lock);
proxy_participant_reassign_lease (pp, d2pp_lease);
proxy_participant_reassign_lease (pp, d2pp->lease);
GVTRACE ("\n");
if (ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid) == NULL)
@ -614,9 +613,10 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
int interesting = 0;
RSTTRACE ("SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid));
/* SPDP processing is so different from normal processing that we are
even skipping the automatic lease renewal. Therefore do it regardless
of gv.config.arrival_of_data_asserts_pp_and_ep_liveliness. */
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->lease), now_et ());
even skipping the automatic lease renewal. Note that proxy writers
that are not alive are not set alive here. This is done only when
data is received from a particular pwr (in handle_regular) */
lease_renew (ddsrt_atomic_ldvoidp (&proxypp->minl_auto), now_et ());
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->implicitly_created || seq > proxypp->seq)
{
@ -1247,7 +1247,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
GVLOGDISC (" "PGUIDFMT" attach-to-DS "PGUIDFMT, PGUID(pp->e.guid), PGUIDPREFIX(*src_guid_prefix), pp->privileged_pp_guid.entityid.u);
ddsrt_mutex_lock (&pp->e.lock);
pp->privileged_pp_guid.prefix = *src_guid_prefix;
lease_set_expiry(ddsrt_atomic_ldvoidp(&pp->lease), never);
lease_set_expiry(pp->lease, never);
ddsrt_mutex_unlock (&pp->e.lock);
}
GVLOGDISC ("\n");
@ -1358,7 +1358,7 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data
GVLOGDISC (" "PGUIDFMT, PGUID (datap->endpoint_guid));
if (is_writer_entityid (datap->endpoint_guid.entityid))
{
res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0);
res = delete_proxy_writer (gv, &datap->endpoint_guid, timestamp, 0, false);
}
else
{
@ -1768,7 +1768,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
handle_SEDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz);
break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
handle_PMD (sampleinfo->rst, timestamp, statusinfo, datap, datasz);
handle_pmd_message (sampleinfo->rst, timestamp, statusinfo, datap, datasz);
break;
case NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER:
handle_SEDP_CM (sampleinfo->rst, srcguid.entityid, timestamp, statusinfo, datap, datasz);

View file

@ -13,6 +13,7 @@
#include <string.h>
#include <stddef.h>
#include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/log.h"
#include "dds/ddsrt/sockets.h"
@ -403,6 +404,16 @@ static void remove_deleted_participant_guid (struct deleted_participants_admin *
/* PARTICIPANT ------------------------------------------------------ */
static int compare_ldur (const void *va, const void *vb)
{
const struct ldur_fhnode *a = va;
const struct ldur_fhnode *b = vb;
return (a->ldur == b->ldur) ? 0 : (a->ldur < b->ldur) ? -1 : 1;
}
/* used in participant for keeping writer liveliness renewal */
const ddsrt_fibheap_def_t ldur_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct ldur_fhnode, heapnode), compare_ldur);
static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, const dds_qos_t *xqos, nn_wctime_t timestamp)
{
uint64_t mask;
@ -515,6 +526,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
ddsrt_mutex_init (&pp->refc_lock);
inverse_uint32_set_init(&pp->avail_entityids.x, 1, UINT32_MAX / NN_ENTITYID_ALLOCSTEP);
pp->lease_duration = gv->config.lease_duration;
ddsrt_fibheap_init (&ldur_fhdef, &pp->ldur_auto_wr);
pp->plist = ddsrt_malloc (sizeof (*pp->plist));
nn_plist_copy (pp->plist, plist);
nn_plist_mergein_missing (pp->plist, &gv->default_local_plist_pp, ~(uint64_t)0, ~(uint64_t)0);
@ -1003,6 +1015,19 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
return ephash_lookup_writer_guid (pp->e.gv->guid_hash, &bwr_guid);
}
dds_duration_t pp_get_pmd_interval (struct participant *pp)
{
struct ldur_fhnode *ldur_node;
dds_duration_t intv;
ddsrt_mutex_lock (&pp->e.lock);
ldur_node = ddsrt_fibheap_min (&ldur_fhdef, &pp->ldur_auto_wr);
intv = (ldur_node != NULL) ? ldur_node->ldur : T_NEVER;
if (pp->lease_duration < intv)
intv = pp->lease_duration;
ddsrt_mutex_unlock (&pp->e.lock);
return intv;
}
/* WRITER/READER/PROXY-WRITER/PROXY-READER CONNECTION ---------------
These are all located in a separate section because they are so
@ -1463,14 +1488,14 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
}
}
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr)
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
{
struct rd_pwr_match *m;
ddsrt_mutex_lock (&rd->e.lock);
if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL)
if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL && unmatch)
ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m);
ddsrt_mutex_unlock (&rd->e.lock);
if (m != NULL)
@ -1491,11 +1516,15 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc
data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
if (unmatch)
{
data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
(rd->status_cb) (rd->status_cb_entity, &data);
}
}
}
free_rd_pwr_match (pwr->e.gv, m);
if (unmatch)
free_rd_pwr_match (pwr->e.gv, m);
}
}
@ -2857,13 +2886,17 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
{
wr->heartbeat_xevent = NULL;
}
assert (wr->xqos->present & QP_LIVELINESS);
if (wr->xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC || wr->xqos->liveliness.lease_duration != T_NEVER)
if (wr->xqos->liveliness.kind == DDS_LIVELINESS_AUTOMATIC && wr->xqos->liveliness.lease_duration != T_NEVER)
{
ELOGDISC (wr, "writer "PGUIDFMT": incorrectly treating it as of automatic liveliness kind with lease duration = inf (%d, %"PRId64")\n",
PGUID (wr->e.guid), (int) wr->xqos->liveliness.kind, wr->xqos->liveliness.lease_duration);
wr->lease_duration = ddsrt_malloc (sizeof(*wr->lease_duration));
wr->lease_duration->ldur = wr->xqos->liveliness.lease_duration;
}
else
{
wr->lease_duration = NULL;
}
wr->lease_duration = T_NEVER; /* FIXME */
wr->whc = whc;
if (wr->xqos->history.kind == DDS_HISTORY_KEEP_LAST)
@ -2930,10 +2963,18 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct ddsi_g
match_writer_with_local_readers (wr, tnow);
sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER)
if (wr->lease_duration != NULL && wr->xqos->liveliness.kind == DDS_LIVELINESS_AUTOMATIC)
{
nn_mtime_t tsched = { 0 };
(void) resched_xevent_if_earlier (pp->pmd_update_xevent, tsched);
assert (wr->lease_duration->ldur != T_NEVER);
assert (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE));
/* Store writer lease duration in participant's heap in case of automatic liveliness */
ddsrt_mutex_lock (&pp->e.lock);
ddsrt_fibheap_insert (&ldur_fhdef, &pp->ldur_auto_wr, wr->lease_duration);
ddsrt_mutex_unlock (&pp->e.lock);
/* Trigger pmd update */
(void) resched_xevent_if_earlier (pp->pmd_update_xevent, now_mt ());
}
return 0;
@ -3026,6 +3067,11 @@ static void gc_delete_writer (struct gcreq *gcreq)
reader_drop_local_connection (&m->rd_guid, wr);
free_wr_rd_match (m);
}
if (wr->lease_duration != NULL)
{
assert (wr->lease_duration->ldur == DDS_DURATION_INVALID);
ddsrt_free (wr->lease_duration);
}
/* Do last gasp on SEDP and free writer. */
if (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE))
@ -3106,6 +3152,13 @@ dds_return_t delete_writer_nolinger_locked (struct writer *wr)
local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr->e.gv->guid_hash, wr);
writer_set_state (wr, WRST_DELETING);
if (wr->lease_duration != NULL) {
ddsrt_mutex_lock (&wr->c.pp->e.lock);
ddsrt_fibheap_delete (&ldur_fhdef, &wr->c.pp->ldur_auto_wr, wr->lease_duration);
ddsrt_mutex_unlock (&wr->c.pp->e.lock);
wr->lease_duration->ldur = DDS_DURATION_INVALID;
resched_xevent_if_earlier (wr->c.pp->pmd_update_xevent, now_mt ());
}
gcreq_writer (wr);
return 0;
}
@ -3352,11 +3405,6 @@ static dds_return_t new_reader_guid
ddsi_rhc_set_qos (rd->rhc, rd->xqos);
}
assert (rd->xqos->present & QP_LIVELINESS);
if (rd->xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC || rd->xqos->liveliness.lease_duration != T_NEVER)
{
ELOGDISC (rd, "reader "PGUIDFMT": incorrectly treating it as of automatic liveliness kind with lease duration = inf (%d, %"PRId64")\n",
PGUID (rd->e.guid), (int) rd->xqos->liveliness.kind, rd->xqos->liveliness.lease_duration);
}
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
rd->as = new_addrset ();
@ -3533,6 +3581,8 @@ void update_reader_qos (struct reader *rd, const dds_qos_t *xqos)
}
/* PROXY-PARTICIPANT ------------------------------------------------ */
const ddsrt_fibheap_def_t lease_fhdef_proxypp = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct lease, pp_heapnode), compare_lease_tdur);
static void gc_proxy_participant_lease (struct gcreq *gcreq)
{
lease_free (gcreq->arg);
@ -3558,13 +3608,99 @@ void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct
{
const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
struct lease *oldlease = ddsrt_atomic_ldvoidp (&proxypp->lease);
struct lease *oldlease = proxypp->lease;
lease_renew (oldlease, never);
gcreq->arg = oldlease;
gcreq_enqueue (gcreq);
proxypp->owns_lease = 0;
}
ddsrt_atomic_stvoidp (&proxypp->lease, newlease);
proxypp->lease = newlease;
/* FIXME: replace proxypp lease in leaseheap_auto? */
ddsrt_mutex_unlock (&proxypp->e.lock);
}
static void proxy_participant_replace_minl (struct proxy_participant *proxypp, bool manbypp, struct lease *lnew)
{
const nn_etime_t never = { T_NEVER };
struct gcreq *gcreq = gcreq_new (proxypp->e.gv->gcreq_queue, gc_proxy_participant_lease);
struct lease *lease_old = (struct lease *) ddsrt_atomic_ldvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto);
lease_renew (lease_old, never);
gcreq->arg = (void *) lease_old;
gcreq_enqueue (gcreq);
ddsrt_atomic_stvoidp (manbypp ? &proxypp->minl_man : &proxypp->minl_auto, (void *) lnew);
}
static void proxy_participant_add_pwr_lease (struct proxy_participant * proxypp, const struct proxy_writer * pwr)
{
struct lease *minl_prev;
struct lease *minl_new;
ddsrt_fibheap_t *lh;
bool manbypp;
assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
ddsrt_mutex_lock (&proxypp->e.lock);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl_prev = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_insert (&lease_fhdef_proxypp, lh, pwr->lease);
minl_new = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
/* if inserted lease is new shortest lease */
if (proxypp->owns_lease && minl_prev != minl_new)
{
nn_etime_t texp = add_duration_to_etime (now_et (), minl_new->tdur);
struct lease *lnew = lease_new (texp, minl_new->tdur, minl_new->entity);
if (manbypp && minl_prev == NULL)
{
assert (ddsrt_atomic_ldvoidp (&proxypp->minl_man) == NULL);
ddsrt_atomic_stvoidp (&proxypp->minl_man, (void *) lnew);
}
else
{
proxy_participant_replace_minl (proxypp, manbypp, lnew);
}
lease_register (lnew);
}
ddsrt_mutex_unlock (&proxypp->e.lock);
}
static void proxy_participant_remove_pwr_lease_locked (struct proxy_participant * proxypp, struct proxy_writer * pwr)
{
struct lease *minl;
bool manbypp;
ddsrt_fibheap_t *lh;
assert (pwr->lease != NULL);
manbypp = (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_PARTICIPANT);
lh = manbypp ? &proxypp->leaseheap_man : &proxypp->leaseheap_auto;
minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh);
ddsrt_fibheap_delete (&lease_fhdef_proxypp, lh, pwr->lease);
/* if pwr with min lease is removed: update proxypp lease to use new minimal duration */
if (proxypp->owns_lease && pwr->lease == minl)
{
if ((minl = ddsrt_fibheap_min (&lease_fhdef_proxypp, lh)) != NULL)
{
dds_duration_t trem = minl->tdur - pwr->lease->tdur;
nn_etime_t texp = add_duration_to_etime (now_et(), trem >= 0 ? trem : 0);
struct lease *lnew = lease_new (texp, minl->tdur, minl->entity);
proxy_participant_replace_minl (proxypp, manbypp, lnew);
lease_register (lnew);
}
else if (manbypp)
{
proxy_participant_replace_minl (proxypp, manbypp, NULL);
}
else
{
/* minl should not be null for leaseheap_auto because proxypp's lease is in */
assert (false);
}
}
}
static void proxy_participant_remove_pwr_lease (struct proxy_participant * proxypp, struct proxy_writer * pwr)
{
ddsrt_mutex_lock (&proxypp->e.lock);
proxy_participant_remove_pwr_lease_locked (proxypp, pwr);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
@ -3628,16 +3764,36 @@ void new_proxy_participant
privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid);
if (privpp != NULL && privpp->is_ddsi2_pp)
{
ddsrt_atomic_stvoidp (&proxypp->lease, ddsrt_atomic_ldvoidp (&privpp->lease));
proxypp->lease = privpp->lease;
proxypp->owns_lease = 0;
}
else
{
/* Lease duration is meaningless when the lease never expires, but when proxy participants are created implicitly because of endpoint discovery from a cloud service, we do want the lease to expire eventually when the cloud discovery service disappears and never reappears. The normal data path renews the lease, so if the lease expiry is changed after the DS disappears but data continues to flow (even if it is only a single sample) the proxy participant would immediately go back to a non-expiring lease with no further triggers for deleting it. Instead, we take tlease_dur == NEVER as a special value meaning a lease that doesn't expire now and that has a "reasonable" lease duration. That way the lease renewal in the data path is fine, and we only need to do something special in SEDP handling. */
/* Lease duration is meaningless when the lease never expires, but when proxy participants are
created implicitly because of endpoint discovery from a cloud service, we do want the lease to expire
eventually when the cloud discovery service disappears and never reappears. The normal data path renews
the lease, so if the lease expiry is changed after the DS disappears but data continues to flow (even if
it is only a single sample) the proxy participant would immediately go back to a non-expiring lease with
no further triggers for deleting it. Instead, we take tlease_dur == NEVER as a special value meaning a
lease that doesn't expire now and that has a "reasonable" lease duration. That way the lease renewal in
the data path is fine, and we only need to do something special in SEDP handling. */
nn_etime_t texp = add_duration_to_etime (now_et(), tlease_dur);
dds_duration_t dur = (tlease_dur == T_NEVER) ? gv->config.lease_duration : tlease_dur;
ddsrt_atomic_stvoidp (&proxypp->lease, lease_new (texp, dur, &proxypp->e));
proxypp->lease = lease_new (texp, dur, &proxypp->e);
proxypp->owns_lease = 1;
/* Init heap for leases */
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man);
/* Add the proxypp lease to heap so that monitoring liveliness will include this lease
and uses the shortest duration for proxypp and all its pwr's (with automatic liveliness) */
ddsrt_fibheap_insert (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
/* Set the shortest lease for auto liveliness: clone proxypp's lease (as there are no pwr's
at this point, this is the shortest lease) */
ddsrt_atomic_stvoidp (&proxypp->minl_auto, (void *) lease_clone (proxypp->lease));
ddsrt_atomic_stvoidp (&proxypp->minl_man, NULL);
}
}
@ -3746,15 +3902,12 @@ void new_proxy_participant
nn_plist_fini (&plist_rd);
}
/* Register lease, but be careful not to accidentally re-register
DDSI2's lease, as we may have become dependent on DDSI2 any time
after ephash_insert_proxy_participant_guid even if
privileged_pp_guid was NULL originally */
/* Register lease for auto liveliness, but be careful not to accidentally re-register
DDSI2's lease, as we may have become dependent on DDSI2 any time after
ephash_insert_proxy_participant_guid even if privileged_pp_guid was NULL originally */
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->owns_lease)
lease_register (ddsrt_atomic_ldvoidp (&proxypp->lease));
lease_register (ddsrt_atomic_ldvoidp (&proxypp->minl_auto));
builtintopic_write (gv->builtin_topic_interface, &proxypp->e, timestamp, true);
ddsrt_mutex_unlock (&proxypp->e.lock);
}
@ -3840,7 +3993,15 @@ static void unref_proxy_participant (struct proxy_participant *proxypp, struct p
nn_plist_fini (proxypp->plist);
ddsrt_free (proxypp->plist);
if (proxypp->owns_lease)
lease_free (ddsrt_atomic_ldvoidp (&proxypp->lease));
{
ddsrt_mutex_lock (&proxypp->e.lock);
ddsrt_fibheap_delete (&lease_fhdef_proxypp, &proxypp->leaseheap_auto, proxypp->lease);
assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_auto) == NULL);
assert (ddsrt_fibheap_min (&lease_fhdef_proxypp, &proxypp->leaseheap_man) == NULL);
ddsrt_mutex_unlock (&proxypp->e.lock);
lease_free (ddsrt_atomic_ldvoidp (&proxypp->minl_auto));
lease_free (proxypp->lease);
}
entity_common_fini (&proxypp->e);
remove_deleted_participant_guid (proxypp->e.gv->deleted_participants, &proxypp->e.guid, DPG_LOCAL | DPG_REMOTE);
ddsrt_free (proxypp);
@ -3901,7 +4062,8 @@ static void delete_or_detach_dependent_pp (struct proxy_participant *p, struct p
/* Clear dependency (but don't touch entity id, which must be 0x1c1) and set the lease ticking */
ELOGDISC (p, PGUIDFMT" detach-from-DS "PGUIDFMT"\n", PGUID(p->e.guid), PGUID(proxypp->e.guid));
memset (&p->privileged_pp_guid.prefix, 0, sizeof (p->privileged_pp_guid.prefix));
lease_set_expiry (ddsrt_atomic_ldvoidp (&p->lease), texp);
lease_set_expiry (p->lease, texp);
/* FIXME: replace in p->leaseheap_auto and get new minl_auto */
ddsrt_mutex_unlock (&p->e.lock);
}
}
@ -3936,7 +4098,7 @@ static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp
struct entity_common *e = entity_common_from_proxy_endpoint_common (c);
if (is_writer_entityid (e->guid.entityid))
{
ret = delete_proxy_writer (proxypp->e.gv, &e->guid, timestamp, isimplicit);
ret = delete_proxy_writer (proxypp->e.gv, &e->guid, timestamp, isimplicit, true);
}
else
{
@ -4084,6 +4246,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
pwr->last_fragnum = ~0u;
pwr->nackfragcount = 0;
pwr->last_fragnum_reset = 0;
pwr->alive = 1;
ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1);
if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) {
/* The DDSI built-in proxy writers always deliver
@ -4107,17 +4270,13 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
#endif
assert (pwr->c.xqos->present & QP_LIVELINESS);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_AUTOMATIC)
GVLOGDISC (" FIXME: only AUTOMATIC liveliness supported");
#if 0
pwr->tlease_dur = nn_from_ddsi_duration (pwr->c.xqos->liveliness.lease_duration);
if (pwr->tlease_dur == 0)
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{
GVLOGDISC (" FIXME: treating lease_duration=0 as inf");
pwr->tlease_dur = T_NEVER;
nn_etime_t texpire = add_duration_to_etime (now_et (), pwr->c.xqos->liveliness.lease_duration);
pwr->lease = lease_new (texpire, pwr->c.xqos->liveliness.lease_duration, &pwr->e);
if (pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxy_participant_add_pwr_lease (proxypp, pwr);
}
pwr->tlease_end = add_duration_to_wctime (tnow, pwr->tlease_dur);
#endif
if (isreliable)
{
@ -4251,12 +4410,11 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
struct proxy_writer *pwr = gcreq->arg;
ELOGDISC (pwr, "gc_delete_proxy_writer(%p, "PGUIDFMT")\n", (void *) gcreq, PGUID (pwr->e.guid));
gcreq_free (gcreq);
while (!ddsrt_avl_is_empty (&pwr->readers))
{
struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m);
reader_drop_connection (&m->rd_guid, pwr);
reader_drop_connection (&m->rd_guid, pwr, true);
update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->guid_hash, &m->rd_guid, m->count);
free_pwr_rd_match (m);
}
@ -4267,7 +4425,7 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
ddsrt_free (pwr);
}
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit)
int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit, bool proxypp_locked)
{
struct proxy_writer *pwr;
(void)isimplicit;
@ -4288,10 +4446,54 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
ddsrt_mutex_unlock (&gv->lock);
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER)
{
ddsrt_mutex_lock (&pwr->e.lock);
if (pwr->alive && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
proxypp_locked ? proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
ddsrt_mutex_unlock (&pwr->e.lock);
lease_free (pwr->lease);
}
gcreq_proxy_writer (pwr);
return 0;
}
int proxy_writer_set_alive_locked (struct q_globals *gv, struct proxy_writer *pwr, bool alive)
{
ddsrt_avl_iter_t it;
GVLOGDISC ("proxy_writer_set_alive_locked ("PGUIDFMT") ", PGUID (pwr->e.guid));
assert (pwr->alive != alive);
pwr->alive = alive;
GVLOGDISC ("- alive=%d\n", pwr->alive);
if (!pwr->alive)
{
for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
reader_drop_connection (&m->rd_guid, pwr, false);
}
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC)
pwr->alive ? proxy_participant_add_pwr_lease (pwr->c.proxypp, pwr) : proxy_participant_remove_pwr_lease (pwr->c.proxypp, pwr);
return 0;
}
int proxy_writer_set_alive_guid (struct q_globals *gv, const struct ddsi_guid *guid, bool alive)
{
struct proxy_writer *pwr;
int ret;
ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
GVLOGDISC ("proxy_writer_set_alive_guid ("PGUIDFMT") - unknown\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
}
ddsrt_mutex_unlock (&gv->lock);
ddsrt_mutex_lock (&pwr->e.lock);
ret = proxy_writer_set_alive_locked (gv, pwr, alive);
ddsrt_mutex_unlock (&pwr->e.lock);
return ret;
}
/* PROXY-READER ----------------------------------------------------- */
int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, const struct ddsi_guid *guid, struct addrset *as, const nn_plist_t *plist, nn_wctime_t timestamp, seqno_t seq

View file

@ -43,30 +43,27 @@
!= 0 -- and note that it had better be 2's complement machine! */
#define TSCHED_NOT_ON_HEAP INT64_MIN
struct lease {
ddsrt_fibheap_node_t heapnode;
nn_etime_t tsched; /* access guarded by leaseheap_lock */
ddsrt_atomic_uint64_t tend; /* really an nn_etime_t */
dds_duration_t tdur; /* constant (renew depends on it) */
struct entity_common *entity; /* constant */
};
static int compare_lease_tsched (const void *va, const void *vb);
static const ddsrt_fibheap_def_t lease_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER(offsetof (struct lease, heapnode), compare_lease_tsched);
const ddsrt_fibheap_def_t lease_fhdef = DDSRT_FIBHEAPDEF_INITIALIZER (offsetof (struct lease, heapnode), compare_lease_tsched);
static void force_lease_check (struct gcreq_queue *gcreq_queue)
{
gcreq_enqueue (gcreq_new (gcreq_queue, gcreq_free));
}
static int compare_lease_tsched (const void *va, const void *vb)
int compare_lease_tsched (const void *va, const void *vb)
{
const struct lease *a = va;
const struct lease *b = vb;
return (a->tsched.v == b->tsched.v) ? 0 : (a->tsched.v < b->tsched.v) ? -1 : 1;
}
int compare_lease_tdur (const void *va, const void *vb)
{
const struct lease *a = va;
const struct lease *b = vb;
return (a->tdur == b->tdur) ? 0 : (a->tdur < b->tdur) ? -1 : 1;
}
void lease_management_init (struct q_globals *gv)
{
ddsrt_mutex_init (&gv->leaseheap_lock);
@ -92,6 +89,20 @@ struct lease *lease_new (nn_etime_t texpire, dds_duration_t tdur, struct entity_
return l;
}
/**
* Returns a clone of the provided lease. Note that this function does not use
* locking and should therefore only be called from a context where lease 'l'
* cannot be changed by another thread during the function call.
*/
struct lease *lease_clone (const struct lease *l)
{
nn_etime_t texp;
dds_duration_t tdur;
texp.v = (int64_t) ddsrt_atomic_ld64 (&l->tend);
tdur = l->tdur;
return lease_new (texp, tdur, l->entity);
}
void lease_register (struct lease *l) /* FIXME: make lease admin struct */
{
struct q_globals * const gv = l->entity->gv;
@ -116,7 +127,10 @@ void lease_free (struct lease *l)
GVTRACE ("lease_free(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
ddsrt_mutex_lock (&gv->leaseheap_lock);
if (l->tsched.v != TSCHED_NOT_ON_HEAP)
{
ddsrt_fibheap_delete (&lease_fhdef, &gv->leaseheap, l);
l->tsched.v = TSCHED_NOT_ON_HEAP;
}
ddsrt_mutex_unlock (&gv->leaseheap_lock);
ddsrt_free (l);
@ -126,7 +140,7 @@ void lease_free (struct lease *l)
void lease_renew (struct lease *l, nn_etime_t tnowE)
{
struct q_globals const * const gv = l->entity->gv;
struct q_globals const * gv;
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
/* do not touch tend if moving forward or if already expired */
@ -137,6 +151,7 @@ void lease_renew (struct lease *l, nn_etime_t tnowE)
return;
} while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v));
gv = l->entity->gv;
if (gv->logconfig.c.mask & DDS_LC_TRACE)
{
int32_t tsec, tusec;
@ -253,23 +268,17 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
switch (k)
{
case EK_PARTICIPANT:
delete_participant (gv, &g);
break;
case EK_PROXY_PARTICIPANT:
delete_proxy_participant_by_guid (gv, &g, now(), 1);
break;
case EK_WRITER:
delete_writer_nolinger (gv, &g);
break;
case EK_PROXY_WRITER:
(void) delete_proxy_writer (gv, &g, now(), 1);
(void) proxy_writer_set_alive_guid (gv, &g, false);
break;
case EK_PARTICIPANT:
case EK_READER:
delete_reader (gv, &g);
break;
case EK_WRITER:
case EK_PROXY_READER:
(void) delete_proxy_reader (gv, &g, now(), 1);
assert (false);
break;
}
@ -281,83 +290,3 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
return delay;
}
/******/
static void debug_print_rawdata (const struct q_globals *gv, const char *msg, const void *data, size_t len)
{
const unsigned char *c = data;
size_t i;
GVTRACE ("%s<", msg);
for (i = 0; i < len; i++)
{
if (32 < c[i] && c[i] <= 127)
GVTRACE ("%s%c", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
else
GVTRACE ("%s\\x%02x", (i > 0 && (i%4) == 0) ? " " : "", c[i]);
}
GVTRACE (">");
}
void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, uint32_t statusinfo, const void *vdata, uint32_t len)
{
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
const int bswap = (data->identifier == CDR_LE) ^ (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN);
struct proxy_participant *pp;
ddsi_guid_t ppguid;
RSTTRACE (" PMD ST%x", statusinfo);
if (data->identifier != CDR_LE && data->identifier != CDR_BE)
{
RSTTRACE (" PMD data->identifier %u !?\n", ntohs (data->identifier));
return;
}
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
{
case 0:
if (offsetof (ParticipantMessageData_t, value) > len - sizeof (struct CDRHeader))
debug_print_rawdata (rst->gv, " SHORT1", data, len);
else
{
const ParticipantMessageData_t *pmd = (ParticipantMessageData_t *) (data + 1);
ddsi_guid_prefix_t p = nn_ntoh_guid_prefix (pmd->participantGuidPrefix);
uint32_t kind = ntohl (pmd->kind);
uint32_t length = bswap ? ddsrt_bswap4u (pmd->length) : pmd->length;
RSTTRACE (" pp %"PRIx32":%"PRIx32":%"PRIx32" kind %u data %u", p.u[0], p.u[1], p.u[2], kind, length);
if (len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value) < length)
debug_print_rawdata (rst->gv, " SHORT2", pmd->value, len - sizeof (struct CDRHeader) - offsetof (ParticipantMessageData_t, value));
else
debug_print_rawdata (rst->gv, "", pmd->value, length);
ppguid.prefix = p;
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((pp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
else
{
/* Renew lease if arrival of this message didn't already do so, also renew the lease
of the virtual participant used for DS-discovered endpoints */
#if 0 // FIXME: superfluous ... receipt of the message already did it */
lease_renew (ddsrt_atomic_ldvoidp (&pp->lease), now_et ());
#endif
}
}
break;
case NN_STATUSINFO_DISPOSE:
case NN_STATUSINFO_UNREGISTER:
case NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER:
/* Serialized key; BE or LE doesn't matter as both fields are
defined as octets. */
if (len < sizeof (struct CDRHeader) + sizeof (ddsi_guid_prefix_t))
debug_print_rawdata (rst->gv, " SHORT3", data, len);
else
{
ppguid.prefix = nn_ntoh_guid_prefix (*((ddsi_guid_prefix_t *) (data + 1)));
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if (delete_proxy_participant_by_guid (rst->gv, &ppguid, timestamp, 0) < 0)
RSTTRACE (" unknown");
else
RSTTRACE (" delete");
}
break;
}
RSTTRACE ("\n");
}

View file

@ -668,9 +668,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
if (!wr->reliable) /* note: reliability can't be changed */
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer!)", PGUID (src), PGUID (dst));
@ -1134,10 +1132,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC,
BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst));
ddsrt_mutex_lock (&pwr->e.lock);
@ -1271,9 +1266,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst));
ddsrt_mutex_lock (&pwr->e.lock);
@ -1398,9 +1391,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->minl_auto), tnow);
if (!wr->reliable) /* note: reliability can't be changed */
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" not a reliable writer)", PGUID (src), PGUID (dst));
@ -1645,9 +1636,7 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
ddsrt_mutex_lock (&pwr->e.lock);
if ((wn = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst)) == NULL)
{
@ -2098,11 +2087,12 @@ static void clean_defrag (struct proxy_writer *pwr)
nn_defrag_notegap (pwr->defrag, 1, seq);
}
static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup)
static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
{
struct proxy_writer *pwr;
struct nn_rsample *rsample;
ddsi_guid_t dst;
struct lease *lease;
dst.prefix = rst->dst_guid_prefix;
dst.entityid = msg->readerId;
@ -2117,14 +2107,21 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
return;
}
/* liveliness is still only implemented partially (with all set to
AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy
participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow);
if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease)
lease_renew (lease, tnow);
/* Shouldn't lock the full writer, but will do so for now */
ddsrt_mutex_lock (&pwr->e.lock);
/* FIXME: implement liveliness manual-by-topic */
/* if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
lease_renew (pwr->lease, tnow);
*/
if (!pwr->alive)
proxy_writer_set_alive_locked (pwr->e.gv, pwr, true);
/* Don't accept data when reliable readers exist and we haven't yet seen
a heartbeat telling us what the "current" sequence number of the writer
is. If no reliable readers are present, we can't request a heartbeat and
@ -2194,7 +2191,10 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
/* Enqueue or deliver with pwr->e.lock held: to ensure no other
receive thread's data gets interleaved -- arguably delivery
needn't be exactly in-order, which would allow us to do this
without pwr->e.lock held. */
without pwr->e.lock held.
Note that PMD is also handled here, but the pwr for PMD does not
use no synchronous delivery, so deliver_user_data_synchronously
(which asserts pwr is not built-in) is not used for PMD handling. */
if (pwr->deliver_synchronously)
{
/* FIXME: just in case the synchronous delivery runs into a delay caused
@ -2367,24 +2367,32 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r
unsigned submsg_offset, payload_offset;
submsg_offset = (unsigned) ((unsigned char *) msg - NN_RMSG_PAYLOAD (rmsg));
if (datap)
{
payload_offset = (unsigned) ((unsigned char *) datap - NN_RMSG_PAYLOAD (rmsg));
}
else
{
payload_offset = submsg_offset + (unsigned) size;
}
rdata = nn_rdata_new (rmsg, 0, sampleinfo->size, submsg_offset, payload_offset);
if (msg->x.writerId.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
/* SPDP needs special treatment: there are no proxy writers for it
and we accept data from unknown sources */
if ((msg->x.writerId.u & NN_ENTITYID_SOURCE_MASK) == NN_ENTITYID_SOURCE_BUILTIN)
{
handle_SPDP (sampleinfo, rdata);
bool renew_manbypp_lease = true;
switch (msg->x.writerId.u)
{
case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER:
/* SPDP needs special treatment: there are no proxy writers for it and we accept data from unknown sources */
handle_SPDP (sampleinfo, rdata);
break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
/* Handle PMD as a regular message, but without renewing the leases on proxypp */
renew_manbypp_lease = false;
/* fall through */
default:
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, renew_manbypp_lease);
}
}
else
{
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup);
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, true);
}
}
RSTTRACE (")");
@ -2411,12 +2419,19 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
struct nn_rdata *rdata;
unsigned submsg_offset, payload_offset;
uint32_t begin, endp1;
if (msg->x.writerId.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
bool renew_manbypp_lease = true;
if ((msg->x.writerId.u & NN_ENTITYID_SOURCE_MASK) == NN_ENTITYID_SOURCE_BUILTIN)
{
DDS_CWARNING (&rst->gv->logconfig, "DATAFRAG("PGUIDFMT" #%"PRId64" -> "PGUIDFMT") - fragmented builtin data not yet supported\n",
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, fromSN (msg->x.writerSN),
PGUIDPREFIX (rst->dst_guid_prefix), msg->x.readerId.u);
return 1;
switch (msg->x.writerId.u)
{
case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER:
DDS_CWARNING (&rst->gv->logconfig, "DATAFRAG("PGUIDFMT" #%"PRId64" -> "PGUIDFMT") - fragmented builtin data not yet supported\n",
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, fromSN (msg->x.writerSN),
PGUIDPREFIX (rst->dst_guid_prefix), msg->x.readerId.u);
return 1;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
renew_manbypp_lease = false;
}
}
submsg_offset = (unsigned) ((unsigned char *) msg - NN_RMSG_PAYLOAD (rmsg));
@ -2457,7 +2472,7 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
wrong, it'll simply generate a request for retransmitting a
non-existent fragment. The other side SHOULD be capable of
dealing with that. */
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup);
handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup, renew_manbypp_lease);
}
RSTTRACE (")");
return 1;

View file

@ -37,9 +37,11 @@
#include "dds/ddsi/q_bitset.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_pmd.h"
#include "dds__whc.h"
#include "dds/ddsi/sysdeps.h"
@ -1090,44 +1092,6 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
}
}
static void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
{
#define PMD_DATA_LENGTH 1
struct q_globals * const gv = pp->e.gv;
struct writer *wr;
union {
ParticipantMessageData_t pmd;
char pad[offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH];
} u;
struct ddsi_serdata *serdata;
struct ddsi_tkmap_instance *tk;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER)) == NULL)
{
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (pp->e.guid));
return;
}
u.pmd.participantGuidPrefix = nn_hton_guid_prefix (pp->e.guid.prefix);
u.pmd.kind = ddsrt_toBE4u (pmd_kind);
u.pmd.length = PMD_DATA_LENGTH;
memset (u.pmd.value, 0, u.pmd.length);
struct ddsi_rawcdr_sample raw = {
.blob = &u,
.size = offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH,
.key = &u.pmd,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw);
serdata->timestamp = now ();
tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata);
write_sample_nogc (ts1, xp, wr, serdata, tk);
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
#undef PMD_DATA_LENGTH
}
static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow)
{
struct q_globals * const gv = ev->evq->gv;
@ -1142,16 +1106,7 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
write_pmd_message (ts1, xp, pp, PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
/* QoS changes can't change lease durations. So the only thing that
could cause trouble here is that the addition or removal of a
writer cause the interval to change for this participant. If we
lock pp for reading out the lease duration we are guaranteed a
consistent value (can't assume 64-bit atomic reads on all support
platforms!) */
ddsrt_mutex_lock (&pp->e.lock);
intv = pp->lease_duration;
/* FIXME: need to use smallest liveliness duration of all automatic-liveliness writers */
intv = pp_get_pmd_interval (pp);
if (intv == T_NEVER)
{
tnext.v = T_NEVER;
@ -1169,7 +1124,6 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
}
(void) resched_xevent_if_earlier (ev, tnext);
ddsrt_mutex_unlock (&pp->e.lock);
}
static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, UNUSED_ARG (nn_mtime_t tnow))

View file

@ -59,6 +59,9 @@ typedef int64_t dds_duration_t;
/** @name Infinite timeout for relative time */
#define DDS_INFINITY ((dds_duration_t) INT64_MAX)
/** @name Invalid duration value */
#define DDS_DURATION_INVALID ((dds_duration_t) INT64_MIN)
/** @name Macro definition for time conversion to nanoseconds
@{**/
#define DDS_SECS(n) ((n) * DDS_NSECS_IN_SEC)