Update local delivery code for multiple sertopics

This also removes the code duplication for the handling delivery from
local vs remote writers.  (And it adds a test.)

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-01-30 16:11:14 +01:00 committed by eboasson
parent 27d7c72626
commit d92d491b83
13 changed files with 1205 additions and 189 deletions

View file

@ -288,6 +288,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.eqkey = serdata_builtin_eqkey,
.free = serdata_builtin_free,
.from_ser = 0,
.from_ser_iov = 0,
.from_keyhash = ddsi_serdata_builtin_from_keyhash,
.from_sample = 0,
.to_ser = serdata_builtin_to_ser,

View file

@ -25,6 +25,7 @@
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_deliver_locally.h"
dds_return_t dds_write (dds_entity_t writer, const void *data)
{
@ -71,80 +72,103 @@ dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t tim
return ret;
}
static dds_return_t try_store (struct ddsi_rhc *rhc, const struct ddsi_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
static struct reader *writer_first_in_sync_reader (struct entity_index *entity_index, struct entity_common *wrcmn, ddsrt_avl_iter_t *it)
{
while (! ddsi_rhc_store (rhc, pwr_info, payload, tk))
assert (wrcmn->kind == EK_WRITER);
struct writer *wr = (struct writer *) wrcmn;
struct wr_rd_match *m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, it);
return m ? entidx_lookup_reader_guid (entity_index, &m->rd_guid) : NULL;
}
static struct reader *writer_next_in_sync_reader (struct entity_index *entity_index, ddsrt_avl_iter_t *it)
{
struct wr_rd_match *m = ddsrt_avl_iter_next (it);
return m ? entidx_lookup_reader_guid (entity_index, &m->rd_guid) : NULL;
}
struct local_sourceinfo {
const struct ddsi_sertopic *src_topic;
struct ddsi_serdata *src_payload;
struct ddsi_tkmap_instance *src_tk;
nn_mtime_t timeout;
};
static struct ddsi_serdata *local_make_sample (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo)
{
struct local_sourceinfo *si = vsourceinfo;
if (topic == si->src_topic)
{
if (*max_block_ms > 0)
{
dds_sleepfor (DDS_HEADBANG_TIMEOUT);
*max_block_ms -= DDS_HEADBANG_TIMEOUT;
*tk = si->src_tk;
/* FIXME: see if this pair of refc increments can't be avoided
They're needed because free_sample_after_delivery will always be called, but
in the common case of a local writer and a single sertopic, make_sample doesn't
actually create a sample, and so free_sample_after_delivery doesn't actually
have to free anything */
ddsi_tkmap_instance_ref (si->src_tk);
return ddsi_serdata_ref (si->src_payload);
}
else
{
/* ouch ... convert a serdata from one sertopic to another ... */
ddsrt_iovec_t iov;
uint32_t size = ddsi_serdata_size (si->src_payload);
(void) ddsi_serdata_to_ser_ref (si->src_payload, 0, size, &iov);
struct ddsi_serdata *d = ddsi_serdata_from_ser_iov (topic, si->src_payload->kind, 1, &iov, size);
ddsi_serdata_to_ser_unref (si->src_payload, &iov);
if (d)
{
d->statusinfo = si->src_payload->statusinfo;
d->timestamp = si->src_payload->timestamp;
*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d);
}
else
{
DDS_CWARNING (&gv->logconfig, "local: deserialization %s/%s failed in topic type conversion\n", topic->name, topic->type_name);
}
return d;
}
}
static dds_return_t local_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo)
{
(void) fastpath_rdary;
(void) source_entity_locked;
assert (source_entity->kind == EK_WRITER);
struct writer *wr = (struct writer *) source_entity;
struct local_sourceinfo *si = vsourceinfo;
nn_mtime_t tnow = now_mt ();
if (si->timeout.v == 0)
si->timeout = add_duration_to_mtime (tnow, wr->xqos->reliability.max_blocking_time);
if (tnow.v >= si->timeout.v)
return DDS_RETCODE_TIMEOUT;
}
}
else
{
dds_sleepfor (DDS_HEADBANG_TIMEOUT);
return DDS_RETCODE_OK;
}
}
static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk)
{
dds_return_t ret = DDS_RETCODE_OK;
ddsrt_mutex_lock (&wr->rdary.rdary_lock);
if (wr->rdary.fastpath_ok)
{
struct reader ** const rdary = wr->rdary.rdary;
if (rdary[0])
{
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
struct ddsi_writer_info pwr_info;
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos, payload->statusinfo);
for (uint32_t i = 0; rdary[i]; i++) {
DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
}
}
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
}
else
{
/* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary
reason rdary exists is to avoid locking the proxy writer
but this is less of an issue when we are deleting it, so
we fall back to using the GUIDs so that we can deliver all
samples we received from it. As writer being deleted any
reliable samples that are rejected are simply discarded. */
ddsrt_avl_iter_t it;
struct pwr_rd_match *m;
static const struct deliver_locally_ops deliver_locally_ops = {
.makesample = local_make_sample,
.first_reader = writer_first_in_sync_reader,
.next_reader = writer_next_in_sync_reader,
.on_failure_fastpath = local_on_delivery_failure_fastpath
};
struct local_sourceinfo sourceinfo = {
.src_topic = wr->topic,
.src_payload = payload,
.src_tk = tk,
.timeout = { 0 },
};
dds_return_t rc;
struct ddsi_writer_info wrinfo;
const struct entity_index *gh = wr->e.gv->entity_index;
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo);
ddsrt_mutex_lock (&wr->e.lock);
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data () function. */
if ((ret = try_store (rd->rhc, &wrinfo, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
}
}
ddsrt_mutex_unlock (&wr->e.lock);
}
if (ret == DDS_RETCODE_TIMEOUT)
{
rc = deliver_locally_allinsync (wr->e.gv, &wr->e, false, &wr->rdary, &wrinfo, &deliver_locally_ops, &sourceinfo);
if (rc == DDS_RETCODE_TIMEOUT)
DDS_CERROR (&wr->e.gv->logconfig, "The writer could not deliver data on time, probably due to a local reader resources being full\n");
}
return ret;
return rc;
}
dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action)

View file

@ -30,6 +30,7 @@ set(ddsc_test_sources
"instance_get_key.c"
"listener.c"
"liveliness.c"
"multi_sertopic.c"
"participant.c"
"publisher.c"
"qos.c"

View file

@ -0,0 +1,609 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "Space.h"
#include "config_env.h"
#include "dds/version.h"
#include "dds__entity.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/environ.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/time.h"
#define DDS_DOMAINID_PUB 0
#define DDS_DOMAINID_SUB 1
#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}<Tracing><OutputFile>cyclonedds_multi_sertopic_tests.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.log</OutputFile><Verbosity>finest</Verbosity></Tracing><Discovery><ExternalDomainId>0</ExternalDomainId></Discovery>"
/* IDL preprocessing is not really friendly towards creating multiple descriptors
for the same type name with different definitions, so we do it by hand. */
struct uint32_seq {
uint32_t _maximum;
uint32_t _length;
uint32_t *_buffer;
bool _release;
};
struct two_uint32 {
uint32_t v[2];
};
struct two_uint32_seq {
uint32_t _maximum;
uint32_t _length;
struct two_uint32 *_buffer;
bool _release;
};
struct type_seq {
struct uint32_seq x;
};
struct type_ary {
uint32_t x[4];
};
struct type_uni {
uint32_t _d;
union
{
struct two_uint32_seq a;
uint32_t b[4];
} _u;
};
static const dds_topic_descriptor_t type_seq_desc =
{
.m_size = sizeof (struct type_seq),
.m_align = sizeof (void *),
.m_flagset = DDS_TOPIC_NO_OPTIMIZE,
.m_nkeys = 0,
.m_typename = "multi_sertopic_type",
.m_keys = NULL,
.m_nops = 2,
.m_ops = (const uint32_t[]) {
DDS_OP_ADR | DDS_OP_TYPE_SEQ | DDS_OP_SUBTYPE_4BY, offsetof (struct type_seq, x),
DDS_OP_RTS
},
.m_meta = "" /* this is on its way out anyway */
};
static const dds_topic_descriptor_t type_ary_desc =
{
.m_size = sizeof (struct type_ary),
.m_align = 4u,
.m_flagset = DDS_TOPIC_NO_OPTIMIZE,
.m_nkeys = 0,
.m_typename = "multi_sertopic_type",
.m_keys = NULL,
.m_nops = 2,
.m_ops = (const uint32_t[]) {
DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, offsetof (struct type_ary, x), 4,
DDS_OP_RTS
},
.m_meta = "" /* this is on its way out anyway */
};
static const dds_topic_descriptor_t type_uni_desc =
{
.m_size = sizeof (struct type_uni),
.m_align = sizeof (void *),
.m_flagset = DDS_TOPIC_NO_OPTIMIZE | DDS_TOPIC_CONTAINS_UNION,
.m_nkeys = 0,
.m_typename = "multi_sertopic_type",
.m_keys = NULL,
.m_nops = 8,
.m_ops = (const uint32_t[]) {
DDS_OP_ADR | DDS_OP_TYPE_UNI | DDS_OP_SUBTYPE_4BY | DDS_OP_FLAG_DEF, offsetof (struct type_uni, _d), 2u, (23u << 16) + 4u,
DDS_OP_JEQ | DDS_OP_TYPE_SEQ | 6, 3, offsetof (struct type_uni, _u.a),
DDS_OP_JEQ | DDS_OP_TYPE_ARR | 12, 0, offsetof (struct type_uni, _u.b),
DDS_OP_ADR | DDS_OP_TYPE_SEQ | DDS_OP_SUBTYPE_STU, 0u,
sizeof (struct two_uint32), (8u << 16u) + 4u,
DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, offsetof (struct two_uint32, v), 2,
DDS_OP_RTS,
DDS_OP_RTS,
DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, 0u, 4,
DDS_OP_RTS,
DDS_OP_RTS
},
.m_meta = "" /* this is on its way out anyway */
};
/* The slow delivery path has a switchover at 4 sertopics (well, today it has ...) so it is better to
to test with > 4 different sertopics. That path (again, today) iterates over GUIDs in increasing
order, and as all readers are created in the participant and the entity ids are strictly
monotonically increasing for the first ~ 16M entities (again, today), creating additional
readers for these topics at the end means that "ary2" is the one that ends up in > 4 case.
Calling takecdr */
static const dds_topic_descriptor_t type_ary1_desc =
{
.m_size = sizeof (struct type_ary),
.m_align = 1u,
.m_flagset = DDS_TOPIC_NO_OPTIMIZE,
.m_nkeys = 0,
.m_typename = "multi_sertopic_type",
.m_keys = NULL,
.m_nops = 2,
.m_ops = (const uint32_t[]) {
DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_1BY, offsetof (struct type_ary, x), 16,
DDS_OP_RTS
},
.m_meta = "" /* this is on its way out anyway */
};
static const dds_topic_descriptor_t type_ary2_desc =
{
.m_size = sizeof (struct type_ary),
.m_align = 2u,
.m_flagset = DDS_TOPIC_NO_OPTIMIZE,
.m_nkeys = 0,
.m_typename = "multi_sertopic_type",
.m_keys = NULL,
.m_nops = 2,
.m_ops = (const uint32_t[]) {
DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_2BY, offsetof (struct type_ary, x), 8,
DDS_OP_RTS
},
.m_meta = "" /* this is on its way out anyway */
};
static uint32_t g_topic_nr = 0;
static dds_entity_t g_pub_domain = 0;
static dds_entity_t g_pub_participant = 0;
static dds_entity_t g_pub_publisher = 0;
static dds_entity_t g_sub_domain = 0;
static dds_entity_t g_sub_participant = 0;
static dds_entity_t g_sub_subscriber = 0;
static char *create_topic_name (const char *prefix, uint32_t nr, char *name, size_t size)
{
/* Get unique g_topic name. */
ddsrt_pid_t pid = ddsrt_getpid();
ddsrt_tid_t tid = ddsrt_gettid();
(void) snprintf (name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid);
return name;
}
static void multi_sertopic_init (void)
{
/* Domains for pub and sub use a different domain id, but the portgain setting
* in configuration is 0, so that both domains will map to the same port number.
* This allows to create two domains in a single test process. */
char *conf_pub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB);
char *conf_sub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB);
g_pub_domain = dds_create_domain (DDS_DOMAINID_PUB, conf_pub);
g_sub_domain = dds_create_domain (DDS_DOMAINID_SUB, conf_sub);
dds_free (conf_pub);
dds_free (conf_sub);
g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL);
CU_ASSERT_FATAL (g_pub_participant > 0);
g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL);
CU_ASSERT_FATAL (g_sub_participant > 0);
g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL);
CU_ASSERT_FATAL (g_pub_publisher > 0);
g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL);
CU_ASSERT_FATAL (g_sub_subscriber > 0);
}
static void multi_sertopic_fini (void)
{
dds_delete (g_sub_subscriber);
dds_delete (g_pub_publisher);
dds_delete (g_sub_participant);
dds_delete (g_pub_participant);
dds_delete (g_sub_domain);
dds_delete (g_pub_domain);
}
static bool get_and_check_writer_status (size_t nwr, const dds_entity_t *wrs, size_t nrd)
{
dds_return_t rc;
struct dds_publication_matched_status x;
for (size_t i = 0; i < nwr; i++)
{
rc = dds_get_publication_matched_status (wrs[i], &x);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
if (x.current_count != nrd)
return false;
}
return true;
}
static bool get_and_check_reader_status (size_t nrd, const dds_entity_t *rds, size_t nwr)
{
dds_return_t rc;
struct dds_subscription_matched_status x;
for (size_t i = 0; i < nrd; i++)
{
rc = dds_get_subscription_matched_status (rds[i], &x);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
if (x.current_count != nwr)
return false;
}
return true;
}
static void waitfor_or_reset_fastpath (dds_entity_t rdhandle, bool fastpath, size_t nwr)
{
dds_return_t rc;
struct dds_entity *x;
rc = dds_entity_pin (rdhandle, &x);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (dds_entity_kind (x) == DDS_KIND_READER);
struct reader * const rd = ((struct dds_reader *) x)->m_rd;
struct rd_pwr_match *m;
ddsi_guid_t cursor;
size_t wrcount = 0;
thread_state_awake (lookup_thread_state (), rd->e.gv);
ddsrt_mutex_lock (&rd->e.lock);
memset (&cursor, 0, sizeof (cursor));
while ((m = ddsrt_avl_lookup_succ (&rd_writers_treedef, &rd->writers, &cursor)) != NULL)
{
cursor = m->pwr_guid;
ddsrt_mutex_unlock (&rd->e.lock);
struct proxy_writer * const pwr = entidx_lookup_proxy_writer_guid (rd->e.gv->entity_index, &cursor);
ddsrt_mutex_lock (&pwr->rdary.rdary_lock);
if (!fastpath)
pwr->rdary.fastpath_ok = false;
else
{
while (!pwr->rdary.fastpath_ok)
{
ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
dds_sleepfor (DDS_MSECS (10));
ddsrt_mutex_lock (&pwr->rdary.rdary_lock);
}
}
wrcount++;
ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
ddsrt_mutex_lock (&rd->e.lock);
}
memset (&cursor, 0, sizeof (cursor));
while ((m = ddsrt_avl_lookup_succ (&rd_local_writers_treedef, &rd->local_writers, &cursor)) != NULL)
{
cursor = m->pwr_guid;
ddsrt_mutex_unlock (&rd->e.lock);
struct writer * const wr = entidx_lookup_writer_guid (rd->e.gv->entity_index, &cursor);
ddsrt_mutex_lock (&wr->rdary.rdary_lock);
if (!fastpath)
wr->rdary.fastpath_ok = fastpath;
else
{
while (!wr->rdary.fastpath_ok)
{
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
dds_sleepfor (DDS_MSECS (10));
ddsrt_mutex_lock (&wr->rdary.rdary_lock);
}
}
wrcount++;
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
ddsrt_mutex_lock (&rd->e.lock);
}
ddsrt_mutex_unlock (&rd->e.lock);
thread_state_asleep (lookup_thread_state ());
dds_entity_unpin (x);
CU_ASSERT_FATAL (wrcount == nwr);
}
static struct ddsi_sertopic *get_sertopic_from_reader (dds_entity_t reader)
{
/* not refcounting the sertopic: so this presumes it is kept alive for other reasons */
dds_return_t rc;
struct dds_entity *x;
struct dds_reader *rd;
struct ddsi_sertopic *sertopic;
rc = dds_entity_pin (reader, &x);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (dds_entity_kind (x) == DDS_KIND_READER);
rd = (struct dds_reader *) x;
sertopic = rd->m_topic->m_stopic;
dds_entity_unpin (x);
return sertopic;
}
static void logsink (void *arg, const dds_log_data_t *msg)
{
ddsrt_atomic_uint32_t *deser_fail = arg;
fputs (msg->message - msg->hdrsize, stderr);
if (strstr (msg->message, "deserialization") && strstr (msg->message, "failed"))
ddsrt_atomic_inc32 (deser_fail);
}
static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, bool fastpath)
{
#define SEQ_IDX 0
#define ARY_IDX 1
#define UNI_IDX 2
char name[100];
static const dds_topic_descriptor_t *descs[] = {
&type_seq_desc, &type_ary_desc, &type_uni_desc,
&type_ary1_desc, &type_ary2_desc
};
dds_entity_t pub_topics[3], writers[3];
dds_entity_t sub_topics[5];
dds_entity_t readers[15];
dds_entity_t waitset;
dds_qos_t *qos;
dds_return_t rc;
printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", fastpath ? "fastpath" : "slowpath");
waitset = dds_create_waitset (DDS_CYCLONEDDS_HANDLE);
CU_ASSERT_FATAL (waitset > 0);
qos = dds_create_qos ();
CU_ASSERT_FATAL (qos != NULL);
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP);
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
create_topic_name ("ddsc_multi_sertopic_lease_duration_zero", g_topic_nr++, name, sizeof name);
for (size_t i = 0; i < sizeof (pub_topics) / sizeof (pub_topics[0]); i++)
{
pub_topics[i] = dds_create_topic (pp_pub, descs[i], name, qos, NULL);
CU_ASSERT_FATAL (pub_topics[i] > 0);
}
for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++)
{
writers[i] = dds_create_writer (pp_pub, pub_topics[i], qos, NULL);
CU_ASSERT_FATAL (writers[i] > 0);
}
for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++)
{
sub_topics[i] = dds_create_topic (pp_sub, descs[i], name, qos, NULL);
CU_ASSERT_FATAL (sub_topics[i] > 0);
}
DDSRT_STATIC_ASSERT (sizeof (readers) >= sizeof (sub_topics));
DDSRT_STATIC_ASSERT ((sizeof (readers) % sizeof (sub_topics)) == 0);
for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++)
{
readers[i] = dds_create_reader (pp_sub, sub_topics[i], qos, NULL);
CU_ASSERT_FATAL (readers[i] > 0);
}
for (size_t i = sizeof (sub_topics) / sizeof (sub_topics[0]); i < sizeof (readers) / sizeof (readers[0]); i++)
{
const size_t nrd = sizeof (readers) / sizeof (readers[0]);
const size_t ntp = sizeof (sub_topics) / sizeof (sub_topics[0]);
readers[i] = dds_create_reader (pp_sub, sub_topics[(i - ntp) / (nrd / ntp - 1)], qos, NULL);
CU_ASSERT_FATAL (readers[i] > 0);
}
dds_delete_qos (qos);
/* wait for discovery to complete */
for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++)
{
rc = dds_set_status_mask (writers[i], DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_waitset_attach (waitset, writers[i], -(dds_attach_t)i - 1);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
{
rc = dds_set_status_mask (readers[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_waitset_attach (waitset, readers[i], (dds_attach_t)i);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}
printf ("wait for discovery, fastpath_ok; delete & recreate readers\n");
while (!(get_and_check_writer_status (sizeof (writers) / sizeof (writers[0]), writers, sizeof (readers) / sizeof (readers[0])) &&
get_and_check_reader_status (sizeof (readers) / sizeof (readers[0]), readers, sizeof (writers) / sizeof (writers[0]))))
{
rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS(5));
CU_ASSERT_FATAL (rc >= 1);
}
/* we want to check both the fast path and the slow path ... so first wait
for it to be set on all (proxy) writers, then possibly reset it */
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
waitfor_or_reset_fastpath (readers[i], true, sizeof (writers) / sizeof (writers[0]));
if (!fastpath)
{
printf ("clear fastpath_ok\n");
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
waitfor_or_reset_fastpath (readers[i], false, sizeof (writers) / sizeof (writers[0]));
}
/* check the log output for deserialization failures */
ddsrt_atomic_uint32_t deser_fail = DDSRT_ATOMIC_UINT32_INIT (0);
dds_set_log_sink (logsink, &deser_fail);
/* Write one of each type: all of these samples result in the same serialised
form but interpreting the memory layout for type X as-if it were of type Y
wreaks havoc. */
{
struct type_seq s = {
.x = {
._length = 3, ._maximum = 3, ._release = false, ._buffer = (uint32_t[]) { 1, 4, 2 }
}
};
struct type_ary a = {
.x = { 3, 1, 4, 2 }
};
struct type_uni u = {
._d = 3,
._u = { .a = {
._length = 1, ._maximum = 1, ._release = false, ._buffer = (struct two_uint32[]) { { { 4, 2 } } }
} }
};
printf ("writing ...\n");
rc = dds_write_ts (writers[SEQ_IDX], &s, 1);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_write_ts (writers[ARY_IDX], &a, 2);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_write_ts (writers[UNI_IDX], &u, 3);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
/* Also write a sample that can't be deserialised by the other types */
struct type_seq s1 = {
.x = {
._length = 1, ._maximum = 1, ._release = false, ._buffer = (uint32_t[]) { 1 }
}
};
rc = dds_write_ts (writers[SEQ_IDX], &s1, 4);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}
/* All readers should have received three samples, and those that are of type seq
should have received one extra (whereas the others should cause deserialization
failure warnings) */
printf ("reading\n");
const size_t nexp = ((sizeof (writers) / sizeof (writers[0])) *
(sizeof (readers) / sizeof (readers[0])) +
((sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0]))));
/* expecting exactly as many deserialization failures as there are topics other than seq */
const size_t nexp_fail = sizeof (sub_topics) / sizeof (sub_topics[0]) - 1;
uint32_t nseen = 0;
while (nseen < nexp)
{
dds_sample_info_t si;
rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS (5));
CU_ASSERT_FATAL (rc >= 1);
{
struct type_seq s = { .x = { 0 } };
void *raws[] = { &s };
while (dds_take (readers[SEQ_IDX], raws, &si, 1, 1) == 1)
{
if (!si.valid_data)
continue;
printf ("recv: seq %"PRId64"\n", si.source_timestamp);
if (si.source_timestamp == 4)
{
CU_ASSERT_FATAL (s.x._length == 1);
CU_ASSERT_FATAL (s.x._buffer[0] == 1);
}
else
{
CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3);
CU_ASSERT_FATAL (s.x._length == 3);
CU_ASSERT_FATAL (s.x._buffer[0] == 1);
CU_ASSERT_FATAL (s.x._buffer[1] == 4);
CU_ASSERT_FATAL (s.x._buffer[2] == 2);
}
nseen++;
}
dds_free (s.x._buffer);
}
{
struct type_ary a;
void *rawa[] = { &a };
while (dds_take (readers[ARY_IDX], rawa, &si, 1, 1) == 1)
{
if (!si.valid_data)
continue;
printf ("recv: ary %"PRId64"\n", si.source_timestamp);
CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3);
CU_ASSERT_FATAL (a.x[0] == 3);
CU_ASSERT_FATAL (a.x[1] == 1);
CU_ASSERT_FATAL (a.x[2] == 4);
CU_ASSERT_FATAL (a.x[3] == 2);
nseen++;
}
}
{
struct type_uni u = { ._u.a = { 0 } };
void *rawu[] = { &u };
while (dds_take (readers[UNI_IDX], rawu, &si, 1, 1) == 1)
{
if (!si.valid_data)
continue;
printf ("recv: uni %"PRId64"\n", si.source_timestamp);
CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3);
CU_ASSERT_FATAL (u._d == 3);
CU_ASSERT_FATAL (u._u.a._length == 1);
assert (u._u.a._buffer != NULL); /* for Clang static analyzer */
CU_ASSERT_FATAL (u._u.a._buffer[0].v[0] == 4);
CU_ASSERT_FATAL (u._u.a._buffer[0].v[1] == 2);
dds_free (u._u.a._buffer);
u._u.a._buffer = NULL;
nseen++;
}
}
DDSRT_STATIC_ASSERT (((1u << SEQ_IDX) | (1u << ARY_IDX) | (1u << UNI_IDX)) == 7);
for (size_t i = 3; i < sizeof (readers) / sizeof (readers[0]); i++)
{
struct ddsi_serdata *sample;
while (dds_takecdr (readers[i], &sample, 1, &si, DDS_ANY_STATE) == 1)
{
if (!si.valid_data)
continue;
printf ("recv: reader %zu %"PRId64"\n", i, si.source_timestamp);
CU_ASSERT_FATAL (sample->topic == get_sertopic_from_reader (readers[i]));
ddsi_serdata_unref (sample);
nseen++;
}
}
}
CU_ASSERT_FATAL (nseen == nexp);
/* data from remote writers can cause a deserialization failure after all
expected samples have been seen (becasue it is written last); so wait
for them */
while (ddsrt_atomic_ld32 (&deser_fail) < nexp_fail)
dds_sleepfor (DDS_MSECS (10));
CU_ASSERT_FATAL (ddsrt_atomic_ld32 (&deser_fail) == nexp_fail);
/* deleting the waitset is important: it is bound to the library rather than to
a domain and consequently won't be deleted simply because all domains are */
rc = dds_delete (waitset);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
dds_set_log_sink (0, NULL);
}
CU_Test(ddsc_multi_sertopic, local, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
{
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, true);
}
CU_Test(ddsc_multi_sertopic, remote, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
{
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, true);
}
CU_Test(ddsc_multi_sertopic, local_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
{
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, false);
}
CU_Test(ddsc_multi_sertopic, remote_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
{
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, false);
}

View file

@ -31,6 +31,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_pmd.c
ddsi_entity_index.c
ddsi_deadline.c
ddsi_deliver_locally.c
q_addrset.c
q_bitset_inlines.c
q_bswap.c
@ -92,6 +93,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
ddsi_guid.h
ddsi_entity_index.h
ddsi_deadline.h
ddsi_deliver_locally.h
q_addrset.h
q_bitset.h
q_bswap.h

View file

@ -0,0 +1,62 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_DELIVER_LOCALLY_H
#define DDSI_DELIVER_LOCALLY_H
#include <stdint.h>
#include <stdbool.h>
#include "dds/export.h"
#include "dds/ddsrt/retcode.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsi/ddsi_guid.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct q_globals;
struct ddsi_tkmap_instance;
struct ddsi_sertopic;
struct ddsi_serdata;
struct entity_index;
struct reader;
struct entity_common;
struct ddsi_writer_info;
struct local_reader_ary;
typedef struct ddsi_serdata * (*deliver_locally_makesample_t) (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo);
typedef struct reader * (*deliver_locally_first_reader_t) (struct entity_index *entity_index, struct entity_common *source_entity, ddsrt_avl_iter_t *it);
typedef struct reader * (*deliver_locally_next_reader_t) (struct entity_index *entity_index, ddsrt_avl_iter_t *it);
/** return:
- DDS_RETCODE_OK to try again immediately
- DDS_RETCODE_TRY_AGAIN to complete restart the operation later
- anything else: error to be returned from deliver_locally_xxx */
typedef dds_return_t (*deliver_locally_on_failure_fastpath_t) (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo);
struct deliver_locally_ops {
deliver_locally_makesample_t makesample;
deliver_locally_first_reader_t first_reader;
deliver_locally_next_reader_t next_reader;
deliver_locally_on_failure_fastpath_t on_failure_fastpath;
};
dds_return_t deliver_locally_one (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const ddsi_guid_t *rdguid, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo);
dds_return_t deliver_locally_allinsync (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo);
#if defined (__cplusplus)
}
#endif
#endif /* DDSI_DELIVER_LOCALLY_H */

View file

@ -61,6 +61,9 @@ typedef void (*ddsi_serdata_free_t) (struct ddsi_serdata *d);
- FIXME: get the encoding header out of the serialised data */
typedef struct ddsi_serdata * (*ddsi_serdata_from_ser_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size);
/* Exactly like ddsi_serdata_from_ser_t, but with the data in an iovec and guaranteed absence of overlap */
typedef struct ddsi_serdata * (*ddsi_serdata_from_ser_iov_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size);
/* Construct a serdata from a keyhash (an SDK_KEY by definition) */
typedef struct ddsi_serdata * (*ddsi_serdata_from_keyhash_t) (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash);
@ -138,6 +141,7 @@ struct ddsi_serdata_ops {
ddsi_serdata_eqkey_t eqkey;
ddsi_serdata_size_t get_size;
ddsi_serdata_from_ser_t from_ser;
ddsi_serdata_from_ser_iov_t from_ser_iov;
ddsi_serdata_from_keyhash_t from_keyhash;
ddsi_serdata_from_sample_t from_sample;
ddsi_serdata_to_ser_t to_ser;
@ -173,6 +177,10 @@ DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_
return topic->serdata_ops->from_ser (topic, kind, fragchain, size);
}
DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_ser_iov (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size) {
return topic->serdata_ops->from_ser_iov (topic, kind, niov, iov, size);
}
DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash) {
return topic->serdata_ops->from_keyhash (topic, keyhash);
}

View file

@ -185,7 +185,7 @@ struct local_reader_ary {
unsigned valid: 1; /* always true until (proxy-)writer is being deleted; !valid => !fastpath_ok */
unsigned fastpath_ok: 1; /* if not ok, fall back to using GUIDs (gives access to the reader-writer match data for handling readers that bumped into resource limits, hence can flip-flop, unlike "valid") */
uint32_t n_readers;
struct reader **rdary; /* for efficient delivery, null-pointer terminated */
struct reader **rdary; /* for efficient delivery, null-pointer terminated, grouped by topic */
};
struct avail_entityid_set {
@ -427,12 +427,12 @@ struct proxy_reader {
ddsrt_avl_tree_t writers; /* matching LOCAL writers */
};
extern const ddsrt_avl_treedef_t wr_readers_treedef;
extern const ddsrt_avl_treedef_t wr_local_readers_treedef;
extern const ddsrt_avl_treedef_t rd_writers_treedef;
extern const ddsrt_avl_treedef_t rd_local_writers_treedef;
extern const ddsrt_avl_treedef_t pwr_readers_treedef;
extern const ddsrt_avl_treedef_t prd_writers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t wr_readers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t wr_local_readers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t rd_writers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t rd_local_writers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t pwr_readers_treedef;
DDS_EXPORT extern const ddsrt_avl_treedef_t prd_writers_treedef;
extern const ddsrt_avl_treedef_t deleted_participants_treedef;
#define DPG_LOCAL 1

View file

@ -0,0 +1,269 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <stdlib.h>
#include "dds/ddsrt/log.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsi/ddsi_deliver_locally.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_rhc.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_entity.h"
#define TOPIC_SAMPLE_CACHE_SIZE 4
struct ddsi_sertopic;
struct ddsi_serdata;
struct ddsi_tkmap_instance;
struct topic_sample_cache_entry {
struct ddsi_serdata *sample;
struct ddsi_tkmap_instance *tk;
};
struct topic_sample_cache_large_entry {
ddsrt_avl_node_t avlnode;
const struct ddsi_sertopic *topic;
struct ddsi_serdata *sample;
struct ddsi_tkmap_instance *tk;
};
struct topic_sample_cache {
uint32_t n;
const struct ddsi_sertopic *topics[TOPIC_SAMPLE_CACHE_SIZE];
struct topic_sample_cache_entry samples[TOPIC_SAMPLE_CACHE_SIZE];
ddsrt_avl_tree_t overflow;
};
static int cmp_topic_ptrs (const void *va, const void *vb)
{
uintptr_t a = (uintptr_t) va;
uintptr_t b = (uintptr_t) vb;
return (a == b) ? 0 : (a < b) ? -1 : 1;
}
static const ddsrt_avl_treedef_t tsc_large_td = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct topic_sample_cache_large_entry, avlnode), offsetof (struct topic_sample_cache_large_entry, topic), cmp_topic_ptrs, 0);
static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk)
{
if (sample)
{
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
ddsi_serdata_unref (sample);
}
}
static void topic_sample_cache_init (struct topic_sample_cache * __restrict tsc)
{
tsc->n = 0;
ddsrt_avl_init (&tsc_large_td, &tsc->overflow);
}
static void free_large_entry (void *vnode, void *varg)
{
struct topic_sample_cache_large_entry *e = vnode;
struct q_globals *gv = varg;
free_sample_after_store (gv, e->sample, e->tk);
ddsrt_free (e);
}
static void topic_sample_cache_fini (struct topic_sample_cache * __restrict tsc, struct q_globals *gv)
{
for (uint32_t i = 0; i < tsc->n && i < TOPIC_SAMPLE_CACHE_SIZE; i++)
if (tsc->topics[i] && tsc->samples[i].tk)
free_sample_after_store (gv, tsc->samples[i].sample, tsc->samples[i].tk);
ddsrt_avl_free_arg (&tsc_large_td, &tsc->overflow, free_large_entry, gv);
}
static bool topic_sample_cache_lookup (struct ddsi_serdata ** __restrict sample, struct ddsi_tkmap_instance ** __restrict tk, struct topic_sample_cache * __restrict tsc, const struct ddsi_sertopic *topic)
{
/* linear scan of an array of pointers should be pretty fast */
for (uint32_t i = 0; i < tsc->n && i < TOPIC_SAMPLE_CACHE_SIZE; i++)
{
if (tsc->topics[i] == topic)
{
*tk = tsc->samples[i].tk;
*sample = tsc->samples[i].sample;
return true;
}
}
struct topic_sample_cache_large_entry *e;
if ((e = ddsrt_avl_lookup (&tsc_large_td, &tsc->overflow, topic)) != NULL)
{
*tk = e->tk;
*sample = e->sample;
return true;
}
return false;
}
static void topic_sample_cache_store (struct topic_sample_cache * __restrict tsc, const struct ddsi_sertopic *topic, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk)
{
if (tsc->n < TOPIC_SAMPLE_CACHE_SIZE)
{
tsc->topics[tsc->n] = topic;
tsc->samples[tsc->n].tk = tk;
tsc->samples[tsc->n].sample = sample;
}
else
{
struct topic_sample_cache_large_entry *e = ddsrt_malloc (sizeof (*e));
e->topic = topic;
e->tk = tk;
e->sample = sample;
ddsrt_avl_insert (&tsc_large_td, &tsc->overflow, e);
}
tsc->n++;
}
dds_return_t deliver_locally_one (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const ddsi_guid_t *rdguid, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, rdguid);
if (rd == NULL)
return DDS_RETCODE_OK;
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = ops->makesample (&tk, gv, rd->topic, vsourceinfo)) != NULL)
{
EETRACE (source_entity, " =>"PGUIDFMT"\n", PGUID (*rdguid));
/* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays
"awake" (although a delete can be initiated), and blocking like this is a stopgap
anyway -- quite possibly to abort once either is deleted */
while (!ddsi_rhc_store (rd->rhc, wrinfo, payload, tk))
{
if (source_entity_locked)
ddsrt_mutex_unlock (&source_entity->lock);
dds_sleepfor (DDS_MSECS (1));
if (source_entity_locked)
ddsrt_mutex_lock (&source_entity->lock);
if (entidx_lookup_reader_guid (gv->entity_index, rdguid) == NULL ||
entidx_lookup_guid_untyped (gv->entity_index, &source_entity->guid) == NULL)
{
/* give up when reader or proxy writer no longer accessible */
break;
}
}
free_sample_after_store (gv, payload, tk);
}
return DDS_RETCODE_OK;
}
static dds_return_t deliver_locally_slowpath (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
/* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary
reason rdary exists is to avoid locking the proxy writer
but this is less of an issue when we are deleting it, so
we fall back to using the GUIDs so that we can deliver all
samples we received from it. As writer being deleted any
reliable samples that are rejected are simply discarded. */
struct topic_sample_cache tsc;
ddsrt_avl_iter_t it;
struct reader *rd;
topic_sample_cache_init (&tsc);
if (!source_entity_locked)
ddsrt_mutex_lock (&source_entity->lock);
rd = ops->first_reader (gv->entity_index, source_entity, &it);
if (rd != NULL)
EETRACE (source_entity, " =>");
while (rd != NULL)
{
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if (!topic_sample_cache_lookup (&payload, &tk, &tsc, rd->topic))
{
payload = ops->makesample (&tk, gv, rd->topic, vsourceinfo);
topic_sample_cache_store (&tsc, rd->topic, payload, tk);
}
/* check payload to allow for deserialisation failures */
if (payload)
{
EETRACE (source_entity, " "PGUIDFMT, PGUID (rd->e.guid));
(void) ddsi_rhc_store (rd->rhc, wrinfo, payload, tk);
}
rd = ops->next_reader (gv->entity_index, &it);
}
EETRACE (source_entity, "\n");
if (!source_entity_locked)
ddsrt_mutex_unlock (&source_entity->lock);
topic_sample_cache_fini (&tsc, gv);
return DDS_RETCODE_OK;
}
static dds_return_t deliver_locally_fastpath (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
struct reader ** const rdary = fastpath_rdary->rdary;
uint32_t i = 0;
while (rdary[i])
{
struct ddsi_sertopic const * const topic = rdary[i]->topic;
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = ops->makesample (&tk, gv, topic, vsourceinfo)) == NULL)
{
/* malformed payload: skip all readers with the same topic */
while (rdary[++i] && rdary[i]->topic == topic)
; /* do nothing */
}
else
{
do {
dds_return_t rc;
while (!ddsi_rhc_store (rdary[i]->rhc, wrinfo, payload, tk))
{
if ((rc = ops->on_failure_fastpath (source_entity, source_entity_locked, fastpath_rdary, vsourceinfo)) != DDS_RETCODE_OK)
{
free_sample_after_store (gv, payload, tk);
return rc;
}
}
} while (rdary[++i] && rdary[i]->topic == topic);
free_sample_after_store (gv, payload, tk);
}
}
return DDS_RETCODE_OK;
}
dds_return_t deliver_locally_allinsync (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo)
{
dds_return_t rc;
/* FIXME: Retry loop for re-delivery of rejected reliable samples is a bad hack
should instead throttle back the writer by skipping acknowledgement and retry */
do {
ddsrt_mutex_lock (&fastpath_rdary->rdary_lock);
if (fastpath_rdary->fastpath_ok)
{
EETRACE (source_entity, " => EVERYONE\n");
if (fastpath_rdary->rdary[0])
rc = deliver_locally_fastpath (gv, source_entity, source_entity_locked, fastpath_rdary, wrinfo, ops, vsourceinfo);
else
rc = DDS_RETCODE_OK;
ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
}
else
{
ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
rc = deliver_locally_slowpath (gv, source_entity, source_entity_locked, wrinfo, ops, vsourceinfo);
}
} while (rc == DDS_RETCODE_TRY_AGAIN);
return rc;
}

View file

@ -36,6 +36,7 @@ extern inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *
extern inline void ddsi_serdata_unref (struct ddsi_serdata *serdata);
extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d);
extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size);
extern inline struct ddsi_serdata *ddsi_serdata_from_ser_iov (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size);
extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash);
extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample);
extern inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d);

View file

@ -268,6 +268,51 @@ static struct ddsi_serdata_default *serdata_default_from_ser_common (const struc
}
}
static struct ddsi_serdata_default *serdata_default_from_ser_iov_common (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
/* FIXME: check whether this really is the correct maximum: offsets are relative
to the CDR header, but there are also some places that use a serdata as-if it
were a stream, and those use offsets (m_index) relative to the start of the
serdata */
if (size > UINT32_MAX - offsetof (struct ddsi_serdata_default, hdr))
return NULL;
assert (niov >= 1);
if (iov[0].iov_len < 4) /* CDR header */
return NULL;
struct ddsi_serdata_default *d = serdata_default_new_size (tp, kind, (uint32_t) size);
if (d == NULL)
return NULL;
memcpy (&d->hdr, iov[0].iov_base, sizeof (d->hdr));
assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE);
serdata_default_append_blob (&d, 1, iov[0].iov_len - 4, (const char *) iov[0].iov_base + 4);
for (ddsrt_msg_iovlen_t i = 1; i < niov; i++)
serdata_default_append_blob (&d, 1, iov[i].iov_len, iov[i].iov_base);
const bool needs_bswap = (d->hdr.identifier != NATIVE_ENCODING);
d->hdr.identifier = NATIVE_ENCODING;
const uint32_t pad = ddsrt_fromBE2u (d->hdr.options) & 2;
if (d->pos < pad)
{
ddsi_serdata_unref (&d->c);
return NULL;
}
else if (!dds_stream_normalize (d->data, d->pos - pad, needs_bswap, tp, kind == SDK_KEY))
{
ddsi_serdata_unref (&d->c);
return NULL;
}
else
{
dds_istream_t is;
dds_istream_from_serdata_default (&is, d);
dds_stream_extract_keyhash (&is, &d->keyhash, tp, kind == SDK_KEY);
return d;
}
}
static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
struct ddsi_serdata_default *d;
@ -276,6 +321,14 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic
return fix_serdata_default (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_ser_iov (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size)
{
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_ser_iov_common (tpcmn, kind, niov, iov, size)) == NULL)
return NULL;
return fix_serdata_default (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_ser_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
struct ddsi_serdata_default *d;
@ -284,6 +337,14 @@ static struct ddsi_serdata *serdata_default_from_ser_nokey (const struct ddsi_se
return fix_serdata_default_nokey (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_ser_iov_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size)
{
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_ser_iov_common (tpcmn, kind, niov, iov, size)) == NULL)
return NULL;
return fix_serdata_default_nokey (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{
/* FIXME: not quite sure this is correct, though a check against a specially hacked OpenSplice suggests it is */
@ -630,6 +691,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_ser_iov = serdata_default_from_ser_iov,
.from_keyhash = ddsi_serdata_from_keyhash_cdr,
.from_sample = serdata_default_from_sample_cdr,
.to_ser = serdata_default_to_ser,
@ -646,6 +708,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = {
.eqkey = serdata_default_eqkey_nokey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser_nokey,
.from_ser_iov = serdata_default_from_ser_iov_nokey,
.from_keyhash = ddsi_serdata_from_keyhash_cdr_nokey,
.from_sample = serdata_default_from_sample_cdr_nokey,
.to_ser = serdata_default_to_ser,
@ -662,6 +725,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_ser_iov = serdata_default_from_ser_iov,
.from_keyhash = 0,
.from_sample = serdata_default_from_sample_plist,
.to_ser = serdata_default_to_ser,
@ -678,6 +742,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = {
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_ser_iov = serdata_default_from_ser_iov,
.from_keyhash = 0,
.from_sample = serdata_default_from_sample_rawcdr,
.to_ser = serdata_default_to_ser,

View file

@ -259,10 +259,28 @@ static void local_reader_ary_fini (struct local_reader_ary *x)
static void local_reader_ary_insert (struct local_reader_ary *x, struct reader *rd)
{
ddsrt_mutex_lock (&x->rdary_lock);
x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 2) * sizeof (*x->rdary));
if (x->n_readers <= 1 || rd->topic == x->rdary[x->n_readers - 1]->topic)
{
/* if the first or second reader, or if the topic is the same as that of
the last one in the list simply appending the new will maintain order */
x->rdary[x->n_readers] = rd;
}
else
{
uint32_t i;
for (i = 0; i < x->n_readers; i++)
if (x->rdary[i]->topic == rd->topic)
break;
if (i < x->n_readers)
{
/* shift any with the same topic plus whichever follow to make room */
memmove (&x->rdary[i + 1], &x->rdary[i], (x->n_readers - i) * sizeof (x->rdary[i]));
}
x->rdary[i] = rd;
}
x->rdary[x->n_readers + 1] = NULL;
x->n_readers++;
x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 1) * sizeof (*x->rdary));
x->rdary[x->n_readers - 1] = rd;
x->rdary[x->n_readers] = NULL;
ddsrt_mutex_unlock (&x->rdary_lock);
}
@ -271,13 +289,20 @@ static void local_reader_ary_remove (struct local_reader_ary *x, struct reader *
uint32_t i;
ddsrt_mutex_lock (&x->rdary_lock);
for (i = 0; i < x->n_readers; i++)
{
if (x->rdary[i] == rd)
break;
}
assert (i < x->n_readers);
/* if i == N-1 copy is a no-op */
x->rdary[i] = x->rdary[x->n_readers-1];
if (i + 1 < x->n_readers)
{
/* dropping the final one never requires any fixups; dropping one that has
the same topic as the last is as simple as moving the last one in the
removed one's location; else shift all following readers to keep it
grouped by topic */
if (rd->topic == x->rdary[x->n_readers - 1]->topic)
x->rdary[i] = x->rdary[x->n_readers - 1];
else
memmove (&x->rdary[i], &x->rdary[i + 1], (x->n_readers - i - 1) * sizeof (x->rdary[i]));
}
x->n_readers--;
x->rdary[x->n_readers] = NULL;
x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 1) * sizeof (*x->rdary));
@ -4537,7 +4562,6 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
pwr->evq = evq;
pwr->ddsi2direct_cb = 0;
pwr->ddsi2direct_cbarg = 0;
local_reader_ary_init (&pwr->rdary);
/* locking the entity prevents matching while the built-in topic hasn't been published yet */

View file

@ -45,6 +45,7 @@
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_receive.h"
#include "dds/ddsi/ddsi_rhc.h"
#include "dds/ddsi/ddsi_deliver_locally.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_globals.h"
@ -1733,12 +1734,30 @@ static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topi
return sd;
}
static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **tk1, struct q_globals *gv, const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo, nn_wctime_t tstamp, struct ddsi_sertopic const * const topic)
struct remote_sourceinfo {
const struct nn_rsample_info *sampleinfo;
unsigned char data_smhdr_flags;
const nn_plist_t *qos;
const struct nn_rdata *fragchain;
unsigned statusinfo;
nn_wctime_t tstamp;
};
static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo)
{
/* hopefully the compiler figures out that these are just aliases and doesn't reload them
unnecessarily from memory */
const struct remote_sourceinfo * __restrict si = vsourceinfo;
const struct nn_rsample_info * __restrict sampleinfo = si->sampleinfo;
const struct nn_rdata * __restrict fragchain = si->fragchain;
const uint32_t statusinfo = si->statusinfo;
const unsigned char data_smhdr_flags = si->data_smhdr_flags;
const nn_wctime_t tstamp = si->tstamp;
const nn_plist_t * __restrict qos = si->qos;
const char *failmsg = NULL;
struct ddsi_serdata *sample = NULL;
if (statusinfo == 0)
if (si->statusinfo == 0)
{
/* normal write */
if (!(data_smhdr_flags & DATA_FLAG_DATAFLAG) || sampleinfo->size == 0)
@ -1752,7 +1771,7 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t
"data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": write without proper payload (data_smhdr_flags 0x%x size %"PRIu32")\n",
sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
PGUID (guid), sampleinfo->seq,
data_smhdr_flags, sampleinfo->size);
si->data_smhdr_flags, sampleinfo->size);
return NULL;
}
sample = get_serdata (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
@ -1807,7 +1826,7 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t
}
else
{
if ((*tk1 = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL)
if ((*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL)
{
ddsi_serdata_unref (sample);
sample = NULL;
@ -1831,12 +1850,6 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t
return sample;
}
static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk)
{
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
ddsi_serdata_unref (sample);
}
unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr)
{
switch ((SubmessageKind_t) smhdr->submessageId)
@ -1858,28 +1871,51 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr)
}
}
static struct reader *proxy_writer_first_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it)
static struct reader *proxy_writer_first_in_sync_reader (struct entity_index *entity_index, struct entity_common *pwrcmn, ddsrt_avl_iter_t *it)
{
assert (pwrcmn->kind == EK_PROXY_WRITER);
struct proxy_writer *pwr = (struct proxy_writer *) pwrcmn;
struct pwr_rd_match *m;
struct reader *rd;
for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, it); m != NULL; m = ddsrt_avl_iter_next (it))
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL)
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (entity_index, &m->rd_guid)) != NULL)
return rd;
return NULL;
}
static struct reader *proxy_writer_next_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it)
static struct reader *proxy_writer_next_in_sync_reader (struct entity_index *entity_index, ddsrt_avl_iter_t *it)
{
struct pwr_rd_match *m;
struct reader *rd;
for (m = ddsrt_avl_iter_next (it); m != NULL; m = ddsrt_avl_iter_next (it))
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL)
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (entity_index, &m->rd_guid)) != NULL)
return rd;
return NULL;
}
static dds_return_t remote_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo)
{
(void) vsourceinfo;
ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock);
if (source_entity_locked)
ddsrt_mutex_unlock (&source_entity->lock);
dds_sleepfor (DDS_MSECS (10));
if (source_entity_locked)
ddsrt_mutex_lock (&source_entity->lock);
ddsrt_mutex_lock (&fastpath_rdary->rdary_lock);
return DDS_RETCODE_TRY_AGAIN;
}
static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const ddsi_guid_t *rdguid, int pwr_locked)
{
static const struct deliver_locally_ops deliver_locally_ops = {
.makesample = remote_make_sample,
.first_reader = proxy_writer_first_in_sync_reader,
.next_reader = proxy_writer_next_in_sync_reader,
.on_failure_fastpath = remote_on_delivery_failure_fastpath
};
struct receiver_state const * const rst = sampleinfo->rst;
struct q_globals * const gv = rst->gv;
struct proxy_writer * const pwr = sampleinfo->pwr;
@ -1953,108 +1989,22 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
struct ddsi_writer_info wrinfo;
ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos, statusinfo);
if (rdguid == NULL)
{
/* FIXME: Retry loop, for re-delivery of rejected reliable samples. Is a
temporary hack till throttling back of writer is implemented (with late
acknowledgement of sample and nack). */
retry:
ddsrt_mutex_lock (&pwr->rdary.rdary_lock);
if (pwr->rdary.fastpath_ok)
{
struct reader ** const rdary = pwr->rdary.rdary;
if (rdary[0])
{
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rdary[0]->topic)) != NULL)
{
ETRACE (pwr, " => EVERYONE\n");
uint32_t i = 0;
do {
if (!ddsi_rhc_store (rdary[i]->rhc, &wrinfo, payload, tk))
{
if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
/* It is painful to drop the sample, but there is no guarantee that the readers
will still be there after unlocking; indeed, it is even possible that the
topic definition got replaced in the meantime. Fortunately, this is in
the midst of a FIXME for many other reasons. */
free_sample_after_store (gv, payload, tk);
dds_sleepfor (DDS_MSECS (10));
if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
goto retry;
}
} while (rdary[++i]);
free_sample_after_store (gv, payload, tk);
}
}
ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
}
struct remote_sourceinfo sourceinfo = {
.sampleinfo = sampleinfo,
.data_smhdr_flags = data_smhdr_flags,
.qos = &qos,
.fragchain = fragchain,
.statusinfo = statusinfo,
.tstamp = tstamp
};
if (rdguid)
(void) deliver_locally_one (gv, &pwr->e, pwr_locked != 0, rdguid, &wrinfo, &deliver_locally_ops, &sourceinfo);
else
{
/* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary
reason rdary exists is to avoid locking the proxy writer
but this is less of an issue when we are deleting it, so
we fall back to using the GUIDs so that we can deliver all
samples we received from it. As writer being deleted any
reliable samples that are rejected are simply discarded. */
ddsrt_avl_iter_t it;
struct reader *rd;
ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
if ((rd = proxy_writer_first_in_sync_reader (pwr, &it)) != NULL)
{
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
{
ETRACE (pwr, " =>");
do {
ETRACE (pwr, " "PGUIDFMT, PGUID (rd->e.guid));
(void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk);
rd = proxy_writer_next_in_sync_reader (pwr, &it);
} while (rd != NULL);
free_sample_after_store (gv, payload, tk);
ETRACE (pwr, "\n");
}
}
if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
}
(void) deliver_locally_allinsync (gv, &pwr->e, pwr_locked != 0, &pwr->rdary, &wrinfo, &deliver_locally_ops, &sourceinfo);
ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1));
}
else
{
struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, rdguid);
if (rd != NULL)
{
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
{
ETRACE (pwr, " =>"PGUIDFMT"\n", PGUID (*rdguid));
/* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays
"awake" (although a delete can be initiated), and blocking like this is a stopgap
anyway -- quite possibly to abort once either is deleted */
while (!ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk))
{
if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
dds_sleepfor (DDS_MSECS (1));
if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
if (entidx_lookup_reader_guid (gv->entity_index, rdguid) == NULL ||
entidx_lookup_proxy_writer_guid (gv->entity_index, &pwr->e.guid) == NULL)
{
/* give up when reader or proxy writer no longer accessible */
break;
}
}
free_sample_after_store (gv, payload, tk);
}
}
}
nn_plist_fini (&qos);
return 0;
}