diff --git a/docs/manual/options.md b/docs/manual/options.md index fc440b8..451b8b5 100644 --- a/docs/manual/options.md +++ b/docs/manual/options.md @@ -250,7 +250,7 @@ The default value is: "". ### //CycloneDDS/Domain/General -Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv) +Children: [AllowMulticast](#cycloneddsdomaingeneralallowmulticast), [DontRoute](#cycloneddsdomaingeneraldontroute), [EnableMulticastLoopback](#cycloneddsdomaingeneralenablemulticastloopback), [ExternalNetworkAddress](#cycloneddsdomaingeneralexternalnetworkaddress), [ExternalNetworkMask](#cycloneddsdomaingeneralexternalnetworkmask), [FragmentSize](#cycloneddsdomaingeneralfragmentsize), [MaxMessageSize](#cycloneddsdomaingeneralmaxmessagesize), [MaxRexmitMessageSize](#cycloneddsdomaingeneralmaxrexmitmessagesize), [MulticastRecvNetworkInterfaceAddresses](#cycloneddsdomaingeneralmulticastrecvnetworkinterfaceaddresses), [MulticastTimeToLive](#cycloneddsdomaingeneralmulticasttimetolive), [NetworkInterfaceAddress](#cycloneddsdomaingeneralnetworkinterfaceaddress), [PreferMulticast](#cycloneddsdomaingeneralprefermulticast), [Transport](#cycloneddsdomaingeneraltransport), [UseIPv6](#cycloneddsdomaingeneraluseipv) The 
General element specifies overall Cyclone DDS service settings. @@ -317,7 +317,7 @@ This element specifies the size of DDSI sample fragments generated by Cyclone DD The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). -The default value is: "1280 B". +The default value is: "1344 B". #### //CycloneDDS/Domain/General/MaxMessageSize @@ -325,11 +325,23 @@ Number-with-unit This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B). -On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize. +On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). -The default value is: "4096 B". +The default value is: "14720 B". + + +#### //CycloneDDS/Domain/General/MaxRexmitMessageSize +Number-with-unit + +This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B). + +On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. + +The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). 
+ +The default value is: "1456 B". #### //CycloneDDS/Domain/General/MulticastRecvNetworkInterfaceAddresses @@ -394,7 +406,7 @@ The default value is: "default". ### //CycloneDDS/Domain/Internal -Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), 
[PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) +Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [BurstSize](#cycloneddsdomaininternalburstsize), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsidirectmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), 
[DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), 
[SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) The Internal elements deal with a variety of settings that evolving and that are not necessarily fully supported. For the vast majority of the Internal settings, the functionality per-se is supported, but the right to change the way the options control the functionality is reserved. This includes renaming or moving options. @@ -440,6 +452,32 @@ The default is writers, as this is thought to be compliant and reasonably effici The default value is: "writers". +#### //CycloneDDS/Domain/Internal/BurstSize +Children: [MaxInitTransmit](#cycloneddsdomaininternalburstsizemaxinittransmit), [MaxRexmit](#cycloneddsdomaininternalburstsizemaxrexmit) + +Setting for controlling the size of transmit bursts. + + +##### //CycloneDDS/Domain/Internal/BurstSize/MaxInitTransmit +Number-with-unit + +This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well. + +The unit must be specified explicitly. 
Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). + +The default value is: "4294967295". + + +##### //CycloneDDS/Domain/Internal/BurstSize/MaxRexmit +Number-with-unit + +This element specifies the amount of data to be retransmitted in response to one NACK. + +The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes). + +The default value is: "1 MiB". + + #### //CycloneDDS/Domain/Internal/ControlTopic The ControlTopic element allows configuring whether Cyclone DDS provides a special control interface via a predefined topic or not. @@ -1695,4 +1733,4 @@ While none prevents any message from being written to a DDSI2 log file. The categorisation of tracing output is incomplete and hence most of the verbosity levels and categories are not of much use in the current release. This is an ongoing process and here we describe the target situation rather than the current situation. Currently, the most useful verbosity levels are config, fine and finest. -The default value is: "none". +The default value is: "none". \ No newline at end of file diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc index 03c6bf5..0d9aa14 100644 --- a/etc/cyclonedds.rnc +++ b/etc/cyclonedds.rnc @@ -229,13 +229,21 @@ CycloneDDS configuration""" ] ] }? & [ a:documentation [ xml:lang="en" """
This element specifies the maximum size of the UDP payload that Cyclone DDS will generate. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values of MaxMessageSize) larger payloads may sporadically be observed (currently up to 1192 B).
-On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation. In those cases, it is generally advisable to also consider reducing Internal/FragmentSize.
+On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
-The default value is: "4096 B".
""" ] ] +The default value is: "14720 B".
""" ] ] element MaxMessageSize { memsize }? & [ a:documentation [ xml:lang="en" """ +This element specifies the maximum size of the UDP payload that Cyclone DDS will generate for a retransmit. Cyclone DDS will try to maintain this limit within the bounds of the DDSI specification, which means that in some cases (especially for very low values) larger payloads may sporadically be observed (currently up to 1192 B).
+On some networks it may be necessary to set this item to keep the packetsize below the MTU to prevent IP fragmentation.
+The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
+The default value is: "1456 B".
""" ] ] + element MaxRexmitMessageSize { + memsize + }? + & [ a:documentation [ xml:lang="en" """This element specifies on which network interfaces Cyclone DDS listens to multicasts. The following options are available:
Setting for controlling the size of transmit bursts.
""" ] ] + element BurstSize { + [ a:documentation [ xml:lang="en" """ +This element specifies how much more than the (presumed or discovered) receive buffer size may be sent when transmitting a sample for the first time, expressed as a percentage; the remainder will then be handled via retransmits. Usually the receivers can keep up with transmitter, at least on average, and so generally it is better to hope for the best and recover. Besides, the retransmits will be unicast, and so any multicast advantage will be lost as well.
+The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
+The default value is: "4294967295".
""" ] ] + element MaxInitTransmit { + memsize + }? + & [ a:documentation [ xml:lang="en" """ +This element specifies the amount of data to be retransmitted in response to one NACK.
+The unit must be specified explicitly. Recognised units: B (bytes), kB & KiB (2^10 bytes), MB & MiB (2^20 bytes), GB & GiB (2^30 bytes).
+The default value is: "1 MiB".
""" ] ] + element MaxRexmit { + memsize + }? + }? + & [ a:documentation [ xml:lang="en" """The ControlTopic element allows configuring whether Cyclone DDS provides a special control interface via a predefined topic or not.
""" ] ]
element ControlTopic {
empty
diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd
index aa4ad02..749f867 100644
--- a/etc/cyclonedds.xsd
+++ b/etc/cyclonedds.xsd
@@ -313,6 +313,7 @@ CycloneDDS configuration
On some networks it may be necessary to set this item to keep the " - "packetsize below the MTU to prevent IP fragmentation. In those cases, " - "it is generally advisable to also consider reducing " - "Internal/FragmentSize.
"), + "packetsize below the MTU to prevent IP fragmentation."), + UNIT("memsize")), + STRING("MaxRexmitMessageSize", NULL, 1, "1456 B", + MEMBER(max_rexmit_msg_size), + FUNCTIONS(0, uf_memsize, 0, pf_memsize), + DESCRIPTION( + "This element specifies the maximum size of the UDP payload that " + "Cyclone DDS will generate for a retransmit. Cyclone DDS will try to " + "maintain this limit within the bounds of the DDSI specification, which " + "means that in some cases (especially for very low values) larger payloads " + "may sporadically be observed (currently up to 1192 B).
\n" + "On some networks it may be necessary to set this item to keep the " + "packetsize below the MTU to prevent IP fragmentation.
"), UNIT("memsize")), STRING("FragmentSize", NULL, 1, "1344 B", MEMBER(fragment_size), @@ -856,6 +866,29 @@ static struct cfgelem internal_watermarks_cfgelems[] = { END_MARKER }; +static struct cfgelem internal_burstsize_cfgelems[] = { + STRING("MaxRexmit", NULL, 1, "1 MiB", + MEMBER(max_rexmit_burst_size), + FUNCTIONS(0, uf_memsize, 0, pf_memsize), + DESCRIPTION( + "This element specifies the amount of data to be retransmitted in " + "response to one NACK.
"), + UNIT("memsize")), + STRING("MaxInitTransmit", NULL, 1, "4294967295", + MEMBER(init_transmit_extra_pct), + FUNCTIONS(0, uf_uint, 0, pf_uint), + DESCRIPTION( + "This element specifies how much more than the (presumed or discovered) " + "receive buffer size may be sent when transmitting a sample for the first " + "time, expressed as a percentage; the remainder will then be handled via " + "retransmits. Usually the receivers can keep up with transmitter, at least " + "on average, and so generally it is better to hope for the best and recover. " + "Besides, the retransmits will be unicast, and so any multicast advantage " + "will be lost as well.
"), + UNIT("memsize")), + END_MARKER +}; + static struct cfgelem control_topic_cfgattrs[] = { BOOL(DEPRECATED("Enable"), NULL, 1, "false", MEMBER(enable_control_topic), @@ -1369,6 +1402,10 @@ static struct cfgelem internal_cfgelems[] = { NOMEMBER, NOFUNCTIONS, DESCRIPTION("Watermarks for flow-control.
")), + GROUP("BurstSize", internal_burstsize_cfgelems, NULL, 1, + NOMEMBER, + NOFUNCTIONS, + DESCRIPTION("Setting for controlling the size of transmit bursts.
")), LIST("EnableExpensiveChecks", NULL, 1, "", MEMBER(enabled_xchecks), FUNCTIONS(0, uf_xcheck, 0, pf_xcheck), diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index e2ddf37..a73ff29 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -264,6 +264,9 @@ struct config uint16_t fragment_size; uint32_t max_msg_size; + uint32_t max_rexmit_msg_size; + uint32_t init_transmit_extra_pct; + uint32_t max_rexmit_burst_size; int publish_uc_locators; /* Publish discovery unicast locators */ int enable_uc_locators; /* If false, don't even try to create a unicast socket */ diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index ce0e19d..2dfa554 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -301,6 +301,8 @@ struct writer uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */ ddsrt_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */ ddsrt_etime_t t_whc_high_upd; /* time "whc_high" was last updated for controlled ramp-up of throughput */ + uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */ + uint32_t rexmit_burst_size_limit; /* derived from reader's receive_buffer_size */ uint32_t num_readers; /* total number of matching PROXY readers */ int32_t num_reliable_readers; /* number of matching reliable PROXY readers */ ddsrt_avl_tree_t readers; /* all matching PROXY readers, see struct wr_prd_match */ diff --git a/src/core/ddsi/include/dds/ddsi/q_transmit.h b/src/core/ddsi/include/dds/ddsi/q_transmit.h index 62feeda..35c0d75 100644 --- a/src/core/ddsi/include/dds/ddsi/q_transmit.h +++ b/src/core/ddsi/include/dds/ddsi/q_transmit.h @@ -40,8 +40,9 @@ int write_sample_gc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, int write_sample_nogc_notk (struct thread_state1 * const ts1, 
struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata); /* When calling the following functions, wr->lock must be held */ -dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew); +dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew, uint32_t advertised_fragnum); int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew); +void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd); void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, int hbliveliness, ddsi_entityid_t dst, int issync); dds_return_t write_hb_liveliness (struct ddsi_domaingv * const gv, struct ddsi_guid *wr_guid, struct nn_xpack *xp); int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index d0b6c8e..35dbc42 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1454,7 +1454,7 @@ static int rebuild_compare_locs(const void *va, const void *vb) } } -static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr) +static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr, uint32_t *min_receive_buffer_size) { struct addrset *all_addrs = new_addrset(); struct entity_index *gh = wr->e.gv->entity_index; @@ -1471,6 +1471,8 @@ static struct addrset *rebuild_make_all_addrs (int *nreaders, struct 
writer *wr) if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL) continue; (*nreaders)++; + if (prd->receive_buffer_size < *min_receive_buffer_size) + *min_receive_buffer_size = prd->receive_buffer_size; copy_addrset_into_addrset(wr->e.gv, all_addrs, prd->c.as); } if (addrset_empty(all_addrs) || *nreaders == 0) @@ -1677,7 +1679,7 @@ static void rebuild_drop(int locidx, int nreaders, int nlocs, int *locs_nrds, in } } -static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr) +static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer *wr, uint32_t *min_receive_buffer_size) { bool prefer_multicast = wr->e.gv->config.prefer_multicast; struct addrset *all_addrs; @@ -1686,7 +1688,7 @@ static void rebuild_writer_addrset_setcover(struct addrset *newas, struct writer int *locs_nrds; int8_t *covered; int best; - if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr)) == NULL) + if ((all_addrs = rebuild_make_all_addrs(&nreaders, wr, min_receive_buffer_size)) == NULL) return; nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "setcover: all_addrs", all_addrs); ELOGDISC (wr, "\n"); @@ -1715,12 +1717,39 @@ static void rebuild_writer_addrset (struct writer *wr) /* FIXME way too inefficient in this form */ struct addrset *newas = new_addrset (); struct addrset *oldas = wr->as; + uint32_t min_receive_buffer_size = UINT32_MAX; /* only one operation at a time */ ASSERT_MUTEX_HELD (&wr->e.lock); /* compute new addrset */ - rebuild_writer_addrset_setcover(newas, wr); + rebuild_writer_addrset_setcover(newas, wr, &min_receive_buffer_size); + + /* Modifying burst size limit here is a bit of a hack; but anyway ... + try to limit bursts of retransmits to 67% of the smallest receive + buffer, and those of initial transmissions to that + overshoot%. 
+ It is usually best to send the full sample initially, always: + - if the receivers manage to keep up somewhat, sending it in one + go and then recovering anything lost is way faster than sending + only small batches + - the way things are now: the retransmits will be sent unicast, + so if there are multiple receivers, that'll blow up things by + a non-trivial amount */ + wr->rexmit_burst_size_limit = min_receive_buffer_size - min_receive_buffer_size / 3; + if (wr->rexmit_burst_size_limit < 1024) + wr->rexmit_burst_size_limit = 1024; + if (wr->rexmit_burst_size_limit > wr->e.gv->config.max_rexmit_burst_size) + wr->rexmit_burst_size_limit = wr->e.gv->config.max_rexmit_burst_size; + if (wr->rexmit_burst_size_limit > UINT32_MAX - UINT16_MAX) + wr->rexmit_burst_size_limit = UINT32_MAX - UINT16_MAX; + + const uint64_t limit64 = (uint64_t) wr->e.gv->config.init_transmit_extra_pct * (uint64_t) min_receive_buffer_size / 100; + if (limit64 > UINT32_MAX - UINT16_MAX) + wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX; + else if (limit64 < wr->rexmit_burst_size_limit) + wr->init_burst_size_limit = wr->rexmit_burst_size_limit; + else + wr->init_burst_size_limit = (uint32_t) limit64; /* swap in new address set; this simple procedure is ok as long as wr->as is never accessed without the wr->e.lock held */ @@ -1729,7 +1758,7 @@ static void rebuild_writer_addrset (struct writer *wr) ELOGDISC (wr, "rebuild_writer_addrset("PGUIDFMT"):", PGUID (wr->e.guid)); nn_log_addrset(wr->e.gv, DDS_LC_DISCOVERY, "", wr->as); - ELOGDISC (wr, "\n"); + ELOGDISC (wr, " (burst size %"PRIu32" rexmit %"PRIu32")\n", wr->init_burst_size_limit, wr->rexmit_burst_size_limit); } void rebuild_or_clear_writer_addrsets (struct ddsi_domaingv *gv, int rebuild) @@ -3598,6 +3627,8 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->force_md5_keyhash = 0; wr->alive = 1; wr->alive_vclock = 0; + wr->init_burst_size_limit = UINT32_MAX - UINT16_MAX; + wr->rexmit_burst_size_limit = 
UINT32_MAX - UINT16_MAX; wr->status_cb = status_cb; wr->status_cb_entity = status_entity; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 1fbb48a..453b353 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -921,7 +921,8 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const nn_gap_info_init(&gi); const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq; const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0; - for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++) + uint32_t limit = wr->rexmit_burst_size_limit; + for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued && limit > 0; i++) { /* Accelerated schedule may run ahead of sequence number set contained in the acknack, and assumes all messages beyond the @@ -949,6 +950,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const max_seq_in_reply = seqbase + i; msgs_sent++; sample.last_rexmit_ts = tstamp; + // FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment + // thus we can easily figure out how much was sent, but we shouldn't have that knowledge here: + // it should return how much it queued instead + uint32_t sent = ddsi_serdata_size (sample.serdata); + if (sent > wr->e.gv->config.fragment_size) + sent = wr->e.gv->config.fragment_size; + limit = (sent > limit) ? 
0 : limit - sent; } } else @@ -972,6 +980,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const max_seq_in_reply = seqbase + i; msgs_sent++; sample.rexmit_count++; + // FIXME: now enqueue_sample_wrlock_held limits retransmit requests of a large sample to 1 fragment + // thus we can easily figure out how much was sent, but we shouldn't have that knowledge here: + // it should return how much it queued instead + uint32_t sent = ddsi_serdata_size (sample.serdata); + if (sent > wr->e.gv->config.fragment_size) + sent = wr->e.gv->config.fragment_size; + limit = (sent > limit) ? 0 : limit - sent; } } } @@ -1544,18 +1559,21 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons a Gap if we don't have them anymore. */ if (whc_borrow_sample (wr->whc, seq, &sample)) { - const unsigned base = msg->fragmentNumberState.bitmap_base - 1; - int enqueued = 1; + const uint32_t base = msg->fragmentNumberState.bitmap_base - 1; + assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX); + uint32_t nfrags_lim = (wr->rexmit_burst_size_limit + wr->e.gv->config.fragment_size - 1) / wr->e.gv->config.fragment_size; RSTTRACE (" scheduling requested frags ...\n"); - for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && enqueued; i++) + for (uint32_t i = 0; i < msg->fragmentNumberState.numbits && nfrags_lim > 0; i++) { if (nn_bitset_isset (msg->fragmentNumberState.numbits, msg->bits, i)) { struct nn_xmsg *reply; - if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, prd, &reply, 0) < 0) - enqueued = 0; + if (create_fragment_message (wr, seq, sample.plist, sample.serdata, base + i, 1, prd, &reply, 0, 0) < 0) + nfrags_lim = 0; + else if (!qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0)) + nfrags_lim = 0; else - enqueued = qxev_msg_rexmit_wrlock_held (wr->evq, reply, 0); + nfrags_lim--; } } whc_return_sample (wr->whc, &sample, false); diff --git a/src/core/ddsi/src/q_transmit.c 
b/src/core/ddsi/src/q_transmit.c index 24b8779..7f97039 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -512,7 +512,7 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s return 0; } -dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew) +dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, uint32_t fragnum, uint16_t nfrags, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew, uint32_t advertised_fragnum) { /* We always fragment into FRAGMENT_SIZEd fragments, which are near the smallest allowed fragment size & can't be bothered (yet) to @@ -550,7 +550,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru return DDS_RETCODE_BAD_PARAMETER; } - fragging = (gv->config.fragment_size < size); + fragging = (nfrags * (uint32_t) gv->config.fragment_size < size); /* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */ if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL) @@ -616,14 +616,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru ddcmn->smhdr.flags = (unsigned char) (ddcmn->smhdr.flags | contentflag); frag->fragmentStartingNum = fragnum + 1; - frag->fragmentsInSubmessage = 1; - -#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */ - if (fragstart + gv->config.fragment_size < ddsi_serdata_size (serdata) && - fragstart + 2 * gv->config.fragment_size >= ddsi_serdata_size (serdata)) - frag->fragmentsInSubmessage++; - ret = frag->fragmentsInSubmessage; -#endif + frag->fragmentsInSubmessage = nfrags; frag->fragmentSize = gv->config.fragment_size; 
frag->sampleSize = (uint32_t) size; @@ -633,13 +626,13 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru fraglen = (uint32_t) (size - fragstart); ddcmn->octetsToInlineQos = (unsigned short) ((char*) (frag+1) - ((char*) &ddcmn->octetsToInlineQos + 2)); - if (wr->reliable && (!isnew || fragstart + fraglen == ddsi_serdata_size (serdata))) + if (wr->reliable && (!isnew || advertised_fragnum != UINT32_MAX)) { /* only set for final fragment for new messages; for rexmits we want it set for all so we can do merging. FIXME: I guess the writer should track both seq_xmit and the fragment number ... */ - nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, fragnum + frag->fragmentsInSubmessage - 1); + nn_xmsg_setwriterseq_fragid (*pmsg, &wr->e.guid, seq, isnew ? advertised_fragnum : fragnum + frag->fragmentsInSubmessage - 1); } } @@ -799,15 +792,19 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag) } #endif -static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags) +static void transmit_sample_lgmsg_unlocks_wr (struct nn_xpack *xp, struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, uint32_t nfrags, uint32_t nfrags_lim) { #if 0 const char *frags_to_skip = getenv ("SKIPFRAGS"); #endif assert(xp); - assert((wr->heartbeat_xevent != NULL) == (whcst != NULL)); - - for (uint32_t i = 0; i < nfrags; i++) + assert(0 < nfrags_lim && nfrags_lim <= nfrags); + uint32_t nf_in_submsg = isnew ? 
(wr->e.gv->config.max_msg_size / wr->e.gv->config.fragment_size) : 1; + if (nf_in_submsg == 0) + nf_in_submsg = 1; + else if (nf_in_submsg > UINT16_MAX) + nf_in_submsg = UINT16_MAX; + for (uint32_t i = 0; i < nfrags_lim; i += nf_in_submsg) { struct nn_xmsg *fmsg = NULL; struct nn_xmsg *hmsg = NULL; @@ -816,43 +813,26 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer * if (must_skip_frag (frags_to_skip, i)) continue; #endif + + if (nf_in_submsg > nfrags_lim - i) + nf_in_submsg = nfrags_lim - i; + /* Ignore out-of-memory errors: we can't do anything about it, and eventually we'll have to retry. But if a packet went out and we haven't yet completed transmitting a fragmented message, add a HeartbeatFrag. */ - ddsrt_mutex_lock (&wr->e.lock); - ret = create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew); - if (ret >= 0) + ret = create_fragment_message (wr, seq, plist, serdata, i, (uint16_t) nf_in_submsg, prd, &fmsg, isnew, i + nf_in_submsg == nfrags_lim ? 
nfrags - 1 : UINT32_MAX); + if (ret >= 0 && i + nf_in_submsg < nfrags_lim && wr->heartbeat_xevent) { - if (nfrags > 1 && i + 1 < nfrags) - create_HeartbeatFrag (wr, seq, i, prd, &hmsg); + // more fragment messages to come + create_HeartbeatFrag (wr, seq, i + nf_in_submsg - 1, prd, &hmsg); } ddsrt_mutex_unlock (&wr->e.lock); if(fmsg) nn_xpack_addmsg (xp, fmsg, 0); if(hmsg) nn_xpack_addmsg (xp, hmsg, 0); -#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */ - if (ret > 1) - i += ret-1; -#endif - } - - /* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */ - if (wr->heartbeat_xevent) - { - struct nn_xmsg *msg = NULL; - int hbansreq; - assert (whcst != NULL); ddsrt_mutex_lock (&wr->e.lock); - msg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq); - ddsrt_mutex_unlock (&wr->e.lock); - if (msg) - { - nn_xpack_addmsg (xp, msg, 0); - if (hbansreq >= 2) - nn_xpack_send (xp, true); - } } } @@ -860,7 +840,8 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, { /* on entry: &wr->e.lock held; on exit: lock no longer held */ struct ddsi_domaingv const * const gv = wr->e.gv; - struct nn_xmsg *fmsg; + struct nn_xmsg *hmsg = NULL; + int hbansreq = 0; uint32_t sz; assert(xp); assert((wr->heartbeat_xevent != NULL) == (whcst != NULL)); @@ -868,35 +849,41 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, sz = ddsi_serdata_size (serdata); if (sz > gv->config.fragment_size || !isnew || plist != NULL || prd != NULL || q_omg_writer_is_submessage_protected(wr)) { - uint32_t nfrags; - ddsrt_mutex_unlock (&wr->e.lock); - nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size; - transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags); - return; - } - else if (create_fragment_message_simple (wr, seq, serdata, &fmsg) < 0) - { - ddsrt_mutex_unlock (&wr->e.lock); - return; + assert (wr->init_burst_size_limit <= UINT32_MAX - 
UINT16_MAX); + assert (wr->rexmit_burst_size_limit <= UINT32_MAX - UINT16_MAX); + const uint32_t max_burst_size = isnew ? wr->init_burst_size_limit : wr->rexmit_burst_size_limit; + const uint32_t nfrags = (sz + gv->config.fragment_size - 1) / gv->config.fragment_size; + uint32_t nfrags_lim; + if (sz <= max_burst_size || wr->num_reliable_readers != wr->num_readers) + nfrags_lim = nfrags; // if it fits or if there are best-effort readers, send it in its entirety + else + nfrags_lim = (max_burst_size + gv->config.fragment_size - 1) / gv->config.fragment_size; + + transmit_sample_lgmsg_unlocks_wr (xp, wr, seq, plist, serdata, prd, isnew, nfrags, nfrags_lim); } else { - int hbansreq = 0; - struct nn_xmsg *hmsg; - - /* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */ - if (wr->heartbeat_xevent) - hmsg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq); - else - hmsg = NULL; - - ddsrt_mutex_unlock (&wr->e.lock); - nn_xpack_addmsg (xp, fmsg, 0); - if(hmsg) - nn_xpack_addmsg (xp, hmsg, 0); - if (hbansreq >= 2) - nn_xpack_send (xp, true); + struct nn_xmsg *fmsg; + if (create_fragment_message_simple (wr, seq, serdata, &fmsg) >= 0) + nn_xpack_addmsg (xp, fmsg, 0); } + + if (wr->heartbeat_xevent) + hmsg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq); + ddsrt_mutex_unlock (&wr->e.lock); + + if(hmsg) + nn_xpack_addmsg (xp, hmsg, 0); + if (hbansreq >= 2) + nn_xpack_send (xp, true); +} + +void enqueue_spdp_sample_wrlock_held (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct proxy_reader *prd) +{ + assert (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); + struct nn_xmsg *msg = NULL; + if (create_fragment_message(wr, seq, NULL, serdata, 0, UINT16_MAX, prd, &msg, 1, UINT32_MAX) >= 0) + qxev_msg (wr->evq, msg); } int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct ddsi_plist *plist, struct ddsi_serdata *serdata, struct 
proxy_reader *prd, int isnew) @@ -914,6 +901,8 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds /* end-of-transaction messages are empty, but still need to be sent */ nfrags = 1; } + if (!isnew && nfrags > 1) + nfrags = 1; for (i = 0; i < nfrags && enqueued; i++) { struct nn_xmsg *fmsg = NULL; @@ -922,7 +911,7 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct dds eventually we'll have to retry. But if a packet went out and we haven't yet completed transmitting a fragmented message, add a HeartbeatFrag. */ - if (create_fragment_message (wr, seq, plist, serdata, i, prd, &fmsg, isnew) >= 0) + if (create_fragment_message (wr, seq, plist, serdata, i, 1, prd, &fmsg, isnew, (i+1) == nfrags ? i : UINT32_MAX) >= 0) { if (nfrags > 1 && i + 1 < nfrags) create_HeartbeatFrag (wr, seq, i, prd, &hmsg); @@ -1374,7 +1363,10 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack * { if (wr->heartbeat_xevent) writer_hbcontrol_note_asyncwrite (wr, tnow); - enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1); + if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) + enqueue_spdp_sample_wrlock_held(wr, seq, serdata, NULL); + else + enqueue_sample_wrlock_held (wr, seq, plist, serdata, NULL, 1); ddsrt_mutex_unlock (&wr->e.lock); } diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index c1f7313..eeebbff 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -1125,11 +1125,11 @@ static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t if (sample_found) { /* Claiming it is new rather than a retransmit so that the rexmit - limiting won't kick in. It is best-effort and therefore the - updating of the last transmitted sequence number won't take - place anyway. Nor is it necessary to fiddle with heartbeat - control stuff. 
*/ - enqueue_sample_wrlock_held (wr, sample.seq, sample.plist, sample.serdata, prd, 1); + limiting won't kick in. It is best-effort and therefore the + updating of the last transmitted sequence number won't take + place anyway. Nor is it necessary to fiddle with heartbeat + control stuff. */ + enqueue_spdp_sample_wrlock_held (wr, sample.seq, sample.serdata, prd); whc_return_sample(wr->whc, &sample, false); } ddsrt_mutex_unlock (&wr->e.lock); diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index 70c70e5..043b32b 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -220,6 +220,7 @@ struct nn_xpack } all; } dstaddr; + bool includes_rexmit; struct nn_xmsg_chain included_msgs; #ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING @@ -1145,6 +1146,7 @@ static void nn_xpack_reinit (struct nn_xpack *xp) xp->niov = 0; xp->call_flags = 0; xp->msg_len.length = 0; + xp->includes_rexmit = false; xp->included_msgs.latest = NULL; xp->maxdelay = DDS_INFINITY; #ifdef DDSI_INCLUDE_SECURITY @@ -1535,9 +1537,24 @@ static int addressing_info_eq_onesidederr (const struct nn_xpack *xp, const stru return 0; } +static int nn_xmsg_is_rexmit (const struct nn_xmsg *m) +{ + switch (m->kind) + { + case NN_XMSG_KIND_DATA: + case NN_XMSG_KIND_CONTROL: + return 0; + case NN_XMSG_KIND_DATA_REXMIT: + case NN_XMSG_KIND_DATA_REXMIT_NOMERGE: + return 1; + } + return 0; +} + static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *m, const uint32_t flags) { - unsigned max_msg_size = xp->gv->config.max_msg_size; + const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m); + const unsigned max_msg_size = rexmit ? 
xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size; unsigned payload_size; if (xp->niov == 0) @@ -1758,10 +1775,12 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag xp->msg_len.length = (uint32_t) sz; xp->niov = niov; - if (xpo_niov > 0 && sz > xp->gv->config.max_msg_size) + const bool rexmit = xp->includes_rexmit || nn_xmsg_is_rexmit (m); + const unsigned max_msg_size = rexmit ? xp->gv->config.max_rexmit_msg_size : xp->gv->config.max_msg_size; + if (xpo_niov > 0 && sz > max_msg_size) { GVTRACE (" => now niov %d sz %"PRIuSIZE" > max_msg_size %"PRIu32", nn_xpack_send niov %d sz %"PRIu32" now\n", - (int) niov, sz, gv->config.max_msg_size, (int) xpo_niov, xpo_sz); + (int) niov, sz, max_msg_size, (int) xpo_niov, xpo_sz); xp->msg_len.length = xpo_sz; xp->niov = xpo_niov; nn_xpack_send (xp, false); @@ -1770,6 +1789,15 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag else { xp->call_flags = flags; + switch (m->kind) + { + case NN_XMSG_KIND_DATA: + case NN_XMSG_KIND_CONTROL: + break; + case NN_XMSG_KIND_DATA_REXMIT: + case NN_XMSG_KIND_DATA_REXMIT_NOMERGE: + xp->includes_rexmit = true; + } nn_xmsg_chain_add (&xp->included_msgs, m); GVTRACE (" => now niov %d sz %"PRIuSIZE"\n", (int) niov, sz); } diff --git a/src/tools/ddsperf/sanity.bash b/src/tools/ddsperf/sanity.bash index 45f425e..37da61a 100755 --- a/src/tools/ddsperf/sanity.bash +++ b/src/tools/ddsperf/sanity.bash @@ -1,7 +1,7 @@ exitcode=0 # RSS/samples/roundtrip numbers are based on experimentation on Travis -bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$! -bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 pub & ddsperf_pids="$ddsperf_pids $!" +bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$! 
+bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:20% -Qrss:1 pub 100Hz burst 1000 & ddsperf_pids="$ddsperf_pids $!" sleep 11 for pid in $ddsperf_pids ; do if kill -0 $pid 2>/dev/null ; then