diff --git a/src/core/ddsi/include/ddsi/ddsi_tran.h b/src/core/ddsi/include/ddsi/ddsi_tran.h index e573591..5ae1687 100644 --- a/src/core/ddsi/include/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/ddsi/ddsi_tran.h @@ -36,7 +36,7 @@ typedef struct ddsi_tran_qos * ddsi_tran_qos_t; /* Function pointer types */ -typedef ssize_t (*ddsi_tran_read_fn_t) (ddsi_tran_conn_t, unsigned char *, size_t, nn_locator_t *); +typedef ssize_t (*ddsi_tran_read_fn_t) (ddsi_tran_conn_t, unsigned char *, size_t, bool, nn_locator_t *); typedef ssize_t (*ddsi_tran_write_fn_t) (ddsi_tran_conn_t, const nn_locator_t *, size_t, const os_iovec_t *, uint32_t); typedef int (*ddsi_tran_locator_fn_t) (ddsi_tran_base_t, nn_locator_t *); typedef bool (*ddsi_tran_supports_fn_t) (int32_t); @@ -220,8 +220,8 @@ inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) { inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags) { return conn->m_closed ? -1 : (conn->m_write_fn) (conn, dst, niov, iov, flags); } -inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) { - return conn->m_closed ? -1 : conn->m_read_fn (conn, buf, len, srcloc); +inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) { + return conn->m_closed ? -1 : conn->m_read_fn (conn, buf, len, allow_spurious, srcloc); } bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn); diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 4c8cb2d..46c01f9 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -63,7 +63,7 @@ static char *ddsi_raweth_to_string (ddsi_tran_factory_t tran, char *dst, size_t return dst; } -static ssize_t ddsi_raweth_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) +static ssize_t ddsi_raweth_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) { int err; ssize_t ret; @@ -71,6 +71,7 @@ static ssize_t ddsi_raweth_conn_read (ddsi_tran_conn_t conn, unsigned char * buf struct sockaddr_ll src; struct iovec msg_iov; socklen_t srclen = (socklen_t) sizeof (src); + (void) allow_spurious; msg_iov.iov_base = (void*) buf; msg_iov.iov_len = len; diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index 58b2ed4..3ec01d9 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -395,7 +395,7 @@ static int err_is_AGAIN_or_WOULDBLOCK (int err) return 0; } -static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) +static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) { ddsi_tcp_conn_t tcp = (ddsi_tcp_conn_t) conn; ssize_t (*rd) (ddsi_tcp_conn_t, void *, size_t, int * err) = ddsi_tcp_conn_read_plain; @@ -436,10 +436,10 @@ static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s { if (err_is_AGAIN_or_WOULDBLOCK (err)) { - if (ddsi_tcp_select (tcp->m_sock, true, pos) == false) - { + if (allow_spurious && pos == 0) + return 0; + else if (ddsi_tcp_select (tcp->m_sock, true, pos) == false) break; - } } else { diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 2ecfdce..31c5276 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -31,7 +31,7 @@ extern inline int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc); extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc); extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener); extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener); -extern inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc); +extern inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc); extern inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags); void ddsi_factory_add (ddsi_tran_factory_t factory) diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index aaa12db..6d5afd6 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -48,7 +48,7 @@ static struct ddsi_udp_config ddsi_udp_config_g; static struct ddsi_tran_factory ddsi_udp_factory_g; static os_atomic_uint32_t ddsi_udp_init_g = OS_ATOMIC_UINT32_INIT(0); -static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) +static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) { int err; ssize_t ret; @@ -56,6 +56,7 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s os_sockaddr_storage src; os_iovec_t msg_iov; socklen_t srclen = (socklen_t) sizeof (src); + (void) allow_spurious; msg_iov.iov_base = (void*) buf; msg_iov.iov_len = (os_iov_len_t)len; /* Windows uses unsigned, POSIX (except Linux) int */ diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 16d193f..7a2cd8f 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -2970,7 +2970,13 @@ static bool do_packet /* Read in DDSI header plus MSG_LEN sub message that follows it */ - sz = ddsi_conn_read (conn, buff, stream_hdr_size, &srcloc); + sz = ddsi_conn_read (conn, buff, stream_hdr_size, true, &srcloc); + if (sz == 0) + { + /* Spurious read -- which at this point is still ok */ + nn_rmsg_commit (rmsg); + return true; + } /* Read in remainder of packet */ @@ -2998,7 +3004,7 @@ static bool do_packet } else { - sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, NULL); + sz = ddsi_conn_read (conn, buff + stream_hdr_size, ml->length - stream_hdr_size, false, NULL); if (sz > 0) { sz = (ssize_t) ml->length; @@ -3010,7 +3016,7 @@ static bool do_packet { /* Get next packet */ - sz = ddsi_conn_read (conn, buff, buff_len, &srcloc); + sz = ddsi_conn_read (conn, buff, buff_len, true, &srcloc); } if (sz > 0 && !gv.deaf)