Default to a single receive thread on Windows

This works around a termination issue on Windows caused by the process
sometimes being unable to send a packet to itself to wake up a thread
stuck in a blocking read on a socket.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-01-22 10:41:02 +01:00 committed by eboasson
parent 263d8016b8
commit bb76798492
7 changed files with 75 additions and 34 deletions

View file

@ -344,7 +344,7 @@ sub conv_to_rnc {
} elsif ($fs->{kstr} eq "Enum") {
die unless exists $enum_values{$fs->{typehint}};
my @vs = split /;/, $enum_values{$fs->{typehint}};
printf $fh "${indent} ${sep}%s\n", (join '|', map { "\"$_\"" } @vs);
printf $fh "${indent} ${sep}(%s)\n", (join '|', map { "\"$_\"" } @vs);
} elsif ($fs->{kstr} eq "Int") {
printf $fh "${indent} ${sep}xsd:integer\n";
#if (exists $range{$lctn} || exists $range{$fs->{typehint}}) {

View file

@ -903,15 +903,17 @@ The default value is: "-1".
#### //CycloneDDS/Domain/Internal/MultipleReceiveThreads
Attributes: [maxretries](#cycloneddsdomaininternalmultiplereceivethreadsmaxretries)
Boolean
One of: false, true, default
This element controls whether all traffic is handled by a single receive
thread or whether multiple receive threads may be used to improve
latency. Currently multiple receive threads are only used for
connectionless transport (e.g., UDP) and ManySocketsMode not set to
single (the default).
thread (false) or whether multiple receive threads may be used to improve
latency (true). By default it is disabled on Windows because it appears
that one cannot count on being able to send packets to oneself, which is
necessary to stop the thread during shutdown. Currently multiple receive
threads are only used for connectionless transport (e.g., UDP) and
ManySocketsMode not set to single (the default).
The default value is: "true".
The default value is: "default".
#### //CycloneDDS/Domain/Internal/MultipleReceiveThreads[@maxretries]

View file

@ -51,7 +51,7 @@ somewhat. It also causes the set of port numbers needed by Cyclone DDS to
become predictable, which may be useful for firewall and NAT
configuration.</p><p>The default value is: &quot;single&quot;.</p>""" ] ]
element ManySocketsMode {
"false"|"true"|"single"|"none"|"many"
("false"|"true"|"single"|"none"|"many")
}?
& [ a:documentation [ xml:lang="en" """
<p>This element sets the level of standards conformance of this instance
@ -78,7 +78,7 @@ though there is no good reason not to.</li></ul>
<p>The default setting is "lax".</p><p>The default value is:
&quot;lax&quot;.</p>""" ] ]
element StandardsConformance {
"lax"|"strict"|"pedantic"
("lax"|"strict"|"pedantic")
}?
}?
& [ a:documentation [ xml:lang="en" """
@ -444,13 +444,13 @@ is available.</p><p>The default value is: &quot;false&quot;.</p>""" ] ]
<p>This element allows selecting the transport to be used (udp, udp6,
tcp, tcp6, raweth)</p><p>The default value is: &quot;default&quot;.</p>""" ] ]
element Transport {
"default"|"udp"|"udp6"|"tcp"|"tcp6"|"raweth"
("default"|"udp"|"udp6"|"tcp"|"tcp6"|"raweth")
}?
& [ a:documentation [ xml:lang="en" """
<p>Deprecated (use Transport instead)</p><p>The default value is:
&quot;default&quot;.</p>""" ] ]
element UseIPv6 {
"false"|"true"|"default"
("false"|"true"|"default")
}?
}?
& [ a:documentation [ xml:lang="en" """
@ -509,7 +509,7 @@ most efficient, and <i>full</i> is inefficient but certain to be
compliant. See also Internal/ConservativeBuiltinReaderStartup.</p><p>The
default value is: &quot;writers&quot;.</p>""" ] ]
element BuiltinEndpointSet {
"full"|"writers"|"minimal"
("full"|"writers"|"minimal")
}?
& [ a:documentation [ xml:lang="en" """
<p>The ControlTopic element allows configured whether Cyclone DDS
@ -735,10 +735,13 @@ is: &quot;-1&quot;.</p>""" ] ]
}?
& [ a:documentation [ xml:lang="en" """
<p>This element controls whether all traffic is handled by a single
receive thread or whether multiple receive threads may be used to improve
latency. Currently multiple receive threads are only used for
connectionless transport (e.g., UDP) and ManySocketsMode not set to
single (the default).</p><p>The default value is: &quot;true&quot;.</p>""" ] ]
receive thread (false) or whether multiple receive threads may be used to
improve latency (true). By default it is disabled on Windows because it
appears that one cannot count on being able to send packets to oneself,
which is necessary to stop the thread during shutdown. Currently multiple
receive threads are only used for connectionless transport (e.g., UDP)
and ManySocketsMode not set to single (the default).</p><p>The default
value is: &quot;default&quot;.</p>""" ] ]
element MultipleReceiveThreads {
[ a:documentation [ xml:lang="en" """
<p>Receive threads dedicated to a single socket can only be triggered for
@ -750,7 +753,7 @@ attribute before aborting.</p><p>The default value is:
attribute maxretries {
xsd:integer
}?
& xsd:boolean
& ("false"|"true"|"default")
}?
& [ a:documentation [ xml:lang="en" """
<p>This setting controls the delay between receipt of a HEARTBEAT
@ -835,7 +838,7 @@ try to merge.</li></ul>
Internal/RetransmitMergingPeriod.</p><p>The default value is:
&quot;never&quot;.</p>""" ] ]
element RetransmitMerging {
"never"|"adaptive"|"always"
("never"|"adaptive"|"always")
}?
& [ a:documentation [ xml:lang="en" """
<p>This setting determines the size of the time window in which a NACK of
@ -1220,7 +1223,7 @@ using TCP.</p><p>The default value is: &quot;false&quot;.</p>""" ] ]
General/Transport instead.</p><p>The default value is:
&quot;default&quot;.</p>""" ] ]
element Enable {
"false"|"true"|"default"
("false"|"true"|"default")
}?
& [ a:documentation [ xml:lang="en" """
<p>This element enables the TCP_NODELAY socket option, preventing
@ -1326,7 +1329,7 @@ from the underlying operating system to be able to assign some of the
privileged scheduling classes.</p><p>The default value is:
&quot;default&quot;.</p>""" ] ]
element Class {
"realtime"|"timeshare"|"default"
("realtime"|"timeshare"|"default")
}?
& [ a:documentation [ xml:lang="en" """
<p>This element specifies the thread priority (decimal integer or
@ -1467,7 +1470,7 @@ situation rather than the current situation. Currently, the most useful
verbosity levels are <i>config</i>, <i>fine</i> and
<i>finest</i>.</p><p>The default value is: &quot;none&quot;.</p>""" ] ]
element Verbosity {
"finest"|"finer"|"fine"|"config"|"info"|"warning"|"severe"|"none"
("finest"|"finer"|"fine"|"config"|"info"|"warning"|"severe"|"none")
}?
}?
}?

View file

@ -1013,14 +1013,24 @@ is: &amp;quot;-1&amp;quot;.&lt;/p&gt;</xs:documentation>
<xs:annotation>
<xs:documentation>
&lt;p&gt;This element controls whether all traffic is handled by a single
receive thread or whether multiple receive threads may be used to improve
latency. Currently multiple receive threads are only used for
connectionless transport (e.g., UDP) and ManySocketsMode not set to
single (the default).&lt;/p&gt;&lt;p&gt;The default value is: &amp;quot;true&amp;quot;.&lt;/p&gt;</xs:documentation>
receive thread (false) or whether multiple receive threads may be used to
improve latency (true). By default it is disabled on Windows because it
appears that one cannot count on being able to send packets to oneself,
which is necessary to stop the thread during shutdown. Currently multiple
receive threads are only used for connectionless transport (e.g., UDP)
and ManySocketsMode not set to single (the default).&lt;/p&gt;&lt;p&gt;The default
value is: &amp;quot;default&amp;quot;.&lt;/p&gt;</xs:documentation>
</xs:annotation>
<xs:complexType>
<xs:simpleContent>
<xs:extension base="xs:boolean">
<xs:restriction base="xs:anyType">
<xs:simpleType>
<xs:restriction base="xs:token">
<xs:enumeration value="false"/>
<xs:enumeration value="true"/>
<xs:enumeration value="default"/>
</xs:restriction>
</xs:simpleType>
<xs:attribute name="maxretries" type="xs:integer">
<xs:annotation>
<xs:documentation>
@ -1032,7 +1042,7 @@ attribute before aborting.&lt;/p&gt;&lt;p&gt;The default value is:
&amp;quot;4294967295&amp;quot;.&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:extension>
</xs:restriction>
</xs:simpleContent>
</xs:complexType>
</xs:element>

View file

@ -218,7 +218,7 @@ struct config
int64_t liveliness_monitoring_interval;
int prioritize_retransmit;
int xpack_send_async;
int multiple_recv_threads;
enum boolean_default multiple_recv_threads;
unsigned recv_thread_stop_maxretries;
unsigned primary_reorder_maxsamples;

View file

@ -149,9 +149,7 @@ DU(ipv4);
DUPF(allow_multicast);
DUPF(boolean);
DU(boolean_default);
#if 0
PF(boolean_default);
#endif
DUPF(string);
DU(tracingOutputFileName);
DU(verbosity);
@ -584,8 +582,8 @@ static const struct cfgelem unsupp_cfgelems[] = {
BLURB("<p>This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.</p>") },
{ LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration,
BLURB("<p>This element controls for how long a remote participant that was previously deleted will remain on a blacklist to prevent rediscovery, giving the software on a node time to perform any cleanup actions it needs to do. To some extent this delay is required internally by DDSI2E, but in the default configuration with the 'enforce' attribute set to false, DDSI2E will reallow rediscovery as soon as it has cleared its internal administration. Setting it to too small a value may result in the entry being pruned from the blacklist before DDSI2E is ready, it is therefore recommended to set it to at least several seconds.</p>") },
{ LEAF_W_ATTRS("MultipleReceiveThreads", multiple_recv_threads_attrs), 1, "true", ABSOFF(multiple_recv_threads), 0, uf_boolean, 0, pf_boolean,
BLURB("<p>This element controls whether all traffic is handled by a single receive thread or whether multiple receive threads may be used to improve latency. Currently multiple receive threads are only used for connectionless transport (e.g., UDP) and ManySocketsMode not set to single (the default).</p>") },
{ LEAF_W_ATTRS("MultipleReceiveThreads", multiple_recv_threads_attrs), 1, "default", ABSOFF(multiple_recv_threads), 0, uf_boolean_default, 0, pf_boolean_default,
BLURB("<p>This element controls whether all traffic is handled by a single receive thread (false) or whether multiple receive threads may be used to improve latency (true). By default it is disabled on Windows because it appears that one cannot count on being able to send packets to oneself, which is necessary to stop the thread during shutdown. Currently multiple receive threads are only used for connectionless transport (e.g., UDP) and ManySocketsMode not set to single (the default).</p>") },
{ MGROUP("ControlTopic", control_topic_cfgelems, control_topic_cfgattrs), 1, 0, 0, 0, 0, 0, 0, 0,
BLURB("<p>The ControlTopic element allows configured whether DDSI2E provides a special control interface via a predefined topic or not.<p>") },
{ GROUP("Test", unsupp_test_cfgelems),
@ -1456,7 +1454,7 @@ GENERIC_ENUM_CTYPE (boolean, int)
static const char *en_boolean_default_vs[] = { "default", "false", "true", NULL };
static const enum boolean_default en_boolean_default_ms[] = { BOOLDEF_DEFAULT, BOOLDEF_FALSE, BOOLDEF_TRUE, 0 };
GENERIC_ENUM_UF (boolean_default)
GENERIC_ENUM (boolean_default)
static const char *en_besmode_vs[] = { "full", "writers", "minimal", NULL };
static const enum besmode en_besmode_ms[] = { BESMODE_FULL, BESMODE_WRITERS, BESMODE_MINIMAL, 0 };

View file

@ -796,8 +796,36 @@ static void free_special_topics (struct q_globals *gv)
ddsi_sertopic_unref (gv->rawcdr_topic);
}
static bool use_multiple_receive_threads (const struct config *cfg)
{
/* Under some unknown circumstances Windows (at least Windows 10) exhibits
the interesting behaviour of losing its ability to let us send packets
to our own sockets. When that happens, dedicated receive threads can no
longer be stopped and Cyclone hangs in shutdown. So until someone
figures out why this happens, it is probably best have a different
default on Windows. */
#if _WIN32
const bool def = false;
#else
const bool def = true;
#endif
switch (cfg->multiple_recv_threads)
{
case BOOLDEF_FALSE:
return false;
case BOOLDEF_TRUE:
return true;
case BOOLDEF_DEFAULT:
return def;
}
assert (0);
return false;
}
static int setup_and_start_recv_threads (struct q_globals *gv)
{
const bool multi_recv_thr = use_multiple_receive_threads (&gv->config);
for (uint32_t i = 0; i < MAX_RECV_THREADS; i++)
{
gv->recv_threads[i].ts = NULL;
@ -812,7 +840,7 @@ static int setup_and_start_recv_threads (struct q_globals *gv)
gv->n_recv_threads = 1;
gv->recv_threads[0].name = "recv";
gv->recv_threads[0].arg.mode = RTM_MANY;
if (gv->m_factory->m_connless && gv->config.many_sockets_mode != MSM_NO_UNICAST && gv->config.multiple_recv_threads)
if (gv->m_factory->m_connless && gv->config.many_sockets_mode != MSM_NO_UNICAST && multi_recv_thr)
{
if (ddsi_is_mcaddr (gv, &gv->loc_default_mc) && !ddsi_is_ssm_mcaddr (gv, &gv->loc_default_mc) && (gv->config.allowMulticast & AMC_ASM))
{