Adapt message and burst sizes to receive buffers

This changes a few intertwined things at the same time:

* It allows sending only part of a large message initially, with a
  configurable maximum derived from the discovered receive buffer sizes;

* It uses a different message size limit for datagrams that include
  retransmits than for those that don't.  The argument here is that,
  having seen flaky networks where large datagrams cause trouble, it
  makes sense to default to sending retransmits as datagrams that fit in
  individual packets.

* The best performance is generally obtained using the maximum datagram
  size, but the benefits do fall off quite quickly once they are
  largish.  For flaky networks, it doesn't make sense to go for 64kB
  datagrams.  This tries to find a reasonable compromise.

* It now packs multiple fragments into a single DATAFRAG message to
  eliminate the cost of using small fragment sizes.

The changes in buffer sizes cause the ddsperf sanity check to fail:

* The larger amounts of unacknowledged data cause the used memory to be
  higher, failing the RSS check.  Raising the limit seems
  reasonable (the alternative would be to configure it back to the old
  values, but it is all empirically determined anyway).

* The same also lets the publisher thread run more often, so the
  ping/pong part gets less of a chance to run.  Using fixed-frequency
  bursts helps with this.

This therefore also adjusts the test configuration and the thresholds a
bit.

Signed-off-by: Erik Boasson <eb@ilities.com>
Erik Boasson 2020-06-22 17:15:48 +02:00 committed by eboasson
parent 82e6a7972c
commit 66daba9f2f
13 changed files with 321 additions and 106 deletions


@ -250,7 +250,7 @@ The default value is: "".
### //CycloneDDS/Domain/General
Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv)
Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MaxRexmitMessageSize](#cycloneddsdomaingeneralmaxrexmitmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv)
The General element specifies overall Cyclone DDS service settings.
@ -317,7 +317,7 @@ This element specifies the size of DDSI sample fragments generated by Cyclone DD
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
The default value is: "1280 B".
The default value is: "1344 B".
#### //CycloneDDS/Domain/General/MaxMessageSize
@ -325,11 +325,23 @@ Number-with-unit
This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
The default value is: "4096 B".
The default value is: "14720 B".
#### //CycloneDDS/Domain/General/MaxRexmitMessageSize
Number-with-unit
This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
The default value is: "1456 B".
#### //CycloneDDS/Domain/General/MulticastRecvNetworkInterfaceAddresses
@ -394,7 +406,7 @@ The default value is: "default".
### //CycloneDDS/Domain/Internal
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), 
[RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), 
[PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
The Internal elements deal with a variety of settings that evolving and that are not necessarily fully supported. For the vast majority of the Internal settings, the functionality per-se is supported, but the right to change the way the options control the functionality is reserved. This includes renaming or moving options.
@ -440,6 +452,32 @@ The default is writers, as this is thought to be compliant and reasonably effici
The default value is: "writers".
#### //CycloneDDS/Domain/Internal/BurstSize
Children: [MaxInitTransmit](#cycloneddsdomaininternalburstsizemaxinittransmit), [MaxRexmit](#cycloneddsdomaininternalburstsizemaxrexmit)
Setting for controlling the size of transmit bursts.
##### //CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit
Number-with-unit
This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
The default value is: "4294967295".
##### //CycloneDDS/Domain/Internal/BurstSize/MaxRexmit
Number-with-unit
This element specifies the amount of data to be retransmitted in response to one NACK.
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
The default value is: "1 MiB".
#### //CycloneDDS/Domain/Internal/ControlTopic
The ControlTopic element allows configured whether Cyclone DDS provides a special control interface via a predefined topic or not.
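
The size-related elements documented in this file can be combined in one configuration fragment. The following is a minimal sketch, not taken from the commit: `format_size_config` is a hypothetical helper introduced only for illustration, and the element nesting simply follows the paths shown in the headings above (`General/MaxMessageSize`, `General/MaxRexmitMessageSize`, `Internal/BurstSize/MaxRexmit`). `MaxInitTransmit` is left out on purpose.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper (not part of Cyclone DDS): formats an XML
   configuration fragment that sets the message-size and retransmit
   burst-size elements.  Returns 0 on success, -1 if buf is too small. */
static int format_size_config (char *buf, size_t len,
                               unsigned max_msg_bytes,
                               unsigned max_rexmit_msg_bytes,
                               unsigned max_rexmit_burst_kib)
{
  assert (buf != NULL && len > 0);
  const int n = snprintf (buf, len,
    "<CycloneDDS><Domain Id=\"any\">"
    "<General>"
    "<MaxMessageSize>%u B</MaxMessageSize>"
    "<MaxRexmitMessageSize>%u B</MaxRexmitMessageSize>"
    "</General>"
    "<Internal><BurstSize>"
    "<MaxRexmit>%u KiB</MaxRexmit>"
    "</BurstSize></Internal>"
    "</Domain></CycloneDDS>",
    max_msg_bytes, max_rexmit_msg_bytes, max_rexmit_burst_kib);
  /* snprintf reports the length it wanted; anything >= len was truncated */
  return (n >= 0 && (size_t) n < len) ? 0 : -1;
}
```

Calling `format_size_config (buf, sizeof (buf), 14720, 1456, 1024)` reproduces the new defaults. The resulting string could then be handed to Cyclone DDS through its usual configuration mechanism, for instance the `CYCLONEDDS_URI` environment variable, assuming inline XML is accepted there.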


@ -229,13 +229,21 @@ CycloneDDS configuration""" ] ]
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).</p>
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.</p>
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
<p>The default value is: "4096 B".</p>""" ] ]
<p>The default value is: "14720 B".</p>""" ] ]
element MaxMessageSize {
memsize
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).</p>
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
<p>The default value is: "1456 B".</p>""" ] ]
element MaxRexmitMessageSize {
memsize
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies on which network interfaces Cyclone DDS listens to multicasts. The following options are available:</p>
<ul>
<li><i>all</i>: listen for multicasts on all multicast-capable interfaces; or</li>
@ -313,6 +321,24 @@ CycloneDDS configuration""" ] ]
("full"|"writers"|"minimal")
}?
& [ a:documentation [ xml:lang="en" """
<p>Setting for controlling the size of transmit bursts.</p>""" ] ]
element BurstSize {
[ a:documentation [ xml:lang="en" """
<p>This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.</p>
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
<p>The default value is: "4294967295".</p>""" ] ]
element MaxInitTransmit {
memsize
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies the amount of data to be retransmitted in response to one NACK.</p>
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
<p>The default value is: "1 MiB".</p>""" ] ]
element MaxRexmit {
memsize
}?
}?
& [ a:documentation [ xml:lang="en" """
<p>The ControlTopic element allows configured whether Cyclone DDS provides a special control interface via a predefined topic or not.<p>""" ] ]
element ControlTopic {
empty


@ -313,6 +313,7 @@ CycloneDDS configuration</xs:documentation>
<xs:element minOccurs="0" ref="config:ExternalNetworkMask"/>
<xs:element minOccurs="0" ref="config:FragmentSize"/>
<xs:element minOccurs="0" ref="config:MaxMessageSize"/>
<xs:element minOccurs="0" ref="config:MaxRexmitMessageSize"/>
<xs:element minOccurs="0" ref="config:MulticastRecvNetworkInterfaceAddresses"/>
<xs:element minOccurs="0" ref="config:MulticastTimeToLive"/>
<xs:element minOccurs="0" ref="config:NetworkInterfaceAddress"/>
@ -382,9 +383,18 @@ CycloneDDS configuration</xs:documentation>
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).&lt;/p&gt;
&lt;p&gt;On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.&lt;/p&gt;
&lt;p&gt;On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.&lt;/p&gt;
&lt;p&gt;The unit must be specified explicitly. Recognised units: B (bytes), kB &amp; KiB (2&lt;sup&gt;10&lt;/sup&gt; bytes), MB &amp; MiB (2&lt;sup&gt;20&lt;/sup&gt; bytes), GB &amp; GiB (2&lt;sup&gt;30&lt;/sup&gt; bytes).&lt;/p&gt;
&lt;p&gt;The default value is: "4096 B".&lt;/p&gt;</xs:documentation>
&lt;p&gt;The default value is: "14720 B".&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="MaxRexmitMessageSize" type="config:memsize">
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).&lt;/p&gt;
&lt;p&gt;On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.&lt;/p&gt;
&lt;p&gt;The unit must be specified explicitly. Recognised units: B (bytes), kB &amp; KiB (2&lt;sup&gt;10&lt;/sup&gt; bytes), MB &amp; MiB (2&lt;sup&gt;20&lt;/sup&gt; bytes), GB &amp; GiB (2&lt;sup&gt;30&lt;/sup&gt; bytes).&lt;/p&gt;
&lt;p&gt;The default value is: "1456 B".&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="MulticastRecvNetworkInterfaceAddresses" type="xs:string">
@ -465,6 +475,7 @@ CycloneDDS configuration</xs:documentation>
<xs:element minOccurs="0" ref="config:AssumeMulticastCapable"/>
<xs:element minOccurs="0" ref="config:AutoReschedNackDelay"/>
<xs:element minOccurs="0" ref="config:BuiltinEndpointSet"/>
<xs:element minOccurs="0" ref="config:BurstSize"/>
<xs:element minOccurs="0" ref="config:ControlTopic"/>
<xs:element minOccurs="0" ref="config:DDSI2DirectMaxThreads"/>
<xs:element minOccurs="0" ref="config:DefragReliableMaxSamples"/>
@ -549,6 +560,34 @@ CycloneDDS configuration</xs:documentation>
</xs:restriction>
</xs:simpleType>
</xs:element>
<xs:element name="BurstSize">
<xs:annotation>
<xs:documentation>
&lt;p&gt;Setting for controlling the size of transmit bursts.&lt;/p&gt;</xs:documentation>
</xs:annotation>
<xs:complexType>
<xs:all>
<xs:element minOccurs="0" ref="config:MaxInitTransmit"/>
<xs:element minOccurs="0" ref="config:MaxRexmit"/>
</xs:all>
</xs:complexType>
</xs:element>
<xs:element name="MaxInitTransmit" type="config:memsize">
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.&lt;/p&gt;
&lt;p&gt;The unit must be specified explicitly. Recognised units: B (bytes), kB &amp; KiB (2&lt;sup&gt;10&lt;/sup&gt; bytes), MB &amp; MiB (2&lt;sup&gt;20&lt;/sup&gt; bytes), GB &amp; GiB (2&lt;sup&gt;30&lt;/sup&gt; bytes).&lt;/p&gt;
&lt;p&gt;The default value is: "4294967295".&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="MaxRexmit" type="config:memsize">
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element specifies the amount of data to be retransmitted in response to one NACK.&lt;/p&gt;
&lt;p&gt;The unit must be specified explicitly. Recognised units: B (bytes), kB &amp; KiB (2&lt;sup&gt;10&lt;/sup&gt; bytes), MB &amp; MiB (2&lt;sup&gt;20&lt;/sup&gt; bytes), GB &amp; GiB (2&lt;sup&gt;30&lt;/sup&gt; bytes).&lt;/p&gt;
&lt;p&gt;The default value is: "1 MiB".&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="ControlTopic">
<xs:annotation>
<xs:documentation>


@ -153,7 +153,7 @@ static struct cfgelem general_cfgelems[] = {
"communications, but if a node runs only a single Cyclone DDS service "
"and does not host any other DDSI-capable programs, it should be set "
"to \"false\" for improved performance.</p>")),
STRING("MaxMessageSize", NULL, 1, "4096 B",
STRING("MaxMessageSize", NULL, 1, "14720 B",
MEMBER(max_msg_size),
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
DESCRIPTION(
@ -163,9 +163,19 @@ static struct cfgelem general_cfgelems[] = {
"(especially for very low values of MaxMessageSize) larger payloads "
"may sporadically be observed (currently up to 1192 B).</p>\n"
"<p>On some networks it may be necessary to set this item to keep the "
"packetsize below the MTU to prevent IP fragmentation. In those cases, "
"it is generally advisable to also consider reducing "
"Internal/FragmentSize.</p>"),
"packetsize below the MTU to prevent IP fragmentation.</p>"),
UNIT("memsize")),
STRING("MaxRexmitMessageSize", NULL, 1, "1456 B",
MEMBER(max_rexmit_msg_size),
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
DESCRIPTION(
"<p>This element specifies the maximum size of the UDP payload that "
"Cyclone DDS will generate for a retransmit. Cyclone DDS will try to "
"maintain this limit within the bounds of the DDSI specification, which "
"means that in some cases (especially for very low values) larger payloads "
"may sporadically be observed (currently up to 1192 B).</p>\n"
"<p>On some networks it may be necessary to set this item to keep the "
"packetsize below the MTU to prevent IP fragmentation.</p>"),
UNIT("memsize")),
STRING("FragmentSize", NULL, 1, "1344 B",
MEMBER(fragment_size),
@ -856,6 +866,29 @@ static struct cfgelem internal_watermarks_cfgelems[] = {
END_MARKER
};
static struct cfgelem internal_burstsize_cfgelems[] = {
STRING("MaxRexmit", NULL, 1, "1 MiB",
MEMBER(max_rexmit_burst_size),
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
DESCRIPTION(
"<p>This element specifies the amount of data to be retransmitted in "
"response to one NACK.</p>"),
UNIT("memsize")),
STRING("MaxInitTransmit", NULL, 1, "4294967295",
MEMBER(init_transmit_extra_pct),
FUNCTIONS(0, uf_uint, 0, pf_uint),
DESCRIPTION(
"<p>This element specifies how much more than the (presumed or discovered) "
"receive buffer size may be sent when transmitting a sample for the first "
"time, expressed as a percentage; the remainder will then be handled via "
"retransmits. Usually the receivers can keep up with transmitter, at least "
"on average, and so generally it is better to hope for the best and recover. "
"Besides, the retransmits will be unicast, and so any multicast advantage "
"will be lost as well.</p>"),
UNIT("memsize")),
END_MARKER
};
static struct cfgelem control_topic_cfgattrs[] = {
BOOL(DEPRECATED("Enable"), NULL, 1, "false",
MEMBER(enable_control_topic),
@ -1369,6 +1402,10 @@ static struct cfgelem internal_cfgelems[] = {
NOMEMBER,
NOFUNCTIONS,
DESCRIPTION("<p>Watermarks for flow-control.</p>")),
GROUP("BurstSize", internal_burstsize_cfgelems, NULL, 1,
NOMEMBER,
NOFUNCTIONS,
DESCRIPTION("<p>Setting for controlling the size of transmit bursts.</p>")),
LIST("EnableExpensiveChecks", NULL, 1, "",
MEMBER(enabled_xchecks),
FUNCTIONS(0, uf_xcheck, 0, pf_xcheck),


@ -264,6 +264,9 @@ struct config
uint16_t fragment_size;
uint32_t max_msg_size;
uint32_t max_rexmit_msg_size;
uint32_t init_transmit_extra_pct;
uint32_t max_rexmit_burst_size;
int publish_uc_locators; /* Publish discovery unicast locators */
int enable_uc_locators; /* If false, don't even try to create a unicast socket */


@ -301,6 +301,8 @@ struct writer
uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */
ddsrt_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */
ddsrt_etime_t t_whc_high_upd; /* time "whc_high" was last updated for controlled ramp-up of throughput */
uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */
uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */
uint32_t num_readers; /* total number of matching PROXY readers */
int32_t num_reliable_readers; /* number of matching reliable PROXY readers */
ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */


@ -40,8 +40,9 @@ int write_sample_gc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp,
int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
/* When calling the following functions, wr->lock must be held */
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew, uint32_t advertised_fragnum);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync);
dds_return_t write_hb_liveliness (struct ddsi_domaingv * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp);
int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd);
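
The new `nfrags` parameter of `create_fragment_message` reflects that one DATAFRAG message may now carry several fragments. How many fit depends on the fragment size and the message size limit in force. The following is a rough sketch only: `frags_per_message` and its `overhead` parameter (the bytes assumed to be taken by headers) are hypothetical and not taken from the commit, which may compute this differently.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: number of whole fragments of frag_size bytes that
   fit in a message of at most max_msg_size bytes once `overhead` bytes of
   headers (an assumed figure) are set aside.  At least one fragment is
   always sent, and the count is capped at UINT16_MAX because nfrags is a
   uint16_t in create_fragment_message. */
static uint16_t frags_per_message (uint32_t max_msg_size, uint32_t overhead, uint16_t frag_size)
{
  assert (frag_size > 0);
  if (max_msg_size <= overhead)
    return 1;
  uint32_t n = (max_msg_size - overhead) / frag_size;
  if (n == 0)
    n = 1;
  if (n > UINT16_MAX)
    n = UINT16_MAX;
  return (uint16_t) n;
}
```

With the new defaults and an assumed overhead of 112 B, a 14720 B message limit and 1344 B fragments give 10 fragments per message, while the 1456 B retransmit limit gives exactly 1, which matches the stated intent of sending retransmits as datagrams that fit in individual packets.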


@ -1454,7 +1454,7 @@ static int rebuild_compare_locs(const void *va, const void *vb)
}
}
static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr, uint32_t *min_receive_buffer_size)
{
struct addrset *all_addrs = new_addrset();
struct entity_index *gh = wr->e.gv->entity_index;
@ -1471,6 +1471,8 @@ static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
continue;
(*nreaders)++;
if (prd->receive_buffer_size < *min_receive_buffer_size)
*min_receive_buffer_size = prd->receive_buffer_size;
copy_addrset_into_addrset(wr->e.gv, all_addrs, prd->c.as);
}
if (addrset_empty(all_addrs) || *nreaders == 0)
@ -1677,7 +1679,7 @@ static void rebuild_drop(int locidx, int nreaders, int nlocs, int *locs_nrds, in
}
}
static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr)
static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr, uint32_t *min_receive_buffer_size)
{
bool prefer_multicast = wr->e.gv->config.prefer_multicast;
struct addrset *all_addrs;
@ -1686,7 +1688,7 @@ static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer
int *locs_nrds;
int8_t *covered;
int best;
if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr)) == NULL)
if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr, min_receive_buffer_size)) == NULL)
return;
nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "setcover: all_addrs", all_addrs);
ELOGDISC (wr, "\n");
@ -1715,12 +1717,39 @@ static void rebuild_writer_addrset (struct writer *wr)
/* FIXME way too inefficient in this form */
struct addrset *newas = new_addrset ();
struct addrset *oldas = wr->as;
uint32_t min_receive_buffer_size = UINT32_MAX;
/* only one operation at a time */
ASSERT_MUTEX_HELD (&wr->e.lock);
/* compute new addrset */
rebuild_writer_addrset_setcover(newas, wr);
rebuild_writer_addrset_setcover(newas, wr, &min_receive_buffer_size);
/* Modifying burst size limit here is a bit of a hack; but anyway ...
try to limit bursts of retransmits to 67% of the smallest receive
buffer, and those of initial transmissions to that + overshoot%.
It is usually best to send the full sample initially, always:
- if the receivers manage to keep up somewhat, sending it in one
go and then recovering anything lost is way faster then sending
only small batches
- the way things are now: the retransmits will be sent unicast,
so if there are multiple receivers, that'll blow up things by
a non-trivial amount */
wr->rexmit_burst_size_limit = min_receive_buffer_size - min_receive_buffer_size / 3;
if (wr->rexmit_burst_size_limit < 1024)
wr->rexmit_burst_size_limit = 1024;
if (wr->rexmit_burst_size_limit > wr->e.gv->config.max_rexmit_burst_size)
wr->rexmit_burst_size_limit = wr->e.gv->config.max_rexmit_burst_size;
if (wr->rexmit_burst_size_limit > UINT32_MAX - UINT16_MAX)
wr->rexmit_burst_size_limit = UINT32_MAX - UINT16_MAX;
const uint64_t limit64 = (uint64_t) wr->e.gv->config.init_transmit_extra_pct * (uint64_t) min_receive_buffer_size / 100;
if (limit64 > UINT32_MAX - UINT16_MAX)
wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX;
else if (limit64 < wr->rexmit_burst_size_limit)
wr->init_burst_size_limit = wr->rexmit_burst_size_limit;
else
wr->init_burst_size_limit = (uint32_t) limit64;
/* swap in new address set; this simple procedure is ok as long as
wr->as is never accessed without the wr->e.lock held */
@ -1729,7 +1758,7 @@ static void rebuild_writer_addrset (struct writer *wr)
ELOGDISC (wr, "rebuild_writer_addrset("PGUIDFMT"):", PGUID (wr->e.guid));
nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "", wr->as);
ELOGDISC (wr, "\n");
ELOGDISC (wr, " (burst size %"PRIu32" rexmit %"PRIu32")\n", wr->init_burst_size_limit, wr->rexmit_burst_size_limit);
}
void rebuild_or_clear_writer_addrsets (struct ddsi_domaingv *gv, int rebuild)
@ -3598,6 +3627,8 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
wr->force_md5_keyhash = 0;
wr->alive = 1;
wr->alive_vclock = 0;
wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX;
wr->rexmit_burst_size_limit = UINT32_MAX - UINT16_MAX;
wr->status_cb = status_cb;
wr->status_cb_entity = status_entity;
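
Review note: the burst-limit arithmetic added to rebuild_writer_addrset can be read in isolation as the sketch below. The names `burst_limits`, `compute_burst_limits` and `BURST_LIMIT_CAP` are hypothetical and exist only for this illustration; the formulas and the clamping order are copied from the hunk above, and the three inputs correspond to `min_receive_buffer_size`, `config.max_rexmit_burst_size` and `config.init_transmit_extra_pct`.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the two writer fields set by the patch. */
struct burst_limits {
  uint32_t init_burst_size_limit;
  uint32_t rexmit_burst_size_limit;
};

/* Upper bound the patch applies to both limits. */
#define BURST_LIMIT_CAP (UINT32_MAX - UINT16_MAX)

static struct burst_limits compute_burst_limits (uint32_t min_rcvbuf, uint32_t max_rexmit_burst_size, uint32_t init_transmit_extra_pct)
{
  struct burst_limits l;

  /* retransmits: about 67% of the smallest receive buffer ... */
  l.rexmit_burst_size_limit = min_rcvbuf - min_rcvbuf / 3;
  /* ... but at least 1024 bytes, and at most the configured maximum */
  if (l.rexmit_burst_size_limit < 1024)
    l.rexmit_burst_size_limit = 1024;
  if (l.rexmit_burst_size_limit > max_rexmit_burst_size)
    l.rexmit_burst_size_limit = max_rexmit_burst_size;
  if (l.rexmit_burst_size_limit > BURST_LIMIT_CAP)
    l.rexmit_burst_size_limit = BURST_LIMIT_CAP;

  /* initial transmissions: a percentage of the smallest receive buffer,
     computed in 64 bits so the multiplication cannot overflow */
  const uint64_t limit64 = (uint64_t) init_transmit_extra_pct * (uint64_t) min_rcvbuf / 100;
  if (limit64 > BURST_LIMIT_CAP)
    l.init_burst_size_limit = BURST_LIMIT_CAP;
  else if (limit64 < l.rexmit_burst_size_limit)
    l.init_burst_size_limit = l.rexmit_burst_size_limit; /* never below the rexmit limit */
  else
    l.init_burst_size_limit = (uint32_t) limit64;

  assert (l.rexmit_burst_size_limit <= l.init_burst_size_limit);
  return l;
}
```

With purely illustrative inputs (smallest receive buffer 65536 bytes, maximum retransmit burst 1048576 bytes, 150%), this gives a retransmit limit of 43691 bytes and an initial-transmission limit of 98304 bytes. If no receive buffer size was discovered (`UINT32_MAX`), the retransmit limit falls back to the configured maximum and the initial limit to the cap.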

@ -921,7 +921,8 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
nn_gap_info_init(&gi);
const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq;
const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0;
for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++)
uint32_t limit = wr->rexmit_burst_size_limit;
for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued && limit > 0; i++)
{
/* Accelerated schedule may run ahead of sequence number set
contained in the acknack, and assumes all messages beyond the
@ -949,6 +950,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
max_seq_in_reply = seqbase + i;
msgs_sent++;
sample.last_rexmit_ts = tstamp;
// FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment
// thus we can easily figure out how much was sent, but we shouldn't have that knowledge here:
// it should return how much it queued instead
uint32_t sent = ddsi_serdata_size (sample.serdata);
if (sent > wr->e.gv->config.fragment_size)
sent = wr->e.gv->config.fragment_size;
limit = (sent > limit) ? 0 : limit - sent;
}
}
else
@ -972,6 +980,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
max_seq_in_reply = seqbase + i;
msgs_sent++;
sample.rexmit_count++;
// FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment
// thus we can easily figure out how much was sent, but we shouldn't have that knowledge here:
// it should return how much it queued instead
uint32_t sent = ddsi_serdata_size (sample.serdata);
if (sent > wr->e.gv->config.fragment_size)
sent = wr->e.gv->config.fragment_size;
limit = (sent > limit) ? 0 : limit - sent;
}
}
}
@ -1544,18 +1559,21 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
a Gap if we don't have them anymore. */
if (whc_borrow_sample (wr->whc, seq, &sample))
{
const unsigned base = msg->fragmentNumberState.bitmap_base - 1;
int enqueued = 1;
const uint32_t base = msg->fragmentNumberState.bitmap_base - 1;
assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX);
uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size;
RSTTRACE (" scheduling requested frags ...\n");
for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++)
for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++)
{
if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->bits, i))
{
struct nn_xmsg *reply;
if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, prd, &reply, 0) < 0)
enqueued = 0;
if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, 1, prd, &reply, 0, 0) < 0)
nfrags_lim = 0;
else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0))
nfrags_lim = 0;
else
enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0);
nfrags_lim--;
}
}
whc_return_sample (wr->whc, &sample, false);
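
Review note: the retransmit budget used in handle_AckNack and handle_NackFrag boils down to the bookkeeping sketched here. The helper names `charge_rexmit`, `count_rexmits_within_budget` and `nackfrag_fragment_budget` are hypothetical; the sketch assumes, as the FIXME in the hunk states, that a retransmit of a large sample queues at most one fragment, so each retransmit is charged at most `fragment_size` bytes.

```c
#include <assert.h>
#include <stdint.h>

/* Deduct one retransmit from the remaining budget, saturating at 0.
   A sample larger than one fragment is charged as a single fragment. */
static uint32_t charge_rexmit (uint32_t limit, uint32_t sample_size, uint32_t fragment_size)
{
  uint32_t sent = sample_size;
  if (sent > fragment_size)
    sent = fragment_size;
  return (sent > limit) ? 0 : limit - sent;
}

/* Mirror of the AckNack loop condition "... && limit > 0": returns how
   many of the n requested samples get scheduled before the budget is
   exhausted.  The sample that exhausts the budget is still scheduled. */
static uint32_t count_rexmits_within_budget (const uint32_t *sizes, uint32_t n, uint32_t fragment_size, uint32_t limit)
{
  uint32_t scheduled = 0;
  for (uint32_t i = 0; i < n && limit > 0; i++)
  {
    scheduled++;
    limit = charge_rexmit (limit, sizes[i], fragment_size);
  }
  return scheduled;
}

/* NackFrag: budget expressed as a number of fragments, rounded up. */
static uint32_t nackfrag_fragment_budget (uint32_t rexmit_burst_size_limit, uint32_t fragment_size)
{
  assert (fragment_size > 0);
  assert (rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX); /* keeps the rounding from overflowing */
  return (rexmit_burst_size_limit + fragment_size - 1) / fragment_size;
}
```

The assertion on the limit is the same one the patch adds before computing `nfrags_lim`: with the limit capped at `UINT32_MAX - UINT16_MAX` and a fragment size that fits in 16 bits, the round-up addition stays within 32 bits.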

@ -512,7 +512,7 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s
return 0;
}
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew)
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew, uint32_t advertised_fragnum)
{
/* We always fragment into FRAGMENT_SIZEd fragments, which are near
the smallest allowed fragment size & can't be bothered (yet) to
@ -550,7 +550,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
return DDS_RETCODE_BAD_PARAMETER;
}
fragging = (gv->config.fragment_size < size);
fragging = (nfrags * (uint32_t) gv->config.fragment_size < size);
/* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
@ -616,14 +616,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
ddcmn->smhdr.flags = (unsigned char) (ddcmn->smhdr.flags | contentflag);
frag->fragmentStartingNum = fragnum + 1;
frag->fragmentsInSubmessage = 1;
#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */
if (fragstart + gv->config.fragment_size < ddsi_serdata_size (serdata) &&
fragstart + 2 * gv->config.fragment_size >= ddsi_serdata_size (serdata))
frag->fragmentsInSubmessage++;
ret = frag->fragmentsInSubmessage;
#endif
frag->fragmentsInSubmessage = nfrags;
frag->fragmentSize = gv->config.fragment_size;
frag->sampleSize = (uint32_t) size;
@ -633,13 +626,13 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
fraglen = (uint32_t) (size - fragstart);
ddcmn->octetsToInlineQos = (unsigned short) ((char*) (frag+1) - ((char*) &ddcmn->octetsToInlineQos + 2));
if (wr->reliable && (!isnew || fragstart + fraglen == ddsi_serdata_size (serdata)))
if (wr->reliable && (!isnew || advertised_fragnum != UINT32_MAX))
{
/* only set for final fragment for new messages; for rexmits we
want it set for all so we can do merging. FIXME: I guess the
writer should track both seq_xmit and the fragment number
... */
nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, fragnum + frag->fragmentsInSubmessage - 1);
nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, isnew ? advertised_fragnum : fragnum + frag->fragmentsInSubmessage - 1);
}
}
@ -799,15 +792,19 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag)
}
#endif
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags)
static void transmit_sample_lgmsg_unlocks_wr (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags, uint32_t nfrags_lim)
{
#if 0
const char *frags_to_skip = getenv ("SKIPFRAGS");
#endif
assert(xp);
assert((wr->heartbeat_xevent != NULL) == (whcst != NULL));
for (uint32_t i = 0; i < nfrags; i++)
assert(0 < nfrags_lim && nfrags_lim <= nfrags);
uint32_t nf_in_submsg = isnew ? (wr->e.gv->config.max_msg_size / wr->e.gv->config.fragment_size) : 1;
if (nf_in_submsg == 0)
nf_in_submsg = 1;
else if (nf_in_submsg > UINT16_MAX)
nf_in_submsg = UINT16_MAX;
for (uint32_t i = 0; i < nfrags_lim; i += nf_in_submsg)
{
struct nn_xmsg *fmsg = NULL;
struct nn_xmsg *hmsg = NULL;
@ -816,43 +813,26 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
if (must_skip_frag (frags_to_skip, i))
continue;
#endif
if (nf_in_submsg > nfrags_lim - i)
nf_in_submsg = nfrags_lim - i;
/* Ignore out-of-memory errors: we can't do anything about it, and
eventually we'll have to retry. But if a packet went out and
we haven't yet completed transmitting a fragmented message, add
a HeartbeatFrag. */
ddsrt_mutex_lock (&wr->e.lock);
ret = create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew);
if (ret >= 0)
ret = create_fragment_message (wr, seq, plist, serdata, i, (uint16_t) nf_in_submsg, prd, &fmsg, isnew, i + nf_in_submsg == nfrags_lim ? nfrags - 1 : UINT32_MAX);
if (ret >= 0 && i + nf_in_submsg < nfrags_lim && wr->heartbeat_xevent)
{
if (nfrags > 1 && i + 1 < nfrags)
create_HeartbeatFrag (wr, seq, i, prd, &hmsg);
// more fragment messages to come
create_HeartbeatFrag (wr, seq, i + nf_in_submsg - 1, prd, &hmsg);
}
ddsrt_mutex_unlock (&wr->e.lock);
if(fmsg) nn_xpack_addmsg (xp, fmsg, 0);
if(hmsg) nn_xpack_addmsg (xp, hmsg, 0);
#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */
if (ret > 1)
i += ret-1;
#endif
}
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
if (wr->heartbeat_xevent)
{
struct nn_xmsg *msg = NULL;
int hbansreq;
assert (whcst != NULL);
ddsrt_mutex_lock (&wr->e.lock);
msg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
ddsrt_mutex_unlock (&wr->e.lock);
if (msg)
{
nn_xpack_addmsg (xp, msg, 0);
if (hbansreq >= 2)
nn_xpack_send (xp, true);
}
}
}
@ -860,7 +840,8 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
{
/* on entry: &wr->e.lock held; on exit: lock no longer held */
struct ddsi_domaingv const * const gv = wr->e.gv;
struct nn_xmsg *fmsg;
struct nn_xmsg *hmsg = NULL;
int hbansreq = 0;
uint32_t sz;
assert(xp);
assert((wr->heartbeat_xevent != NULL) == (whcst != NULL));
@ -868,35 +849,41 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
sz = ddsi_serdata_size (serdata);
if (sz > gv->config.fragment_size || !isnew || plist != NULL || prd != NULL || q_omg_writer_is_submessage_protected(wr))
{
uint32_t nfrags;
ddsrt_mutex_unlock (&wr->e.lock);
nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size;
transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags);
return;
}
else if (create_fragment_message_simple (wr, seq, serdata, &fmsg) < 0)
{
ddsrt_mutex_unlock (&wr->e.lock);
return;
assert (wr->init_burst_size_limit <= UINT32_MAX - UINT16_MAX);
assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX);
const uint32_t max_burst_size = isnew ? wr->init_burst_size_limit : wr->rexmit_burst_size_limit;
const uint32_t nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size;
uint32_t nfrags_lim;
if (sz <= max_burst_size || wr->num_reliable_readers != wr->num_readers)
nfrags_lim = nfrags; // if it fits or if there are best-effort readers, send it in its entirety
else
nfrags_lim = (max_burst_size + gv->config.fragment_size - 1) / gv->config.fragment_size;
transmit_sample_lgmsg_unlocks_wr (xp, wr, seq, plist, serdata, prd, isnew, nfrags, nfrags_lim);
}
else
{
int hbansreq = 0;
struct nn_xmsg *hmsg;
struct nn_xmsg *fmsg;
if (create_fragment_message_simple (wr, seq, serdata, &fmsg) >= 0)
nn_xpack_addmsg (xp, fmsg, 0);
}
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
if (wr->heartbeat_xevent)
hmsg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
else
hmsg = NULL;
ddsrt_mutex_unlock (&wr->e.lock);
nn_xpack_addmsg (xp, fmsg, 0);
if(hmsg)
nn_xpack_addmsg (xp, hmsg, 0);
if (hbansreq >= 2)
nn_xpack_send (xp, true);
}
}
void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd)
{
assert (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
struct nn_xmsg *msg = NULL;
if (create_fragment_message(wr, seq, NULL, serdata, 0, UINT16_MAX, prd, &msg, 1, UINT32_MAX) >= 0)
qxev_msg (wr->evq, msg);
}
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew)
@ -914,6 +901,8 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds
/* end-of-transaction messages are empty, but still need to be sent */
nfrags = 1;
}
if (!isnew && nfrags > 1)
nfrags = 1;
for (i = 0; i < nfrags && enqueued; i++)
{
struct nn_xmsg *fmsg = NULL;
@ -922,7 +911,7 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds
eventually we'll have to retry. But if a packet went out and
we haven't yet completed transmitting a fragmented message, add
a HeartbeatFrag. */
if (create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew) >= 0)
if (create_fragment_message (wr, seq, plist, serdata, i, 1, prd, &fmsg, isnew, (i+1) == nfrags ? i : UINT32_MAX) >= 0)
{
if (nfrags > 1 && i + 1 < nfrags)
create_HeartbeatFrag (wr, seq, i, prd, &hmsg);
@ -1374,6 +1363,9 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *
{
if (wr->heartbeat_xevent)
writer_hbcontrol_note_asyncwrite (wr, tnow);
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
enqueue_spdp_sample_wrlock_held(wr, seq, serdata, NULL);
else
enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1);
ddsrt_mutex_unlock (&wr->e.lock);
}
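
Review note: the interaction between the burst limit and the packing of several fragments into one DATAFRAG can be followed with the standalone sketch below. The function names `fragment_limit`, `frags_per_submsg` and `count_datafrag_submsgs` are hypothetical; the arithmetic follows transmit_sample_unlocks_wr and transmit_sample_lgmsg_unlocks_wr in the hunks above, and the numeric values mentioned afterwards are illustrative, not defaults.

```c
#include <assert.h>
#include <stdint.h>

/* Number of fragments to send now: the whole sample if it fits in the
   burst limit or if any reader is best-effort, else a partial message. */
static uint32_t fragment_limit (uint32_t sz, uint32_t fragment_size, uint32_t max_burst_size, int all_readers_reliable)
{
  assert (fragment_size > 0);
  const uint32_t nfrags = (sz + fragment_size - 1) / fragment_size;
  if (sz <= max_burst_size || !all_readers_reliable)
    return nfrags;
  return (max_burst_size + fragment_size - 1) / fragment_size;
}

/* Fragments per DATAFRAG submessage: as many as fit in a message for
   new data, exactly one for retransmits; clamped to [1, UINT16_MAX]. */
static uint32_t frags_per_submsg (uint32_t max_msg_size, uint32_t fragment_size, int isnew)
{
  assert (fragment_size > 0);
  uint32_t n = isnew ? (max_msg_size / fragment_size) : 1;
  if (n == 0)
    n = 1;
  else if (n > UINT16_MAX)
    n = UINT16_MAX;
  return n;
}

/* Same loop shape as the patch: step by nf_in_submsg, shrinking the
   last batch so it does not run past nfrags_lim. */
static uint32_t count_datafrag_submsgs (uint32_t nfrags_lim, uint32_t nf_in_submsg)
{
  assert (nf_in_submsg > 0);
  uint32_t count = 0;
  for (uint32_t i = 0; i < nfrags_lim; i += nf_in_submsg)
  {
    if (nf_in_submsg > nfrags_lim - i)
      nf_in_submsg = nfrags_lim - i;
    count++;
  }
  return count;
}
```

For instance, a 100000-byte sample with 1344-byte fragments has 75 fragments; with a 43691-byte burst limit and only reliable readers, 33 of them are sent initially. If 10 fragments fit in one message, 25 fragments go out as 3 DATAFRAG submessages instead of 25.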

@ -1129,7 +1129,7 @@ static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t
updating of the last transmitted sequence number won't take
place anyway. Nor is it necessary to fiddle with heartbeat
control stuff. */
enqueue_sample_wrlock_held (wr, sample.seq, sample.plist, sample.serdata, prd, 1);
enqueue_spdp_sample_wrlock_held (wr, sample.seq, sample.serdata, prd);
whc_return_sample(wr->whc, &sample, false);
}
ddsrt_mutex_unlock (&wr->e.lock);

@ -220,6 +220,7 @@ struct nn_xpack
} all;
} dstaddr;
bool includes_rexmit;
struct nn_xmsg_chain included_msgs;
#ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING
@ -1145,6 +1146,7 @@ static void nn_xpack_reinit (struct nn_xpack *xp)
xp->niov = 0;
xp->call_flags = 0;
xp->msg_len.length = 0;
xp->includes_rexmit = false;
xp->included_msgs.latest = NULL;
xp->maxdelay = DDS_INFINITY;
#ifdef DDSI_INCLUDE_SECURITY
@ -1535,9 +1537,24 @@ static int addressing_info_eq_onesidederr (const struct nn_xpack *xp, const stru
return 0;
}
static int nn_xmsg_is_rexmit (const struct nn_xmsg *m)
{
switch (m->kind)
{
case NN_XMSG_KIND_DATA:
case NN_XMSG_KIND_CONTROL:
return 0;
case NN_XMSG_KIND_DATA_REXMIT:
case NN_XMSG_KIND_DATA_REXMIT_NOMERGE:
return 1;
}
return 0;
}
static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *m, const uint32_t flags)
{
unsigned max_msg_size = xp->gv->config.max_msg_size;
const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m);
const unsigned max_msg_size = rexmit ? xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size;
unsigned payload_size;
if (xp->niov == 0)
@ -1758,10 +1775,12 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag
xp->msg_len.length = (uint32_t) sz;
xp->niov = niov;
if (xpo_niov > 0 && sz > xp->gv->config.max_msg_size)
const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m);
const unsigned max_msg_size = rexmit ? xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size;
if (xpo_niov > 0 && sz > max_msg_size)
{
GVTRACE (" => now niov %d sz %"PRIuSIZE" > max_msg_size %"PRIu32", nn_xpack_send niov %d sz %"PRIu32" now\n",
(int) niov, sz, gv->config.max_msg_size, (int) xpo_niov, xpo_sz);
(int) niov, sz, max_msg_size, (int) xpo_niov, xpo_sz);
xp->msg_len.length = xpo_sz;
xp->niov = xpo_niov;
nn_xpack_send (xp, false);
@ -1770,6 +1789,15 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag
else
{
xp->call_flags = flags;
switch (m->kind)
{
case NN_XMSG_KIND_DATA:
case NN_XMSG_KIND_CONTROL:
break;
case NN_XMSG_KIND_DATA_REXMIT:
case NN_XMSG_KIND_DATA_REXMIT_NOMERGE:
xp->includes_rexmit = true;
}
nn_xmsg_chain_add (&xp->included_msgs, m);
GVTRACE (" => now niov %d sz %"PRIuSIZE"\n", (int) niov, sz);
}
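
Review note: the size check in nn_xpack_mayaddmsg and nn_xpack_addmsg now depends on whether the packet contains, or is about to receive, a retransmit. A minimal sketch of that selection follows. The enum `xmsg_kind` and the functions `is_rexmit` and `effective_max_msg_size` are hypothetical stand-ins for the `NN_XMSG_KIND_*` values and the inline logic in the patch; the positive-size assertion is an assumption of the sketch, not something the patch checks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical mirror of the four message kinds handled in the patch. */
enum xmsg_kind {
  XMSG_KIND_CONTROL,
  XMSG_KIND_DATA,
  XMSG_KIND_DATA_REXMIT,
  XMSG_KIND_DATA_REXMIT_NOMERGE
};

/* Only the two REXMIT kinds count as retransmits. */
static bool is_rexmit (enum xmsg_kind kind)
{
  switch (kind)
  {
    case XMSG_KIND_DATA:
    case XMSG_KIND_CONTROL:
      return false;
    case XMSG_KIND_DATA_REXMIT:
    case XMSG_KIND_DATA_REXMIT_NOMERGE:
      return true;
  }
  return false;
}

/* Once a packet holds a retransmit, or the message being added is one,
   the (typically smaller) retransmit size limit applies to the packet. */
static uint32_t effective_max_msg_size (bool pack_includes_rexmit, enum xmsg_kind kind, uint32_t max_msg_size, uint32_t max_rexmit_msg_size)
{
  assert (max_msg_size > 0 && max_rexmit_msg_size > 0); /* sketch-only sanity check */
  const bool rexmit = pack_includes_rexmit || is_rexmit (kind);
  return rexmit ? max_rexmit_msg_size : max_msg_size;
}
```

The flag is sticky for the lifetime of a packet: in the patch, `includes_rexmit` is set when a retransmit is added and cleared only in nn_xpack_reinit, so later plain data added to the same packet is also held to the retransmit limit.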

@ -1,7 +1,7 @@
exitcode=0
# RSS/samples/roundtrip numbers are based on experimentation on Travis
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$!
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 pub & ddsperf_pids="$ddsperf_pids $!"
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$!
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 pub 100Hz burst 1000 & ddsperf_pids="$ddsperf_pids $!"
sleep 11
for pid in $ddsperf_pids ; do
if kill -0 $pid 2>/dev/null ; then