From d1ed8df9f343b1cd441e3cd7ceaa750b480b7ce9 Mon Sep 17 00:00:00 2001
From: Erik Boasson
Date: Sat, 7 Mar 2020 17:47:18 +0100
Subject: [PATCH] Create a separate socket for transmitting data
This is a workaround for interoperability issues, ultimately driven by a
Windows quirk that makes multicast delivery within a machine utterly
unreliable if the transmitting socket is bound to 0.0.0.0 (despite all
sockets having multicast interfaces set correctly) when there are also
sockets transmitting to the same multicast group that have been bound to
non-0.0.0.0. (Note: there may be other factors at play, but this is
what it looks like after experimentation.)
At least Fast-RTPS in some versions binds the socket it uses for
transmitting multicasts to non-0.0.0.0, so interoperability with
Fast-RTPS on Windows requires us to bind the socket we use for
transmitting multicasts (which was the same as the one we use for
receiving unicast data) also to non-0.0.0.0 or our multicasts get
dropped often.
This would work fine if other implementations honoured the set of
advertised addresses. However, at least Fast-RTPS and Connext (in some
versions) fail to do this and happily substitute 127.0.0.1 for the
advertised IP address. If we bind to, e.g., 192.168.1.1, then suddenly
those packets won't arrive anymore, breaking interoperability.
The only work around is to use a separate socket for sending.
Signed-off-by: Erik Boasson
---
docs/manual/options.md | 11 +-----
etc/cyclonedds.rnc | 7 ----
etc/cyclonedds.xsd | 9 -----
src/core/ddsc/src/dds_writer.c | 3 +-
.../ddsi/include/dds/ddsi/ddsi_domaingv.h | 32 +++++++++++++----
src/core/ddsi/include/dds/ddsi/ddsi_tran.h | 22 ++++++------
src/core/ddsi/include/dds/ddsi/q_config.h | 1 -
src/core/ddsi/include/dds/ddsi/q_nwif.h | 2 +-
src/core/ddsi/src/ddsi_raweth.c | 4 +--
src/core/ddsi/src/ddsi_tcp.c | 6 ++--
src/core/ddsi/src/ddsi_tran.c | 17 ++-------
src/core/ddsi/src/ddsi_udp.c | 35 +++++++++++++++----
src/core/ddsi/src/q_config.c | 2 --
src/core/ddsi/src/q_entity.c | 3 +-
src/core/ddsi/src/q_init.c | 33 ++++++++---------
src/core/ddsi/src/q_nwif.c | 21 +++++------
src/core/ddsi/src/q_receive.c | 2 +-
17 files changed, 102 insertions(+), 108 deletions(-)
diff --git a/docs/manual/options.md b/docs/manual/options.md
index fbdd2aa..a5534b2 100644
--- a/docs/manual/options.md
+++ b/docs/manual/options.md
@@ -554,7 +554,7 @@ The default value is: "default".
### //CycloneDDS/Domain/Internal
-Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BindUnicastToInterfaceAddr](#cycloneddsdomaininternalbindunicasttointerfaceaddr), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
+Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
The Internal elements deal with a variety of settings that evolving and
@@ -600,15 +600,6 @@ Valid values are finite durations with an explicit unit or the keyword
The default value is: "1 s".
-#### //CycloneDDS/Domain/Internal/BindUnicastToInterfaceAddr
-Boolean
-
-Bind unicast sockets to the address of the preferred interface; if false,
-bind to 0.0.0.0 (IPv4) or its equivalent
-
-The default value is: "true".
-
-
#### //CycloneDDS/Domain/Internal/BuiltinEndpointSet
One of: full, writers, minimal
diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc
index f3e4395..0edc899 100644
--- a/etc/cyclonedds.rnc
+++ b/etc/cyclonedds.rnc
@@ -491,13 +491,6 @@ day.
The default value is: "1 s".
""" ] ]
duration_inf
}?
& [ a:documentation [ xml:lang="en" """
-Bind unicast sockets to the address of the preferred interface; if
-false, bind to 0.0.0.0 (IPv4) or its equivalent
The default value
-is: "true".
""" ] ]
- element BindUnicastToInterfaceAddr {
- xsd:boolean
- }?
- & [ a:documentation [ xml:lang="en" """
This element controls which participants will have which built-in
endpoints for the discovery and liveliness protocols. Valid values
are:
diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd
index dbcab47..49d4378 100644
--- a/etc/cyclonedds.xsd
+++ b/etc/cyclonedds.xsd
@@ -638,7 +638,6 @@ reserved. This includes renaming or moving options.</p>
-
@@ -717,14 +716,6 @@ of HEARTBEAT messages.</p>
day.</p><p>The default value is: "1 s".</p>
-
-
-
-<p>Bind unicast sockets to the address of the preferred interface; if
-false, bind to 0.0.0.0 (IPv4) or its equivalent</p><p>The default value
-is: "true".</p>
-
-
diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c
index a2e341f..caedd81 100644
--- a/src/core/ddsc/src/dds_writer.c
+++ b/src/core/ddsc/src/dds_writer.c
@@ -292,8 +292,6 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
}
}
- ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
-
if ((rc = dds_topic_pin (topic, &tp)) != DDS_RETCODE_OK)
goto err_pin_topic;
assert (tp->m_stopic);
@@ -331,6 +329,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
}
/* Create writer */
+ ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.xmit_conn;
struct dds_writer * const wr = dds_alloc (sizeof (*wr));
const dds_entity_t writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK);
wr->m_topic = tp;
diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h b/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h
index caadcb1..b5747a7 100644
--- a/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h
+++ b/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h
@@ -115,18 +115,40 @@ struct ddsi_domaingv {
DCPS participant of DDSI2 itself will be mirrored in a DDSI
participant, and in multi-socket mode that one gets its own
socket. */
-
struct ddsi_tran_conn * disc_conn_mc;
struct ddsi_tran_conn * data_conn_mc;
struct ddsi_tran_conn * disc_conn_uc;
struct ddsi_tran_conn * data_conn_uc;
- /* TCP listener */
+ /* Connection used for all output (for connectionless transports), this
+ used to simply be data_conn_uc, but:
+ - Windows has a quirk that makes multicast delivery within a machine
+ utterly unreliable if the transmitting socket is bound to 0.0.0.0
+ (despite all sockets having multicast interfaces set correctly),
+ but apparently only in the presence of sockets transmitting to the
+ same multicast group that have been bound to non-0.0.0.0 ...
+ - At least Fast-RTPS and Connext fail to honour the set of advertised
+ addresses and substitute 127.0.0.1 for the advertised IP address and
+ expect it to work.
+ - Fast-RTPS (at least) binds the socket it uses for transmitting
+ multicasts to non-0.0.0.0
+
+ So binding to 0.0.0.0 means the unicasts from Fast-RTPS & Connext will
+ arrive but the multicasts from Cyclone get dropped often on Windows
+ when trying to interoperate; and binding to the IP address means
+ unicast messages from the others fail to arrive (because they fail to
+ arrive).
+
+ The only work around is to use a separate socket for sending. It is
+ rather sad that Cyclone needs to work around the bugs of the others,
+ but it seems the only way to get the users what they expect. */
+ struct ddsi_tran_conn * xmit_conn;
+
+ /* TCP listener */
struct ddsi_tran_listener * listener;
/* Thread pool */
-
struct ddsrt_thread_pool_s * thread_pool;
/* In many sockets mode, the receive threads maintain a local array
@@ -249,10 +271,6 @@ struct ddsi_domaingv {
delivery queue; currently just SEDP and PMD */
struct nn_dqueue *builtins_dqueue;
- /* Connection used by general timed-event queue for transmitting data */
-
- struct ddsi_tran_conn * tev_conn;
-
struct debug_monitor *debmon;
#ifndef DDSI_INCLUDE_NETWORK_CHANNELS
diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h
index 4d7d0e4..c6ccc8f 100644
--- a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h
+++ b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h
@@ -46,7 +46,7 @@ typedef struct ddsi_tran_base * ddsi_tran_base_t;
typedef struct ddsi_tran_conn * ddsi_tran_conn_t;
typedef struct ddsi_tran_listener * ddsi_tran_listener_t;
typedef struct ddsi_tran_factory * ddsi_tran_factory_t;
-typedef struct ddsi_tran_qos * ddsi_tran_qos_t;
+typedef struct ddsi_tran_qos ddsi_tran_qos_t;
/* Function pointer types */
@@ -60,8 +60,8 @@ typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t);
typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *);
typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t);
typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t);
-typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, ddsi_tran_qos_t);
-typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t);
+typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *);
+typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *);
typedef void (*ddsi_tran_release_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_close_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_unblock_listener_fn_t) (ddsi_tran_listener_t);
@@ -189,11 +189,15 @@ struct ddsi_tran_factory
ddsi_tran_factory_t m_factory;
};
+enum ddsi_tran_qos_purpose {
+ DDSI_TRAN_QOS_XMIT,
+ DDSI_TRAN_QOS_RECV_UC,
+ DDSI_TRAN_QOS_RECV_MC
+};
+
struct ddsi_tran_qos
{
- /* QoS Data */
-
- bool m_multicast;
+ enum ddsi_tran_qos_purpose m_purpose;
int m_diffserv;
};
@@ -210,20 +214,18 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3
inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port) {
return factory->m_is_valid_port_fn (factory, port);
}
-inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) {
+inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_conn_fn (factory, port, qos);
}
-inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) {
+inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_listener_fn (factory, port, qos);
}
void ddsi_tran_free (ddsi_tran_base_t base);
-void ddsi_tran_free_qos (ddsi_tran_qos_t qos);
-ddsi_tran_qos_t ddsi_tran_create_qos (void);
inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base) {
return base->m_handle_fn (base);
}
diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h
index cb0ba65..fd66ad2 100644
--- a/src/core/ddsi/include/dds/ddsi/q_config.h
+++ b/src/core/ddsi/include/dds/ddsi/q_config.h
@@ -332,7 +332,6 @@ struct config
int64_t initial_deaf_mute_reset;
int use_multicast_if_mreqn;
- int bind_unicast_to_interface_addr;
struct prune_deleted_ppant prune_deleted_ppant;
};
diff --git a/src/core/ddsi/include/dds/ddsi/q_nwif.h b/src/core/ddsi/include/dds/ddsi/q_nwif.h
index dd27df6..12f232e 100644
--- a/src/core/ddsi/include/dds/ddsi/q_nwif.h
+++ b/src/core/ddsi/include/dds/ddsi/q_nwif.h
@@ -35,7 +35,7 @@ struct nn_interface {
char *name;
};
-int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv);
+int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv);
int find_own_ip (struct ddsi_domaingv *gv, const char *requested_address);
uint32_t locator_to_hopefully_unique_uint32 (const nn_locator_t *src);
diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c
index 1f7ed6a..6e0bca2 100644
--- a/src/core/ddsi/src/ddsi_raweth.c
+++ b/src/core/ddsi/src/ddsi_raweth.c
@@ -175,13 +175,13 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t
return ret;
}
-static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
+static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
ddsrt_socket_t sock;
dds_return_t rc;
ddsi_raweth_conn_t uc = NULL;
struct sockaddr_ll addr;
- bool mcast = (bool) (qos ? qos->m_multicast : 0);
+ bool mcast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
/* If port is zero, need to create dynamic port */
diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c
index 5a7ed06..5a87fd1 100644
--- a/src/core/ddsi/src/ddsi_tcp.c
+++ b/src/core/ddsi/src/ddsi_tcp.c
@@ -167,7 +167,7 @@ static void ddsi_tcp_sock_free (const struct ddsrt_log_cfg *logcfg, ddsrt_socket
static void ddsi_tcp_sock_new (ddsrt_socket_t *sock, unsigned short port, const struct ddsi_domaingv *gv)
{
- if (make_socket (sock, port, true, true, gv) != 0)
+ if (make_socket (sock, port, true, true, true, gv) != 0)
{
*sock = DDSRT_INVALID_SOCKET;
}
@@ -706,7 +706,7 @@ static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_
return 0;
}
-static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, ddsi_tran_qos_t qos)
+static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn;
(void) qos;
@@ -853,7 +853,7 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, dd
return conn;
}
-static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
+static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
char buff[DDSI_LOCSTRLEN];
ddsrt_socket_t sock;
diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c
index 2bdda5d..fdf1fcf 100644
--- a/src/core/ddsi/src/ddsi_tran.c
+++ b/src/core/ddsi/src/ddsi_tran.c
@@ -24,13 +24,13 @@
extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn);
extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn);
-extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos);
+extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind);
extern inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port);
extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn);
extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base);
-extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos);
+extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc);
extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener);
extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener);
@@ -182,11 +182,6 @@ bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc)
return false;
}
-void ddsi_tran_free_qos (ddsi_tran_qos_t qos)
-{
- ddsrt_free (qos);
-}
-
int ddsi_conn_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
return conn->m_factory->m_join_mc_fn (conn, srcloc, mcloc, interf);
@@ -197,14 +192,6 @@ int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const
return conn->m_factory->m_leave_mc_fn (conn, srcloc, mcloc, interf);
}
-ddsi_tran_qos_t ddsi_tran_create_qos (void)
-{
- ddsi_tran_qos_t qos;
- qos = (ddsi_tran_qos_t) ddsrt_malloc (sizeof (*qos));
- memset (qos, 0, sizeof (*qos));
- return qos;
-}
-
void ddsi_tran_free (ddsi_tran_base_t base)
{
if (base)
diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c
index 8720468..ede09e7 100644
--- a/src/core/ddsi/src/ddsi_udp.c
+++ b/src/core/ddsi/src/ddsi_udp.c
@@ -218,16 +218,37 @@ static unsigned short get_socket_port (const struct ddsrt_log_cfg *logcfg, ddsrt
return ddsrt_sockaddr_get_port((struct sockaddr *)&addr);
}
-static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
+static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos)
{
int ret;
ddsrt_socket_t sock;
ddsi_udp_conn_t uc = NULL;
- bool mcast = (bool) (qos ? qos->m_multicast : false);
+ bool reuse_addr = false, bind_to_any = false;
+ const char *purpose_str = NULL;
+
+ switch (qos->m_purpose)
+ {
+ case DDSI_TRAN_QOS_XMIT:
+ reuse_addr = false;
+ bind_to_any = false;
+ purpose_str = "transmit";
+ break;
+ case DDSI_TRAN_QOS_RECV_UC:
+ reuse_addr = false;
+ bind_to_any = true;
+ purpose_str = "unicast";
+ break;
+ case DDSI_TRAN_QOS_RECV_MC:
+ reuse_addr = true;
+ bind_to_any = true;
+ purpose_str = "multicast";
+ break;
+ }
+ assert (purpose_str != NULL);
/* If port is zero, need to create dynamic port */
- ret = make_socket (&sock, (unsigned short) port, false, mcast, fact->gv);
+ ret = make_socket (&sock, (unsigned short) port, false, reuse_addr, bind_to_any, fact->gv);
if (ret == 0)
{
@@ -235,7 +256,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
memset (uc, 0, sizeof (*uc));
uc->m_sock = sock;
- uc->m_diffserv = qos ? qos->m_diffserv : 0;
+ uc->m_diffserv = qos->m_diffserv;
#if defined _WIN32 && !defined WINCE
uc->m_sockEvent = WSACreateEvent();
WSAEventSelect(uc->m_sock, uc->m_sockEvent, FD_WRITE);
@@ -244,7 +265,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
ddsi_factory_conn_init (fact, &uc->m_base);
uc->m_base.m_base.m_port = get_socket_port (&fact->gv->logconfig, sock);
uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
- uc->m_base.m_base.m_multicast = mcast;
+ uc->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
uc->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle;
uc->m_base.m_read_fn = ddsi_udp_conn_read;
@@ -254,7 +275,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
DDS_CTRACE (&fact->gv->logconfig,
"ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n",
- mcast ? "multicast" : "unicast",
+ purpose_str,
uc->m_sock,
uc->m_base.m_base.m_port);
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
@@ -271,7 +292,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
{
if (fact->gv->config.participantIndex != PARTICIPANT_INDEX_AUTO)
{
- DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", mcast ? "multicast" : "unicast", port);
+ DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", purpose_str, port);
}
}
diff --git a/src/core/ddsi/src/q_config.c b/src/core/ddsi/src/q_config.c
index 82d1d71..2f3fbf6 100644
--- a/src/core/ddsi/src/q_config.c
+++ b/src/core/ddsi/src/q_config.c
@@ -578,8 +578,6 @@ static const struct cfgelem internal_cfgelems[] = {
BLURB("This element controls whether retransmits are prioritized over new data, speeding up recovery.
") },
{ LEAF("UseMulticastIfMreqn"), 1, "0", ABSOFF(use_multicast_if_mreqn), 0, uf_int, 0, pf_int,
BLURB("Do not use.
") },
- { LEAF("BindUnicastToInterfaceAddr"), 1, "true", ABSOFF(bind_unicast_to_interface_addr), 0, uf_boolean, 0, pf_boolean,
- BLURB("Bind unicast sockets to the address of the preferred interface; if false, bind to 0.0.0.0 (IPv4) or its equivalent
") },
{ LEAF("SendAsync"), 1, "false", ABSOFF(xpack_send_async), 0, uf_boolean, 0, pf_boolean,
BLURB("This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.
") },
{ LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration,
diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c
index f52cddb..0790a00 100644
--- a/src/core/ddsi/src/q_entity.c
+++ b/src/core/ddsi/src/q_entity.c
@@ -652,7 +652,8 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
if (gv->config.many_sockets_mode == MSM_MANY_UNICAST)
{
- pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, NULL);
+ const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
+ pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
ddsi_conn_locator (pp->m_conn, &pp->m_locator);
}
else
diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c
index 46ac147..7cb0fb3 100644
--- a/src/core/ddsi/src/q_init.c
+++ b/src/core/ddsi/src/q_init.c
@@ -96,14 +96,15 @@ static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint3
if (!ddsi_is_valid_port (gv->m_factory, *pdisc) || !ddsi_is_valid_port (gv->m_factory, *pdata))
return MUSRET_INVALID_PORTS;
- gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, NULL);
+ const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
+ gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, &qos);
if (gv->disc_conn_uc)
{
/* Check not configured to use same unicast port for data and discovery */
if (*pdata != 0 && (*pdata != *pdisc))
{
- gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, NULL);
+ gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, &qos);
}
else
{
@@ -663,10 +664,9 @@ int joinleave_spdp_defmcip (struct ddsi_domaingv *gv, int dojoin)
int create_multicast_sockets (struct ddsi_domaingv *gv)
{
- ddsi_tran_qos_t qos = ddsi_tran_create_qos ();
+ const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_MC, .m_diffserv = 0 };
ddsi_tran_conn_t disc, data;
uint32_t port;
- qos->m_multicast = 1;
port = ddsi_get_port (&gv->config, DDSI_PORT_MULTI_DISC, 0);
if (!ddsi_is_valid_port (gv->m_factory, port))
@@ -675,7 +675,7 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
- if ((disc = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL)
+ if ((disc = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
goto err_disc;
if (gv->config.many_sockets_mode == MSM_NO_UNICAST)
{
@@ -691,12 +691,11 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
- if ((data = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL)
+ if ((data = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
{
goto err_data;
}
}
- ddsi_tran_free_qos (qos);
gv->disc_conn_mc = disc;
gv->data_conn_mc = data;
@@ -707,7 +706,6 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
err_data:
ddsi_conn_free (disc);
err_disc:
- ddsi_tran_free_qos (qos);
return 0;
}
@@ -964,7 +962,7 @@ int rtps_init (struct ddsi_domaingv *gv)
gv->data_conn_uc = NULL;
gv->disc_conn_mc = NULL;
gv->data_conn_mc = NULL;
- gv->tev_conn = NULL;
+ gv->xmit_conn = NULL;
gv->listener = NULL;
gv->thread_pool = NULL;
gv->debmon = NULL;
@@ -1280,9 +1278,6 @@ int rtps_init (struct ddsi_domaingv *gv)
}
else
{
- /* Must have a data_conn_uc/tev_conn/transmit_conn */
- gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, 0, NULL);
-
if (gv->config.tcp_port == -1)
; /* nop */
else if (!ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port))
@@ -1311,9 +1306,10 @@ int rtps_init (struct ddsi_domaingv *gv)
}
/* Create shared transmit connection */
-
- gv->tev_conn = gv->data_conn_uc;
- GVLOG (DDS_LC_CONFIG, "Timed event transmit port: %d\n", (int) ddsi_conn_port (gv->tev_conn));
+ {
+ const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_XMIT, .m_diffserv = 0 };
+ gv->xmit_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
+ }
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
{
@@ -1376,7 +1372,7 @@ int rtps_init (struct ddsi_domaingv *gv)
gv->xevents = xeventq_new
(
- gv->tev_conn,
+ gv->xmit_conn,
gv->config.max_queued_rexmit_bytes,
gv->config.max_queued_rexmit_msgs,
#ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING
@@ -1438,6 +1434,8 @@ int rtps_init (struct ddsi_domaingv *gv)
return 0;
err_mc_conn:
+ if (gv->xmit_conn)
+ ddsi_conn_free (gv->xmit_conn);
if (gv->disc_conn_mc)
ddsi_conn_free (gv->disc_conn_mc);
if (gv->data_conn_mc && gv->data_conn_mc != gv->disc_conn_mc)
@@ -1762,6 +1760,7 @@ void rtps_fini (struct ddsi_domaingv *gv)
(void) joinleave_spdp_defmcip (gv, 0);
+ ddsi_conn_free (gv->xmit_conn);
ddsi_conn_free (gv->disc_conn_mc);
if (gv->data_conn_mc != gv->disc_conn_mc)
ddsi_conn_free (gv->data_conn_mc);
@@ -1770,8 +1769,6 @@ void rtps_fini (struct ddsi_domaingv *gv)
if (gv->data_conn_uc != gv->disc_conn_uc)
ddsi_conn_free (gv->data_conn_uc);
- /* Not freeing gv->tev_conn: it aliases data_conn_uc */
-
free_group_membership(gv->mship);
ddsi_tran_factories_fini (gv);
diff --git a/src/core/ddsi/src/q_nwif.c b/src/core/ddsi/src/q_nwif.c
index 5789983..9bbbb5c 100644
--- a/src/core/ddsi/src/q_nwif.c
+++ b/src/core/ddsi/src/q_nwif.c
@@ -214,7 +214,7 @@ static int set_reuse_options (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t
return 0;
}
-static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multicast, const struct ddsi_domaingv *gv)
+static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool bind_to_any, const struct ddsi_domaingv *gv)
{
dds_return_t rc = DDS_RETCODE_ERROR;
@@ -226,7 +226,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica
struct sockaddr_in6 a;
} socketname;
ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc);
- if (multicast || !gv->config.bind_unicast_to_interface_addr)
+ if (bind_to_any)
socketname.a.sin6_addr = ddsrt_in6addr_any;
socketname.a.sin6_port = htons (port);
if (IN6_IS_ADDR_LINKLOCAL (&socketname.a.sin6_addr)) {
@@ -243,7 +243,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica
struct sockaddr_in a;
} socketname;
ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc);
- if (multicast || !gv->config.bind_unicast_to_interface_addr)
+ if (bind_to_any)
socketname.a.sin_addr.s_addr = htonl (INADDR_ANY);
socketname.a.sin_port = htons (port);
rc = ddsrt_bind (socket, (struct sockaddr *) &socketname.a, sizeof (socketname.a));
@@ -345,7 +345,7 @@ static int set_mc_options_transmit (ddsrt_socket_t socket, const struct ddsi_dom
}
}
-int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv)
+int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv)
{
/* FIXME: this stuff has to move to the transports */
int rc = -2;
@@ -373,18 +373,15 @@ int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicas
return rc;
}
- if (port && multicast && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0))
+ if (port && reuse_addr && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0))
{
goto fail;
}
- if
- (
- (rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) ||
- (rc = set_sndbuf (&gv->logconfig, *sock, gv->config.socket_min_sndbuf_size) < 0) ||
- ((rc = maybe_set_dont_route (&gv->logconfig, *sock, &gv->config)) < 0) ||
- ((rc = bind_socket (*sock, port, multicast, gv)) < 0)
- )
+ if ((rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) ||
+ (rc = set_sndbuf (&gv->logconfig, *sock, gv->config.socket_min_sndbuf_size) < 0) ||
+ ((rc = maybe_set_dont_route (&gv->logconfig, *sock, &gv->config)) < 0) ||
+ ((rc = bind_socket (*sock, port, bind_to_any, gv)) < 0))
{
goto fail;
}
diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c
index 6ae7d41..7b138a8 100644
--- a/src/core/ddsi/src/q_receive.c
+++ b/src/core/ddsi/src/q_receive.c
@@ -3156,7 +3156,7 @@ void trigger_recv_threads (const struct ddsi_domaingv *gv)
iov.iov_base = &dummy;
iov.iov_len = 1;
GVTRACE ("trigger_recv_threads: %d single %s\n", i, ddsi_locator_to_string (buf, sizeof (buf), dst));
- ddsi_conn_write (gv->data_conn_uc, dst, 1, &iov, 0);
+ ddsi_conn_write (gv->xmit_conn, dst, 1, &iov, 0);
break;
}
case RTM_MANY: {