diff --git a/src/core/ddsi/include/ddsi/ddsi_tran.h b/src/core/ddsi/include/ddsi/ddsi_tran.h index 4a1f428..c73eb86 100644 --- a/src/core/ddsi/include/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/ddsi/ddsi_tran.h @@ -44,6 +44,7 @@ typedef os_handle (*ddsi_tran_handle_fn_t) (ddsi_tran_base_t); typedef int (*ddsi_tran_listen_fn_t) (ddsi_tran_listener_t); typedef void (*ddsi_tran_free_fn_t) (void); typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *); +typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t); typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t); typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (uint32_t, ddsi_tran_qos_t); typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (int port, ddsi_tran_qos_t); @@ -102,6 +103,7 @@ struct ddsi_tran_conn ddsi_tran_read_fn_t m_read_fn; ddsi_tran_write_fn_t m_write_fn; ddsi_tran_peer_locator_fn_t m_peer_locator_fn; + ddsi_tran_disable_multiplexing_fn_t m_disable_multiplexing_fn; /* Data */ @@ -209,6 +211,7 @@ void ddsi_factory_conn_init (ddsi_tran_factory_t factory, ddsi_tran_conn_t conn) OSAPI_EXPORT ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const ddsi_iovec_t *iov, uint32_t flags); ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc); bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); +void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn); void ddsi_conn_add_ref (ddsi_tran_conn_t conn); void ddsi_conn_free (ddsi_tran_conn_t conn); diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 807664c..f35ae4d 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -240,6 +240,7 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (uint32_t port, ddsi_tran_qos_t uc->m_base.m_base.m_locator_fn = ddsi_raweth_conn_locator; uc->m_base.m_read_fn = ddsi_raweth_conn_read; uc->m_base.m_write_fn = ddsi_raweth_conn_write; + uc->m_base.m_disable_multiplexing_fn = 0; nn_log(LC_INFO, "ddsi_raweth_create_conn %s socket %d port %u\n", mcast ? "multicast" : "unicast", uc->m_sock, uc->m_base.m_base.m_port); return uc ? &uc->m_base : NULL; diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index b2b1f9e..5c6a2e6 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -857,6 +857,7 @@ static void ddsi_tcp_base_init (struct ddsi_tran_conn * base) base->m_read_fn = ddsi_tcp_conn_read; base->m_write_fn = ddsi_tcp_conn_write; base->m_peer_locator_fn = ddsi_tcp_conn_peer_locator; + base->m_disable_multiplexing_fn = 0; } static ddsi_tcp_conn_t ddsi_tcp_new_conn (os_socket sock, bool server, os_sockaddr * peer) diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 5b5576b..3eae4f6 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -170,6 +170,13 @@ ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t return ret; } +void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn) +{ + if (conn->m_disable_multiplexing_fn) { + (conn->m_disable_multiplexing_fn) (conn); + } +} + bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) { if (conn->m_peer_locator_fn) diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index bf08a9c..5ac3d39 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -175,6 +175,18 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d return ret; } +static void ddsi_udp_disable_multiplexing (ddsi_tran_conn_t base) +{ +#if defined _WIN32 && !defined WINCE + ddsi_udp_conn_t uc = (ddsi_udp_conn_t) base; + uint32_t zero = 0, dummy; + WSAEventSelect(uc->m_sock, 0, 0); + WSAIoctl(uc->m_sock, FIONBIO, &zero,sizeof(zero), NULL,0, &dummy, NULL,NULL); +#else + (void)base; +#endif +} + static os_handle ddsi_udp_conn_handle (ddsi_tran_base_t base) { return ((ddsi_udp_conn_t) base)->m_sock; @@ -257,6 +269,7 @@ static ddsi_tran_conn_t ddsi_udp_create_conn uc->m_base.m_read_fn = ddsi_udp_conn_read; uc->m_base.m_write_fn = ddsi_udp_conn_write; + uc->m_base.m_disable_multiplexing_fn = ddsi_udp_disable_multiplexing; nn_log ( diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 736896a..8270fa8 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -777,6 +777,7 @@ static int setup_and_start_recv_threads (void) gv.recv_threads[gv.n_recv_threads].arg.mode = RTM_SINGLE; gv.recv_threads[gv.n_recv_threads].arg.u.single.conn = gv.data_conn_mc; gv.recv_threads[gv.n_recv_threads].arg.u.single.loc = &gv.loc_default_mc; + ddsi_conn_disable_multiplexing (gv.data_conn_mc); gv.n_recv_threads++; } if (config.many_sockets_mode == MSM_SINGLE_UNICAST) @@ -786,6 +787,7 @@ static int setup_and_start_recv_threads (void) gv.recv_threads[gv.n_recv_threads].arg.mode = RTM_SINGLE; gv.recv_threads[gv.n_recv_threads].arg.u.single.conn = gv.data_conn_uc; gv.recv_threads[gv.n_recv_threads].arg.u.single.loc = &gv.loc_default_uc; + ddsi_conn_disable_multiplexing (gv.data_conn_uc); gv.n_recv_threads++; } }