Adapt message and burst sizes to receive buffers
This changes a few intertwined things at the same time: * It allows configuring sending a partial message for large messages, with a maximum derived from the discovered receive buffer sizes; * It uses a different message size limit for datagrams that include retransmits than for those that don't. The argument here is that, having seen flaky networks where large datagrams cause trouble, it makes sense to default to sending retransmits as datagrams that fit in individual packets. * The best performance is generally obtained using the maximum data gram size, but the benefits do fall off quite quickly once they are largish. For flaky networks, it doesn't make sense to go for 64kB datagrams. This tries to find a reasonable compromise. * It now packs mutiple fragments into a single DATAFRAG message to eliminate the cost of using small fragment sizes. The changes in buffer sizes cause the ddsperf sanity check to fail: * The larger amounts of unacknowledged data cause the used memory to be higher, failing the RSS check. Raising the limit seems reasonable (the alternative would be to configure it back to the old values, but it is all empirically determined anyway). * The same also causes the publisher thread to get to run more and the ping/pong bit gets less of a chance. Using fixed-frequency bursts helps with this. This therefore also adjust the test configuration and the thresholds a bit. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
82e6a7972c
commit
66daba9f2f
13 changed files with 321 additions and 106 deletions
|
@ -250,7 +250,7 @@ The default value is: "".
|
|||
|
||||
|
||||
### //CycloneDDS/Domain/General
|
||||
Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv)
|
||||
Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MaxRexmitMessageSize](#cycloneddsdomaingeneralmaxrexmitmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv)
|
||||
|
||||
The General element specifies overall Cyclone DDS service settings.
|
||||
|
||||
|
@ -317,7 +317,7 @@ This element specifies the size of DDSI sample fragments generated by Cyclone DD
|
|||
|
||||
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
|
||||
|
||||
The default value is: "1280 B".
|
||||
The default value is: "1344 B".
|
||||
|
||||
|
||||
#### //CycloneDDS/Domain/General/MaxMessageSize
|
||||
|
@ -325,11 +325,23 @@ Number-with-unit
|
|||
|
||||
This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).
|
||||
|
||||
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.
|
||||
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
|
||||
|
||||
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
|
||||
|
||||
The default value is: "4096 B".
|
||||
The default value is: "14720 B".
|
||||
|
||||
|
||||
#### //CycloneDDS/Domain/General/MaxRexmitMessageSize
|
||||
Number-with-unit
|
||||
|
||||
This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).
|
||||
|
||||
On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
|
||||
|
||||
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
|
||||
|
||||
The default value is: "1456 B".
|
||||
|
||||
|
||||
#### //CycloneDDS/Domain/General/MulticastRecvNetworkInterfaceAddresses
|
||||
|
@ -394,7 +406,7 @@ The default value is: "default".
|
|||
|
||||
|
||||
### //CycloneDDS/Domain/Internal
|
||||
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
|
||||
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
|
||||
|
||||
The Internal elements deal with a variety of settings that evolving and that are not necessarily fully supported. For the vast majority of the Internal settings, the functionality per-se is supported, but the right to change the way the options control the functionality is reserved. This includes renaming or moving options.
|
||||
|
||||
|
@ -440,6 +452,32 @@ The default is writers, as this is thought to be compliant and reasonably effici
|
|||
The default value is: "writers".
|
||||
|
||||
|
||||
#### //CycloneDDS/Domain/Internal/BurstSize
|
||||
Children: [MaxInitTransmit](#cycloneddsdomaininternalburstsizemaxinittransmit), [MaxRexmit](#cycloneddsdomaininternalburstsizemaxrexmit)
|
||||
|
||||
Setting for controlling the size of transmit bursts.
|
||||
|
||||
|
||||
##### //CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit
|
||||
Number-with-unit
|
||||
|
||||
This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.
|
||||
|
||||
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
|
||||
|
||||
The default value is: "4294967295".
|
||||
|
||||
|
||||
##### //CycloneDDS/Domain/Internal/BurstSize/MaxRexmit
|
||||
Number-with-unit
|
||||
|
||||
This element specifies the amount of data to be retransmitted in response to one NACK.
|
||||
|
||||
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
|
||||
|
||||
The default value is: "1 MiB".
|
||||
|
||||
|
||||
#### //CycloneDDS/Domain/Internal/ControlTopic
|
||||
The ControlTopic element allows configured whether Cyclone DDS provides a special control interface via a predefined topic or not.
|
||||
|
||||
|
|
|
@ -229,13 +229,21 @@ CycloneDDS configuration""" ] ]
|
|||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "4096 B".</p>""" ] ]
|
||||
<p>The default value is: "14720 B".</p>""" ] ]
|
||||
element MaxMessageSize {
|
||||
memsize
|
||||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "1456 B".</p>""" ] ]
|
||||
element MaxRexmitMessageSize {
|
||||
memsize
|
||||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>This element specifies on which network interfaces Cyclone DDS listens to multicasts. The following options are available:</p>
|
||||
<ul>
|
||||
<li><i>all</i>: listen for multicasts on all multicast-capable interfaces; or</li>
|
||||
|
@ -313,6 +321,24 @@ CycloneDDS configuration""" ] ]
|
|||
("full"|"writers"|"minimal")
|
||||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>Setting for controlling the size of transmit bursts.</p>""" ] ]
|
||||
element BurstSize {
|
||||
[ a:documentation [ xml:lang="en" """
|
||||
<p>This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "4294967295".</p>""" ] ]
|
||||
element MaxInitTransmit {
|
||||
memsize
|
||||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>This element specifies the amount of data to be retransmitted in response to one NACK.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "1 MiB".</p>""" ] ]
|
||||
element MaxRexmit {
|
||||
memsize
|
||||
}?
|
||||
}?
|
||||
& [ a:documentation [ xml:lang="en" """
|
||||
<p>The ControlTopic element allows configured whether Cyclone DDS provides a special control interface via a predefined topic or not.<p>""" ] ]
|
||||
element ControlTopic {
|
||||
empty
|
||||
|
|
|
@ -313,6 +313,7 @@ CycloneDDS configuration</xs:documentation>
|
|||
<xs:element minOccurs="0" ref="config:ExternalNetworkMask"/>
|
||||
<xs:element minOccurs="0" ref="config:FragmentSize"/>
|
||||
<xs:element minOccurs="0" ref="config:MaxMessageSize"/>
|
||||
<xs:element minOccurs="0" ref="config:MaxRexmitMessageSize"/>
|
||||
<xs:element minOccurs="0" ref="config:MulticastRecvNetworkInterfaceAddresses"/>
|
||||
<xs:element minOccurs="0" ref="config:MulticastTimeToLive"/>
|
||||
<xs:element minOccurs="0" ref="config:NetworkInterfaceAddress"/>
|
||||
|
@ -382,9 +383,18 @@ CycloneDDS configuration</xs:documentation>
|
|||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "4096 B".</p></xs:documentation>
|
||||
<p>The default value is: "14720 B".</p></xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:element>
|
||||
<xs:element name="MaxRexmitMessageSize" type="config:memsize">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
<p>This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).</p>
|
||||
<p>On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "1456 B".</p></xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:element>
|
||||
<xs:element name="MulticastRecvNetworkInterfaceAddresses" type="xs:string">
|
||||
|
@ -465,6 +475,7 @@ CycloneDDS configuration</xs:documentation>
|
|||
<xs:element minOccurs="0" ref="config:AssumeMulticastCapable"/>
|
||||
<xs:element minOccurs="0" ref="config:AutoReschedNackDelay"/>
|
||||
<xs:element minOccurs="0" ref="config:BuiltinEndpointSet"/>
|
||||
<xs:element minOccurs="0" ref="config:BurstSize"/>
|
||||
<xs:element minOccurs="0" ref="config:ControlTopic"/>
|
||||
<xs:element minOccurs="0" ref="config:DDSI2DirectMaxThreads"/>
|
||||
<xs:element minOccurs="0" ref="config:DefragReliableMaxSamples"/>
|
||||
|
@ -549,6 +560,34 @@ CycloneDDS configuration</xs:documentation>
|
|||
</xs:restriction>
|
||||
</xs:simpleType>
|
||||
</xs:element>
|
||||
<xs:element name="BurstSize">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
<p>Setting for controlling the size of transmit bursts.</p></xs:documentation>
|
||||
</xs:annotation>
|
||||
<xs:complexType>
|
||||
<xs:all>
|
||||
<xs:element minOccurs="0" ref="config:MaxInitTransmit"/>
|
||||
<xs:element minOccurs="0" ref="config:MaxRexmit"/>
|
||||
</xs:all>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element name="MaxInitTransmit" type="config:memsize">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
<p>This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "4294967295".</p></xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:element>
|
||||
<xs:element name="MaxRexmit" type="config:memsize">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
<p>This element specifies the amount of data to be retransmitted in response to one NACK.</p>
|
||||
<p>The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2<sup>10</sup> bytes), MB & MiB (2<sup>20</sup> bytes), GB & GiB (2<sup>30</sup> bytes).</p>
|
||||
<p>The default value is: "1 MiB".</p></xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:element>
|
||||
<xs:element name="ControlTopic">
|
||||
<xs:annotation>
|
||||
<xs:documentation>
|
||||
|
|
|
@ -153,7 +153,7 @@ static struct cfgelem general_cfgelems[] = {
|
|||
"communications, but if a node runs only a single Cyclone DDS service "
|
||||
"and does not host any other DDSI-capable programs, it should be set "
|
||||
"to \"false\" for improved performance.</p>")),
|
||||
STRING("MaxMessageSize", NULL, 1, "4096 B",
|
||||
STRING("MaxMessageSize", NULL, 1, "14720 B",
|
||||
MEMBER(max_msg_size),
|
||||
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
|
||||
DESCRIPTION(
|
||||
|
@ -163,9 +163,19 @@ static struct cfgelem general_cfgelems[] = {
|
|||
"(especially for very low values of MaxMessageSize) larger payloads "
|
||||
"may sporadically be observed (currently up to 1192 B).</p>\n"
|
||||
"<p>On some networks it may be necessary to set this item to keep the "
|
||||
"packetsize below the MTU to prevent IP fragmentation. In those cases, "
|
||||
"it is generally advisable to also consider reducing "
|
||||
"Internal/FragmentSize.</p>"),
|
||||
"packetsize below the MTU to prevent IP fragmentation.</p>"),
|
||||
UNIT("memsize")),
|
||||
STRING("MaxRexmitMessageSize", NULL, 1, "1456 B",
|
||||
MEMBER(max_rexmit_msg_size),
|
||||
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
|
||||
DESCRIPTION(
|
||||
"<p>This element specifies the maximum size of the UDP payload that "
|
||||
"Cyclone DDS will generate for a retransmit. Cyclone DDS will try to "
|
||||
"maintain this limit within the bounds of the DDSI specification, which "
|
||||
"means that in some cases (especially for very low values) larger payloads "
|
||||
"may sporadically be observed (currently up to 1192 B).</p>\n"
|
||||
"<p>On some networks it may be necessary to set this item to keep the "
|
||||
"packetsize below the MTU to prevent IP fragmentation.</p>"),
|
||||
UNIT("memsize")),
|
||||
STRING("FragmentSize", NULL, 1, "1344 B",
|
||||
MEMBER(fragment_size),
|
||||
|
@ -856,6 +866,29 @@ static struct cfgelem internal_watermarks_cfgelems[] = {
|
|||
END_MARKER
|
||||
};
|
||||
|
||||
static struct cfgelem internal_burstsize_cfgelems[] = {
|
||||
STRING("MaxRexmit", NULL, 1, "1 MiB",
|
||||
MEMBER(max_rexmit_burst_size),
|
||||
FUNCTIONS(0, uf_memsize, 0, pf_memsize),
|
||||
DESCRIPTION(
|
||||
"<p>This element specifies the amount of data to be retransmitted in "
|
||||
"response to one NACK.</p>"),
|
||||
UNIT("memsize")),
|
||||
STRING("MaxInitTransmit", NULL, 1, "4294967295",
|
||||
MEMBER(init_transmit_extra_pct),
|
||||
FUNCTIONS(0, uf_uint, 0, pf_uint),
|
||||
DESCRIPTION(
|
||||
"<p>This element specifies how much more than the (presumed or discovered) "
|
||||
"receive buffer size may be sent when transmitting a sample for the first "
|
||||
"time, expressed as a percentage; the remainder will then be handled via "
|
||||
"retransmits. Usually the receivers can keep up with transmitter, at least "
|
||||
"on average, and so generally it is better to hope for the best and recover. "
|
||||
"Besides, the retransmits will be unicast, and so any multicast advantage "
|
||||
"will be lost as well.</p>"),
|
||||
UNIT("memsize")),
|
||||
END_MARKER
|
||||
};
|
||||
|
||||
static struct cfgelem control_topic_cfgattrs[] = {
|
||||
BOOL(DEPRECATED("Enable"), NULL, 1, "false",
|
||||
MEMBER(enable_control_topic),
|
||||
|
@ -1369,6 +1402,10 @@ static struct cfgelem internal_cfgelems[] = {
|
|||
NOMEMBER,
|
||||
NOFUNCTIONS,
|
||||
DESCRIPTION("<p>Watermarks for flow-control.</p>")),
|
||||
GROUP("BurstSize", internal_burstsize_cfgelems, NULL, 1,
|
||||
NOMEMBER,
|
||||
NOFUNCTIONS,
|
||||
DESCRIPTION("<p>Setting for controlling the size of transmit bursts.</p>")),
|
||||
LIST("EnableExpensiveChecks", NULL, 1, "",
|
||||
MEMBER(enabled_xchecks),
|
||||
FUNCTIONS(0, uf_xcheck, 0, pf_xcheck),
|
||||
|
|
|
@ -264,6 +264,9 @@ struct config
|
|||
|
||||
uint16_t fragment_size;
|
||||
uint32_t max_msg_size;
|
||||
uint32_t max_rexmit_msg_size;
|
||||
uint32_t init_transmit_extra_pct;
|
||||
uint32_t max_rexmit_burst_size;
|
||||
|
||||
int publish_uc_locators; /* Publish discovery unicast locators */
|
||||
int enable_uc_locators; /* If false, don't even try to create a unicast socket */
|
||||
|
|
|
@ -301,6 +301,8 @@ struct writer
|
|||
uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */
|
||||
ddsrt_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */
|
||||
ddsrt_etime_t t_whc_high_upd; /* time "whc_high" was last updated for controlled ramp-up of throughput */
|
||||
uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */
|
||||
uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */
|
||||
uint32_t num_readers; /* total number of matching PROXY readers */
|
||||
int32_t num_reliable_readers; /* number of matching reliable PROXY readers */
|
||||
ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */
|
||||
|
|
|
@ -40,8 +40,9 @@ int write_sample_gc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp,
|
|||
int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
|
||||
|
||||
/* When calling the following functions, wr->lock must be held */
|
||||
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
|
||||
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew, uint32_t advertised_fragnum);
|
||||
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
|
||||
void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd);
|
||||
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync);
|
||||
dds_return_t write_hb_liveliness (struct ddsi_domaingv * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp);
|
||||
int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd);
|
||||
|
|
|
@ -1454,7 +1454,7 @@ static int rebuild_compare_locs(const void *va, const void *vb)
|
|||
}
|
||||
}
|
||||
|
||||
static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
|
||||
static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr, uint32_t *min_receive_buffer_size)
|
||||
{
|
||||
struct addrset *all_addrs = new_addrset();
|
||||
struct entity_index *gh = wr->e.gv->entity_index;
|
||||
|
@ -1471,6 +1471,8 @@ static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
|
|||
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
|
||||
continue;
|
||||
(*nreaders)++;
|
||||
if (prd->receive_buffer_size < *min_receive_buffer_size)
|
||||
*min_receive_buffer_size = prd->receive_buffer_size;
|
||||
copy_addrset_into_addrset(wr->e.gv, all_addrs, prd->c.as);
|
||||
}
|
||||
if (addrset_empty(all_addrs) || *nreaders == 0)
|
||||
|
@ -1677,7 +1679,7 @@ static void rebuild_drop(int locidx, int nreaders, int nlocs, int *locs_nrds, in
|
|||
}
|
||||
}
|
||||
|
||||
static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr)
|
||||
static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr, uint32_t *min_receive_buffer_size)
|
||||
{
|
||||
bool prefer_multicast = wr->e.gv->config.prefer_multicast;
|
||||
struct addrset *all_addrs;
|
||||
|
@ -1686,7 +1688,7 @@ static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer
|
|||
int *locs_nrds;
|
||||
int8_t *covered;
|
||||
int best;
|
||||
if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr)) == NULL)
|
||||
if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr, min_receive_buffer_size)) == NULL)
|
||||
return;
|
||||
nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "setcover: all_addrs", all_addrs);
|
||||
ELOGDISC (wr, "\n");
|
||||
|
@ -1715,12 +1717,39 @@ static void rebuild_writer_addrset (struct writer *wr)
|
|||
/* FIXME way too inefficient in this form */
|
||||
struct addrset *newas = new_addrset ();
|
||||
struct addrset *oldas = wr->as;
|
||||
uint32_t min_receive_buffer_size = UINT32_MAX;
|
||||
|
||||
/* only one operation at a time */
|
||||
ASSERT_MUTEX_HELD (&wr->e.lock);
|
||||
|
||||
/* compute new addrset */
|
||||
rebuild_writer_addrset_setcover(newas, wr);
|
||||
rebuild_writer_addrset_setcover(newas, wr, &min_receive_buffer_size);
|
||||
|
||||
/* Modifying burst size limit here is a bit of a hack; but anyway ...
|
||||
try to limit bursts of retransmits to 67% of the smallest receive
|
||||
buffer, and those of initial transmissions to that + overshoot%.
|
||||
It is usually best to send the full sample initially, always:
|
||||
- if the receivers manage to keep up somewhat, sending it in one
|
||||
go and then recovering anything lost is way faster then sending
|
||||
only small batches
|
||||
- the way things are now: the retransmits will be sent unicast,
|
||||
so if there are multiple receivers, that'll blow up things by
|
||||
a non-trivial amount */
|
||||
wr->rexmit_burst_size_limit = min_receive_buffer_size - min_receive_buffer_size / 3;
|
||||
if (wr->rexmit_burst_size_limit < 1024)
|
||||
wr->rexmit_burst_size_limit = 1024;
|
||||
if (wr->rexmit_burst_size_limit > wr->e.gv->config.max_rexmit_burst_size)
|
||||
wr->rexmit_burst_size_limit = wr->e.gv->config.max_rexmit_burst_size;
|
||||
if (wr->rexmit_burst_size_limit > UINT32_MAX - UINT16_MAX)
|
||||
wr->rexmit_burst_size_limit = UINT32_MAX - UINT16_MAX;
|
||||
|
||||
const uint64_t limit64 = (uint64_t) wr->e.gv->config.init_transmit_extra_pct * (uint64_t) min_receive_buffer_size / 100;
|
||||
if (limit64 > UINT32_MAX - UINT16_MAX)
|
||||
wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX;
|
||||
else if (limit64 < wr->rexmit_burst_size_limit)
|
||||
wr->init_burst_size_limit = wr->rexmit_burst_size_limit;
|
||||
else
|
||||
wr->init_burst_size_limit = (uint32_t) limit64;
|
||||
|
||||
/* swap in new address set; this simple procedure is ok as long as
|
||||
wr->as is never accessed without the wr->e.lock held */
|
||||
|
@ -1729,7 +1758,7 @@ static void rebuild_writer_addrset (struct writer *wr)
|
|||
|
||||
ELOGDISC (wr, "rebuild_writer_addrset("PGUIDFMT"):", PGUID (wr->e.guid));
|
||||
nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "", wr->as);
|
||||
ELOGDISC (wr, "\n");
|
||||
ELOGDISC (wr, " (burst size %"PRIu32" rexmit %"PRIu32")\n", wr->init_burst_size_limit, wr->rexmit_burst_size_limit);
|
||||
}
|
||||
|
||||
void rebuild_or_clear_writer_addrsets (struct ddsi_domaingv *gv, int rebuild)
|
||||
|
@ -3598,6 +3627,8 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
|
|||
wr->force_md5_keyhash = 0;
|
||||
wr->alive = 1;
|
||||
wr->alive_vclock = 0;
|
||||
wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX;
|
||||
wr->rexmit_burst_size_limit = UINT32_MAX - UINT16_MAX;
|
||||
|
||||
wr->status_cb = status_cb;
|
||||
wr->status_cb_entity = status_entity;
|
||||
|
|
|
@ -921,7 +921,8 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
|
|||
nn_gap_info_init(&gi);
|
||||
const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq;
|
||||
const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0;
|
||||
for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++)
|
||||
uint32_t limit = wr->rexmit_burst_size_limit;
|
||||
for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued && limit > 0; i++)
|
||||
{
|
||||
/* Accelerated schedule may run ahead of sequence number set
|
||||
contained in the acknack, and assumes all messages beyond the
|
||||
|
@ -949,6 +950,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
|
|||
max_seq_in_reply = seqbase + i;
|
||||
msgs_sent++;
|
||||
sample.last_rexmit_ts = tstamp;
|
||||
// FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment
|
||||
// thus we can easily figure out how much was sent, but we shouldn't have that knowledge here:
|
||||
// it should return how much it queued instead
|
||||
uint32_t sent = ddsi_serdata_size (sample.serdata);
|
||||
if (sent > wr->e.gv->config.fragment_size)
|
||||
sent = wr->e.gv->config.fragment_size;
|
||||
limit = (sent > limit) ? 0 : limit - sent;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -972,6 +980,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
|
|||
max_seq_in_reply = seqbase + i;
|
||||
msgs_sent++;
|
||||
sample.rexmit_count++;
|
||||
// FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment
|
||||
// thus we can easily figure out how much was sent, but we shouldn't have that knowledge here:
|
||||
// it should return how much it queued instead
|
||||
uint32_t sent = ddsi_serdata_size (sample.serdata);
|
||||
if (sent > wr->e.gv->config.fragment_size)
|
||||
sent = wr->e.gv->config.fragment_size;
|
||||
limit = (sent > limit) ? 0 : limit - sent;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1544,18 +1559,21 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
|
|||
a Gap if we don't have them anymore. */
|
||||
if (whc_borrow_sample (wr->whc, seq, &sample))
|
||||
{
|
||||
const unsigned base = msg->fragmentNumberState.bitmap_base - 1;
|
||||
int enqueued = 1;
|
||||
const uint32_t base = msg->fragmentNumberState.bitmap_base - 1;
|
||||
assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX);
|
||||
uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size;
|
||||
RSTTRACE (" scheduling requested frags ...\n");
|
||||
for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++)
|
||||
for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++)
|
||||
{
|
||||
if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->bits, i))
|
||||
{
|
||||
struct nn_xmsg *reply;
|
||||
if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, prd, &reply, 0) < 0)
|
||||
enqueued = 0;
|
||||
if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, 1, prd, &reply, 0, 0) < 0)
|
||||
nfrags_lim = 0;
|
||||
else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0))
|
||||
nfrags_lim = 0;
|
||||
else
|
||||
enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0);
|
||||
nfrags_lim--;
|
||||
}
|
||||
}
|
||||
whc_return_sample (wr->whc, &sample, false);
|
||||
|
|
|
@ -512,7 +512,7 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s
|
|||
return 0;
|
||||
}
|
||||
|
||||
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew)
|
||||
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew, uint32_t advertised_fragnum)
|
||||
{
|
||||
/* We always fragment into FRAGMENT_SIZEd fragments, which are near
|
||||
the smallest allowed fragment size & can't be bothered (yet) to
|
||||
|
@ -550,7 +550,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
|
|||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
}
|
||||
|
||||
fragging = (gv->config.fragment_size < size);
|
||||
fragging = (nfrags * (uint32_t) gv->config.fragment_size < size);
|
||||
|
||||
/* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */
|
||||
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
|
||||
|
@ -616,14 +616,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
|
|||
ddcmn->smhdr.flags = (unsigned char) (ddcmn->smhdr.flags | contentflag);
|
||||
|
||||
frag->fragmentStartingNum = fragnum + 1;
|
||||
frag->fragmentsInSubmessage = 1;
|
||||
|
||||
#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */
|
||||
if (fragstart + gv->config.fragment_size < ddsi_serdata_size (serdata) &&
|
||||
fragstart + 2 * gv->config.fragment_size >= ddsi_serdata_size (serdata))
|
||||
frag->fragmentsInSubmessage++;
|
||||
ret = frag->fragmentsInSubmessage;
|
||||
#endif
|
||||
frag->fragmentsInSubmessage = nfrags;
|
||||
frag->fragmentSize = gv->config.fragment_size;
|
||||
frag->sampleSize = (uint32_t) size;
|
||||
|
||||
|
@ -633,13 +626,13 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
|
|||
fraglen = (uint32_t) (size - fragstart);
|
||||
ddcmn->octetsToInlineQos = (unsigned short) ((char*) (frag+1) - ((char*) &ddcmn->octetsToInlineQos + 2));
|
||||
|
||||
if (wr->reliable && (!isnew || fragstart + fraglen == ddsi_serdata_size (serdata)))
|
||||
if (wr->reliable && (!isnew || advertised_fragnum != UINT32_MAX))
|
||||
{
|
||||
/* only set for final fragment for new messages; for rexmits we
|
||||
want it set for all so we can do merging. FIXME: I guess the
|
||||
writer should track both seq_xmit and the fragment number
|
||||
... */
|
||||
nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, fragnum + frag->fragmentsInSubmessage - 1);
|
||||
nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, isnew ? advertised_fragnum : fragnum + frag->fragmentsInSubmessage - 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -799,15 +792,19 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag)
|
|||
}
|
||||
#endif
|
||||
|
||||
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags)
|
||||
static void transmit_sample_lgmsg_unlocks_wr (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags, uint32_t nfrags_lim)
|
||||
{
|
||||
#if 0
|
||||
const char *frags_to_skip = getenv ("SKIPFRAGS");
|
||||
#endif
|
||||
assert(xp);
|
||||
assert((wr->heartbeat_xevent != NULL) == (whcst != NULL));
|
||||
|
||||
for (uint32_t i = 0; i < nfrags; i++)
|
||||
assert(0 < nfrags_lim && nfrags_lim <= nfrags);
|
||||
uint32_t nf_in_submsg = isnew ? (wr->e.gv->config.max_msg_size / wr->e.gv->config.fragment_size) : 1;
|
||||
if (nf_in_submsg == 0)
|
||||
nf_in_submsg = 1;
|
||||
else if (nf_in_submsg > UINT16_MAX)
|
||||
nf_in_submsg = UINT16_MAX;
|
||||
for (uint32_t i = 0; i < nfrags_lim; i += nf_in_submsg)
|
||||
{
|
||||
struct nn_xmsg *fmsg = NULL;
|
||||
struct nn_xmsg *hmsg = NULL;
|
||||
|
@ -816,43 +813,26 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
|
|||
if (must_skip_frag (frags_to_skip, i))
|
||||
continue;
|
||||
#endif
|
||||
|
||||
if (nf_in_submsg > nfrags_lim - i)
|
||||
nf_in_submsg = nfrags_lim - i;
|
||||
|
||||
/* Ignore out-of-memory errors: we can't do anything about it, and
|
||||
eventually we'll have to retry. But if a packet went out and
|
||||
we haven't yet completed transmitting a fragmented message, add
|
||||
a HeartbeatFrag. */
|
||||
ddsrt_mutex_lock (&wr->e.lock);
|
||||
ret = create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew);
|
||||
if (ret >= 0)
|
||||
ret = create_fragment_message (wr, seq, plist, serdata, i, (uint16_t) nf_in_submsg, prd, &fmsg, isnew, i + nf_in_submsg == nfrags_lim ? nfrags - 1 : UINT32_MAX);
|
||||
if (ret >= 0 && i + nf_in_submsg < nfrags_lim && wr->heartbeat_xevent)
|
||||
{
|
||||
if (nfrags > 1 && i + 1 < nfrags)
|
||||
create_HeartbeatFrag (wr, seq, i, prd, &hmsg);
|
||||
// more fragment messages to come
|
||||
create_HeartbeatFrag (wr, seq, i + nf_in_submsg - 1, prd, &hmsg);
|
||||
}
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
|
||||
if(fmsg) nn_xpack_addmsg (xp, fmsg, 0);
|
||||
if(hmsg) nn_xpack_addmsg (xp, hmsg, 0);
|
||||
|
||||
#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */
|
||||
if (ret > 1)
|
||||
i += ret-1;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
|
||||
if (wr->heartbeat_xevent)
|
||||
{
|
||||
struct nn_xmsg *msg = NULL;
|
||||
int hbansreq;
|
||||
assert (whcst != NULL);
|
||||
ddsrt_mutex_lock (&wr->e.lock);
|
||||
msg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
if (msg)
|
||||
{
|
||||
nn_xpack_addmsg (xp, msg, 0);
|
||||
if (hbansreq >= 2)
|
||||
nn_xpack_send (xp, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -860,7 +840,8 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
|
|||
{
|
||||
/* on entry: &wr->e.lock held; on exit: lock no longer held */
|
||||
struct ddsi_domaingv const * const gv = wr->e.gv;
|
||||
struct nn_xmsg *fmsg;
|
||||
struct nn_xmsg *hmsg = NULL;
|
||||
int hbansreq = 0;
|
||||
uint32_t sz;
|
||||
assert(xp);
|
||||
assert((wr->heartbeat_xevent != NULL) == (whcst != NULL));
|
||||
|
@ -868,35 +849,41 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
|
|||
sz = ddsi_serdata_size (serdata);
|
||||
if (sz > gv->config.fragment_size || !isnew || plist != NULL || prd != NULL || q_omg_writer_is_submessage_protected(wr))
|
||||
{
|
||||
uint32_t nfrags;
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size;
|
||||
transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags);
|
||||
return;
|
||||
}
|
||||
else if (create_fragment_message_simple (wr, seq, serdata, &fmsg) < 0)
|
||||
{
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
return;
|
||||
assert (wr->init_burst_size_limit <= UINT32_MAX - UINT16_MAX);
|
||||
assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX);
|
||||
const uint32_t max_burst_size = isnew ? wr->init_burst_size_limit : wr->rexmit_burst_size_limit;
|
||||
const uint32_t nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size;
|
||||
uint32_t nfrags_lim;
|
||||
if (sz <= max_burst_size || wr->num_reliable_readers != wr->num_readers)
|
||||
nfrags_lim = nfrags; // if it fits or if there are best-effort readers, send it in its entirety
|
||||
else
|
||||
nfrags_lim = (max_burst_size + gv->config.fragment_size - 1) / gv->config.fragment_size;
|
||||
|
||||
transmit_sample_lgmsg_unlocks_wr (xp, wr, seq, plist, serdata, prd, isnew, nfrags, nfrags_lim);
|
||||
}
|
||||
else
|
||||
{
|
||||
int hbansreq = 0;
|
||||
struct nn_xmsg *hmsg;
|
||||
struct nn_xmsg *fmsg;
|
||||
if (create_fragment_message_simple (wr, seq, serdata, &fmsg) >= 0)
|
||||
nn_xpack_addmsg (xp, fmsg, 0);
|
||||
}
|
||||
|
||||
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
|
||||
if (wr->heartbeat_xevent)
|
||||
hmsg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
|
||||
else
|
||||
hmsg = NULL;
|
||||
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
nn_xpack_addmsg (xp, fmsg, 0);
|
||||
|
||||
if(hmsg)
|
||||
nn_xpack_addmsg (xp, hmsg, 0);
|
||||
if (hbansreq >= 2)
|
||||
nn_xpack_send (xp, true);
|
||||
}
|
||||
}
|
||||
|
||||
void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd)
|
||||
{
|
||||
assert (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
|
||||
struct nn_xmsg *msg = NULL;
|
||||
if (create_fragment_message(wr, seq, NULL, serdata, 0, UINT16_MAX, prd, &msg, 1, UINT32_MAX) >= 0)
|
||||
qxev_msg (wr->evq, msg);
|
||||
}
|
||||
|
||||
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew)
|
||||
|
@ -914,6 +901,8 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds
|
|||
/* end-of-transaction messages are empty, but still need to be sent */
|
||||
nfrags = 1;
|
||||
}
|
||||
if (!isnew && nfrags > 1)
|
||||
nfrags = 1;
|
||||
for (i = 0; i < nfrags && enqueued; i++)
|
||||
{
|
||||
struct nn_xmsg *fmsg = NULL;
|
||||
|
@ -922,7 +911,7 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds
|
|||
eventually we'll have to retry. But if a packet went out and
|
||||
we haven't yet completed transmitting a fragmented message, add
|
||||
a HeartbeatFrag. */
|
||||
if (create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew) >= 0)
|
||||
if (create_fragment_message (wr, seq, plist, serdata, i, 1, prd, &fmsg, isnew, (i+1) == nfrags ? i : UINT32_MAX) >= 0)
|
||||
{
|
||||
if (nfrags > 1 && i + 1 < nfrags)
|
||||
create_HeartbeatFrag (wr, seq, i, prd, &hmsg);
|
||||
|
@ -1374,6 +1363,9 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *
|
|||
{
|
||||
if (wr->heartbeat_xevent)
|
||||
writer_hbcontrol_note_asyncwrite (wr, tnow);
|
||||
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
|
||||
enqueue_spdp_sample_wrlock_held(wr, seq, serdata, NULL);
|
||||
else
|
||||
enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1);
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
}
|
||||
|
|
|
@ -1129,7 +1129,7 @@ static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t
|
|||
updating of the last transmitted sequence number won't take
|
||||
place anyway. Nor is it necessary to fiddle with heartbeat
|
||||
control stuff. */
|
||||
enqueue_sample_wrlock_held (wr, sample.seq, sample.plist, sample.serdata, prd, 1);
|
||||
enqueue_spdp_sample_wrlock_held (wr, sample.seq, sample.serdata, prd);
|
||||
whc_return_sample(wr->whc, &sample, false);
|
||||
}
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
|
|
|
@ -220,6 +220,7 @@ struct nn_xpack
|
|||
} all;
|
||||
} dstaddr;
|
||||
|
||||
bool includes_rexmit;
|
||||
struct nn_xmsg_chain included_msgs;
|
||||
|
||||
#ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING
|
||||
|
@ -1145,6 +1146,7 @@ static void nn_xpack_reinit (struct nn_xpack *xp)
|
|||
xp->niov = 0;
|
||||
xp->call_flags = 0;
|
||||
xp->msg_len.length = 0;
|
||||
xp->includes_rexmit = false;
|
||||
xp->included_msgs.latest = NULL;
|
||||
xp->maxdelay = DDS_INFINITY;
|
||||
#ifdef DDSI_INCLUDE_SECURITY
|
||||
|
@ -1535,9 +1537,24 @@ static int addressing_info_eq_onesidederr (const struct nn_xpack *xp, const stru
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int nn_xmsg_is_rexmit (const struct nn_xmsg *m)
|
||||
{
|
||||
switch (m->kind)
|
||||
{
|
||||
case NN_XMSG_KIND_DATA:
|
||||
case NN_XMSG_KIND_CONTROL:
|
||||
return 0;
|
||||
case NN_XMSG_KIND_DATA_REXMIT:
|
||||
case NN_XMSG_KIND_DATA_REXMIT_NOMERGE:
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *m, const uint32_t flags)
|
||||
{
|
||||
unsigned max_msg_size = xp->gv->config.max_msg_size;
|
||||
const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m);
|
||||
const unsigned max_msg_size = rexmit ? xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size;
|
||||
unsigned payload_size;
|
||||
|
||||
if (xp->niov == 0)
|
||||
|
@ -1758,10 +1775,12 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag
|
|||
xp->msg_len.length = (uint32_t) sz;
|
||||
xp->niov = niov;
|
||||
|
||||
if (xpo_niov > 0 && sz > xp->gv->config.max_msg_size)
|
||||
const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m);
|
||||
const unsigned max_msg_size = rexmit ? xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size;
|
||||
if (xpo_niov > 0 && sz > max_msg_size)
|
||||
{
|
||||
GVTRACE (" => now niov %d sz %"PRIuSIZE" > max_msg_size %"PRIu32", nn_xpack_send niov %d sz %"PRIu32" now\n",
|
||||
(int) niov, sz, gv->config.max_msg_size, (int) xpo_niov, xpo_sz);
|
||||
(int) niov, sz, max_msg_size, (int) xpo_niov, xpo_sz);
|
||||
xp->msg_len.length = xpo_sz;
|
||||
xp->niov = xpo_niov;
|
||||
nn_xpack_send (xp, false);
|
||||
|
@ -1770,6 +1789,15 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag
|
|||
else
|
||||
{
|
||||
xp->call_flags = flags;
|
||||
switch (m->kind)
|
||||
{
|
||||
case NN_XMSG_KIND_DATA:
|
||||
case NN_XMSG_KIND_CONTROL:
|
||||
break;
|
||||
case NN_XMSG_KIND_DATA_REXMIT:
|
||||
case NN_XMSG_KIND_DATA_REXMIT_NOMERGE:
|
||||
xp->includes_rexmit = true;
|
||||
}
|
||||
nn_xmsg_chain_add (&xp->included_msgs, m);
|
||||
GVTRACE (" => now niov %d sz %"PRIuSIZE"\n", (int) niov, sz);
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
exitcode=0
|
||||
# RSS/samples/roundtrip numbers are based on experimentation on Travis
|
||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$!
|
||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 pub & ddsperf_pids="$ddsperf_pids $!"
|
||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$!
|
||||
bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 pub 100Hz burst 1000 & ddsperf_pids="$ddsperf_pids $!"
|
||||
sleep 11
|
||||
for pid in $ddsperf_pids ; do
|
||||
if kill -0 $pid 2>/dev/null ; then
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue