From 63b1a7179b5055e48797e0ec22b4ea7c2a4cbea3 Mon Sep 17 00:00:00 2001
From: Erik Boasson This setting controls the delay between sending identical acknowledgements. The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day. The default value is: "10 ms". This element controls which network interfaces are assumed to be capable of multicasting even when the interface flags returned by the operating system state it is not (this provides a workaround for some platforms). It is a comma-separated list of patterns (with ? and * wildcards) against which the interface names are matched. The default value is: "". This setting controls the delay between receipt of a HEARTBEAT indicating missing samples and a NACK (ignored when the HEARTBEAT requires an answer). However, no NACK is sent if a NACK had been scheduled already for a response earlier than the delay requests: then that NACK will incorporate the latest information. The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day. The default value is: "10 ms". The default value is: "100 ms". This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, expressed in bytes. A writer is suspended when the WHC reaches this size. The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). The default value is: "100 kB". The default value is: "500 kB".
This setting controls the delay between sending identical " + "acknowledgements.
"), + UNIT("duration")), STRING("AutoReschedNackDelay", NULL, 1, "1 s", MEMBER(auto_resched_nack_delay), FUNCTIONS(0, uf_duration_inf, 0, pf_duration), diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h index bcf81ec..733e4e4 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h @@ -947,7 +947,7 @@ bool decode_DataFrag(const struct ddsi_domaingv *gv, struct nn_rsample_info *sam * @param[in] pwr Writer for which the message is intended. * @param[in] rd_guid Origin reader guid. */ -void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid); +void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid); /** * @brief Encode datawriter submessage when necessary. @@ -1332,7 +1332,7 @@ inline void encode_datareader_submsg( UNUSED_ARG(struct nn_xmsg *msg), UNUSED_ARG(struct nn_xmsg_marker sm_marker), - UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct proxy_writer *pwr), UNUSED_ARG(const struct ddsi_guid *rd_guid)) { } diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index a73ff29..1da2169 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -337,6 +337,7 @@ struct config int multicast_ttl; struct config_maybe_uint32 socket_min_rcvbuf_size; uint32_t socket_min_sndbuf_size; + int64_t ack_delay; int64_t nack_delay; int64_t preemptive_ack_delay; int64_t schedule_time_rounding; diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 2dfa554..b87d36a 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -129,7 +129,7 @@ struct wr_prd_match { seqno_t max_seq; /* 
sort-of highest ack'd seq nr in subtree (see augment function) */ seqno_t seq; /* highest acknowledged seq nr */ seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ - int32_t num_reliable_readers_where_seq_equals_max; + uint32_t num_reliable_readers_where_seq_equals_max; ddsi_guid_t arbitrary_unacked_reader; nn_count_t prev_acknack; /* latest accepted acknack sequence number */ nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */ @@ -150,6 +150,13 @@ enum pwr_rd_match_syncstate { PRMSS_OUT_OF_SYNC /* not in sync with proxy writer */ }; +struct last_nack_summary { + seqno_t seq_end_p1; /* last seq for which we requested a retransmit */ + seqno_t seq_base; + uint32_t frag_end_p1; /* last fragnum of seq_last_nack for which requested a retransmit */ + uint32_t frag_base; +}; + struct pwr_rd_match { ddsrt_avl_node_t avlnode; ddsi_guid_t rd_guid; @@ -158,12 +165,18 @@ struct pwr_rd_match { nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */ ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */ ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ - ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ - seqno_t seq_last_nack; /* last seq for which we requested a retransmit */ + ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ + ddsrt_mtime_t t_last_ack; /* (local) time we last sent any ACKNACK */ seqno_t last_seq; /* last known sequence number from this writer */ + struct last_nack_summary last_nack; struct xevent *acknack_xevent; /* entry in xevent queue for sending acknacks */ enum pwr_rd_match_syncstate in_sync; /* whether in sync with the proxy writer */ - unsigned filtered:1; + unsigned ack_requested : 1; /* set on receipt of HEARTBEAT with FINAL clear, cleared on sending an ACKNACK */ + unsigned heartbeat_since_ack : 1; /* 
set when a HEARTBEAT has been received since the last ACKNACK */ + unsigned heartbeatfrag_since_ack : 1; /* set when a HEARTBEATFRAG has been received since the last ACKNACK */ + unsigned directed_heartbeat : 1; /* set on receipt of a directed heartbeat, cleared on sending an ACKNACK */ + unsigned nack_sent_on_nackdelay : 1; /* set when the most recent NACK sent was because of the NackDelay */ + unsigned filtered : 1; union { struct { seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */ @@ -304,7 +317,7 @@ struct writer uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */ uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */ uint32_t num_readers; /* total number of matching PROXY readers */ - int32_t num_reliable_readers; /* number of matching reliable PROXY readers */ + uint32_t num_reliable_readers; /* number of matching reliable PROXY readers */ ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */ ddsrt_avl_tree_t local_readers; /* all matching LOCAL readers, see struct wr_rd_match */ #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS @@ -438,10 +451,9 @@ struct proxy_writer { int32_t n_reliable_readers; /* number of those that are reliable */ int32_t n_readers_out_of_sync; /* number of those that require special handling (accepting historical data, waiting for historical data set to become complete) */ seqno_t last_seq; /* highest known seq published by the writer, not last delivered */ - uint32_t last_fragnum; /* last known frag for last_seq, or ~0u if last_seq not partial */ + uint32_t last_fragnum; /* last known frag for last_seq, or UINT32_MAX if last_seq not partial */ nn_count_t nackfragcount; /* last nackfrag seq number */ ddsrt_atomic_uint32_t next_deliv_seq_lowword; /* lower 32-bits for next sequence number that will be delivered; for generating acks; 32-bit so atomic reads on all supported platforms */ - unsigned last_fragnum_reset: 1; /* iff set, 
heartbeat advertising last_seq as highest seq resets last_fragnum */ unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */ unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */ unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */ diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index fbb87cb..f60fd98 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -247,7 +247,7 @@ typedef struct AckNack { DDSRT_WARNING_MSVC_ON(4200) #define ACKNACK_FLAG_FINAL 0x02u #define ACKNACK_SIZE(numbits) (offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits) + 4) -#define ACKNACK_SIZE_MAX ACKNACK_SIZE (256u) +#define ACKNACK_SIZE_MAX ACKNACK_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS) DDSRT_WARNING_MSVC_OFF(4200) typedef struct Gap { @@ -260,7 +260,7 @@ typedef struct Gap { } Gap_t; DDSRT_WARNING_MSVC_ON(4200) #define GAP_SIZE(numbits) (offsetof (Gap_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits)) -#define GAP_SIZE_MAX GAP_SIZE (256u) +#define GAP_SIZE_MAX GAP_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS) typedef struct InfoTS { SubmessageHeader_t smhdr; @@ -300,7 +300,7 @@ typedef struct NackFrag { } NackFrag_t; DDSRT_WARNING_MSVC_ON(4200) #define NACKFRAG_SIZE(numbits) (offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits) + 4) -#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (256u) +#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (NN_FRAGMENT_NUMBER_SET_MAX_BITS) typedef union Submessage { SubmessageHeader_t smhdr; diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index d32168a..ba89518 100644 --- 
a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -223,7 +223,15 @@ struct nn_defrag *nn_defrag_new (const struct ddsrt_log_cfg *logcfg, enum nn_def void nn_defrag_free (struct nn_defrag *defrag); struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo); void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1); -int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz); + +enum nn_defrag_nackmap_result { + DEFRAG_NACKMAP_UNKNOWN_SAMPLE, + DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN, + DEFRAG_NACKMAP_FRAGMENTS_MISSING +}; + +enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz); + void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min); struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode); @@ -232,8 +240,8 @@ struct nn_rsample *nn_reorder_rsample_dup_first (struct nn_rmsg *rmsg, struct nn struct nn_rdata *nn_rsample_fragchain (struct nn_rsample *rsample); nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rsample *rsampleiv, int *refcount_adjust, int delivery_queue_full_p); nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rdata *rdata, seqno_t min, seqno_t maxp1, int *refcount_adjust); -int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq); -unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); +int nn_reorder_wantsample (const struct 
nn_reorder *reorder, seqno_t seq); +unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder); void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq); diff --git a/src/core/ddsi/src/ddsi_acknack.c b/src/core/ddsi/src/ddsi_acknack.c new file mode 100644 index 0000000..ba52e42 --- /dev/null +++ b/src/core/ddsi/src/ddsi_acknack.c @@ -0,0 +1,495 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include "dds/ddsrt/static_assert.h" +#include "dds/ddsi/q_rtps.h" +#include "dds/ddsi/q_radmin.h" +#include "dds/ddsi/q_misc.h" +#include "dds/ddsi/q_bswap.h" +#include "dds/ddsi/q_xmsg.h" +#include "dds/ddsi/q_log.h" +#include "dds/ddsi/q_bitset.h" +#include "dds/ddsi/ddsi_domaingv.h" +#include "dds/ddsi/ddsi_acknack.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/ddsi_security_omg.h" + +#define ACK_REASON_IN_FLAGS 0 + +static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq) +{ + /* We want to determine next_deliv_seq, the next sequence number to + be delivered to all in-sync readers, so that we can acknowledge + what we have actually delivered. This is different from next_seq + tracks, which tracks the sequence number up to which all samples + have been received. The difference is the delivery queue. 
+ + There is always but a single delivery queue, and hence delivery + thread, associated with a single proxy writer; but the ACKs are + always generated by another thread. Therefore, updates to + next_deliv_seq need only be atomic with respect to these reads. + On all supported platforms we can atomically load and store 32 + bits without issue, and so we store just the low word of the + sequence number. + + We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <= + next_seq for N << 2**32. With n = next_seq, nd = next_deliv_seq, + H the upper half and L the lower half: + + - H(nd) <= H(n) <= H(nd)+1 { n >= nd AND N << 2*32} + - H(n) = H(nd) => L(n) >= L(nd) { n >= nd } + - H(n) = H(nd)+1 => L(n) < L(nd) { N << 2*32 } + + Therefore: + + L(n) < L(nd) <=> H(n) = H(nd+1) + + a.k.a.: + + nd = nd' - if nd' > nd then 2**32 else 0 + where nd' = 2**32 * H(n) + L(nd) + + By not locking next_deliv_seq, we may have nd a bit lower than it + could be, but that only means we are acknowledging slightly less + than we could; but that is perfectly acceptible. + + FIXME: next_seq - #dqueue could probably be used instead, + provided #dqueue is decremented after delivery, rather than + before delivery. 
*/ + const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword); + seqno_t next_deliv_seq; + next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw; + if (next_deliv_seq > next_seq) + next_deliv_seq -= ((seqno_t) 1) << 32; + assert (0 < next_deliv_seq && next_deliv_seq <= next_seq); + return next_deliv_seq; +} + +static void add_AckNack_getsource (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct nn_reorder **reorder, seqno_t *bitmap_base, int *notail) +{ + /* if in sync, look at proxy writer status, else look at proxy-writer--reader match status */ + if (rwn->in_sync == PRMSS_OUT_OF_SYNC || rwn->filtered) + { + *reorder = rwn->u.not_in_sync.reorder; + *bitmap_base = nn_reorder_next_seq (*reorder); + *notail = 0; + } + else + { + *reorder = pwr->reorder; + if (!pwr->e.gv->config.late_ack_mode) + { + *bitmap_base = nn_reorder_next_seq (*reorder); + *notail = 0; + } + else + { + *bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (*reorder)); + *notail = nn_dqueue_is_full (pwr->dqueue); + } + } +} + +DDSRT_STATIC_ASSERT ((NN_SEQUENCE_NUMBER_SET_MAX_BITS % 32) == 0 && (NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); +struct add_AckNack_info { + bool nack_sent_on_nackdelay; +#if ACK_REASON_IN_FLAGS + uint8_t flags; +#endif + struct { + struct nn_sequence_number_set_header set; + uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; + } acknack; + struct { + seqno_t seq; + struct nn_fragment_number_set_header set; + uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; + } nackfrag; +}; + +static bool add_AckNack_makebitmaps (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct add_AckNack_info *info) +{ + struct nn_reorder *reorder; + seqno_t bitmap_base; + int notail; /* notail = false: all known missing ones are nack'd */ + add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, ¬ail); + + /* Make bitmap; note that we've made sure to have room for the maximum bitmap size. */ + const seqno_t last_seq = rwn->filtered ? 
rwn->last_seq : pwr->last_seq; + const uint32_t numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &info->acknack.set, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_MAX_BITS, notail); + if (numbits == 0) + { + info->nackfrag.seq = 0; + return false; + } + + /* Scan through bitmap, cutting it off at the first missing sample that the defragmenter + knows about. Then note the sequence number & add a NACKFRAG for that sample */ + info->nackfrag.seq = 0; + const seqno_t base = fromSN (info->acknack.set.bitmap_base); + for (uint32_t i = 0; i < numbits; i++) + { + if (!nn_bitset_isset (numbits, info->acknack.bits, i)) + continue; + + const seqno_t seq = base + i; + const uint32_t fragnum = (seq == pwr->last_seq) ? pwr->last_fragnum : UINT32_MAX; + switch (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &info->nackfrag.set, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS)) + { + case DEFRAG_NACKMAP_UNKNOWN_SAMPLE: + break; + case DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN: + /* Cut the NACK short (or make it an ACK if this is the first sample), no NACKFRAG */ + info->nackfrag.seq = 0; + info->acknack.set.numbits = i; + return (i > 0); + case DEFRAG_NACKMAP_FRAGMENTS_MISSING: + /* Cut the NACK short, NACKFRAG */ + info->nackfrag.seq = seq; + info->acknack.set.numbits = i; + return true; + } + } + return true; +} + +static void add_NackFrag (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info) +{ + struct nn_xmsg_marker sm_marker; + NackFrag_t *nf; + + assert (info->nackfrag.set.numbits > 0 && info->nackfrag.set.numbits <= NN_FRAGMENT_NUMBER_SET_MAX_BITS); + nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE (info->nackfrag.set.numbits)); + + nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG); + nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid); + nf->writerId = nn_hton_entityid (pwr->e.guid.entityid); + nf->writerSN = toSN (info->nackfrag.seq); +#if ACK_REASON_IN_FLAGS + 
nf->smhdr.flags |= info->flags; +#endif + // We use 0-based fragment numbers, but externally have to provide 1-based fragment numbers */ + nf->fragmentNumberState.bitmap_base = info->nackfrag.set.bitmap_base + 1; + nf->fragmentNumberState.numbits = info->nackfrag.set.numbits; + memcpy (nf->bits, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (info->nackfrag.set.numbits)); + + // Count field is at a variable offset ... silly DDSI spec + nn_count_t * const countp = + (nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits)); + *countp = pwr->nackfragcount; + + nn_xmsg_submsg_setnext (msg, sm_marker); + + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", + pwr->nackfragcount, fromSN (nf->writerSN), + nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); + for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); + } + + // Encode the sub-message when needed + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); +} + +static void add_AckNack (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info) +{ + /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received + by this proxy writer yet, so we'll be sending a pre-emptive + AckNack. NACKing data now will most likely cause another NACK + upon reception of the first heartbeat, and so cause the data to + be resent twice. 
*/ + AckNack_t *an; + struct nn_xmsg_marker sm_marker; + + an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX); + nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK); + an->readerId = nn_hton_entityid (rwn->rd_guid.entityid); + an->writerId = nn_hton_entityid (pwr->e.guid.entityid); + + // set FINAL flag late, in case it is decided that the "response_required" flag + // should be set depending on the exact AckNack/NackFrag generated + an->smhdr.flags |= ACKNACK_FLAG_FINAL; +#if ACK_REASON_IN_FLAGS + an->smhdr.flags |= info->flags; +#endif + an->readerSNState = info->acknack.set; + memcpy (an->bits, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); + + // Count field is at a variable offset ... silly DDSI spec + nn_count_t * const countp = + (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); + *countp = rwn->count; + // Reset submessage size, now that we know the real size, and update the offset to the next submessage. + nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); + nn_xmsg_submsg_setnext (msg, sm_marker); + + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": F#%"PRIu32":%"PRId64"/%"PRIu32":", + PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, + fromSN (an->readerSNState.bitmap_base), an->readerSNState.numbits); + for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (an->readerSNState.numbits, an->bits, ui) ? 
'1' : '0'); + } + + // Encode the sub-message when needed + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); +} + +static enum add_AckNack_result get_AckNack_info (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct last_nack_summary *nack_summary, struct add_AckNack_info *info, bool ackdelay_passed, bool nackdelay_passed) +{ + /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received + by this proxy writer yet, so we'll be sending a pre-emptive + AckNack. NACKing data now will most likely cause another NACK + upon reception of the first heartbeat, and so cause the data to + be resent twice. */ + enum add_AckNack_result result; + +#if ACK_REASON_IN_FLAGS + info->flags = 0; +#endif + if (!add_AckNack_makebitmaps (pwr, rwn, info)) + { + info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay; + nack_summary->seq_base = fromSN (info->acknack.set.bitmap_base); + nack_summary->seq_end_p1 = 0; + nack_summary->frag_base = 0; + nack_summary->frag_end_p1 = 0; + result = AANR_ACK; + } + else + { + // [seq_base:0 .. seq_end_p1:0) + [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0 + const seqno_t seq_base = fromSN (info->acknack.set.bitmap_base); + assert (seq_base >= 1 && (info->acknack.set.numbits > 0 || info->nackfrag.seq > 0)); + assert (info->nackfrag.seq == 0 || info->nackfrag.set.numbits > 0); + const seqno_t seq_end_p1 = seq_base + info->acknack.set.numbits; + const uint32_t frag_base = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base : 0; + const uint32_t frag_end_p1 = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base + info->nackfrag.set.numbits : 0; + + /* Let caller know whether it is a nack, and, in steady state, set + final to prevent a response if it isn't. The initial + (pre-emptive) acknack is different: it'd be nice to get a + heartbeat in response. + + Who cares about an answer to an acknowledgment!? 
-- actually, + that'd a very useful feature in combination with directed + heartbeats, or somesuch, to get reliability guarantees. */ + nack_summary->seq_end_p1 = seq_end_p1; + nack_summary->frag_end_p1 = frag_end_p1; + nack_summary->seq_base = seq_base; + nack_summary->frag_base = frag_base; + + // [seq_base:0 .. seq_end_p1:0) and [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0 + if (seq_base > rwn->last_nack.seq_end_p1 || (seq_base == rwn->last_nack.seq_end_p1 && frag_base >= rwn->last_nack.frag_end_p1)) + { + // A NACK for something not previously NACK'd or NackDelay passed, update nack_{seq,frag} to reflect + // the changed state + info->nack_sent_on_nackdelay = false; +#if ACK_REASON_IN_FLAGS + info->flags = 0x10; +#endif + result = AANR_NACK; + } + else if (rwn->directed_heartbeat && (!rwn->nack_sent_on_nackdelay || nackdelay_passed)) + { + info->nack_sent_on_nackdelay = false; +#if ACK_REASON_IN_FLAGS + info->flags = 0x20; +#endif + result = AANR_NACK; + } + else if (nackdelay_passed) + { + info->nack_sent_on_nackdelay = true; +#if ACK_REASON_IN_FLAGS + info->flags = 0x30; +#endif + result = AANR_NACK; + } + else + { + // Overlap between this NACK and the previous one and NackDelay has not yet passed: clear numbits and + // nackfrag_numbits to turn the NACK into an ACK and pretend to the caller nothing scary is going on. 
+#if ACK_REASON_IN_FLAGS + info->flags = 0x40; +#endif + info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay; + info->acknack.set.numbits = 0; + info->nackfrag.seq = 0; + result = AANR_SUPPRESSED_NACK; + } + } + + if (result == AANR_ACK || result == AANR_SUPPRESSED_NACK) + { + // ACK and SUPPRESSED_NACK both end up being a pure ACK; send those only if we have to + if (!(rwn->heartbeat_since_ack && rwn->ack_requested)) + result = AANR_SUPPRESSED_ACK; // writer didn't ask for it + else if (!(nack_summary->seq_base > rwn->last_nack.seq_base || ackdelay_passed)) + result = AANR_SUPPRESSED_ACK; // no progress since last, not enough time passed + } + else if (info->acknack.set.numbits == 0 && info->nackfrag.seq > 0 && !rwn->ack_requested) + { + // if we are not NACK'ing full samples and we are NACK'ing fragments, skip the ACKNACK submessage if we + // have no interest in a HEARTBEAT and the writer hasn't asked for an ACKNACK since the last one we sent. + result = AANR_NACKFRAG_ONLY; + } + return result; +} + +void sched_acknack_if_needed (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack) +{ + // This is the relatively expensive and precise code to determine what the ACKNACK event will do, + // the alternative is to do: + // + // add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, ¬ail); + // const seqno_t last_seq = rwn->filtered ? 
rwn->last_seq : pwr->last_seq; + // if (bitmap_base <= last_seq) + // (void) resched_xevent_if_earlier (ev, tnow); + // else if (!(rwn->heartbeat_since_ack && rwn->ack_requested)) + // ; // writer didn't ask for it + // else if (!(bitmap_base > rwn->last_nack.seq_base || ackdelay_passed)) + // ; // no progress since last, not enough time passed + // else + // (void) resched_xevent_if_earlier (ev, tnow); + // + // which is a stripped-down version of the same logic that more aggressively schedules the event, + // relying on the event handler to suppress unnecessary messages. There doesn't seem to be a big + // downside to being precise. + + struct ddsi_domaingv * const gv = pwr->e.gv; + const bool ackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v); + const bool nackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v); + struct add_AckNack_info info; + struct last_nack_summary nack_summary; + const enum add_AckNack_result aanr = + get_AckNack_info (pwr, rwn, &nack_summary, &info, ackdelay_passed, nackdelay_passed); + if (aanr == AANR_SUPPRESSED_ACK) + ; // nothing to be done now + else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK) + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + else + (void) resched_xevent_if_earlier (ev, tnow); +} + +struct nn_xmsg *make_and_resched_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack) +{ + struct ddsi_domaingv * const gv = pwr->e.gv; + struct nn_xmsg *msg; + struct add_AckNack_info info; + + struct last_nack_summary nack_summary; + const enum add_AckNack_result aanr = + get_AckNack_info (pwr, rwn, &nack_summary, &info, + tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v, + tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v); + + if (aanr == 
AANR_SUPPRESSED_ACK) + return NULL; + else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK) + { + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + return NULL; + } + + // Committing to sending a message in response: update the state. Note that there's still a + // possibility of not sending a message, but that is only in case of failures of some sort. + // Resetting the flags and bailing out simply means we will wait until the next heartbeat to + // do try again. + rwn->directed_heartbeat = 0; + rwn->heartbeat_since_ack = 0; + rwn->heartbeatfrag_since_ack = 0; + rwn->nack_sent_on_nackdelay = (info.nack_sent_on_nackdelay ? 1 : 0); + + struct participant *pp = NULL; + if (q_omg_proxy_participant_is_secure (pwr->c.proxypp)) + { + struct reader *rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &rwn->rd_guid); + if (rd) + pp = rd->c.pp; + } + + if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) + { + return NULL; + } + + nn_xmsg_setdstPWR (msg, pwr); + if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v) + { + // If HB->ACK latency measurement is enabled, and we have a + // timestamp available, add it and clear the time stamp. There + // is no real guarantee that the two match, but I haven't got a + // solution for that yet ... If adding the time stamp fails, + // too bad, but no reason to get worried. 
*/ + nn_xmsg_add_timestamp (msg, rwn->hb_timestamp); + rwn->hb_timestamp.v = 0; + } + + if (aanr != AANR_NACKFRAG_ONLY) + add_AckNack (msg, pwr, rwn, &info); + if (info.nackfrag.seq > 0) + { + ETRACE (pwr, " + "); + add_NackFrag (msg, pwr, rwn, &info); + } + ETRACE (pwr, "\n"); + if (nn_xmsg_size (msg) == 0) + { + // attempt at encoding the message caused it to be dropped + nn_xmsg_free (msg); + return NULL; + } + + rwn->count++; + switch (aanr) + { + case AANR_SUPPRESSED_ACK: + // no message: caught by the size = 0 check + assert (0); + break; + case AANR_ACK: + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + rwn->last_nack.seq_base = nack_summary.seq_base; + break; + case AANR_NACK: + case AANR_NACKFRAG_ONLY: + if (nack_summary.frag_end_p1 != 0) + pwr->nackfragcount++; + if (aanr != AANR_NACKFRAG_ONLY) + { + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + } + rwn->last_nack = nack_summary; + rwn->t_last_nack = tnow; + /* If NACKing, make sure we don't give up too soon: even though + we're not allowed to send an ACKNACK unless in response to a + HEARTBEAT, I've seen too many cases of not sending an NACK + because the writing side got confused ... Better to recover + eventually. 
*/ + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay)); + break; + case AANR_SUPPRESSED_NACK: + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + rwn->last_nack.seq_base = nack_summary.seq_base; + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + break; + } + GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n", PGUID (rwn->rd_guid), PGUID (pwr->e.guid)); + return msg; +} diff --git a/src/core/ddsi/src/ddsi_security_omg.c b/src/core/ddsi/src/ddsi_security_omg.c index 92c9529..c47a10a 100644 --- a/src/core/ddsi/src/ddsi_security_omg.c +++ b/src/core/ddsi/src/ddsi_security_omg.c @@ -3390,7 +3390,7 @@ bool decode_DataFrag (const struct ddsi_domaingv *gv, struct nn_rsample_info *sa return decode_payload (gv, sampleinfo, payloadp, &payloadsz, submsg_len); } -void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid) +void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid) { /* FIXME: avoid this lookup */ struct reader * const rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid); @@ -3964,7 +3964,7 @@ extern inline bool decode_DataFrag( extern inline void encode_datareader_submsg( UNUSED_ARG(struct nn_xmsg *msg), UNUSED_ARG(struct nn_xmsg_marker sm_marker), - UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct proxy_writer *pwr), UNUSED_ARG(const struct ddsi_guid *rd_guid)); extern inline void encode_datawriter_submsg( diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index f23a18e..f494415 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -275,8 +275,8 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct dd x += cpf (conn, " last_seq %"PRId64" last_fragnum 
%"PRIu32"\n", w->last_seq, w->last_fragnum); for (m = ddsrt_avl_iter_first (&wr_readers_treedef, &w->readers, &rdit); m; m = ddsrt_avl_iter_next (&rdit)) { - x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" %"PRId64")\n", - PGUID (m->rd_guid), m->seq_last_nack, m->t_last_nack.v); + x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" frag %"PRIu32" %"PRId64")\n", + PGUID (m->rd_guid), m->last_nack.seq_end_p1, m->last_nack.frag_end_p1, m->t_last_nack.v); switch (m->in_sync) { case PRMSS_SYNC: diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 35dbc42..b58c334 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2528,9 +2528,18 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader m->hb_timestamp.v = 0; m->t_heartbeat_accepted.v = 0; m->t_last_nack.v = 0; - m->seq_last_nack = 0; + m->t_last_ack.v = 0; + m->last_nack.seq_end_p1 = 0; + m->last_nack.seq_base = 0; + m->last_nack.frag_end_p1 = 0; + m->last_nack.frag_base = 0; m->last_seq = 0; m->filtered = 0; + m->ack_requested = 0; + m->heartbeat_since_ack = 0; + m->heartbeatfrag_since_ack = 0; + m->directed_heartbeat = 0; + m->nack_sent_on_nackdelay = 0; #ifdef DDSI_INCLUDE_SECURITY m->crypto_handle = crypto_handle; @@ -3411,7 +3420,7 @@ static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vr /* seq < max cannot be true for a best-effort reader or a demoted */ n->arbitrary_unacked_reader = n->prd_guid; } - else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || !n->has_replied_to_hb)) + else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || n->seq == 0 || !n->has_replied_to_hb)) { /* demoted readers and reliable readers that have not yet replied to a heartbeat are candidates */ n->arbitrary_unacked_reader = n->prd_guid; @@ -5479,7 +5488,6 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid, pwr->last_seq = 0; pwr->last_fragnum = UINT32_MAX; pwr->nackfragcount = 1; - 
pwr->last_fragnum_reset = 0; pwr->alive = 1; pwr->alive_vclock = 0; pwr->filtered = 0; diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 99f9120..6aa3694 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -1443,7 +1443,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1) defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree); } -int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz) +enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz) { struct nn_rsample *s; struct nn_defrag_iv *iv; @@ -1455,7 +1455,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu if (maxfragnum == UINT32_MAX) { /* If neither the caller nor the defragmenter knows anything about the sample, say so */ - return -1; + return DEFRAG_NACKMAP_UNKNOWN_SAMPLE; } else { @@ -1468,7 +1468,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu map->numbits = maxfragnum + 1; map->bitmap_base = 0; nn_bitset_one (map->numbits, mapbits); - return (int) map->numbits; + return DEFRAG_NACKMAP_FRAGMENTS_MISSING; } } @@ -1505,7 +1505,9 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu /* if all data is available, iv == liv and map_end < map->bitmap_base, but there is nothing to request in that case. */ - map->numbits = (map_end < map->bitmap_base) ? 
0 : map_end - map->bitmap_base + 1; + if (map_end < map->bitmap_base) + return DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN; + map->numbits = map_end - map->bitmap_base + 1; iv = ddsrt_avl_find_succ (&rsample_defrag_fragtree_treedef, &s->u.defrag.fragtree, iv); } @@ -1544,7 +1546,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu unsigned x = (unsigned) (i - map->bitmap_base); nn_bitset_set (map->numbits, mapbits, x); } - return (int) map->numbits; + return DEFRAG_NACKMAP_FRAGMENTS_MISSING; } /* There is only one defrag per proxy writer. However for the Volatile Secure writer a filter @@ -2308,7 +2310,7 @@ nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reord } } -int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq) +int nn_reorder_wantsample (const struct nn_reorder *reorder, seqno_t seq) { struct nn_rsample *s; if (seq < reorder->next_seq) @@ -2320,7 +2322,7 @@ int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq) return (s == NULL || s->u.reorder.maxp1 <= seq); } -unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail) +unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail) { struct nn_rsample *iv; seqno_t i; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index b69dd1c..a7d5403 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -55,6 +55,7 @@ #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */ #include "dds/ddsi/ddsi_security_omg.h" +#include "dds/ddsi/ddsi_acknack.h" #include "dds/ddsi/sysdeps.h" #include "dds__whc.h" @@ -576,9 +577,13 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t 
*prev_co the consequence undefined behaviour). Serial number arithmetic deals with the wrap-around after 2**31-1. + Cyclone pre-emptive heartbeats have "count" bitmap_base = 1, NACK + nothing, have count set to 0. They're never sent more often than + once per second, so the 500ms timeout allows them to pass through. + This combined procedure should give the best of all worlds, and is not more expensive in the common case. */ - const int64_t timeout = DDS_SECS (2); + const int64_t timeout = DDS_MSECS (500); if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept) return 0; @@ -652,6 +657,7 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader * struct defer_hb_state { struct nn_xmsg *m; struct xeventq *evq; + int hbansreq; uint64_t wr_iid; uint64_t prd_iid; }; @@ -663,8 +669,16 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state * if (defer_hb_state->m != NULL) { if (wr->e.iid == defer_hb_state->wr_iid && prd->e.iid == defer_hb_state->prd_iid) - return; - qxev_msg (wr->evq, defer_hb_state->m); + { + if (hbansreq <= defer_hb_state->hbansreq) + return; + else + nn_xmsg_free (defer_hb_state->m); + } + else + { + qxev_msg (wr->evq, defer_hb_state->m); + } } ASSERT_MUTEX_HELD (&wr->e.lock); @@ -674,6 +688,7 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state * nn_xmsg_setdstPRD (defer_hb_state->m, prd); add_Heartbeat (defer_hb_state->m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0); defer_hb_state->evq = wr->evq; + defer_hb_state->hbansreq = hbansreq; defer_hb_state->wr_iid = wr->e.iid; defer_hb_state->prd_iid = prd->e.iid; } @@ -782,7 +797,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const relevant to setting "has_replied_to_hb" and "assumed_in_sync". 
*/ is_pure_ack = !acknack_is_nack (msg); is_pure_nonhist_ack = is_pure_ack && seqbase - 1 >= rn->seq; - is_preemptive_ack = seqbase <= 1 && is_pure_ack; + is_preemptive_ack = seqbase < 1 || (seqbase == 1 && *countp == 0); wr->num_acks_received++; if (!is_pure_ack) @@ -864,7 +879,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const msgs_sent = 0; msgs_lost = 0; max_seq_in_reply = 0; - if (!rn->has_replied_to_hb && seqbase > 1 && is_pure_nonhist_ack) + if (!rn->has_replied_to_hb && is_pure_nonhist_ack) { RSTTRACE (" setting-has-replied-to-hb"); rn->has_replied_to_hb = 1; @@ -1125,6 +1140,7 @@ struct handle_Heartbeat_helper_arg { ddsrt_wctime_t timestamp; ddsrt_etime_t tnow; ddsrt_mtime_t tnow_mt; + bool directed_heartbeat; }; static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct handle_Heartbeat_helper_arg * const arg) @@ -1132,62 +1148,38 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand struct receiver_state * const rst = arg->rst; Heartbeat_t const * const msg = arg->msg; struct proxy_writer * const pwr = arg->pwr; - seqno_t refseq, last_seq; ASSERT_MUTEX_HELD (&pwr->e.lock); - /* Not supposed to respond to repeats and old heartbeats. */ + if (wn->acknack_xevent == NULL) + { + // Ignore best-effort readers + return; + } + if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) { RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid)); return; } - /* Reference sequence number for determining whether or not to - Ack/Nack unfortunately depends on whether the reader is in - sync. */ - if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered) - refseq = nn_reorder_next_seq (pwr->reorder) - 1; - else - refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; - RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? 
"(tlcatchup)" : ""); - - /* Reschedule AckNack transmit if deemed appropriate; unreliable - readers have acknack_xevent == NULL and can't do this. - - There is no real need to send a nack from each reader that is in - sync -- indeed, we could simply ignore the destination address in - the messages we receive and only ever nack each sequence number - once, regardless of which readers care about it. */ - if (wn->acknack_xevent) + if (rst->gv->logconfig.c.mask & DDS_LC_TRACE) { - ddsrt_mtime_t tsched = DDSRT_MTIME_NEVER; - - if (wn->filtered) - last_seq = wn->last_seq; + seqno_t refseq; + if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered) + refseq = nn_reorder_next_seq (pwr->reorder); else - last_seq = pwr->last_seq; - if (last_seq > refseq) - { - RSTTRACE ("/NAK"); - if (arg->tnow_mt.v >= wn->t_last_nack.v + rst->gv->config.nack_delay || refseq >= wn->seq_last_nack) - tsched = arg->tnow_mt; - else - { - tsched.v = arg->tnow_mt.v + rst->gv->config.nack_delay; - RSTTRACE ("d"); - } - } - else if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL)) - { - tsched = arg->tnow_mt; - } - if (resched_xevent_if_earlier (wn->acknack_xevent, tsched)) - { - if (rst->gv->config.meas_hb_to_ack_latency && arg->timestamp.v) - wn->hb_timestamp = arg->timestamp; - } + refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder); + RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq - 1, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? 
"(tlcatchup)" : ""); } + + wn->heartbeat_since_ack = 1; + if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL)) + wn->ack_requested = 1; + if (arg->directed_heartbeat) + wn->directed_heartbeat = 1; + + sched_acknack_if_needed (wn->acknack_xevent, pwr, wn, arg->tnow_mt, true); } static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid) @@ -1276,18 +1268,11 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str if (lastseq > pwr->last_seq) { pwr->last_seq = lastseq; - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = UINT32_MAX; } - else if (pwr->last_fragnum != ~0u && lastseq == pwr->last_seq) + else if (pwr->last_fragnum != UINT32_MAX && lastseq == pwr->last_seq) { - if (!pwr->last_fragnum_reset) - pwr->last_fragnum_reset = 1; - else - { - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; - } + pwr->last_fragnum = UINT32_MAX; } nn_defrag_notegap (pwr->defrag, 1, firstseq); @@ -1375,6 +1360,7 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str arg.timestamp = timestamp; arg.tnow = tnow; arg.tnow_mt = ddsrt_time_monotonic (); + arg.directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor)); handle_forall_destinations (&dst, pwr, (ddsrt_avl_walk_t) handle_Heartbeat_helper, &arg); RSTTRACE (")"); @@ -1394,6 +1380,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et src.entityid = msg->writerId; dst.prefix = rst->dst_guid_prefix; dst.entityid = msg->readerId; + const bool directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor)); RSTTRACE ("HEARTBEATFRAG(#%"PRId32":%"PRId64"/[1,%u]", msg->count, seq, fragnum+1); if (!rst->forme) @@ -1424,12 +1411,16 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et { pwr->last_seq = seq; 
pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; } else if (seq == pwr->last_seq && fragnum > pwr->last_fragnum) { pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + } + + if (!pwr->have_seen_heartbeat) + { + ddsrt_mutex_unlock(&pwr->e.lock); + return 1; } /* Defragmenting happens at the proxy writer, readers have nothing @@ -1445,33 +1436,59 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et if (nn_reorder_wantsample (pwr->reorder, seq)) { - /* Pick an arbitrary reliable reader's guid for the response -- - assuming a reliable writer -> unreliable reader is rare, and - so scanning the readers is acceptable if the first guess - fails */ - m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); - if (m->acknack_xevent == NULL) + if (directed_heartbeat) { - m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); - while (m && m->acknack_xevent == NULL) - m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); + /* Cyclone currently only ever sends a HEARTBEAT(FRAG) with the + destination entity id set AFTER retransmitting any samples + that reader requested. So it makes sense to only interpret + those for that reader, and to suppress the NackDelay in a + response to it. But it better be a reliable reader! 
*/ + m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst); + if (m && m->acknack_xevent == NULL) + m = NULL; + } + else + { + /* Pick an arbitrary reliable reader's guid for the response -- + assuming a reliable writer -> unreliable reader is rare, and + so scanning the readers is acceptable if the first guess + fails */ + m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); + if (m->acknack_xevent == NULL) + { + m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); + while (m && m->acknack_xevent == NULL) + m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); + } } } else if (seq < nn_reorder_next_seq (pwr->reorder)) { - /* Check out-of-sync readers -- should add a bit to cheaply test - whether there are any (usually there aren't) */ - m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); - while (m) + if (directed_heartbeat) { - if ((m->in_sync == PRMSS_OUT_OF_SYNC) && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq)) + m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst); + if (m && !(m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq))) { - /* If reader is out-of-sync, and reader is realiable, and + /* Ignore if reader is happy or not best-effort */ + m = NULL; + } + } + else + { + /* Check out-of-sync readers -- should add a bit to cheaply test + whether there are any (usually there aren't) */ + m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); + while (m) + { + if (m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq)) + { + /* If reader is out-of-sync, and reader is realiable, and reader still wants this particular sample, then use this reader to decide which fragments to nack */ - break; + break; + } + m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); } - m = ddsrt_avl_find_succ (&pwr_readers_treedef, 
&pwr->readers, m); } } @@ -1479,19 +1496,20 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et RSTTRACE (" no interested reliable readers"); else { - /* Check if we are missing something */ + if (directed_heartbeat) + m->directed_heartbeat = 1; + m->heartbeatfrag_since_ack = 1; + DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); struct { struct nn_fragment_number_set_header set; uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; } nackfrag; - if (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) > 0) + const seqno_t last_seq = m->filtered ? m->last_seq : pwr->last_seq; + if (seq == last_seq && nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) == DEFRAG_NACKMAP_FRAGMENTS_MISSING) { - /* Yes we are (note that this potentially also happens for - samples we no longer care about) */ - int64_t delay = rst->gv->config.nack_delay; - RSTTRACE ("/nackfrag"); - (void) resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic(), delay)); + // don't rush it ... 
+ resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic (), pwr->e.gv->config.nack_delay)); } } } @@ -1579,6 +1597,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons const uint32_t base = msg->fragmentNumberState.bitmap_base - 1; assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX); uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size; + bool sent = false; RSTTRACE (" scheduling requested frags ...\n"); for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++) { @@ -1590,9 +1609,17 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0)) nfrags_lim = 0; else + { + sent = true; nfrags_lim--; + } } } + if (sent && sample.unacked) + { + if (!wr->retransmitting) + writer_set_retransmitting (wr); + } whc_return_sample (wr->whc, &sample, false); } else @@ -1867,8 +1894,7 @@ static int handle_Gap (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn if (listbase + last_included_rel > pwr->last_seq) { pwr->last_seq = listbase + last_included_rel; - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = UINT32_MAX; } if (wn && wn->filtered) @@ -2220,7 +2246,7 @@ static void clean_defrag (struct proxy_writer *pwr) } static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, - uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease) + uint32_t max_fragnum_in_msg, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease) { struct proxy_writer *pwr; struct nn_rsample *rsample; @@ -2293,13 +2319,11 @@ static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, stru if (sampleinfo->seq > 
pwr->last_seq) { pwr->last_seq = sampleinfo->seq; - pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = max_fragnum_in_msg; } - else if (sampleinfo->seq == pwr->last_seq && fragnum > pwr->last_fragnum) + else if (sampleinfo->seq == pwr->last_seq && max_fragnum_in_msg > pwr->last_fragnum) { - pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = max_fragnum_in_msg; } clean_defrag (pwr); @@ -2571,12 +2595,12 @@ static int handle_Data (struct receiver_state *rst, ddsrt_etime_t tnow, struct n renew_manbypp_lease = false; /* fall through */ default: - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, renew_manbypp_lease); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, renew_manbypp_lease); } } else { - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, true); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, true); } } RSTTRACE (")"); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 7bd4168..d2ec15f 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -167,8 +167,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru } else { - const int32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max; - assert (n_unacked >= 0); + const uint32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max; if (n_unacked == 0) prd_guid = NULL; else @@ -188,7 +187,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru ETRACE (wr, "unicasting to prd "PGUIDFMT" ", PGUID (*prd_guid)); ETRACE (wr, "(rel-prd %"PRId32" seq-eq-max %"PRId32" seq %"PRId64" maxseq %"PRId64")\n", wr->num_reliable_readers, - ddsrt_avl_is_empty (&wr->readers) ? 
-1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, + ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, wr->seq, ddsrt_avl_is_empty (&wr->readers) ? (seqno_t) -1 : root_rdmatch (wr)->max_seq); @@ -215,7 +214,9 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif - add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync); + // send to all readers in the participant: whether or not the entityid is set affects + // the retransmit requests + add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync); } /* It is possible that the encoding removed the submessage(s). */ @@ -335,7 +336,7 @@ struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state * ETRACE (wr, "writer_hbcontrol_p2p: wr "PGUIDFMT" unicasting to prd "PGUIDFMT" ", PGUID (wr->e.guid), PGUID (prd->e.guid)); ETRACE (wr, "(rel-prd %d seq-eq-max %d seq %"PRId64" maxseq %"PRId64")\n", wr->num_reliable_readers, - ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, + ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, wr->seq, ddsrt_avl_is_empty (&wr->readers) ? 
(int64_t) -1 : root_rdmatch (wr)->max_seq); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index ea1a4bd..9d80dfb 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -42,6 +42,7 @@ #include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_pmd.h" +#include "dds/ddsi/ddsi_acknack.h" #include "dds__whc.h" #include "dds/ddsi/sysdeps.h" @@ -776,220 +777,67 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, ddsrt } } -static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq) +static dds_duration_t preemptive_acknack_interval (const struct pwr_rd_match *rwn) { - /* We want to determine next_deliv_seq, the next sequence number to - be delivered to all in-sync readers, so that we can acknowledge - what we have actually delivered. This is different from next_seq - tracks, which tracks the sequence number up to which all samples - have been received. The difference is the delivery queue. - - There is always but a single delivery queue, and hence delivery - thread, associated with a single proxy writer; but the ACKs are - always generated by another thread. Therefore, updates to - next_deliv_seq need only be atomic with respect to these reads. - On all supported platforms we can atomically load and store 32 - bits without issue, and so we store just the low word of the - sequence number. - - We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <= - next_seq for N << 2**32. 
With n = next_seq, nd = next_deliv_seq, - H the upper half and L the lower half: - - - H(nd) <= H(n) <= H(nd)+1 { n >= nd AND N << 2*32} - - H(n) = H(nd) => L(n) >= L(nd) { n >= nd } - - H(n) = H(nd)+1 => L(n) < L(nd) { N << 2*32 } - - Therefore: - - L(n) < L(nd) <=> H(n) = H(nd+1) - - a.k.a.: - - nd = nd' - if nd' > nd then 2**32 else 0 - where nd' = 2**32 * H(n) + L(nd) - - By not locking next_deliv_seq, we may have nd a bit lower than it - could be, but that only means we are acknowledging slightly less - than we could; but that is perfectly acceptible. - - FIXME: next_seq - #dqueue could probably be used instead, - provided #dqueue is decremented after delivery, rather than - before delivery. */ - const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword); - seqno_t next_deliv_seq; - next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw; - if (next_deliv_seq > next_seq) - next_deliv_seq -= ((seqno_t) 1) << 32; - assert (0 < next_deliv_seq && next_deliv_seq <= next_seq); - return next_deliv_seq; + if (rwn->t_last_ack.v < rwn->tcreate.v) + return 0; + else + { + const dds_duration_t age = rwn->t_last_ack.v - rwn->tcreate.v; + if (age <= DDS_SECS (10)) + return DDS_SECS (1); + else if (age <= DDS_SECS (60)) + return DDS_SECS (2); + else if (age <= DDS_SECS (120)) + return DDS_SECS (5); + else + return DDS_SECS (10); + } } -static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct pwr_rd_match *rwn, seqno_t *nack_seq) +static struct nn_xmsg *make_preemptive_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow) { - /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received - by this proxy writer yet, so we'll be sending a pre-emptive - AckNack. NACKing data now will most likely cause another NACK - upon reception of the first heartbeat, and so cause the data to - be resent twice. 
*/ - const unsigned max_numbits = 256; /* as spec'd */ - int notail = 0; /* all known missing ones are nack'd */ - struct nn_reorder *reorder; - AckNack_t *an; + const dds_duration_t intv = preemptive_acknack_interval (rwn); + if (tnow.v < ddsrt_mtime_add_duration (rwn->t_last_ack, intv).v) + { + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv)); + return NULL; + } + + struct ddsi_domaingv * const gv = pwr->e.gv; + struct participant *pp = NULL; + if (q_omg_proxy_participant_is_secure (pwr->c.proxypp)) + { + struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, &rwn->rd_guid); + if (rd) + pp = rd->c.pp; + } + + struct nn_xmsg *msg; + if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) + { + // if out of memory, try again later + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_SECS (1))); + return NULL; + } + + nn_xmsg_setdstPWR (msg, pwr); struct nn_xmsg_marker sm_marker; - uint32_t i, numbits; - seqno_t base, last_seq; - - DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); - struct { - struct nn_fragment_number_set_header set; - uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; - } nackfrag; - int nackfrag_numbits; - seqno_t nackfrag_seq = 0; - seqno_t bitmap_base; - - ASSERT_MUTEX_HELD (pwr->e.lock); - - /* if in sync, look at proxy writer status, else look at - proxy-writer--reader match status */ - if (rwn->in_sync != PRMSS_OUT_OF_SYNC && !rwn->filtered) - { - reorder = pwr->reorder; - if (!pwr->e.gv->config.late_ack_mode) - bitmap_base = nn_reorder_next_seq (reorder); - else - { - bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (reorder)); - if (nn_dqueue_is_full (pwr->dqueue)) - notail = 1; - } - } - else - { - reorder = rwn->u.not_in_sync.reorder; - bitmap_base = nn_reorder_next_seq (reorder); - } - - if (rwn->filtered) - last_seq = rwn->last_seq; - else - last_seq = pwr->last_seq; - - an = nn_xmsg_append 
(msg, &sm_marker, ACKNACK_SIZE_MAX); + AckNack_t *an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE (0)); nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK); an->readerId = nn_hton_entityid (rwn->rd_guid.entityid); an->writerId = nn_hton_entityid (pwr->e.guid.entityid); + an->readerSNState.bitmap_base = toSN (1); + an->readerSNState.numbits = 0; + nn_count_t * const countp = + (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (0)); + *countp = 0; + nn_xmsg_submsg_setnext (msg, sm_marker); + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); - /* Make bitmap; note that we've made sure to have room for the - maximum bitmap size. */ - numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &an->readerSNState, an->bits, max_numbits, notail); - base = fromSN (an->readerSNState.bitmap_base); - - /* Scan through bitmap, cutting it off at the first missing sample - that the defragmenter knows about. Then note the sequence number - & add a NACKFRAG for that sample */ - nackfrag_numbits = -1; - for (i = 0; i < numbits && nackfrag_numbits < 0; i++) - { - uint32_t fragnum; - nackfrag_seq = base + i; - if (!nn_bitset_isset (numbits, an->bits, i)) - continue; - if (nackfrag_seq == last_seq) - fragnum = pwr->last_fragnum; - else - fragnum = UINT32_MAX; - nackfrag_numbits = nn_defrag_nackmap (pwr->defrag, nackfrag_seq, fragnum, &nackfrag.set, nackfrag.bits, max_numbits); - } - if (nackfrag_numbits >= 0) { - /* Cut the NACK short, NACKFRAG will be added after the NACK's is - properly formatted */ - assert (i > 0); - an->readerSNState.numbits = numbits = i - 1; - } - - /* Let caller know whether it is a nack, and, in steady state, set - final to prevent a response if it isn't. The initial - (pre-emptive) acknack is different: it'd be nice to get a - heartbeat in response. - - Who cares about an answer to an acknowledgment!? 
-- actually, - that'd a very useful feature in combination with directed - heartbeats, or somesuch, to get reliability guarantees. */ - *nack_seq = (numbits > 0) ? base + numbits : 0; - if (!pwr->have_seen_heartbeat) { - /* We must have seen a heartbeat for us to consider setting FINAL */ - } else if (*nack_seq && base + numbits <= last_seq) { - /* If it's a NACK and it doesn't cover samples all the way up to - the highest known sequence number, there's some reason to expect - we may to do another round. For which we need a Heartbeat. - - Note: last_seq exists, base is first in bitmap, numbits is - length of bitmap, hence less-than-or-equal. */ - } else { - /* An ACK or we think we'll get everything now. */ - an->smhdr.flags |= ACKNACK_FLAG_FINAL; - } - - { - /* Count field is at a variable offset ... silly DDSI spec. */ - nn_count_t *countp = - (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); - *countp = ++rwn->count; - - /* Reset submessage size, now that we know the real size, and update - the offset to the next submessage. */ - nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); - nn_xmsg_submsg_setnext (msg, sm_marker); - - if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) - { - ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":", - PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, - base, an->readerSNState.numbits); - for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); - } - - /* Encode the sub-message when needed. 
*/ - encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); - } - - if (nackfrag_numbits > 0) - { - NackFrag_t *nf; - - /* We use 0-based fragment numbers, but externally have to provide - 1-based fragment numbers */ - assert ((unsigned) nackfrag_numbits == nackfrag.set.numbits); - - nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE ((unsigned) nackfrag_numbits)); - - nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG); - nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid); - nf->writerId = nn_hton_entityid (pwr->e.guid.entityid); - nf->writerSN = toSN (nackfrag_seq); - nf->fragmentNumberState.bitmap_base = nackfrag.set.bitmap_base + 1; - nf->fragmentNumberState.numbits = nackfrag.set.numbits; - memcpy (nf->bits, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nackfrag_numbits)); - - { - nn_count_t *countp = - (nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits)); - *countp = ++pwr->nackfragcount; - nn_xmsg_submsg_setnext (msg, sm_marker); - - ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); - for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); - } - - /* Encode the sub-message when needed. 
*/ - encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); - } - - ETRACE (pwr, "\n"); + rwn->t_last_ack = tnow; + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv)); + return msg; } static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_mtime_t tnow) @@ -1007,7 +855,6 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m struct proxy_writer *pwr; struct nn_xmsg *msg; struct pwr_rd_match *rwn; - nn_locator_t loc; if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, &ev->u.acknack.pwr_guid)) == NULL) { @@ -1021,88 +868,26 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m return; } - if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc)) - { - struct participant *pp = NULL; - seqno_t nack_seq; - - if (q_omg_proxy_participant_is_secure(pwr->c.proxypp)) - { - struct reader *rd = entidx_lookup_reader_guid(pwr->e.gv->entity_index, &ev->u.acknack.rd_guid); - - if (rd) - pp = rd->c.pp; - } - - if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) - goto outofmem; - nn_xmsg_setdst1 (gv, msg, &ev->u.acknack.pwr_guid.prefix, &loc); - if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v) - { - /* If HB->ACK latency measurement is enabled, and we have a - timestamp available, add it and clear the time stamp. There - is no real guarantee that the two match, but I haven't got a - solution for that yet ... If adding the time stamp fails, - too bad, but no reason to get worried. */ - nn_xmsg_add_timestamp (msg, rwn->hb_timestamp); - rwn->hb_timestamp.v = 0; - } - add_AckNack (msg, pwr, rwn, &nack_seq); - if (nn_xmsg_size(msg) == 0) - { - /* No AckNack added. 
*/ - nn_xmsg_free(msg); - msg = NULL; - } - else if (nack_seq) - { - rwn->t_last_nack = tnow; - rwn->seq_last_nack = nack_seq; - /* If NACKing, make sure we don't give up too soon: even though - we're not allowed to send an ACKNACK unless in response to a - HEARTBEAT, I've seen too many cases of not sending an NACK - because the writing side got confused ... Better to recover - eventually. */ - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay)); - } - GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n", - PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid)); - } - else - { - GVTRACE ("skip acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT"): no address\n", - PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid)); + if (!pwr->have_seen_heartbeat) + msg = make_preemptive_acknack (ev, pwr, rwn, tnow); + else if (!(rwn->heartbeat_since_ack || rwn->heartbeatfrag_since_ack)) msg = NULL; - } - - if (!pwr->have_seen_heartbeat && tnow.v - rwn->tcreate.v <= DDS_SECS (300)) - { - /* Force pre-emptive AckNacks out until we receive a heartbeat, - but let the frequency drop over time and stop after a couple - of minutes. */ - int intv, age = (int) ((tnow.v - rwn->tcreate.v) / DDS_NSECS_IN_SEC + 1); - if (age <= 10) - intv = 1; - else if (age <= 60) - intv = 2; - else if (age <= 120) - intv = 5; - else - intv = 10; - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, intv * DDS_NSECS_IN_SEC)); - } + else + msg = make_and_resched_acknack (ev, pwr, rwn, tnow, false); ddsrt_mutex_unlock (&pwr->e.lock); /* nn_xpack_addmsg may sleep (for bandwidth-limited channels), so must be outside the lock */ if (msg) - nn_xpack_addmsg (xp, msg, 0); - return; - - outofmem: - /* What to do if out of memory? Crash or burn? 
*/ - ddsrt_mutex_unlock (&pwr->e.lock); - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_MSECS (100))); + { + // a possible result of trying to encode a submessage is that it is removed, + // in which case we may end up with an empty one. + // FIXME: change encode_datareader_submsg so that it returns this and make it warn_unused_result + if (nn_xmsg_size (msg) == 0) + nn_xmsg_free (msg); + else + nn_xpack_addmsg (xp, msg, 0); + } } static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t *guid, struct proxy_reader *prd)