diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index f9d3122..7bdb481 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -288,6 +288,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = { .eqkey = serdata_builtin_eqkey, .free = serdata_builtin_free, .from_ser = 0, + .from_ser_iov = 0, .from_keyhash = ddsi_serdata_builtin_from_keyhash, .from_sample = 0, .to_ser = serdata_builtin_to_ser, diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 0f65cc6..29fc12c 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -25,6 +25,7 @@ #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_radmin.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/ddsi_deliver_locally.h" dds_return_t dds_write (dds_entity_t writer, const void *data) { @@ -71,80 +72,103 @@ dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t tim return ret; } -static dds_return_t try_store (struct ddsi_rhc *rhc, const struct ddsi_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms) +static struct reader *writer_first_in_sync_reader (struct entity_index *entity_index, struct entity_common *wrcmn, ddsrt_avl_iter_t *it) { - while (! ddsi_rhc_store (rhc, pwr_info, payload, tk)) + assert (wrcmn->kind == EK_WRITER); + struct writer *wr = (struct writer *) wrcmn; + struct wr_rd_match *m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, it); + return m ? entidx_lookup_reader_guid (entity_index, &m->rd_guid) : NULL; +} + +static struct reader *writer_next_in_sync_reader (struct entity_index *entity_index, ddsrt_avl_iter_t *it) +{ + struct wr_rd_match *m = ddsrt_avl_iter_next (it); + return m ? entidx_lookup_reader_guid (entity_index, &m->rd_guid) : NULL; +} + +struct local_sourceinfo { + const struct ddsi_sertopic *src_topic; + struct ddsi_serdata *src_payload; + struct ddsi_tkmap_instance *src_tk; + nn_mtime_t timeout; +}; + +static struct ddsi_serdata *local_make_sample (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo) +{ + struct local_sourceinfo *si = vsourceinfo; + if (topic == si->src_topic) { - if (*max_block_ms > 0) + *tk = si->src_tk; + /* FIXME: see if this pair of refc increments can't be avoided + They're needed because free_sample_after_delivery will always be called, but + in the common case of a local writer and a single sertopic, make_sample doesn't + actually create a sample, and so free_sample_after_delivery doesn't actually + have to free anything */ + ddsi_tkmap_instance_ref (si->src_tk); + return ddsi_serdata_ref (si->src_payload); + } + else + { + /* ouch ... convert a serdata from one sertopic to another ... */ + ddsrt_iovec_t iov; + uint32_t size = ddsi_serdata_size (si->src_payload); + (void) ddsi_serdata_to_ser_ref (si->src_payload, 0, size, &iov); + struct ddsi_serdata *d = ddsi_serdata_from_ser_iov (topic, si->src_payload->kind, 1, &iov, size); + ddsi_serdata_to_ser_unref (si->src_payload, &iov); + if (d) { - dds_sleepfor (DDS_HEADBANG_TIMEOUT); - *max_block_ms -= DDS_HEADBANG_TIMEOUT; + d->statusinfo = si->src_payload->statusinfo; + d->timestamp = si->src_payload->timestamp; + *tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d); } else { - return DDS_RETCODE_TIMEOUT; + DDS_CWARNING (&gv->logconfig, "local: deserialization %s/%s failed in topic type conversion\n", topic->name, topic->type_name); } + return d; + } +} + +static dds_return_t local_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo) +{ + (void) fastpath_rdary; + (void) source_entity_locked; + assert (source_entity->kind == EK_WRITER); + struct writer *wr = (struct writer *) source_entity; + struct local_sourceinfo *si = vsourceinfo; + nn_mtime_t tnow = now_mt (); + if (si->timeout.v == 0) + si->timeout = add_duration_to_mtime (tnow, wr->xqos->reliability.max_blocking_time); + if (tnow.v >= si->timeout.v) + return DDS_RETCODE_TIMEOUT; + else + { + dds_sleepfor (DDS_HEADBANG_TIMEOUT); + return DDS_RETCODE_OK; } - return DDS_RETCODE_OK; } static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk) { - dds_return_t ret = DDS_RETCODE_OK; - ddsrt_mutex_lock (&wr->rdary.rdary_lock); - if (wr->rdary.fastpath_ok) - { - struct reader ** const rdary = wr->rdary.rdary; - if (rdary[0]) - { - dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time; - struct ddsi_writer_info pwr_info; - ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos, payload->statusinfo); - for (uint32_t i = 0; rdary[i]; i++) { - DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); - if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) - break; - } - } - ddsrt_mutex_unlock (&wr->rdary.rdary_lock); - } - else - { - /* When deleting, pwr is no longer accessible via the hash - tables, and consequently, a reader may be deleted without - it being possible to remove it from rdary. The primary - reason rdary exists is to avoid locking the proxy writer - but this is less of an issue when we are deleting it, so - we fall back to using the GUIDs so that we can deliver all - samples we received from it. As writer being deleted any - reliable samples that are rejected are simply discarded. */ - ddsrt_avl_iter_t it; - struct pwr_rd_match *m; - struct ddsi_writer_info wrinfo; - const struct entity_index *gh = wr->e.gv->entity_index; - dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time; - ddsrt_mutex_unlock (&wr->rdary.rdary_lock); - ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo); - ddsrt_mutex_lock (&wr->e.lock); - for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) - { - struct reader *rd; - if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL) - { - DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid)); - /* Copied the return value ignore from DDSI deliver_user_data () function. */ - if ((ret = try_store (rd->rhc, &wrinfo, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) - break; - } - } - ddsrt_mutex_unlock (&wr->e.lock); - } - - if (ret == DDS_RETCODE_TIMEOUT) - { + static const struct deliver_locally_ops deliver_locally_ops = { + .makesample = local_make_sample, + .first_reader = writer_first_in_sync_reader, + .next_reader = writer_next_in_sync_reader, + .on_failure_fastpath = local_on_delivery_failure_fastpath + }; + struct local_sourceinfo sourceinfo = { + .src_topic = wr->topic, + .src_payload = payload, + .src_tk = tk, + .timeout = { 0 }, + }; + dds_return_t rc; + struct ddsi_writer_info wrinfo; + ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo); + rc = deliver_locally_allinsync (wr->e.gv, &wr->e, false, &wr->rdary, &wrinfo, &deliver_locally_ops, &sourceinfo); + if (rc == DDS_RETCODE_TIMEOUT) DDS_CERROR (&wr->e.gv->logconfig, "The writer could not deliver data on time, probably due to a local reader resources being full\n"); - } - return ret; + return rc; } dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action) diff --git a/src/core/ddsc/tests/CMakeLists.txt b/src/core/ddsc/tests/CMakeLists.txt index 3f902dd..bdd239e 100644 --- a/src/core/ddsc/tests/CMakeLists.txt +++ b/src/core/ddsc/tests/CMakeLists.txt @@ -30,6 +30,7 @@ set(ddsc_test_sources "instance_get_key.c" "listener.c" "liveliness.c" + "multi_sertopic.c" "participant.c" "publisher.c" "qos.c" diff --git a/src/core/ddsc/tests/multi_sertopic.c b/src/core/ddsc/tests/multi_sertopic.c new file mode 100644 index 0000000..9501b27 --- /dev/null +++ b/src/core/ddsc/tests/multi_sertopic.c @@ -0,0 +1,609 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/dds.h" +#include "CUnit/Theory.h" +#include "Space.h" +#include "config_env.h" + +#include "dds/version.h" +#include "dds__entity.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/ddsi_serdata.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsrt/cdtors.h" +#include "dds/ddsrt/misc.h" +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/environ.h" +#include "dds/ddsrt/atomics.h" +#include "dds/ddsrt/time.h" + +#define DDS_DOMAINID_PUB 0 +#define DDS_DOMAINID_SUB 1 +#define DDS_CONFIG_NO_PORT_GAIN "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}0" +#define DDS_CONFIG_NO_PORT_GAIN_LOG "${CYCLONEDDS_URI}${CYCLONEDDS_URI:+,}cyclonedds_multi_sertopic_tests.${CYCLONEDDS_DOMAIN_ID}.${CYCLONEDDS_PID}.logfinest0" + +/* IDL preprocessing is not really friendly towards creating multiple descriptors + for the same type name with different definitions, so we do it by hand. */ +struct uint32_seq { + uint32_t _maximum; + uint32_t _length; + uint32_t *_buffer; + bool _release; +}; + +struct two_uint32 { + uint32_t v[2]; +}; + +struct two_uint32_seq { + uint32_t _maximum; + uint32_t _length; + struct two_uint32 *_buffer; + bool _release; +}; + +struct type_seq { + struct uint32_seq x; +}; + +struct type_ary { + uint32_t x[4]; +}; + +struct type_uni { + uint32_t _d; + union + { + struct two_uint32_seq a; + uint32_t b[4]; + } _u; +}; + +static const dds_topic_descriptor_t type_seq_desc = +{ + .m_size = sizeof (struct type_seq), + .m_align = sizeof (void *), + .m_flagset = DDS_TOPIC_NO_OPTIMIZE, + .m_nkeys = 0, + .m_typename = "multi_sertopic_type", + .m_keys = NULL, + .m_nops = 2, + .m_ops = (const uint32_t[]) { + DDS_OP_ADR | DDS_OP_TYPE_SEQ | DDS_OP_SUBTYPE_4BY, offsetof (struct type_seq, x), + DDS_OP_RTS + }, + .m_meta = "" /* this is on its way out anyway */ +}; + +static const dds_topic_descriptor_t type_ary_desc = +{ + .m_size = sizeof (struct type_ary), + .m_align = 4u, + .m_flagset = DDS_TOPIC_NO_OPTIMIZE, + .m_nkeys = 0, + .m_typename = "multi_sertopic_type", + .m_keys = NULL, + .m_nops = 2, + .m_ops = (const uint32_t[]) { + DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, offsetof (struct type_ary, x), 4, + DDS_OP_RTS + }, + .m_meta = "" /* this is on its way out anyway */ +}; + +static const dds_topic_descriptor_t type_uni_desc = +{ + .m_size = sizeof (struct type_uni), + .m_align = sizeof (void *), + .m_flagset = DDS_TOPIC_NO_OPTIMIZE | DDS_TOPIC_CONTAINS_UNION, + .m_nkeys = 0, + .m_typename = "multi_sertopic_type", + .m_keys = NULL, + .m_nops = 8, + .m_ops = (const uint32_t[]) { + DDS_OP_ADR | DDS_OP_TYPE_UNI | DDS_OP_SUBTYPE_4BY | DDS_OP_FLAG_DEF, offsetof (struct type_uni, _d), 2u, (23u << 16) + 4u, + DDS_OP_JEQ | DDS_OP_TYPE_SEQ | 6, 3, offsetof (struct type_uni, _u.a), + DDS_OP_JEQ | DDS_OP_TYPE_ARR | 12, 0, offsetof (struct type_uni, _u.b), + DDS_OP_ADR | DDS_OP_TYPE_SEQ | DDS_OP_SUBTYPE_STU, 0u, + sizeof (struct two_uint32), (8u << 16u) + 4u, + DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, offsetof (struct two_uint32, v), 2, + DDS_OP_RTS, + DDS_OP_RTS, + DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_4BY, 0u, 4, + DDS_OP_RTS, + DDS_OP_RTS + + }, + .m_meta = "" /* this is on its way out anyway */ +}; + +/* The slow delivery path has a switchover at 4 sertopics (well, today it has ...) so it is better to + to test with > 4 different sertopics. That path (again, today) iterates over GUIDs in increasing + order, and as all readers are created in the participant and the entity ids are strictly + monotonically increasing for the first ~ 16M entities (again, today), creating additional + readers for these topics at the end means that "ary2" is the one that ends up in > 4 case. + Calling takecdr */ +static const dds_topic_descriptor_t type_ary1_desc = +{ + .m_size = sizeof (struct type_ary), + .m_align = 1u, + .m_flagset = DDS_TOPIC_NO_OPTIMIZE, + .m_nkeys = 0, + .m_typename = "multi_sertopic_type", + .m_keys = NULL, + .m_nops = 2, + .m_ops = (const uint32_t[]) { + DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_1BY, offsetof (struct type_ary, x), 16, + DDS_OP_RTS + }, + .m_meta = "" /* this is on its way out anyway */ +}; + +static const dds_topic_descriptor_t type_ary2_desc = +{ + .m_size = sizeof (struct type_ary), + .m_align = 2u, + .m_flagset = DDS_TOPIC_NO_OPTIMIZE, + .m_nkeys = 0, + .m_typename = "multi_sertopic_type", + .m_keys = NULL, + .m_nops = 2, + .m_ops = (const uint32_t[]) { + DDS_OP_ADR | DDS_OP_TYPE_ARR | DDS_OP_SUBTYPE_2BY, offsetof (struct type_ary, x), 8, + DDS_OP_RTS + }, + .m_meta = "" /* this is on its way out anyway */ +}; + +static uint32_t g_topic_nr = 0; +static dds_entity_t g_pub_domain = 0; +static dds_entity_t g_pub_participant = 0; +static dds_entity_t g_pub_publisher = 0; + +static dds_entity_t g_sub_domain = 0; +static dds_entity_t g_sub_participant = 0; +static dds_entity_t g_sub_subscriber = 0; + +static char *create_topic_name (const char *prefix, uint32_t nr, char *name, size_t size) +{ + /* Get unique g_topic name. */ + ddsrt_pid_t pid = ddsrt_getpid(); + ddsrt_tid_t tid = ddsrt_gettid(); + (void) snprintf (name, size, "%s%d_pid%" PRIdPID "_tid%" PRIdTID "", prefix, nr, pid, tid); + return name; +} + +static void multi_sertopic_init (void) +{ + /* Domains for pub and sub use a different domain id, but the portgain setting + * in configuration is 0, so that both domains will map to the same port number. + * This allows to create two domains in a single test process. */ + char *conf_pub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_PUB); + char *conf_sub = ddsrt_expand_envvars (DDS_CONFIG_NO_PORT_GAIN, DDS_DOMAINID_SUB); + g_pub_domain = dds_create_domain (DDS_DOMAINID_PUB, conf_pub); + g_sub_domain = dds_create_domain (DDS_DOMAINID_SUB, conf_sub); + dds_free (conf_pub); + dds_free (conf_sub); + + g_pub_participant = dds_create_participant(DDS_DOMAINID_PUB, NULL, NULL); + CU_ASSERT_FATAL (g_pub_participant > 0); + g_sub_participant = dds_create_participant(DDS_DOMAINID_SUB, NULL, NULL); + CU_ASSERT_FATAL (g_sub_participant > 0); + + g_pub_publisher = dds_create_publisher(g_pub_participant, NULL, NULL); + CU_ASSERT_FATAL (g_pub_publisher > 0); + g_sub_subscriber = dds_create_subscriber(g_sub_participant, NULL, NULL); + CU_ASSERT_FATAL (g_sub_subscriber > 0); +} + +static void multi_sertopic_fini (void) +{ + dds_delete (g_sub_subscriber); + dds_delete (g_pub_publisher); + dds_delete (g_sub_participant); + dds_delete (g_pub_participant); + dds_delete (g_sub_domain); + dds_delete (g_pub_domain); +} + +static bool get_and_check_writer_status (size_t nwr, const dds_entity_t *wrs, size_t nrd) +{ + dds_return_t rc; + struct dds_publication_matched_status x; + for (size_t i = 0; i < nwr; i++) + { + rc = dds_get_publication_matched_status (wrs[i], &x); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + if (x.current_count != nrd) + return false; + } + return true; +} + +static bool get_and_check_reader_status (size_t nrd, const dds_entity_t *rds, size_t nwr) +{ + dds_return_t rc; + struct dds_subscription_matched_status x; + for (size_t i = 0; i < nrd; i++) + { + rc = dds_get_subscription_matched_status (rds[i], &x); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + if (x.current_count != nwr) + return false; + } + return true; +} + +static void waitfor_or_reset_fastpath (dds_entity_t rdhandle, bool fastpath, size_t nwr) +{ + dds_return_t rc; + struct dds_entity *x; + + rc = dds_entity_pin (rdhandle, &x); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + CU_ASSERT_FATAL (dds_entity_kind (x) == DDS_KIND_READER); + + struct reader * const rd = ((struct dds_reader *) x)->m_rd; + struct rd_pwr_match *m; + ddsi_guid_t cursor; + size_t wrcount = 0; + thread_state_awake (lookup_thread_state (), rd->e.gv); + ddsrt_mutex_lock (&rd->e.lock); + + memset (&cursor, 0, sizeof (cursor)); + while ((m = ddsrt_avl_lookup_succ (&rd_writers_treedef, &rd->writers, &cursor)) != NULL) + { + cursor = m->pwr_guid; + ddsrt_mutex_unlock (&rd->e.lock); + struct proxy_writer * const pwr = entidx_lookup_proxy_writer_guid (rd->e.gv->entity_index, &cursor); + ddsrt_mutex_lock (&pwr->rdary.rdary_lock); + if (!fastpath) + pwr->rdary.fastpath_ok = false; + else + { + while (!pwr->rdary.fastpath_ok) + { + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + dds_sleepfor (DDS_MSECS (10)); + ddsrt_mutex_lock (&pwr->rdary.rdary_lock); + } + } + wrcount++; + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + ddsrt_mutex_lock (&rd->e.lock); + } + + memset (&cursor, 0, sizeof (cursor)); + while ((m = ddsrt_avl_lookup_succ (&rd_local_writers_treedef, &rd->local_writers, &cursor)) != NULL) + { + cursor = m->pwr_guid; + ddsrt_mutex_unlock (&rd->e.lock); + struct writer * const wr = entidx_lookup_writer_guid (rd->e.gv->entity_index, &cursor); + ddsrt_mutex_lock (&wr->rdary.rdary_lock); + if (!fastpath) + wr->rdary.fastpath_ok = fastpath; + else + { + while (!wr->rdary.fastpath_ok) + { + ddsrt_mutex_unlock (&wr->rdary.rdary_lock); + dds_sleepfor (DDS_MSECS (10)); + ddsrt_mutex_lock (&wr->rdary.rdary_lock); + } + } + wrcount++; + ddsrt_mutex_unlock (&wr->rdary.rdary_lock); + ddsrt_mutex_lock (&rd->e.lock); + } + ddsrt_mutex_unlock (&rd->e.lock); + thread_state_asleep (lookup_thread_state ()); + dds_entity_unpin (x); + + CU_ASSERT_FATAL (wrcount == nwr); +} + +static struct ddsi_sertopic *get_sertopic_from_reader (dds_entity_t reader) +{ + /* not refcounting the sertopic: so this presumes it is kept alive for other reasons */ + dds_return_t rc; + struct dds_entity *x; + struct dds_reader *rd; + struct ddsi_sertopic *sertopic; + rc = dds_entity_pin (reader, &x); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + CU_ASSERT_FATAL (dds_entity_kind (x) == DDS_KIND_READER); + rd = (struct dds_reader *) x; + sertopic = rd->m_topic->m_stopic; + dds_entity_unpin (x); + return sertopic; +} + +static void logsink (void *arg, const dds_log_data_t *msg) +{ + ddsrt_atomic_uint32_t *deser_fail = arg; + fputs (msg->message - msg->hdrsize, stderr); + if (strstr (msg->message, "deserialization") && strstr (msg->message, "failed")) + ddsrt_atomic_inc32 (deser_fail); +} + +static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, bool fastpath) +{ +#define SEQ_IDX 0 +#define ARY_IDX 1 +#define UNI_IDX 2 + char name[100]; + static const dds_topic_descriptor_t *descs[] = { + &type_seq_desc, &type_ary_desc, &type_uni_desc, + &type_ary1_desc, &type_ary2_desc + }; + dds_entity_t pub_topics[3], writers[3]; + dds_entity_t sub_topics[5]; + dds_entity_t readers[15]; + dds_entity_t waitset; + dds_qos_t *qos; + dds_return_t rc; + + printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", fastpath ? "fastpath" : "slowpath"); + + waitset = dds_create_waitset (DDS_CYCLONEDDS_HANDLE); + CU_ASSERT_FATAL (waitset > 0); + + qos = dds_create_qos (); + CU_ASSERT_FATAL (qos != NULL); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); + dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + + create_topic_name ("ddsc_multi_sertopic_lease_duration_zero", g_topic_nr++, name, sizeof name); + + for (size_t i = 0; i < sizeof (pub_topics) / sizeof (pub_topics[0]); i++) + { + pub_topics[i] = dds_create_topic (pp_pub, descs[i], name, qos, NULL); + CU_ASSERT_FATAL (pub_topics[i] > 0); + } + for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++) + { + writers[i] = dds_create_writer (pp_pub, pub_topics[i], qos, NULL); + CU_ASSERT_FATAL (writers[i] > 0); + } + for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++) + { + sub_topics[i] = dds_create_topic (pp_sub, descs[i], name, qos, NULL); + CU_ASSERT_FATAL (sub_topics[i] > 0); + } + DDSRT_STATIC_ASSERT (sizeof (readers) >= sizeof (sub_topics)); + DDSRT_STATIC_ASSERT ((sizeof (readers) % sizeof (sub_topics)) == 0); + for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++) + { + readers[i] = dds_create_reader (pp_sub, sub_topics[i], qos, NULL); + CU_ASSERT_FATAL (readers[i] > 0); + } + for (size_t i = sizeof (sub_topics) / sizeof (sub_topics[0]); i < sizeof (readers) / sizeof (readers[0]); i++) + { + const size_t nrd = sizeof (readers) / sizeof (readers[0]); + const size_t ntp = sizeof (sub_topics) / sizeof (sub_topics[0]); + readers[i] = dds_create_reader (pp_sub, sub_topics[(i - ntp) / (nrd / ntp - 1)], qos, NULL); + CU_ASSERT_FATAL (readers[i] > 0); + } + + dds_delete_qos (qos); + + /* wait for discovery to complete */ + for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++) + { + rc = dds_set_status_mask (writers[i], DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_waitset_attach (waitset, writers[i], -(dds_attach_t)i - 1); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } + for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) + { + rc = dds_set_status_mask (readers[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_waitset_attach (waitset, readers[i], (dds_attach_t)i); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } + + printf ("wait for discovery, fastpath_ok; delete & recreate readers\n"); + while (!(get_and_check_writer_status (sizeof (writers) / sizeof (writers[0]), writers, sizeof (readers) / sizeof (readers[0])) && + get_and_check_reader_status (sizeof (readers) / sizeof (readers[0]), readers, sizeof (writers) / sizeof (writers[0])))) + { + rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS(5)); + CU_ASSERT_FATAL (rc >= 1); + } + + /* we want to check both the fast path and the slow path ... so first wait + for it to be set on all (proxy) writers, then possibly reset it */ + for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) + waitfor_or_reset_fastpath (readers[i], true, sizeof (writers) / sizeof (writers[0])); + if (!fastpath) + { + printf ("clear fastpath_ok\n"); + for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) + waitfor_or_reset_fastpath (readers[i], false, sizeof (writers) / sizeof (writers[0])); + } + + /* check the log output for deserialization failures */ + ddsrt_atomic_uint32_t deser_fail = DDSRT_ATOMIC_UINT32_INIT (0); + dds_set_log_sink (logsink, &deser_fail); + + /* Write one of each type: all of these samples result in the same serialised + form but interpreting the memory layout for type X as-if it were of type Y + wreaks havoc. */ + { + struct type_seq s = { + .x = { + ._length = 3, ._maximum = 3, ._release = false, ._buffer = (uint32_t[]) { 1, 4, 2 } + } + }; + struct type_ary a = { + .x = { 3, 1, 4, 2 } + }; + struct type_uni u = { + ._d = 3, + ._u = { .a = { + ._length = 1, ._maximum = 1, ._release = false, ._buffer = (struct two_uint32[]) { { { 4, 2 } } } + } } + }; + printf ("writing ...\n"); + rc = dds_write_ts (writers[SEQ_IDX], &s, 1); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_write_ts (writers[ARY_IDX], &a, 2); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_write_ts (writers[UNI_IDX], &u, 3); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + + /* Also write a sample that can't be deserialised by the other types */ + struct type_seq s1 = { + .x = { + ._length = 1, ._maximum = 1, ._release = false, ._buffer = (uint32_t[]) { 1 } + } + }; + rc = dds_write_ts (writers[SEQ_IDX], &s1, 4); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } + + /* All readers should have received three samples, and those that are of type seq + should have received one extra (whereas the others should cause deserialization + failure warnings) */ + printf ("reading\n"); + const size_t nexp = ((sizeof (writers) / sizeof (writers[0])) * + (sizeof (readers) / sizeof (readers[0])) + + ((sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0])))); + /* expecting exactly as many deserialization failures as there are topics other than seq */ + const size_t nexp_fail = sizeof (sub_topics) / sizeof (sub_topics[0]) - 1; + uint32_t nseen = 0; + while (nseen < nexp) + { + dds_sample_info_t si; + + rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS (5)); + CU_ASSERT_FATAL (rc >= 1); + + { + struct type_seq s = { .x = { 0 } }; + void *raws[] = { &s }; + while (dds_take (readers[SEQ_IDX], raws, &si, 1, 1) == 1) + { + if (!si.valid_data) + continue; + printf ("recv: seq %"PRId64"\n", si.source_timestamp); + if (si.source_timestamp == 4) + { + CU_ASSERT_FATAL (s.x._length == 1); + CU_ASSERT_FATAL (s.x._buffer[0] == 1); + } + else + { + CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3); + CU_ASSERT_FATAL (s.x._length == 3); + CU_ASSERT_FATAL (s.x._buffer[0] == 1); + CU_ASSERT_FATAL (s.x._buffer[1] == 4); + CU_ASSERT_FATAL (s.x._buffer[2] == 2); + } + nseen++; + } + dds_free (s.x._buffer); + } + + { + struct type_ary a; + void *rawa[] = { &a }; + while (dds_take (readers[ARY_IDX], rawa, &si, 1, 1) == 1) + { + if (!si.valid_data) + continue; + printf ("recv: ary %"PRId64"\n", si.source_timestamp); + CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3); + CU_ASSERT_FATAL (a.x[0] == 3); + CU_ASSERT_FATAL (a.x[1] == 1); + CU_ASSERT_FATAL (a.x[2] == 4); + CU_ASSERT_FATAL (a.x[3] == 2); + nseen++; + } + } + + { + struct type_uni u = { ._u.a = { 0 } }; + void *rawu[] = { &u }; + while (dds_take (readers[UNI_IDX], rawu, &si, 1, 1) == 1) + { + if (!si.valid_data) + continue; + printf ("recv: uni %"PRId64"\n", si.source_timestamp); + CU_ASSERT_FATAL (si.source_timestamp >= 1 && si.source_timestamp <= 3); + CU_ASSERT_FATAL (u._d == 3); + CU_ASSERT_FATAL (u._u.a._length == 1); + assert (u._u.a._buffer != NULL); /* for Clang static analyzer */ + CU_ASSERT_FATAL (u._u.a._buffer[0].v[0] == 4); + CU_ASSERT_FATAL (u._u.a._buffer[0].v[1] == 2); + dds_free (u._u.a._buffer); + u._u.a._buffer = NULL; + nseen++; + } + } + + DDSRT_STATIC_ASSERT (((1u << SEQ_IDX) | (1u << ARY_IDX) | (1u << UNI_IDX)) == 7); + for (size_t i = 3; i < sizeof (readers) / sizeof (readers[0]); i++) + { + struct ddsi_serdata *sample; + while (dds_takecdr (readers[i], &sample, 1, &si, DDS_ANY_STATE) == 1) + { + if (!si.valid_data) + continue; + printf ("recv: reader %zu %"PRId64"\n", i, si.source_timestamp); + CU_ASSERT_FATAL (sample->topic == get_sertopic_from_reader (readers[i])); + ddsi_serdata_unref (sample); + nseen++; + } + } + } + CU_ASSERT_FATAL (nseen == nexp); + + /* data from remote writers can cause a deserialization failure after all + expected samples have been seen (becasue it is written last); so wait + for them */ + while (ddsrt_atomic_ld32 (&deser_fail) < nexp_fail) + dds_sleepfor (DDS_MSECS (10)); + CU_ASSERT_FATAL (ddsrt_atomic_ld32 (&deser_fail) == nexp_fail); + + /* deleting the waitset is important: it is bound to the library rather than to + a domain and consequently won't be deleted simply because all domains are */ + rc = dds_delete (waitset); + + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + dds_set_log_sink (0, NULL); +} + +CU_Test(ddsc_multi_sertopic, local, .init = multi_sertopic_init, .fini = multi_sertopic_fini) +{ + ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, true); +} + +CU_Test(ddsc_multi_sertopic, remote, .init = multi_sertopic_init, .fini = multi_sertopic_fini) +{ + ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, true); +} + +CU_Test(ddsc_multi_sertopic, local_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini) +{ + ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, false); +} + +CU_Test(ddsc_multi_sertopic, remote_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini) +{ + ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, false); +} diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 464f39b..01c566d 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -31,6 +31,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_pmd.c ddsi_entity_index.c ddsi_deadline.c + ddsi_deliver_locally.c q_addrset.c q_bitset_inlines.c q_bswap.c @@ -92,6 +93,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" ddsi_guid.h ddsi_entity_index.h ddsi_deadline.h + ddsi_deliver_locally.h q_addrset.h q_bitset.h q_bswap.h diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_deliver_locally.h b/src/core/ddsi/include/dds/ddsi/ddsi_deliver_locally.h new file mode 100644 index 0000000..5ecec40 --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_deliver_locally.h @@ -0,0 +1,62 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSI_DELIVER_LOCALLY_H +#define DDSI_DELIVER_LOCALLY_H + +#include +#include + +#include "dds/export.h" +#include "dds/ddsrt/retcode.h" +#include "dds/ddsrt/avl.h" +#include "dds/ddsi/ddsi_guid.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +struct q_globals; +struct ddsi_tkmap_instance; +struct ddsi_sertopic; +struct ddsi_serdata; +struct entity_index; +struct reader; +struct entity_common; +struct ddsi_writer_info; +struct local_reader_ary; + +typedef struct ddsi_serdata * (*deliver_locally_makesample_t) (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo); +typedef struct reader * (*deliver_locally_first_reader_t) (struct entity_index *entity_index, struct entity_common *source_entity, ddsrt_avl_iter_t *it); +typedef struct reader * (*deliver_locally_next_reader_t) (struct entity_index *entity_index, ddsrt_avl_iter_t *it); + +/** return: + - DDS_RETCODE_OK to try again immediately + - DDS_RETCODE_TRY_AGAIN to complete restart the operation later + - anything else: error to be returned from deliver_locally_xxx */ +typedef dds_return_t (*deliver_locally_on_failure_fastpath_t) (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo); + +struct deliver_locally_ops { + deliver_locally_makesample_t makesample; + deliver_locally_first_reader_t first_reader; + deliver_locally_next_reader_t next_reader; + deliver_locally_on_failure_fastpath_t on_failure_fastpath; +}; + +dds_return_t deliver_locally_one (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const ddsi_guid_t *rdguid, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo); + +dds_return_t deliver_locally_allinsync (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo); + +#if defined (__cplusplus) +} +#endif + +#endif /* DDSI_DELIVER_LOCALLY_H */ diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h index 0074141..ceb5112 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h @@ -61,6 +61,9 @@ typedef void (*ddsi_serdata_free_t) (struct ddsi_serdata *d); - FIXME: get the encoding header out of the serialised data */ typedef struct ddsi_serdata * (*ddsi_serdata_from_ser_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size); +/* Exactly like ddsi_serdata_from_ser_t, but with the data in an iovec and guaranteed absence of overlap */ +typedef struct ddsi_serdata * (*ddsi_serdata_from_ser_iov_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size); + /* Construct a serdata from a keyhash (an SDK_KEY by definition) */ typedef struct ddsi_serdata * (*ddsi_serdata_from_keyhash_t) (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash); @@ -138,6 +141,7 @@ struct ddsi_serdata_ops { ddsi_serdata_eqkey_t eqkey; ddsi_serdata_size_t get_size; ddsi_serdata_from_ser_t from_ser; + ddsi_serdata_from_ser_iov_t from_ser_iov; ddsi_serdata_from_keyhash_t from_keyhash; ddsi_serdata_from_sample_t from_sample; ddsi_serdata_to_ser_t to_ser; @@ -173,6 +177,10 @@ DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_ return topic->serdata_ops->from_ser (topic, kind, fragchain, size); } +DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_ser_iov (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size) { + return topic->serdata_ops->from_ser_iov (topic, kind, niov, iov, size); +} + DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash) { return topic->serdata_ops->from_keyhash (topic, keyhash); } diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 34ab9f8..f5d6dde 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -185,7 +185,7 @@ struct local_reader_ary { unsigned valid: 1; /* always true until (proxy-)writer is being deleted; !valid => !fastpath_ok */ unsigned fastpath_ok: 1; /* if not ok, fall back to using GUIDs (gives access to the reader-writer match data for handling readers that bumped into resource limits, hence can flip-flop, unlike "valid") */ uint32_t n_readers; - struct reader **rdary; /* for efficient delivery, null-pointer terminated */ + struct reader **rdary; /* for efficient delivery, null-pointer terminated, grouped by topic */ }; struct avail_entityid_set { @@ -427,12 +427,12 @@ struct proxy_reader { ddsrt_avl_tree_t writers; /* matching LOCAL writers */ }; -extern const ddsrt_avl_treedef_t wr_readers_treedef; -extern const ddsrt_avl_treedef_t wr_local_readers_treedef; -extern const ddsrt_avl_treedef_t rd_writers_treedef; -extern const ddsrt_avl_treedef_t rd_local_writers_treedef; -extern const ddsrt_avl_treedef_t pwr_readers_treedef; -extern const ddsrt_avl_treedef_t prd_writers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t wr_readers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t wr_local_readers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t rd_writers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t rd_local_writers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t pwr_readers_treedef; +DDS_EXPORT extern const ddsrt_avl_treedef_t prd_writers_treedef; extern const ddsrt_avl_treedef_t deleted_participants_treedef; #define DPG_LOCAL 1 diff --git a/src/core/ddsi/src/ddsi_deliver_locally.c b/src/core/ddsi/src/ddsi_deliver_locally.c new file mode 100644 index 0000000..bea4270 --- /dev/null +++ b/src/core/ddsi/src/ddsi_deliver_locally.c @@ -0,0 +1,269 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/ddsrt/log.h" +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/avl.h" + +#include "dds/ddsi/ddsi_deliver_locally.h" +#include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/ddsi_serdata.h" +#include "dds/ddsi/ddsi_tkmap.h" +#include "dds/ddsi/ddsi_rhc.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_entity.h" + +#define TOPIC_SAMPLE_CACHE_SIZE 4 + +struct ddsi_sertopic; +struct ddsi_serdata; +struct ddsi_tkmap_instance; + +struct topic_sample_cache_entry { + struct ddsi_serdata *sample; + struct ddsi_tkmap_instance *tk; +}; + +struct topic_sample_cache_large_entry { + ddsrt_avl_node_t avlnode; + const struct ddsi_sertopic *topic; + struct ddsi_serdata *sample; + struct ddsi_tkmap_instance *tk; +}; + +struct topic_sample_cache { + uint32_t n; + const struct ddsi_sertopic *topics[TOPIC_SAMPLE_CACHE_SIZE]; + struct topic_sample_cache_entry samples[TOPIC_SAMPLE_CACHE_SIZE]; + ddsrt_avl_tree_t overflow; +}; + +static int cmp_topic_ptrs (const void *va, const void *vb) +{ + uintptr_t a = (uintptr_t) va; + uintptr_t b = (uintptr_t) vb; + return (a == b) ? 0 : (a < b) ? -1 : 1; +} + +static const ddsrt_avl_treedef_t tsc_large_td = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct topic_sample_cache_large_entry, avlnode), offsetof (struct topic_sample_cache_large_entry, topic), cmp_topic_ptrs, 0); + +static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk) +{ + if (sample) + { + ddsi_tkmap_instance_unref (gv->m_tkmap, tk); + ddsi_serdata_unref (sample); + } +} + +static void topic_sample_cache_init (struct topic_sample_cache * __restrict tsc) +{ + tsc->n = 0; + ddsrt_avl_init (&tsc_large_td, &tsc->overflow); +} + +static void free_large_entry (void *vnode, void *varg) +{ + struct topic_sample_cache_large_entry *e = vnode; + struct q_globals *gv = varg; + free_sample_after_store (gv, e->sample, e->tk); + ddsrt_free (e); +} + +static void topic_sample_cache_fini (struct topic_sample_cache * __restrict tsc, struct q_globals *gv) +{ + for (uint32_t i = 0; i < tsc->n && i < TOPIC_SAMPLE_CACHE_SIZE; i++) + if (tsc->topics[i] && tsc->samples[i].tk) + free_sample_after_store (gv, tsc->samples[i].sample, tsc->samples[i].tk); + + ddsrt_avl_free_arg (&tsc_large_td, &tsc->overflow, free_large_entry, gv); +} + +static bool topic_sample_cache_lookup (struct ddsi_serdata ** __restrict sample, struct ddsi_tkmap_instance ** __restrict tk, struct topic_sample_cache * __restrict tsc, const struct ddsi_sertopic *topic) +{ + /* linear scan of an array of pointers should be pretty fast */ + for (uint32_t i = 0; i < tsc->n && i < TOPIC_SAMPLE_CACHE_SIZE; i++) + { + if (tsc->topics[i] == topic) + { + *tk = tsc->samples[i].tk; + *sample = tsc->samples[i].sample; + return true; + } + } + + struct topic_sample_cache_large_entry *e; + if ((e = ddsrt_avl_lookup (&tsc_large_td, &tsc->overflow, topic)) != NULL) + { + *tk = e->tk; + *sample = e->sample; + return true; + } + return false; +} + +static void topic_sample_cache_store (struct topic_sample_cache * __restrict tsc, const struct ddsi_sertopic *topic, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk) +{ + if (tsc->n < TOPIC_SAMPLE_CACHE_SIZE) + { + tsc->topics[tsc->n] = topic; + tsc->samples[tsc->n].tk = tk; + tsc->samples[tsc->n].sample = sample; + } + else + { + struct topic_sample_cache_large_entry *e = ddsrt_malloc (sizeof (*e)); + e->topic = topic; + e->tk = tk; + e->sample = sample; + ddsrt_avl_insert (&tsc_large_td, &tsc->overflow, e); + } + tsc->n++; +} + +dds_return_t deliver_locally_one (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const ddsi_guid_t *rdguid, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo) +{ + struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, rdguid); + if (rd == NULL) + return DDS_RETCODE_OK; + + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if ((payload = ops->makesample (&tk, gv, rd->topic, vsourceinfo)) != NULL) + { + EETRACE (source_entity, " =>"PGUIDFMT"\n", PGUID (*rdguid)); + /* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays + "awake" (although a delete can be initiated), and blocking like this is a stopgap + anyway -- quite possibly to abort once either is deleted */ + while (!ddsi_rhc_store (rd->rhc, wrinfo, payload, tk)) + { + if (source_entity_locked) + ddsrt_mutex_unlock (&source_entity->lock); + dds_sleepfor (DDS_MSECS (1)); + if (source_entity_locked) + ddsrt_mutex_lock (&source_entity->lock); + if (entidx_lookup_reader_guid (gv->entity_index, rdguid) == NULL || + entidx_lookup_guid_untyped (gv->entity_index, &source_entity->guid) == NULL) + { + /* give up when reader or proxy writer no longer accessible */ + break; + } + } + free_sample_after_store (gv, payload, tk); + } + return DDS_RETCODE_OK; +} + +static dds_return_t deliver_locally_slowpath (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo) +{ + /* When deleting, pwr is no longer accessible via the hash + tables, and consequently, a reader may be deleted without + it being possible to remove it from rdary. The primary + reason rdary exists is to avoid locking the proxy writer + but this is less of an issue when we are deleting it, so + we fall back to using the GUIDs so that we can deliver all + samples we received from it. As writer being deleted any + reliable samples that are rejected are simply discarded. */ + struct topic_sample_cache tsc; + ddsrt_avl_iter_t it; + struct reader *rd; + topic_sample_cache_init (&tsc); + if (!source_entity_locked) + ddsrt_mutex_lock (&source_entity->lock); + rd = ops->first_reader (gv->entity_index, source_entity, &it); + if (rd != NULL) + EETRACE (source_entity, " =>"); + while (rd != NULL) + { + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if (!topic_sample_cache_lookup (&payload, &tk, &tsc, rd->topic)) + { + payload = ops->makesample (&tk, gv, rd->topic, vsourceinfo); + topic_sample_cache_store (&tsc, rd->topic, payload, tk); + } + /* check payload to allow for deserialisation failures */ + if (payload) + { + EETRACE (source_entity, " "PGUIDFMT, PGUID (rd->e.guid)); + (void) ddsi_rhc_store (rd->rhc, wrinfo, payload, tk); + } + rd = ops->next_reader (gv->entity_index, &it); + } + EETRACE (source_entity, "\n"); + if (!source_entity_locked) + ddsrt_mutex_unlock (&source_entity->lock); + topic_sample_cache_fini (&tsc, gv); + return DDS_RETCODE_OK; +} + +static dds_return_t deliver_locally_fastpath (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo) +{ + struct reader ** const rdary = fastpath_rdary->rdary; + uint32_t i = 0; + while (rdary[i]) + { + struct ddsi_sertopic const * const topic = rdary[i]->topic; + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if ((payload = ops->makesample (&tk, gv, topic, vsourceinfo)) == NULL) + { + /* malformed payload: skip all readers with the same topic */ + while (rdary[++i] && rdary[i]->topic == topic) + ; /* do nothing */ + } + else + { + do { + dds_return_t rc; + while (!ddsi_rhc_store (rdary[i]->rhc, wrinfo, payload, tk)) + { + if ((rc = ops->on_failure_fastpath (source_entity, source_entity_locked, fastpath_rdary, vsourceinfo)) != DDS_RETCODE_OK) + { + free_sample_after_store (gv, payload, tk); + return rc; + } + } + } while (rdary[++i] && rdary[i]->topic == topic); + free_sample_after_store (gv, payload, tk); + } + } + return DDS_RETCODE_OK; +} + +dds_return_t deliver_locally_allinsync (struct q_globals *gv, struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, const struct ddsi_writer_info *wrinfo, const struct deliver_locally_ops * __restrict ops, void *vsourceinfo) +{ + dds_return_t rc; + /* FIXME: Retry loop for re-delivery of rejected reliable samples is a bad hack + should instead throttle back the writer by skipping acknowledgement and retry */ + do { + ddsrt_mutex_lock (&fastpath_rdary->rdary_lock); + if (fastpath_rdary->fastpath_ok) + { + EETRACE (source_entity, " => EVERYONE\n"); + if (fastpath_rdary->rdary[0]) + rc = deliver_locally_fastpath (gv, source_entity, source_entity_locked, fastpath_rdary, wrinfo, ops, vsourceinfo); + else + rc = DDS_RETCODE_OK; + ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock); + } + else + { + ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock); + rc = deliver_locally_slowpath (gv, source_entity, source_entity_locked, wrinfo, ops, vsourceinfo); + } + } while (rc == DDS_RETCODE_TRY_AGAIN); + return rc; +} diff --git a/src/core/ddsi/src/ddsi_serdata.c b/src/core/ddsi/src/ddsi_serdata.c index 68caae0..5cd816e 100644 --- a/src/core/ddsi/src/ddsi_serdata.c +++ b/src/core/ddsi/src/ddsi_serdata.c @@ -36,6 +36,7 @@ extern inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata * extern inline void ddsi_serdata_unref (struct ddsi_serdata *serdata); extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d); extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size); +extern inline struct ddsi_serdata *ddsi_serdata_from_ser_iov (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size); extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash); extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample); extern inline struct ddsi_serdata *ddsi_serdata_to_topicless (const struct ddsi_serdata *d); diff --git a/src/core/ddsi/src/ddsi_serdata_default.c b/src/core/ddsi/src/ddsi_serdata_default.c index a33d3c1..9344253 100644 --- a/src/core/ddsi/src/ddsi_serdata_default.c +++ b/src/core/ddsi/src/ddsi_serdata_default.c @@ -268,6 +268,51 @@ static struct ddsi_serdata_default *serdata_default_from_ser_common (const struc } } +static struct ddsi_serdata_default *serdata_default_from_ser_iov_common (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size) +{ + const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn; + + /* FIXME: check whether this really is the correct maximum: offsets are relative + to the CDR header, but there are also some places that use a serdata as-if it + were a stream, and those use offsets (m_index) relative to the start of the + serdata */ + if (size > UINT32_MAX - offsetof (struct ddsi_serdata_default, hdr)) + return NULL; + assert (niov >= 1); + if (iov[0].iov_len < 4) /* CDR header */ + return NULL; + struct ddsi_serdata_default *d = serdata_default_new_size (tp, kind, (uint32_t) size); + if (d == NULL) + return NULL; + + memcpy (&d->hdr, iov[0].iov_base, sizeof (d->hdr)); + assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE); + serdata_default_append_blob (&d, 1, iov[0].iov_len - 4, (const char *) iov[0].iov_base + 4); + for (ddsrt_msg_iovlen_t i = 1; i < niov; i++) + serdata_default_append_blob (&d, 1, iov[i].iov_len, iov[i].iov_base); + + const bool needs_bswap = (d->hdr.identifier != NATIVE_ENCODING); + d->hdr.identifier = NATIVE_ENCODING; + const uint32_t pad = ddsrt_fromBE2u (d->hdr.options) & 2; + if (d->pos < pad) + { + ddsi_serdata_unref (&d->c); + return NULL; + } + else if (!dds_stream_normalize (d->data, d->pos - pad, needs_bswap, tp, kind == SDK_KEY)) + { + ddsi_serdata_unref (&d->c); + return NULL; + } + else + { + dds_istream_t is; + dds_istream_from_serdata_default (&is, d); + dds_stream_extract_keyhash (&is, &d->keyhash, tp, kind == SDK_KEY); + return d; + } +} + static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size) { struct ddsi_serdata_default *d; @@ -276,6 +321,14 @@ static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic return fix_serdata_default (d, tpcmn->serdata_basehash); } +static struct ddsi_serdata *serdata_default_from_ser_iov (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size) +{ + struct ddsi_serdata_default *d; + if ((d = serdata_default_from_ser_iov_common (tpcmn, kind, niov, iov, size)) == NULL) + return NULL; + return fix_serdata_default (d, tpcmn->serdata_basehash); +} + static struct ddsi_serdata *serdata_default_from_ser_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size) { struct ddsi_serdata_default *d; @@ -284,6 +337,14 @@ static struct ddsi_serdata *serdata_default_from_ser_nokey (const struct ddsi_se return fix_serdata_default_nokey (d, tpcmn->serdata_basehash); } +static struct ddsi_serdata *serdata_default_from_ser_iov_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t *iov, size_t size) +{ + struct ddsi_serdata_default *d; + if ((d = serdata_default_from_ser_iov_common (tpcmn, kind, niov, iov, size)) == NULL) + return NULL; + return fix_serdata_default_nokey (d, tpcmn->serdata_basehash); +} + static struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) { /* FIXME: not quite sure this is correct, though a check against a specially hacked OpenSplice suggests it is */ @@ -630,6 +691,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = { .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, + .from_ser_iov = serdata_default_from_ser_iov, .from_keyhash = ddsi_serdata_from_keyhash_cdr, .from_sample = serdata_default_from_sample_cdr, .to_ser = serdata_default_to_ser, @@ -646,6 +708,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = { .eqkey = serdata_default_eqkey_nokey, .free = serdata_default_free, .from_ser = serdata_default_from_ser_nokey, + .from_ser_iov = serdata_default_from_ser_iov_nokey, .from_keyhash = ddsi_serdata_from_keyhash_cdr_nokey, .from_sample = serdata_default_from_sample_cdr_nokey, .to_ser = serdata_default_to_ser, @@ -662,6 +725,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_plist = { .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, + .from_ser_iov = serdata_default_from_ser_iov, .from_keyhash = 0, .from_sample = serdata_default_from_sample_plist, .to_ser = serdata_default_to_ser, @@ -678,6 +742,7 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = { .eqkey = serdata_default_eqkey, .free = serdata_default_free, .from_ser = serdata_default_from_ser, + .from_ser_iov = serdata_default_from_ser_iov, .from_keyhash = 0, .from_sample = serdata_default_from_sample_rawcdr, .to_ser = serdata_default_to_ser, diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index ee58f1e..456a98e 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -259,10 +259,28 @@ static void local_reader_ary_fini (struct local_reader_ary *x) static void local_reader_ary_insert (struct local_reader_ary *x, struct reader *rd) { ddsrt_mutex_lock (&x->rdary_lock); + x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 2) * sizeof (*x->rdary)); + if (x->n_readers <= 1 || rd->topic == x->rdary[x->n_readers - 1]->topic) + { + /* if the first or second reader, or if the topic is the same as that of + the last one in the list simply appending the new will maintain order */ + x->rdary[x->n_readers] = rd; + } + else + { + uint32_t i; + for (i = 0; i < x->n_readers; i++) + if (x->rdary[i]->topic == rd->topic) + break; + if (i < x->n_readers) + { + /* shift any with the same topic plus whichever follow to make room */ + memmove (&x->rdary[i + 1], &x->rdary[i], (x->n_readers - i) * sizeof (x->rdary[i])); + } + x->rdary[i] = rd; + } + x->rdary[x->n_readers + 1] = NULL; x->n_readers++; - x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 1) * sizeof (*x->rdary)); - x->rdary[x->n_readers - 1] = rd; - x->rdary[x->n_readers] = NULL; ddsrt_mutex_unlock (&x->rdary_lock); } @@ -271,13 +289,20 @@ static void local_reader_ary_remove (struct local_reader_ary *x, struct reader * uint32_t i; ddsrt_mutex_lock (&x->rdary_lock); for (i = 0; i < x->n_readers; i++) - { if (x->rdary[i] == rd) break; - } assert (i < x->n_readers); - /* if i == N-1 copy is a no-op */ - x->rdary[i] = x->rdary[x->n_readers-1]; + if (i + 1 < x->n_readers) + { + /* dropping the final one never requires any fixups; dropping one that has + the same topic as the last is as simple as moving the last one in the + removed one's location; else shift all following readers to keep it + grouped by topic */ + if (rd->topic == x->rdary[x->n_readers - 1]->topic) + x->rdary[i] = x->rdary[x->n_readers - 1]; + else + memmove (&x->rdary[i], &x->rdary[i + 1], (x->n_readers - i - 1) * sizeof (x->rdary[i])); + } x->n_readers--; x->rdary[x->n_readers] = NULL; x->rdary = ddsrt_realloc (x->rdary, (x->n_readers + 1) * sizeof (*x->rdary)); @@ -4537,7 +4562,6 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons pwr->evq = evq; pwr->ddsi2direct_cb = 0; pwr->ddsi2direct_cbarg = 0; - local_reader_ary_init (&pwr->rdary); /* locking the entity prevents matching while the built-in topic hasn't been published yet */ diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index b0d14b1..0b8e118 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -45,6 +45,7 @@ #include "dds/ddsi/q_xmsg.h" #include "dds/ddsi/q_receive.h" #include "dds/ddsi/ddsi_rhc.h" +#include "dds/ddsi/ddsi_deliver_locally.h" #include "dds/ddsi/q_transmit.h" #include "dds/ddsi/q_globals.h" @@ -1733,12 +1734,30 @@ static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topi return sd; } -static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **tk1, struct q_globals *gv, const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo, nn_wctime_t tstamp, struct ddsi_sertopic const * const topic) +struct remote_sourceinfo { + const struct nn_rsample_info *sampleinfo; + unsigned char data_smhdr_flags; + const nn_plist_t *qos; + const struct nn_rdata *fragchain; + unsigned statusinfo; + nn_wctime_t tstamp; +}; + +static struct ddsi_serdata *remote_make_sample (struct ddsi_tkmap_instance **tk, struct q_globals *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo) { + /* hopefully the compiler figures out that these are just aliases and doesn't reload them + unnecessarily from memory */ + const struct remote_sourceinfo * __restrict si = vsourceinfo; + const struct nn_rsample_info * __restrict sampleinfo = si->sampleinfo; + const struct nn_rdata * __restrict fragchain = si->fragchain; + const uint32_t statusinfo = si->statusinfo; + const unsigned char data_smhdr_flags = si->data_smhdr_flags; + const nn_wctime_t tstamp = si->tstamp; + const nn_plist_t * __restrict qos = si->qos; const char *failmsg = NULL; struct ddsi_serdata *sample = NULL; - if (statusinfo == 0) + if (si->statusinfo == 0) { /* normal write */ if (!(data_smhdr_flags & DATA_FLAG_DATAFLAG) || sampleinfo->size == 0) @@ -1752,7 +1771,7 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": write without proper payload (data_smhdr_flags 0x%x size %"PRIu32")\n", sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1], PGUID (guid), sampleinfo->seq, - data_smhdr_flags, sampleinfo->size); + si->data_smhdr_flags, sampleinfo->size); return NULL; } sample = get_serdata (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp); @@ -1807,7 +1826,7 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t } else { - if ((*tk1 = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL) + if ((*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL) { ddsi_serdata_unref (sample); sample = NULL; @@ -1831,12 +1850,6 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t return sample; } -static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk) -{ - ddsi_tkmap_instance_unref (gv->m_tkmap, tk); - ddsi_serdata_unref (sample); -} - unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr) { switch ((SubmessageKind_t) smhdr->submessageId) @@ -1858,28 +1871,51 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr) } } -static struct reader *proxy_writer_first_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it) +static struct reader *proxy_writer_first_in_sync_reader (struct entity_index *entity_index, struct entity_common *pwrcmn, ddsrt_avl_iter_t *it) { + assert (pwrcmn->kind == EK_PROXY_WRITER); + struct proxy_writer *pwr = (struct proxy_writer *) pwrcmn; struct pwr_rd_match *m; struct reader *rd; for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, it); m != NULL; m = ddsrt_avl_iter_next (it)) - if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL) + if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (entity_index, &m->rd_guid)) != NULL) return rd; return NULL; } -static struct reader *proxy_writer_next_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it) +static struct reader *proxy_writer_next_in_sync_reader (struct entity_index *entity_index, ddsrt_avl_iter_t *it) { struct pwr_rd_match *m; struct reader *rd; for (m = ddsrt_avl_iter_next (it); m != NULL; m = ddsrt_avl_iter_next (it)) - if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL) + if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (entity_index, &m->rd_guid)) != NULL) return rd; return NULL; } +static dds_return_t remote_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo) +{ + (void) vsourceinfo; + ddsrt_mutex_unlock (&fastpath_rdary->rdary_lock); + if (source_entity_locked) + ddsrt_mutex_unlock (&source_entity->lock); + + dds_sleepfor (DDS_MSECS (10)); + + if (source_entity_locked) + ddsrt_mutex_lock (&source_entity->lock); + ddsrt_mutex_lock (&fastpath_rdary->rdary_lock); + return DDS_RETCODE_TRY_AGAIN; +} + static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const ddsi_guid_t *rdguid, int pwr_locked) { + static const struct deliver_locally_ops deliver_locally_ops = { + .makesample = remote_make_sample, + .first_reader = proxy_writer_first_in_sync_reader, + .next_reader = proxy_writer_next_in_sync_reader, + .on_failure_fastpath = remote_on_delivery_failure_fastpath + }; struct receiver_state const * const rst = sampleinfo->rst; struct q_globals * const gv = rst->gv; struct proxy_writer * const pwr = sampleinfo->pwr; @@ -1953,108 +1989,22 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st struct ddsi_writer_info wrinfo; ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos, statusinfo); - if (rdguid == NULL) - { - /* FIXME: Retry loop, for re-delivery of rejected reliable samples. Is a - temporary hack till throttling back of writer is implemented (with late - acknowledgement of sample and nack). */ - retry: - ddsrt_mutex_lock (&pwr->rdary.rdary_lock); - if (pwr->rdary.fastpath_ok) - { - struct reader ** const rdary = pwr->rdary.rdary; - if (rdary[0]) - { - struct ddsi_serdata *payload; - struct ddsi_tkmap_instance *tk; - if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rdary[0]->topic)) != NULL) - { - ETRACE (pwr, " => EVERYONE\n"); - uint32_t i = 0; - do { - if (!ddsi_rhc_store (rdary[i]->rhc, &wrinfo, payload, tk)) - { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - /* It is painful to drop the sample, but there is no guarantee that the readers - will still be there after unlocking; indeed, it is even possible that the - topic definition got replaced in the meantime. Fortunately, this is in - the midst of a FIXME for many other reasons. */ - free_sample_after_store (gv, payload, tk); - dds_sleepfor (DDS_MSECS (10)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - goto retry; - } - } while (rdary[++i]); - free_sample_after_store (gv, payload, tk); - } - } - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - } - else - { - /* When deleting, pwr is no longer accessible via the hash - tables, and consequently, a reader may be deleted without - it being possible to remove it from rdary. The primary - reason rdary exists is to avoid locking the proxy writer - but this is less of an issue when we are deleting it, so - we fall back to using the GUIDs so that we can deliver all - samples we received from it. As writer being deleted any - reliable samples that are rejected are simply discarded. */ - ddsrt_avl_iter_t it; - struct reader *rd; - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - if ((rd = proxy_writer_first_in_sync_reader (pwr, &it)) != NULL) - { - struct ddsi_serdata *payload; - struct ddsi_tkmap_instance *tk; - if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL) - { - ETRACE (pwr, " =>"); - do { - ETRACE (pwr, " "PGUIDFMT, PGUID (rd->e.guid)); - (void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk); - rd = proxy_writer_next_in_sync_reader (pwr, &it); - } while (rd != NULL); - free_sample_after_store (gv, payload, tk); - ETRACE (pwr, "\n"); - } - } - if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - } - - ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); - } + struct remote_sourceinfo sourceinfo = { + .sampleinfo = sampleinfo, + .data_smhdr_flags = data_smhdr_flags, + .qos = &qos, + .fragchain = fragchain, + .statusinfo = statusinfo, + .tstamp = tstamp + }; + if (rdguid) + (void) deliver_locally_one (gv, &pwr->e, pwr_locked != 0, rdguid, &wrinfo, &deliver_locally_ops, &sourceinfo); else { - struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, rdguid); - if (rd != NULL) - { - struct ddsi_serdata *payload; - struct ddsi_tkmap_instance *tk; - if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL) - { - ETRACE (pwr, " =>"PGUIDFMT"\n", PGUID (*rdguid)); - /* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays - "awake" (although a delete can be initiated), and blocking like this is a stopgap - anyway -- quite possibly to abort once either is deleted */ - while (!ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk)) - { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - dds_sleepfor (DDS_MSECS (1)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - if (entidx_lookup_reader_guid (gv->entity_index, rdguid) == NULL || - entidx_lookup_proxy_writer_guid (gv->entity_index, &pwr->e.guid) == NULL) - { - /* give up when reader or proxy writer no longer accessible */ - break; - } - } - free_sample_after_store (gv, payload, tk); - } - } + (void) deliver_locally_allinsync (gv, &pwr->e, pwr_locked != 0, &pwr->rdary, &wrinfo, &deliver_locally_ops, &sourceinfo); + ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); } + nn_plist_fini (&qos); return 0; }