Change ACKNACK policy to be less aggressive

Overly aggressive sending of ACKNACKs wastes bandwidth, causes
unnecessary retransmits and lowers performance; but overly timid sending
of them also reduces performance.  This commit reduces the
aggressiveness.

* It keeps more careful track of what ACKNACK (or NACKFRAG) was last
  sent and when, suppressing ACKs that don't provide new information for
  a few milliseconds and suppressing NACKs for the NackDelay
  setting.  (The setting was there all along, but it was not honored
  when the writer asked for a response.)

* It ignores the NackDelay when all that was requested has arrived, or
  when it receives a directed heartbeat from a Cyclone peer.  The latter
  is taken as an indication that no more is following, and allows the
  recipient to ask for arbitrary amounts of data and rely on the sender
  to limit the retransmit to what seems reasonable.  (For NACKFRAG one
  can do this in the recipient, but for ACKNACK one cannot, and so one
  might as well always do it at the sender.)

* Sufficient state is maintained in the match object for the ACKNACK
  generator to decide, following these rules, whether or not to send an
  ACKNACK; it may decide to send just an ACK even though data is
  missing, or to send nothing at all.

* If HEARTBEAT processing requires an immediate response, the response
  message is generated by the receive thread, but still queued for
  transmission.  If a delayed response is required, it schedules the
  ACKNACK event.
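The rules above can be sketched as a small decision function.  This is an
illustrative model only: the names, fields and thresholds are invented for
the sketch and are not the actual Cyclone DDS implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the ACKNACK suppression policy described above */
enum ack_action { SUPPRESS, ACK_ONLY, ACK_SUPPRESSED_NACK, NACK };

struct ack_state {
  int64_t t_last_ack;          /* when any ACKNACK was last sent */
  int64_t t_last_nack;         /* when a NACK was last sent */
  bool data_missing;           /* samples are still missing */
  bool all_requested_arrived;  /* everything previously NACK'd has arrived */
  bool directed_heartbeat;     /* directed heartbeat from a Cyclone peer */
};

static enum ack_action decide (const struct ack_state *st, int64_t tnow,
                               int64_t ack_delay, int64_t nack_delay)
{
  if (!st->data_missing)  /* pure ACK: suppress if it adds no information */
    return (tnow - st->t_last_ack < ack_delay) ? SUPPRESS : ACK_ONLY;
  /* NackDelay is ignored when all previously requested data has arrived,
     or a directed heartbeat indicates nothing more is following */
  if (st->all_requested_arrived || st->directed_heartbeat)
    return NACK;
  if (tnow - st->t_last_nack < nack_delay)
    return ACK_SUPPRESSED_NACK;  /* ACK now, NACK once the delay expires */
  return NACK;
}
```

The key point of the sketch is that a HEARTBEAT no longer forces a NACK:
the responder may answer with only an ACK, or with nothing, depending on
what was last sent and when.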

Signed-off-by: Erik Boasson <eb@ilities.com>
Erik Boasson 2020-06-22 18:00:49 +02:00, committed by eboasson
parent 0a4c863f11
commit 63b1a7179b
19 changed files with 835 additions and 421 deletions

@@ -406,7 +406,7 @@ The default value is: "default".
 ### //CycloneDDS/Domain/Internal
-Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
+Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AckDelay](#cycloneddsdomaininternalackdelay), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
 The Internal elements deal with a variety of settings that evolving and that are not necessarily fully supported. For the vast majority of the Internal settings, the functionality per-se is supported, but the right to change the way the options control the functionality is reserved. This includes renaming or moving options.
@@ -419,6 +419,16 @@ Proxy readers that are assumed to sill be retrieving historical data get this ma
 The default value is: "0".
+#### //CycloneDDS/Domain/Internal/AckDelay
+Number-with-unit
+This setting controls the delay between sending identical acknowledgements.
+The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.
+The default value is: "10 ms".
 #### //CycloneDDS/Domain/Internal/AssumeMulticastCapable
 Text
@@ -724,7 +734,7 @@ This setting controls the delay between receipt of a HEARTBEAT indicating missin
 The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.
-The default value is: "10 ms".
+The default value is: "100 ms".
 #### //CycloneDDS/Domain/Internal/PreEmptiveAckDelay
@@ -921,7 +931,7 @@ This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs,
 The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
-The default value is: "100 kB".
+The default value is: "500 kB".
 ##### //CycloneDDS/Domain/Internal/Watermarks/WhcHighInit


@@ -298,6 +298,13 @@ CycloneDDS configuration""" ] ]
 xsd:integer
 }?
 & [ a:documentation [ xml:lang="en" """
+<p>This setting controls the delay between sending identical acknowledgements.</p>
+<p>The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.</p>
+<p>The default value is: "10 ms".</p>""" ] ]
+element AckDelay {
+duration
+}?
+& [ a:documentation [ xml:lang="en" """
 <p>This element controls which network interfaces are assumed to be capable of multicasting even when the interface flags returned by the operating system state it is not (this provides a workaround for some platforms). It is a comma-separated lists of patterns (with ? and * wildcards) against which the interface names are matched.</p>
 <p>The default value is: "".</p>""" ] ]
 element AssumeMulticastCapable {
@@ -511,7 +518,7 @@ CycloneDDS configuration""" ] ]
 & [ a:documentation [ xml:lang="en" """
 <p>This setting controls the delay between receipt of a HEARTBEAT indicating missing samples and a NACK (ignored when the HEARTBEAT requires an answer). However, no NACK is sent if a NACK had been scheduled already for a response earlier than the delay requests: then that NACK will incorporate the latest information.</p>
 <p>The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.</p>
-<p>The default value is: "10 ms".</p>""" ] ]
+<p>The default value is: "100 ms".</p>""" ] ]
 element NackDelay {
 duration
 }?
@@ -650,7 +657,7 @@ CycloneDDS configuration""" ] ]
 & [ a:documentation [ xml:lang="en" """
 <p>This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, expressed in bytes. A writer is suspended when the WHC reaches this size.</p>
 <p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
-<p>The default value is: "100 kB".</p>""" ] ]
+<p>The default value is: "500 kB".</p>""" ] ]
 element WhcHigh {
 memsize
 }?


@@ -472,6 +472,7 @@ CycloneDDS configuration</xs:documentation>
 <xs:complexType>
 <xs:all>
 <xs:element minOccurs="0" ref="config:AccelerateRexmitBlockSize"/>
+<xs:element minOccurs="0" ref="config:AckDelay"/>
 <xs:element minOccurs="0" ref="config:AssumeMulticastCapable"/>
 <xs:element minOccurs="0" ref="config:AutoReschedNackDelay"/>
 <xs:element minOccurs="0" ref="config:BuiltinEndpointSet"/>
@@ -527,6 +528,14 @@ CycloneDDS configuration</xs:documentation>
 &lt;p&gt;The default value is: "0".&lt;/p&gt;</xs:documentation>
 </xs:annotation>
 </xs:element>
+<xs:element name="AckDelay" type="config:duration">
+<xs:annotation>
+<xs:documentation>
+&lt;p&gt;This setting controls the delay between sending identical acknowledgements.&lt;/p&gt;
+&lt;p&gt;The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.&lt;/p&gt;
+&lt;p&gt;The default value is: "10 ms".&lt;/p&gt;</xs:documentation>
+</xs:annotation>
+</xs:element>
 <xs:element name="AssumeMulticastCapable" type="xs:string">
 <xs:annotation>
 <xs:documentation>
@@ -820,7 +829,7 @@ CycloneDDS configuration</xs:documentation>
 <xs:documentation>
 &lt;p&gt;This setting controls the delay between receipt of a HEARTBEAT indicating missing samples and a NACK (ignored when the HEARTBEAT requires an answer). However, no NACK is sent if a NACK had been scheduled already for a response earlier than the delay requests: then that NACK will incorporate the latest information.&lt;/p&gt;
 &lt;p&gt;The unit must be specified explicitly. Recognised units: ns, us, ms, s, min, hr, day.&lt;/p&gt;
-&lt;p&gt;The default value is: "10 ms".&lt;/p&gt;</xs:documentation>
+&lt;p&gt;The default value is: "100 ms".&lt;/p&gt;</xs:documentation>
 </xs:annotation>
 </xs:element>
 <xs:element name="PreEmptiveAckDelay" type="config:duration">
@@ -1010,7 +1019,7 @@ CycloneDDS configuration</xs:documentation>
 <xs:documentation>
 &lt;p&gt;This element sets the maximum allowed high-water mark for the Cyclone DDS WHCs, expressed in bytes. A writer is suspended when the WHC reaches this size.&lt;/p&gt;
 &lt;p&gt;The unit must be specified explicitly. Recognised units: B (bytes), kB &amp; KiB (2&lt;sup&gt;10&lt;/sup&gt; bytes), MB &amp; MiB (2&lt;sup&gt;20&lt;/sup&gt; bytes), GB &amp; GiB (2&lt;sup&gt;30&lt;/sup&gt; bytes).&lt;/p&gt;
-&lt;p&gt;The default value is: "100 kB".&lt;/p&gt;</xs:documentation>
+&lt;p&gt;The default value is: "500 kB".&lt;/p&gt;</xs:documentation>
 </xs:annotation>
 </xs:element>
 <xs:element name="WhcHighInit" type="config:memsize">


@@ -43,6 +43,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
 ddsi_cdrstream.c
 ddsi_time.c
 ddsi_ownip.c
+ddsi_acknack.c
 q_addrset.c
 q_bitset_inlines.c
 q_bswap.c
@@ -116,6 +117,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
 ddsi_ownip.h
 ddsi_cfgunits.h
 ddsi_cfgelems.h
+ddsi_acknack.h
 q_addrset.h
 q_bitset.h
 q_bswap.h


@@ -0,0 +1,43 @@
+/*
+ * Copyright(c) 2020 ADLINK Technology Limited and others
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v. 2.0 which is available at
+ * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
+ * v. 1.0 which is available at
+ * http://www.eclipse.org/org/documents/edl-v10.php.
+ *
+ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
+ */
+#ifndef DDSI_ACKNACK_H
+#define DDSI_ACKNACK_H
+#include <stddef.h>
+#include <stdbool.h>
+#include "dds/ddsrt/time.h"
+#include "dds/ddsi/q_xevent.h"
+#include "dds/ddsi/q_entity.h"
+#include "dds/ddsi/q_protocol.h"
+#if defined (__cplusplus)
+extern "C" {
+#endif
+enum add_AckNack_result {
+  AANR_SUPPRESSED_ACK, //!< sending nothing: too short a time since the last ACK
+  AANR_ACK, //!< sending an ACK and there's nothing to NACK
+  AANR_SUPPRESSED_NACK, //!< sending an ACK even though there are things to NACK
+  AANR_NACK, //!< sending a NACK, possibly also a NACKFRAG
+  AANR_NACKFRAG_ONLY //!< sending only a NACKFRAG
+};
+void sched_acknack_if_needed (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack);
+struct nn_xmsg *make_and_resched_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack);
+#if defined (__cplusplus)
+}
+#endif
+#endif /* DDSI_ACKNACK_H */
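The `add_AckNack_result` enum in the new header distinguishes five outcomes.
A caller could classify them along three axes, as in this sketch; the enum is
copied from the header above, but the helper functions are hypothetical, not
part of the commit.

```c
#include <assert.h>
#include <stdbool.h>

/* The enum as declared in ddsi_acknack.h above */
enum add_AckNack_result {
  AANR_SUPPRESSED_ACK,  /* sending nothing: too short a time since the last ACK */
  AANR_ACK,             /* sending an ACK and there's nothing to NACK */
  AANR_SUPPRESSED_NACK, /* sending an ACK even though there are things to NACK */
  AANR_NACK,            /* sending a NACK, possibly also a NACKFRAG */
  AANR_NACKFRAG_ONLY    /* sending only a NACKFRAG */
};

/* Hypothetical classification helpers */
static bool sends_message (enum add_AckNack_result r) {
  return r != AANR_SUPPRESSED_ACK;        /* anything but full suppression */
}
static bool requests_retransmit (enum add_AckNack_result r) {
  return r == AANR_NACK || r == AANR_NACKFRAG_ONLY;
}
static bool data_known_missing (enum add_AckNack_result r) {
  /* a suppressed NACK still means data is missing, only the request is held */
  return r == AANR_SUPPRESSED_NACK || r == AANR_NACK || r == AANR_NACKFRAG_ONLY;
}
```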


@@ -840,7 +840,7 @@ static struct cfgelem internal_watermarks_cfgelems[] = {
 "expressed in bytes. A suspended writer resumes transmitting when its "
 "Cyclone DDS WHC shrinks to this size.</p>"),
 UNIT("memsize")),
-STRING("WhcHigh", NULL, 1, "100 kB",
+STRING("WhcHigh", NULL, 1, "500 kB",
 MEMBER(whc_highwater_mark),
 FUNCTIONS(0, uf_memsize, 0, pf_memsize),
 DESCRIPTION(
@@ -1194,7 +1194,7 @@ static struct cfgelem internal_cfgelems[] = {
 "operating system by default creates a larger buffer, it is left "
 "unchanged.</p>"),
 UNIT("memsize")),
-STRING("NackDelay", NULL, 1, "10 ms",
+STRING("NackDelay", NULL, 1, "100 ms",
 MEMBER(nack_delay),
 FUNCTIONS(0, uf_duration_ms_1hr, 0, pf_duration),
 DESCRIPTION(
@@ -1204,6 +1204,13 @@ static struct cfgelem internal_cfgelems[] = {
 "scheduled already for a response earlier than the delay requests: "
 "then that NACK will incorporate the latest information.</p>"),
 UNIT("duration")),
+STRING("AckDelay", NULL, 1, "10 ms",
+MEMBER(ack_delay),
+FUNCTIONS(0, uf_duration_ms_1hr, 0, pf_duration),
+DESCRIPTION(
+"<p>This setting controls the delay between sending identical "
+"acknowledgements.</p>"),
+UNIT("duration")),
 STRING("AutoReschedNackDelay", NULL, 1, "1 s",
 MEMBER(auto_resched_nack_delay),
 FUNCTIONS(0, uf_duration_inf, 0, pf_duration),


@@ -947,7 +947,7 @@ bool decode_DataFrag(const struct ddsi_domaingv *gv, struct nn_rsample_info *sam
 * @param[in] pwr Writer for which the message is intended.
 * @param[in] rd_guid Origin reader guid.
 */
-void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid);
+void encode_datareader_submsg(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid);
 /**
 * @brief Encode datawriter submessage when necessary.
@@ -1332,7 +1332,7 @@ inline void
 encode_datareader_submsg(
 UNUSED_ARG(struct nn_xmsg *msg),
 UNUSED_ARG(struct nn_xmsg_marker sm_marker),
-UNUSED_ARG(struct proxy_writer *pwr),
+UNUSED_ARG(const struct proxy_writer *pwr),
 UNUSED_ARG(const struct ddsi_guid *rd_guid))
 {
 }


@@ -337,6 +337,7 @@ struct config
 int multicast_ttl;
 struct config_maybe_uint32 socket_min_rcvbuf_size;
 uint32_t socket_min_sndbuf_size;
+int64_t ack_delay;
 int64_t nack_delay;
 int64_t preemptive_ack_delay;
 int64_t schedule_time_rounding;


@@ -129,7 +129,7 @@ struct wr_prd_match {
 seqno_t max_seq; /* sort-of highest ack'd seq nr in subtree (see augment function) */
 seqno_t seq; /* highest acknowledged seq nr */
 seqno_t last_seq; /* highest seq send to this reader used when filter is applied */
-int32_t num_reliable_readers_where_seq_equals_max;
+uint32_t num_reliable_readers_where_seq_equals_max;
 ddsi_guid_t arbitrary_unacked_reader;
 nn_count_t prev_acknack; /* latest accepted acknack sequence number */
 nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */
@@ -150,6 +150,13 @@ enum pwr_rd_match_syncstate {
 PRMSS_OUT_OF_SYNC /* not in sync with proxy writer */
 };
+struct last_nack_summary {
+  seqno_t seq_end_p1; /* last seq for which we requested a retransmit */
+  seqno_t seq_base;
+  uint32_t frag_end_p1; /* last fragnum of seq_last_nack for which requested a retransmit */
+  uint32_t frag_base;
+};
 struct pwr_rd_match {
 ddsrt_avl_node_t avlnode;
 ddsi_guid_t rd_guid;
@@ -158,12 +165,18 @@ struct pwr_rd_match {
 nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */
 ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */
 ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */
-ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */
-seqno_t seq_last_nack; /* last seq for which we requested a retransmit */
+ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */
+ddsrt_mtime_t t_last_ack; /* (local) time we last sent any ACKNACK */
 seqno_t last_seq; /* last known sequence number from this writer */
+struct last_nack_summary last_nack;
 struct xevent *acknack_xevent; /* entry in xevent queue for sending acknacks */
 enum pwr_rd_match_syncstate in_sync; /* whether in sync with the proxy writer */
-unsigned filtered:1;
+unsigned ack_requested : 1; /* set on receipt of HEARTBEAT with FINAL clear, cleared on sending an ACKNACK */
+unsigned heartbeat_since_ack : 1; /* set when a HEARTBEAT has been received since the last ACKNACK */
+unsigned heartbeatfrag_since_ack : 1; /* set when a HEARTBEATFRAG has been received since the last ACKNACK */
+unsigned directed_heartbeat : 1; /* set on receipt of a directed heartbeat, cleared on sending an ACKNACK */
+unsigned nack_sent_on_nackdelay : 1; /* set when the most recent NACK sent was because of the NackDelay */
+unsigned filtered : 1;
 union {
 struct {
 seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */
@@ -304,7 +317,7 @@ struct writer
 uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */
 uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */
 uint32_t num_readers; /* total number of matching PROXY readers */
-int32_t num_reliable_readers; /* number of matching reliable PROXY readers */
+uint32_t num_reliable_readers; /* number of matching reliable PROXY readers */
 ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */
 ddsrt_avl_tree_t local_readers; /* all matching LOCAL readers, see struct wr_rd_match */
 #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
@@ -438,10 +451,9 @@ struct proxy_writer {
 int32_t n_reliable_readers; /* number of those that are reliable */
 int32_t n_readers_out_of_sync; /* number of those that require special handling (accepting historical data, waiting for historical data set to become complete) */
 seqno_t last_seq; /* highest known seq published by the writer, not last delivered */
-uint32_t last_fragnum; /* last known frag for last_seq, or ~0u if last_seq not partial */
+uint32_t last_fragnum; /* last known frag for last_seq, or UINT32_MAX if last_seq not partial */
 nn_count_t nackfragcount; /* last nackfrag seq number */
 ddsrt_atomic_uint32_t next_deliv_seq_lowword; /* lower 32-bits for next sequence number that will be delivered; for generating acks; 32-bit so atomic reads on all supported platforms */
-unsigned last_fragnum_reset: 1; /* iff set, heartbeat advertising last_seq as highest seq resets last_fragnum */
 unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
 unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
 unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
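The `last_nack_summary` struct added above records the window covered by the
previous NACK, which is what lets the generator tell whether a candidate
NACK repeats old information.  A hypothetical sketch of that comparison,
with `seqno_t` taken as `int64_t`; the helper function is illustrative, not
part of the commit:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Mirrors the struct added in the diff above */
struct last_nack_summary {
  seqno_t seq_end_p1;   /* one past the last seq for which we requested a retransmit */
  seqno_t seq_base;
  uint32_t frag_end_p1; /* one past the last fragnum requested */
  uint32_t frag_base;
};

/* Hypothetical check: a NACK covering exactly the previously requested
   window repeats old information and is a candidate for suppression */
static bool nack_is_repeat (const struct last_nack_summary *prev,
                            seqno_t seq_base, seqno_t seq_end_p1,
                            uint32_t frag_base, uint32_t frag_end_p1)
{
  return seq_base == prev->seq_base && seq_end_p1 == prev->seq_end_p1
      && frag_base == prev->frag_base && frag_end_p1 == prev->frag_end_p1;
}
```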


@@ -247,7 +247,7 @@ typedef struct AckNack {
 DDSRT_WARNING_MSVC_ON(4200)
 #define ACKNACK_FLAG_FINAL 0x02u
 #define ACKNACK_SIZE(numbits) (offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits) + 4)
-#define ACKNACK_SIZE_MAX ACKNACK_SIZE (256u)
+#define ACKNACK_SIZE_MAX ACKNACK_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS)
 DDSRT_WARNING_MSVC_OFF(4200)
 typedef struct Gap {
@@ -260,7 +260,7 @@ typedef struct Gap {
 } Gap_t;
 DDSRT_WARNING_MSVC_ON(4200)
 #define GAP_SIZE(numbits) (offsetof (Gap_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits))
-#define GAP_SIZE_MAX GAP_SIZE (256u)
+#define GAP_SIZE_MAX GAP_SIZE (NN_SEQUENCE_NUMBER_SET_MAX_BITS)
 typedef struct InfoTS {
 SubmessageHeader_t smhdr;
@@ -300,7 +300,7 @@ typedef struct NackFrag {
 } NackFrag_t;
 DDSRT_WARNING_MSVC_ON(4200)
 #define NACKFRAG_SIZE(numbits) (offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits) + 4)
-#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (256u)
+#define NACKFRAG_SIZE_MAX NACKFRAG_SIZE (NN_FRAGMENT_NUMBER_SET_MAX_BITS)
 typedef union Submessage {
 SubmessageHeader_t smhdr;


@@ -223,7 +223,15 @@ struct nn_defrag *nn_defrag_new (const struct ddsrt_log_cfg *logcfg, enum nn_def
void nn_defrag_free (struct nn_defrag *defrag); void nn_defrag_free (struct nn_defrag *defrag);
struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo); struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo);
void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1); void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1);
int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz);
enum nn_defrag_nackmap_result {
DEFRAG_NACKMAP_UNKNOWN_SAMPLE,
DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN,
DEFRAG_NACKMAP_FRAGMENTS_MISSING
};
enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz);
void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min); void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min);
struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode); struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode);
@@ -232,8 +240,8 @@ struct nn_rsample *nn_reorder_rsample_dup_first (struct nn_rmsg *rmsg, struct nn
struct nn_rdata *nn_rsample_fragchain (struct nn_rsample *rsample); struct nn_rdata *nn_rsample_fragchain (struct nn_rsample *rsample);
nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rsample *rsampleiv, int *refcount_adjust, int delivery_queue_full_p); nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rsample *rsampleiv, int *refcount_adjust, int delivery_queue_full_p);
nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rdata *rdata, seqno_t min, seqno_t maxp1, int *refcount_adjust); nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reorder *reorder, struct nn_rdata *rdata, seqno_t min, seqno_t maxp1, int *refcount_adjust);
int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq); int nn_reorder_wantsample (const struct nn_reorder *reorder, seqno_t seq);
unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail);
seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder); seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder);
void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq); void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq);


@@ -0,0 +1,495 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "dds/ddsrt/static_assert.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_misc.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_log.h"
#include "dds/ddsi/q_bitset.h"
#include "dds/ddsi/ddsi_domaingv.h"
#include "dds/ddsi/ddsi_acknack.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/ddsi_security_omg.h"
#define ACK_REASON_IN_FLAGS 0
static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq)
{
/* We want to determine next_deliv_seq, the next sequence number to
be delivered to all in-sync readers, so that we can acknowledge
what we have actually delivered. This is different from
next_seq, which tracks the sequence number up to which all samples
have been received. The difference is the delivery queue.
There is always but a single delivery queue, and hence delivery
thread, associated with a single proxy writer; but the ACKs are
always generated by another thread. Therefore, updates to
next_deliv_seq need only be atomic with respect to these reads.
On all supported platforms we can atomically load and store 32
bits without issue, and so we store just the low word of the
sequence number.
We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <=
next_seq for N << 2**32. With n = next_seq, nd = next_deliv_seq,
H the upper half and L the lower half:
- H(nd) <= H(n) <= H(nd)+1 { n >= nd AND N << 2**32 }
- H(n) = H(nd) => L(n) >= L(nd) { n >= nd }
- H(n) = H(nd)+1 => L(n) < L(nd) { N << 2**32 }
Therefore:
L(n) < L(nd) <=> H(n) = H(nd)+1
a.k.a.:
nd = nd' - if nd' > n then 2**32 else 0
where nd' = 2**32 * H(n) + L(nd)
By not locking next_deliv_seq, we may have nd a bit lower than it
could be, but that only means we are acknowledging slightly less
than we could; but that is perfectly acceptable.
FIXME: next_seq - #dqueue could probably be used instead,
provided #dqueue is decremented after delivery, rather than
before delivery. */
const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword);
seqno_t next_deliv_seq;
next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw;
if (next_deliv_seq > next_seq)
next_deliv_seq -= ((seqno_t) 1) << 32;
assert (0 < next_deliv_seq && next_deliv_seq <= next_seq);
return next_deliv_seq;
}
static void add_AckNack_getsource (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct nn_reorder **reorder, seqno_t *bitmap_base, int *notail)
{
/* if in sync, look at proxy writer status, else look at proxy-writer--reader match status */
if (rwn->in_sync == PRMSS_OUT_OF_SYNC || rwn->filtered)
{
*reorder = rwn->u.not_in_sync.reorder;
*bitmap_base = nn_reorder_next_seq (*reorder);
*notail = 0;
}
else
{
*reorder = pwr->reorder;
if (!pwr->e.gv->config.late_ack_mode)
{
*bitmap_base = nn_reorder_next_seq (*reorder);
*notail = 0;
}
else
{
*bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (*reorder));
*notail = nn_dqueue_is_full (pwr->dqueue);
}
}
}
DDSRT_STATIC_ASSERT ((NN_SEQUENCE_NUMBER_SET_MAX_BITS % 32) == 0 && (NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0);
struct add_AckNack_info {
bool nack_sent_on_nackdelay;
#if ACK_REASON_IN_FLAGS
uint8_t flags;
#endif
struct {
struct nn_sequence_number_set_header set;
uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32];
} acknack;
struct {
seqno_t seq;
struct nn_fragment_number_set_header set;
uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32];
} nackfrag;
};
static bool add_AckNack_makebitmaps (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct add_AckNack_info *info)
{
struct nn_reorder *reorder;
seqno_t bitmap_base;
int notail; /* notail = false: all known missing ones are nack'd */
add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, &notail);
/* Make bitmap; note that we've made sure to have room for the maximum bitmap size. */
const seqno_t last_seq = rwn->filtered ? rwn->last_seq : pwr->last_seq;
const uint32_t numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &info->acknack.set, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_MAX_BITS, notail);
if (numbits == 0)
{
info->nackfrag.seq = 0;
return false;
}
/* Scan through bitmap, cutting it off at the first missing sample that the defragmenter
knows about. Then note the sequence number & add a NACKFRAG for that sample */
info->nackfrag.seq = 0;
const seqno_t base = fromSN (info->acknack.set.bitmap_base);
for (uint32_t i = 0; i < numbits; i++)
{
if (!nn_bitset_isset (numbits, info->acknack.bits, i))
continue;
const seqno_t seq = base + i;
const uint32_t fragnum = (seq == pwr->last_seq) ? pwr->last_fragnum : UINT32_MAX;
switch (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &info->nackfrag.set, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS))
{
case DEFRAG_NACKMAP_UNKNOWN_SAMPLE:
break;
case DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN:
/* Cut the NACK short (or make it an ACK if this is the first sample), no NACKFRAG */
info->nackfrag.seq = 0;
info->acknack.set.numbits = i;
return (i > 0);
case DEFRAG_NACKMAP_FRAGMENTS_MISSING:
/* Cut the NACK short, NACKFRAG */
info->nackfrag.seq = seq;
info->acknack.set.numbits = i;
return true;
}
}
return true;
}
static void add_NackFrag (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info)
{
struct nn_xmsg_marker sm_marker;
NackFrag_t *nf;
assert (info->nackfrag.set.numbits > 0 && info->nackfrag.set.numbits <= NN_FRAGMENT_NUMBER_SET_MAX_BITS);
nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE (info->nackfrag.set.numbits));
nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG);
nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
nf->writerId = nn_hton_entityid (pwr->e.guid.entityid);
nf->writerSN = toSN (info->nackfrag.seq);
#if ACK_REASON_IN_FLAGS
nf->smhdr.flags |= info->flags;
#endif
// We use 0-based fragment numbers, but externally have to provide 1-based fragment numbers
nf->fragmentNumberState.bitmap_base = info->nackfrag.set.bitmap_base + 1;
nf->fragmentNumberState.numbits = info->nackfrag.set.numbits;
memcpy (nf->bits, info->nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (info->nackfrag.set.numbits));
// Count field is at a variable offset ... silly DDSI spec
nn_count_t * const countp =
(nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits));
*countp = pwr->nackfragcount;
nn_xmsg_submsg_setnext (msg, sm_marker);
if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE)
{
ETRACE (pwr, "nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":",
pwr->nackfragcount, fromSN (nf->writerSN),
nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits);
for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0');
}
// Encode the sub-message when needed
encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid);
}
static void add_AckNack (struct nn_xmsg *msg, const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, const struct add_AckNack_info *info)
{
/* If pwr->have_seen_heartbeat == 0, no heartbeat has been received
from this proxy writer yet, so we'll be sending a pre-emptive
AckNack. NACKing data now will most likely cause another NACK
upon reception of the first heartbeat, and so cause the data to
be resent twice. */
AckNack_t *an;
struct nn_xmsg_marker sm_marker;
an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX);
nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK);
an->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
an->writerId = nn_hton_entityid (pwr->e.guid.entityid);
// set FINAL flag late, in case it is decided that the "response_required" flag
// should be set depending on the exact AckNack/NackFrag generated
an->smhdr.flags |= ACKNACK_FLAG_FINAL;
#if ACK_REASON_IN_FLAGS
an->smhdr.flags |= info->flags;
#endif
an->readerSNState = info->acknack.set;
memcpy (an->bits, info->acknack.bits, NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits));
// Count field is at a variable offset ... silly DDSI spec
nn_count_t * const countp =
(nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits));
*countp = rwn->count;
// Reset submessage size, now that we know the real size, and update the offset to the next submessage.
nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits));
nn_xmsg_submsg_setnext (msg, sm_marker);
if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE)
{
ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": F#%"PRIu32":%"PRId64"/%"PRIu32":",
PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count,
fromSN (an->readerSNState.bitmap_base), an->readerSNState.numbits);
for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (an->readerSNState.numbits, an->bits, ui) ? '1' : '0');
}
// Encode the sub-message when needed
encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid);
}
static enum add_AckNack_result get_AckNack_info (const struct proxy_writer *pwr, const struct pwr_rd_match *rwn, struct last_nack_summary *nack_summary, struct add_AckNack_info *info, bool ackdelay_passed, bool nackdelay_passed)
{
/* If pwr->have_seen_heartbeat == 0, no heartbeat has been received
from this proxy writer yet, so we'll be sending a pre-emptive
AckNack. NACKing data now will most likely cause another NACK
upon reception of the first heartbeat, and so cause the data to
be resent twice. */
enum add_AckNack_result result;
#if ACK_REASON_IN_FLAGS
info->flags = 0;
#endif
if (!add_AckNack_makebitmaps (pwr, rwn, info))
{
info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay;
nack_summary->seq_base = fromSN (info->acknack.set.bitmap_base);
nack_summary->seq_end_p1 = 0;
nack_summary->frag_base = 0;
nack_summary->frag_end_p1 = 0;
result = AANR_ACK;
}
else
{
// [seq_base:0 .. seq_end_p1:0) + [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0
const seqno_t seq_base = fromSN (info->acknack.set.bitmap_base);
assert (seq_base >= 1 && (info->acknack.set.numbits > 0 || info->nackfrag.seq > 0));
assert (info->nackfrag.seq == 0 || info->nackfrag.set.numbits > 0);
const seqno_t seq_end_p1 = seq_base + info->acknack.set.numbits;
const uint32_t frag_base = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base : 0;
const uint32_t frag_end_p1 = (info->nackfrag.seq > 0) ? info->nackfrag.set.bitmap_base + info->nackfrag.set.numbits : 0;
/* Let caller know whether it is a nack, and, in steady state, set
final to prevent a response if it isn't. The initial
(pre-emptive) acknack is different: it'd be nice to get a
heartbeat in response.
Who cares about an answer to an acknowledgment!? -- actually,
that'd be a very useful feature in combination with directed
heartbeats, or somesuch, to get reliability guarantees. */
nack_summary->seq_end_p1 = seq_end_p1;
nack_summary->frag_end_p1 = frag_end_p1;
nack_summary->seq_base = seq_base;
nack_summary->frag_base = frag_base;
// [seq_base:0 .. seq_end_p1:0) and [seq_end_p1:frag_base .. seq_end_p1:frag_end_p1) if frag_end_p1 > 0
if (seq_base > rwn->last_nack.seq_end_p1 || (seq_base == rwn->last_nack.seq_end_p1 && frag_base >= rwn->last_nack.frag_end_p1))
{
// A NACK for something not previously NACK'd: update nack_{seq,frag} to reflect
// the changed state
info->nack_sent_on_nackdelay = false;
#if ACK_REASON_IN_FLAGS
info->flags = 0x10;
#endif
result = AANR_NACK;
}
else if (rwn->directed_heartbeat && (!rwn->nack_sent_on_nackdelay || nackdelay_passed))
{
info->nack_sent_on_nackdelay = false;
#if ACK_REASON_IN_FLAGS
info->flags = 0x20;
#endif
result = AANR_NACK;
}
else if (nackdelay_passed)
{
info->nack_sent_on_nackdelay = true;
#if ACK_REASON_IN_FLAGS
info->flags = 0x30;
#endif
result = AANR_NACK;
}
else
{
// Overlap between this NACK and the previous one and NackDelay has not yet passed: clear numbits and
// nackfrag_numbits to turn the NACK into an ACK and pretend to the caller nothing scary is going on.
#if ACK_REASON_IN_FLAGS
info->flags = 0x40;
#endif
info->nack_sent_on_nackdelay = rwn->nack_sent_on_nackdelay;
info->acknack.set.numbits = 0;
info->nackfrag.seq = 0;
result = AANR_SUPPRESSED_NACK;
}
}
if (result == AANR_ACK || result == AANR_SUPPRESSED_NACK)
{
// ACK and SUPPRESSED_NACK both end up being a pure ACK; send those only if we have to
if (!(rwn->heartbeat_since_ack && rwn->ack_requested))
result = AANR_SUPPRESSED_ACK; // writer didn't ask for it
else if (!(nack_summary->seq_base > rwn->last_nack.seq_base || ackdelay_passed))
result = AANR_SUPPRESSED_ACK; // no progress since last, not enough time passed
}
else if (info->acknack.set.numbits == 0 && info->nackfrag.seq > 0 && !rwn->ack_requested)
{
// if we are not NACK'ing full samples and we are NACK'ing fragments, skip the ACKNACK submessage if we
// have no interest in a HEARTBEAT and the writer hasn't asked for an ACKNACK since the last one we sent.
result = AANR_NACKFRAG_ONLY;
}
return result;
}
void sched_acknack_if_needed (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack)
{
// This is the relatively expensive and precise code to determine what the ACKNACK event will do,
// the alternative is to do:
//
// add_AckNack_getsource (pwr, rwn, &reorder, &bitmap_base, &notail);
// const seqno_t last_seq = rwn->filtered ? rwn->last_seq : pwr->last_seq;
// if (bitmap_base <= last_seq)
// (void) resched_xevent_if_earlier (ev, tnow);
// else if (!(rwn->heartbeat_since_ack && rwn->ack_requested))
// ; // writer didn't ask for it
// else if (!(bitmap_base > rwn->last_nack.seq_base || ackdelay_passed))
// ; // no progress since last, not enough time passed
// else
// (void) resched_xevent_if_earlier (ev, tnow);
//
// which is a stripped-down version of the same logic that more aggressively schedules the event,
// relying on the event handler to suppress unnecessary messages. There doesn't seem to be a big
// downside to being precise.
struct ddsi_domaingv * const gv = pwr->e.gv;
const bool ackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v);
const bool nackdelay_passed = (tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v);
struct add_AckNack_info info;
struct last_nack_summary nack_summary;
const enum add_AckNack_result aanr =
get_AckNack_info (pwr, rwn, &nack_summary, &info, ackdelay_passed, nackdelay_passed);
if (aanr == AANR_SUPPRESSED_ACK)
; // nothing to be done now
else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK)
(void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay));
else
(void) resched_xevent_if_earlier (ev, tnow);
}
struct nn_xmsg *make_and_resched_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow, bool avoid_suppressed_nack)
{
struct ddsi_domaingv * const gv = pwr->e.gv;
struct nn_xmsg *msg;
struct add_AckNack_info info;
struct last_nack_summary nack_summary;
const enum add_AckNack_result aanr =
get_AckNack_info (pwr, rwn, &nack_summary, &info,
tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_ack, gv->config.ack_delay).v,
tnow.v >= ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay).v);
if (aanr == AANR_SUPPRESSED_ACK)
return NULL;
else if (avoid_suppressed_nack && aanr == AANR_SUPPRESSED_NACK)
{
(void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay));
return NULL;
}
// Committing to sending a message in response: update the state. Note that there's still a
// possibility of not sending a message, but that is only in case of failures of some sort.
// Resetting the flags and bailing out simply means we will wait until the next heartbeat to
// try again.
rwn->directed_heartbeat = 0;
rwn->heartbeat_since_ack = 0;
rwn->heartbeatfrag_since_ack = 0;
rwn->nack_sent_on_nackdelay = (info.nack_sent_on_nackdelay ? 1 : 0);
struct participant *pp = NULL;
if (q_omg_proxy_participant_is_secure (pwr->c.proxypp))
{
struct reader *rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &rwn->rd_guid);
if (rd)
pp = rd->c.pp;
}
if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL)
{
return NULL;
}
nn_xmsg_setdstPWR (msg, pwr);
if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v)
{
// If HB->ACK latency measurement is enabled, and we have a
// timestamp available, add it and clear the time stamp. There
// is no real guarantee that the two match, but I haven't got a
// solution for that yet ... If adding the time stamp fails,
// too bad, but no reason to get worried.
nn_xmsg_add_timestamp (msg, rwn->hb_timestamp);
rwn->hb_timestamp.v = 0;
}
if (aanr != AANR_NACKFRAG_ONLY)
add_AckNack (msg, pwr, rwn, &info);
if (info.nackfrag.seq > 0)
{
ETRACE (pwr, " + ");
add_NackFrag (msg, pwr, rwn, &info);
}
ETRACE (pwr, "\n");
if (nn_xmsg_size (msg) == 0)
{
// attempt at encoding the message caused it to be dropped
nn_xmsg_free (msg);
return NULL;
}
rwn->count++;
switch (aanr)
{
case AANR_SUPPRESSED_ACK:
// no message: caught by the size = 0 check
assert (0);
break;
case AANR_ACK:
rwn->ack_requested = 0;
rwn->t_last_ack = tnow;
rwn->last_nack.seq_base = nack_summary.seq_base;
break;
case AANR_NACK:
case AANR_NACKFRAG_ONLY:
if (nack_summary.frag_end_p1 != 0)
pwr->nackfragcount++;
if (aanr != AANR_NACKFRAG_ONLY)
{
rwn->ack_requested = 0;
rwn->t_last_ack = tnow;
}
rwn->last_nack = nack_summary;
rwn->t_last_nack = tnow;
/* If NACKing, make sure we don't give up too soon: even though
we're not allowed to send an ACKNACK unless in response to a
HEARTBEAT, I've seen too many cases of not sending an NACK
because the writing side got confused ... Better to recover
eventually. */
(void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay));
break;
case AANR_SUPPRESSED_NACK:
rwn->ack_requested = 0;
rwn->t_last_ack = tnow;
rwn->last_nack.seq_base = nack_summary.seq_base;
(void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_nack, gv->config.nack_delay));
break;
}
GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n", PGUID (rwn->rd_guid), PGUID (pwr->e.guid));
return msg;
}


@@ -3390,7 +3390,7 @@ bool decode_DataFrag (const struct ddsi_domaingv *gv, struct nn_rsample_info *sa
return decode_payload (gv, sampleinfo, payloadp, &payloadsz, submsg_len); return decode_payload (gv, sampleinfo, payloadp, &payloadsz, submsg_len);
} }
void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, struct proxy_writer *pwr, const struct ddsi_guid *rd_guid) void encode_datareader_submsg (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, const struct proxy_writer *pwr, const struct ddsi_guid *rd_guid)
{ {
/* FIXME: avoid this lookup */ /* FIXME: avoid this lookup */
struct reader * const rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid); struct reader * const rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid);
@@ -3964,7 +3964,7 @@ extern inline bool decode_DataFrag(
extern inline void encode_datareader_submsg( extern inline void encode_datareader_submsg(
UNUSED_ARG(struct nn_xmsg *msg), UNUSED_ARG(struct nn_xmsg *msg),
UNUSED_ARG(struct nn_xmsg_marker sm_marker), UNUSED_ARG(struct nn_xmsg_marker sm_marker),
UNUSED_ARG(struct proxy_writer *pwr), UNUSED_ARG(const struct proxy_writer *pwr),
UNUSED_ARG(const struct ddsi_guid *rd_guid)); UNUSED_ARG(const struct ddsi_guid *rd_guid));
extern inline void encode_datawriter_submsg( extern inline void encode_datawriter_submsg(


@@ -275,8 +275,8 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct dd
x += cpf (conn, " last_seq %"PRId64" last_fragnum %"PRIu32"\n", w->last_seq, w->last_fragnum); x += cpf (conn, " last_seq %"PRId64" last_fragnum %"PRIu32"\n", w->last_seq, w->last_fragnum);
for (m = ddsrt_avl_iter_first (&wr_readers_treedef, &w->readers, &rdit); m; m = ddsrt_avl_iter_next (&rdit)) for (m = ddsrt_avl_iter_first (&wr_readers_treedef, &w->readers, &rdit); m; m = ddsrt_avl_iter_next (&rdit))
{ {
x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" %"PRId64")\n", x += cpf (conn, " rd "PGUIDFMT" (nack %"PRId64" frag %"PRIu32" %"PRId64")\n",
PGUID (m->rd_guid), m->seq_last_nack, m->t_last_nack.v); PGUID (m->rd_guid), m->last_nack.seq_end_p1, m->last_nack.frag_end_p1, m->t_last_nack.v);
switch (m->in_sync) switch (m->in_sync)
{ {
case PRMSS_SYNC: case PRMSS_SYNC:


@@ -2528,9 +2528,18 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
m->hb_timestamp.v = 0; m->hb_timestamp.v = 0;
m->t_heartbeat_accepted.v = 0; m->t_heartbeat_accepted.v = 0;
m->t_last_nack.v = 0; m->t_last_nack.v = 0;
m->seq_last_nack = 0; m->t_last_ack.v = 0;
m->last_nack.seq_end_p1 = 0;
m->last_nack.seq_base = 0;
m->last_nack.frag_end_p1 = 0;
m->last_nack.frag_base = 0;
m->last_seq = 0; m->last_seq = 0;
m->filtered = 0; m->filtered = 0;
m->ack_requested = 0;
m->heartbeat_since_ack = 0;
m->heartbeatfrag_since_ack = 0;
m->directed_heartbeat = 0;
m->nack_sent_on_nackdelay = 0;
#ifdef DDSI_INCLUDE_SECURITY #ifdef DDSI_INCLUDE_SECURITY
m->crypto_handle = crypto_handle; m->crypto_handle = crypto_handle;
@@ -3411,7 +3420,7 @@ static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vr
/* seq < max cannot be true for a best-effort reader or a demoted */ /* seq < max cannot be true for a best-effort reader or a demoted */
n->arbitrary_unacked_reader = n->prd_guid; n->arbitrary_unacked_reader = n->prd_guid;
} }
else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || !n->has_replied_to_hb)) else if (n->is_reliable && (n->seq == MAX_SEQ_NUMBER || n->seq == 0 || !n->has_replied_to_hb))
{ {
/* demoted readers and reliable readers that have not yet replied to a heartbeat are candidates */ /* demoted readers and reliable readers that have not yet replied to a heartbeat are candidates */
n->arbitrary_unacked_reader = n->prd_guid; n->arbitrary_unacked_reader = n->prd_guid;
@@ -5479,7 +5488,6 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid,
pwr->last_seq = 0; pwr->last_seq = 0;
pwr->last_fragnum = UINT32_MAX; pwr->last_fragnum = UINT32_MAX;
pwr->nackfragcount = 1; pwr->nackfragcount = 1;
pwr->last_fragnum_reset = 0;
pwr->alive = 1; pwr->alive = 1;
pwr->alive_vclock = 0; pwr->alive_vclock = 0;
pwr->filtered = 0; pwr->filtered = 0;


@@ -1443,7 +1443,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1)
defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree); defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree);
} }
int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz) enum nn_defrag_nackmap_result nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz)
{ {
struct nn_rsample *s; struct nn_rsample *s;
struct nn_defrag_iv *iv; struct nn_defrag_iv *iv;
@@ -1455,7 +1455,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu
if (maxfragnum == UINT32_MAX) if (maxfragnum == UINT32_MAX)
{ {
/* If neither the caller nor the defragmenter knows anything about the sample, say so */ /* If neither the caller nor the defragmenter knows anything about the sample, say so */
return -1; return DEFRAG_NACKMAP_UNKNOWN_SAMPLE;
} }
else else
{ {
@@ -1468,7 +1468,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu
map->numbits = maxfragnum + 1; map->numbits = maxfragnum + 1;
map->bitmap_base = 0; map->bitmap_base = 0;
nn_bitset_one (map->numbits, mapbits); nn_bitset_one (map->numbits, mapbits);
return (int) map->numbits; return DEFRAG_NACKMAP_FRAGMENTS_MISSING;
} }
} }
@@ -1505,7 +1505,9 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu
/* if all data is available, iv == liv and map_end < /* if all data is available, iv == liv and map_end <
map->bitmap_base, but there is nothing to request in that map->bitmap_base, but there is nothing to request in that
case. */ case. */
map->numbits = (map_end < map->bitmap_base) ? 0 : map_end - map->bitmap_base + 1; if (map_end < map->bitmap_base)
return DEFRAG_NACKMAP_ALL_ADVERTISED_FRAGMENTS_KNOWN;
map->numbits = map_end - map->bitmap_base + 1;
iv = ddsrt_avl_find_succ (&rsample_defrag_fragtree_treedef, &s->u.defrag.fragtree, iv); iv = ddsrt_avl_find_succ (&rsample_defrag_fragtree_treedef, &s->u.defrag.fragtree, iv);
} }
@@ -1544,7 +1546,7 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu
unsigned x = (unsigned) (i - map->bitmap_base); unsigned x = (unsigned) (i - map->bitmap_base);
nn_bitset_set (map->numbits, mapbits, x); nn_bitset_set (map->numbits, mapbits, x);
} }
return (int) map->numbits; return DEFRAG_NACKMAP_FRAGMENTS_MISSING;
} }
/* There is only one defrag per proxy writer. However for the Volatile Secure writer a filter /* There is only one defrag per proxy writer. However for the Volatile Secure writer a filter
@@ -2308,7 +2310,7 @@ nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reord
} }
} }
int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq) int nn_reorder_wantsample (const struct nn_reorder *reorder, seqno_t seq)
{ {
struct nn_rsample *s; struct nn_rsample *s;
if (seq < reorder->next_seq) if (seq < reorder->next_seq)
@@ -2320,7 +2322,7 @@ int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq)
return (s == NULL || s->u.reorder.maxp1 <= seq); return (s == NULL || s->u.reorder.maxp1 <= seq);
} }
unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail) unsigned nn_reorder_nackmap (const struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail)
{ {
struct nn_rsample *iv; struct nn_rsample *iv;
seqno_t i; seqno_t i;


@@ -55,6 +55,7 @@
#include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */ #include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */
#include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/ddsi_security_omg.h"
#include "dds/ddsi/ddsi_acknack.h"
#include "dds/ddsi/sysdeps.h" #include "dds/ddsi/sysdeps.h"
#include "dds__whc.h" #include "dds__whc.h"
@@ -576,9 +577,13 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *prev_co
the consequence undefined behaviour). Serial number arithmetic the consequence undefined behaviour). Serial number arithmetic
deals with the wrap-around after 2**31-1. deals with the wrap-around after 2**31-1.
Cyclone pre-emptive ACKNACKs have bitmap_base = 1, NACK
nothing, and have "count" set to 0. They're never sent more often than
once per second, so the 500ms timeout allows them to pass through.
This combined procedure should give the best of all worlds, and This combined procedure should give the best of all worlds, and
is not more expensive in the common case. */ is not more expensive in the common case. */
const int64_t timeout = DDS_SECS (2); const int64_t timeout = DDS_MSECS (500);
if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept) if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept)
return 0; return 0;
@@ -652,6 +657,7 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *
struct defer_hb_state { struct defer_hb_state {
struct nn_xmsg *m; struct nn_xmsg *m;
struct xeventq *evq; struct xeventq *evq;
int hbansreq;
uint64_t wr_iid; uint64_t wr_iid;
uint64_t prd_iid; uint64_t prd_iid;
}; };
@@ -663,8 +669,16 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state *
   if (defer_hb_state->m != NULL)
   {
     if (wr->e.iid == defer_hb_state->wr_iid && prd->e.iid == defer_hb_state->prd_iid)
-      return;
-    qxev_msg (wr->evq, defer_hb_state->m);
+    {
+      if (hbansreq <= defer_hb_state->hbansreq)
+        return;
+      else
+        nn_xmsg_free (defer_hb_state->m);
+    }
+    else
+    {
+      qxev_msg (wr->evq, defer_hb_state->m);
+    }
   }
 
   ASSERT_MUTEX_HELD (&wr->e.lock);
@@ -674,6 +688,7 @@ static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state *
   nn_xmsg_setdstPRD (defer_hb_state->m, prd);
   add_Heartbeat (defer_hb_state->m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
   defer_hb_state->evq = wr->evq;
+  defer_hb_state->hbansreq = hbansreq;
   defer_hb_state->wr_iid = wr->e.iid;
   defer_hb_state->prd_iid = prd->e.iid;
 }
@@ -782,7 +797,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
      relevant to setting "has_replied_to_hb" and "assumed_in_sync". */
   is_pure_ack = !acknack_is_nack (msg);
   is_pure_nonhist_ack = is_pure_ack && seqbase - 1 >= rn->seq;
-  is_preemptive_ack = seqbase <= 1 && is_pure_ack;
+  is_preemptive_ack = seqbase < 1 || (seqbase == 1 && *countp == 0);
 
   wr->num_acks_received++;
   if (!is_pure_ack)
@@ -864,7 +879,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
   msgs_sent = 0;
   msgs_lost = 0;
   max_seq_in_reply = 0;
-  if (!rn->has_replied_to_hb && seqbase > 1 && is_pure_nonhist_ack)
+  if (!rn->has_replied_to_hb && is_pure_nonhist_ack)
   {
     RSTTRACE (" setting-has-replied-to-hb");
     rn->has_replied_to_hb = 1;
@@ -1125,6 +1140,7 @@ struct handle_Heartbeat_helper_arg {
   ddsrt_wctime_t timestamp;
   ddsrt_etime_t tnow;
   ddsrt_mtime_t tnow_mt;
+  bool directed_heartbeat;
 };
 
 static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct handle_Heartbeat_helper_arg * const arg)
@@ -1132,62 +1148,38 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
   struct receiver_state * const rst = arg->rst;
   Heartbeat_t const * const msg = arg->msg;
   struct proxy_writer * const pwr = arg->pwr;
-  seqno_t refseq, last_seq;
 
   ASSERT_MUTEX_HELD (&pwr->e.lock);
 
-  /* Not supposed to respond to repeats and old heartbeats. */
+  if (wn->acknack_xevent == NULL)
+  {
+    // Ignore best-effort readers
+    return;
+  }
+
   if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0))
   {
     RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid));
     return;
   }
 
-  /* Reference sequence number for determining whether or not to
-     Ack/Nack unfortunately depends on whether the reader is in
-     sync. */
-  if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered)
-    refseq = nn_reorder_next_seq (pwr->reorder) - 1;
-  else
-    refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1;
-  RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? "(tlcatchup)" : "");
-
-  /* Reschedule AckNack transmit if deemed appropriate; unreliable
-     readers have acknack_xevent == NULL and can't do this.
-
-     There is no real need to send a nack from each reader that is in
-     sync -- indeed, we could simply ignore the destination address in
-     the messages we receive and only ever nack each sequence number
-     once, regardless of which readers care about it. */
-  if (wn->acknack_xevent)
+  if (rst->gv->logconfig.c.mask & DDS_LC_TRACE)
   {
-    ddsrt_mtime_t tsched = DDSRT_MTIME_NEVER;
-
-    if (wn->filtered)
-      last_seq = wn->last_seq;
+    seqno_t refseq;
+    if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered)
+      refseq = nn_reorder_next_seq (pwr->reorder);
     else
-      last_seq = pwr->last_seq;
-    if (last_seq > refseq)
-    {
-      RSTTRACE ("/NAK");
-      if (arg->tnow_mt.v >= wn->t_last_nack.v + rst->gv->config.nack_delay || refseq >= wn->seq_last_nack)
-        tsched = arg->tnow_mt;
-      else
-      {
-        tsched.v = arg->tnow_mt.v + rst->gv->config.nack_delay;
-        RSTTRACE ("d");
-      }
-    }
-    else if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL))
-    {
-      tsched = arg->tnow_mt;
-    }
-    if (resched_xevent_if_earlier (wn->acknack_xevent, tsched))
-    {
-      if (rst->gv->config.meas_hb_to_ack_latency && arg->timestamp.v)
-        wn->hb_timestamp = arg->timestamp;
-    }
+      refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder);
+    RSTTRACE (" "PGUIDFMT"@%"PRId64"%s", PGUID (wn->rd_guid), refseq - 1, (wn->in_sync == PRMSS_SYNC) ? "(sync)" : (wn->in_sync == PRMSS_TLCATCHUP) ? "(tlcatchup)" : "");
   }
+
+  wn->heartbeat_since_ack = 1;
+  if (!(msg->smhdr.flags & HEARTBEAT_FLAG_FINAL))
+    wn->ack_requested = 1;
+  if (arg->directed_heartbeat)
+    wn->directed_heartbeat = 1;
+  sched_acknack_if_needed (wn->acknack_xevent, pwr, wn, arg->tnow_mt, true);
 }
 
 static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid)
@@ -1276,18 +1268,11 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str
   if (lastseq > pwr->last_seq)
   {
     pwr->last_seq = lastseq;
-    pwr->last_fragnum = ~0u;
-    pwr->last_fragnum_reset = 0;
+    pwr->last_fragnum = UINT32_MAX;
   }
-  else if (pwr->last_fragnum != ~0u && lastseq == pwr->last_seq)
+  else if (pwr->last_fragnum != UINT32_MAX && lastseq == pwr->last_seq)
   {
-    if (!pwr->last_fragnum_reset)
-      pwr->last_fragnum_reset = 1;
-    else
-    {
-      pwr->last_fragnum = ~0u;
-      pwr->last_fragnum_reset = 0;
-    }
+    pwr->last_fragnum = UINT32_MAX;
   }
 
   nn_defrag_notegap (pwr->defrag, 1, firstseq);
@@ -1375,6 +1360,7 @@ static int handle_Heartbeat (struct receiver_state *rst, ddsrt_etime_t tnow, str
   arg.timestamp = timestamp;
   arg.tnow = tnow;
   arg.tnow_mt = ddsrt_time_monotonic ();
+  arg.directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor));
   handle_forall_destinations (&dst, pwr, (ddsrt_avl_walk_t) handle_Heartbeat_helper, &arg);
   RSTTRACE (")");
@@ -1394,6 +1380,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et
   src.entityid = msg->writerId;
   dst.prefix = rst->dst_guid_prefix;
   dst.entityid = msg->readerId;
+  const bool directed_heartbeat = (dst.entityid.u != NN_ENTITYID_UNKNOWN && vendor_is_eclipse (rst->vendor));
 
   RSTTRACE ("HEARTBEATFRAG(#%"PRId32":%"PRId64"/[1,%u]", msg->count, seq, fragnum+1);
   if (!rst->forme)
@@ -1424,12 +1411,16 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et
   {
     pwr->last_seq = seq;
     pwr->last_fragnum = fragnum;
-    pwr->last_fragnum_reset = 0;
   }
   else if (seq == pwr->last_seq && fragnum > pwr->last_fragnum)
   {
     pwr->last_fragnum = fragnum;
-    pwr->last_fragnum_reset = 0;
+  }
+
+  if (!pwr->have_seen_heartbeat)
+  {
+    ddsrt_mutex_unlock(&pwr->e.lock);
+    return 1;
   }
 
   /* Defragmenting happens at the proxy writer, readers have nothing
@@ -1445,33 +1436,59 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et
     if (nn_reorder_wantsample (pwr->reorder, seq))
     {
-      /* Pick an arbitrary reliable reader's guid for the response --
-         assuming a reliable writer -> unreliable reader is rare, and
-         so scanning the readers is acceptable if the first guess
-         fails */
-      m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
-      if (m->acknack_xevent == NULL)
+      if (directed_heartbeat)
       {
-        m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers);
-        while (m && m->acknack_xevent == NULL)
-          m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m);
+        /* Cyclone currently only ever sends a HEARTBEAT(FRAG) with the
+           destination entity id set AFTER retransmitting any samples
+           that reader requested.  So it makes sense to only interpret
+           those for that reader, and to suppress the NackDelay in a
+           response to it.  But it better be a reliable reader! */
+        m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst);
+        if (m && m->acknack_xevent == NULL)
+          m = NULL;
+      }
+      else
+      {
+        /* Pick an arbitrary reliable reader's guid for the response --
+           assuming a reliable writer -> unreliable reader is rare, and
+           so scanning the readers is acceptable if the first guess
+           fails */
+        m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
+        if (m->acknack_xevent == NULL)
+        {
+          m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers);
+          while (m && m->acknack_xevent == NULL)
+            m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m);
+        }
       }
     }
     else if (seq < nn_reorder_next_seq (pwr->reorder))
     {
-      /* Check out-of-sync readers -- should add a bit to cheaply test
-         whether there are any (usually there aren't) */
-      m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers);
-      while (m)
+      if (directed_heartbeat)
       {
-        if ((m->in_sync == PRMSS_OUT_OF_SYNC) && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq))
+        m = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &dst);
+        if (m && !(m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq)))
         {
-          /* If reader is out-of-sync, and reader is realiable, and
-             reader still wants this particular sample, then use this
-             reader to decide which fragments to nack */
-          break;
+          /* Ignore if reader is happy or not best-effort */
+          m = NULL;
+        }
+      }
+      else
+      {
+        /* Check out-of-sync readers -- should add a bit to cheaply test
+           whether there are any (usually there aren't) */
+        m = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers);
+        while (m)
+        {
+          if (m->in_sync == PRMSS_OUT_OF_SYNC && m->acknack_xevent != NULL && nn_reorder_wantsample (m->u.not_in_sync.reorder, seq))
+          {
+            /* If reader is out-of-sync, and reader is realiable, and
+               reader still wants this particular sample, then use this
+               reader to decide which fragments to nack */
+            break;
+          }
+          m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m);
         }
-        m = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, m);
       }
     }
@@ -1479,19 +1496,20 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et
       RSTTRACE (" no interested reliable readers");
     else
     {
-      /* Check if we are missing something */
+      if (directed_heartbeat)
+        m->directed_heartbeat = 1;
+      m->heartbeatfrag_since_ack = 1;
+
       DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0);
       struct {
         struct nn_fragment_number_set_header set;
         uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32];
       } nackfrag;
-      if (nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) > 0)
+      const seqno_t last_seq = m->filtered ? m->last_seq : pwr->last_seq;
+      if (seq == last_seq && nn_defrag_nackmap (pwr->defrag, seq, fragnum, &nackfrag.set, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_MAX_BITS) == DEFRAG_NACKMAP_FRAGMENTS_MISSING)
       {
-        /* Yes we are (note that this potentially also happens for
-           samples we no longer care about) */
-        int64_t delay = rst->gv->config.nack_delay;
-        RSTTRACE ("/nackfrag");
-        (void) resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic(), delay));
+        // don't rush it ...
+        resched_xevent_if_earlier (m->acknack_xevent, ddsrt_mtime_add_duration (ddsrt_time_monotonic (), pwr->e.gv->config.nack_delay));
       }
     }
   }
@@ -1579,6 +1597,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
     const uint32_t base = msg->fragmentNumberState.bitmap_base - 1;
     assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX);
     uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size;
+    bool sent = false;
     RSTTRACE (" scheduling requested frags ...\n");
     for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++)
     {
@@ -1590,9 +1609,17 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
       else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0))
         nfrags_lim = 0;
       else
+      {
+        sent = true;
         nfrags_lim--;
+      }
       }
     }
+    if (sent && sample.unacked)
+    {
+      if (!wr->retransmitting)
+        writer_set_retransmitting (wr);
+    }
     whc_return_sample (wr->whc, &sample, false);
   }
   else
@@ -1867,8 +1894,7 @@ static int handle_Gap (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn
     if (listbase + last_included_rel > pwr->last_seq)
     {
       pwr->last_seq = listbase + last_included_rel;
-      pwr->last_fragnum = ~0u;
-      pwr->last_fragnum_reset = 0;
+      pwr->last_fragnum = UINT32_MAX;
     }
 
     if (wn && wn->filtered)
@@ -2220,7 +2246,7 @@ static void clean_defrag (struct proxy_writer *pwr)
 }
 
 static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo,
-    uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
+    uint32_t max_fragnum_in_msg, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup, bool renew_manbypp_lease)
 {
   struct proxy_writer *pwr;
   struct nn_rsample *rsample;
@@ -2293,13 +2319,11 @@ static void handle_regular (struct receiver_state *rst, ddsrt_etime_t tnow, stru
   if (sampleinfo->seq > pwr->last_seq)
   {
     pwr->last_seq = sampleinfo->seq;
-    pwr->last_fragnum = fragnum;
-    pwr->last_fragnum_reset = 0;
+    pwr->last_fragnum = max_fragnum_in_msg;
   }
-  else if (sampleinfo->seq == pwr->last_seq && fragnum > pwr->last_fragnum)
+  else if (sampleinfo->seq == pwr->last_seq && max_fragnum_in_msg > pwr->last_fragnum)
   {
-    pwr->last_fragnum = fragnum;
-    pwr->last_fragnum_reset = 0;
+    pwr->last_fragnum = max_fragnum_in_msg;
   }
 
   clean_defrag (pwr);
@@ -2571,12 +2595,12 @@ static int handle_Data (struct receiver_state *rst, ddsrt_etime_t tnow, struct n
           renew_manbypp_lease = false;
         /* fall through */
         default:
-          handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, renew_manbypp_lease);
+          handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, renew_manbypp_lease);
       }
     }
     else
     {
-      handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup, true);
+      handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, UINT32_MAX, rdata, deferred_wakeup, true);
     }
   }
   RSTTRACE (")");

View file

@@ -167,8 +167,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
   }
   else
   {
-    const int32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max;
-    assert (n_unacked >= 0);
+    const uint32_t n_unacked = wr->num_reliable_readers - root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max;
     if (n_unacked == 0)
       prd_guid = NULL;
     else
@@ -188,7 +187,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
       ETRACE (wr, "unicasting to prd "PGUIDFMT" ", PGUID (*prd_guid));
     ETRACE (wr, "(rel-prd %"PRId32" seq-eq-max %"PRId32" seq %"PRId64" maxseq %"PRId64")\n",
             wr->num_reliable_readers,
-            ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
+            ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
             wr->seq,
             ddsrt_avl_is_empty (&wr->readers) ? (seqno_t) -1 : root_rdmatch (wr)->max_seq);
@@ -215,7 +214,9 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
 #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
     nn_xmsg_setencoderid (msg, wr->partition_id);
 #endif
-    add_Heartbeat (msg, wr, whcst, hbansreq, 0, prd_guid->entityid, issync);
+    // send to all readers in the participant: whether or not the entityid is set affects
+    // the retransmit requests
+    add_Heartbeat (msg, wr, whcst, hbansreq, 0, to_entityid (NN_ENTITYID_UNKNOWN), issync);
   }
@@ -335,7 +336,7 @@ struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *
     ETRACE (wr, "writer_hbcontrol_p2p: wr "PGUIDFMT" unicasting to prd "PGUIDFMT" ", PGUID (wr->e.guid), PGUID (prd->e.guid));
     ETRACE (wr, "(rel-prd %d seq-eq-max %d seq %"PRId64" maxseq %"PRId64")\n",
             wr->num_reliable_readers,
-            ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
+            ddsrt_avl_is_empty (&wr->readers) ? -1 : (int32_t) root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
             wr->seq,
             ddsrt_avl_is_empty (&wr->readers) ? (int64_t) -1 : root_rdmatch (wr)->max_seq);

View file

@@ -42,6 +42,7 @@
 #include "dds/ddsi/ddsi_security_omg.h"
 #include "dds/ddsi/ddsi_tkmap.h"
 #include "dds/ddsi/ddsi_pmd.h"
+#include "dds/ddsi/ddsi_acknack.h"
 #include "dds__whc.h"
 #include "dds/ddsi/sysdeps.h"
@@ -776,220 +777,67 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, ddsrt
   }
 }
 
-static seqno_t next_deliv_seq (const struct proxy_writer *pwr, const seqno_t next_seq)
-{
-  /* We want to determine next_deliv_seq, the next sequence number to
-     be delivered to all in-sync readers, so that we can acknowledge
-     what we have actually delivered.  This is different from next_seq
-     tracks, which tracks the sequence number up to which all samples
-     have been received.  The difference is the delivery queue.
-
-     There is always but a single delivery queue, and hence delivery
-     thread, associated with a single proxy writer; but the ACKs are
-     always generated by another thread.  Therefore, updates to
-     next_deliv_seq need only be atomic with respect to these reads.
-     On all supported platforms we can atomically load and store 32
-     bits without issue, and so we store just the low word of the
-     sequence number.
-
-     We know 1 <= next_deliv_seq AND next_seq - N <= next_deliv_seq <=
-     next_seq for N << 2**32.  With n = next_seq, nd = next_deliv_seq,
-     H the upper half and L the lower half:
-
-     - H(nd) <= H(n) <= H(nd)+1         { n >= nd AND N << 2*32 }
-     - H(n) = H(nd)   => L(n) >= L(nd)  { n >= nd }
-     - H(n) = H(nd)+1 => L(n) < L(nd)   { N << 2*32 }
-
-     Therefore:
-
-     L(n) < L(nd) <=> H(n) = H(nd+1)
-
-     a.k.a.:
-
-     nd = nd' - if nd' > nd then 2**32 else 0
-       where nd' = 2**32 * H(n) + L(nd)
-
-     By not locking next_deliv_seq, we may have nd a bit lower than it
-     could be, but that only means we are acknowledging slightly less
-     than we could; but that is perfectly acceptible.
-
-     FIXME: next_seq - #dqueue could probably be used instead,
-     provided #dqueue is decremented after delivery, rather than
-     before delivery. */
-  const uint32_t lw = ddsrt_atomic_ld32 (&pwr->next_deliv_seq_lowword);
-  seqno_t next_deliv_seq;
-  next_deliv_seq = (next_seq & ~(seqno_t) UINT32_MAX) | lw;
-  if (next_deliv_seq > next_seq)
-    next_deliv_seq -= ((seqno_t) 1) << 32;
-  assert (0 < next_deliv_seq && next_deliv_seq <= next_seq);
-  return next_deliv_seq;
-}
-
-static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct pwr_rd_match *rwn, seqno_t *nack_seq)
-{
-  /* If pwr->have_seen_heartbeat == 0, no heartbeat has been received
-     by this proxy writer yet, so we'll be sending a pre-emptive
-     AckNack.  NACKing data now will most likely cause another NACK
-     upon reception of the first heartbeat, and so cause the data to
-     be resent twice. */
-  const unsigned max_numbits = 256; /* as spec'd */
-  int notail = 0; /* all known missing ones are nack'd */
-  struct nn_reorder *reorder;
-  AckNack_t *an;
-  struct nn_xmsg_marker sm_marker;
-  uint32_t i, numbits;
-  seqno_t base, last_seq;
-
-  DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0);
-  struct {
-    struct nn_fragment_number_set_header set;
-    uint32_t bits[NN_FRAGMENT_NUMBER_SET_MAX_BITS / 32];
-  } nackfrag;
-  int nackfrag_numbits;
-  seqno_t nackfrag_seq = 0;
-  seqno_t bitmap_base;
-
-  ASSERT_MUTEX_HELD (pwr->e.lock);
-
-  /* if in sync, look at proxy writer status, else look at
-     proxy-writer--reader match status */
-  if (rwn->in_sync != PRMSS_OUT_OF_SYNC && !rwn->filtered)
-  {
-    reorder = pwr->reorder;
-    if (!pwr->e.gv->config.late_ack_mode)
-      bitmap_base = nn_reorder_next_seq (reorder);
-    else
-    {
-      bitmap_base = next_deliv_seq (pwr, nn_reorder_next_seq (reorder));
-      if (nn_dqueue_is_full (pwr->dqueue))
-        notail = 1;
-    }
-  }
-  else
-  {
-    reorder = rwn->u.not_in_sync.reorder;
-    bitmap_base = nn_reorder_next_seq (reorder);
-  }
-
-  if (rwn->filtered)
-    last_seq = rwn->last_seq;
-  else
-    last_seq = pwr->last_seq;
-
-  an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX);
-  nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK);
-  an->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
-  an->writerId = nn_hton_entityid (pwr->e.guid.entityid);
-
-  /* Make bitmap; note that we've made sure to have room for the
-     maximum bitmap size. */
-  numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &an->readerSNState, an->bits, max_numbits, notail);
-  base = fromSN (an->readerSNState.bitmap_base);
-
-  /* Scan through bitmap, cutting it off at the first missing sample
-     that the defragmenter knows about.  Then note the sequence number
-     & add a NACKFRAG for that sample */
-  nackfrag_numbits = -1;
-  for (i = 0; i < numbits && nackfrag_numbits < 0; i++)
-  {
-    uint32_t fragnum;
-    nackfrag_seq = base + i;
-    if (!nn_bitset_isset (numbits, an->bits, i))
-      continue;
-    if (nackfrag_seq == last_seq)
-      fragnum = pwr->last_fragnum;
-    else
-      fragnum = UINT32_MAX;
-    nackfrag_numbits = nn_defrag_nackmap (pwr->defrag, nackfrag_seq, fragnum, &nackfrag.set, nackfrag.bits, max_numbits);
-  }
-  if (nackfrag_numbits >= 0) {
-    /* Cut the NACK short, NACKFRAG will be added after the NACK's is
-       properly formatted */
-    assert (i > 0);
-    an->readerSNState.numbits = numbits = i - 1;
-  }
-
-  /* Let caller know whether it is a nack, and, in steady state, set
-     final to prevent a response if it isn't.  The initial
-     (pre-emptive) acknack is different: it'd be nice to get a
-     heartbeat in response.
-
-     Who cares about an answer to an acknowledgment!? -- actually,
-     that'd a very useful feature in combination with directed
-     heartbeats, or somesuch, to get reliability guarantees. */
-  *nack_seq = (numbits > 0) ? base + numbits : 0;
-  if (!pwr->have_seen_heartbeat) {
-    /* We must have seen a heartbeat for us to consider setting FINAL */
-  } else if (*nack_seq && base + numbits <= last_seq) {
-    /* If it's a NACK and it doesn't cover samples all the way up to
-       the highest known sequence number, there's some reason to expect
-       we may to do another round.  For which we need a Heartbeat.
-
-       Note: last_seq exists, base is first in bitmap, numbits is
-       length of bitmap, hence less-than-or-equal. */
-  } else {
-    /* An ACK or we think we'll get everything now. */
-    an->smhdr.flags |= ACKNACK_FLAG_FINAL;
-  }
-  {
-    /* Count field is at a variable offset ... silly DDSI spec. */
-    nn_count_t *countp =
-      (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (an->readerSNState.numbits));
-    *countp = ++rwn->count;
-
-    /* Reset submessage size, now that we know the real size, and update
-       the offset to the next submessage. */
-    nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits));
-    nn_xmsg_submsg_setnext (msg, sm_marker);
-
-    if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE)
-    {
-      ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":",
-              PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count,
-              base, an->readerSNState.numbits);
-      for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++)
-        ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0');
-    }
-
-    /* Encode the sub-message when needed. */
-    encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid);
-  }
-  if (nackfrag_numbits > 0)
-  {
-    NackFrag_t *nf;
-
-    /* We use 0-based fragment numbers, but externally have to provide
-       1-based fragment numbers */
-    assert ((unsigned) nackfrag_numbits == nackfrag.set.numbits);
-
-    nf = nn_xmsg_append (msg, &sm_marker, NACKFRAG_SIZE ((unsigned) nackfrag_numbits));
-    nn_xmsg_submsg_init (msg, sm_marker, SMID_NACK_FRAG);
-    nf->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
-    nf->writerId = nn_hton_entityid (pwr->e.guid.entityid);
-    nf->writerSN = toSN (nackfrag_seq);
-    nf->fragmentNumberState.bitmap_base = nackfrag.set.bitmap_base + 1;
-    nf->fragmentNumberState.numbits = nackfrag.set.numbits;
-    memcpy (nf->bits, nackfrag.bits, NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nackfrag_numbits));
-
-    {
-      nn_count_t *countp =
-        (nn_count_t *) ((char *) nf + offsetof (NackFrag_t, bits) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (nf->fragmentNumberState.numbits));
-      *countp = ++pwr->nackfragcount;
-      nn_xmsg_submsg_setnext (msg, sm_marker);
-
-      ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits);
-      for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++)
-        ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0');
-    }
-
-    /* Encode the sub-message when needed. */
-    encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid);
-  }
-
-  ETRACE (pwr, "\n");
-}
+static dds_duration_t preemptive_acknack_interval (const struct pwr_rd_match *rwn)
+{
+  if (rwn->t_last_ack.v < rwn->tcreate.v)
+    return 0;
+  else
+  {
+    const dds_duration_t age = rwn->t_last_ack.v - rwn->tcreate.v;
+    if (age <= DDS_SECS (10))
+      return DDS_SECS (1);
+    else if (age <= DDS_SECS (60))
+      return DDS_SECS (2);
+    else if (age <= DDS_SECS (120))
+      return DDS_SECS (5);
+    else
+      return DDS_SECS (10);
+  }
+}
+
+static struct nn_xmsg *make_preemptive_acknack (struct xevent *ev, struct proxy_writer *pwr, struct pwr_rd_match *rwn, ddsrt_mtime_t tnow)
+{
+  const dds_duration_t intv = preemptive_acknack_interval (rwn);
+  if (tnow.v < ddsrt_mtime_add_duration (rwn->t_last_ack, intv).v)
+  {
+    (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv));
+    return NULL;
+  }
+
+  struct ddsi_domaingv * const gv = pwr->e.gv;
+  struct participant *pp = NULL;
+  if (q_omg_proxy_participant_is_secure (pwr->c.proxypp))
+  {
+    struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, &rwn->rd_guid);
+    if (rd)
+      pp = rd->c.pp;
+  }
+
+  struct nn_xmsg *msg;
+  if ((msg = nn_xmsg_new (gv->xmsgpool, &rwn->rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL)
+  {
+    // if out of memory, try again later
+    (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_SECS (1)));
+    return NULL;
+  }
+
+  nn_xmsg_setdstPWR (msg, pwr);
+  struct nn_xmsg_marker sm_marker;
+  AckNack_t *an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE (0));
+  nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK);
+  an->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
+  an->writerId = nn_hton_entityid (pwr->e.guid.entityid);
+  an->readerSNState.bitmap_base = toSN (1);
+  an->readerSNState.numbits = 0;
+  nn_count_t * const countp =
+    (nn_count_t *) ((char *) an + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (0));
+  *countp = 0;
+  nn_xmsg_submsg_setnext (msg, sm_marker);
+  encode_datareader_submsg (msg, sm_marker, pwr, &rwn->rd_guid);
+
+  rwn->t_last_ack = tnow;
+  (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (rwn->t_last_ack, intv));
+  return msg;
+}
 
 static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_mtime_t tnow)
@@ -1007,7 +855,6 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m
   struct proxy_writer *pwr;
   struct nn_xmsg *msg;
   struct pwr_rd_match *rwn;
-  nn_locator_t loc;
 
   if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, &ev->u.acknack.pwr_guid)) == NULL)
   {
@@ -1021,88 +868,26 @@ static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, ddsrt_m
     return;
   }
 
-  if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc))
-  {
-    struct participant *pp = NULL;
-    seqno_t nack_seq;
-
-    if (q_omg_proxy_participant_is_secure(pwr->c.proxypp))
-    {
-      struct reader *rd = entidx_lookup_reader_guid(pwr->e.gv->entity_index, &ev->u.acknack.rd_guid);
-      if (rd)
-        pp = rd->c.pp;
-    }
-
-    if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL)
-      goto outofmem;
-    nn_xmsg_setdst1 (gv, msg, &ev->u.acknack.pwr_guid.prefix, &loc);
-    if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v)
-    {
-      /* If HB->ACK latency measurement is enabled, and we have a
-         timestamp available, add it and clear the time stamp.  There
-         is no real guarantee that the two match, but I haven't got a
-         solution for that yet ... If adding the time stamp fails,
-         too bad, but no reason to get worried. */
-      nn_xmsg_add_timestamp (msg, rwn->hb_timestamp);
-      rwn->hb_timestamp.v = 0;
-    }
-
-    add_AckNack (msg, pwr, rwn, &nack_seq);
-    if (nn_xmsg_size(msg) == 0)
-    {
-      /* No AckNack added. */
-      nn_xmsg_free(msg);
-      msg = NULL;
-    }
-    else if (nack_seq)
-    {
-      rwn->t_last_nack = tnow;
-      rwn->seq_last_nack = nack_seq;
-      /* If NACKing, make sure we don't give up too soon: even though
-         we're not allowed to send an ACKNACK unless in response to a
-         HEARTBEAT, I've seen too many cases of not sending an NACK
-         because the writing side got confused ... Better to recover
-         eventually. */
-      (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, gv->config.auto_resched_nack_delay));
-    }
-    GVTRACE ("send acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT")\n",
-             PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid));
-  }
-  else
-  {
-    GVTRACE ("skip acknack(rd "PGUIDFMT" -> pwr "PGUIDFMT"): no address\n",
-             PGUID (ev->u.acknack.rd_guid), PGUID (ev->u.acknack.pwr_guid));
+  if (!pwr->have_seen_heartbeat)
+    msg = make_preemptive_acknack (ev, pwr, rwn, tnow);
+  else if (!(rwn->heartbeat_since_ack || rwn->heartbeatfrag_since_ack))
     msg = NULL;
-  }
-
-  if (!pwr->have_seen_heartbeat && tnow.v - rwn->tcreate.v <= DDS_SECS (300))
-  {
-    /* Force pre-emptive AckNacks out until we receive a heartbeat,
-       but let the frequency drop over time and stop after a couple
-       of minutes. */
-    int intv, age = (int) ((tnow.v - rwn->tcreate.v) / DDS_NSECS_IN_SEC + 1);
-    if (age <= 10)
-      intv = 1;
-    else if (age <= 60)
-      intv = 2;
-    else if (age <= 120)
-      intv = 5;
-    else
-      intv = 10;
-    (void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, intv * DDS_NSECS_IN_SEC));
-  }
+  else
+    msg = make_and_resched_acknack (ev, pwr, rwn, tnow, false);
   ddsrt_mutex_unlock (&pwr->e.lock);
 
   /* nn_xpack_addmsg may sleep (for bandwidth-limited channels), so
      must be outside the lock */
   if (msg)
-    nn_xpack_addmsg (xp, msg, 0);
+  {
return; // a possible result of trying to encode a submessage is that it is removed,
// in which case we may end up with an empty one.
outofmem: // FIXME: change encode_datareader_submsg so that it returns this and make it warn_unused_result
/* What to do if out of memory? Crash or burn? */ if (nn_xmsg_size (msg) == 0)
ddsrt_mutex_unlock (&pwr->e.lock); nn_xmsg_free (msg);
(void) resched_xevent_if_earlier (ev, ddsrt_mtime_add_duration (tnow, DDS_MSECS (100))); else
nn_xpack_addmsg (xp, msg, 0);
}
} }
static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t *guid, struct proxy_reader *prd)