diff --git a/docs/manual/options.md b/docs/manual/options.md index 0683eac..f8d4f38 100644 --- a/docs/manual/options.md +++ b/docs/manual/options.md @@ -406,7 +406,7 @@ The default value is: "default". ### //CycloneDDS/Domain/Internal -Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), 
[NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) +Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AckDelay](#cycloneddsdomaininternalackdelay), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), 
[DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), 
[ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) The Internal elements deal with a variety of settings that evolving and that are not necessarily fully supported. For the vast majority of the Internal settings, the functionality per-se is supported, but the right to change the way the options control the functionality is reserved. This includes renaming or moving options. @@ -419,6 +419,16 @@ Proxy readers that are assumed to sill be retrieving historical data get this ma The default value is: "0". +#### //CycloneDDS/Domain/Internal/AckDelay +Number-with-unit + +This setting controls the delay between sending identical acknowledgements. + +The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day. + +The default value is: "10 ms". + + #### //CycloneDDS/Domain/Internal/AssumeMulticastCapable Text @@ -724,7 +734,7 @@ This setting controls the delay between receipt of a HEARTBEAT indicating missin The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day. -The default value is: "10 ms". +The default value is: "100 ms". 
#### //CycloneDDS/Domain/Internal/PreEmptiveAckDelay @@ -921,7 +931,7 @@ This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). -The default value is: "100 kB". +The default value is: "500 kB". ##### //CycloneDDS/Domain/Internal/Watermarks/WhcHighInit @@ -1733,4 +1743,4 @@ While none prevents any message from being written to a DDSI2 log file. The categorisation of tracing output is incomplete and hence most of the verbosity levels and categories are not of much use in the current release. This is an ongoing process and here we describe the target situation rather than the current situation. Currently, the most useful verbosity levels are config, fine and finest. -The default value is: "none". \ No newline at end of file +The default value is: "none". diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc index 1546c23..3e640aa 100644 --- a/etc/cyclonedds.rnc +++ b/etc/cyclonedds.rnc @@ -298,6 +298,13 @@ CycloneDDS configuration""" ] ] xsd:integer }? & [ a:documentation [ xml:lang="en" """ +

This setting controls the delay between sending identical acknowledgements.

+

The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.

+

The default value is: "10 ms".

""" ] ] + element AckDelay { + duration + }? + & [ a:documentation [ xml:lang="en" """

This element controls which network interfaces are assumed to be capable of multicasting even when the interface flags returned by the operating system state it is not (this provides a workaround for some platforms). It is a comma-separated lists of patterns (with ? and * wildcards) against which the interface names are matched.

The default value is: "".

""" ] ] element AssumeMulticastCapable { @@ -511,7 +518,7 @@ CycloneDDS configuration""" ] ] & [ a:documentation [ xml:lang="en" """

This setting controls the delay between receipt of a HEARTBEAT indicating missing samples and a NACK (ignored when the HEARTBEAT requires an answer). However, no NACK is sent if a NACK had been scheduled already for a response earlier than the delay requests: then that NACK will incorporate the latest information.

The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.

-

The default value is: "10 ms".

""" ] ] +

The default value is: "100 ms".

""" ] ] element NackDelay { duration }? @@ -650,7 +657,7 @@ CycloneDDS configuration""" ] ] & [ a:documentation [ xml:lang="en" """

This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, expressed in bytes. A writer is suspended when the WHC reaches this size.

The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (210 bytes), MB & MiB (220 bytes), GB & GiB (230 bytes).

-

The default value is: "100 kB".

""" ] ] +

The default value is: "500 kB".

""" ] ] element WhcHigh { memsize }? diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd index cd2d0f4..136242b 100644 --- a/etc/cyclonedds.xsd +++ b/etc/cyclonedds.xsd @@ -472,6 +472,7 @@ CycloneDDS configuration + @@ -527,6 +528,14 @@ CycloneDDS configuration <p>The default value is: "0".</p> + + + +<p>This setting controls the delay between sending identical acknowledgements.</p> +<p>The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.</p> +<p>The default value is: "10 ms".</p> + + @@ -820,7 +829,7 @@ CycloneDDS configuration <p>This setting controls the delay between receipt of a HEARTBEAT indicating missing samples and a NACK (ignored when the HEARTBEAT requires an answer). However, no NACK is sent if a NACK had been scheduled already for a response earlier than the delay requests: then that NACK will incorporate the latest information.</p> <p>The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.</p> -<p>The default value is: "10 ms".</p> +<p>The default value is: "100 ms".</p> @@ -1010,7 +1019,7 @@ CycloneDDS configuration <p>This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, expressed in bytes. A writer is suspended when the WHC reaches this size.</p> <p>The unit must be specified explicitly. 
Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p> -<p>The default value is: "100 kB".</p> +<p>The default value is: "500 kB".</p> diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 60e0e91..e4bf0b2 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -43,6 +43,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_cdrstream.c ddsi_time.c ddsi_ownip.c + ddsi_acknack.c q_addrset.c q_bitset_inlines.c q_bswap.c @@ -116,6 +117,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" ddsi_ownip.h ddsi_cfgunits.h ddsi_cfgelems.h + ddsi_acknack.h q_addrset.h q_bitset.h q_bswap.h diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_acknack.h b/src/core/ddsi/include/dds/ddsi/ddsi_acknack.h new file mode 100644 index 0000000..d7ce138 --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_acknack.h @@ -0,0 +1,43 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSI_ACKNACK_H +#define DDSI_ACKNACK_H + +#include +#include + +#include "dds/ddsrt/time.h" +#include "dds/ddsi/q_xevent.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_protocol.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +enum add_AckNack_result { + AANR_SUPPRESSED_ACK, //!< sending nothing: too short a time since the last ACK + AANR_ACK, //!< sending an ACK and there's nothing to NACK + AANR_SUPPRESSED_NACK, //!< sending an ACK even though there are things to NACK + AANR_NACK, //!< sending a NACK, possibly also a NACKFRAG + AANR_NACKFRAG_ONLY //!< sending only a NACKFRAG +}; + +void sched_acknack_if_needed (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack); + +struct nn_xmsg *make_and_resched_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack); + +#if defined (__cplusplus) +} +#endif + +#endif /* DDSI_ACKNACK_H */ diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_cfgelems.h b/src/core/ddsi/include/dds/ddsi/ddsi_cfgelems.h index 2122812..27bea3c 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_cfgelems.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_cfgelems.h @@ -840,7 +840,7 @@ static struct cfgelem internal_watermarks_cfgelems[] = { "expressed in bytes. A suspended writer resumes transmitting when its " "Cyclone DDS WHC shrinks to this size.

"), UNIT("memsize")), - STRING("WhcHigh", NULL, 1, "100 kB", + STRING("WhcHigh", NULL, 1, "500 kB", MEMBER(whc_highwater_mark), FUNCTIONS(0, uf_memsize, 0, pf_memsize), DESCRIPTION( @@ -1194,7 +1194,7 @@ static struct cfgelem internal_cfgelems[] = { "operating system by default creates a larger buffer, it is left " "unchanged.

"), UNIT("memsize")), - STRING("NackDelay", NULL, 1, "10 ms", + STRING("NackDelay", NULL, 1, "100 ms", MEMBER(nack_delay), FUNCTIONS(0, uf_duration_ms_1hr, 0, pf_duration), DESCRIPTION( @@ -1204,6 +1204,13 @@ static struct cfgelem internal_cfgelems[] = { "scheduled already for a response earlier than the delay requests: " "then that NACK will incorporate the latest information.

"), UNIT("duration")), + STRING("AckDelay", NULL, 1, "10 ms", + MEMBER(ack_delay), + FUNCTIONS(0, uf_duration_ms_1hr, 0, pf_duration), + DESCRIPTION( + "

This setting controls the delay between sending identical " + "acknowledgements.

"), + UNIT("duration")), STRING("AutoReschedNackDelay", NULL, 1, "1 s", MEMBER(auto_resched_nack_delay), FUNCTIONS(0, uf_duration_inf, 0, pf_duration), diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h index bcf81ec..733e4e4 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h @@ -947,7 +947,7 @@ bool decode_DataFrag(const struct ddsi_domaingv *gv, struct nn_rsample_info *sam * @param[in] pwr Writer for which the message is intended. * @param[in] rd_guid Origin reader guid. */ -void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid); +void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid); /** * @brief Encode datawriter submessage when necessary. @@ -1332,7 +1332,7 @@ inline void encode_datareader_submsg( UNUSED_ARG(struct nn_xmsg *msg), UNUSED_ARG(struct nn_xmsg_marker sm_marker), - UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct proxy_writer *pwr), UNUSED_ARG(const struct ddsi_guid *rd_guid)) { } diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index a73ff29..1da2169 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -337,6 +337,7 @@ struct config int multicast_ttl; struct config_maybe_uint32 socket_min_rcvbuf_size; uint32_t socket_min_sndbuf_size; + int64_t ack_delay; int64_t nack_delay; int64_t preemptive_ack_delay; int64_t schedule_time_rounding; diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 2dfa554..b87d36a 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -129,7 +129,7 @@ struct wr_prd_match { seqno_t max_seq; /* 
sort-of highest ack'd seq nr in subtree (see augment function) */ seqno_t seq; /* highest acknowledged seq nr */ seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ - int32_t num_reliable_readers_where_seq_equals_max; + uint32_t num_reliable_readers_where_seq_equals_max; ddsi_guid_t arbitrary_unacked_reader; nn_count_t prev_acknack; /* latest accepted acknack sequence number */ nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */ @@ -150,6 +150,13 @@ enum pwr_rd_match_syncstate { PRMSS_OUT_OF_SYNC /* not in sync with proxy writer */ }; +struct last_nack_summary { + seqno_t seq_end_p1; /* last seq for which we requested a retransmit */ + seqno_t seq_base; + uint32_t frag_end_p1; /* last fragnum of seq_last_nack for which requested a retransmit */ + uint32_t frag_base; +}; + struct pwr_rd_match { ddsrt_avl_node_t avlnode; ddsi_guid_t rd_guid; @@ -158,12 +165,18 @@ struct pwr_rd_match { nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */ ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */ ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ - ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ - seqno_t seq_last_nack; /* last seq for which we requested a retransmit */ + ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ + ddsrt_mtime_t t_last_ack; /* (local) time we last sent any ACKNACK */ seqno_t last_seq; /* last known sequence number from this writer */ + struct last_nack_summary last_nack; struct xevent *acknack_xevent; /* entry in xevent queue for sending acknacks */ enum pwr_rd_match_syncstate in_sync; /* whether in sync with the proxy writer */ - unsigned filtered:1; + unsigned ack_requested : 1; /* set on receipt of HEARTBEAT with FINAL clear, cleared on sending an ACKNACK */ + unsigned heartbeat_since_ack : 1; /* 
set when a HEARTBEAT has been received since the last ACKNACK */ + unsigned heartbeatfrag_since_ack : 1; /* set when a HEARTBEATFRAG has been received since the last ACKNACK */ + unsigned directed_heartbeat : 1; /* set on receipt of a directed heartbeat, cleared on sending an ACKNACK */ + unsigned nack_sent_on_nackdelay : 1; /* set when the most recent NACK sent was because of the NackDelay */ + unsigned filtered : 1; union { struct { seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */ @@ -304,7 +317,7 @@ struct writer uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */ uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */ uint32_t num_readers; /* total number of matching PROXY readers */ - int32_t num_reliable_readers; /* number of matching reliable PROXY readers */ + uint32_t num_reliable_readers; /* number of matching reliable PROXY readers */ ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */ ddsrt_avl_tree_t local_readers; /* all matching LOCAL readers, see struct wr_rd_match */ #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS @@ -438,10 +451,9 @@ struct proxy_writer { int32_t n_reliable_readers; /* number of those that are reliable */ int32_t n_readers_out_of_sync; /* number of those that require special handling (accepting historical data, waiting for historical data set to become complete) */ seqno_t last_seq; /* highest known seq published by the writer, not last delivered */ - uint32_t last_fragnum; /* last known frag for last_seq, or ~0u if last_seq not partial */ + uint32_t last_fragnum; /* last known frag for last_seq, or UINT32_MAX if last_seq not partial */ nn_count_t nackfragcount; /* last nackfrag seq number */ ddsrt_atomic_uint32_t next_deliv_seq_lowword; /* lower 32-bits for next sequence number that will be delivered; for generating acks; 32-bit so atomic reads on all supported platforms */ - unsigned last_fragnum_reset: 1; /* iff set, 
heartbeat advertising last_seq as highest seq resets last_fragnum */ unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */ unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */ unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */ diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index fbb87cb..f60fd98 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -247,7 +247,7 @@ typedef struct AckNack { DDSRT_WARNING_MSVC_ON(4200) #define ACKNACK_FLAG_FINAL 0x02u #define ACKNACK_SIZE(numbits) (offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits) + 4) -#define ACKNACK_SIZE_MAX ACKNACK_SIZE (256u) +#define ACKNACK_SIZE_MAX ACKNACK_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS) DDSRT_WARNING_MSVC_OFF(4200) typedef struct Gap { @@ -260,7 +260,7 @@ typedef struct Gap { } Gap_t; DDSRT_WARNING_MSVC_ON(4200) #define GAP_SIZE(numbits) (offsetof (Gap_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits)) -#define GAP_SIZE_MAX GAP_SIZE (256u) +#define GAP_SIZE_MAX GAP_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS) typedef struct InfoTS { SubmessageHeader_t smhdr; @@ -300,7 +300,7 @@ typedef struct NackFrag { } NackFrag_t; DDSRT_WARNING_MSVC_ON(4200) #define NACKFRAG_SIZE(numbits) (offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits) + 4) -#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (256u) +#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (NN_FRAGMENT_NUMBER_SET_MAX_BITS) typedef union Submessage { SubmessageHeader_t smhdr; diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index d32168a..ba89518 100644 --- 
a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -223,7 +223,15 @@ struct nn_defrag *nn_defrag_new (const struct ddsrt_log_cfg *logcfg, enum nn_def void nn_defrag_free (struct nn_defrag *defrag); struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo); void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1); -int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz); + +enum nn_defrag_nackmap_result { + DEFRAG_NACKMAP_UNKNOWN_SAMPLE, + DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN, + DEFRAG_NACKMAP_FRAGMENTS_MISSING +}; + +enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz); + void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min); struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode); @@ -232,8 +240,8 @@ struct nn_rsample *nn_reorder_rsample_dup_first (struct nn_rmsg *rmsg, struct nn struct nn_rdata *nn_rsample_fragchain (struct nn_rsample *rsample); nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rsample *rsampleiv, int *refcount_adjust, int delivery_queue_full_p); nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rdata *rdata, seqno_t min, seqno_t maxp1, int *refcount_adjust); -int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq); -unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); +int nn_reorder_wantsample (const struct 
nn_reorder *reorder, seqno_t seq); +unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder); void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq); diff --git a/src/core/ddsi/src/ddsi_acknack.c b/src/core/ddsi/src/ddsi_acknack.c new file mode 100644 index 0000000..ba52e42 --- /dev/null +++ b/src/core/ddsi/src/ddsi_acknack.c @@ -0,0 +1,495 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#include "dds/ddsrt/static_assert.h" +#include "dds/ddsi/q_rtps.h" +#include "dds/ddsi/q_radmin.h" +#include "dds/ddsi/q_misc.h" +#include "dds/ddsi/q_bswap.h" +#include "dds/ddsi/q_xmsg.h" +#include "dds/ddsi/q_log.h" +#include "dds/ddsi/q_bitset.h" +#include "dds/ddsi/ddsi_domaingv.h" +#include "dds/ddsi/ddsi_acknack.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/ddsi_security_omg.h" + +#define ACK_REASON_IN_FLAGS 0 + +static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq) +{ + /* We want to determine next_deliv_seq, the next sequence number to + be delivered to all in-sync readers, so that we can acknowledge + what we have actually delivered. This is different from next_seq + tracks, which tracks the sequence number up to which all samples + have been received. The difference is the delivery queue. 
+ + There is always but a single delivery queue, and hence delivery + thread, associated with a single proxy writer; but the ACKs are + always generated by another thread. Therefore, updates to + next_deliv_seq need only be atomic with respect to these reads. + On all supported platforms we can atomically load and store 32 + bits without issue, and so we store just the low word of the + sequence number. + + We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <= + next_seq for N << 2**32. With n = next_seq, nd = next_deliv_seq, + H the upper half and L the lower half: + + - H(nd) <= H(n) <= H(nd)+1 { n >= nd AND N << 2**32} + - H(n) = H(nd) => L(n) >= L(nd) { n >= nd } + - H(n) = H(nd)+1 => L(n) < L(nd) { N << 2**32 } + + Therefore: + + L(n) < L(nd) <=> H(n) = H(nd)+1 + + a.k.a.: + + nd = nd' - if nd' > nd then 2**32 else 0 + where nd' = 2**32 * H(n) + L(nd) + + By not locking next_deliv_seq, we may have nd a bit lower than it + could be, but that only means we are acknowledging slightly less + than we could; but that is perfectly acceptable. + + FIXME: next_seq - #dqueue could probably be used instead, + provided #dqueue is decremented after delivery, rather than + before delivery.
*/ + const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword); + seqno_t next_deliv_seq; + next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw; + if (next_deliv_seq > next_seq) + next_deliv_seq -= ((seqno_t) 1) << 32; + assert (0 < next_deliv_seq && next_deliv_seq <= next_seq); + return next_deliv_seq; +} + +static void add_AckNack_getsource (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct nn_reorder **reorder, seqno_t *bitmap_base, int *notail) +{ + /* if in sync, look at proxy writer status, else look at proxy-writer--reader match status */ + if (rwn->in_sync == PRMSS_OUT_OF_SYNC || rwn->filtered) + { + *reorder = rwn->u.not_in_sync.reorder; + *bitmap_base = nn_reorder_next_seq (*reorder); + *notail = 0; + } + else + { + *reorder = pwr->reorder; + if (!pwr->e.gv->config.late_ack_mode) + { + *bitmap_base = nn_reorder_next_seq (*reorder); + *notail = 0; + } + else + { + *bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (*reorder)); + *notail = nn_dqueue_is_full (pwr->dqueue); + } + } +} + +DDSRT_STATIC_ASSERT ((NN_SEQUENCE_NUMBER_SET_MAX_BITS % 32) == 0 && (NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); +struct add_AckNack_info { + bool nack_sent_on_nackdelay; +#if ACK_REASON_IN_FLAGS + uint8_t flags; +#endif + struct { + struct nn_sequence_number_set_header set; + uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; + } acknack; + struct { + seqno_t seq; + struct nn_fragment_number_set_header set; + uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; + } nackfrag; +}; + +static bool add_AckNack_makebitmaps (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct add_AckNack_info *info) +{ + struct nn_reorder *reorder; + seqno_t bitmap_base; + int notail; /* notail = false: all known missing ones are nack'd */ + add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, ¬ail); + + /* Make bitmap; note that we've made sure to have room for the maximum bitmap size. */ + const seqno_t last_seq = rwn->filtered ? 
rwn->last_seq : pwr->last_seq; + const uint32_t numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &info->acknack.set, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_MAX_BITS, notail); + if (numbits == 0) + { + info->nackfrag.seq = 0; + return false; + } + + /* Scan through bitmap, cutting it off at the first missing sample that the defragmenter + knows about. Then note the sequence number & add a NACKFRAG for that sample */ + info->nackfrag.seq = 0; + const seqno_t base = fromSN (info->acknack.set.bitmap_base); + for (uint32_t i = 0; i < numbits; i++) + { + if (!nn_bitset_isset (numbits, info->acknack.bits, i)) + continue; + + const seqno_t seq = base + i; + const uint32_t fragnum = (seq == pwr->last_seq) ? pwr->last_fragnum : UINT32_MAX; + switch (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &info->nackfrag.set, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS)) + { + case DEFRAG_NACKMAP_UNKNOWN_SAMPLE: + break; + case DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN: + /* Cut the NACK short (or make it an ACK if this is the first sample), no NACKFRAG */ + info->nackfrag.seq = 0; + info->acknack.set.numbits = i; + return (i > 0); + case DEFRAG_NACKMAP_FRAGMENTS_MISSING: + /* Cut the NACK short, NACKFRAG */ + info->nackfrag.seq = seq; + info->acknack.set.numbits = i; + return true; + } + } + return true; +} + +static void add_NackFrag (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info) +{ + struct nn_xmsg_marker sm_marker; + NackFrag_t *nf; + + assert (info->nackfrag.set.numbits > 0 && info->nackfrag.set.numbits <= NN_FRAGMENT_NUMBER_SET_MAX_BITS); + nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE (info->nackfrag.set.numbits)); + + nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG); + nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid); + nf->writerId = nn_hton_entityid (pwr->e.guid.entityid); + nf->writerSN = toSN (info->nackfrag.seq); +#if ACK_REASON_IN_FLAGS + 
nf->smhdr.flags |= info->flags; +#endif + // We use 0-based fragment numbers, but externally have to provide 1-based fragment numbers */ + nf->fragmentNumberState.bitmap_base = info->nackfrag.set.bitmap_base + 1; + nf->fragmentNumberState.numbits = info->nackfrag.set.numbits; + memcpy (nf->bits, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (info->nackfrag.set.numbits)); + + // Count field is at a variable offset ... silly DDSI spec + nn_count_t * const countp = + (nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits)); + *countp = pwr->nackfragcount; + + nn_xmsg_submsg_setnext (msg, sm_marker); + + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", + pwr->nackfragcount, fromSN (nf->writerSN), + nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); + for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); + } + + // Encode the sub-message when needed + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); +} + +static void add_AckNack (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info) +{ + /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received + by this proxy writer yet, so we'll be sending a pre-emptive + AckNack. NACKing data now will most likely cause another NACK + upon reception of the first heartbeat, and so cause the data to + be resent twice. 
*/ + AckNack_t *an; + struct nn_xmsg_marker sm_marker; + + an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX); + nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK); + an->readerId = nn_hton_entityid (rwn->rd_guid.entityid); + an->writerId = nn_hton_entityid (pwr->e.guid.entityid); + + // set FINAL flag late, in case it is decided that the "response_required" flag + // should be set depending on the exact AckNack/NackFrag generated + an->smhdr.flags |= ACKNACK_FLAG_FINAL; +#if ACK_REASON_IN_FLAGS + an->smhdr.flags |= info->flags; +#endif + an->readerSNState = info->acknack.set; + memcpy (an->bits, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); + + // Count field is at a variable offset ... silly DDSI spec + nn_count_t * const countp = + (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); + *countp = rwn->count; + // Reset submessage size, now that we know the real size, and update the offset to the next submessage. + nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); + nn_xmsg_submsg_setnext (msg, sm_marker); + + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": F#%"PRIu32":%"PRId64"/%"PRIu32":", + PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, + fromSN (an->readerSNState.bitmap_base), an->readerSNState.numbits); + for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (an->readerSNState.numbits, an->bits, ui) ? 
'1' : '0'); + } + + // Encode the sub-message when needed + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); +} + +static enum add_AckNack_result get_AckNack_info (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct last_nack_summary *nack_summary, struct add_AckNack_info *info, bool ackdelay_passed, bool nackdelay_passed) +{ + /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received + by this proxy writer yet, so we'll be sending a pre-emptive + AckNack. NACKing data now will most likely cause another NACK + upon reception of the first heartbeat, and so cause the data to + be resent twice. */ + enum add_AckNack_result result; + +#if ACK_REASON_IN_FLAGS + info->flags = 0; +#endif + if (!add_AckNack_makebitmaps (pwr, rwn, info)) + { + info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay; + nack_summary->seq_base = fromSN (info->acknack.set.bitmap_base); + nack_summary->seq_end_p1 = 0; + nack_summary->frag_base = 0; + nack_summary->frag_end_p1 = 0; + result = AANR_ACK; + } + else + { + // [seq_base:0 .. seq_end_p1:0) + [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0 + const seqno_t seq_base = fromSN (info->acknack.set.bitmap_base); + assert (seq_base >= 1 && (info->acknack.set.numbits > 0 || info->nackfrag.seq > 0)); + assert (info->nackfrag.seq == 0 || info->nackfrag.set.numbits > 0); + const seqno_t seq_end_p1 = seq_base + info->acknack.set.numbits; + const uint32_t frag_base = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base : 0; + const uint32_t frag_end_p1 = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base + info->nackfrag.set.numbits : 0; + + /* Let caller know whether it is a nack, and, in steady state, set + final to prevent a response if it isn't. The initial + (pre-emptive) acknack is different: it'd be nice to get a + heartbeat in response. + + Who cares about an answer to an acknowledgment!? 
-- actually, + that'd a very useful feature in combination with directed + heartbeats, or somesuch, to get reliability guarantees. */ + nack_summary->seq_end_p1 = seq_end_p1; + nack_summary->frag_end_p1 = frag_end_p1; + nack_summary->seq_base = seq_base; + nack_summary->frag_base = frag_base; + + // [seq_base:0 .. seq_end_p1:0) and [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0 + if (seq_base > rwn->last_nack.seq_end_p1 || (seq_base == rwn->last_nack.seq_end_p1 && frag_base >= rwn->last_nack.frag_end_p1)) + { + // A NACK for something not previously NACK'd or NackDelay passed, update nack_{seq,frag} to reflect + // the changed state + info->nack_sent_on_nackdelay = false; +#if ACK_REASON_IN_FLAGS + info->flags = 0x10; +#endif + result = AANR_NACK; + } + else if (rwn->directed_heartbeat && (!rwn->nack_sent_on_nackdelay || nackdelay_passed)) + { + info->nack_sent_on_nackdelay = false; +#if ACK_REASON_IN_FLAGS + info->flags = 0x20; +#endif + result = AANR_NACK; + } + else if (nackdelay_passed) + { + info->nack_sent_on_nackdelay = true; +#if ACK_REASON_IN_FLAGS + info->flags = 0x30; +#endif + result = AANR_NACK; + } + else + { + // Overlap between this NACK and the previous one and NackDelay has not yet passed: clear numbits and + // nackfrag_numbits to turn the NACK into an ACK and pretend to the caller nothing scary is going on. 
+#if ACK_REASON_IN_FLAGS + info->flags = 0x40; +#endif + info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay; + info->acknack.set.numbits = 0; + info->nackfrag.seq = 0; + result = AANR_SUPPRESSED_NACK; + } + } + + if (result == AANR_ACK || result == AANR_SUPPRESSED_NACK) + { + // ACK and SUPPRESSED_NACK both end up being a pure ACK; send those only if we have to + if (!(rwn->heartbeat_since_ack && rwn->ack_requested)) + result = AANR_SUPPRESSED_ACK; // writer didn't ask for it + else if (!(nack_summary->seq_base > rwn->last_nack.seq_base || ackdelay_passed)) + result = AANR_SUPPRESSED_ACK; // no progress since last, not enough time passed + } + else if (info->acknack.set.numbits == 0 && info->nackfrag.seq > 0 && !rwn->ack_requested) + { + // if we are not NACK'ing full samples and we are NACK'ing fragments, skip the ACKNACK submessage if we + // have no interest in a HEARTBEAT and the writer hasn't asked for an ACKNACK since the last one we sent. + result = AANR_NACKFRAG_ONLY; + } + return result; +} + +void sched_acknack_if_needed (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack) +{ + // This is the relatively expensive and precise code to determine what the ACKNACK event will do, + // the alternative is to do: + // + // add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, ¬ail); + // const seqno_t last_seq = rwn->filtered ? 
rwn->last_seq : pwr->last_seq; + // if (bitmap_base <= last_seq) + // (void) resched_xevent_if_earlier (ev, tnow); + // else if (!(rwn->heartbeat_since_ack && rwn->ack_requested)) + // ; // writer didn't ask for it + // else if (!(bitmap_base > rwn->last_nack.seq_base || ackdelay_passed)) + // ; // no progress since last, not enough time passed + // else + // (void) resched_xevent_if_earlier (ev, tnow); + // + // which is a stripped-down version of the same logic that more aggressively schedules the event, + // relying on the event handler to suppress unnecessary messages. There doesn't seem to be a big + // downside to being precise. + + struct ddsi_domaingv * const gv = pwr->e.gv; + const bool ackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v); + const bool nackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v); + struct add_AckNack_info info; + struct last_nack_summary nack_summary; + const enum add_AckNack_result aanr = + get_AckNack_info (pwr, rwn, &nack_summary, &info, ackdelay_passed, nackdelay_passed); + if (aanr == AANR_SUPPRESSED_ACK) + ; // nothing to be done now + else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK) + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + else + (void) resched_xevent_if_earlier (ev, tnow); +} + +struct nn_xmsg *make_and_resched_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack) +{ + struct ddsi_domaingv * const gv = pwr->e.gv; + struct nn_xmsg *msg; + struct add_AckNack_info info; + + struct last_nack_summary nack_summary; + const enum add_AckNack_result aanr = + get_AckNack_info (pwr, rwn, &nack_summary, &info, + tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v, + tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v); + + if (aanr == 
AANR_SUPPRESSED_ACK) + return NULL; + else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK) + { + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + return NULL; + } + + // Committing to sending a message in response: update the state. Note that there's still a + // possibility of not sending a message, but that is only in case of failures of some sort. + // Resetting the flags and bailing out simply means we will wait until the next heartbeat to + // do try again. + rwn->directed_heartbeat = 0; + rwn->heartbeat_since_ack = 0; + rwn->heartbeatfrag_since_ack = 0; + rwn->nack_sent_on_nackdelay = (info.nack_sent_on_nackdelay ? 1 : 0); + + struct participant *pp = NULL; + if (q_omg_proxy_participant_is_secure (pwr->c.proxypp)) + { + struct reader *rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &rwn->rd_guid); + if (rd) + pp = rd->c.pp; + } + + if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) + { + return NULL; + } + + nn_xmsg_setdstPWR (msg, pwr); + if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v) + { + // If HB->ACK latency measurement is enabled, and we have a + // timestamp available, add it and clear the time stamp. There + // is no real guarantee that the two match, but I haven't got a + // solution for that yet ... If adding the time stamp fails, + // too bad, but no reason to get worried. 
*/ + nn_xmsg_add_timestamp (msg, rwn->hb_timestamp); + rwn->hb_timestamp.v = 0; + } + + if (aanr != AANR_NACKFRAG_ONLY) + add_AckNack (msg, pwr, rwn, &info); + if (info.nackfrag.seq > 0) + { + ETRACE (pwr, " + "); + add_NackFrag (msg, pwr, rwn, &info); + } + ETRACE (pwr, "\n"); + if (nn_xmsg_size (msg) == 0) + { + // attempt at encoding the message caused it to be dropped + nn_xmsg_free (msg); + return NULL; + } + + rwn->count++; + switch (aanr) + { + case AANR_SUPPRESSED_ACK: + // no message: caught by the size = 0 check + assert (0); + break; + case AANR_ACK: + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + rwn->last_nack.seq_base = nack_summary.seq_base; + break; + case AANR_NACK: + case AANR_NACKFRAG_ONLY: + if (nack_summary.frag_end_p1 != 0) + pwr->nackfragcount++; + if (aanr != AANR_NACKFRAG_ONLY) + { + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + } + rwn->last_nack = nack_summary; + rwn->t_last_nack = tnow; + /* If NACKing, make sure we don't give up too soon: even though + we're not allowed to send an ACKNACK unless in response to a + HEARTBEAT, I've seen too many cases of not sending an NACK + because the writing side got confused ... Better to recover + eventually. 
*/ + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay)); + break; + case AANR_SUPPRESSED_NACK: + rwn->ack_requested = 0; + rwn->t_last_ack = tnow; + rwn->last_nack.seq_base = nack_summary.seq_base; + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay)); + break; + } + GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n", PGUID (rwn->rd_guid), PGUID (pwr->e.guid)); + return msg; +} diff --git a/src/core/ddsi/src/ddsi_security_omg.c b/src/core/ddsi/src/ddsi_security_omg.c index 92c9529..c47a10a 100644 --- a/src/core/ddsi/src/ddsi_security_omg.c +++ b/src/core/ddsi/src/ddsi_security_omg.c @@ -3390,7 +3390,7 @@ bool decode_DataFrag (const struct ddsi_domaingv *gv, struct nn_rsample_info *sa return decode_payload (gv, sampleinfo, payloadp, &payloadsz, submsg_len); } -void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid) +void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid) { /* FIXME: avoid this lookup */ struct reader * const rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid); @@ -3964,7 +3964,7 @@ extern inline bool decode_DataFrag( extern inline void encode_datareader_submsg( UNUSED_ARG(struct nn_xmsg *msg), UNUSED_ARG(struct nn_xmsg_marker sm_marker), - UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct proxy_writer *pwr), UNUSED_ARG(const struct ddsi_guid *rd_guid)); extern inline void encode_datawriter_submsg( diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index f23a18e..f494415 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -275,8 +275,8 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct dd x += cpf (conn, " last_seq %"PRId64" last_fragnum 
%"PRIu32"\n", w->last_seq, w->last_fragnum); for (m = ddsrt_avl_iter_first (&wr_readers_treedef, &w->readers, &rdit); m; m = ddsrt_avl_iter_next (&rdit)) { - x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" %"PRId64")\n", - PGUID (m->rd_guid), m->seq_last_nack, m->t_last_nack.v); + x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" frag %"PRIu32" %"PRId64")\n", + PGUID (m->rd_guid), m->last_nack.seq_end_p1, m->last_nack.frag_end_p1, m->t_last_nack.v); switch (m->in_sync) { case PRMSS_SYNC: diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 35dbc42..b58c334 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2528,9 +2528,18 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader m->hb_timestamp.v = 0; m->t_heartbeat_accepted.v = 0; m->t_last_nack.v = 0; - m->seq_last_nack = 0; + m->t_last_ack.v = 0; + m->last_nack.seq_end_p1 = 0; + m->last_nack.seq_base = 0; + m->last_nack.frag_end_p1 = 0; + m->last_nack.frag_base = 0; m->last_seq = 0; m->filtered = 0; + m->ack_requested = 0; + m->heartbeat_since_ack = 0; + m->heartbeatfrag_since_ack = 0; + m->directed_heartbeat = 0; + m->nack_sent_on_nackdelay = 0; #ifdef DDSI_INCLUDE_SECURITY m->crypto_handle = crypto_handle; @@ -3411,7 +3420,7 @@ static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vr /* seq < max cannot be true for a best-effort reader or a demoted */ n->arbitrary_unacked_reader = n->prd_guid; } - else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || !n->has_replied_to_hb)) + else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || n->seq == 0 || !n->has_replied_to_hb)) { /* demoted readers and reliable readers that have not yet replied to a heartbeat are candidates */ n->arbitrary_unacked_reader = n->prd_guid; @@ -5479,7 +5488,6 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid, pwr->last_seq = 0; pwr->last_fragnum = UINT32_MAX; pwr->nackfragcount = 1; - 
pwr->last_fragnum_reset = 0; pwr->alive = 1; pwr->alive_vclock = 0; pwr->filtered = 0; diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 99f9120..6aa3694 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -1443,7 +1443,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1) defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree); } -int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz) +enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz) { struct nn_rsample *s; struct nn_defrag_iv *iv; @@ -1455,7 +1455,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu if (maxfragnum == UINT32_MAX) { /* If neither the caller nor the defragmenter knows anything about the sample, say so */ - return -1; + return DEFRAG_NACKMAP_UNKNOWN_SAMPLE; } else { @@ -1468,7 +1468,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu map->numbits = maxfragnum + 1; map->bitmap_base = 0; nn_bitset_one (map->numbits, mapbits); - return (int) map->numbits; + return DEFRAG_NACKMAP_FRAGMENTS_MISSING; } } @@ -1505,7 +1505,9 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu /* if all data is available, iv == liv and map_end < map->bitmap_base, but there is nothing to request in that case. */ - map->numbits = (map_end < map->bitmap_base) ? 
0 : map_end - map->bitmap_base + 1; + if (map_end < map->bitmap_base) + return DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN; + map->numbits = map_end - map->bitmap_base + 1; iv = ddsrt_avl_find_succ (&rsample_defrag_fragtree_treedef, &s->u.defrag.fragtree, iv); } @@ -1544,7 +1546,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu unsigned x = (unsigned) (i - map->bitmap_base); nn_bitset_set (map->numbits, mapbits, x); } - return (int) map->numbits; + return DEFRAG_NACKMAP_FRAGMENTS_MISSING; } /* There is only one defrag per proxy writer. However for the Volatile Secure writer a filter @@ -2308,7 +2310,7 @@ nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reord } } -int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq) +int nn_reorder_wantsample (const struct nn_reorder *reorder, seqno_t seq) { struct nn_rsample *s; if (seq < reorder->next_seq) @@ -2320,7 +2322,7 @@ int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq) return (s == NULL || s->u.reorder.maxp1 <= seq); } -unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail) +unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail) { struct nn_rsample *iv; seqno_t i; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index b69dd1c..a7d5403 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -55,6 +55,7 @@ #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */ #include "dds/ddsi/ddsi_security_omg.h" +#include "dds/ddsi/ddsi_acknack.h" #include "dds/ddsi/sysdeps.h" #include "dds__whc.h" @@ -576,9 +577,13 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t 
*prev_co the consequence undefined behaviour). Serial number arithmetic deals with the wrap-around after 2**31-1. + Cyclone pre-emptive heartbeats have "count" bitmap_base = 1, NACK + nothing, have count set to 0. They're never sent more often than + once per second, so the 500ms timeout allows them to pass through. + This combined procedure should give the best of all worlds, and is not more expensive in the common case. */ - const int64_t timeout = DDS_SECS (2); + const int64_t timeout = DDS_MSECS (500); if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept) return 0; @@ -652,6 +657,7 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader * struct defer_hb_state { struct nn_xmsg *m; struct xeventq *evq; + int hbansreq; uint64_t wr_iid; uint64_t prd_iid; }; @@ -663,8 +669,16 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state * if (defer_hb_state->m != NULL) { if (wr->e.iid == defer_hb_state->wr_iid && prd->e.iid == defer_hb_state->prd_iid) - return; - qxev_msg (wr->evq, defer_hb_state->m); + { + if (hbansreq <= defer_hb_state->hbansreq) + return; + else + nn_xmsg_free (defer_hb_state->m); + } + else + { + qxev_msg (wr->evq, defer_hb_state->m); + } } ASSERT_MUTEX_HELD (&wr->e.lock); @@ -674,6 +688,7 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state * nn_xmsg_setdstPRD (defer_hb_state->m, prd); add_Heartbeat (defer_hb_state->m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0); defer_hb_state->evq = wr->evq; + defer_hb_state->hbansreq = hbansreq; defer_hb_state->wr_iid = wr->e.iid; defer_hb_state->prd_iid = prd->e.iid; } @@ -782,7 +797,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const relevant to setting "has_replied_to_hb" and "assumed_in_sync". 
*/ is_pure_ack = !acknack_is_nack (msg); is_pure_nonhist_ack = is_pure_ack && seqbase - 1 >= rn->seq; - is_preemptive_ack = seqbase <= 1 && is_pure_ack; + is_preemptive_ack = seqbase < 1 || (seqbase == 1 && *countp == 0); wr->num_acks_received++; if (!is_pure_ack) @@ -864,7 +879,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const msgs_sent = 0; msgs_lost = 0; max_seq_in_reply = 0; - if (!rn->has_replied_to_hb && seqbase > 1 && is_pure_nonhist_ack) + if (!rn->has_replied_to_hb && is_pure_nonhist_ack) { RSTTRACE (" setting-has-replied-to-hb"); rn->has_replied_to_hb = 1; @@ -1125,6 +1140,7 @@ struct handle_Heartbeat_helper_arg { ddsrt_wctime_t timestamp; ddsrt_etime_t tnow; ddsrt_mtime_t tnow_mt; + bool directed_heartbeat; }; static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct handle_Heartbeat_helper_arg * const arg) @@ -1132,62 +1148,38 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand struct receiver_state * const rst = arg->rst; Heartbeat_t const * const msg = arg->msg; struct proxy_writer * const pwr = arg->pwr; - seqno_t refseq, last_seq; ASSERT_MUTEX_HELD (&pwr->e.lock); - /* Not supposed to respond to repeats and old heartbeats. */ + if (wn->acknack_xevent == NULL) + { + // Ignore best-effort readers + return; + } + if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) { RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid)); return; } - /* Reference sequence number for determining whether or not to - Ack/Nack unfortunately depends on whether the reader is in - sync. */ - if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered) - refseq = nn_reorder_next_seq (pwr->reorder) - 1; - else - refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; - RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? 
"(tlcatchup)" : ""); - - /* Reschedule AckNack transmit if deemed appropriate; unreliable - readers have acknack_xevent == NULL and can't do this. - - There is no real need to send a nack from each reader that is in - sync -- indeed, we could simply ignore the destination address in - the messages we receive and only ever nack each sequence number - once, regardless of which readers care about it. */ - if (wn->acknack_xevent) + if (rst->gv->logconfig.c.mask & DDS_LC_TRACE) { - ddsrt_mtime_t tsched = DDSRT_MTIME_NEVER; - - if (wn->filtered) - last_seq = wn->last_seq; + seqno_t refseq; + if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered) + refseq = nn_reorder_next_seq (pwr->reorder); else - last_seq = pwr->last_seq; - if (last_seq > refseq) - { - RSTTRACE ("/NAK"); - if (arg->tnow_mt.v >= wn->t_last_nack.v + rst->gv->config.nack_delay || refseq >= wn->seq_last_nack) - tsched = arg->tnow_mt; - else - { - tsched.v = arg->tnow_mt.v + rst->gv->config.nack_delay; - RSTTRACE ("d"); - } - } - else if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL)) - { - tsched = arg->tnow_mt; - } - if (resched_xevent_if_earlier (wn->acknack_xevent, tsched)) - { - if (rst->gv->config.meas_hb_to_ack_latency && arg->timestamp.v) - wn->hb_timestamp = arg->timestamp; - } + refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder); + RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq - 1, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? 
"(tlcatchup)" : ""); } + + wn->heartbeat_since_ack = 1; + if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL)) + wn->ack_requested = 1; + if (arg->directed_heartbeat) + wn->directed_heartbeat = 1; + + sched_acknack_if_needed (wn->acknack_xevent, pwr, wn, arg->tnow_mt, true); } static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid) @@ -1276,18 +1268,11 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str if (lastseq > pwr->last_seq) { pwr->last_seq = lastseq; - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = UINT32_MAX; } - else if (pwr->last_fragnum != ~0u && lastseq == pwr->last_seq) + else if (pwr->last_fragnum != UINT32_MAX && lastseq == pwr->last_seq) { - if (!pwr->last_fragnum_reset) - pwr->last_fragnum_reset = 1; - else - { - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; - } + pwr->last_fragnum = UINT32_MAX; } nn_defrag_notegap (pwr->defrag, 1, firstseq); @@ -1375,6 +1360,7 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str arg.timestamp = timestamp; arg.tnow = tnow; arg.tnow_mt = ddsrt_time_monotonic (); + arg.directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor)); handle_forall_destinations (&dst, pwr, (ddsrt_avl_walk_t) handle_Heartbeat_helper, &arg); RSTTRACE (")"); @@ -1394,6 +1380,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et src.entityid = msg->writerId; dst.prefix = rst->dst_guid_prefix; dst.entityid = msg->readerId; + const bool directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor)); RSTTRACE ("HEARTBEATFRAG(#%"PRId32":%"PRId64"/[1,%u]", msg->count, seq, fragnum+1); if (!rst->forme) @@ -1424,12 +1411,16 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et { pwr->last_seq = seq; 
pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; } else if (seq == pwr->last_seq && fragnum > pwr->last_fragnum) { pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + } + + if (!pwr->have_seen_heartbeat) + { + ddsrt_mutex_unlock(&pwr->e.lock); + return 1; } /* Defragmenting happens at the proxy writer, readers have nothing @@ -1445,33 +1436,59 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et if (nn_reorder_wantsample (pwr->reorder, seq)) { - /* Pick an arbitrary reliable reader's guid for the response -- - assuming a reliable writer -> unreliable reader is rare, and - so scanning the readers is acceptable if the first guess - fails */ - m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); - if (m->acknack_xevent == NULL) + if (directed_heartbeat) { - m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); - while (m && m->acknack_xevent == NULL) - m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); + /* Cyclone currently only ever sends a HEARTBEAT(FRAG) with the + destination entity id set AFTER retransmitting any samples + that reader requested. So it makes sense to only interpret + those for that reader, and to suppress the NackDelay in a + response to it. But it better be a reliable reader! 
*/ + m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst); + if (m && m->acknack_xevent == NULL) + m = NULL; + } + else + { + /* Pick an arbitrary reliable reader's guid for the response -- + assuming a reliable writer -> unreliable reader is rare, and + so scanning the readers is acceptable if the first guess + fails */ + m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); + if (m->acknack_xevent == NULL) + { + m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); + while (m && m->acknack_xevent == NULL) + m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); + } } } else if (seq < nn_reorder_next_seq (pwr->reorder)) { - /* Check out-of-sync readers -- should add a bit to cheaply test - whether there are any (usually there aren't) */ - m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); - while (m) + if (directed_heartbeat) { - if ((m->in_sync == PRMSS_OUT_OF_SYNC) && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq)) + m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst); + if (m && !(m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq))) { - /* If reader is out-of-sync, and reader is realiable, and + /* Ignore if reader is happy or not best-effort */ + m = NULL; + } + } + else + { + /* Check out-of-sync readers -- should add a bit to cheaply test + whether there are any (usually there aren't) */ + m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); + while (m) + { + if (m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq)) + { + /* If reader is out-of-sync, and reader is realiable, and reader still wants this particular sample, then use this reader to decide which fragments to nack */ - break; + break; + } + m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m); } - m = ddsrt_avl_find_succ (&pwr_readers_treedef, 
&pwr->readers, m); } } @@ -1479,19 +1496,20 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et RSTTRACE (" no interested reliable readers"); else { - /* Check if we are missing something */ + if (directed_heartbeat) + m->directed_heartbeat = 1; + m->heartbeatfrag_since_ack = 1; + DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); struct { struct nn_fragment_number_set_header set; uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; } nackfrag; - if (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) > 0) + const seqno_t last_seq = m->filtered ? m->last_seq : pwr->last_seq; + if (seq == last_seq && nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) == DEFRAG_NACKMAP_FRAGMENTS_MISSING) { - /* Yes we are (note that this potentially also happens for - samples we no longer care about) */ - int64_t delay = rst->gv->config.nack_delay; - RSTTRACE ("/nackfrag"); - (void) resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic(), delay)); + // don't rush it ... 
+ resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic (), pwr->e.gv->config.nack_delay)); } } } @@ -1579,6 +1597,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons const uint32_t base = msg->fragmentNumberState.bitmap_base - 1; assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX); uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size; + bool sent = false; RSTTRACE (" scheduling requested frags ...\n"); for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++) { @@ -1590,9 +1609,17 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0)) nfrags_lim = 0; else + { + sent = true; nfrags_lim--; + } } } + if (sent && sample.unacked) + { + if (!wr->retransmitting) + writer_set_retransmitting (wr); + } whc_return_sample (wr->whc, &sample, false); } else @@ -1867,8 +1894,7 @@ static int handle_Gap (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn if (listbase + last_included_rel > pwr->last_seq) { pwr->last_seq = listbase + last_included_rel; - pwr->last_fragnum = ~0u; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = UINT32_MAX; } if (wn && wn->filtered) @@ -2220,7 +2246,7 @@ static void clean_defrag (struct proxy_writer *pwr) } static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, - uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease) + uint32_t max_fragnum_in_msg, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease) { struct proxy_writer *pwr; struct nn_rsample *rsample; @@ -2293,13 +2319,11 @@ static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, stru if (sampleinfo->seq > 
pwr->last_seq) { pwr->last_seq = sampleinfo->seq; - pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = max_fragnum_in_msg; } - else if (sampleinfo->seq == pwr->last_seq && fragnum > pwr->last_fragnum) + else if (sampleinfo->seq == pwr->last_seq && max_fragnum_in_msg > pwr->last_fragnum) { - pwr->last_fragnum = fragnum; - pwr->last_fragnum_reset = 0; + pwr->last_fragnum = max_fragnum_in_msg; } clean_defrag (pwr); @@ -2571,12 +2595,12 @@ static int handle_Data (struct receiver_state *rst, ddsrt_etime_t tnow, struct n renew_manbypp_lease = false; /* fall through */ default: - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, renew_manbypp_lease); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, renew_manbypp_lease); } } else { - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, true); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, true); } } RSTTRACE (")"); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 7bd4168..d2ec15f 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -167,8 +167,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru } else { - const int32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max; - assert (n_unacked >= 0); + const uint32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max; if (n_unacked == 0) prd_guid = NULL; else @@ -188,7 +187,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru ETRACE (wr, "unicasting to prd "PGUIDFMT" ", PGUID (*prd_guid)); ETRACE (wr, "(rel-prd %"PRId32" seq-eq-max %"PRId32" seq %"PRId64" maxseq %"PRId64")\n", wr->num_reliable_readers, - ddsrt_avl_is_empty (&wr->readers) ? 
-1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, + ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, wr->seq, ddsrt_avl_is_empty (&wr->readers) ? (seqno_t) -1 : root_rdmatch (wr)->max_seq); @@ -215,7 +214,9 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif - add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync); + // send to all readers in the participant: whether or not the entityid is set affects + // the retransmit requests + add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync); } /* It is possible that the encoding removed the submessage(s). */ @@ -335,7 +336,7 @@ struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state * ETRACE (wr, "writer_hbcontrol_p2p: wr "PGUIDFMT" unicasting to prd "PGUIDFMT" ", PGUID (wr->e.guid), PGUID (prd->e.guid)); ETRACE (wr, "(rel-prd %d seq-eq-max %d seq %"PRId64" maxseq %"PRId64")\n", wr->num_reliable_readers, - ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, + ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max, wr->seq, ddsrt_avl_is_empty (&wr->readers) ? 
(int64_t) -1 : root_rdmatch (wr)->max_seq); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index ea1a4bd..9d80dfb 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -42,6 +42,7 @@ #include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_pmd.h" +#include "dds/ddsi/ddsi_acknack.h" #include "dds__whc.h" #include "dds/ddsi/sysdeps.h" @@ -776,220 +777,67 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, ddsrt } } -static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq) +static dds_duration_t preemptive_acknack_interval (const struct pwr_rd_match *rwn) { - /* We want to determine next_deliv_seq, the next sequence number to - be delivered to all in-sync readers, so that we can acknowledge - what we have actually delivered. This is different from next_seq - tracks, which tracks the sequence number up to which all samples - have been received. The difference is the delivery queue. - - There is always but a single delivery queue, and hence delivery - thread, associated with a single proxy writer; but the ACKs are - always generated by another thread. Therefore, updates to - next_deliv_seq need only be atomic with respect to these reads. - On all supported platforms we can atomically load and store 32 - bits without issue, and so we store just the low word of the - sequence number. - - We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <= - next_seq for N << 2**32. 
With n = next_seq, nd = next_deliv_seq, - H the upper half and L the lower half: - - - H(nd) <= H(n) <= H(nd)+1 { n >= nd AND N << 2*32} - - H(n) = H(nd) => L(n) >= L(nd) { n >= nd } - - H(n) = H(nd)+1 => L(n) < L(nd) { N << 2*32 } - - Therefore: - - L(n) < L(nd) <=> H(n) = H(nd+1) - - a.k.a.: - - nd = nd' - if nd' > nd then 2**32 else 0 - where nd' = 2**32 * H(n) + L(nd) - - By not locking next_deliv_seq, we may have nd a bit lower than it - could be, but that only means we are acknowledging slightly less - than we could; but that is perfectly acceptible. - - FIXME: next_seq - #dqueue could probably be used instead, - provided #dqueue is decremented after delivery, rather than - before delivery. */ - const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword); - seqno_t next_deliv_seq; - next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw; - if (next_deliv_seq > next_seq) - next_deliv_seq -= ((seqno_t) 1) << 32; - assert (0 < next_deliv_seq && next_deliv_seq <= next_seq); - return next_deliv_seq; + if (rwn->t_last_ack.v < rwn->tcreate.v) + return 0; + else + { + const dds_duration_t age = rwn->t_last_ack.v - rwn->tcreate.v; + if (age <= DDS_SECS (10)) + return DDS_SECS (1); + else if (age <= DDS_SECS (60)) + return DDS_SECS (2); + else if (age <= DDS_SECS (120)) + return DDS_SECS (5); + else + return DDS_SECS (10); + } } -static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct pwr_rd_match *rwn, seqno_t *nack_seq) +static struct nn_xmsg *make_preemptive_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow) { - /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received - by this proxy writer yet, so we'll be sending a pre-emptive - AckNack. NACKing data now will most likely cause another NACK - upon reception of the first heartbeat, and so cause the data to - be resent twice. 
*/ - const unsigned max_numbits = 256; /* as spec'd */ - int notail = 0; /* all known missing ones are nack'd */ - struct nn_reorder *reorder; - AckNack_t *an; + const dds_duration_t intv = preemptive_acknack_interval (rwn); + if (tnow.v < ddsrt_mtime_add_duration (rwn->t_last_ack, intv).v) + { + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv)); + return NULL; + } + + struct ddsi_domaingv * const gv = pwr->e.gv; + struct participant *pp = NULL; + if (q_omg_proxy_participant_is_secure (pwr->c.proxypp)) + { + struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, &rwn->rd_guid); + if (rd) + pp = rd->c.pp; + } + + struct nn_xmsg *msg; + if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) + { + // if out of memory, try again later + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_SECS (1))); + return NULL; + } + + nn_xmsg_setdstPWR (msg, pwr); struct nn_xmsg_marker sm_marker; - uint32_t i, numbits; - seqno_t base, last_seq; - - DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0); - struct { - struct nn_fragment_number_set_header set; - uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32]; - } nackfrag; - int nackfrag_numbits; - seqno_t nackfrag_seq = 0; - seqno_t bitmap_base; - - ASSERT_MUTEX_HELD (pwr->e.lock); - - /* if in sync, look at proxy writer status, else look at - proxy-writer--reader match status */ - if (rwn->in_sync != PRMSS_OUT_OF_SYNC && !rwn->filtered) - { - reorder = pwr->reorder; - if (!pwr->e.gv->config.late_ack_mode) - bitmap_base = nn_reorder_next_seq (reorder); - else - { - bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (reorder)); - if (nn_dqueue_is_full (pwr->dqueue)) - notail = 1; - } - } - else - { - reorder = rwn->u.not_in_sync.reorder; - bitmap_base = nn_reorder_next_seq (reorder); - } - - if (rwn->filtered) - last_seq = rwn->last_seq; - else - last_seq = pwr->last_seq; - - an = nn_xmsg_append 
(msg, &sm_marker, ACKNACK_SIZE_MAX); + AckNack_t *an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE (0)); nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK); an->readerId = nn_hton_entityid (rwn->rd_guid.entityid); an->writerId = nn_hton_entityid (pwr->e.guid.entityid); + an->readerSNState.bitmap_base = toSN (1); + an->readerSNState.numbits = 0; + nn_count_t * const countp = + (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (0)); + *countp = 0; + nn_xmsg_submsg_setnext (msg, sm_marker); + encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid); - /* Make bitmap; note that we've made sure to have room for the - maximum bitmap size. */ - numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &an->readerSNState, an->bits, max_numbits, notail); - base = fromSN (an->readerSNState.bitmap_base); - - /* Scan through bitmap, cutting it off at the first missing sample - that the defragmenter knows about. Then note the sequence number - & add a NACKFRAG for that sample */ - nackfrag_numbits = -1; - for (i = 0; i < numbits && nackfrag_numbits < 0; i++) - { - uint32_t fragnum; - nackfrag_seq = base + i; - if (!nn_bitset_isset (numbits, an->bits, i)) - continue; - if (nackfrag_seq == last_seq) - fragnum = pwr->last_fragnum; - else - fragnum = UINT32_MAX; - nackfrag_numbits = nn_defrag_nackmap (pwr->defrag, nackfrag_seq, fragnum, &nackfrag.set, nackfrag.bits, max_numbits); - } - if (nackfrag_numbits >= 0) { - /* Cut the NACK short, NACKFRAG will be added after the NACK's is - properly formatted */ - assert (i > 0); - an->readerSNState.numbits = numbits = i - 1; - } - - /* Let caller know whether it is a nack, and, in steady state, set - final to prevent a response if it isn't. The initial - (pre-emptive) acknack is different: it'd be nice to get a - heartbeat in response. - - Who cares about an answer to an acknowledgment!? 
-- actually, - that'd a very useful feature in combination with directed - heartbeats, or somesuch, to get reliability guarantees. */ - *nack_seq = (numbits > 0) ? base + numbits : 0; - if (!pwr->have_seen_heartbeat) { - /* We must have seen a heartbeat for us to consider setting FINAL */ - } else if (*nack_seq && base + numbits <= last_seq) { - /* If it's a NACK and it doesn't cover samples all the way up to - the highest known sequence number, there's some reason to expect - we may to do another round. For which we need a Heartbeat. - - Note: last_seq exists, base is first in bitmap, numbits is - length of bitmap, hence less-than-or-equal. */ - } else { - /* An ACK or we think we'll get everything now. */ - an->smhdr.flags |= ACKNACK_FLAG_FINAL; - } - - { - /* Count field is at a variable offset ... silly DDSI spec. */ - nn_count_t *countp = - (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits)); - *countp = ++rwn->count; - - /* Reset submessage size, now that we know the real size, and update - the offset to the next submessage. */ - nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); - nn_xmsg_submsg_setnext (msg, sm_marker); - - if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) - { - ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":", - PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, - base, an->readerSNState.numbits); - for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); - } - - /* Encode the sub-message when needed. 
*/ - encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); - } - - if (nackfrag_numbits > 0) - { - NackFrag_t *nf; - - /* We use 0-based fragment numbers, but externally have to provide - 1-based fragment numbers */ - assert ((unsigned) nackfrag_numbits == nackfrag.set.numbits); - - nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE ((unsigned) nackfrag_numbits)); - - nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG); - nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid); - nf->writerId = nn_hton_entityid (pwr->e.guid.entityid); - nf->writerSN = toSN (nackfrag_seq); - nf->fragmentNumberState.bitmap_base = nackfrag.set.bitmap_base + 1; - nf->fragmentNumberState.numbits = nackfrag.set.numbits; - memcpy (nf->bits, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nackfrag_numbits)); - - { - nn_count_t *countp = - (nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits)); - *countp = ++pwr->nackfragcount; - nn_xmsg_submsg_setnext (msg, sm_marker); - - ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); - for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); - } - - /* Encode the sub-message when needed. 
*/ - encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); - } - - ETRACE (pwr, "\n"); + rwn->t_last_ack = tnow; + (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv)); + return msg; } static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_mtime_t tnow) @@ -1007,7 +855,6 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m struct proxy_writer *pwr; struct nn_xmsg *msg; struct pwr_rd_match *rwn; - nn_locator_t loc; if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, &ev->u.acknack.pwr_guid)) == NULL) { @@ -1021,88 +868,26 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m return; } - if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc)) - { - struct participant *pp = NULL; - seqno_t nack_seq; - - if (q_omg_proxy_participant_is_secure(pwr->c.proxypp)) - { - struct reader *rd = entidx_lookup_reader_guid(pwr->e.gv->entity_index, &ev->u.acknack.rd_guid); - - if (rd) - pp = rd->c.pp; - } - - if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) - goto outofmem; - nn_xmsg_setdst1 (gv, msg, &ev->u.acknack.pwr_guid.prefix, &loc); - if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v) - { - /* If HB->ACK latency measurement is enabled, and we have a - timestamp available, add it and clear the time stamp. There - is no real guarantee that the two match, but I haven't got a - solution for that yet ... If adding the time stamp fails, - too bad, but no reason to get worried. */ - nn_xmsg_add_timestamp (msg, rwn->hb_timestamp); - rwn->hb_timestamp.v = 0; - } - add_AckNack (msg, pwr, rwn, &nack_seq); - if (nn_xmsg_size(msg) == 0) - { - /* No AckNack added. 
*/ - nn_xmsg_free(msg); - msg = NULL; - } - else if (nack_seq) - { - rwn->t_last_nack = tnow; - rwn->seq_last_nack = nack_seq; - /* If NACKing, make sure we don't give up too soon: even though - we're not allowed to send an ACKNACK unless in response to a - HEARTBEAT, I've seen too many cases of not sending an NACK - because the writing side got confused ... Better to recover - eventually. */ - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay)); - } - GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n", - PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid)); - } - else - { - GVTRACE ("skip acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT"): no address\n", - PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid)); + if (!pwr->have_seen_heartbeat) + msg = make_preemptive_acknack (ev, pwr, rwn, tnow); + else if (!(rwn->heartbeat_since_ack || rwn->heartbeatfrag_since_ack)) msg = NULL; - } - - if (!pwr->have_seen_heartbeat && tnow.v - rwn->tcreate.v <= DDS_SECS (300)) - { - /* Force pre-emptive AckNacks out until we receive a heartbeat, - but let the frequency drop over time and stop after a couple - of minutes. */ - int intv, age = (int) ((tnow.v - rwn->tcreate.v) / DDS_NSECS_IN_SEC + 1); - if (age <= 10) - intv = 1; - else if (age <= 60) - intv = 2; - else if (age <= 120) - intv = 5; - else - intv = 10; - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, intv * DDS_NSECS_IN_SEC)); - } + else + msg = make_and_resched_acknack (ev, pwr, rwn, tnow, false); ddsrt_mutex_unlock (&pwr->e.lock); /* nn_xpack_addmsg may sleep (for bandwidth-limited channels), so must be outside the lock */ if (msg) - nn_xpack_addmsg (xp, msg, 0); - return; - - outofmem: - /* What to do if out of memory? Crash or burn? 
*/ - ddsrt_mutex_unlock (&pwr->e.lock); - (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_MSECS (100))); + { + // a possible result of trying to encode a submessage is that it is removed, + // in which case we may end up with an empty one. + // FIXME: change encode_datareader_submsg so that it returns this and make it warn_unused_result + if (nn_xmsg_size (msg) == 0) + nn_xmsg_free (msg); + else + nn_xpack_addmsg (xp, msg, 0); + } } static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t *guid, struct proxy_reader *prd)