From a652ecb78e145fa936b8fef2f58f27ab0411245a Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 22 May 2019 20:53:57 +0200 Subject: [PATCH] ensure delivery of writes immediately following pub match event (#165) A long-standing bug of Cyclone is that a sample written immediately after a publication-matched event may never arrive at the reader that was just matched. This happened because the reader need not have completed discovery of the writer by the time the writer discovers the reader, at which point the reader ignores the sample because it either doesn't know the writer at all, or it hasn't yet seen a Heartbeat from it. That Heartbeat arrives shortly after, but by then it is too late: the reader decides to accept the next sample to be written by the writer. (It has no choice, really: either you risk losing some data, or you will be requesting all historical data, which is emphatically not what a volatile reader is about ...) A related issue is the handling of historical data for transient-local readers: it used to deliver this out-of-order, but that is firstly against the specification, and secondly, against reasonable expectations of those who use DDS as a mere publish-subscribe messaging system. To add insult to injury, it didn't completely handle some reordering issues with disposes ... This commit changes the way writers respond to a request for retransmission from volatile proxy readers and the way the in-sync/out-of-sync setting of a reader with respect to a proxy-writer is used. The first makes it safe for a Cyclone reader to ask a Cyclone writer for all data (all these details not being covered in the specs, it errs on the reasonable side for other vendors, but that may cause the data loss mentioned above): the writer simply sends a Gap message to the reader for all the sequence numbers prior to the matching. 
The second changes the rule for switching from out-of-sync to in-sync: that transition now simply occurs once the next sequence number to be delivered to the reader equals the next sequence number that will be delivered directly from the proxy writer object to all readers. (I.e., a much more intuitive notion than reaching some seemingly arbitrary sequence number.) To avoid duplicates the rule for delivery straight from a proxy writer has changed: where samples were delivered from the proxy writer to all matching readers, they are now delivered only to the matching readers that are in-sync. To avoid ordering problems, the idea that historical data can be delivered through the asynchronous delivery path even when the regular data goes through the synchronous delivery path has been abandoned. All data now always follows the same path. As these same mechanisms are used for getting historical data into transient-local readers, the ordering problem for the historical data also disappeared. The test stuff in src/core/xtests/initsampledeliv covers a lot of the interesting cases: data published before the existence of a reader, after it, mixes of volatile and transient-local. Running them takes quite a bit of time, and they are not yet integrated in the CI builds (if ever, because of that time). Note: the "conservative built-in startup" option has been removed, because it really makes no sense to keep a vague compatibility option added a decade ago "just in case" that has never been used ... Note: the workaround in the src/mpt/tests/basic/procs/hello.c (use transient-local to ensure delivery of data) has been removed, as has been its workaround for the already-fixed #146. 
Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/q_config.h | 1 - src/core/ddsi/include/dds/ddsi/q_entity.h | 4 +- src/core/ddsi/src/q_config.c | 7 +- src/core/ddsi/src/q_debmon.c | 2 +- src/core/ddsi/src/q_entity.c | 80 ++++-- src/core/ddsi/src/q_receive.c | 257 +++++++++--------- src/core/xtests/CMakeLists.txt | 17 +- .../xtests/initsampledeliv/CMakeLists.txt | 20 ++ .../initsampledeliv/InitSampleDelivData.idl | 19 ++ src/core/xtests/initsampledeliv/publisher.c | 163 +++++++++++ src/core/xtests/initsampledeliv/runtest | 46 ++++ src/core/xtests/initsampledeliv/subscriber.c | 198 ++++++++++++++ src/core/xtests/rhc_torture/CMakeLists.txt | 26 ++ .../xtests/{ => rhc_torture}/RhcTypes.idl | 0 .../xtests/{ => rhc_torture}/rhc_torture.c | 0 src/mpt/tests/basic/procs/hello.c | 9 +- 16 files changed, 677 insertions(+), 172 deletions(-) create mode 100644 src/core/xtests/initsampledeliv/CMakeLists.txt create mode 100644 src/core/xtests/initsampledeliv/InitSampleDelivData.idl create mode 100644 src/core/xtests/initsampledeliv/publisher.c create mode 100644 src/core/xtests/initsampledeliv/runtest create mode 100644 src/core/xtests/initsampledeliv/subscriber.c create mode 100644 src/core/xtests/rhc_torture/CMakeLists.txt rename src/core/xtests/{ => rhc_torture}/RhcTypes.idl (100%) rename src/core/xtests/{ => rhc_torture}/rhc_torture.c (100%) diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index d7cd25d..a83fbf8 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -330,7 +330,6 @@ struct config uint32_t rmsg_chunk_size; /**<< size of a chunk in the receive buffer */ uint32_t rbuf_size; /* << size of a single receiver buffer */ enum besmode besmode; - int conservative_builtin_reader_startup; int meas_hb_to_ack_latency; int unicast_response_to_spdp_messages; int synchronous_delivery_priority_threshold; diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h 
b/src/core/ddsi/include/dds/ddsi/q_entity.h index 4c0c4bd..603b88f 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -123,7 +123,6 @@ struct pwr_rd_match { union { struct { seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */ - seqno_t end_of_out_of_sync_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */ struct nn_reorder *reorder; /* can be done (mostly) per proxy writer, but that is harder; only when state=OUT_OF_SYNC */ } not_in_sync; } u; @@ -574,6 +573,9 @@ uint64_t writer_instance_id (const struct nn_guid *guid); rebuild them all (which only makes sense after previously having emptied them all). */ void rebuild_or_clear_writer_addrsets(int rebuild); + +void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/src/q_config.c b/src/core/ddsi/src/q_config.c index 382d2c1..98f030b 100644 --- a/src/core/ddsi/src/q_config.c +++ b/src/core/ddsi/src/q_config.c @@ -509,9 +509,9 @@ static const struct cfgelem unsupp_cfgelems[] = { { MOVED("FragmentSize", "CycloneDDS/General/FragmentSize") }, { LEAF("DeliveryQueueMaxSamples"), 1, "256", ABSOFF(delivery_queue_maxsamples), 0, uf_uint, 0, pf_uint, BLURB("

This element controls the Maximum size of a delivery queue, expressed in samples. Once a delivery queue is full, incoming samples destined for that queue are dropped until space becomes available again.

") }, - { LEAF("PrimaryReorderMaxSamples"), 1, "64", ABSOFF(primary_reorder_maxsamples), 0, uf_uint, 0, pf_uint, + { LEAF("PrimaryReorderMaxSamples"), 1, "128", ABSOFF(primary_reorder_maxsamples), 0, uf_uint, 0, pf_uint, BLURB("

This element sets the maximum size in samples of a primary re-order administration. Each proxy writer has one primary re-order administration to buffer the packet flow in case some packets arrive out of order. Old samples are forwarded to secondary re-order administrations associated with readers in need of historical data.

") }, - { LEAF("SecondaryReorderMaxSamples"), 1, "16", ABSOFF(secondary_reorder_maxsamples), 0, uf_uint, 0, pf_uint, + { LEAF("SecondaryReorderMaxSamples"), 1, "128", ABSOFF(secondary_reorder_maxsamples), 0, uf_uint, 0, pf_uint, BLURB("

This element sets the maximum size in samples of a secondary re-order administration. The secondary re-order administration is per reader in need of historical data.

") }, { LEAF("DefragUnreliableMaxSamples"), 1, "4", ABSOFF(defrag_unreliable_maxsamples), 0, uf_uint, 0, pf_uint, BLURB("

This element sets the maximum number of samples that can be defragmented simultaneously for a best-effort writers.

") }, @@ -523,9 +523,6 @@ static const struct cfgelem unsupp_cfgelems[] = {
  • writers: all participants have the writers, but just one has the readers;
  • \n\
  • minimal: only one participant has built-in endpoints.
  • \n\

    The default is writers, as this is thought to be compliant and reasonably efficient. Minimal may or may not be compliant but is most efficient, and full is inefficient but certain to be compliant. See also Internal/ConservativeBuiltinReaderStartup.

    ") }, - { LEAF("ConservativeBuiltinReaderStartup"), 1, "false", ABSOFF(conservative_builtin_reader_startup), 0, uf_boolean, 0, pf_boolean, - BLURB("

    This element forces all DDSI2E built-in discovery-related readers to request all historical data, instead of just one for each \"topic\". There is no indication that any of the current DDSI implementations requires changing of this setting, but it is conceivable that an implementation might track which participants have been informed of the existence of endpoints and which have not been, refusing communication with those that have \"can't\" know.

    \n\ -

    Should it be necessary to hide DDSI2E's shared discovery behaviour, set this to true and Internal/BuiltinEndpointSet to full.

    ") }, { LEAF("MeasureHbToAckLatency"), 1, "false", ABSOFF(meas_hb_to_ack_latency), 0, uf_boolean, 0, pf_boolean, BLURB("

    This element enables heartbeat-to-ack latency among DDSI2E services by prepending timestamps to Heartbeat and AckNack messages and calculating round trip times. This is non-standard behaviour. The measured latencies are quite noisy and are currently not used anywhere.

    ") }, { LEAF("UnicastResponseToSPDPMessages"), 1, "true", ABSOFF(unicast_response_to_spdp_messages), 0, uf_boolean, 0, pf_boolean, diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 35ed218..9ea929f 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -286,7 +286,7 @@ static int print_proxy_participants (struct thread_state1 * const ts1, ddsi_tran x += cpf (conn, " tl-catchup end_of_tl_seq %lld\n", m->u.not_in_sync.end_of_tl_seq); break; case PRMSS_OUT_OF_SYNC: - x += cpf (conn, " out-of-sync end_of_tl_seq %lld end_of_out_of_sync_seq %lld\n", m->u.not_in_sync.end_of_tl_seq, m->u.not_in_sync.end_of_out_of_sync_seq); + x += cpf (conn, " out-of-sync end_of_tl_seq %lld\n", m->u.not_in_sync.end_of_tl_seq); break; } } diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index f7d3480..9dc0024 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -235,6 +235,14 @@ void local_reader_ary_remove (struct local_reader_ary *x, struct reader *rd) ddsrt_mutex_unlock (&x->rdary_lock); } +void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok) +{ + ddsrt_mutex_lock (&x->rdary_lock); + if (x->valid) + x->fastpath_ok = fastpath_ok; + ddsrt_mutex_unlock (&x->rdary_lock); +} + void local_reader_ary_setinvalid (struct local_reader_ary *x) { ddsrt_mutex_lock (&x->rdary_lock); @@ -1492,9 +1500,17 @@ static void proxy_writer_drop_connection (const struct nn_guid *pwr_guid, struct { ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m); if (m->in_sync != PRMSS_SYNC) - pwr->n_readers_out_of_sync--; + { + if (--pwr->n_readers_out_of_sync == 0) + local_reader_ary_setfastpath_ok (&pwr->rdary, true); + } if (rd->reliable) pwr->n_reliable_readers--; + /* If no reliable readers left, there is no reason to believe the heartbeats will keep + coming and therefore reset have_seen_heartbeat so the next reader to be created + doesn't get initialised based on stale data 
*/ + if (pwr->n_reliable_readers == 0) + pwr->have_seen_heartbeat = 0; local_reader_ary_remove (&pwr->rdary, rd); } ddsrt_mutex_unlock (&pwr->e.lock); @@ -1775,7 +1791,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader { struct pwr_rd_match *m = ddsrt_malloc (sizeof (*m)); ddsrt_avl_ipath_t path; - seqno_t last_deliv_seq; ddsrt_mutex_lock (&pwr->e.lock); if (ddsrt_avl_lookup_ipath (&pwr_readers_treedef, &pwr->readers, &rd->e.guid, &path)) @@ -1794,7 +1809,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader m->rd_guid = rd->e.guid; m->tcreate = now_mt (); - /* We track the last heartbeat count value per reader--proxy-writer pair, so that we can correctly handle directed heartbeats. The only reason to bother is to prevent a directed heartbeat (with @@ -1813,34 +1827,61 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader /* These can change as a consequence of handling data and/or discovery activities. The safe way of dealing with them is to lock the proxy writer */ - last_deliv_seq = nn_reorder_next_seq (pwr->reorder) - 1; - if (!rd->handle_as_transient_local) + if (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) && !ddsrt_avl_is_empty (&pwr->readers)) { + /* builtins really don't care about multiple copies or anything */ m->in_sync = PRMSS_SYNC; } - else if (!config.conservative_builtin_reader_startup && is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) && !ddsrt_avl_is_empty (&pwr->readers)) + else if (!pwr->have_seen_heartbeat) { - /* builtins really don't care about multiple copies */ + /* Proxy writer hasn't seen a heartbeat yet: means we have no + clue from what sequence number to start accepting data, nor + where historical data ends and live data begins. + + A transient-local reader should always get all historical + data, and so can always start-out as "out-of-sync". 
Cyclone + refuses to retransmit already ACK'd samples to a Cyclone + reader, so if the other side is Cyclone, we can always start + from sequence number 1. + + For non-Cyclone, if the reader is volatile, we have to just + start from the most recent sample, even though that means + the first samples written after matching the reader may be + lost. The alternative not only gets too much historical data + but may also result in "sample lost" notifications because the + writer is (may not be) retaining samples on behalf of this + reader for the oldest samples and so this reader may end up + with a partial set of old-ish samples. Even when both are + using KEEP_ALL and the connection doesn't fail ... */ + if (rd->handle_as_transient_local) + m->in_sync = PRMSS_OUT_OF_SYNC; + else if (vendor_is_eclipse (pwr->c.vendor)) + m->in_sync = PRMSS_OUT_OF_SYNC; + else + m->in_sync = PRMSS_SYNC; + m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER; + } + else if (!rd->handle_as_transient_local) + { + /* volatile reader, writer has seen a heartbeat: it's in sync + (there is a risk of it getting some historical data: that + happens to be cached in the writer's reorder admin at this + point) */ m->in_sync = PRMSS_SYNC; } else { - /* normal transient-local, reader is behind proxy writer */ + /* transient-local reader; range of sequence numbers is already + known */ m->in_sync = PRMSS_OUT_OF_SYNC; - if (last_deliv_seq == 0) - { - m->u.not_in_sync.end_of_out_of_sync_seq = MAX_SEQ_NUMBER; - m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER; - } - else - { - m->u.not_in_sync.end_of_tl_seq = pwr->last_seq; - m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq; - } - DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync %"PRId64, m->u.not_in_sync.end_of_out_of_sync_seq); + m->u.not_in_sync.end_of_tl_seq = pwr->last_seq; } if (m->in_sync != PRMSS_SYNC) + { + DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync"); pwr->n_readers_out_of_sync++; + local_reader_ary_setfastpath_ok (&pwr->rdary, false); + } m->count = 
init_count; /* Spec says we may send a pre-emptive AckNack (8.4.2.3.4), hence we schedule it for the configured delay * T_MILLISECOND. From then @@ -3510,7 +3551,6 @@ void new_proxy_participant proxypp->plist = nn_plist_dup (plist); ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups); - if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS) proxypp->kernel_sequence_numbers = 1; else diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 842d372..58ae5c3 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -74,7 +74,7 @@ Notes: */ -static void deliver_user_data_synchronously (struct nn_rsample_chain *sc); +static void deliver_user_data_synchronously (struct nn_rsample_chain *sc, const nn_guid_t *rdguid); static void maybe_set_reader_in_sync (struct proxy_writer *pwr, struct pwr_rd_match *wn, seqno_t last_deliv_seq) { @@ -87,11 +87,13 @@ static void maybe_set_reader_in_sync (struct proxy_writer *pwr, struct pwr_rd_ma if (last_deliv_seq >= wn->u.not_in_sync.end_of_tl_seq) { wn->in_sync = PRMSS_SYNC; - pwr->n_readers_out_of_sync--; + if (--pwr->n_readers_out_of_sync == 0) + local_reader_ary_setfastpath_ok (&pwr->rdary, true); } break; case PRMSS_OUT_OF_SYNC: - if (nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1 >= wn->u.not_in_sync.end_of_out_of_sync_seq) + assert (nn_reorder_next_seq (wn->u.not_in_sync.reorder) <= nn_reorder_next_seq (pwr->reorder)); + if (pwr->have_seen_heartbeat && nn_reorder_next_seq (wn->u.not_in_sync.reorder) == nn_reorder_next_seq (pwr->reorder)) { DDS_TRACE(" msr_in_sync("PGUIDFMT" out-of-sync to tlcatchup)", PGUID (wn->rd_guid)); wn->in_sync = PRMSS_TLCATCHUP; @@ -887,6 +889,8 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac a future request'll fix it. 
*/ enqueued = 1; seq_xmit = READ_SEQ_XMIT(wr); + const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == NN_VOLATILE_DURABILITY_QOS && seqbase <= rn->seq; + const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0; for (i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++) { /* Accelerated schedule may run ahead of sequence number set @@ -897,7 +901,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac { seqno_t seq = seqbase + i; struct whc_borrowed_sample sample; - if (whc_borrow_sample (wr->whc, seq, &sample)) + if (seqbase + i >= min_seq_to_rexmit && whc_borrow_sample (wr->whc, seq, &sample)) { if (!wr->retransmitting && sample.unacked) writer_set_retransmitting (wr); @@ -1176,40 +1180,45 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct if (!rst->forme) { - DDS_TRACE(""PGUIDFMT" -> "PGUIDFMT" not-for-me)", PGUID (src), PGUID (dst)); + DDS_TRACE(PGUIDFMT" -> "PGUIDFMT" not-for-me)", PGUID (src), PGUID (dst)); return 1; } if ((pwr = ephash_lookup_proxy_writer_guid (&src)) == NULL) { - DDS_TRACE(""PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst)); + DDS_TRACE(PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst)); return 1; } - /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ + /* liveliness is still only implemented partially (with all set to AUTOMATIC, + BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. 
*/ if (pwr->assert_pp_lease) lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow); - DDS_TRACE(""PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst)); + DDS_TRACE(PGUIDFMT" -> "PGUIDFMT":", PGUID (src), PGUID (dst)); ddsrt_mutex_lock (&pwr->e.lock); + if (pwr->n_reliable_readers == 0) + { + DDS_TRACE(PGUIDFMT" -> "PGUIDFMT" no-reliable-readers)", PGUID (src), PGUID (dst)); + ddsrt_mutex_unlock (&pwr->e.lock); + return 1; + } + if (!pwr->have_seen_heartbeat) { struct nn_rdata *gap; struct nn_rsample_chain sc; int refc_adjust = 0; nn_reorder_result_t res; - nn_defrag_notegap (pwr->defrag, 1, lastseq + 1); gap = nn_rdata_newgap (rmsg); - if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, lastseq + 1, &refc_adjust)) > 0) - { - if (pwr->deliver_synchronously) - deliver_user_data_synchronously (&sc); - else - nn_dqueue_enqueue (pwr->dqueue, &sc, res); - } + res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, lastseq + 1, &refc_adjust); + /* proxy writer is not accepting data until it has received a heartbeat, so + there can't be any data to deliver */ + assert (res <= 0); + (void) res; nn_fragchain_adjust_refcount (gap, refc_adjust); pwr->have_seen_heartbeat = 1; } @@ -1243,7 +1252,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, firstseq, &refc_adjust)) > 0) { if (pwr->deliver_synchronously) - deliver_user_data_synchronously (&sc); + deliver_user_data_synchronously (&sc, NULL); else nn_dqueue_enqueue (pwr->dqueue, &sc, res); } @@ -1262,13 +1271,18 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct case PRMSS_OUT_OF_SYNC: { struct nn_reorder *ro = wn->u.not_in_sync.reorder; if ((res = nn_reorder_gap (&sc, ro, gap, 1, firstseq, &refc_adjust)) > 0) - nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res); + { + if (pwr->deliver_synchronously) + deliver_user_data_synchronously (&sc, &wn->rd_guid); + else + nn_dqueue_enqueue1 
(pwr->dqueue, &wn->rd_guid, &sc, res); + } last_deliv_seq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; } } if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER) { - wn->u.not_in_sync.end_of_out_of_sync_seq = wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN); + wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN); DDS_TRACE(" end-of-tl-seq(rd "PGUIDFMT" #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq); } maybe_set_reader_in_sync (pwr, wn, last_deliv_seq); @@ -1591,7 +1605,7 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, a, b, refc_adjust)) > 0) { if (pwr->deliver_synchronously) - deliver_user_data_synchronously (&sc); + deliver_user_data_synchronously (&sc, NULL); else nn_dqueue_enqueue (pwr->dqueue, &sc, res); } @@ -1617,13 +1631,15 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se case PRMSS_TLCATCHUP: break; case PRMSS_OUT_OF_SYNC: - if (a <= wn->u.not_in_sync.end_of_out_of_sync_seq) + if ((res = nn_reorder_gap (&sc, wn->u.not_in_sync.reorder, gap, a, b, refc_adjust)) > 0) { - if ((res = nn_reorder_gap (&sc, wn->u.not_in_sync.reorder, gap, a, b, refc_adjust)) > 0) + if (pwr->deliver_synchronously) + deliver_user_data_synchronously (&sc, &wn->rd_guid); + else nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res); - if (res >= 0) - gap_was_valuable = 1; } + if (res >= 0) + gap_was_valuable = 1; break; } @@ -1970,94 +1986,90 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st goto no_payload; } - /* Generate the DDS_SampleInfo (which is faked to some extent - because we don't actually have a data reader); also note that - the PRISMTECH_WRITER_INFO thing is completely meaningless to - us */ + because we don't actually have a data reader); also note that + the PRISMTECH_WRITER_INFO thing is completely meaningless to + us */ + struct ddsi_tkmap_instance * tk; + if ((tk = 
ddsi_tkmap_lookup_instance_ref(payload)) != NULL) { - struct ddsi_tkmap_instance * tk; - tk = ddsi_tkmap_lookup_instance_ref(payload); - if (tk) + struct proxy_writer_info pwr_info; + make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos); + + if (rdguid == NULL) { - struct proxy_writer_info pwr_info; - make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos); + DDS_TRACE(" %"PRId64"=>EVERYONE\n", sampleinfo->seq); - if (rdguid == NULL) + /* FIXME: pwr->rdary is an array of pointers to attached + readers. There's only one thread delivering data for the + proxy writer (as long as there is only one receive thread), + so could get away with not locking at all, and doing safe + updates + GC of rdary instead. */ + + /* Retry loop, for re-delivery of rejected reliable samples. Is a + temporary hack till throttling back of writer is implemented + (with late acknowledgement of sample and nack). */ + retry: + + ddsrt_mutex_lock (&pwr->rdary.rdary_lock); + if (pwr->rdary.fastpath_ok) { - DDS_TRACE(" %"PRId64"=>EVERYONE\n", sampleinfo->seq); - - /* FIXME: pwr->rdary is an array of pointers to attached - readers. There's only one thread delivering data for the - proxy writer (as long as there is only one receive thread), - so could get away with not locking at all, and doing safe - updates + GC of rdary instead. */ - - /* Retry loop, for re-delivery of rejected reliable samples. Is a - temporary hack till throttling back of writer is implemented - (with late acknowledgement of sample and nack). */ -retry: - - ddsrt_mutex_lock (&pwr->rdary.rdary_lock); - if (pwr->rdary.fastpath_ok) + struct reader ** const rdary = pwr->rdary.rdary; + unsigned i; + for (i = 0; rdary[i]; i++) { - struct reader ** const rdary = pwr->rdary.rdary; - unsigned i; - for (i = 0; rdary[i]; i++) + DDS_TRACE("reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); + if (! 
(ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk)) { - DDS_TRACE("reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); - if (! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk)) - { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - dds_sleepfor (DDS_MSECS (10)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - goto retry; - } + if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + dds_sleepfor (DDS_MSECS (10)); + if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + goto retry; } - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); } - else - { - /* When deleting, pwr is no longer accessible via the hash - tables, and consequently, a reader may be deleted without - it being possible to remove it from rdary. The primary - reason rdary exists is to avoid locking the proxy writer - but this is less of an issue when we are deleting it, so - we fall back to using the GUIDs so that we can deliver all - samples we received from it. As writer being deleted any - reliable samples that are rejected are simply discarded. 
*/ - ddsrt_avl_iter_t it; - struct pwr_rd_match *m; - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) - { - struct reader *rd; - if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) - { - DDS_TRACE("reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid)); - (void) (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); - } - } - if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - } - - ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); } else { - struct reader *rd = ephash_lookup_reader_guid (rdguid);; - DDS_TRACE(" %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?"); - while (rd && ! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid)) + /* When deleting, pwr is no longer accessible via the hash + tables, and consequently, a reader may be deleted without + it being possible to remove it from rdary. The primary + reason rdary exists is to avoid locking the proxy writer + but this is less of an issue when we are deleting it, so + we fall back to using the GUIDs so that we can deliver all + samples we received from it. As writer being deleted any + reliable samples that are rejected are simply discarded. 
*/ + ddsrt_avl_iter_t it; + struct pwr_rd_match *m; + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - dds_sleepfor (DDS_MSECS (1)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + struct reader *rd; + if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL && m->in_sync == PRMSS_SYNC) + { + DDS_TRACE("reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid)); + (void) (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); + } } + if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); } - ddsi_tkmap_instance_unref (tk); + + ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); } + else + { + struct reader *rd = ephash_lookup_reader_guid (rdguid);; + DDS_TRACE(" %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?"); + while (rd && ! (ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (&pwr->e.guid)) + { + if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); + dds_sleepfor (DDS_MSECS (1)); + if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + } + } + ddsi_tkmap_instance_unref (tk); } ddsi_serdata_unref (payload); no_payload: @@ -2072,7 +2084,7 @@ int user_dqueue_handler (const struct nn_rsample_info *sampleinfo, const struct return res; } -static void deliver_user_data_synchronously (struct nn_rsample_chain *sc) +static void deliver_user_data_synchronously (struct nn_rsample_chain *sc, const nn_guid_t *rdguid) { while (sc->first) { @@ -2084,7 +2096,7 @@ static void deliver_user_data_synchronously (struct nn_rsample_chain *sc) sample_lost events. 
Also note that the synchronous path is _never_ used for historical data, and therefore never has the GUID of a reader to deliver to */ - deliver_user_data (e->sampleinfo, e->fragchain, NULL, 1); + deliver_user_data (e->sampleinfo, e->fragchain, rdguid, 1); } nn_fragchain_unref (e->fragchain); } @@ -2215,7 +2227,7 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct by the current mishandling of resource limits */ if (*deferred_wakeup) dd_dqueue_enqueue_trigger (*deferred_wakeup); - deliver_user_data_synchronously (&sc); + deliver_user_data_synchronously (&sc, NULL); } else { @@ -2226,28 +2238,22 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct *deferred_wakeup = pwr->dqueue; } } - if (pwr->n_readers_out_of_sync > 0) - { - /* Those readers catching up with TL but in sync with the proxy - writer may have become in sync with the proxy writer and the - writer; those catching up with TL all by themselves go through - the "TOO_OLD" path below. */ - ddsrt_avl_iter_t it; - struct pwr_rd_match *wn; - for (wn = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); wn != NULL; wn = ddsrt_avl_iter_next (&it)) - if (wn->in_sync == PRMSS_TLCATCHUP) - maybe_set_reader_in_sync (pwr, wn, sampleinfo->seq); - } } - else if (rres == NN_REORDER_TOO_OLD) + + if (pwr->n_readers_out_of_sync > 0) { + /* Those readers catching up with TL but in sync with the proxy + writer may have become in sync with the proxy writer and the + writer; those catching up with TL all by themselves go through + the "TOO_OLD" path below. 
*/ + ddsrt_avl_iter_t it; struct pwr_rd_match *wn; struct nn_rsample *rsample_dup = NULL; int reuse_rsample_dup = 0; - for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn != NULL; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn)) + for (wn = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); wn != NULL; wn = ddsrt_avl_iter_next (&it)) { nn_reorder_result_t rres2; - if (wn->in_sync != PRMSS_OUT_OF_SYNC || sampleinfo->seq > wn->u.not_in_sync.end_of_out_of_sync_seq) + if (wn->in_sync == PRMSS_SYNC) continue; if (!reuse_rsample_dup) rsample_dup = nn_reorder_rsample_dup (rmsg, rsample); @@ -2273,17 +2279,26 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct in-order, and those few microseconds can't hurt in catching up on transient-local data. See also NN_REORDER_DELIVER case in outer switch. */ - nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, rres2); + if (pwr->deliver_synchronously) + { + /* FIXME: just in case the synchronous delivery runs into a delay caused + by the current mishandling of resource limits */ + deliver_user_data_synchronously (&sc, &wn->rd_guid); + } + else + { + if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue) + { + dd_dqueue_enqueue_trigger (*deferred_wakeup); + *deferred_wakeup = NULL; + } + nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, rres2); + } break; } } } -#ifndef NDEBUG - else - { - assert (rres == NN_REORDER_ACCEPT || rres == NN_REORDER_REJECT); - } -#endif + nn_fragchain_adjust_refcount (fragchain, refc_adjust); } ddsrt_mutex_unlock (&pwr->e.lock); diff --git a/src/core/xtests/CMakeLists.txt b/src/core/xtests/CMakeLists.txt index 02b7bfd..72bdb94 100644 --- a/src/core/xtests/CMakeLists.txt +++ b/src/core/xtests/CMakeLists.txt @@ -9,18 +9,5 @@ # # SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause # -idlc_generate(RhcTypes RhcTypes.idl) - -add_executable(rhc_torture rhc_torture.c) - -target_include_directories( - rhc_torture PRIVATE - "$" - 
"$") - -target_link_libraries(rhc_torture RhcTypes ddsc) - -add_test( - NAME rhc_torture - COMMAND rhc_torture 314159265 0 5000 0) -set_property(TEST rhc_torture PROPERTY TIMEOUT 20) +add_subdirectory(rhc_torture) +add_subdirectory(initsampledeliv) diff --git a/src/core/xtests/initsampledeliv/CMakeLists.txt b/src/core/xtests/initsampledeliv/CMakeLists.txt new file mode 100644 index 0000000..e4804fa --- /dev/null +++ b/src/core/xtests/initsampledeliv/CMakeLists.txt @@ -0,0 +1,20 @@ +# +# Copyright(c) 2019 ADLINK Technology Limited and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +cmake_minimum_required(VERSION 3.5) + +idlc_generate(InitSampleDeliv_lib InitSampleDelivData.idl) + +add_executable(InitSampleDelivPub publisher.c) +add_executable(InitSampleDelivSub subscriber.c) + +target_link_libraries(InitSampleDelivPub InitSampleDeliv_lib ddsc) +target_link_libraries(InitSampleDelivSub InitSampleDeliv_lib ddsc) diff --git a/src/core/xtests/initsampledeliv/InitSampleDelivData.idl b/src/core/xtests/initsampledeliv/InitSampleDelivData.idl new file mode 100644 index 0000000..1064232 --- /dev/null +++ b/src/core/xtests/initsampledeliv/InitSampleDelivData.idl @@ -0,0 +1,19 @@ +// Copyright(c) 2019 ADLINK Technology Limited and others +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +// v. 1.0 which is available at +// http://www.eclipse.org/org/documents/edl-v10.php. 
+// +// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + +struct Msg { + long keyval; + long seq; + long tldepth; + long final_seq; + long seq_at_match[2]; +}; + +#pragma keylist Msg keyval diff --git a/src/core/xtests/initsampledeliv/publisher.c b/src/core/xtests/initsampledeliv/publisher.c new file mode 100644 index 0000000..a690c16 --- /dev/null +++ b/src/core/xtests/initsampledeliv/publisher.c @@ -0,0 +1,163 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "dds/dds.h" +#include "dds/ddsrt/atomics.h" +#include "InitSampleDelivData.h" +#include +#include +#include + +static void oops (const char *file, int line) +{ + fflush (stdout); + fprintf (stderr, "%s:%d\n", file, line); + abort (); +} + +#define oops() oops(__FILE__, __LINE__) + +static void on_pub_matched (dds_entity_t wr, const dds_publication_matched_status_t st, void *varg) +{ + ddsrt_atomic_uint32_t *new_readers = varg; + dds_sample_info_t info; + void *raw = NULL; + dds_entity_t rd; + printf ("pubmatched\n"); + if ((rd = dds_create_reader (dds_get_participant (wr), DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL)) < 0) + oops (); + if (dds_read_instance (rd, &raw, &info, 1, 1, st.last_subscription_handle) != 1) + oops (); + const dds_builtintopic_endpoint_t *sample = raw; + /* in our test the user data must be present */ + void *ud; + size_t udsz; + if (!dds_qget_userdata (sample->qos, &ud, &udsz)) + oops (); + int rdid = atoi (ud); + if (rdid < 0 || rdid > 31) + oops (); + printf ("pubmatched: %d\n", rdid); + fflush (stdout); + ddsrt_atomic_or32 (new_readers, UINT32_C (1) << rdid); + dds_free (ud); + 
dds_return_loan (rd, &raw, 1); +} + +static uint32_t get_publication_matched_count (dds_entity_t wr) +{ + dds_publication_matched_status_t status; + if (dds_get_publication_matched_status (wr, &status) < 0) + oops (); + return status.current_count; +} + +int main (int argc, char ** argv) +{ + dds_entity_t ppant; + dds_entity_t tp; + dds_entity_t wr; + dds_qos_t *qos; + ddsrt_atomic_uint32_t newreaders = DDSRT_ATOMIC_UINT32_INIT (0); + int opt; + bool flag_prewrite = false; + bool flag_translocal = false; + const int32_t tlhist = 10; + + while ((opt = getopt (argc, argv, "tp")) != EOF) + { + switch (opt) + { + case 't': + flag_translocal = true; + break; + case 'p': + flag_prewrite = true; + break; + default: + fprintf (stderr, "usage error: see source code\n"); + exit (2); + } + } + + if ((ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0) + oops (); + + qos = dds_create_qos (); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10)); + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); + if ((tp = dds_create_topic (ppant, &Msg_desc, "Msg", qos, NULL)) < 0) + oops (); + + /* Writer has overrides for history, durability */ + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dds_qset_durability (qos, flag_translocal ? 
DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE); + dds_qset_durability_service (qos, 0, DDS_HISTORY_KEEP_LAST, tlhist, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + + dds_listener_t *list = dds_create_listener (&newreaders); + dds_lset_publication_matched (list, on_pub_matched); + if ((wr = dds_create_writer (ppant, tp, qos, list)) < 0) + oops (); + dds_delete_listener (list); + dds_delete_qos (qos); + + Msg sample = { + .keyval = 0, + .seq = 1, + .tldepth = tlhist, + .final_seq = 30, + .seq_at_match = { 0, 0 } + }; + dds_time_t tlast = 0, tnewrd = 0; + while (sample.seq <= sample.final_seq) + { + uint32_t newrd = ddsrt_atomic_and32_ov (&newreaders, 0); + for (uint32_t i = 0; i < 32; i++) + { + if (newrd & (UINT32_C (1) << i)) + { + if (i >= (uint32_t) (sizeof (sample.seq_at_match) / sizeof (sample.seq_at_match[0]))) + oops (); + if (sample.seq_at_match[i] != 0) + oops (); + sample.seq_at_match[i] = sample.seq; + tnewrd = dds_time (); + printf ("%d.%09d newreader %d: start seq %d\n", (int) (tnewrd / DDS_NSECS_IN_SEC), (int) (tnewrd % DDS_NSECS_IN_SEC), (int) i, (int) sample.seq_at_match[i]); + fflush (stdout); + } + } + + if (get_publication_matched_count (wr) || (flag_prewrite && sample.seq <= tlhist + 1)) + { + dds_time_t tnow = dds_time (); + if (tnow - tlast > DDS_MSECS (100) || newrd) + { + if (dds_write (wr, &sample) < 0) + oops (); + sample.seq++; + tlast = tnow; + if (sample.seq > sample.final_seq) + { + tnow = dds_time (); + printf ("%d.%09d done writing\n", (int) (tnow / DDS_NSECS_IN_SEC), (int) (tnow % DDS_NSECS_IN_SEC)); + fflush (stdout); + } + } + } + + dds_sleepfor (DDS_MSECS (1)); + } + + dds_sleepfor (DDS_MSECS (100)); + dds_wait_for_acks (wr, DDS_INFINITY); + dds_delete (ppant); + return 0; +} diff --git a/src/core/xtests/initsampledeliv/runtest b/src/core/xtests/initsampledeliv/runtest new file mode 100644 index 0000000..9ba03b8 --- /dev/null +++ b/src/core/xtests/initsampledeliv/runtest @@ -0,0 +1,46 @@ 
+#!/bin/bash +# +# Copyright(c) 2019 ADLINK Technology Limited and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +ok=true +for sd in "" "-d1" "-d12" ; do + for st in "" "-t" ; do + for sT in "" "-T" ; do + if [ "$st" = "-t" -o "$sT" = "-T" ] ; then + maybeV=false + else + maybeV=true + fi + for sw in "" "-w" ; do + for pt in "" "-t" ; do + for pp in "" "-p" ; do + if [ "$sT" = "" -a "$sd" != "" -a \( "$pt" = "-t" -o $maybeV = true \) ] ; then + if $ok ; then + echo "bin/InitSampleDelivSub $sw $sd $st $sT & bin/InitSampleDelivPub $pt $pp" + bin/InitSampleDelivSub $sw $sd $st $sT & spid=$! + bin/InitSampleDelivPub $pt $pp + wait $spid || ok=false + fi + if $ok ; then + echo "bin/InitSampleDelivPub $pt $pp & sleep 2 ; bin/InitSampleDelivSub $sw $sd $st $sT " + bin/InitSampleDelivPub $pt $pp & ppid=$! + sleep 2 + bin/InitSampleDelivSub $sw $sd $st $sT & spid=$! + wait $spid || ok=false + wait + fi + fi + done + done + done + done + done +done diff --git a/src/core/xtests/initsampledeliv/subscriber.c b/src/core/xtests/initsampledeliv/subscriber.c new file mode 100644 index 0000000..1cb6ad6 --- /dev/null +++ b/src/core/xtests/initsampledeliv/subscriber.c @@ -0,0 +1,198 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "dds/dds.h" +#include "InitSampleDelivData.h" +#include +#include +#include +#include +#include + +static void oops (const char *file, int line) +{ + fflush (stdout); + fprintf (stderr, "%s:%d\n", file, line); + abort (); +} + +#define oops() oops(__FILE__, __LINE__) + +static void wait_for_writer (dds_entity_t ppant) +{ + dds_entity_t rd; + dds_sample_info_t info; + void *raw = NULL; + int32_t n; + if ((rd = dds_create_reader (ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) + oops (); + bool done = false; + do { + dds_sleepfor (DDS_MSECS (100)); + while ((n = dds_take (rd, &raw, &info, 1, 1)) == 1) + { + const dds_builtintopic_endpoint_t *sample = raw; + if (strcmp (sample->topic_name, "Msg") == 0) + done = true; + dds_return_loan (rd, &raw, n); + } + if (n < 0) oops (); + } while (!done); + dds_delete (rd); +} + +static uint32_t get_subscription_matched_count (dds_entity_t rd) +{ + dds_subscription_matched_status_t status; + if (dds_get_subscription_matched_status (rd, &status) < 0) + oops (); + return status.current_count; +} + +int main (int argc, char ** argv) +{ + dds_entity_t ppant; + dds_entity_t tp; + dds_entity_t rd[2] = { 0, 0 }; + dds_qos_t *qos; + int opt; + bool flag_wait = false; + bool flag_translocal[sizeof (rd) / sizeof (rd[0])] = { false }; + int flag_create_2nd_rd = -1; + + while ((opt = getopt (argc, argv, "d:tTw")) != EOF) + { + switch (opt) + { + case 'd': + flag_create_2nd_rd = atoi (optarg); + break; + case 't': + flag_translocal[0] = true; + break; + case 'T': + flag_translocal[1] = true; + break; + case 'w': + flag_wait = true; + break; + default: + fprintf (stderr, "usage error: see source code\n"); + exit (2); + } + } + + if ((ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0) + oops (); + + qos = dds_create_qos (); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10)); + dds_qset_durability (qos, 
DDS_DURABILITY_TRANSIENT_LOCAL); + if ((tp = dds_create_topic (ppant, &Msg_desc, "Msg", qos, NULL)) < 0) + oops (); + + if (flag_wait) + { + printf ("waiting for writer ...\n"); + fflush (stdout); + wait_for_writer (ppant); + printf ("writer seen; giving it some time to discover us and publish data ...\n"); + fflush (stdout); + dds_sleepfor (DDS_SECS (1)); + printf ("continuing ...\n"); + fflush (stdout); + } + + /* Reader has overrides for history, durability */ + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dds_qset_durability (qos, flag_translocal[0] ? DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE); + dds_qset_userdata (qos, "0", 1); + if ((rd[0] = dds_create_reader (ppant, tp, qos, NULL)) < 0) + oops (); + + dds_qset_durability (qos, flag_translocal[1] ? DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE); + dds_qset_userdata (qos, "1", 1); + + int32_t firstmsg[2] = { 0 }; + int32_t prevmsg[2] = { 0 }; + int32_t seqatmatch[2] = { 0 }; + int32_t tldepth = 0; + int32_t endmsg = 0; + while (prevmsg[0] == 0 || get_subscription_matched_count (rd[0]) > 0) + { + void *raw = NULL; + dds_sample_info_t info; + int32_t n; + for (int i = 0; i < 2 && rd[i]; i++) + { + if ((n = dds_take (rd[i], &raw, &info, 1, 1)) < 0) + oops (); + else if (n > 0 && info.valid_data) + { + const Msg *msg = raw; + if (prevmsg[i] == 0) + { + /* have to postpone first seq# check for transient-local data because the limited + t-l history means the first sample we read may have an arbitrary sequence number + that antedates the matching */ + printf ("reader %d: first seq %d\n", i, (int) msg->seq); + fflush (stdout); + firstmsg[i] = msg->seq; + } + else if (msg->seq != prevmsg[i] + 1) + { + printf ("reader %d: received %d, previous %d\n", i, (int) msg->seq, (int) prevmsg[i]); + oops (); + } + prevmsg[i] = msg->seq; + endmsg = msg->final_seq; + tldepth = msg->tldepth; + if (seqatmatch[i] == 0) + seqatmatch[i] = msg->seq_at_match[i]; + dds_return_loan (rd[i], &raw, n); + } + } + if 
(rd[1] == 0 && prevmsg[0] == flag_create_2nd_rd) + { + if ((rd[1] = dds_create_reader (ppant, tp, qos, NULL)) < 0) + oops (); + } + dds_sleepfor (DDS_MSECS (10)); + } + if (tldepth == 0 || endmsg == 0) + oops (); + for (int i = 0; i < 2; i++) + { + if (rd[i] == 0) + continue; + if (prevmsg[i] != endmsg) + oops (); + int32_t refseq; + if (!flag_translocal[i]) + refseq = seqatmatch[i]; + else if (seqatmatch[i] <= tldepth) + refseq = 1; + else + refseq = seqatmatch[i] - tldepth; + if (flag_translocal[i] ? (firstmsg[i] > refseq + 1) : firstmsg[i] > refseq) + { + /* allow the rare cases where an additional sample was received for volatile data + (for t-l data, the publisher waits so that the subscriber can get the data + in time) */ + printf ("reader %d: first seq %d but refseq %d\n", i, (int) firstmsg[i], refseq); + oops (); + } + } + + dds_delete_qos (qos); + dds_delete (ppant); + return 0; +} diff --git a/src/core/xtests/rhc_torture/CMakeLists.txt b/src/core/xtests/rhc_torture/CMakeLists.txt new file mode 100644 index 0000000..4aa3a5f --- /dev/null +++ b/src/core/xtests/rhc_torture/CMakeLists.txt @@ -0,0 +1,26 @@ +# +# Copyright(c) 2019 ADLINK Technology Limited and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. 
+# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +idlc_generate(RhcTypes RhcTypes.idl) + +add_executable(rhc_torture rhc_torture.c) + +target_include_directories( + rhc_torture PRIVATE + "$" + "$") + +target_link_libraries(rhc_torture RhcTypes ddsc) + +add_test( + NAME rhc_torture + COMMAND rhc_torture 314159265 0 5000 0) +set_property(TEST rhc_torture PROPERTY TIMEOUT 20) diff --git a/src/core/xtests/RhcTypes.idl b/src/core/xtests/rhc_torture/RhcTypes.idl similarity index 100% rename from src/core/xtests/RhcTypes.idl rename to src/core/xtests/rhc_torture/RhcTypes.idl diff --git a/src/core/xtests/rhc_torture.c b/src/core/xtests/rhc_torture/rhc_torture.c similarity index 100% rename from src/core/xtests/rhc_torture.c rename to src/core/xtests/rhc_torture/rhc_torture.c diff --git a/src/mpt/tests/basic/procs/hello.c b/src/mpt/tests/basic/procs/hello.c index 2c68a84..8623ac0 100644 --- a/src/mpt/tests/basic/procs/hello.c +++ b/src/mpt/tests/basic/procs/hello.c @@ -173,13 +173,7 @@ MPT_ProcessEntry(hello_subscriber, printf("--- [Subscriber(%d)] Start(%d) ...\n", id, domainid); - /* - * A reliable volatile sample, written after publication matched, can still - * be lost when the subscriber wasn't able to match its subscription yet. - * Use transient_local reliable to make sure the sample is received. - */ qos = dds_create_qos(); - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_SECS(10)); /* Use listener to get data available trigger. */ @@ -206,8 +200,7 @@ MPT_ProcessEntry(hello_subscriber, ddsrt_mutex_lock(&g_mutex); recv_cnt = 0; while (recv_cnt < sample_cnt) { - /* Use a take with mask to work around the #146 issue. */ - rc = dds_take_mask(reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES, DDS_NEW_VIEW_STATE); + rc = dds_take(reader, samples, infos, MAX_SAMPLES, MAX_SAMPLES); MPT_ASSERT_GEQ(rc, 0, "Could not read: %s\n", dds_strretcode(-rc)); /* Check if we read some data and it is valid. */