Create a separate socket for transmitting data

This is a workaround for interoperability issues, ultimately driven by a
Windows quirk that makes multicast delivery within a machine utterly
unreliable if the transmitting socket is bound to 0.0.0.0 (despite all
sockets having multicast interfaces set correctly) when there are also
sockets transmitting to the same multicast group that have been bound to
non-0.0.0.0.  (Note: there may be other factors at play, but this is
what it looks like after experimentation.)

At least Fast-RTPS in some versions binds the socket it uses for
transmitting multicasts to non-0.0.0.0, so interoperability with
Fast-RTPS on Windows requires us to bind the socket we use for
transmitting multicasts (which was the same as the one we use for
receiving unicast data) also to non-0.0.0.0 or our multicasts get
dropped often.

This would work fine if other implementations honoured the set of
advertised addresses.  However, at least Fast-RTPS and Connext (in some
versions) fail to do this and happily substitute 127.0.0.1 for the
advertised IP address.  If we bind to, e.g., 192.168.1.1, then suddenly
those packets won't arrive anymore, breaking interoperability.

The only work around is to use a separate socket for sending.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-03-07 17:47:18 +01:00 committed by eboasson
parent 9e673769ce
commit d1ed8df9f3
17 changed files with 102 additions and 108 deletions

View file

@ -554,7 +554,7 @@ The default value is: "default".
### //CycloneDDS/Domain/Internal
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BindUnicastToInterfaceAddr](#cycloneddsdomaininternalbindunicasttointerfaceaddr), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration)
The Internal elements deal with a variety of settings that evolving and
@ -600,15 +600,6 @@ Valid values are finite durations with an explicit unit or the keyword
The default value is: "1 s".
#### //CycloneDDS/Domain/Internal/BindUnicastToInterfaceAddr
Boolean
Bind unicast sockets to the address of the preferred interface; if false,
bind to 0.0.0.0 (IPv4) or its equivalent
The default value is: "true".
#### //CycloneDDS/Domain/Internal/BuiltinEndpointSet
One of: full, writers, minimal

View file

@ -491,13 +491,6 @@ day.</p><p>The default value is: &quot;1 s&quot;.</p>""" ] ]
duration_inf
}?
& [ a:documentation [ xml:lang="en" """
<p>Bind unicast sockets to the address of the preferred interface; if
false, bind to 0.0.0.0 (IPv4) or its equivalent</p><p>The default value
is: &quot;true&quot;.</p>""" ] ]
element BindUnicastToInterfaceAddr {
xsd:boolean
}?
& [ a:documentation [ xml:lang="en" """
<p>This element controls which participants will have which built-in
endpoints for the discovery and liveliness protocols. Valid values
are:</p>

View file

@ -638,7 +638,6 @@ reserved. This includes renaming or moving options.&lt;/p&gt;</xs:documentation>
<xs:element minOccurs="0" ref="config:AccelerateRexmitBlockSize"/>
<xs:element minOccurs="0" ref="config:AssumeMulticastCapable"/>
<xs:element minOccurs="0" ref="config:AutoReschedNackDelay"/>
<xs:element minOccurs="0" ref="config:BindUnicastToInterfaceAddr"/>
<xs:element minOccurs="0" ref="config:BuiltinEndpointSet"/>
<xs:element minOccurs="0" ref="config:ControlTopic"/>
<xs:element minOccurs="0" ref="config:DDSI2DirectMaxThreads"/>
@ -717,14 +716,6 @@ of HEARTBEAT messages.&lt;/p&gt;
day.&lt;/p&gt;&lt;p&gt;The default value is: &amp;quot;1 s&amp;quot;.&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="BindUnicastToInterfaceAddr" type="xs:boolean">
<xs:annotation>
<xs:documentation>
&lt;p&gt;Bind unicast sockets to the address of the preferred interface; if
false, bind to 0.0.0.0 (IPv4) or its equivalent&lt;/p&gt;&lt;p&gt;The default value
is: &amp;quot;true&amp;quot;.&lt;/p&gt;</xs:documentation>
</xs:annotation>
</xs:element>
<xs:element name="BuiltinEndpointSet">
<xs:annotation>
<xs:documentation>

View file

@ -292,8 +292,6 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
}
}
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
if ((rc = dds_topic_pin (topic, &tp)) != DDS_RETCODE_OK)
goto err_pin_topic;
assert (tp->m_stopic);
@ -331,6 +329,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
}
/* Create writer */
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.xmit_conn;
struct dds_writer * const wr = dds_alloc (sizeof (*wr));
const dds_entity_t writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK);
wr->m_topic = tp;

View file

@ -115,18 +115,40 @@ struct ddsi_domaingv {
DCPS participant of DDSI2 itself will be mirrored in a DDSI
participant, and in multi-socket mode that one gets its own
socket. */
struct ddsi_tran_conn * disc_conn_mc;
struct ddsi_tran_conn * data_conn_mc;
struct ddsi_tran_conn * disc_conn_uc;
struct ddsi_tran_conn * data_conn_uc;
/* TCP listener */
/* Connection used for all output (for connectionless transports), this
used to simply be data_conn_uc, but:
- Windows has a quirk that makes multicast delivery within a machine
utterly unreliable if the transmitting socket is bound to 0.0.0.0
(despite all sockets having multicast interfaces set correctly),
but apparently only in the presence of sockets transmitting to the
same multicast group that have been bound to non-0.0.0.0 ...
- At least Fast-RTPS and Connext fail to honour the set of advertised
addresses and substitute 127.0.0.1 for the advertised IP address and
expect it to work.
- Fast-RTPS (at least) binds the socket it uses for transmitting
multicasts to non-0.0.0.0
So binding to 0.0.0.0 means the unicasts from Fast-RTPS & Connext will
arrive but the multicasts from Cyclone get dropped often on Windows
when trying to interoperate; and binding to the IP address means
unicast messages from the others fail to arrive (because they fail to
arrive).
The only work around is to use a separate socket for sending. It is
rather sad that Cyclone needs to work around the bugs of the others,
but it seems the only way to get the users what they expect. */
struct ddsi_tran_conn * xmit_conn;
/* TCP listener */
struct ddsi_tran_listener * listener;
/* Thread pool */
struct ddsrt_thread_pool_s * thread_pool;
/* In many sockets mode, the receive threads maintain a local array
@ -249,10 +271,6 @@ struct ddsi_domaingv {
delivery queue; currently just SEDP and PMD */
struct nn_dqueue *builtins_dqueue;
/* Connection used by general timed-event queue for transmitting data */
struct ddsi_tran_conn * tev_conn;
struct debug_monitor *debmon;
#ifndef DDSI_INCLUDE_NETWORK_CHANNELS

View file

@ -46,7 +46,7 @@ typedef struct ddsi_tran_base * ddsi_tran_base_t;
typedef struct ddsi_tran_conn * ddsi_tran_conn_t;
typedef struct ddsi_tran_listener * ddsi_tran_listener_t;
typedef struct ddsi_tran_factory * ddsi_tran_factory_t;
typedef struct ddsi_tran_qos * ddsi_tran_qos_t;
typedef struct ddsi_tran_qos ddsi_tran_qos_t;
/* Function pointer types */
@ -60,8 +60,8 @@ typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t);
typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *);
typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t);
typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t);
typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, ddsi_tran_qos_t);
typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t);
typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *);
typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *);
typedef void (*ddsi_tran_release_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_close_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_unblock_listener_fn_t) (ddsi_tran_listener_t);
@ -189,11 +189,15 @@ struct ddsi_tran_factory
ddsi_tran_factory_t m_factory;
};
enum ddsi_tran_qos_purpose {
DDSI_TRAN_QOS_XMIT,
DDSI_TRAN_QOS_RECV_UC,
DDSI_TRAN_QOS_RECV_MC
};
struct ddsi_tran_qos
{
/* QoS Data */
bool m_multicast;
enum ddsi_tran_qos_purpose m_purpose;
int m_diffserv;
};
@ -210,20 +214,18 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3
inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port) {
return factory->m_is_valid_port_fn (factory, port);
}
inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) {
inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_conn_fn (factory, port, qos);
}
inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) {
inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_listener_fn (factory, port, qos);
}
void ddsi_tran_free (ddsi_tran_base_t base);
void ddsi_tran_free_qos (ddsi_tran_qos_t qos);
ddsi_tran_qos_t ddsi_tran_create_qos (void);
inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base) {
return base->m_handle_fn (base);
}

View file

@ -332,7 +332,6 @@ struct config
int64_t initial_deaf_mute_reset;
int use_multicast_if_mreqn;
int bind_unicast_to_interface_addr;
struct prune_deleted_ppant prune_deleted_ppant;
};

View file

@ -35,7 +35,7 @@ struct nn_interface {
char *name;
};
int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv);
int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv);
int find_own_ip (struct ddsi_domaingv *gv, const char *requested_address);
uint32_t locator_to_hopefully_unique_uint32 (const nn_locator_t *src);

View file

@ -175,13 +175,13 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t
return ret;
}
static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
ddsrt_socket_t sock;
dds_return_t rc;
ddsi_raweth_conn_t uc = NULL;
struct sockaddr_ll addr;
bool mcast = (bool) (qos ? qos->m_multicast : 0);
bool mcast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
/* If port is zero, need to create dynamic port */

View file

@ -167,7 +167,7 @@ static void ddsi_tcp_sock_free (const struct ddsrt_log_cfg *logcfg, ddsrt_socket
static void ddsi_tcp_sock_new (ddsrt_socket_t *sock, unsigned short port, const struct ddsi_domaingv *gv)
{
if (make_socket (sock, port, true, true, gv) != 0)
if (make_socket (sock, port, true, true, true, gv) != 0)
{
*sock = DDSRT_INVALID_SOCKET;
}
@ -706,7 +706,7 @@ static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_
return 0;
}
static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, ddsi_tran_qos_t qos)
static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn;
(void) qos;
@ -853,7 +853,7 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, dd
return conn;
}
static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
char buff[DDSI_LOCSTRLEN];
ddsrt_socket_t sock;

View file

@ -24,13 +24,13 @@
extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn);
extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn);
extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos);
extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind);
extern inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port);
extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn);
extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base);
extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos);
extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc);
extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener);
extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener);
@ -182,11 +182,6 @@ bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc)
return false;
}
void ddsi_tran_free_qos (ddsi_tran_qos_t qos)
{
ddsrt_free (qos);
}
int ddsi_conn_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
return conn->m_factory->m_join_mc_fn (conn, srcloc, mcloc, interf);
@ -197,14 +192,6 @@ int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const
return conn->m_factory->m_leave_mc_fn (conn, srcloc, mcloc, interf);
}
ddsi_tran_qos_t ddsi_tran_create_qos (void)
{
ddsi_tran_qos_t qos;
qos = (ddsi_tran_qos_t) ddsrt_malloc (sizeof (*qos));
memset (qos, 0, sizeof (*qos));
return qos;
}
void ddsi_tran_free (ddsi_tran_base_t base)
{
if (base)

View file

@ -218,16 +218,37 @@ static unsigned short get_socket_port (const struct ddsrt_log_cfg *logcfg, ddsrt
return ddsrt_sockaddr_get_port((struct sockaddr *)&addr);
}
static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos)
static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos)
{
int ret;
ddsrt_socket_t sock;
ddsi_udp_conn_t uc = NULL;
bool mcast = (bool) (qos ? qos->m_multicast : false);
bool reuse_addr = false, bind_to_any = false;
const char *purpose_str = NULL;
switch (qos->m_purpose)
{
case DDSI_TRAN_QOS_XMIT:
reuse_addr = false;
bind_to_any = false;
purpose_str = "transmit";
break;
case DDSI_TRAN_QOS_RECV_UC:
reuse_addr = false;
bind_to_any = true;
purpose_str = "unicast";
break;
case DDSI_TRAN_QOS_RECV_MC:
reuse_addr = true;
bind_to_any = true;
purpose_str = "multicast";
break;
}
assert (purpose_str != NULL);
/* If port is zero, need to create dynamic port */
ret = make_socket (&sock, (unsigned short) port, false, mcast, fact->gv);
ret = make_socket (&sock, (unsigned short) port, false, reuse_addr, bind_to_any, fact->gv);
if (ret == 0)
{
@ -235,7 +256,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
memset (uc, 0, sizeof (*uc));
uc->m_sock = sock;
uc->m_diffserv = qos ? qos->m_diffserv : 0;
uc->m_diffserv = qos->m_diffserv;
#if defined _WIN32 && !defined WINCE
uc->m_sockEvent = WSACreateEvent();
WSAEventSelect(uc->m_sock, uc->m_sockEvent, FD_WRITE);
@ -244,7 +265,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
ddsi_factory_conn_init (fact, &uc->m_base);
uc->m_base.m_base.m_port = get_socket_port (&fact->gv->logconfig, sock);
uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
uc->m_base.m_base.m_multicast = mcast;
uc->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
uc->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle;
uc->m_base.m_read_fn = ddsi_udp_conn_read;
@ -254,7 +275,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
DDS_CTRACE (&fact->gv->logconfig,
"ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n",
mcast ? "multicast" : "unicast",
purpose_str,
uc->m_sock,
uc->m_base.m_base.m_port);
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
@ -271,7 +292,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
{
if (fact->gv->config.participantIndex != PARTICIPANT_INDEX_AUTO)
{
DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", mcast ? "multicast" : "unicast", port);
DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", purpose_str, port);
}
}

View file

@ -578,8 +578,6 @@ static const struct cfgelem internal_cfgelems[] = {
BLURB("<p>This element controls whether retransmits are prioritized over new data, speeding up recovery.</p>") },
{ LEAF("UseMulticastIfMreqn"), 1, "0", ABSOFF(use_multicast_if_mreqn), 0, uf_int, 0, pf_int,
BLURB("<p>Do not use.</p>") },
{ LEAF("BindUnicastToInterfaceAddr"), 1, "true", ABSOFF(bind_unicast_to_interface_addr), 0, uf_boolean, 0, pf_boolean,
BLURB("<p>Bind unicast sockets to the address of the preferred interface; if false, bind to 0.0.0.0 (IPv4) or its equivalent</p>") },
{ LEAF("SendAsync"), 1, "false", ABSOFF(xpack_send_async), 0, uf_boolean, 0, pf_boolean,
BLURB("<p>This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.</p>") },
{ LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration,

View file

@ -652,7 +652,8 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
if (gv->config.many_sockets_mode == MSM_MANY_UNICAST)
{
pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, NULL);
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
ddsi_conn_locator (pp->m_conn, &pp->m_locator);
}
else

View file

@ -96,14 +96,15 @@ static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint3
if (!ddsi_is_valid_port (gv->m_factory, *pdisc) || !ddsi_is_valid_port (gv->m_factory, *pdata))
return MUSRET_INVALID_PORTS;
gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, NULL);
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, &qos);
if (gv->disc_conn_uc)
{
/* Check not configured to use same unicast port for data and discovery */
if (*pdata != 0 && (*pdata != *pdisc))
{
gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, NULL);
gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, &qos);
}
else
{
@ -663,10 +664,9 @@ int joinleave_spdp_defmcip (struct ddsi_domaingv *gv, int dojoin)
int create_multicast_sockets (struct ddsi_domaingv *gv)
{
ddsi_tran_qos_t qos = ddsi_tran_create_qos ();
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_MC, .m_diffserv = 0 };
ddsi_tran_conn_t disc, data;
uint32_t port;
qos->m_multicast = 1;
port = ddsi_get_port (&gv->config, DDSI_PORT_MULTI_DISC, 0);
if (!ddsi_is_valid_port (gv->m_factory, port))
@ -675,7 +675,7 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
if ((disc = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL)
if ((disc = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
goto err_disc;
if (gv->config.many_sockets_mode == MSM_NO_UNICAST)
{
@ -691,12 +691,11 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
if ((data = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL)
if ((data = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
{
goto err_data;
}
}
ddsi_tran_free_qos (qos);
gv->disc_conn_mc = disc;
gv->data_conn_mc = data;
@ -707,7 +706,6 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
err_data:
ddsi_conn_free (disc);
err_disc:
ddsi_tran_free_qos (qos);
return 0;
}
@ -964,7 +962,7 @@ int rtps_init (struct ddsi_domaingv *gv)
gv->data_conn_uc = NULL;
gv->disc_conn_mc = NULL;
gv->data_conn_mc = NULL;
gv->tev_conn = NULL;
gv->xmit_conn = NULL;
gv->listener = NULL;
gv->thread_pool = NULL;
gv->debmon = NULL;
@ -1280,9 +1278,6 @@ int rtps_init (struct ddsi_domaingv *gv)
}
else
{
/* Must have a data_conn_uc/tev_conn/transmit_conn */
gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, 0, NULL);
if (gv->config.tcp_port == -1)
; /* nop */
else if (!ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port))
@ -1311,9 +1306,10 @@ int rtps_init (struct ddsi_domaingv *gv)
}
/* Create shared transmit connection */
gv->tev_conn = gv->data_conn_uc;
GVLOG (DDS_LC_CONFIG, "Timed event transmit port: %d\n", (int) ddsi_conn_port (gv->tev_conn));
{
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_XMIT, .m_diffserv = 0 };
gv->xmit_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
}
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
{
@ -1376,7 +1372,7 @@ int rtps_init (struct ddsi_domaingv *gv)
gv->xevents = xeventq_new
(
gv->tev_conn,
gv->xmit_conn,
gv->config.max_queued_rexmit_bytes,
gv->config.max_queued_rexmit_msgs,
#ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING
@ -1438,6 +1434,8 @@ int rtps_init (struct ddsi_domaingv *gv)
return 0;
err_mc_conn:
if (gv->xmit_conn)
ddsi_conn_free (gv->xmit_conn);
if (gv->disc_conn_mc)
ddsi_conn_free (gv->disc_conn_mc);
if (gv->data_conn_mc && gv->data_conn_mc != gv->disc_conn_mc)
@ -1762,6 +1760,7 @@ void rtps_fini (struct ddsi_domaingv *gv)
(void) joinleave_spdp_defmcip (gv, 0);
ddsi_conn_free (gv->xmit_conn);
ddsi_conn_free (gv->disc_conn_mc);
if (gv->data_conn_mc != gv->disc_conn_mc)
ddsi_conn_free (gv->data_conn_mc);
@ -1770,8 +1769,6 @@ void rtps_fini (struct ddsi_domaingv *gv)
if (gv->data_conn_uc != gv->disc_conn_uc)
ddsi_conn_free (gv->data_conn_uc);
/* Not freeing gv->tev_conn: it aliases data_conn_uc */
free_group_membership(gv->mship);
ddsi_tran_factories_fini (gv);

View file

@ -214,7 +214,7 @@ static int set_reuse_options (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t
return 0;
}
static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multicast, const struct ddsi_domaingv *gv)
static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool bind_to_any, const struct ddsi_domaingv *gv)
{
dds_return_t rc = DDS_RETCODE_ERROR;
@ -226,7 +226,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica
struct sockaddr_in6 a;
} socketname;
ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc);
if (multicast || !gv->config.bind_unicast_to_interface_addr)
if (bind_to_any)
socketname.a.sin6_addr = ddsrt_in6addr_any;
socketname.a.sin6_port = htons (port);
if (IN6_IS_ADDR_LINKLOCAL (&socketname.a.sin6_addr)) {
@ -243,7 +243,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica
struct sockaddr_in a;
} socketname;
ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc);
if (multicast || !gv->config.bind_unicast_to_interface_addr)
if (bind_to_any)
socketname.a.sin_addr.s_addr = htonl (INADDR_ANY);
socketname.a.sin_port = htons (port);
rc = ddsrt_bind (socket, (struct sockaddr *) &socketname.a, sizeof (socketname.a));
@ -345,7 +345,7 @@ static int set_mc_options_transmit (ddsrt_socket_t socket, const struct ddsi_dom
}
}
int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv)
int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv)
{
/* FIXME: this stuff has to move to the transports */
int rc = -2;
@ -373,18 +373,15 @@ int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicas
return rc;
}
if (port && multicast && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0))
if (port && reuse_addr && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0))
{
goto fail;
}
if
(
(rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) ||
if ((rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) ||
(rc = set_sndbuf (&gv->logconfig, *sock, gv->config.socket_min_sndbuf_size) < 0) ||
((rc = maybe_set_dont_route (&gv->logconfig, *sock, &gv->config)) < 0) ||
((rc = bind_socket (*sock, port, multicast, gv)) < 0)
)
((rc = bind_socket (*sock, port, bind_to_any, gv)) < 0))
{
goto fail;
}

View file

@ -3156,7 +3156,7 @@ void trigger_recv_threads (const struct ddsi_domaingv *gv)
iov.iov_base = &dummy;
iov.iov_len = 1;
GVTRACE ("trigger_recv_threads: %d single %s\n", i, ddsi_locator_to_string (buf, sizeof (buf), dst));
ddsi_conn_write (gv->data_conn_uc, dst, 1, &iov, 0);
ddsi_conn_write (gv->xmit_conn, dst, 1, &iov, 0);
break;
}
case RTM_MANY: {