From e1201e678d7e70b704f16b729e5629fd5b381684 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 13 Mar 2020 15:14:17 +0100 Subject: [PATCH] Minor cleanup of UDP, TCP support code Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/ddsi_tran.h | 18 +- src/core/ddsi/include/dds/ddsi/q_pcap.h | 2 +- src/core/ddsi/src/ddsi_raweth.c | 13 +- src/core/ddsi/src/ddsi_tcp.c | 364 ++++++++-------- src/core/ddsi/src/ddsi_tran.c | 4 +- src/core/ddsi/src/ddsi_udp.c | 479 +++++++++++---------- src/core/ddsi/src/q_debmon.c | 3 +- src/core/ddsi/src/q_entity.c | 24 +- src/core/ddsi/src/q_init.c | 85 ++-- src/core/ddsi/src/q_pcap.c | 4 +- 10 files changed, 517 insertions(+), 479 deletions(-) diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h index c6ccc8f..f18b2db 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h @@ -60,8 +60,8 @@ typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t); typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *); typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t); typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t); -typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *); -typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *); +typedef dds_return_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_conn_t *conn, ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *); +typedef dds_return_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_listener_t *listener, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *); typedef void (*ddsi_tran_release_conn_fn_t) (ddsi_tran_conn_t); typedef void (*ddsi_tran_close_conn_fn_t) (ddsi_tran_conn_t); typedef void (*ddsi_tran_unblock_listener_fn_t) (ddsi_tran_listener_t); @@ -214,15 +214,17 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3 inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port) { return factory->m_is_valid_port_fn (factory, port); } -inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { +inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { + *conn = NULL; if (!ddsi_is_valid_port (factory, port)) - return NULL; - return factory->m_create_conn_fn (factory, port, qos); + return DDS_RETCODE_BAD_PARAMETER; + return factory->m_create_conn_fn (conn, factory, port, qos); } -inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { +inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { + *listener = NULL; if (!ddsi_is_valid_port (factory, port)) - return NULL; - return factory->m_create_listener_fn (factory, port, qos); + return DDS_RETCODE_BAD_PARAMETER; + return factory->m_create_listener_fn (listener, factory, port, qos); } void ddsi_tran_free (ddsi_tran_base_t base); diff --git a/src/core/ddsi/include/dds/ddsi/q_pcap.h b/src/core/ddsi/include/dds/ddsi/q_pcap.h index 6ae6d66..f69d3bb 100644 --- a/src/core/ddsi/include/dds/ddsi/q_pcap.h +++ b/src/core/ddsi/include/dds/ddsi/q_pcap.h @@ -21,7 +21,7 @@ extern "C" { struct msghdr; -FILE * new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name); +FILE * new_pcap_file (struct ddsi_domaingv *gv, const char *name); void write_pcap_received (struct ddsi_domaingv *gv, ddsrt_wctime_t tstamp, const struct sockaddr_storage *src, const struct sockaddr_storage *dst, unsigned char *buf, size_t sz); void write_pcap_sent (struct ddsi_domaingv *gv, ddsrt_wctime_t tstamp, const struct sockaddr_storage *src, diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 7237e4a..2dcb957 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -174,7 +174,7 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t return ret; } -static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) +static dds_return_t ddsi_raweth_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) { ddsrt_socket_t sock; dds_return_t rc; @@ -187,14 +187,14 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3 if (port == 0 || port > 65535) { DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s port %u - using port number as ethernet type, %u won't do\n", mcast ? "multicast" : "unicast", port, port); - return NULL; + return DDS_RETCODE_ERROR; } rc = ddsrt_socket(&sock, PF_PACKET, SOCK_DGRAM, htons((uint16_t)port)); if (rc != DDS_RETCODE_OK) { DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s port %u failed ... retcode = %d\n", mcast ? "multicast" : "unicast", port, rc); - return NULL; + return DDS_RETCODE_ERROR; } memset(&addr, 0, sizeof(addr)); @@ -207,13 +207,13 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3 { ddsrt_close(sock); DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s bind port %u failed ... retcode = %d\n", mcast ? "multicast" : "unicast", port, rc); - return NULL; + return DDS_RETCODE_ERROR; } if ((uc = (ddsi_raweth_conn_t) ddsrt_malloc (sizeof (*uc))) == NULL) { ddsrt_close(sock); - return NULL; + return DDS_RETCODE_ERROR; } memset (uc, 0, sizeof (*uc)); @@ -230,7 +230,8 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3 uc->m_base.m_disable_multiplexing_fn = 0; DDS_CTRACE (&fact->gv->logconfig, "ddsi_raweth_create_conn %s socket %d port %u\n", mcast ? "multicast" : "unicast", uc->m_sock, uc->m_base.m_base.m_port); - return &uc->m_base; + *conn_out = &uc->m_base; + return DDS_RETCODE_OK; } static int isbroadcast(const nn_locator_t *loc) diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index 5cf5b62..de7a7e6 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -39,9 +39,15 @@ wait set that manages their lifecycle. */ +union addr { + struct sockaddr a; + struct sockaddr_in a4; + struct sockaddr_in6 a6; +}; + typedef struct ddsi_tcp_conn { struct ddsi_tran_conn m_base; - struct sockaddr_storage m_peer_addr; + union addr m_peer_addr; uint32_t m_peer_port; ddsrt_mutex_t m_mutex; ddsrt_socket_t m_sock; @@ -70,8 +76,8 @@ struct ddsi_tran_factory_tcp { static int ddsi_tcp_cmp_conn (const struct ddsi_tcp_conn *c1, const struct ddsi_tcp_conn *c2) { - const struct sockaddr *a1s = (struct sockaddr *)&c1->m_peer_addr; - const struct sockaddr *a2s = (struct sockaddr *)&c2->m_peer_addr; + const struct sockaddr *a1s = &c1->m_peer_addr.a; + const struct sockaddr *a2s = &c2->m_peer_addr.a; if (a1s->sa_family != a2s->sa_family) return (a1s->sa_family < a2s->sa_family) ? -1 : 1; else if (c1->m_peer_port != c2->m_peer_port) @@ -132,114 +138,91 @@ static void ddsi_tcp_cache_dump (void) } */ -static unsigned short get_socket_port (struct ddsrt_log_cfg *logcfg, ddsrt_socket_t socket) +static uint16_t get_socket_port (struct ddsi_domaingv const * const gv, ddsrt_socket_t socket) { - struct sockaddr_storage addr; + union addr addr; socklen_t addrlen = sizeof (addr); dds_return_t ret; - ret = ddsrt_getsockname(socket, (struct sockaddr *)&addr, &addrlen); + ret = ddsrt_getsockname(socket, &addr.a, &addrlen); if (ret != DDS_RETCODE_OK) { - DDS_CERROR (logcfg, "ddsi_tcp_get_socket_port: ddsrt_getsockname retcode %"PRId32"\n", ret); + GVERROR ("ddsi_tcp_get_socket_port: ddsrt_getsockname retcode %"PRId32"\n", ret); return 0; } - return ddsrt_sockaddr_get_port((struct sockaddr *)&addr); + return ddsrt_sockaddr_get_port (&addr.a); } static void ddsi_tcp_conn_set_socket (ddsi_tcp_conn_t conn, ddsrt_socket_t sock) { + struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv; conn->m_sock = sock; - conn->m_base.m_base.m_port = (sock == DDSRT_INVALID_SOCKET) ? INVALID_PORT : get_socket_port (&conn->m_base.m_base.gv->logconfig, sock); + conn->m_base.m_base.m_port = (sock == DDSRT_INVALID_SOCKET) ? INVALID_PORT : get_socket_port (gv, sock); } -static void ddsi_tcp_sock_free (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t sock, const char *msg) +static void ddsi_tcp_sock_free (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, const char *msg) { if (sock != DDSRT_INVALID_SOCKET) { if (msg) - { - DDS_CLOG (DDS_LC_TCP, logcfg, "tcp %s free socket %"PRIdSOCK"\n", msg, sock); - } + GVLOG (DDS_LC_TCP, "tcp %s free socket %"PRIdSOCK"\n", msg, sock); ddsrt_close (sock); } } static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact, ddsrt_socket_t *sock, uint16_t port) { - struct ddsi_domaingv * const gv = fact->fact.gv; + struct ddsi_domaingv const * const gv = fact->fact.gv; const int one = 1; + union addr socketname; dds_return_t rc; + memset (&socketname, 0, sizeof (socketname)); + switch (fact->fact.m_kind) { - int af = AF_UNSPEC; - switch (fact->fact.m_kind) - { - case NN_LOCATOR_KIND_TCPv4: - af = AF_INET; - break; + case NN_LOCATOR_KIND_TCPv4: + socketname.a4.sin_family = AF_INET; + socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY); + socketname.a4.sin_port = htons (port); + break; #if DDSRT_HAVE_IPV6 - case NN_LOCATOR_KIND_TCPv6: - af = AF_INET6; - break; + case NN_LOCATOR_KIND_TCPv6: + socketname.a6.sin6_family = AF_INET6; + socketname.a6.sin6_addr = ddsrt_in6addr_any; + socketname.a6.sin6_port = htons (port); + break; #endif - default: - DDS_FATAL ("ddsi_tcp_sock_new: unsupported kind %"PRId32"\n", fact->fact.m_kind); - } - assert (af != AF_UNSPEC); - if ((rc = ddsrt_socket (sock, af, SOCK_STREAM, 0)) != DDS_RETCODE_OK) - { - GVERROR ("ddsi_tcp_sock_new: failed to create socket: %s\n", dds_strretcode (rc)); - goto fail; - } + default: + DDS_FATAL ("ddsi_tcp_sock_new: unsupported kind %"PRId32"\n", fact->fact.m_kind); + } + if ((rc = ddsrt_socket (sock, socketname.a.sa_family, SOCK_STREAM, 0)) != DDS_RETCODE_OK) + { + GVERROR ("ddsi_tcp_sock_new: failed to create socket: %s\n", dds_strretcode (rc)); + goto fail; } /* REUSEADDR if we're binding to a port number */ - if (port && (rc = ddsrt_setsockopt (*sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) != DDS_RETCODE_OK) + if (port && (rc = ddsrt_setsockopt (*sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one))) != DDS_RETCODE_OK) { GVERROR ("ddsi_tcp_sock_new: failed to enable address reuse: %s\n", dds_strretcode (rc)); goto fail_w_socket; } + if ((rc = ddsrt_bind (*sock, &socketname.a, ddsrt_sockaddr_get_size (&socketname.a))) != DDS_RETCODE_OK) { - union { - struct sockaddr_storage x; - struct sockaddr_in a4; - struct sockaddr_in6 a6; - } socketname; - memset (&socketname.x, 0, sizeof (socketname.x)); - switch (fact->fact.m_kind) - { - case NN_LOCATOR_KIND_TCPv4: - socketname.a4.sin_family = AF_INET; - socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY); - socketname.a4.sin_port = htons (port); - break; -#if DDSRT_HAVE_IPV6 - case NN_LOCATOR_KIND_TCPv6: - socketname.a4.sin_family = AF_INET6; - socketname.a6.sin6_addr = ddsrt_in6addr_any; - socketname.a6.sin6_port = htons (port); - break; -#endif - default: - DDS_FATAL ("ddsi_tcp_sock_new: unsupported kind %"PRId32"\n", fact->fact.m_kind); - } - if ((rc = ddsrt_bind (*sock, (struct sockaddr *) &socketname, ddsrt_sockaddr_get_size ((struct sockaddr *) &socketname))) != DDS_RETCODE_OK) - { - GVERROR ("ddsi_tcp_sock_new: failed to bind to ANY:%"PRIu16": %s\n", port, dds_strretcode (rc)); - goto fail_w_socket; - } + GVERROR ("ddsi_tcp_sock_new: failed to bind to ANY:%"PRIu16": %s\n", port, + (rc == DDS_RETCODE_PRECONDITION_NOT_MET) ? "address in use" : dds_strretcode (rc)); + goto fail_w_socket; } #ifdef SO_NOSIGPIPE - if (ddsrt_setsockopt (*sock, SOL_SOCKET, SO_NOSIGPIPE, (char *) &one, sizeof (one)) != DDS_RETCODE_OK) + if (ddsrt_setsockopt (*sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)) != DDS_RETCODE_OK) { GVERROR ("ddsi_tcp_sock_new: failed to set NOSIGPIPE: %s\n", dds_strretcode (rc)); goto fail_w_socket; } #endif #ifdef TCP_NODELAY - if (gv->config.tcp_nodelay && (rc = ddsrt_setsockopt (*sock, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof (one))) != DDS_RETCODE_OK) + if (gv->config.tcp_nodelay && (rc = ddsrt_setsockopt (*sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one))) != DDS_RETCODE_OK) { GVERROR ("ddsi_tcp_sock_new: failed to set NODELAY: %s\n", dds_strretcode (rc)); goto fail_w_socket; @@ -264,52 +247,57 @@ static void ddsi_tcp_node_free (void * ptr) static void ddsi_tcp_conn_connect (ddsi_tcp_conn_t conn, const ddsrt_msghdr_t * msg) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory; + struct ddsi_domaingv const * const gv = fact->fact.gv; char buff[DDSI_LOCSTRLEN]; ddsrt_socket_t sock; dds_return_t ret; - if (ddsi_tcp_sock_new (fact, &sock, 0) == DDS_RETCODE_OK) + if (ddsi_tcp_sock_new (fact, &sock, 0) != DDS_RETCODE_OK) { - /* Attempt to connect, expected that may fail */ - do { - ret = ddsrt_connect(sock, msg->msg_name, msg->msg_namelen); - } while (ret == DDS_RETCODE_INTERRUPTED); + /* error messages are logged by ddsi_tcp_sock_new */ + return; + } - if (ret != DDS_RETCODE_OK) - { - ddsi_tcp_sock_free (&conn->m_base.m_base.gv->logconfig, sock, NULL); - return; - } - ddsi_tcp_conn_set_socket (conn, sock); + /* Attempt to connect, expected that may fail */ + do { + ret = ddsrt_connect(sock, msg->msg_name, msg->msg_namelen); + } while (ret == DDS_RETCODE_INTERRUPTED); + if (ret != DDS_RETCODE_OK) + goto fail_w_socket; + ddsi_tcp_conn_set_socket (conn, sock); #ifdef DDSI_INCLUDE_SSL - if (fact->ddsi_tcp_ssl_plugin.connect) + if (fact->ddsi_tcp_ssl_plugin.connect) + { + conn->m_ssl = (fact->ddsi_tcp_ssl_plugin.connect) (conn->m_base.m_base.gv, sock); + if (conn->m_ssl == NULL) { - conn->m_ssl = (fact->ddsi_tcp_ssl_plugin.connect) (conn->m_base.m_base.gv, sock); - if (conn->m_ssl == NULL) - { - ddsi_tcp_conn_set_socket (conn, DDSRT_INVALID_SOCKET); - return; - } + ddsi_tcp_conn_set_socket (conn, DDSRT_INVALID_SOCKET); + goto fail_w_socket; } + } #endif - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *) msg->msg_name); - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp connect socket %"PRIdSOCK" port %u to %s\n", sock, get_socket_port (&conn->m_base.m_base.gv->logconfig, sock), buff); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *) msg->msg_name); + GVLOG (DDS_LC_TCP, "tcp connect socket %"PRIdSOCK" port %u to %s\n", sock, get_socket_port (gv, sock), buff); - /* Also may need to receive on connection so add to waitset */ + /* Also may need to receive on connection so add to waitset */ - (void)ddsrt_setsocknonblocking(conn->m_sock, true); + (void)ddsrt_setsocknonblocking(conn->m_sock, true); - assert (conn->m_base.m_base.gv->n_recv_threads > 0); - assert (conn->m_base.m_base.gv->recv_threads[0].arg.mode == RTM_MANY); - os_sockWaitsetAdd (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws, &conn->m_base); - os_sockWaitsetTrigger (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws); - } + assert (conn->m_base.m_base.gv->n_recv_threads > 0); + assert (conn->m_base.m_base.gv->recv_threads[0].arg.mode == RTM_MANY); + os_sockWaitsetAdd (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws, &conn->m_base); + os_sockWaitsetTrigger (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws); + return; + +fail_w_socket: + ddsi_tcp_sock_free (gv, sock, NULL); } static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_conn_t conn, ddsrt_avl_ipath_t * path) { + struct ddsi_domaingv * const gv = fact->fact.gv; const char * action = "added"; ddsi_tcp_node_t node; char buff[DDSI_LOCSTRLEN]; @@ -342,13 +330,14 @@ static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_con } } - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr); - DDS_CLOG (DDS_LC_TCP, &fact->fact.gv->logconfig, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); } static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory; + struct ddsi_domaingv * const gv = fact->fact.gv; char buff[DDSI_LOCSTRLEN]; ddsi_tcp_node_t node; ddsrt_avl_dpath_t path; @@ -357,8 +346,8 @@ static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn) node = ddsrt_avl_lookup_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, conn, &path); if (node) { - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr); - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff); ddsrt_avl_delete_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node, &path); ddsi_tcp_node_free (node); } @@ -399,7 +388,7 @@ static ddsi_tcp_conn_t ddsi_tcp_cache_find (struct ddsi_tran_factory_tcp *fact, } if (ret == NULL) { - ret = ddsi_tcp_new_conn (fact, DDSRT_INVALID_SOCKET, false, (struct sockaddr *)&key.m_peer_addr); + ret = ddsi_tcp_new_conn (fact, DDSRT_INVALID_SOCKET, false, &key.m_peer_addr.a); ddsi_tcp_cache_add (fact, ret, &path); } ddsrt_mutex_unlock (&fact->ddsi_tcp_cache_lock_g); @@ -425,7 +414,7 @@ static ssize_t ddsi_tcp_conn_read_ssl (ddsi_tcp_conn_t tcp, void * buf, size_t l } #endif -static bool ddsi_tcp_select (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t sock, bool read, size_t pos, int64_t timeout) +static bool ddsi_tcp_select (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, bool read, size_t pos, int64_t timeout) { dds_return_t rc; fd_set fds; @@ -443,22 +432,29 @@ static bool ddsi_tcp_select (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t DDSRT_WARNING_GNUC_ON(sign-conversion) #endif - DDS_CLOG (DDS_LC_TCP, logcfg, "tcp blocked %s: sock %d\n", read ? "read" : "write", (int) sock); + GVLOG (DDS_LC_TCP, "tcp blocked %s: sock %d\n", read ? "read" : "write", (int) sock); do { rc = ddsrt_select (sock + 1, rdset, wrset, NULL, tval, &ready); } while (rc == DDS_RETCODE_INTERRUPTED); if (rc != DDS_RETCODE_OK) { - DDS_CWARNING (logcfg, "tcp abandoning %s on blocking socket %d after %"PRIuSIZE" bytes\n", read ? "read" : "write", (int) sock, pos); + GVWARNING ("tcp abandoning %s on blocking socket %d after %"PRIuSIZE" bytes\n", read ? "read" : "write", (int) sock, pos); } return (ready > 0); } +static int32_t addrfam_to_locator_kind (int af) +{ + assert (af == AF_INET || af == AF_INET6); + return (af == AF_INET) ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6; +} + static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_factory; + struct ddsi_domaingv const * const gv = fact->fact.gv; dds_return_t rc; ddsi_tcp_conn_t tcp = (ddsi_tcp_conn_t) conn; ssize_t (*rd) (ddsi_tcp_conn_t, void *, size_t, dds_return_t * err) = ddsi_tcp_conn_read_plain; @@ -482,14 +478,15 @@ static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, si { if (srcloc) { - ddsi_ipaddr_to_loc(&fact->fact, srcloc, (struct sockaddr *)&tcp->m_peer_addr, tcp->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6); + const int32_t kind = addrfam_to_locator_kind (tcp->m_peer_addr.a.sa_family); + ddsi_ipaddr_to_loc(&fact->fact, srcloc, &tcp->m_peer_addr.a, kind); } return (ssize_t) pos; } } else if (n == 0) { - DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "tcp read: sock %"PRIdSOCK" closed-by-peer\n", tcp->m_sock); + GVLOG (DDS_LC_TCP, "tcp read: sock %"PRIdSOCK" closed-by-peer\n", tcp->m_sock); break; } else @@ -500,13 +497,13 @@ static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, si { if (allow_spurious && pos == 0) return 0; - const int64_t timeout = conn->m_base.gv->config.tcp_read_timeout; - if (ddsi_tcp_select (&conn->m_base.gv->logconfig, tcp->m_sock, true, pos, timeout) == false) + const int64_t timeout = gv->config.tcp_read_timeout; + if (ddsi_tcp_select (gv, tcp->m_sock, true, pos, timeout) == false) break; } else { - DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "tcp read: sock %"PRIdSOCK" error %"PRId32"\n", tcp->m_sock, rc); + GVLOG (DDS_LC_TCP, "tcp read: sock %"PRIdSOCK" error %"PRId32"\n", tcp->m_sock, rc); break; } } @@ -542,6 +539,7 @@ static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void { /* Write all bytes of buf even in the presence of signals, partial writes and blocking (typically write buffer full) */ + struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv; dds_return_t rc; size_t pos = 0; ssize_t n = -1; @@ -559,15 +557,15 @@ static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void { if (rc == DDS_RETCODE_TRY_AGAIN) { - const int64_t timeout = conn->m_base.m_base.gv->config.tcp_write_timeout; - if (ddsi_tcp_select (&conn->m_base.m_base.gv->logconfig, conn->m_sock, false, pos, timeout) == false) + const int64_t timeout = gv->config.tcp_write_timeout; + if (ddsi_tcp_select (gv, conn->m_sock, false, pos, timeout) == false) { break; } } else { - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" error %"PRId32"\n", conn->m_sock, rc); + GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" error %"PRId32"\n", conn->m_sock, rc); break; } } @@ -594,6 +592,7 @@ static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, ddsrt_iovec_t *iov, size_t iov static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) base->m_factory; + struct ddsi_domaingv const * const gv = fact->fact.gv; #ifdef DDSI_INCLUDE_SSL char msgbuf[4096]; /* stack buffer for merging smallish writes without requiring allocations */ ddsrt_iovec_t iovec; /* iovec used for msgbuf */ @@ -604,13 +603,16 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d int piecewise; bool connect = false; ddsrt_msghdr_t msg; - struct sockaddr_storage dstaddr; + union { + struct sockaddr_storage x; + union addr a; + } dstaddr; assert(niov <= INT_MAX); - ddsi_ipaddr_from_loc(&dstaddr, dst); + ddsi_ipaddr_from_loc(&dstaddr.x, dst); memset(&msg, 0, sizeof(msg)); set_msghdr_iov (&msg, (ddsrt_iovec_t *) iov, niov); msg.msg_name = &dstaddr; - msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size((struct sockaddr *) &dstaddr); + msg.msg_namelen = ddsrt_sockaddr_get_size(&dstaddr.a.a); #if DDSRT_MSGHDR_FLAGS msg.msg_flags = (int) flags; #endif @@ -643,13 +645,13 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d if (!connect && ((flags & DDSI_TRAN_ON_CONNECT) != 0)) { - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" message filtered\n", conn->m_sock); + GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" message filtered\n", conn->m_sock); ddsrt_mutex_unlock (&conn->m_mutex); return (ssize_t) len; } #ifdef DDSI_INCLUDE_SSL - if (base->m_base.gv->config.ssl_enable) + if (gv->config.ssl_enable) { /* SSL doesn't have sendmsg, ret = 0 so writing starts at first byte. Rumor is that it is much better to merge small writes, which do here @@ -702,11 +704,11 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d { case DDS_RETCODE_NO_CONNECTION: case DDS_RETCODE_ILLEGAL_OPERATION: - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" DDS_RETCODE_NO_CONNECTION\n", conn->m_sock); + GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" DDS_RETCODE_NO_CONNECTION\n", conn->m_sock); break; default: if (! conn->m_base.m_closed && (conn->m_sock != DDSRT_INVALID_SOCKET)) - DDS_CWARNING (&conn->m_base.m_base.gv->logconfig, "tcp write failed on socket %"PRIdSOCK" with errno %"PRId32"\n", conn->m_sock, rc); + GVWARNING ("tcp write failed on socket %"PRIdSOCK" with errno %"PRId32"\n", conn->m_sock, rc); break; } } @@ -715,7 +717,7 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d { if (ret == 0) { - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" eof\n", conn->m_sock); + GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" eof\n", conn->m_sock); } piecewise = (ret > 0 && (size_t) ret < len); } @@ -783,12 +785,13 @@ static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_ return 0; } -static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos) +static dds_return_t ddsi_tcp_create_conn (ddsi_tran_conn_t *conn_out, struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn; (void) qos; (void) port; - return &fact->ddsi_tcp_conn_client.m_base; + *conn_out = &fact->ddsi_tcp_conn_client.m_base; + return DDS_RETCODE_OK; } static int ddsi_tcp_listen (ddsi_tran_listener_t listener) @@ -812,10 +815,11 @@ static int ddsi_tcp_listen (ddsi_tran_listener_t listener) static ddsi_tran_conn_t ddsi_tcp_accept (ddsi_tran_listener_t listener) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory; + struct ddsi_domaingv const * const gv = fact->fact.gv; ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener; ddsi_tcp_conn_t tcp = NULL; ddsrt_socket_t sock = DDSRT_INVALID_SOCKET; - struct sockaddr_storage addr; + union addr addr; socklen_t addrlen = sizeof (addr); char buff[DDSI_LOCSTRLEN]; dds_return_t rc = DDS_RETCODE_OK; @@ -839,31 +843,31 @@ static ddsi_tran_conn_t ddsi_tcp_accept (ddsi_tran_listener_t listener) { rc = ddsrt_accept(tl->m_sock, NULL, NULL, &sock); } - if (!ddsrt_atomic_ld32(&listener->m_base.gv->rtps_keepgoing)) + if (!ddsrt_atomic_ld32(&gv->rtps_keepgoing)) { - ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, sock, NULL); + ddsi_tcp_sock_free (gv, sock, NULL); return NULL; } } while (rc == DDS_RETCODE_INTERRUPTED || rc == DDS_RETCODE_TRY_AGAIN); if (sock == DDSRT_INVALID_SOCKET) { - (void)ddsrt_getsockname (tl->m_sock, (struct sockaddr *) &addr, &addrlen); - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&addr); - DDS_CLOG ((rc == DDS_RETCODE_OK) ? DDS_LC_ERROR : DDS_LC_FATAL, &listener->m_base.gv->logconfig, "tcp accept failed on socket %"PRIdSOCK" at %s retcode %"PRId32"\n", tl->m_sock, buff, rc); + (void)ddsrt_getsockname (tl->m_sock, &addr.a, &addrlen); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), &addr.a); + GVLOG ((rc == DDS_RETCODE_OK) ? DDS_LC_ERROR : DDS_LC_FATAL, "tcp accept failed on socket %"PRIdSOCK" at %s retcode %"PRId32"\n", tl->m_sock, buff, rc); } - else if (getpeername (sock, (struct sockaddr *) &addr, &addrlen) == -1) + else if (getpeername (sock, &addr.a, &addrlen) == -1) { - DDS_CWARNING (&listener->m_base.gv->logconfig, "tcp accepted new socket %"PRIdSOCK" on socket %"PRIdSOCK" but no peer address, errno %"PRId32"\n", sock, tl->m_sock, rc); + GVWARNING ("tcp accepted new socket %"PRIdSOCK" on socket %"PRIdSOCK" but no peer address, errno %"PRId32"\n", sock, tl->m_sock, rc); ddsrt_close (sock); } else { - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&addr); - DDS_CLOG (DDS_LC_TCP, &listener->m_base.gv->logconfig, "tcp accept new socket %"PRIdSOCK" on socket %"PRIdSOCK" from %s\n", sock, tl->m_sock, buff); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), &addr.a); + GVLOG (DDS_LC_TCP, "tcp accept new socket %"PRIdSOCK" on socket %"PRIdSOCK" from %s\n", sock, tl->m_sock, buff); (void)ddsrt_setsocknonblocking (sock, true); - tcp = ddsi_tcp_new_conn (fact, sock, true, (struct sockaddr *)&addr); + tcp = ddsi_tcp_new_conn (fact, sock, true, &addr.a); #ifdef DDSI_INCLUDE_SSL tcp->m_ssl = ssl; #endif @@ -891,14 +895,20 @@ static ddsrt_socket_t ddsi_tcp_listener_handle (ddsi_tran_base_t base) caller (supporting call back over NAT). */ +static void addr_to_loc (const struct ddsi_tran_factory *fact, nn_locator_t *loc, const union addr *addr) +{ + ddsi_ipaddr_to_loc (fact, loc, &addr->a, addrfam_to_locator_kind (addr->a.sa_family)); +} + static void ddsi_tcp_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) { + struct ddsi_domaingv const * const gv = conn->m_base.gv; char buff[DDSI_LOCSTRLEN]; ddsi_tcp_conn_t tc = (ddsi_tcp_conn_t) conn; assert (tc->m_sock != DDSRT_INVALID_SOCKET); - ddsi_ipaddr_to_loc (conn->m_factory, loc, (struct sockaddr *)&tc->m_peer_addr, tc->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6); + addr_to_loc (conn->m_factory, loc, &tc->m_peer_addr); ddsi_locator_to_string(buff, sizeof(buff), loc); - DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "(tcp EP:%s)", buff); + GVLOG (DDS_LC_TCP, "(tcp EP:%s)", buff); } static void ddsi_tcp_base_init (const struct ddsi_tran_factory_tcp *fact, struct ddsi_tran_conn *base) @@ -915,7 +925,7 @@ static void ddsi_tcp_base_init (const struct ddsi_tran_factory_tcp *fact, struct static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, ddsrt_socket_t sock, bool server, struct sockaddr * peer) { - ddsi_tcp_conn_t conn = (ddsi_tcp_conn_t) ddsrt_malloc (sizeof (*conn)); + ddsi_tcp_conn_t conn = ddsrt_malloc (sizeof (*conn)); memset (conn, 0, sizeof (*conn)); ddsi_tcp_base_init (fact, &conn->m_base); @@ -930,19 +940,30 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, dd return conn; } -static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) +static dds_return_t ddsi_tcp_create_listener (ddsi_tran_listener_t *listener_out, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos) { struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) fact; - struct ddsi_domaingv * const gv = fact_tcp->fact.gv; + struct ddsi_domaingv const * const gv = fact_tcp->fact.gv; ddsrt_socket_t sock; - (void) qos; - if (ddsi_tcp_sock_new (fact_tcp, &sock, (unsigned short) port) != DDS_RETCODE_OK) - return NULL; + if (ddsi_tcp_sock_new (fact_tcp, &sock, (uint16_t) port) != DDS_RETCODE_OK) + return DDS_RETCODE_ERROR; + char buff[DDSI_LOCSTRLEN]; + union addr addr; + socklen_t addrlen = sizeof (addr); dds_return_t ret; - ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) ddsrt_malloc (sizeof (*tl)); + if ((ret = ddsrt_getsockname (sock, &addr.a, &addrlen)) != DDS_RETCODE_OK) + { + GVERROR ("ddsi_tcp_create_listener: ddsrt_getsockname returned %"PRId32"\n", ret); + ddsi_tcp_sock_free (gv, sock, NULL); + return DDS_RETCODE_ERROR; + } + sockaddr_to_string_with_port (fact_tcp, buff, sizeof (buff), &addr.a); + GVLOG (DDS_LC_TCP, "tcp create listener socket %"PRIdSOCK" on %s\n", sock, buff); + + ddsi_tcp_listener_t tl = ddsrt_malloc (sizeof (*tl)); memset (tl, 0, sizeof (*tl)); tl->m_sock = sock; @@ -952,33 +973,21 @@ static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, tl->m_base.m_accept_fn = ddsi_tcp_accept; tl->m_base.m_factory = fact; - tl->m_base.m_base.m_port = get_socket_port (&fact->gv->logconfig, sock); + tl->m_base.m_base.m_port = get_socket_port (gv, sock); tl->m_base.m_base.m_trantype = DDSI_TRAN_LISTENER; tl->m_base.m_base.m_handle_fn = ddsi_tcp_listener_handle; tl->m_base.m_locator_fn = ddsi_tcp_locator; - - struct sockaddr_storage addr; - socklen_t addrlen = sizeof (addr); - if ((ret = ddsrt_getsockname (sock, (struct sockaddr *) &addr, &addrlen)) != DDS_RETCODE_OK) - { - GVERROR ("ddsi_tcp_create_listener: ddsrt_getsockname returned %"PRId32"\n", ret); - ddsi_tcp_sock_free (&fact->gv->logconfig, sock, NULL); - ddsrt_free (tl); - return NULL; - } - - char buff[DDSI_LOCSTRLEN]; - sockaddr_to_string_with_port (fact_tcp, buff, sizeof (buff), (struct sockaddr *) &addr); - GVLOG (DDS_LC_TCP, "tcp create listener socket %"PRIdSOCK" on %s\n", sock, buff); - return &tl->m_base; + *listener_out = &tl->m_base; + return DDS_RETCODE_OK; } static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory; + struct ddsi_domaingv const * const gv = fact->fact.gv; char buff[DDSI_LOCSTRLEN]; - sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr); - DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp free %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp free %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); #ifdef DDSI_INCLUDE_SSL if (fact->ddsi_tcp_ssl_plugin.ssl_free) @@ -988,7 +997,7 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn) else #endif { - ddsi_tcp_sock_free (&conn->m_base.m_base.gv->logconfig, conn->m_sock, "connection"); + ddsi_tcp_sock_free (gv, conn->m_sock, "connection"); } ddsrt_mutex_destroy (&conn->m_mutex); ddsrt_free (conn); @@ -997,17 +1006,18 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn) static void ddsi_tcp_close_conn (ddsi_tran_conn_t tc) { struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) tc->m_factory; + struct ddsi_domaingv * const gv = fact_tcp->fact.gv; if (tc != &fact_tcp->ddsi_tcp_conn_client.m_base) { char buff[DDSI_LOCSTRLEN]; nn_locator_t loc; ddsi_tcp_conn_t conn = (ddsi_tcp_conn_t) tc; - sockaddr_to_string_with_port(fact_tcp, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr); - DDS_CLOG (DDS_LC_TCP, &tc->m_base.gv->logconfig, "tcp close %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); + sockaddr_to_string_with_port(fact_tcp, buff, sizeof(buff), &conn->m_peer_addr.a); + GVLOG (DDS_LC_TCP, "tcp close %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff); (void) shutdown (conn->m_sock, 2); - ddsi_ipaddr_to_loc(&fact_tcp->fact, &loc, (struct sockaddr *)&conn->m_peer_addr, conn->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6); + ddsi_ipaddr_to_loc(&fact_tcp->fact, &loc, &conn->m_peer_addr.a, addrfam_to_locator_kind(conn->m_peer_addr.a.sa_family)); loc.port = conn->m_peer_port; - purge_proxy_participants (conn->m_base.m_base.gv, &loc, conn->m_base.m_server); + purge_proxy_participants (gv, &loc, conn->m_base.m_server); } } @@ -1023,7 +1033,7 @@ static void ddsi_tcp_release_conn (ddsi_tran_conn_t conn) static void ddsi_tcp_unblock_listener (ddsi_tran_listener_t listener) { struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) listener->m_factory; - struct ddsi_domaingv * const gv = fact_tcp->fact.gv; + struct ddsi_domaingv const * const gv = fact_tcp->fact.gv; ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener; ddsrt_socket_t sock; dds_return_t ret; @@ -1032,44 +1042,40 @@ static void ddsi_tcp_unblock_listener (ddsi_tran_listener_t listener) if (ddsi_tcp_sock_new (fact_tcp, &sock, 0) != DDS_RETCODE_OK) goto fail; - struct sockaddr_storage addr; + union addr addr; socklen_t addrlen = sizeof (addr); - if ((ret = ddsrt_getsockname (tl->m_sock, (struct sockaddr *) &addr, &addrlen)) != DDS_RETCODE_OK) + if ((ret = ddsrt_getsockname (tl->m_sock, &addr.a, &addrlen)) != DDS_RETCODE_OK) { GVWARNING ("tcp failed to get listener address error %"PRId32"\n", ret); goto fail_w_socket; } - switch (addr.ss_family) + switch (addr.a.sa_family) { - case AF_INET: { - struct sockaddr_in *socketname = (struct sockaddr_in *) &addr; - if (socketname->sin_addr.s_addr == htonl (INADDR_ANY)) - socketname->sin_addr.s_addr = htonl (INADDR_LOOPBACK); + case AF_INET: + if (addr.a4.sin_addr.s_addr == htonl (INADDR_ANY)) + addr.a4.sin_addr.s_addr = htonl (INADDR_LOOPBACK); break; - } #if DDSRT_HAVE_IPV6 - case AF_INET6: { - struct sockaddr_in6 *socketname = (struct sockaddr_in6 *) &addr; - if (memcmp (&socketname->sin6_addr, &ddsrt_in6addr_any, sizeof (socketname->sin6_addr)) == 0) - socketname->sin6_addr = ddsrt_in6addr_loopback; + case AF_INET6: + if (memcmp (&addr.a6.sin6_addr, &ddsrt_in6addr_any, sizeof (addr.a6.sin6_addr)) == 0) + addr.a6.sin6_addr = ddsrt_in6addr_loopback; break; - } #endif } do { - ret = ddsrt_connect (sock, (struct sockaddr *) &addr, ddsrt_sockaddr_get_size ((struct sockaddr *) &addr)); + ret = ddsrt_connect (sock, &addr.a, ddsrt_sockaddr_get_size (&addr.a)); } while (ret == DDS_RETCODE_INTERRUPTED); if (ret != DDS_RETCODE_OK) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory; char buff[DDSI_LOCSTRLEN]; - sockaddr_to_string_with_port (fact, buff, sizeof (buff), (struct sockaddr *) &addr); + sockaddr_to_string_with_port (fact, buff, sizeof (buff), &addr.a); GVWARNING ("tcp failed to connect to own listener (%s) error %"PRId32"\n", buff, ret); } fail_w_socket: - ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, sock, NULL); + ddsi_tcp_sock_free (gv, sock, NULL); fail: return; } @@ -1077,6 +1083,7 @@ fail: static void ddsi_tcp_release_listener (ddsi_tran_listener_t listener) { ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener; + struct ddsi_domaingv const * const gv = tl->m_base.m_base.gv; #ifdef DDSI_INCLUDE_SSL struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory; if (fact->ddsi_tcp_ssl_plugin.bio_vfree) @@ -1084,13 +1091,14 @@ static void ddsi_tcp_release_listener (ddsi_tran_listener_t listener) (fact->ddsi_tcp_ssl_plugin.bio_vfree) (tl->m_bio); } #endif - ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, tl->m_sock, "listener"); + ddsi_tcp_sock_free (gv, tl->m_sock, "listener"); ddsrt_free (tl); } static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn) { struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn; + struct ddsi_domaingv const * const gv = fact->fact.gv; ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_node_free); ddsrt_mutex_destroy (&fact->ddsi_tcp_cache_lock_g); #ifdef DDSI_INCLUDE_SSL @@ -1099,7 +1107,7 @@ static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn) (fact->ddsi_tcp_ssl_plugin.fini) (); } #endif - DDS_CLOG (DDS_LC_CONFIG, &fact_cmn->gv->logconfig, "tcp de-initialized\n"); + GVLOG (DDS_LC_CONFIG, "tcp de-initialized\n"); ddsrt_free (fact); } diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index fdf1fcf..afd7c33 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -24,13 +24,13 @@ extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn); extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn); -extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); +extern inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind); extern inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port); extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn); extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base); -extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); +extern inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc); extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener); extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener); diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index 382f029..3410510 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -26,6 +26,13 @@ #include "dds/ddsi/q_pcap.h" #include "dds/ddsi/ddsi_domaingv.h" +union addr { + struct sockaddr_storage x; + struct sockaddr a; + struct sockaddr_in a4; + struct sockaddr_in6 a6; +}; + typedef struct ddsi_udp_conn { struct ddsi_tran_conn m_base; ddsrt_socket_t m_sock; @@ -35,20 +42,27 @@ typedef struct ddsi_udp_conn { int m_diffserv; } *ddsi_udp_conn_t; -static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) +static void addr_to_loc (const struct ddsi_tran_factory *tran, nn_locator_t *dst, const union addr *src) { + ddsi_ipaddr_to_loc (tran, dst, &src->a, (src->a.sa_family == AF_INET) ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6); +} + +static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn_cmn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc) +{ + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + struct ddsi_domaingv * const gv = conn->m_base.m_base.gv; dds_return_t rc; ssize_t ret = 0; ddsrt_msghdr_t msghdr; - struct sockaddr_storage src; + union addr src; ddsrt_iovec_t msg_iov; socklen_t srclen = (socklen_t) sizeof (src); (void) allow_spurious; - msg_iov.iov_base = (void*) buf; - msg_iov.iov_len = (ddsrt_iov_len_t)len; /* Windows uses unsigned, POSIX (except Linux) int */ + msg_iov.iov_base = (void *) buf; + msg_iov.iov_len = (ddsrt_iov_len_t) len; /* Windows uses unsigned, POSIX (except Linux) int */ - msghdr.msg_name = &src; + msghdr.msg_name = &src.x; msghdr.msg_namelen = srclen; msghdr.msg_iov = &msg_iov; msghdr.msg_iovlen = 1; @@ -61,65 +75,67 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s #endif do { - rc = ddsrt_recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0, &ret); + rc = ddsrt_recvmsg (conn->m_sock, &msghdr, 0, &ret); } while (rc == DDS_RETCODE_INTERRUPTED); if (ret > 0) { if (srcloc) - ddsi_ipaddr_to_loc(conn->m_factory, srcloc, (struct sockaddr *)&src, src.ss_family == AF_INET ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6); + addr_to_loc (conn->m_base.m_factory, srcloc, &src); - if(conn->m_base.gv->pcap_fp) + if (gv->pcap_fp) { - struct sockaddr_storage dest; + union addr dest; socklen_t dest_len = sizeof (dest); - if (ddsrt_getsockname (((ddsi_udp_conn_t) conn)->m_sock, (struct sockaddr *) &dest, &dest_len) != DDS_RETCODE_OK) - memset(&dest, 0, sizeof(dest)); - write_pcap_received(conn->m_base.gv, ddsrt_time_wallclock(), &src, &dest, buf, (size_t) ret); + if (ddsrt_getsockname (conn->m_sock, &dest.a, &dest_len) != DDS_RETCODE_OK) + memset (&dest, 0, sizeof (dest)); + write_pcap_received (gv, ddsrt_time_wallclock (), &src.x, &dest.x, buf, (size_t) ret); } /* Check for udp packet truncation */ - if ((((size_t) ret) > len) #if DDSRT_MSGHDR_FLAGS - || (msghdr.msg_flags & MSG_TRUNC) + const bool trunc_flag = (msghdr.msg_flags & MSG_TRUNC) != 0; +#else + const bool trunc_flag = false; #endif - ) + if ((size_t) ret > len || trunc_flag) { char addrbuf[DDSI_LOCSTRLEN]; nn_locator_t tmp; - ddsi_ipaddr_to_loc(conn->m_factory, &tmp, (struct sockaddr *)&src, src.ss_family == AF_INET ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6); - ddsi_locator_to_string(addrbuf, sizeof(addrbuf), &tmp); - DDS_CWARNING(&conn->m_base.gv->logconfig, "%s => %d truncated to %d\n", addrbuf, (int)ret, (int)len); + addr_to_loc (conn->m_base.m_factory, &tmp, &src); + ddsi_locator_to_string (addrbuf, sizeof (addrbuf), &tmp); + GVWARNING ("%s => %d truncated to %d\n", addrbuf, (int) ret, (int) len); } } - else if (rc != DDS_RETCODE_BAD_PARAMETER && - rc != DDS_RETCODE_NO_CONNECTION) + else if (rc != DDS_RETCODE_BAD_PARAMETER && rc != DDS_RETCODE_NO_CONNECTION) { - DDS_CERROR(&conn->m_base.gv->logconfig, "UDP recvmsg sock %d: ret %d retcode %"PRId32"\n", (int) ((ddsi_udp_conn_t) conn)->m_sock, (int) ret, rc); + GVERROR ("UDP recvmsg sock %d: ret %d retcode %"PRId32"\n", (int) conn->m_sock, (int) ret, rc); ret = -1; } return ret; } -static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, ddsrt_iovec_t *iov, size_t iovlen) +static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, const ddsrt_iovec_t *iov, size_t iovlen) { - mhdr->msg_iov = iov; - mhdr->msg_iovlen = (ddsrt_msg_iovlen_t)iovlen; + mhdr->msg_iov = (ddsrt_iovec_t *) iov; + mhdr->msg_iovlen = (ddsrt_msg_iovlen_t) iovlen; } -static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags) +static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn_cmn, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags) { + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + struct ddsi_domaingv * const gv = conn->m_base.m_base.gv; dds_return_t rc; ssize_t ret = -1; unsigned retry = 2; int sendflags = 0; ddsrt_msghdr_t msg; - struct sockaddr_storage dstaddr; - assert(niov <= INT_MAX); - ddsi_ipaddr_from_loc(&dstaddr, dst); - set_msghdr_iov (&msg, (ddsrt_iovec_t *) iov, niov); - msg.msg_name = &dstaddr; - msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size((struct sockaddr *) &dstaddr); + union addr dstaddr; + assert (niov <= INT_MAX); + ddsi_ipaddr_from_loc (&dstaddr.x, dst); + set_msghdr_iov (&msg, iov, niov); + msg.msg_name = &dstaddr.x; + msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size (&dstaddr.a); #if defined(__sun) && !defined(_XPG4_2) msg.msg_accrights = NULL; msg.msg_accrightslen = 0; @@ -130,56 +146,53 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d #if DDSRT_MSGHDR_FLAGS msg.msg_flags = (int) flags; #else - DDSRT_UNUSED_ARG(flags); + DDSRT_UNUSED_ARG (flags); #endif #if MSG_NOSIGNAL && !LWIP_SOCKET sendflags |= MSG_NOSIGNAL; #endif do { - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn; - rc = ddsrt_sendmsg (uc->m_sock, &msg, sendflags, &ret); + rc = ddsrt_sendmsg (conn->m_sock, &msg, sendflags, &ret); #if defined _WIN32 && !defined WINCE - if (rc == DDS_RETCODE_TRY_AGAIN) { + if (rc == DDS_RETCODE_TRY_AGAIN) + { WSANETWORKEVENTS ev; - WaitForSingleObject(uc->m_sockEvent, INFINITE); - WSAEnumNetworkEvents(uc->m_sock, uc->m_sockEvent, &ev); + WaitForSingleObject (conn->m_sockEvent, INFINITE); + WSAEnumNetworkEvents (conn->m_sock, conn->m_sockEvent, &ev); } #endif - } while ((rc == DDS_RETCODE_INTERRUPTED) || - (rc == DDS_RETCODE_TRY_AGAIN) || - (rc == DDS_RETCODE_NOT_ALLOWED && retry-- > 0)); - if (ret > 0 && conn->m_base.gv->pcap_fp) + } while (rc == DDS_RETCODE_INTERRUPTED || rc == DDS_RETCODE_TRY_AGAIN || (rc == DDS_RETCODE_NOT_ALLOWED && retry-- > 0)); + if (ret > 0 && gv->pcap_fp) { - struct sockaddr_storage sa; + union addr sa; socklen_t alen = sizeof (sa); - if (ddsrt_getsockname (((ddsi_udp_conn_t) conn)->m_sock, (struct sockaddr *) &sa, &alen) != DDS_RETCODE_OK) + if (ddsrt_getsockname (conn->m_sock, &sa.a, &alen) != DDS_RETCODE_OK) memset(&sa, 0, sizeof(sa)); - write_pcap_sent (conn->m_base.gv, ddsrt_time_wallclock (), &sa, &msg, (size_t) ret); + write_pcap_sent (gv, ddsrt_time_wallclock (), &sa.x, &msg, (size_t) ret); } - else if (rc != DDS_RETCODE_OK && - rc != DDS_RETCODE_NOT_ALLOWED && - rc != DDS_RETCODE_NO_CONNECTION) + else if (rc != DDS_RETCODE_OK && rc != DDS_RETCODE_NOT_ALLOWED && rc != DDS_RETCODE_NO_CONNECTION) { - DDS_CERROR(&conn->m_base.gv->logconfig, "ddsi_udp_conn_write failed with retcode %"PRId32"\n", rc); + GVERROR ("ddsi_udp_conn_write failed with retcode %"PRId32"\n", rc); } - return (rc == DDS_RETCODE_OK ? ret : -1); + return (rc == DDS_RETCODE_OK) ? ret : -1; } -static void ddsi_udp_disable_multiplexing (ddsi_tran_conn_t base) +static void ddsi_udp_disable_multiplexing (ddsi_tran_conn_t conn_cmn) { #if defined _WIN32 && !defined WINCE - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) base; + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; uint32_t zero = 0, dummy; - WSAEventSelect(uc->m_sock, 0, 0); - WSAIoctl(uc->m_sock, FIONBIO, &zero,sizeof(zero), NULL,0, &dummy, NULL,NULL); + WSAEventSelect (conn->m_sock, 0, 0); + WSAIoctl (conn->m_sock, FIONBIO, &zero,sizeof(zero), NULL,0, &dummy, NULL,NULL); #else - (void)base; + (void) conn_cmn; #endif } -static ddsrt_socket_t ddsi_udp_conn_handle (ddsi_tran_base_t base) +static ddsrt_socket_t ddsi_udp_conn_handle (ddsi_tran_base_t conn_cmn) { - return ((ddsi_udp_conn_t) base)->m_sock; + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + return conn->m_sock; } static bool ddsi_udp_supports (const struct ddsi_tran_factory *fact, int32_t kind) @@ -187,162 +200,171 @@ static bool ddsi_udp_supports (const struct ddsi_tran_factory *fact, int32_t kin return kind == fact->m_kind || (kind == NN_LOCATOR_KIND_UDPv4MCGEN && fact->m_kind == NN_LOCATOR_KIND_UDPv4); } -static int ddsi_udp_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t base, nn_locator_t *loc) +static int ddsi_udp_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t conn_cmn, nn_locator_t *loc) { + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; int ret = -1; - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) base; - if (uc->m_sock != DDSRT_INVALID_SOCKET) + if (conn->m_sock != DDSRT_INVALID_SOCKET) { loc->kind = fact->m_kind; - loc->port = uc->m_base.m_base.m_port; - memcpy(loc->address, uc->m_base.m_base.gv->extloc.address, sizeof (loc->address)); + loc->port = conn->m_base.m_base.m_port; + memcpy (loc->address, conn->m_base.m_base.gv->extloc.address, sizeof (loc->address)); ret = 0; } return ret; } -static uint16_t get_socket_port (struct ddsi_domaingv * const gv, ddsrt_socket_t sock) +static uint16_t get_socket_port (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock) { dds_return_t ret; - struct sockaddr_storage addr; + union addr addr; socklen_t addrlen = sizeof (addr); - - ret = ddsrt_getsockname (sock, (struct sockaddr *)&addr, &addrlen); + ret = ddsrt_getsockname (sock, &addr.a, &addrlen); if (ret != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_get_socket_port: getsockname returned %"PRId32"\n", ret); return 0; } - - return ddsrt_sockaddr_get_port ((struct sockaddr *)&addr); + return ddsrt_sockaddr_get_port (&addr.a); } -static dds_return_t set_dont_route (struct ddsi_domaingv * const gv, ddsrt_socket_t socket, bool ipv6) +static dds_return_t set_dont_route (struct ddsi_domaingv const * const gv, ddsrt_socket_t socket, bool ipv6) { dds_return_t rc; #if DDSRT_HAVE_IPV6 if (ipv6) { - const unsigned ipv6Flag = 1; - if ((rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &ipv6Flag, sizeof (ipv6Flag))) != DDS_RETCODE_OK) + const unsigned uone = 1; + if ((rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &uone, sizeof (uone))) != DDS_RETCODE_OK) GVERROR ("ddsi_udp_create_conn: set IPV6_UNICAST_HOPS = 1 failed: %s\n", dds_strretcode (rc)); return rc; } +#else + (void) ipv6; #endif const int one = 1; - if ((rc = ddsrt_setsockopt (socket, SOL_SOCKET, SO_DONTROUTE, (char *) &one, sizeof (one))) != DDS_RETCODE_OK) + if ((rc = ddsrt_setsockopt (socket, SOL_SOCKET, SO_DONTROUTE, &one, sizeof (one))) != DDS_RETCODE_OK) GVERROR ("ddsi_udp_create_conn: set SO_DONTROUTE = 1 failed: %s\n", dds_strretcode (rc)); return rc; } -static dds_return_t set_rcvbuf (struct ddsi_domaingv * const gv, ddsrt_socket_t sock, const struct config_maybe_uint32 *min_size) +static dds_return_t set_rcvbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, const struct config_maybe_uint32 *min_size) { - uint32_t ReceiveBufferSize; - socklen_t optlen = (socklen_t) sizeof (ReceiveBufferSize); + uint32_t size; + socklen_t optlen = (socklen_t) sizeof (size); uint32_t socket_min_rcvbuf_size; dds_return_t rc; - if (min_size->isdefault) - socket_min_rcvbuf_size = 1048576; - else - socket_min_rcvbuf_size = min_size->value; - rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, (char *) &ReceiveBufferSize, &optlen); + socket_min_rcvbuf_size = min_size->isdefault ? 1048576 : min_size->value; + rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, &optlen); if (rc == DDS_RETCODE_BAD_PARAMETER) { /* not all stacks support getting/setting RCVBUF */ GVLOG (DDS_LC_CONFIG, "cannot retrieve socket receive buffer size\n"); return DDS_RETCODE_OK; } - - if (rc != DDS_RETCODE_OK) + else if (rc != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: get SO_RCVBUF failed: %s\n", dds_strretcode (rc)); - goto fail; + return rc; } - if (ReceiveBufferSize < socket_min_rcvbuf_size) + if (size < socket_min_rcvbuf_size) { /* make sure the receive buffersize is at least the minimum required */ - ReceiveBufferSize = socket_min_rcvbuf_size; - (void) ddsrt_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, (const char *) &ReceiveBufferSize, sizeof (ReceiveBufferSize)); + size = socket_min_rcvbuf_size; + (void) ddsrt_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, sizeof (size)); /* We don't check the return code from setsockopt, because some O/Ss tend to silently cap the buffer size. The only way to make sure is to read the option value back and check it is now set correctly. */ - if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, (char *) &ReceiveBufferSize, &optlen)) != DDS_RETCODE_OK) + if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, &optlen)) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: get SO_RCVBUF failed: %s\n", dds_strretcode (rc)); - goto fail; + return rc; } - if (ReceiveBufferSize >= socket_min_rcvbuf_size) - GVLOG (DDS_LC_CONFIG, "socket receive buffer size set to %"PRIu32" bytes\n", ReceiveBufferSize); + if (size >= socket_min_rcvbuf_size) + GVLOG (DDS_LC_CONFIG, "socket receive buffer size set to %"PRIu32" bytes\n", size); + else if (min_size->isdefault) + GVLOG (DDS_LC_CONFIG, + "failed to increase socket receive buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n", + socket_min_rcvbuf_size, size); else - GVLOG (min_size->isdefault ? DDS_LC_CONFIG : DDS_LC_ERROR, - "failed to increase socket receive buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n", - socket_min_rcvbuf_size, ReceiveBufferSize); + { + /* If the configuration states it must be >= X, then error out if the + kernel doesn't give us at least X */ + GVLOG (DDS_LC_CONFIG | DDS_LC_ERROR, + "failed to increase socket receive buffer size to %"PRIu32" bytes, maximum is %"PRIu32" bytes\n", + socket_min_rcvbuf_size, size); + rc = DDS_RETCODE_NOT_ENOUGH_SPACE; + } } -fail: return rc; } -static dds_return_t set_sndbuf (struct ddsi_domaingv * const gv, ddsrt_socket_t sock, uint32_t min_size) +static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, uint32_t min_size) { - unsigned SendBufferSize; - socklen_t optlen = (socklen_t) sizeof(SendBufferSize); + unsigned size; + socklen_t optlen = (socklen_t) sizeof(size); dds_return_t rc; - rc = ddsrt_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,(char *)&SendBufferSize, &optlen); + rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_SNDBUF, &size, &optlen); if (rc == DDS_RETCODE_BAD_PARAMETER) { /* not all stacks support getting/setting SNDBUF */ GVLOG (DDS_LC_CONFIG, "cannot retrieve socket send buffer size\n"); return DDS_RETCODE_OK; } - - if (rc != DDS_RETCODE_OK) + else if (rc != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: get SO_SNDBUF failed: %s\n", dds_strretcode (rc)); - goto fail; + return rc; } - if (SendBufferSize < min_size) + if (size < min_size) { /* make sure the send buffersize is at least the minimum required */ - SendBufferSize = min_size; - if ((rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, (const char *) &SendBufferSize, sizeof (SendBufferSize))) != DDS_RETCODE_OK) + size = min_size; + if ((rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &size, sizeof (size))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set SO_SNDBUF failed: %s\n", dds_strretcode (rc)); - goto fail; + return rc; } } -fail: - return rc; + return DDS_RETCODE_OK; } -static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv * const gv, ddsrt_socket_t sock) +static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock) { + /* Function is a never-called no-op if IPv6 is not supported to keep the call-site a bit cleaner */ #if DDSRT_HAVE_IPV6 - const unsigned interfaceNo = gv->interfaceNo; + const unsigned ifno = gv->interfaceNo; const unsigned ttl = (unsigned) gv->config.multicast_ttl; const unsigned loop = (unsigned) !!gv->config.enableMulticastLoopback; dds_return_t rc; - if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceNo, sizeof (interfaceNo))) != DDS_RETCODE_OK) + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, &ifno, sizeof (ifno))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_IF failed: %s\n", dds_strretcode (rc)); - else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (char *) &ttl, sizeof (ttl))) != DDS_RETCODE_OK) + return rc; + } + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl, sizeof (ttl))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_HOPS failed: %s\n", dds_strretcode (rc)); - else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) + return rc; + } + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_LOOP failed: %s\n", dds_strretcode (rc)); - return rc; + return rc; + } + return DDS_RETCODE_OK; #else (void) gv; (void) sock; return DDS_RETCODE_ERROR; #endif } -static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv * const gv, ddsrt_socket_t sock) +static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock) { #if (defined(__linux) || defined(__APPLE__)) && !LWIP_SOCKET if (gv->config.use_multicast_if_mreqn) @@ -362,23 +384,29 @@ static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv * cons return ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_IF, gv->ownloc.address + 12, 4); } -static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv * const gv, ddsrt_socket_t sock) +static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock) { const unsigned char ttl = (unsigned char) gv->config.multicast_ttl; const unsigned char loop = (unsigned char) !!gv->config.enableMulticastLoopback; dds_return_t rc; - if ((rc = set_mc_options_transmit_ipv4_if (gv, sock)) != DDS_RETCODE_OK) + if ((rc = set_mc_options_transmit_ipv4_if (gv, sock)) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_IF failed: %s\n", dds_strretcode (rc)); - else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, (char *) &ttl, sizeof (ttl))) != DDS_RETCODE_OK) + return rc; + } + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof (ttl))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_TTL failed: %s\n", dds_strretcode (rc)); - else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) + return rc; + } + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_LOOP failed: %s\n", dds_strretcode (rc)); - return rc; + return rc; + } + return DDS_RETCODE_OK; } -static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos) +static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos) { - struct ddsi_domaingv * const gv = fact->gv; + struct ddsi_domaingv const * const gv = fact->gv; const int one = 1; dds_return_t rc; @@ -406,31 +434,37 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t } assert (purpose_str != NULL); + union addr socketname; + nn_locator_t ownloc_w_port = gv->ownloc; + assert (ownloc_w_port.port == NN_LOCATOR_PORT_INVALID); + if (port) { + /* PORT_INVALID maps to 0 in ipaddr_from_loc */ + ownloc_w_port.port = port; + } + ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port); + switch (fact->m_kind) { - int af = AF_UNSPEC; - switch (fact->m_kind) - { - case NN_LOCATOR_KIND_UDPv4: - af = AF_INET; - break; + case NN_LOCATOR_KIND_UDPv4: + if (bind_to_any) + socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY); + break; #if DDSRT_HAVE_IPV6 - case NN_LOCATOR_KIND_UDPv6: - af = AF_INET6; - ipv6 = true; - break; + case NN_LOCATOR_KIND_UDPv6: + ipv6 = true; + if (bind_to_any) + socketname.a6.sin6_addr = ddsrt_in6addr_any; + break; #endif - default: - DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind); - } - assert (af != AF_UNSPEC); - if ((rc = ddsrt_socket (&sock, af, SOCK_DGRAM, 0)) != DDS_RETCODE_OK) - { - GVERROR ("ddsi_udp_create_conn: failed to create socket: %s\n", dds_strretcode (rc)); - goto fail; - } + default: + DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind); + } + if ((rc = ddsrt_socket (&sock, socketname.a.sa_family, SOCK_DGRAM, 0)) != DDS_RETCODE_OK) + { + GVERROR ("ddsi_udp_create_conn: failed to create socket: %s\n", dds_strretcode (rc)); + goto fail; } - if (reuse_addr && (rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) != DDS_RETCODE_OK) + if (reuse_addr && (rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: failed to enable address reuse: %s\n", dds_strretcode (rc)); if (rc != DDS_RETCODE_BAD_PARAMETER) @@ -448,45 +482,21 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t if (gv->config.dontRoute && (rc = set_dont_route (gv, sock, ipv6)) != DDS_RETCODE_OK) goto fail_w_socket; + if ((rc = ddsrt_bind (sock, &socketname.a, ddsrt_sockaddr_get_size (&socketname.a))) != DDS_RETCODE_OK) { - union { - struct sockaddr_storage x; - struct sockaddr_in a4; - struct sockaddr_in6 a6; - } socketname; - nn_locator_t ownloc_w_port = gv->ownloc; - assert (ownloc_w_port.port == NN_LOCATOR_PORT_INVALID); - if (port) { - /* PORT_INVALID maps to 0 in ipaddr_from_loc */ - ownloc_w_port.port = port; - } - ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port); + /* PRECONDITION_NOT_MET (= EADDRINUSE) is expected if reuse_addr isn't set, should be handled at + a higher level and therefore needs to return a specific error message */ + if (!reuse_addr && rc == DDS_RETCODE_PRECONDITION_NOT_MET) + goto fail_addrinuse; + + char buf[DDSI_LOCATORSTRLEN]; if (bind_to_any) - { - switch (fact->m_kind) - { - case NN_LOCATOR_KIND_UDPv4: - socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY); - break; -#if DDSRT_HAVE_IPV6 - case NN_LOCATOR_KIND_UDPv6: - socketname.a6.sin6_addr = ddsrt_in6addr_any; - break; -#endif - default: - DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind); - } - } - if ((rc = ddsrt_bind (sock, (struct sockaddr *) &socketname, ddsrt_sockaddr_get_size ((struct sockaddr *) &socketname))) != DDS_RETCODE_OK) - { - char buf[DDSI_LOCATORSTRLEN]; - if (bind_to_any) - snprintf (buf, sizeof (buf), "ANY:%"PRIu32, port); - else - ddsi_locator_to_string (buf, sizeof (buf), &ownloc_w_port); - GVERROR ("ddsi_udp_create_conn: failed to bind to %s: %s\n", buf, dds_strretcode (rc)); - goto fail_w_socket; - } + snprintf (buf, sizeof (buf), "ANY:%"PRIu32, port); + else + ddsi_locator_to_string (buf, sizeof (buf), &ownloc_w_port); + GVERROR ("ddsi_udp_create_conn: failed to bind to %s: %s\n", buf, + (rc == DDS_RETCODE_PRECONDITION_NOT_MET) ? "address in use" : dds_strretcode (rc)); + goto fail_w_socket; } rc = ipv6 ? set_mc_options_transmit_ipv6 (gv, sock) : set_mc_options_transmit_ipv4 (gv, sock); @@ -494,9 +504,9 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t goto fail_w_socket; #ifdef DDSI_INCLUDE_NETWORK_CHANNELS - if ((qos->m_diffserv != 0) && (fact->m_kind == NN_LOCATOR_KIND_UDPv4)) + if (qos->m_diffserv != 0 && fact->m_kind == NN_LOCATOR_KIND_UDPv4) { - if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_TOS, (char *) &qos->m_diffserv, sizeof (qos->m_diffserv))) != DDS_RETCODE_OK) + if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_TOS, &qos->m_diffserv, sizeof (qos->m_diffserv))) != DDS_RETCODE_OK) { GVERROR ("ddsi_udp_create_conn: set diffserv retcode %"PRId32"\n", rc); goto fail_w_socket; @@ -504,49 +514,52 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t } #endif - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) ddsrt_malloc (sizeof (*uc)); - memset (uc, 0, sizeof (*uc)); + ddsi_udp_conn_t conn = ddsrt_malloc (sizeof (*conn)); + memset (conn, 0, sizeof (*conn)); - uc->m_sock = sock; - uc->m_diffserv = qos->m_diffserv; + conn->m_sock = sock; + conn->m_diffserv = qos->m_diffserv; #if defined _WIN32 && !defined WINCE - uc->m_sockEvent = WSACreateEvent(); - WSAEventSelect(uc->m_sock, uc->m_sockEvent, FD_WRITE); + conn->m_sockEvent = WSACreateEvent (); + WSAEventSelect (conn->m_sock, conn->m_sockEvent, FD_WRITE); #endif - ddsi_factory_conn_init (fact, &uc->m_base); - uc->m_base.m_base.m_port = get_socket_port (gv, sock); - uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN; - uc->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC); - uc->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle; + ddsi_factory_conn_init (fact, &conn->m_base); + conn->m_base.m_base.m_port = get_socket_port (gv, sock); + conn->m_base.m_base.m_trantype = DDSI_TRAN_CONN; + conn->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC); + conn->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle; - uc->m_base.m_read_fn = ddsi_udp_conn_read; - uc->m_base.m_write_fn = ddsi_udp_conn_write; - uc->m_base.m_disable_multiplexing_fn = ddsi_udp_disable_multiplexing; - uc->m_base.m_locator_fn = ddsi_udp_conn_locator; + conn->m_base.m_read_fn = ddsi_udp_conn_read; + conn->m_base.m_write_fn = ddsi_udp_conn_write; + conn->m_base.m_disable_multiplexing_fn = ddsi_udp_disable_multiplexing; + conn->m_base.m_locator_fn = ddsi_udp_conn_locator; - GVTRACE ("ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", purpose_str, uc->m_sock, uc->m_base.m_base.m_port); - return &uc->m_base; + GVTRACE ("ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", purpose_str, conn->m_sock, conn->m_base.m_base.m_port); + *conn_out = &conn->m_base; + return DDS_RETCODE_OK; fail_w_socket: ddsrt_close (sock); fail: - if (fact->gv->config.participantIndex != PARTICIPANT_INDEX_AUTO) - GVERROR ("ddsi_udp_create_conn: failed for %s port %"PRIu32"\n", purpose_str, port); - return NULL; + return DDS_RETCODE_ERROR; + +fail_addrinuse: + ddsrt_close (sock); + return DDS_RETCODE_PRECONDITION_NOT_MET; } static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_locator_t *mcloc, const struct nn_interface *interf) { dds_return_t rc; - struct sockaddr_storage mcip; - ddsi_ipaddr_from_loc(&mcip, mcloc); + union addr mcip; + ddsi_ipaddr_from_loc (&mcip.x, mcloc); #if DDSRT_HAVE_IPV6 if (mcloc->kind == NN_LOCATOR_KIND_UDPv6) { struct ipv6_mreq ipv6mreq; memset (&ipv6mreq, 0, sizeof (ipv6mreq)); - memcpy (&ipv6mreq.ipv6mr_multiaddr, &((struct sockaddr_in6 *) &mcip)->sin6_addr, sizeof (ipv6mreq.ipv6mr_multiaddr)); + memcpy (&ipv6mreq.ipv6mr_multiaddr, &mcip.a6, sizeof (ipv6mreq.ipv6mr_multiaddr)); ipv6mreq.ipv6mr_interface = interf ? interf->if_index : 0; rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, join ? IPV6_JOIN_GROUP : IPV6_LEAVE_GROUP, &ipv6mreq, sizeof (ipv6mreq)); } @@ -554,12 +567,12 @@ static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca #endif { struct ip_mreq mreq; - mreq.imr_multiaddr = ((struct sockaddr_in *) &mcip)->sin_addr; + mreq.imr_multiaddr = mcip.a4.sin_addr; if (interf) memcpy (&mreq.imr_interface, interf->loc.address + 12, sizeof (mreq.imr_interface)); else mreq.imr_interface.s_addr = htonl (INADDR_ANY); - rc = ddsrt_setsockopt (socket, IPPROTO_IP, join ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, (char *) &mreq, sizeof (mreq)); + rc = ddsrt_setsockopt (socket, IPPROTO_IP, join ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq, sizeof (mreq)); } return (rc == DDS_RETCODE_OK) ? 0 : -1; } @@ -568,17 +581,17 @@ static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) { dds_return_t rc; - struct sockaddr_storage mcip, srcip; - ddsi_ipaddr_from_loc(&mcip, mcloc); - ddsi_ipaddr_from_loc(&srcip, srcloc); + union addr mcip, srcip; + ddsi_ipaddr_from_loc (&mcip.x, mcloc); + ddsi_ipaddr_from_loc (&srcip.x, srcloc); #if DDSRT_HAVE_IPV6 if (mcloc->kind == NN_LOCATOR_KIND_UDPv6) { struct group_source_req gsr; memset (&gsr, 0, sizeof (gsr)); gsr.gsr_interface = interf ? interf->if_index : 0; - memcpy (&gsr.gsr_group, &mcip, sizeof (gsr.gsr_group)); - memcpy (&gsr.gsr_source, &srcip, sizeof (gsr.gsr_source)); + memcpy (&gsr.gsr_group, &mcip.a6, sizeof (gsr.gsr_group)); + memcpy (&gsr.gsr_source, &srcip.a6, sizeof (gsr.gsr_source)); rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, join ? MCAST_JOIN_SOURCE_GROUP : MCAST_LEAVE_SOURCE_GROUP, &gsr, sizeof (gsr)); } else @@ -586,8 +599,8 @@ static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca { struct ip_mreq_source mreq; memset (&mreq, 0, sizeof (mreq)); - mreq.imr_sourceaddr = ((struct sockaddr_in *) &srcip)->sin_addr; - mreq.imr_multiaddr = ((struct sockaddr_in *) &mcip)->sin_addr; + mreq.imr_sourceaddr = srcip.a4.sin_addr; + mreq.imr_multiaddr = mcip.a4.sin_addr; if (interf) memcpy (&mreq.imr_interface, interf->loc.address + 12, sizeof (mreq.imr_interface)); else @@ -598,43 +611,42 @@ static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca } #endif -static int ddsi_udp_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) +static int ddsi_udp_join_mc (ddsi_tran_conn_t conn_cmn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) { - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn; - (void)srcloc; + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + (void) srcloc; #ifdef DDSI_INCLUDE_SSM if (srcloc) - return joinleave_ssm_mcgroup(uc->m_sock, 1, srcloc, mcloc, interf); + return joinleave_ssm_mcgroup (conn->m_sock, 1, srcloc, mcloc, interf); else #endif - return joinleave_asm_mcgroup(uc->m_sock, 1, mcloc, interf); + return joinleave_asm_mcgroup (conn->m_sock, 1, mcloc, interf); } -static int ddsi_udp_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) +static int ddsi_udp_leave_mc (ddsi_tran_conn_t conn_cmn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf) { - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn; - (void)srcloc; + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + (void) srcloc; #ifdef DDSI_INCLUDE_SSM if (srcloc) - return joinleave_ssm_mcgroup(uc->m_sock, 0, srcloc, mcloc, interf); + return joinleave_ssm_mcgroup (conn->m_sock, 0, srcloc, mcloc, interf); else #endif - return joinleave_asm_mcgroup(uc->m_sock, 0, mcloc, interf); + return joinleave_asm_mcgroup (conn->m_sock, 0, mcloc, interf); } -static void ddsi_udp_release_conn (ddsi_tran_conn_t conn) +static void ddsi_udp_release_conn (ddsi_tran_conn_t conn_cmn) { - ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn; - DDS_CTRACE (&conn->m_base.gv->logconfig, - "ddsi_udp_release_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", - conn->m_base.m_multicast ? "multicast" : "unicast", - uc->m_sock, - uc->m_base.m_base.m_port); - ddsrt_close (uc->m_sock); + ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn; + struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv; + GVTRACE ("ddsi_udp_release_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", + conn_cmn->m_base.m_multicast ? "multicast" : "unicast", + conn->m_sock, conn->m_base.m_base.m_port); + ddsrt_close (conn->m_sock); #if defined _WIN32 && !defined WINCE - WSACloseEvent(uc->m_sockEvent); + WSACloseEvent (conn->m_sockEvent); #endif - ddsrt_free (conn); + ddsrt_free (conn_cmn); } static int ddsi_udp_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc) @@ -720,7 +732,8 @@ static char *ddsi_udp_locator_to_string (char *dst, size_t sizeof_dst, const nn_ static void ddsi_udp_fini (ddsi_tran_factory_t fact) { - DDS_CLOG (DDS_LC_CONFIG, &fact->gv->logconfig, "udp finalized\n"); + struct ddsi_domaingv const * const gv = fact->gv; + GVLOG (DDS_LC_CONFIG, "udp finalized\n"); ddsrt_free (fact); } @@ -730,7 +743,7 @@ static int ddsi_udp_is_valid_port (ddsi_tran_factory_t fact, uint32_t port) return (port <= 65535); } -int ddsi_udp_init (struct ddsi_domaingv *gv) +int ddsi_udp_init (struct ddsi_domaingv*gv) { struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact)); memset (fact, 0, sizeof (*fact)); diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 68f72af..f23a18e 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -369,8 +369,7 @@ struct debug_monitor *new_debug_monitor (struct ddsi_domaingv *gv, int32_t port) goto err_invalid_port; } - dm->servsock = ddsi_factory_create_listener (dm->tran_factory, (uint32_t) port, NULL); - if (dm->servsock == NULL) + if (ddsi_factory_create_listener (&dm->servsock, dm->tran_factory, (uint32_t) port, NULL) != DDS_RETCODE_OK) { GVWARNING ("debmon: can't create socket\n"); goto err_servsock; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 6917574..e9f9e8e 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -587,6 +587,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain struct participant *pp; ddsi_guid_t subguid, group_guid; struct whc_writer_info *wrinfo; + ddsi_tran_conn_t ppconn; /* no reserved bits may be set */ assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0); @@ -603,6 +604,18 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain if (entidx_lookup_participant_guid (gv->entity_index, ppguid) != NULL) return DDS_RETCODE_PRECONDITION_NOT_MET; + if (gv->config.many_sockets_mode != MSM_MANY_UNICAST) + ppconn = NULL; + else + { + const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 }; + if (ddsi_factory_create_conn (&ppconn, gv->m_factory, 0, &qos) != DDS_RETCODE_OK) + { + GVERROR ("new_participant("PGUIDFMT", %x) failed: could not create network endpoint\n", PGUID (*ppguid), flags); + return DDS_RETCODE_OUT_OF_RESOURCES; + } + } + if (gv->config.max_participants == 0) { ddsrt_mutex_lock (&gv->participant_set_lock); @@ -621,6 +634,8 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain { ddsrt_mutex_unlock (&gv->participant_set_lock); GVERROR ("new_participant("PGUIDFMT", %x) failed: max participants reached\n", PGUID (*ppguid), flags); + if (ppconn) + ddsi_conn_free (ppconn); return DDS_RETCODE_OUT_OF_RESOURCES; } } @@ -649,16 +664,9 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain GVLOGDISC ("}\n"); } + pp->m_conn = ppconn; if (gv->config.many_sockets_mode == MSM_MANY_UNICAST) - { - const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 }; - pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos); ddsi_conn_locator (pp->m_conn, &pp->m_locator); - } - else - { - pp->m_conn = NULL; - } ddsrt_fibheap_init (&lease_fhdef_pp, &pp->leaseheap_man); ddsrt_atomic_stvoidp (&pp->minl_man, NULL); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 1a1a0cb..d2479a3 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -72,13 +72,16 @@ static void add_peer_addresses (const struct ddsi_domaingv *gv, struct addrset * } enum make_uc_sockets_ret { - MUSRET_SUCCESS, - MUSRET_INVALID_PORTS, - MUSRET_NOSOCKET + MUSRET_SUCCESS, /* unicast socket(s) created */ + MUSRET_INVALID_PORTS, /* specified port numbers are invalid */ + MUSRET_PORTS_IN_USE, /* ports were in use, keep trying */ + MUSRET_ERROR /* generic error, no use continuing */ }; static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint32_t * pdisc, uint32_t * pdata, int ppid) { + dds_return_t rc; + if (gv->config.many_sockets_mode == MSM_NO_UNICAST) { assert (ppid == PARTICIPANT_INDEX_NONE); @@ -97,33 +100,29 @@ static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint3 return MUSRET_INVALID_PORTS; const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 }; - gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, &qos); - if (gv->disc_conn_uc) + rc = ddsi_factory_create_conn (&gv->disc_conn_uc, gv->m_factory, *pdisc, &qos); + if (rc != DDS_RETCODE_OK) + goto fail_disc; + + if (*pdata == 0 || *pdata == *pdisc) + gv->data_conn_uc = gv->disc_conn_uc; + else { - /* Check not configured to use same unicast port for data and discovery */ - - if (*pdata != 0 && (*pdata != *pdisc)) - { - gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, &qos); - } - else - { - gv->data_conn_uc = gv->disc_conn_uc; - } - if (gv->data_conn_uc == NULL) - { - ddsi_conn_free (gv->disc_conn_uc); - gv->disc_conn_uc = NULL; - } - else - { - /* Set unicast locators */ - ddsi_conn_locator (gv->disc_conn_uc, &gv->loc_meta_uc); - ddsi_conn_locator (gv->data_conn_uc, &gv->loc_default_uc); - } + rc = ddsi_factory_create_conn (&gv->data_conn_uc, gv->m_factory, *pdata, &qos); + if (rc != DDS_RETCODE_OK) + goto fail_data; } + ddsi_conn_locator (gv->disc_conn_uc, &gv->loc_meta_uc); + ddsi_conn_locator (gv->data_conn_uc, &gv->loc_default_uc); + return MUSRET_SUCCESS; - return gv->data_conn_uc ? MUSRET_SUCCESS : MUSRET_NOSOCKET; +fail_data: + ddsi_conn_free (gv->disc_conn_uc); + gv->disc_conn_uc = NULL; +fail_disc: + if (rc == DDS_RETCODE_PRECONDITION_NOT_MET) + return MUSRET_PORTS_IN_USE; + return MUSRET_ERROR; } static void make_builtin_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template) @@ -675,7 +674,7 @@ int create_multicast_sockets (struct ddsi_domaingv *gv) gv->config.extDomainId.value, port); goto err_disc; } - if ((disc = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL) + if (ddsi_factory_create_conn (&disc, gv->m_factory, port, &qos) != DDS_RETCODE_OK) goto err_disc; if (gv->config.many_sockets_mode == MSM_NO_UNICAST) { @@ -691,10 +690,8 @@ int create_multicast_sockets (struct ddsi_domaingv *gv) gv->config.extDomainId.value, port); goto err_disc; } - if ((data = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL) - { + if (ddsi_factory_create_conn (&data, gv->m_factory, port, &qos) != DDS_RETCODE_OK) goto err_data; - } } gv->disc_conn_mc = disc; @@ -1192,18 +1189,21 @@ int rtps_init (struct ddsi_domaingv *gv) GVERROR ("Failed to create unicast sockets for domain %"PRIu32" participant index %d: resulting port numbers (%"PRIu32", %"PRIu32") are out of range\n", gv->config.extDomainId.value, gv->config.participantIndex, port_disc_uc, port_data_uc); goto err_unicast_sockets; - case MUSRET_NOSOCKET: + case MUSRET_PORTS_IN_USE: GVERROR ("rtps_init: failed to create unicast sockets for domain %"PRId32" participant index %d (ports %"PRIu32", %"PRIu32")\n", gv->config.extDomainId.value, gv->config.participantIndex, port_disc_uc, port_data_uc); goto err_unicast_sockets; + case MUSRET_ERROR: + /* something bad happened; assume make_uc_sockets logged the error */ + goto err_unicast_sockets; } } else if (gv->config.participantIndex == PARTICIPANT_INDEX_AUTO) { /* try to find a free one, and update gv->config.participantIndex */ - enum make_uc_sockets_ret musret = MUSRET_NOSOCKET; + enum make_uc_sockets_ret musret = MUSRET_PORTS_IN_USE; int ppid; GVLOG (DDS_LC_CONFIG, "rtps_init: trying to find a free participant index\n"); - for (ppid = 0; ppid <= gv->config.maxAutoParticipantIndex && musret == MUSRET_NOSOCKET; ppid++) + for (ppid = 0; ppid <= gv->config.maxAutoParticipantIndex && musret == MUSRET_PORTS_IN_USE; ppid++) { musret = make_uc_sockets (gv, &port_disc_uc, &port_data_uc, ppid); switch (musret) @@ -1214,8 +1214,11 @@ int rtps_init (struct ddsi_domaingv *gv) GVERROR ("Failed to create unicast sockets for domain %"PRIu32" participant index %d: resulting port numbers (%"PRIu32", %"PRIu32") are out of range\n", gv->config.extDomainId.value, ppid, port_disc_uc, port_data_uc); goto err_unicast_sockets; - case MUSRET_NOSOCKET: /* Try next one */ + case MUSRET_PORTS_IN_USE: /* Try next one */ break; + case MUSRET_ERROR: + /* something bad happened; assume make_uc_sockets logged the error */ + goto err_unicast_sockets; } } if (ppid > gv->config.maxAutoParticipantIndex) @@ -1235,7 +1238,7 @@ int rtps_init (struct ddsi_domaingv *gv) if (gv->config.pcap_file && *gv->config.pcap_file) { - gv->pcap_fp = new_pcap_file (&gv->logconfig, gv->config.pcap_file); + gv->pcap_fp = new_pcap_file (gv, gv->config.pcap_file); if (gv->pcap_fp) { ddsrt_mutex_init (&gv->pcap_lock); @@ -1286,8 +1289,9 @@ int rtps_init (struct ddsi_domaingv *gv) } else { - gv->listener = ddsi_factory_create_listener (gv->m_factory, (uint32_t) gv->config.tcp_port, NULL); - if (gv->listener == NULL || ddsi_listener_listen (gv->listener) != 0) + dds_return_t rc; + rc = ddsi_factory_create_listener (&gv->listener, gv->m_factory, (uint32_t) gv->config.tcp_port, NULL); + if (rc != DDS_RETCODE_OK || ddsi_listener_listen (gv->listener) != 0) { GVERROR ("Failed to create %s listener\n", gv->m_factory->m_typename); if (gv->listener) @@ -1308,7 +1312,10 @@ int rtps_init (struct ddsi_domaingv *gv) /* Create shared transmit connection */ { const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_XMIT, .m_diffserv = 0 }; - gv->xmit_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos); + dds_return_t rc; + rc = ddsi_factory_create_conn (&gv->xmit_conn, gv->m_factory, 0, &qos); + if (rc != DDS_RETCODE_OK) + goto err_mc_conn; } #ifdef DDSI_INCLUDE_NETWORK_CHANNELS diff --git a/src/core/ddsi/src/q_pcap.c b/src/core/ddsi/src/q_pcap.c index 601c96d..45f6c67 100644 --- a/src/core/ddsi/src/q_pcap.c +++ b/src/core/ddsi/src/q_pcap.c @@ -76,7 +76,7 @@ static const ipv4_hdr_t ipv4_hdr_template = { #define IPV4_HDR_SIZE 20 #define UDP_HDR_SIZE 8 -FILE *new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name) +FILE *new_pcap_file (struct ddsi_domaingv *gv, const char *name) { DDSRT_WARNING_MSVC_OFF(4996); FILE *fp; @@ -84,7 +84,7 @@ FILE *new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name) if ((fp = fopen (name, "wb")) == NULL) { - DDS_CWARNING (logcfg, "packet capture disabled: file %s could not be opened for writing\n", name); + GVWARNING ("packet capture disabled: file %s could not be opened for writing\n", name); return NULL; }