diff --git a/docs/manual/options.md b/docs/manual/options.md index fbdd2aa..a5534b2 100644 --- a/docs/manual/options.md +++ b/docs/manual/options.md @@ -554,7 +554,7 @@ The default value is: "default". ### //CycloneDDS/Domain/Internal -Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BindUnicastToInterfaceAddr](#cycloneddsdomaininternalbindunicasttointerfaceaddr), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) +Children: [AccelerateRexmitBlockSize](#cycloneddsdomaininternalacceleraterexmitblocksize), [AssumeMulticastCapable](#cycloneddsdomaininternalassumemulticastcapable), [AutoReschedNackDelay](#cycloneddsdomaininternalautoreschednackdelay), [BuiltinEndpointSet](#cycloneddsdomaininternalbuiltinendpointset), [ControlTopic](#cycloneddsdomaininternalcontroltopic), [DDSI2DirectMaxThreads](#cycloneddsdomaininternalddsi2directmaxthreads), [DefragReliableMaxSamples](#cycloneddsdomaininternaldefragreliablemaxsamples), [DefragUnreliableMaxSamples](#cycloneddsdomaininternaldefragunreliablemaxsamples), [DeliveryQueueMaxSamples](#cycloneddsdomaininternaldeliveryqueuemaxsamples), [EnableExpensiveChecks](#cycloneddsdomaininternalenableexpensivechecks), [GenerateKeyhash](#cycloneddsdomaininternalgeneratekeyhash), [HeartbeatInterval](#cycloneddsdomaininternalheartbeatinterval), [LateAckMode](#cycloneddsdomaininternallateackmode), [LeaseDuration](#cycloneddsdomaininternalleaseduration), [LivelinessMonitoring](#cycloneddsdomaininternallivelinessmonitoring), [MaxParticipants](#cycloneddsdomaininternalmaxparticipants), [MaxQueuedRexmitBytes](#cycloneddsdomaininternalmaxqueuedrexmitbytes), [MaxQueuedRexmitMessages](#cycloneddsdomaininternalmaxqueuedrexmitmessages), [MaxSampleSize](#cycloneddsdomaininternalmaxsamplesize), [MeasureHbToAckLatency](#cycloneddsdomaininternalmeasurehbtoacklatency), [MinimumSocketReceiveBufferSize](#cycloneddsdomaininternalminimumsocketreceivebuffersize), [MinimumSocketSendBufferSize](#cycloneddsdomaininternalminimumsocketsendbuffersize), [MonitorPort](#cycloneddsdomaininternalmonitorport), [MultipleReceiveThreads](#cycloneddsdomaininternalmultiplereceivethreads), [NackDelay](#cycloneddsdomaininternalnackdelay), [PreEmptiveAckDelay](#cycloneddsdomaininternalpreemptiveackdelay), [PrimaryReorderMaxSamples](#cycloneddsdomaininternalprimaryreordermaxsamples), [PrioritizeRetransmit](#cycloneddsdomaininternalprioritizeretransmit), [RediscoveryBlacklistDuration](#cycloneddsdomaininternalrediscoveryblacklistduration), [RetransmitMerging](#cycloneddsdomaininternalretransmitmerging), [RetransmitMergingPeriod](#cycloneddsdomaininternalretransmitmergingperiod), [RetryOnRejectBestEffort](#cycloneddsdomaininternalretryonrejectbesteffort), [SPDPResponseMaxDelay](#cycloneddsdomaininternalspdpresponsemaxdelay), [ScheduleTimeRounding](#cycloneddsdomaininternalscheduletimerounding), [SecondaryReorderMaxSamples](#cycloneddsdomaininternalsecondaryreordermaxsamples), [SendAsync](#cycloneddsdomaininternalsendasync), [SquashParticipants](#cycloneddsdomaininternalsquashparticipants), [SynchronousDeliveryLatencyBound](#cycloneddsdomaininternalsynchronousdeliverylatencybound), [SynchronousDeliveryPriorityThreshold](#cycloneddsdomaininternalsynchronousdeliveryprioritythreshold), [Test](#cycloneddsdomaininternaltest), [UnicastResponseToSPDPMessages](#cycloneddsdomaininternalunicastresponsetospdpmessages), [UseMulticastIfMreqn](#cycloneddsdomaininternalusemulticastifmreqn), [Watermarks](#cycloneddsdomaininternalwatermarks), [WriteBatch](#cycloneddsdomaininternalwritebatch), [WriterLingerDuration](#cycloneddsdomaininternalwriterlingerduration) The Internal elements deal with a variety of settings that evolving and @@ -600,15 +600,6 @@ Valid values are finite durations with an explicit unit or the keyword The default value is: "1 s". -#### //CycloneDDS/Domain/Internal/BindUnicastToInterfaceAddr -Boolean - -Bind unicast sockets to the address of the preferred interface; if false, -bind to 0.0.0.0 (IPv4) or its equivalent - -The default value is: "true". - - #### //CycloneDDS/Domain/Internal/BuiltinEndpointSet One of: full, writers, minimal diff --git a/etc/cyclonedds.rnc b/etc/cyclonedds.rnc index f3e4395..0edc899 100644 --- a/etc/cyclonedds.rnc +++ b/etc/cyclonedds.rnc @@ -491,13 +491,6 @@ day.

The default value is: "1 s".

""" ] ] duration_inf }? & [ a:documentation [ xml:lang="en" """ -

Bind unicast sockets to the address of the preferred interface; if -false, bind to 0.0.0.0 (IPv4) or its equivalent

The default value -is: "true".

""" ] ] - element BindUnicastToInterfaceAddr { - xsd:boolean - }? - & [ a:documentation [ xml:lang="en" """

This element controls which participants will have which built-in endpoints for the discovery and liveliness protocols. Valid values are:

diff --git a/etc/cyclonedds.xsd b/etc/cyclonedds.xsd index dbcab47..49d4378 100644 --- a/etc/cyclonedds.xsd +++ b/etc/cyclonedds.xsd @@ -638,7 +638,6 @@ reserved. This includes renaming or moving options.</p> - @@ -717,14 +716,6 @@ of HEARTBEAT messages.</p> day.</p><p>The default value is: &quot;1 s&quot;.</p> - - - -<p>Bind unicast sockets to the address of the preferred interface; if -false, bind to 0.0.0.0 (IPv4) or its equivalent</p><p>The default value -is: &quot;true&quot;.</p> - - diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index a2e341f..caedd81 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -292,8 +292,6 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit } } - ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc; - if ((rc = dds_topic_pin (topic, &tp)) != DDS_RETCODE_OK) goto err_pin_topic; assert (tp->m_stopic); @@ -331,6 +329,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit } /* Create writer */ + ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.xmit_conn; struct dds_writer * const wr = dds_alloc (sizeof (*wr)); const dds_entity_t writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK); wr->m_topic = tp; diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h b/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h index caadcb1..b5747a7 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_domaingv.h @@ -115,18 +115,40 @@ struct ddsi_domaingv { DCPS participant of DDSI2 itself will be mirrored in a DDSI participant, and in multi-socket mode that one gets its own socket. */ - struct ddsi_tran_conn * disc_conn_mc; struct ddsi_tran_conn * data_conn_mc; struct ddsi_tran_conn * disc_conn_uc; struct ddsi_tran_conn * data_conn_uc; - /* TCP listener */ + /* Connection used for all output (for connectionless transports), this + used to simply be data_conn_uc, but: + - Windows has a quirk that makes multicast delivery within a machine + utterly unreliable if the transmitting socket is bound to 0.0.0.0 + (despite all sockets having multicast interfaces set correctly), + but apparently only in the presence of sockets transmitting to the + same multicast group that have been bound to non-0.0.0.0 ... + - At least Fast-RTPS and Connext fail to honour the set of advertised + addresses and substitute 127.0.0.1 for the advertised IP address and + expect it to work. + - Fast-RTPS (at least) binds the socket it uses for transmitting + multicasts to non-0.0.0.0 + + So binding to 0.0.0.0 means the unicasts from Fast-RTPS & Connext will + arrive but the multicasts from Cyclone get dropped often on Windows + when trying to interoperate; and binding to the IP address means + unicast messages from the others fail to arrive (because they fail to + arrive). + + The only work around is to use a separate socket for sending. It is + rather sad that Cyclone needs to work around the bugs of the others, + but it seems the only way to get the users what they expect. */ + struct ddsi_tran_conn * xmit_conn; + + /* TCP listener */ struct ddsi_tran_listener * listener; /* Thread pool */ - struct ddsrt_thread_pool_s * thread_pool; /* In many sockets mode, the receive threads maintain a local array @@ -249,10 +271,6 @@ struct ddsi_domaingv { delivery queue; currently just SEDP and PMD */ struct nn_dqueue *builtins_dqueue; - /* Connection used by general timed-event queue for transmitting data */ - - struct ddsi_tran_conn * tev_conn; - struct debug_monitor *debmon; #ifndef DDSI_INCLUDE_NETWORK_CHANNELS diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h index 4d7d0e4..c6ccc8f 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h @@ -46,7 +46,7 @@ typedef struct ddsi_tran_base * ddsi_tran_base_t; typedef struct ddsi_tran_conn * ddsi_tran_conn_t; typedef struct ddsi_tran_listener * ddsi_tran_listener_t; typedef struct ddsi_tran_factory * ddsi_tran_factory_t; -typedef struct ddsi_tran_qos * ddsi_tran_qos_t; +typedef struct ddsi_tran_qos ddsi_tran_qos_t; /* Function pointer types */ @@ -60,8 +60,8 @@ typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t); typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *); typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t); typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t); -typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, ddsi_tran_qos_t); -typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t); +typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *); +typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *); typedef void (*ddsi_tran_release_conn_fn_t) (ddsi_tran_conn_t); typedef void (*ddsi_tran_close_conn_fn_t) (ddsi_tran_conn_t); typedef void (*ddsi_tran_unblock_listener_fn_t) (ddsi_tran_listener_t); @@ -189,11 +189,15 @@ struct ddsi_tran_factory ddsi_tran_factory_t m_factory; }; +enum ddsi_tran_qos_purpose { + DDSI_TRAN_QOS_XMIT, + DDSI_TRAN_QOS_RECV_UC, + DDSI_TRAN_QOS_RECV_MC +}; + struct ddsi_tran_qos { - /* QoS Data */ - - bool m_multicast; + enum ddsi_tran_qos_purpose m_purpose; int m_diffserv; }; @@ -210,20 +214,18 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3 inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port) { return factory->m_is_valid_port_fn (factory, port); } -inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) { +inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { if (!ddsi_is_valid_port (factory, port)) return NULL; return factory->m_create_conn_fn (factory, port, qos); } -inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) { +inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { if (!ddsi_is_valid_port (factory, port)) return NULL; return factory->m_create_listener_fn (factory, port, qos); } void ddsi_tran_free (ddsi_tran_base_t base); -void ddsi_tran_free_qos (ddsi_tran_qos_t qos); -ddsi_tran_qos_t ddsi_tran_create_qos (void); inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base) { return base->m_handle_fn (base); } diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index cb0ba65..fd66ad2 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -332,7 +332,6 @@ struct config int64_t initial_deaf_mute_reset; int use_multicast_if_mreqn; - int bind_unicast_to_interface_addr; struct prune_deleted_ppant prune_deleted_ppant; }; diff --git a/src/core/ddsi/include/dds/ddsi/q_nwif.h b/src/core/ddsi/include/dds/ddsi/q_nwif.h index dd27df6..12f232e 100644 --- a/src/core/ddsi/include/dds/ddsi/q_nwif.h +++ b/src/core/ddsi/include/dds/ddsi/q_nwif.h @@ -35,7 +35,7 @@ struct nn_interface { char *name; }; -int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv); +int make_socket (ddsrt_socket_t *socket, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv); int find_own_ip (struct ddsi_domaingv *gv, const char *requested_address); uint32_t locator_to_hopefully_unique_uint32 (const nn_locator_t *src); diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 1f7ed6a..6e0bca2 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -175,13 +175,13 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t return ret; } -static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos) +static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) { ddsrt_socket_t sock; dds_return_t rc; ddsi_raweth_conn_t uc = NULL; struct sockaddr_ll addr; - bool mcast = (bool) (qos ? qos->m_multicast : 0); + bool mcast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC); /* If port is zero, need to create dynamic port */ diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index 5a7ed06..5a87fd1 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -167,7 +167,7 @@ static void ddsi_tcp_sock_free (const struct ddsrt_log_cfg *logcfg, ddsrt_socket static void ddsi_tcp_sock_new (ddsrt_socket_t *sock, unsigned short port, const struct ddsi_domaingv *gv) { - if (make_socket (sock, port, true, true, gv) != 0) + if (make_socket (sock, port, true, true, true, gv) != 0) { *sock = DDSRT_INVALID_SOCKET; } @@ -706,7 +706,7 @@ static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_ return 0; } -static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, ddsi_tran_qos_t qos) +static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn; (void) qos; @@ -853,7 +853,7 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, dd return conn; } -static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos) +static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) { char buff[DDSI_LOCSTRLEN]; ddsrt_socket_t sock; diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 2bdda5d..fdf1fcf 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -24,13 +24,13 @@ extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn); extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn); -extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos); +extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind); extern inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port); extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn); extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base); -extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos); +extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc); extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener); extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener); @@ -182,11 +182,6 @@ bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) return false; } -void ddsi_tran_free_qos (ddsi_tran_qos_t qos) -{ - ddsrt_free (qos); -} - int ddsi_conn_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) { return conn->m_factory->m_join_mc_fn (conn, srcloc, mcloc, interf); @@ -197,14 +192,6 @@ int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const return conn->m_factory->m_leave_mc_fn (conn, srcloc, mcloc, interf); } -ddsi_tran_qos_t ddsi_tran_create_qos (void) -{ - ddsi_tran_qos_t qos; - qos = (ddsi_tran_qos_t) ddsrt_malloc (sizeof (*qos)); - memset (qos, 0, sizeof (*qos)); - return qos; -} - void ddsi_tran_free (ddsi_tran_base_t base) { if (base) diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index 8720468..ede09e7 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -218,16 +218,37 @@ static unsigned short get_socket_port (const struct ddsrt_log_cfg *logcfg, ddsrt return ddsrt_sockaddr_get_port((struct sockaddr *)&addr); } -static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, ddsi_tran_qos_t qos) +static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos) { int ret; ddsrt_socket_t sock; ddsi_udp_conn_t uc = NULL; - bool mcast = (bool) (qos ? qos->m_multicast : false); + bool reuse_addr = false, bind_to_any = false; + const char *purpose_str = NULL; + + switch (qos->m_purpose) + { + case DDSI_TRAN_QOS_XMIT: + reuse_addr = false; + bind_to_any = false; + purpose_str = "transmit"; + break; + case DDSI_TRAN_QOS_RECV_UC: + reuse_addr = false; + bind_to_any = true; + purpose_str = "unicast"; + break; + case DDSI_TRAN_QOS_RECV_MC: + reuse_addr = true; + bind_to_any = true; + purpose_str = "multicast"; + break; + } + assert (purpose_str != NULL); /* If port is zero, need to create dynamic port */ - ret = make_socket (&sock, (unsigned short) port, false, mcast, fact->gv); + ret = make_socket (&sock, (unsigned short) port, false, reuse_addr, bind_to_any, fact->gv); if (ret == 0) { @@ -235,7 +256,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t memset (uc, 0, sizeof (*uc)); uc->m_sock = sock; - uc->m_diffserv = qos ? qos->m_diffserv : 0; + uc->m_diffserv = qos->m_diffserv; #if defined _WIN32 && !defined WINCE uc->m_sockEvent = WSACreateEvent(); WSAEventSelect(uc->m_sock, uc->m_sockEvent, FD_WRITE); @@ -244,7 +265,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t ddsi_factory_conn_init (fact, &uc->m_base); uc->m_base.m_base.m_port = get_socket_port (&fact->gv->logconfig, sock); uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN; - uc->m_base.m_base.m_multicast = mcast; + uc->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC); uc->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle; uc->m_base.m_read_fn = ddsi_udp_conn_read; @@ -254,7 +275,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t DDS_CTRACE (&fact->gv->logconfig, "ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", - mcast ? "multicast" : "unicast", + purpose_str, uc->m_sock, uc->m_base.m_base.m_port); #ifdef DDSI_INCLUDE_NETWORK_CHANNELS @@ -271,7 +292,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t { if (fact->gv->config.participantIndex != PARTICIPANT_INDEX_AUTO) { - DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", mcast ? "multicast" : "unicast", port); + DDS_CERROR (&fact->gv->logconfig, "UDP make_socket failed for %s port %"PRIu32"\n", purpose_str, port); } } diff --git a/src/core/ddsi/src/q_config.c b/src/core/ddsi/src/q_config.c index 82d1d71..2f3fbf6 100644 --- a/src/core/ddsi/src/q_config.c +++ b/src/core/ddsi/src/q_config.c @@ -578,8 +578,6 @@ static const struct cfgelem internal_cfgelems[] = { BLURB("

This element controls whether retransmits are prioritized over new data, speeding up recovery.

") }, { LEAF("UseMulticastIfMreqn"), 1, "0", ABSOFF(use_multicast_if_mreqn), 0, uf_int, 0, pf_int, BLURB("

Do not use.

") }, - { LEAF("BindUnicastToInterfaceAddr"), 1, "true", ABSOFF(bind_unicast_to_interface_addr), 0, uf_boolean, 0, pf_boolean, - BLURB("

Bind unicast sockets to the address of the preferred interface; if false, bind to 0.0.0.0 (IPv4) or its equivalent

") }, { LEAF("SendAsync"), 1, "false", ABSOFF(xpack_send_async), 0, uf_boolean, 0, pf_boolean, BLURB("

This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.

") }, { LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration, diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index f52cddb..0790a00 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -652,7 +652,8 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain if (gv->config.many_sockets_mode == MSM_MANY_UNICAST) { - pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, NULL); + const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 }; + pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos); ddsi_conn_locator (pp->m_conn, &pp->m_locator); } else diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 46ac147..7cb0fb3 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -96,14 +96,15 @@ static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint3 if (!ddsi_is_valid_port (gv->m_factory, *pdisc) || !ddsi_is_valid_port (gv->m_factory, *pdata)) return MUSRET_INVALID_PORTS; - gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, NULL); + const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 }; + gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, &qos); if (gv->disc_conn_uc) { /* Check not configured to use same unicast port for data and discovery */ if (*pdata != 0 && (*pdata != *pdisc)) { - gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, NULL); + gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, &qos); } else { @@ -663,10 +664,9 @@ int joinleave_spdp_defmcip (struct ddsi_domaingv *gv, int dojoin) int create_multicast_sockets (struct ddsi_domaingv *gv) { - ddsi_tran_qos_t qos = ddsi_tran_create_qos (); + const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_MC, .m_diffserv = 0 }; ddsi_tran_conn_t disc, data; uint32_t port; - qos->m_multicast = 1; port = ddsi_get_port (&gv->config, DDSI_PORT_MULTI_DISC, 0); if (!ddsi_is_valid_port (gv->m_factory, port)) @@ -675,7 +675,7 @@ int create_multicast_sockets (struct ddsi_domaingv *gv) gv->config.extDomainId.value, port); goto err_disc; } - if ((disc = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL) + if ((disc = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL) goto err_disc; if (gv->config.many_sockets_mode == MSM_NO_UNICAST) { @@ -691,12 +691,11 @@ int create_multicast_sockets (struct ddsi_domaingv *gv) gv->config.extDomainId.value, port); goto err_disc; } - if ((data = ddsi_factory_create_conn (gv->m_factory, port, qos)) == NULL) + if ((data = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL) { goto err_data; } } - ddsi_tran_free_qos (qos); gv->disc_conn_mc = disc; gv->data_conn_mc = data; @@ -707,7 +706,6 @@ int create_multicast_sockets (struct ddsi_domaingv *gv) err_data: ddsi_conn_free (disc); err_disc: - ddsi_tran_free_qos (qos); return 0; } @@ -964,7 +962,7 @@ int rtps_init (struct ddsi_domaingv *gv) gv->data_conn_uc = NULL; gv->disc_conn_mc = NULL; gv->data_conn_mc = NULL; - gv->tev_conn = NULL; + gv->xmit_conn = NULL; gv->listener = NULL; gv->thread_pool = NULL; gv->debmon = NULL; @@ -1280,9 +1278,6 @@ int rtps_init (struct ddsi_domaingv *gv) } else { - /* Must have a data_conn_uc/tev_conn/transmit_conn */ - gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, 0, NULL); - if (gv->config.tcp_port == -1) ; /* nop */ else if (!ddsi_is_valid_port (gv->m_factory, (uint32_t) gv->config.tcp_port)) @@ -1311,9 +1306,10 @@ int rtps_init (struct ddsi_domaingv *gv) } /* Create shared transmit connection */ - - gv->tev_conn = gv->data_conn_uc; - GVLOG (DDS_LC_CONFIG, "Timed event transmit port: %d\n", (int) ddsi_conn_port (gv->tev_conn)); + { + const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_XMIT, .m_diffserv = 0 }; + gv->xmit_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos); + } #ifdef DDSI_INCLUDE_NETWORK_CHANNELS { @@ -1376,7 +1372,7 @@ int rtps_init (struct ddsi_domaingv *gv) gv->xevents = xeventq_new ( - gv->tev_conn, + gv->xmit_conn, gv->config.max_queued_rexmit_bytes, gv->config.max_queued_rexmit_msgs, #ifdef DDSI_INCLUDE_BANDWIDTH_LIMITING @@ -1438,6 +1434,8 @@ int rtps_init (struct ddsi_domaingv *gv) return 0; err_mc_conn: + if (gv->xmit_conn) + ddsi_conn_free (gv->xmit_conn); if (gv->disc_conn_mc) ddsi_conn_free (gv->disc_conn_mc); if (gv->data_conn_mc && gv->data_conn_mc != gv->disc_conn_mc) @@ -1762,6 +1760,7 @@ void rtps_fini (struct ddsi_domaingv *gv) (void) joinleave_spdp_defmcip (gv, 0); + ddsi_conn_free (gv->xmit_conn); ddsi_conn_free (gv->disc_conn_mc); if (gv->data_conn_mc != gv->disc_conn_mc) ddsi_conn_free (gv->data_conn_mc); @@ -1770,8 +1769,6 @@ void rtps_fini (struct ddsi_domaingv *gv) if (gv->data_conn_uc != gv->disc_conn_uc) ddsi_conn_free (gv->data_conn_uc); - /* Not freeing gv->tev_conn: it aliases data_conn_uc */ - free_group_membership(gv->mship); ddsi_tran_factories_fini (gv); diff --git a/src/core/ddsi/src/q_nwif.c b/src/core/ddsi/src/q_nwif.c index 5789983..9bbbb5c 100644 --- a/src/core/ddsi/src/q_nwif.c +++ b/src/core/ddsi/src/q_nwif.c @@ -214,7 +214,7 @@ static int set_reuse_options (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t return 0; } -static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multicast, const struct ddsi_domaingv *gv) +static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool bind_to_any, const struct ddsi_domaingv *gv) { dds_return_t rc = DDS_RETCODE_ERROR; @@ -226,7 +226,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica struct sockaddr_in6 a; } socketname; ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc); - if (multicast || !gv->config.bind_unicast_to_interface_addr) + if (bind_to_any) socketname.a.sin6_addr = ddsrt_in6addr_any; socketname.a.sin6_port = htons (port); if (IN6_IS_ADDR_LINKLOCAL (&socketname.a.sin6_addr)) { @@ -243,7 +243,7 @@ static int bind_socket (ddsrt_socket_t socket, unsigned short port, bool multica struct sockaddr_in a; } socketname; ddsi_ipaddr_from_loc (&socketname.x, &gv->ownloc); - if (multicast || !gv->config.bind_unicast_to_interface_addr) + if (bind_to_any) socketname.a.sin_addr.s_addr = htonl (INADDR_ANY); socketname.a.sin_port = htons (port); rc = ddsrt_bind (socket, (struct sockaddr *) &socketname.a, sizeof (socketname.a)); @@ -345,7 +345,7 @@ static int set_mc_options_transmit (ddsrt_socket_t socket, const struct ddsi_dom } } -int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicast, const struct ddsi_domaingv *gv) +int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool reuse_addr, bool bind_to_any, const struct ddsi_domaingv *gv) { /* FIXME: this stuff has to move to the transports */ int rc = -2; @@ -373,18 +373,15 @@ int make_socket (ddsrt_socket_t *sock, uint16_t port, bool stream, bool multicas return rc; } - if (port && multicast && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0)) + if (port && reuse_addr && ((rc = set_reuse_options (&gv->logconfig, *sock)) < 0)) { goto fail; } - if - ( - (rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) || - (rc = set_sndbuf (&gv->logconfig, *sock, gv->config.socket_min_sndbuf_size) < 0) || - ((rc = maybe_set_dont_route (&gv->logconfig, *sock, &gv->config)) < 0) || - ((rc = bind_socket (*sock, port, multicast, gv)) < 0) - ) + if ((rc = set_rcvbuf (&gv->logconfig, *sock, &gv->config.socket_min_rcvbuf_size) < 0) || + (rc = set_sndbuf (&gv->logconfig, *sock, gv->config.socket_min_sndbuf_size) < 0) || + ((rc = maybe_set_dont_route (&gv->logconfig, *sock, &gv->config)) < 0) || + ((rc = bind_socket (*sock, port, bind_to_any, gv)) < 0)) { goto fail; } diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 6ae7d41..7b138a8 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -3156,7 +3156,7 @@ void trigger_recv_threads (const struct ddsi_domaingv *gv) iov.iov_base = &dummy; iov.iov_len = 1; GVTRACE ("trigger_recv_threads: %d single %s\n", i, ddsi_locator_to_string (buf, sizeof (buf), dst)); - ddsi_conn_write (gv->data_conn_uc, dst, 1, &iov, 0); + ddsi_conn_write (gv->xmit_conn, dst, 1, &iov, 0); break; } case RTM_MANY: {