Minor cleanup of UDP, TCP support code

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-03-13 15:14:17 +01:00 committed by eboasson
parent 0b9ab17018
commit e1201e678d
10 changed files with 517 additions and 479 deletions

View file

@ -60,8 +60,8 @@ typedef void (*ddsi_tran_free_fn_t) (ddsi_tran_factory_t);
typedef void (*ddsi_tran_peer_locator_fn_t) (ddsi_tran_conn_t, nn_locator_t *);
typedef void (*ddsi_tran_disable_multiplexing_fn_t) (ddsi_tran_conn_t);
typedef ddsi_tran_conn_t (*ddsi_tran_accept_fn_t) (ddsi_tran_listener_t);
typedef ddsi_tran_conn_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *);
typedef ddsi_tran_listener_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *);
typedef dds_return_t (*ddsi_tran_create_conn_fn_t) (ddsi_tran_conn_t *conn, ddsi_tran_factory_t fact, uint32_t, const struct ddsi_tran_qos *);
typedef dds_return_t (*ddsi_tran_create_listener_fn_t) (ddsi_tran_listener_t *listener, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *);
typedef void (*ddsi_tran_release_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_close_conn_fn_t) (ddsi_tran_conn_t);
typedef void (*ddsi_tran_unblock_listener_fn_t) (ddsi_tran_listener_t);
@ -214,15 +214,17 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3
inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port) {
return factory->m_is_valid_port_fn (factory, port);
}
inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
*conn = NULL;
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_conn_fn (factory, port, qos);
return DDS_RETCODE_BAD_PARAMETER;
return factory->m_create_conn_fn (conn, factory, port, qos);
}
inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
*listener = NULL;
if (!ddsi_is_valid_port (factory, port))
return NULL;
return factory->m_create_listener_fn (factory, port, qos);
return DDS_RETCODE_BAD_PARAMETER;
return factory->m_create_listener_fn (listener, factory, port, qos);
}
void ddsi_tran_free (ddsi_tran_base_t base);

View file

@ -21,7 +21,7 @@ extern "C" {
struct msghdr;
FILE * new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name);
FILE * new_pcap_file (struct ddsi_domaingv *gv, const char *name);
void write_pcap_received (struct ddsi_domaingv *gv, ddsrt_wctime_t tstamp, const struct sockaddr_storage *src, const struct sockaddr_storage *dst, unsigned char *buf, size_t sz);
void write_pcap_sent (struct ddsi_domaingv *gv, ddsrt_wctime_t tstamp, const struct sockaddr_storage *src,

View file

@ -174,7 +174,7 @@ static int ddsi_raweth_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t
return ret;
}
static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
static dds_return_t ddsi_raweth_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
ddsrt_socket_t sock;
dds_return_t rc;
@ -187,14 +187,14 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3
if (port == 0 || port > 65535)
{
DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s port %u - using port number as ethernet type, %u won't do\n", mcast ? "multicast" : "unicast", port, port);
return NULL;
return DDS_RETCODE_ERROR;
}
rc = ddsrt_socket(&sock, PF_PACKET, SOCK_DGRAM, htons((uint16_t)port));
if (rc != DDS_RETCODE_OK)
{
DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s port %u failed ... retcode = %d\n", mcast ? "multicast" : "unicast", port, rc);
return NULL;
return DDS_RETCODE_ERROR;
}
memset(&addr, 0, sizeof(addr));
@ -207,13 +207,13 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3
{
ddsrt_close(sock);
DDS_CERROR (&fact->gv->logconfig, "ddsi_raweth_create_conn %s bind port %u failed ... retcode = %d\n", mcast ? "multicast" : "unicast", port, rc);
return NULL;
return DDS_RETCODE_ERROR;
}
if ((uc = (ddsi_raweth_conn_t) ddsrt_malloc (sizeof (*uc))) == NULL)
{
ddsrt_close(sock);
return NULL;
return DDS_RETCODE_ERROR;
}
memset (uc, 0, sizeof (*uc));
@ -230,7 +230,8 @@ static ddsi_tran_conn_t ddsi_raweth_create_conn (ddsi_tran_factory_t fact, uint3
uc->m_base.m_disable_multiplexing_fn = 0;
DDS_CTRACE (&fact->gv->logconfig, "ddsi_raweth_create_conn %s socket %d port %u\n", mcast ? "multicast" : "unicast", uc->m_sock, uc->m_base.m_base.m_port);
return &uc->m_base;
*conn_out = &uc->m_base;
return DDS_RETCODE_OK;
}
static int isbroadcast(const nn_locator_t *loc)

View file

@ -39,9 +39,15 @@
wait set that manages their lifecycle.
*/
union addr {
struct sockaddr a;
struct sockaddr_in a4;
struct sockaddr_in6 a6;
};
typedef struct ddsi_tcp_conn {
struct ddsi_tran_conn m_base;
struct sockaddr_storage m_peer_addr;
union addr m_peer_addr;
uint32_t m_peer_port;
ddsrt_mutex_t m_mutex;
ddsrt_socket_t m_sock;
@ -70,8 +76,8 @@ struct ddsi_tran_factory_tcp {
static int ddsi_tcp_cmp_conn (const struct ddsi_tcp_conn *c1, const struct ddsi_tcp_conn *c2)
{
const struct sockaddr *a1s = (struct sockaddr *)&c1->m_peer_addr;
const struct sockaddr *a2s = (struct sockaddr *)&c2->m_peer_addr;
const struct sockaddr *a1s = &c1->m_peer_addr.a;
const struct sockaddr *a2s = &c2->m_peer_addr.a;
if (a1s->sa_family != a2s->sa_family)
return (a1s->sa_family < a2s->sa_family) ? -1 : 1;
else if (c1->m_peer_port != c2->m_peer_port)
@ -132,81 +138,45 @@ static void ddsi_tcp_cache_dump (void)
}
*/
static unsigned short get_socket_port (struct ddsrt_log_cfg *logcfg, ddsrt_socket_t socket)
static uint16_t get_socket_port (struct ddsi_domaingv const * const gv, ddsrt_socket_t socket)
{
struct sockaddr_storage addr;
union addr addr;
socklen_t addrlen = sizeof (addr);
dds_return_t ret;
ret = ddsrt_getsockname(socket, (struct sockaddr *)&addr, &addrlen);
ret = ddsrt_getsockname(socket, &addr.a, &addrlen);
if (ret != DDS_RETCODE_OK) {
DDS_CERROR (logcfg, "ddsi_tcp_get_socket_port: ddsrt_getsockname retcode %"PRId32"\n", ret);
GVERROR ("ddsi_tcp_get_socket_port: ddsrt_getsockname retcode %"PRId32"\n", ret);
return 0;
}
return ddsrt_sockaddr_get_port((struct sockaddr *)&addr);
return ddsrt_sockaddr_get_port (&addr.a);
}
static void ddsi_tcp_conn_set_socket (ddsi_tcp_conn_t conn, ddsrt_socket_t sock)
{
struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv;
conn->m_sock = sock;
conn->m_base.m_base.m_port = (sock == DDSRT_INVALID_SOCKET) ? INVALID_PORT : get_socket_port (&conn->m_base.m_base.gv->logconfig, sock);
conn->m_base.m_base.m_port = (sock == DDSRT_INVALID_SOCKET) ? INVALID_PORT : get_socket_port (gv, sock);
}
static void ddsi_tcp_sock_free (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t sock, const char *msg)
static void ddsi_tcp_sock_free (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, const char *msg)
{
if (sock != DDSRT_INVALID_SOCKET)
{
if (msg)
{
DDS_CLOG (DDS_LC_TCP, logcfg, "tcp %s free socket %"PRIdSOCK"\n", msg, sock);
}
GVLOG (DDS_LC_TCP, "tcp %s free socket %"PRIdSOCK"\n", msg, sock);
ddsrt_close (sock);
}
}
static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact, ddsrt_socket_t *sock, uint16_t port)
{
struct ddsi_domaingv * const gv = fact->fact.gv;
struct ddsi_domaingv const * const gv = fact->fact.gv;
const int one = 1;
union addr socketname;
dds_return_t rc;
{
int af = AF_UNSPEC;
switch (fact->fact.m_kind)
{
case NN_LOCATOR_KIND_TCPv4:
af = AF_INET;
break;
#if DDSRT_HAVE_IPV6
case NN_LOCATOR_KIND_TCPv6:
af = AF_INET6;
break;
#endif
default:
DDS_FATAL ("ddsi_tcp_sock_new: unsupported kind %"PRId32"\n", fact->fact.m_kind);
}
assert (af != AF_UNSPEC);
if ((rc = ddsrt_socket (sock, af, SOCK_STREAM, 0)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to create socket: %s\n", dds_strretcode (rc));
goto fail;
}
}
/* REUSEADDR if we're binding to a port number */
if (port && (rc = ddsrt_setsockopt (*sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to enable address reuse: %s\n", dds_strretcode (rc));
goto fail_w_socket;
}
{
union {
struct sockaddr_storage x;
struct sockaddr_in a4;
struct sockaddr_in6 a6;
} socketname;
memset (&socketname.x, 0, sizeof (socketname.x));
memset (&socketname, 0, sizeof (socketname));
switch (fact->fact.m_kind)
{
case NN_LOCATOR_KIND_TCPv4:
@ -216,7 +186,7 @@ static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact
break;
#if DDSRT_HAVE_IPV6
case NN_LOCATOR_KIND_TCPv6:
socketname.a4.sin_family = AF_INET6;
socketname.a6.sin6_family = AF_INET6;
socketname.a6.sin6_addr = ddsrt_in6addr_any;
socketname.a6.sin6_port = htons (port);
break;
@ -224,22 +194,35 @@ static dds_return_t ddsi_tcp_sock_new (struct ddsi_tran_factory_tcp * const fact
default:
DDS_FATAL ("ddsi_tcp_sock_new: unsupported kind %"PRId32"\n", fact->fact.m_kind);
}
if ((rc = ddsrt_bind (*sock, (struct sockaddr *) &socketname, ddsrt_sockaddr_get_size ((struct sockaddr *) &socketname))) != DDS_RETCODE_OK)
if ((rc = ddsrt_socket (sock, socketname.a.sa_family, SOCK_STREAM, 0)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to bind to ANY:%"PRIu16": %s\n", port, dds_strretcode (rc));
GVERROR ("ddsi_tcp_sock_new: failed to create socket: %s\n", dds_strretcode (rc));
goto fail;
}
/* REUSEADDR if we're binding to a port number */
if (port && (rc = ddsrt_setsockopt (*sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to enable address reuse: %s\n", dds_strretcode (rc));
goto fail_w_socket;
}
if ((rc = ddsrt_bind (*sock, &socketname.a, ddsrt_sockaddr_get_size (&socketname.a))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to bind to ANY:%"PRIu16": %s\n", port,
(rc == DDS_RETCODE_PRECONDITION_NOT_MET) ? "address in use" : dds_strretcode (rc));
goto fail_w_socket;
}
#ifdef SO_NOSIGPIPE
if (ddsrt_setsockopt (*sock, SOL_SOCKET, SO_NOSIGPIPE, (char *) &one, sizeof (one)) != DDS_RETCODE_OK)
if (ddsrt_setsockopt (*sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof (one)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to set NOSIGPIPE: %s\n", dds_strretcode (rc));
goto fail_w_socket;
}
#endif
#ifdef TCP_NODELAY
if (gv->config.tcp_nodelay && (rc = ddsrt_setsockopt (*sock, IPPROTO_TCP, TCP_NODELAY, (char*) &one, sizeof (one))) != DDS_RETCODE_OK)
if (gv->config.tcp_nodelay && (rc = ddsrt_setsockopt (*sock, IPPROTO_TCP, TCP_NODELAY, &one, sizeof (one))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_sock_new: failed to set NODELAY: %s\n", dds_strretcode (rc));
goto fail_w_socket;
@ -264,24 +247,25 @@ static void ddsi_tcp_node_free (void * ptr)
static void ddsi_tcp_conn_connect (ddsi_tcp_conn_t conn, const ddsrt_msghdr_t * msg)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory;
struct ddsi_domaingv const * const gv = fact->fact.gv;
char buff[DDSI_LOCSTRLEN];
ddsrt_socket_t sock;
dds_return_t ret;
if (ddsi_tcp_sock_new (fact, &sock, 0) == DDS_RETCODE_OK)
if (ddsi_tcp_sock_new (fact, &sock, 0) != DDS_RETCODE_OK)
{
/* error messages are logged by ddsi_tcp_sock_new */
return;
}
/* Attempt to connect, expected that may fail */
do {
ret = ddsrt_connect(sock, msg->msg_name, msg->msg_namelen);
} while (ret == DDS_RETCODE_INTERRUPTED);
if (ret != DDS_RETCODE_OK)
{
ddsi_tcp_sock_free (&conn->m_base.m_base.gv->logconfig, sock, NULL);
return;
}
ddsi_tcp_conn_set_socket (conn, sock);
goto fail_w_socket;
ddsi_tcp_conn_set_socket (conn, sock);
#ifdef DDSI_INCLUDE_SSL
if (fact->ddsi_tcp_ssl_plugin.connect)
{
@ -289,13 +273,13 @@ static void ddsi_tcp_conn_connect (ddsi_tcp_conn_t conn, const ddsrt_msghdr_t *
if (conn->m_ssl == NULL)
{
ddsi_tcp_conn_set_socket (conn, DDSRT_INVALID_SOCKET);
return;
goto fail_w_socket;
}
}
#endif
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *) msg->msg_name);
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp connect socket %"PRIdSOCK" port %u to %s\n", sock, get_socket_port (&conn->m_base.m_base.gv->logconfig, sock), buff);
GVLOG (DDS_LC_TCP, "tcp connect socket %"PRIdSOCK" port %u to %s\n", sock, get_socket_port (gv, sock), buff);
/* Also may need to receive on connection so add to waitset */
@ -305,11 +289,15 @@ static void ddsi_tcp_conn_connect (ddsi_tcp_conn_t conn, const ddsrt_msghdr_t *
assert (conn->m_base.m_base.gv->recv_threads[0].arg.mode == RTM_MANY);
os_sockWaitsetAdd (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws, &conn->m_base);
os_sockWaitsetTrigger (conn->m_base.m_base.gv->recv_threads[0].arg.u.many.ws);
}
return;
fail_w_socket:
ddsi_tcp_sock_free (gv, sock, NULL);
}
static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_conn_t conn, ddsrt_avl_ipath_t * path)
{
struct ddsi_domaingv * const gv = fact->fact.gv;
const char * action = "added";
ddsi_tcp_node_t node;
char buff[DDSI_LOCSTRLEN];
@ -342,13 +330,14 @@ static void ddsi_tcp_cache_add (struct ddsi_tran_factory_tcp *fact, ddsi_tcp_con
}
}
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr);
DDS_CLOG (DDS_LC_TCP, &fact->fact.gv->logconfig, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a);
GVLOG (DDS_LC_TCP, "tcp cache %s %s socket %"PRIdSOCK" to %s\n", action, conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
}
static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory;
struct ddsi_domaingv * const gv = fact->fact.gv;
char buff[DDSI_LOCSTRLEN];
ddsi_tcp_node_t node;
ddsrt_avl_dpath_t path;
@ -357,8 +346,8 @@ static void ddsi_tcp_cache_remove (ddsi_tcp_conn_t conn)
node = ddsrt_avl_lookup_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, conn, &path);
if (node)
{
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr);
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a);
GVLOG (DDS_LC_TCP, "tcp cache removed socket %"PRIdSOCK" to %s\n", conn->m_sock, buff);
ddsrt_avl_delete_dpath (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, node, &path);
ddsi_tcp_node_free (node);
}
@ -399,7 +388,7 @@ static ddsi_tcp_conn_t ddsi_tcp_cache_find (struct ddsi_tran_factory_tcp *fact,
}
if (ret == NULL)
{
ret = ddsi_tcp_new_conn (fact, DDSRT_INVALID_SOCKET, false, (struct sockaddr *)&key.m_peer_addr);
ret = ddsi_tcp_new_conn (fact, DDSRT_INVALID_SOCKET, false, &key.m_peer_addr.a);
ddsi_tcp_cache_add (fact, ret, &path);
}
ddsrt_mutex_unlock (&fact->ddsi_tcp_cache_lock_g);
@ -425,7 +414,7 @@ static ssize_t ddsi_tcp_conn_read_ssl (ddsi_tcp_conn_t tcp, void * buf, size_t l
}
#endif
static bool ddsi_tcp_select (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t sock, bool read, size_t pos, int64_t timeout)
static bool ddsi_tcp_select (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, bool read, size_t pos, int64_t timeout)
{
dds_return_t rc;
fd_set fds;
@ -443,22 +432,29 @@ static bool ddsi_tcp_select (const struct ddsrt_log_cfg *logcfg, ddsrt_socket_t
DDSRT_WARNING_GNUC_ON(sign-conversion)
#endif
DDS_CLOG (DDS_LC_TCP, logcfg, "tcp blocked %s: sock %d\n", read ? "read" : "write", (int) sock);
GVLOG (DDS_LC_TCP, "tcp blocked %s: sock %d\n", read ? "read" : "write", (int) sock);
do {
rc = ddsrt_select (sock + 1, rdset, wrset, NULL, tval, &ready);
} while (rc == DDS_RETCODE_INTERRUPTED);
if (rc != DDS_RETCODE_OK)
{
DDS_CWARNING (logcfg, "tcp abandoning %s on blocking socket %d after %"PRIuSIZE" bytes\n", read ? "read" : "write", (int) sock, pos);
GVWARNING ("tcp abandoning %s on blocking socket %d after %"PRIuSIZE" bytes\n", read ? "read" : "write", (int) sock, pos);
}
return (ready > 0);
}
static int32_t addrfam_to_locator_kind (int af)
{
assert (af == AF_INET || af == AF_INET6);
return (af == AF_INET) ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6;
}
static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, size_t len, bool allow_spurious, nn_locator_t *srcloc)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_factory;
struct ddsi_domaingv const * const gv = fact->fact.gv;
dds_return_t rc;
ddsi_tcp_conn_t tcp = (ddsi_tcp_conn_t) conn;
ssize_t (*rd) (ddsi_tcp_conn_t, void *, size_t, dds_return_t * err) = ddsi_tcp_conn_read_plain;
@ -482,14 +478,15 @@ static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, si
{
if (srcloc)
{
ddsi_ipaddr_to_loc(&fact->fact, srcloc, (struct sockaddr *)&tcp->m_peer_addr, tcp->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6);
const int32_t kind = addrfam_to_locator_kind (tcp->m_peer_addr.a.sa_family);
ddsi_ipaddr_to_loc(&fact->fact, srcloc, &tcp->m_peer_addr.a, kind);
}
return (ssize_t) pos;
}
}
else if (n == 0)
{
DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "tcp read: sock %"PRIdSOCK" closed-by-peer\n", tcp->m_sock);
GVLOG (DDS_LC_TCP, "tcp read: sock %"PRIdSOCK" closed-by-peer\n", tcp->m_sock);
break;
}
else
@ -500,13 +497,13 @@ static ssize_t ddsi_tcp_conn_read (ddsi_tran_conn_t conn, unsigned char *buf, si
{
if (allow_spurious && pos == 0)
return 0;
const int64_t timeout = conn->m_base.gv->config.tcp_read_timeout;
if (ddsi_tcp_select (&conn->m_base.gv->logconfig, tcp->m_sock, true, pos, timeout) == false)
const int64_t timeout = gv->config.tcp_read_timeout;
if (ddsi_tcp_select (gv, tcp->m_sock, true, pos, timeout) == false)
break;
}
else
{
DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "tcp read: sock %"PRIdSOCK" error %"PRId32"\n", tcp->m_sock, rc);
GVLOG (DDS_LC_TCP, "tcp read: sock %"PRIdSOCK" error %"PRId32"\n", tcp->m_sock, rc);
break;
}
}
@ -542,6 +539,7 @@ static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void
{
/* Write all bytes of buf even in the presence of signals,
partial writes and blocking (typically write buffer full) */
struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv;
dds_return_t rc;
size_t pos = 0;
ssize_t n = -1;
@ -559,15 +557,15 @@ static ssize_t ddsi_tcp_block_write (ssize_t (*wr) (ddsi_tcp_conn_t, const void
{
if (rc == DDS_RETCODE_TRY_AGAIN)
{
const int64_t timeout = conn->m_base.m_base.gv->config.tcp_write_timeout;
if (ddsi_tcp_select (&conn->m_base.m_base.gv->logconfig, conn->m_sock, false, pos, timeout) == false)
const int64_t timeout = gv->config.tcp_write_timeout;
if (ddsi_tcp_select (gv, conn->m_sock, false, pos, timeout) == false)
{
break;
}
}
else
{
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" error %"PRId32"\n", conn->m_sock, rc);
GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" error %"PRId32"\n", conn->m_sock, rc);
break;
}
}
@ -594,6 +592,7 @@ static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, ddsrt_iovec_t *iov, size_t iov
static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) base->m_factory;
struct ddsi_domaingv const * const gv = fact->fact.gv;
#ifdef DDSI_INCLUDE_SSL
char msgbuf[4096]; /* stack buffer for merging smallish writes without requiring allocations */
ddsrt_iovec_t iovec; /* iovec used for msgbuf */
@ -604,13 +603,16 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d
int piecewise;
bool connect = false;
ddsrt_msghdr_t msg;
struct sockaddr_storage dstaddr;
union {
struct sockaddr_storage x;
union addr a;
} dstaddr;
assert(niov <= INT_MAX);
ddsi_ipaddr_from_loc(&dstaddr, dst);
ddsi_ipaddr_from_loc(&dstaddr.x, dst);
memset(&msg, 0, sizeof(msg));
set_msghdr_iov (&msg, (ddsrt_iovec_t *) iov, niov);
msg.msg_name = &dstaddr;
msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size((struct sockaddr *) &dstaddr);
msg.msg_namelen = ddsrt_sockaddr_get_size(&dstaddr.a.a);
#if DDSRT_MSGHDR_FLAGS
msg.msg_flags = (int) flags;
#endif
@ -643,13 +645,13 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d
if (!connect && ((flags & DDSI_TRAN_ON_CONNECT) != 0))
{
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" message filtered\n", conn->m_sock);
GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" message filtered\n", conn->m_sock);
ddsrt_mutex_unlock (&conn->m_mutex);
return (ssize_t) len;
}
#ifdef DDSI_INCLUDE_SSL
if (base->m_base.gv->config.ssl_enable)
if (gv->config.ssl_enable)
{
/* SSL doesn't have sendmsg, ret = 0 so writing starts at first byte.
Rumor is that it is much better to merge small writes, which do here
@ -702,11 +704,11 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d
{
case DDS_RETCODE_NO_CONNECTION:
case DDS_RETCODE_ILLEGAL_OPERATION:
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" DDS_RETCODE_NO_CONNECTION\n", conn->m_sock);
GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" DDS_RETCODE_NO_CONNECTION\n", conn->m_sock);
break;
default:
if (! conn->m_base.m_closed && (conn->m_sock != DDSRT_INVALID_SOCKET))
DDS_CWARNING (&conn->m_base.m_base.gv->logconfig, "tcp write failed on socket %"PRIdSOCK" with errno %"PRId32"\n", conn->m_sock, rc);
GVWARNING ("tcp write failed on socket %"PRIdSOCK" with errno %"PRId32"\n", conn->m_sock, rc);
break;
}
}
@ -715,7 +717,7 @@ static ssize_t ddsi_tcp_conn_write (ddsi_tran_conn_t base, const nn_locator_t *d
{
if (ret == 0)
{
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp write: sock %"PRIdSOCK" eof\n", conn->m_sock);
GVLOG (DDS_LC_TCP, "tcp write: sock %"PRIdSOCK" eof\n", conn->m_sock);
}
piecewise = (ret > 0 && (size_t) ret < len);
}
@ -783,12 +785,13 @@ static int ddsi_tcp_locator (struct ddsi_tran_factory *fact_cmn, ddsi_tran_base_
return 0;
}
static ddsi_tran_conn_t ddsi_tcp_create_conn (struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos)
static dds_return_t ddsi_tcp_create_conn (ddsi_tran_conn_t *conn_out, struct ddsi_tran_factory *fact_cmn, uint32_t port, const struct ddsi_tran_qos *qos)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn;
(void) qos;
(void) port;
return &fact->ddsi_tcp_conn_client.m_base;
*conn_out = &fact->ddsi_tcp_conn_client.m_base;
return DDS_RETCODE_OK;
}
static int ddsi_tcp_listen (ddsi_tran_listener_t listener)
@ -812,10 +815,11 @@ static int ddsi_tcp_listen (ddsi_tran_listener_t listener)
static ddsi_tran_conn_t ddsi_tcp_accept (ddsi_tran_listener_t listener)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory;
struct ddsi_domaingv const * const gv = fact->fact.gv;
ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener;
ddsi_tcp_conn_t tcp = NULL;
ddsrt_socket_t sock = DDSRT_INVALID_SOCKET;
struct sockaddr_storage addr;
union addr addr;
socklen_t addrlen = sizeof (addr);
char buff[DDSI_LOCSTRLEN];
dds_return_t rc = DDS_RETCODE_OK;
@ -839,31 +843,31 @@ static ddsi_tran_conn_t ddsi_tcp_accept (ddsi_tran_listener_t listener)
{
rc = ddsrt_accept(tl->m_sock, NULL, NULL, &sock);
}
if (!ddsrt_atomic_ld32(&listener->m_base.gv->rtps_keepgoing))
if (!ddsrt_atomic_ld32(&gv->rtps_keepgoing))
{
ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, sock, NULL);
ddsi_tcp_sock_free (gv, sock, NULL);
return NULL;
}
} while (rc == DDS_RETCODE_INTERRUPTED || rc == DDS_RETCODE_TRY_AGAIN);
if (sock == DDSRT_INVALID_SOCKET)
{
(void)ddsrt_getsockname (tl->m_sock, (struct sockaddr *) &addr, &addrlen);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&addr);
DDS_CLOG ((rc == DDS_RETCODE_OK) ? DDS_LC_ERROR : DDS_LC_FATAL, &listener->m_base.gv->logconfig, "tcp accept failed on socket %"PRIdSOCK" at %s retcode %"PRId32"\n", tl->m_sock, buff, rc);
(void)ddsrt_getsockname (tl->m_sock, &addr.a, &addrlen);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), &addr.a);
GVLOG ((rc == DDS_RETCODE_OK) ? DDS_LC_ERROR : DDS_LC_FATAL, "tcp accept failed on socket %"PRIdSOCK" at %s retcode %"PRId32"\n", tl->m_sock, buff, rc);
}
else if (getpeername (sock, (struct sockaddr *) &addr, &addrlen) == -1)
else if (getpeername (sock, &addr.a, &addrlen) == -1)
{
DDS_CWARNING (&listener->m_base.gv->logconfig, "tcp accepted new socket %"PRIdSOCK" on socket %"PRIdSOCK" but no peer address, errno %"PRId32"\n", sock, tl->m_sock, rc);
GVWARNING ("tcp accepted new socket %"PRIdSOCK" on socket %"PRIdSOCK" but no peer address, errno %"PRId32"\n", sock, tl->m_sock, rc);
ddsrt_close (sock);
}
else
{
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&addr);
DDS_CLOG (DDS_LC_TCP, &listener->m_base.gv->logconfig, "tcp accept new socket %"PRIdSOCK" on socket %"PRIdSOCK" from %s\n", sock, tl->m_sock, buff);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), &addr.a);
GVLOG (DDS_LC_TCP, "tcp accept new socket %"PRIdSOCK" on socket %"PRIdSOCK" from %s\n", sock, tl->m_sock, buff);
(void)ddsrt_setsocknonblocking (sock, true);
tcp = ddsi_tcp_new_conn (fact, sock, true, (struct sockaddr *)&addr);
tcp = ddsi_tcp_new_conn (fact, sock, true, &addr.a);
#ifdef DDSI_INCLUDE_SSL
tcp->m_ssl = ssl;
#endif
@ -891,14 +895,20 @@ static ddsrt_socket_t ddsi_tcp_listener_handle (ddsi_tran_base_t base)
caller (supporting call back over NAT).
*/
static void addr_to_loc (const struct ddsi_tran_factory *fact, nn_locator_t *loc, const union addr *addr)
{
ddsi_ipaddr_to_loc (fact, loc, &addr->a, addrfam_to_locator_kind (addr->a.sa_family));
}
static void ddsi_tcp_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc)
{
struct ddsi_domaingv const * const gv = conn->m_base.gv;
char buff[DDSI_LOCSTRLEN];
ddsi_tcp_conn_t tc = (ddsi_tcp_conn_t) conn;
assert (tc->m_sock != DDSRT_INVALID_SOCKET);
ddsi_ipaddr_to_loc (conn->m_factory, loc, (struct sockaddr *)&tc->m_peer_addr, tc->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6);
addr_to_loc (conn->m_factory, loc, &tc->m_peer_addr);
ddsi_locator_to_string(buff, sizeof(buff), loc);
DDS_CLOG (DDS_LC_TCP, &conn->m_base.gv->logconfig, "(tcp EP:%s)", buff);
GVLOG (DDS_LC_TCP, "(tcp EP:%s)", buff);
}
static void ddsi_tcp_base_init (const struct ddsi_tran_factory_tcp *fact, struct ddsi_tran_conn *base)
@ -915,7 +925,7 @@ static void ddsi_tcp_base_init (const struct ddsi_tran_factory_tcp *fact, struct
static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, ddsrt_socket_t sock, bool server, struct sockaddr * peer)
{
ddsi_tcp_conn_t conn = (ddsi_tcp_conn_t) ddsrt_malloc (sizeof (*conn));
ddsi_tcp_conn_t conn = ddsrt_malloc (sizeof (*conn));
memset (conn, 0, sizeof (*conn));
ddsi_tcp_base_init (fact, &conn->m_base);
@ -930,19 +940,30 @@ static ddsi_tcp_conn_t ddsi_tcp_new_conn (struct ddsi_tran_factory_tcp *fact, dd
return conn;
}
static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
static dds_return_t ddsi_tcp_create_listener (ddsi_tran_listener_t *listener_out, ddsi_tran_factory_t fact, uint32_t port, const struct ddsi_tran_qos *qos)
{
struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) fact;
struct ddsi_domaingv * const gv = fact_tcp->fact.gv;
struct ddsi_domaingv const * const gv = fact_tcp->fact.gv;
ddsrt_socket_t sock;
(void) qos;
if (ddsi_tcp_sock_new (fact_tcp, &sock, (unsigned short) port) != DDS_RETCODE_OK)
return NULL;
if (ddsi_tcp_sock_new (fact_tcp, &sock, (uint16_t) port) != DDS_RETCODE_OK)
return DDS_RETCODE_ERROR;
char buff[DDSI_LOCSTRLEN];
union addr addr;
socklen_t addrlen = sizeof (addr);
dds_return_t ret;
ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) ddsrt_malloc (sizeof (*tl));
if ((ret = ddsrt_getsockname (sock, &addr.a, &addrlen)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_create_listener: ddsrt_getsockname returned %"PRId32"\n", ret);
ddsi_tcp_sock_free (gv, sock, NULL);
return DDS_RETCODE_ERROR;
}
sockaddr_to_string_with_port (fact_tcp, buff, sizeof (buff), &addr.a);
GVLOG (DDS_LC_TCP, "tcp create listener socket %"PRIdSOCK" on %s\n", sock, buff);
ddsi_tcp_listener_t tl = ddsrt_malloc (sizeof (*tl));
memset (tl, 0, sizeof (*tl));
tl->m_sock = sock;
@ -952,33 +973,21 @@ static ddsi_tran_listener_t ddsi_tcp_create_listener (ddsi_tran_factory_t fact,
tl->m_base.m_accept_fn = ddsi_tcp_accept;
tl->m_base.m_factory = fact;
tl->m_base.m_base.m_port = get_socket_port (&fact->gv->logconfig, sock);
tl->m_base.m_base.m_port = get_socket_port (gv, sock);
tl->m_base.m_base.m_trantype = DDSI_TRAN_LISTENER;
tl->m_base.m_base.m_handle_fn = ddsi_tcp_listener_handle;
tl->m_base.m_locator_fn = ddsi_tcp_locator;
struct sockaddr_storage addr;
socklen_t addrlen = sizeof (addr);
if ((ret = ddsrt_getsockname (sock, (struct sockaddr *) &addr, &addrlen)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_tcp_create_listener: ddsrt_getsockname returned %"PRId32"\n", ret);
ddsi_tcp_sock_free (&fact->gv->logconfig, sock, NULL);
ddsrt_free (tl);
return NULL;
}
char buff[DDSI_LOCSTRLEN];
sockaddr_to_string_with_port (fact_tcp, buff, sizeof (buff), (struct sockaddr *) &addr);
GVLOG (DDS_LC_TCP, "tcp create listener socket %"PRIdSOCK" on %s\n", sock, buff);
return &tl->m_base;
*listener_out = &tl->m_base;
return DDS_RETCODE_OK;
}
static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) conn->m_base.m_factory;
struct ddsi_domaingv const * const gv = fact->fact.gv;
char buff[DDSI_LOCSTRLEN];
sockaddr_to_string_with_port(fact, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr);
DDS_CLOG (DDS_LC_TCP, &conn->m_base.m_base.gv->logconfig, "tcp free %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
sockaddr_to_string_with_port(fact, buff, sizeof(buff), &conn->m_peer_addr.a);
GVLOG (DDS_LC_TCP, "tcp free %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
#ifdef DDSI_INCLUDE_SSL
if (fact->ddsi_tcp_ssl_plugin.ssl_free)
@ -988,7 +997,7 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn)
else
#endif
{
ddsi_tcp_sock_free (&conn->m_base.m_base.gv->logconfig, conn->m_sock, "connection");
ddsi_tcp_sock_free (gv, conn->m_sock, "connection");
}
ddsrt_mutex_destroy (&conn->m_mutex);
ddsrt_free (conn);
@ -997,17 +1006,18 @@ static void ddsi_tcp_conn_delete (ddsi_tcp_conn_t conn)
static void ddsi_tcp_close_conn (ddsi_tran_conn_t tc)
{
struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) tc->m_factory;
struct ddsi_domaingv * const gv = fact_tcp->fact.gv;
if (tc != &fact_tcp->ddsi_tcp_conn_client.m_base)
{
char buff[DDSI_LOCSTRLEN];
nn_locator_t loc;
ddsi_tcp_conn_t conn = (ddsi_tcp_conn_t) tc;
sockaddr_to_string_with_port(fact_tcp, buff, sizeof(buff), (struct sockaddr *)&conn->m_peer_addr);
DDS_CLOG (DDS_LC_TCP, &tc->m_base.gv->logconfig, "tcp close %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
sockaddr_to_string_with_port(fact_tcp, buff, sizeof(buff), &conn->m_peer_addr.a);
GVLOG (DDS_LC_TCP, "tcp close %s connnection on socket %"PRIdSOCK" to %s\n", conn->m_base.m_server ? "server" : "client", conn->m_sock, buff);
(void) shutdown (conn->m_sock, 2);
ddsi_ipaddr_to_loc(&fact_tcp->fact, &loc, (struct sockaddr *)&conn->m_peer_addr, conn->m_peer_addr.ss_family == AF_INET ? NN_LOCATOR_KIND_TCPv4 : NN_LOCATOR_KIND_TCPv6);
ddsi_ipaddr_to_loc(&fact_tcp->fact, &loc, &conn->m_peer_addr.a, addrfam_to_locator_kind(conn->m_peer_addr.a.sa_family));
loc.port = conn->m_peer_port;
purge_proxy_participants (conn->m_base.m_base.gv, &loc, conn->m_base.m_server);
purge_proxy_participants (gv, &loc, conn->m_base.m_server);
}
}
@ -1023,7 +1033,7 @@ static void ddsi_tcp_release_conn (ddsi_tran_conn_t conn)
static void ddsi_tcp_unblock_listener (ddsi_tran_listener_t listener)
{
struct ddsi_tran_factory_tcp * const fact_tcp = (struct ddsi_tran_factory_tcp *) listener->m_factory;
struct ddsi_domaingv * const gv = fact_tcp->fact.gv;
struct ddsi_domaingv const * const gv = fact_tcp->fact.gv;
ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener;
ddsrt_socket_t sock;
dds_return_t ret;
@ -1032,44 +1042,40 @@ static void ddsi_tcp_unblock_listener (ddsi_tran_listener_t listener)
if (ddsi_tcp_sock_new (fact_tcp, &sock, 0) != DDS_RETCODE_OK)
goto fail;
struct sockaddr_storage addr;
union addr addr;
socklen_t addrlen = sizeof (addr);
if ((ret = ddsrt_getsockname (tl->m_sock, (struct sockaddr *) &addr, &addrlen)) != DDS_RETCODE_OK)
if ((ret = ddsrt_getsockname (tl->m_sock, &addr.a, &addrlen)) != DDS_RETCODE_OK)
{
GVWARNING ("tcp failed to get listener address error %"PRId32"\n", ret);
goto fail_w_socket;
}
switch (addr.ss_family)
switch (addr.a.sa_family)
{
case AF_INET: {
struct sockaddr_in *socketname = (struct sockaddr_in *) &addr;
if (socketname->sin_addr.s_addr == htonl (INADDR_ANY))
socketname->sin_addr.s_addr = htonl (INADDR_LOOPBACK);
case AF_INET:
if (addr.a4.sin_addr.s_addr == htonl (INADDR_ANY))
addr.a4.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
break;
}
#if DDSRT_HAVE_IPV6
case AF_INET6: {
struct sockaddr_in6 *socketname = (struct sockaddr_in6 *) &addr;
if (memcmp (&socketname->sin6_addr, &ddsrt_in6addr_any, sizeof (socketname->sin6_addr)) == 0)
socketname->sin6_addr = ddsrt_in6addr_loopback;
case AF_INET6:
if (memcmp (&addr.a6.sin6_addr, &ddsrt_in6addr_any, sizeof (addr.a6.sin6_addr)) == 0)
addr.a6.sin6_addr = ddsrt_in6addr_loopback;
break;
}
#endif
}
do {
ret = ddsrt_connect (sock, (struct sockaddr *) &addr, ddsrt_sockaddr_get_size ((struct sockaddr *) &addr));
ret = ddsrt_connect (sock, &addr.a, ddsrt_sockaddr_get_size (&addr.a));
} while (ret == DDS_RETCODE_INTERRUPTED);
if (ret != DDS_RETCODE_OK)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory;
char buff[DDSI_LOCSTRLEN];
sockaddr_to_string_with_port (fact, buff, sizeof (buff), (struct sockaddr *) &addr);
sockaddr_to_string_with_port (fact, buff, sizeof (buff), &addr.a);
GVWARNING ("tcp failed to connect to own listener (%s) error %"PRId32"\n", buff, ret);
}
fail_w_socket:
ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, sock, NULL);
ddsi_tcp_sock_free (gv, sock, NULL);
fail:
return;
}
@ -1077,6 +1083,7 @@ fail:
static void ddsi_tcp_release_listener (ddsi_tran_listener_t listener)
{
ddsi_tcp_listener_t tl = (ddsi_tcp_listener_t) listener;
struct ddsi_domaingv const * const gv = tl->m_base.m_base.gv;
#ifdef DDSI_INCLUDE_SSL
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) listener->m_factory;
if (fact->ddsi_tcp_ssl_plugin.bio_vfree)
@ -1084,13 +1091,14 @@ static void ddsi_tcp_release_listener (ddsi_tran_listener_t listener)
(fact->ddsi_tcp_ssl_plugin.bio_vfree) (tl->m_bio);
}
#endif
ddsi_tcp_sock_free (&listener->m_base.gv->logconfig, tl->m_sock, "listener");
ddsi_tcp_sock_free (gv, tl->m_sock, "listener");
ddsrt_free (tl);
}
static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn)
{
struct ddsi_tran_factory_tcp * const fact = (struct ddsi_tran_factory_tcp *) fact_cmn;
struct ddsi_domaingv const * const gv = fact->fact.gv;
ddsrt_avl_free (&ddsi_tcp_treedef, &fact->ddsi_tcp_cache_g, ddsi_tcp_node_free);
ddsrt_mutex_destroy (&fact->ddsi_tcp_cache_lock_g);
#ifdef DDSI_INCLUDE_SSL
@ -1099,7 +1107,7 @@ static void ddsi_tcp_release_factory (struct ddsi_tran_factory *fact_cmn)
(fact->ddsi_tcp_ssl_plugin.fini) ();
}
#endif
DDS_CLOG (DDS_LC_CONFIG, &fact_cmn->gv->logconfig, "tcp de-initialized\n");
GVLOG (DDS_LC_CONFIG, "tcp de-initialized\n");
ddsrt_free (fact);
}

View file

@ -24,13 +24,13 @@
extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn);
extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn);
extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind);
extern inline int ddsi_is_valid_port (ddsi_tran_factory_t factory, uint32_t port);
extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn);
extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base);
extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc);
extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener);
extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener);

View file

@ -26,6 +26,13 @@
#include "dds/ddsi/q_pcap.h"
#include "dds/ddsi/ddsi_domaingv.h"
union addr {
struct sockaddr_storage x;
struct sockaddr a;
struct sockaddr_in a4;
struct sockaddr_in6 a6;
};
typedef struct ddsi_udp_conn {
struct ddsi_tran_conn m_base;
ddsrt_socket_t m_sock;
@ -35,20 +42,27 @@ typedef struct ddsi_udp_conn {
int m_diffserv;
} *ddsi_udp_conn_t;
static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc)
static void addr_to_loc (const struct ddsi_tran_factory *tran, nn_locator_t *dst, const union addr *src)
{
ddsi_ipaddr_to_loc (tran, dst, &src->a, (src->a.sa_family == AF_INET) ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6);
}
static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn_cmn, unsigned char * buf, size_t len, bool allow_spurious, nn_locator_t *srcloc)
{
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
struct ddsi_domaingv * const gv = conn->m_base.m_base.gv;
dds_return_t rc;
ssize_t ret = 0;
ddsrt_msghdr_t msghdr;
struct sockaddr_storage src;
union addr src;
ddsrt_iovec_t msg_iov;
socklen_t srclen = (socklen_t) sizeof (src);
(void) allow_spurious;
msg_iov.iov_base = (void*) buf;
msg_iov.iov_len = (ddsrt_iov_len_t)len; /* Windows uses unsigned, POSIX (except Linux) int */
msg_iov.iov_base = (void *) buf;
msg_iov.iov_len = (ddsrt_iov_len_t) len; /* Windows uses unsigned, POSIX (except Linux) int */
msghdr.msg_name = &src;
msghdr.msg_name = &src.x;
msghdr.msg_namelen = srclen;
msghdr.msg_iov = &msg_iov;
msghdr.msg_iovlen = 1;
@ -61,65 +75,67 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s
#endif
do {
rc = ddsrt_recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0, &ret);
rc = ddsrt_recvmsg (conn->m_sock, &msghdr, 0, &ret);
} while (rc == DDS_RETCODE_INTERRUPTED);
if (ret > 0)
{
if (srcloc)
ddsi_ipaddr_to_loc(conn->m_factory, srcloc, (struct sockaddr *)&src, src.ss_family == AF_INET ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6);
addr_to_loc (conn->m_base.m_factory, srcloc, &src);
if(conn->m_base.gv->pcap_fp)
if (gv->pcap_fp)
{
struct sockaddr_storage dest;
union addr dest;
socklen_t dest_len = sizeof (dest);
if (ddsrt_getsockname (((ddsi_udp_conn_t) conn)->m_sock, (struct sockaddr *) &dest, &dest_len) != DDS_RETCODE_OK)
memset(&dest, 0, sizeof(dest));
write_pcap_received(conn->m_base.gv, ddsrt_time_wallclock(), &src, &dest, buf, (size_t) ret);
if (ddsrt_getsockname (conn->m_sock, &dest.a, &dest_len) != DDS_RETCODE_OK)
memset (&dest, 0, sizeof (dest));
write_pcap_received (gv, ddsrt_time_wallclock (), &src.x, &dest.x, buf, (size_t) ret);
}
/* Check for udp packet truncation */
if ((((size_t) ret) > len)
#if DDSRT_MSGHDR_FLAGS
|| (msghdr.msg_flags & MSG_TRUNC)
const bool trunc_flag = (msghdr.msg_flags & MSG_TRUNC) != 0;
#else
const bool trunc_flag = false;
#endif
)
if ((size_t) ret > len || trunc_flag)
{
char addrbuf[DDSI_LOCSTRLEN];
nn_locator_t tmp;
ddsi_ipaddr_to_loc(conn->m_factory, &tmp, (struct sockaddr *)&src, src.ss_family == AF_INET ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6);
ddsi_locator_to_string(addrbuf, sizeof(addrbuf), &tmp);
DDS_CWARNING(&conn->m_base.gv->logconfig, "%s => %d truncated to %d\n", addrbuf, (int)ret, (int)len);
addr_to_loc (conn->m_base.m_factory, &tmp, &src);
ddsi_locator_to_string (addrbuf, sizeof (addrbuf), &tmp);
GVWARNING ("%s => %d truncated to %d\n", addrbuf, (int) ret, (int) len);
}
}
else if (rc != DDS_RETCODE_BAD_PARAMETER &&
rc != DDS_RETCODE_NO_CONNECTION)
else if (rc != DDS_RETCODE_BAD_PARAMETER && rc != DDS_RETCODE_NO_CONNECTION)
{
DDS_CERROR(&conn->m_base.gv->logconfig, "UDP recvmsg sock %d: ret %d retcode %"PRId32"\n", (int) ((ddsi_udp_conn_t) conn)->m_sock, (int) ret, rc);
GVERROR ("UDP recvmsg sock %d: ret %d retcode %"PRId32"\n", (int) conn->m_sock, (int) ret, rc);
ret = -1;
}
return ret;
}
static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, ddsrt_iovec_t *iov, size_t iovlen)
static void set_msghdr_iov (ddsrt_msghdr_t *mhdr, const ddsrt_iovec_t *iov, size_t iovlen)
{
mhdr->msg_iov = iov;
mhdr->msg_iovlen = (ddsrt_msg_iovlen_t)iovlen;
mhdr->msg_iov = (ddsrt_iovec_t *) iov;
mhdr->msg_iovlen = (ddsrt_msg_iovlen_t) iovlen;
}
static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags)
static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn_cmn, const nn_locator_t *dst, size_t niov, const ddsrt_iovec_t *iov, uint32_t flags)
{
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
struct ddsi_domaingv * const gv = conn->m_base.m_base.gv;
dds_return_t rc;
ssize_t ret = -1;
unsigned retry = 2;
int sendflags = 0;
ddsrt_msghdr_t msg;
struct sockaddr_storage dstaddr;
assert(niov <= INT_MAX);
ddsi_ipaddr_from_loc(&dstaddr, dst);
set_msghdr_iov (&msg, (ddsrt_iovec_t *) iov, niov);
msg.msg_name = &dstaddr;
msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size((struct sockaddr *) &dstaddr);
union addr dstaddr;
assert (niov <= INT_MAX);
ddsi_ipaddr_from_loc (&dstaddr.x, dst);
set_msghdr_iov (&msg, iov, niov);
msg.msg_name = &dstaddr.x;
msg.msg_namelen = (socklen_t) ddsrt_sockaddr_get_size (&dstaddr.a);
#if defined(__sun) && !defined(_XPG4_2)
msg.msg_accrights = NULL;
msg.msg_accrightslen = 0;
@ -130,56 +146,53 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d
#if DDSRT_MSGHDR_FLAGS
msg.msg_flags = (int) flags;
#else
DDSRT_UNUSED_ARG(flags);
DDSRT_UNUSED_ARG (flags);
#endif
#if MSG_NOSIGNAL && !LWIP_SOCKET
sendflags |= MSG_NOSIGNAL;
#endif
do {
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn;
rc = ddsrt_sendmsg (uc->m_sock, &msg, sendflags, &ret);
rc = ddsrt_sendmsg (conn->m_sock, &msg, sendflags, &ret);
#if defined _WIN32 && !defined WINCE
if (rc == DDS_RETCODE_TRY_AGAIN) {
if (rc == DDS_RETCODE_TRY_AGAIN)
{
WSANETWORKEVENTS ev;
WaitForSingleObject(uc->m_sockEvent, INFINITE);
WSAEnumNetworkEvents(uc->m_sock, uc->m_sockEvent, &ev);
WaitForSingleObject (conn->m_sockEvent, INFINITE);
WSAEnumNetworkEvents (conn->m_sock, conn->m_sockEvent, &ev);
}
#endif
} while ((rc == DDS_RETCODE_INTERRUPTED) ||
(rc == DDS_RETCODE_TRY_AGAIN) ||
(rc == DDS_RETCODE_NOT_ALLOWED && retry-- > 0));
if (ret > 0 && conn->m_base.gv->pcap_fp)
} while (rc == DDS_RETCODE_INTERRUPTED || rc == DDS_RETCODE_TRY_AGAIN || (rc == DDS_RETCODE_NOT_ALLOWED && retry-- > 0));
if (ret > 0 && gv->pcap_fp)
{
struct sockaddr_storage sa;
union addr sa;
socklen_t alen = sizeof (sa);
if (ddsrt_getsockname (((ddsi_udp_conn_t) conn)->m_sock, (struct sockaddr *) &sa, &alen) != DDS_RETCODE_OK)
if (ddsrt_getsockname (conn->m_sock, &sa.a, &alen) != DDS_RETCODE_OK)
memset(&sa, 0, sizeof(sa));
write_pcap_sent (conn->m_base.gv, ddsrt_time_wallclock (), &sa, &msg, (size_t) ret);
write_pcap_sent (gv, ddsrt_time_wallclock (), &sa.x, &msg, (size_t) ret);
}
else if (rc != DDS_RETCODE_OK &&
rc != DDS_RETCODE_NOT_ALLOWED &&
rc != DDS_RETCODE_NO_CONNECTION)
else if (rc != DDS_RETCODE_OK && rc != DDS_RETCODE_NOT_ALLOWED && rc != DDS_RETCODE_NO_CONNECTION)
{
DDS_CERROR(&conn->m_base.gv->logconfig, "ddsi_udp_conn_write failed with retcode %"PRId32"\n", rc);
GVERROR ("ddsi_udp_conn_write failed with retcode %"PRId32"\n", rc);
}
return (rc == DDS_RETCODE_OK ? ret : -1);
return (rc == DDS_RETCODE_OK) ? ret : -1;
}
static void ddsi_udp_disable_multiplexing (ddsi_tran_conn_t base)
static void ddsi_udp_disable_multiplexing (ddsi_tran_conn_t conn_cmn)
{
#if defined _WIN32 && !defined WINCE
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) base;
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
uint32_t zero = 0, dummy;
WSAEventSelect(uc->m_sock, 0, 0);
WSAIoctl(uc->m_sock, FIONBIO, &zero,sizeof(zero), NULL,0, &dummy, NULL,NULL);
WSAEventSelect (conn->m_sock, 0, 0);
WSAIoctl (conn->m_sock, FIONBIO, &zero,sizeof(zero), NULL,0, &dummy, NULL,NULL);
#else
(void)base;
(void) conn_cmn;
#endif
}
static ddsrt_socket_t ddsi_udp_conn_handle (ddsi_tran_base_t base)
static ddsrt_socket_t ddsi_udp_conn_handle (ddsi_tran_base_t conn_cmn)
{
return ((ddsi_udp_conn_t) base)->m_sock;
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
return conn->m_sock;
}
static bool ddsi_udp_supports (const struct ddsi_tran_factory *fact, int32_t kind)
@ -187,162 +200,171 @@ static bool ddsi_udp_supports (const struct ddsi_tran_factory *fact, int32_t kin
return kind == fact->m_kind || (kind == NN_LOCATOR_KIND_UDPv4MCGEN && fact->m_kind == NN_LOCATOR_KIND_UDPv4);
}
static int ddsi_udp_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t base, nn_locator_t *loc)
static int ddsi_udp_conn_locator (ddsi_tran_factory_t fact, ddsi_tran_base_t conn_cmn, nn_locator_t *loc)
{
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
int ret = -1;
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) base;
if (uc->m_sock != DDSRT_INVALID_SOCKET)
if (conn->m_sock != DDSRT_INVALID_SOCKET)
{
loc->kind = fact->m_kind;
loc->port = uc->m_base.m_base.m_port;
memcpy(loc->address, uc->m_base.m_base.gv->extloc.address, sizeof (loc->address));
loc->port = conn->m_base.m_base.m_port;
memcpy (loc->address, conn->m_base.m_base.gv->extloc.address, sizeof (loc->address));
ret = 0;
}
return ret;
}
static uint16_t get_socket_port (struct ddsi_domaingv * const gv, ddsrt_socket_t sock)
static uint16_t get_socket_port (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock)
{
dds_return_t ret;
struct sockaddr_storage addr;
union addr addr;
socklen_t addrlen = sizeof (addr);
ret = ddsrt_getsockname (sock, (struct sockaddr *)&addr, &addrlen);
ret = ddsrt_getsockname (sock, &addr.a, &addrlen);
if (ret != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_get_socket_port: getsockname returned %"PRId32"\n", ret);
return 0;
}
return ddsrt_sockaddr_get_port ((struct sockaddr *)&addr);
return ddsrt_sockaddr_get_port (&addr.a);
}
static dds_return_t set_dont_route (struct ddsi_domaingv * const gv, ddsrt_socket_t socket, bool ipv6)
static dds_return_t set_dont_route (struct ddsi_domaingv const * const gv, ddsrt_socket_t socket, bool ipv6)
{
dds_return_t rc;
#if DDSRT_HAVE_IPV6
if (ipv6)
{
const unsigned ipv6Flag = 1;
if ((rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &ipv6Flag, sizeof (ipv6Flag))) != DDS_RETCODE_OK)
const unsigned uone = 1;
if ((rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, IPV6_UNICAST_HOPS, &uone, sizeof (uone))) != DDS_RETCODE_OK)
GVERROR ("ddsi_udp_create_conn: set IPV6_UNICAST_HOPS = 1 failed: %s\n", dds_strretcode (rc));
return rc;
}
#else
(void) ipv6;
#endif
const int one = 1;
if ((rc = ddsrt_setsockopt (socket, SOL_SOCKET, SO_DONTROUTE, (char *) &one, sizeof (one))) != DDS_RETCODE_OK)
if ((rc = ddsrt_setsockopt (socket, SOL_SOCKET, SO_DONTROUTE, &one, sizeof (one))) != DDS_RETCODE_OK)
GVERROR ("ddsi_udp_create_conn: set SO_DONTROUTE = 1 failed: %s\n", dds_strretcode (rc));
return rc;
}
static dds_return_t set_rcvbuf (struct ddsi_domaingv * const gv, ddsrt_socket_t sock, const struct config_maybe_uint32 *min_size)
static dds_return_t set_rcvbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, const struct config_maybe_uint32 *min_size)
{
uint32_t ReceiveBufferSize;
socklen_t optlen = (socklen_t) sizeof (ReceiveBufferSize);
uint32_t size;
socklen_t optlen = (socklen_t) sizeof (size);
uint32_t socket_min_rcvbuf_size;
dds_return_t rc;
if (min_size->isdefault)
socket_min_rcvbuf_size = 1048576;
else
socket_min_rcvbuf_size = min_size->value;
rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, (char *) &ReceiveBufferSize, &optlen);
socket_min_rcvbuf_size = min_size->isdefault ? 1048576 : min_size->value;
rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, &optlen);
if (rc == DDS_RETCODE_BAD_PARAMETER)
{
/* not all stacks support getting/setting RCVBUF */
GVLOG (DDS_LC_CONFIG, "cannot retrieve socket receive buffer size\n");
return DDS_RETCODE_OK;
}
if (rc != DDS_RETCODE_OK)
else if (rc != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get SO_RCVBUF failed: %s\n", dds_strretcode (rc));
goto fail;
return rc;
}
if (ReceiveBufferSize < socket_min_rcvbuf_size)
if (size < socket_min_rcvbuf_size)
{
/* make sure the receive buffersize is at least the minimum required */
ReceiveBufferSize = socket_min_rcvbuf_size;
(void) ddsrt_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, (const char *) &ReceiveBufferSize, sizeof (ReceiveBufferSize));
size = socket_min_rcvbuf_size;
(void) ddsrt_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, sizeof (size));
/* We don't check the return code from setsockopt, because some O/Ss tend
to silently cap the buffer size. The only way to make sure is to read
the option value back and check it is now set correctly. */
if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, (char *) &ReceiveBufferSize, &optlen)) != DDS_RETCODE_OK)
if ((rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_RCVBUF, &size, &optlen)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get SO_RCVBUF failed: %s\n", dds_strretcode (rc));
goto fail;
return rc;
}
if (ReceiveBufferSize >= socket_min_rcvbuf_size)
GVLOG (DDS_LC_CONFIG, "socket receive buffer size set to %"PRIu32" bytes\n", ReceiveBufferSize);
else
GVLOG (min_size->isdefault ? DDS_LC_CONFIG : DDS_LC_ERROR,
if (size >= socket_min_rcvbuf_size)
GVLOG (DDS_LC_CONFIG, "socket receive buffer size set to %"PRIu32" bytes\n", size);
else if (min_size->isdefault)
GVLOG (DDS_LC_CONFIG,
"failed to increase socket receive buffer size to %"PRIu32" bytes, continuing with %"PRIu32" bytes\n",
socket_min_rcvbuf_size, ReceiveBufferSize);
socket_min_rcvbuf_size, size);
else
{
/* If the configuration states it must be >= X, then error out if the
kernel doesn't give us at least X */
GVLOG (DDS_LC_CONFIG | DDS_LC_ERROR,
"failed to increase socket receive buffer size to %"PRIu32" bytes, maximum is %"PRIu32" bytes\n",
socket_min_rcvbuf_size, size);
rc = DDS_RETCODE_NOT_ENOUGH_SPACE;
}
}
fail:
return rc;
}
static dds_return_t set_sndbuf (struct ddsi_domaingv * const gv, ddsrt_socket_t sock, uint32_t min_size)
static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, uint32_t min_size)
{
unsigned SendBufferSize;
socklen_t optlen = (socklen_t) sizeof(SendBufferSize);
unsigned size;
socklen_t optlen = (socklen_t) sizeof(size);
dds_return_t rc;
rc = ddsrt_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,(char *)&SendBufferSize, &optlen);
rc = ddsrt_getsockopt (sock, SOL_SOCKET, SO_SNDBUF, &size, &optlen);
if (rc == DDS_RETCODE_BAD_PARAMETER)
{
/* not all stacks support getting/setting SNDBUF */
GVLOG (DDS_LC_CONFIG, "cannot retrieve socket send buffer size\n");
return DDS_RETCODE_OK;
}
if (rc != DDS_RETCODE_OK)
else if (rc != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: get SO_SNDBUF failed: %s\n", dds_strretcode (rc));
goto fail;
return rc;
}
if (SendBufferSize < min_size)
if (size < min_size)
{
/* make sure the send buffersize is at least the minimum required */
SendBufferSize = min_size;
if ((rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, (const char *) &SendBufferSize, sizeof (SendBufferSize))) != DDS_RETCODE_OK)
size = min_size;
if ((rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &size, sizeof (size))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: set SO_SNDBUF failed: %s\n", dds_strretcode (rc));
goto fail;
return rc;
}
}
fail:
return rc;
return DDS_RETCODE_OK;
}
static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv * const gv, ddsrt_socket_t sock)
static dds_return_t set_mc_options_transmit_ipv6 (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock)
{
/* Function is a never-called no-op if IPv6 is not supported to keep the call-site a bit cleaner */
#if DDSRT_HAVE_IPV6
const unsigned interfaceNo = gv->interfaceNo;
const unsigned ifno = gv->interfaceNo;
const unsigned ttl = (unsigned) gv->config.multicast_ttl;
const unsigned loop = (unsigned) !!gv->config.enableMulticastLoopback;
dds_return_t rc;
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, &interfaceNo, sizeof (interfaceNo))) != DDS_RETCODE_OK)
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_IF, &ifno, sizeof (ifno))) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_IF failed: %s\n", dds_strretcode (rc));
else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, (char *) &ttl, sizeof (ttl))) != DDS_RETCODE_OK)
return rc;
}
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &ttl, sizeof (ttl))) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_HOPS failed: %s\n", dds_strretcode (rc));
else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK)
return rc;
}
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IPV6_MULTICAST_LOOP failed: %s\n", dds_strretcode (rc));
return rc;
}
return DDS_RETCODE_OK;
#else
(void) gv; (void) sock;
return DDS_RETCODE_ERROR;
#endif
}
static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv * const gv, ddsrt_socket_t sock)
static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock)
{
#if (defined(__linux) || defined(__APPLE__)) && !LWIP_SOCKET
if (gv->config.use_multicast_if_mreqn)
@ -362,23 +384,29 @@ static dds_return_t set_mc_options_transmit_ipv4_if (struct ddsi_domaingv * cons
return ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_IF, gv->ownloc.address + 12, 4);
}
static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv * const gv, ddsrt_socket_t sock)
static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock)
{
const unsigned char ttl = (unsigned char) gv->config.multicast_ttl;
const unsigned char loop = (unsigned char) !!gv->config.enableMulticastLoopback;
dds_return_t rc;
if ((rc = set_mc_options_transmit_ipv4_if (gv, sock)) != DDS_RETCODE_OK)
if ((rc = set_mc_options_transmit_ipv4_if (gv, sock)) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_IF failed: %s\n", dds_strretcode (rc));
else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, (char *) &ttl, sizeof (ttl))) != DDS_RETCODE_OK)
return rc;
}
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof (ttl))) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_TTL failed: %s\n", dds_strretcode (rc));
else if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK)
return rc;
}
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof (loop))) != DDS_RETCODE_OK) {
GVERROR ("ddsi_udp_create_conn: set IP_MULTICAST_LOOP failed: %s\n", dds_strretcode (rc));
return rc;
}
return DDS_RETCODE_OK;
}
static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos)
static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos)
{
struct ddsi_domaingv * const gv = fact->gv;
struct ddsi_domaingv const * const gv = fact->gv;
const int one = 1;
dds_return_t rc;
@ -406,31 +434,37 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
}
assert (purpose_str != NULL);
{
int af = AF_UNSPEC;
union addr socketname;
nn_locator_t ownloc_w_port = gv->ownloc;
assert (ownloc_w_port.port == NN_LOCATOR_PORT_INVALID);
if (port) {
/* PORT_INVALID maps to 0 in ipaddr_from_loc */
ownloc_w_port.port = port;
}
ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port);
switch (fact->m_kind)
{
case NN_LOCATOR_KIND_UDPv4:
af = AF_INET;
if (bind_to_any)
socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY);
break;
#if DDSRT_HAVE_IPV6
case NN_LOCATOR_KIND_UDPv6:
af = AF_INET6;
ipv6 = true;
if (bind_to_any)
socketname.a6.sin6_addr = ddsrt_in6addr_any;
break;
#endif
default:
DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind);
}
assert (af != AF_UNSPEC);
if ((rc = ddsrt_socket (&sock, af, SOCK_DGRAM, 0)) != DDS_RETCODE_OK)
if ((rc = ddsrt_socket (&sock, socketname.a.sa_family, SOCK_DGRAM, 0)) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: failed to create socket: %s\n", dds_strretcode (rc));
goto fail;
}
}
if (reuse_addr && (rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, (char *) &one, sizeof (one))) != DDS_RETCODE_OK)
if (reuse_addr && (rc = ddsrt_setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof (one))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: failed to enable address reuse: %s\n", dds_strretcode (rc));
if (rc != DDS_RETCODE_BAD_PARAMETER)
@ -448,55 +482,31 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
if (gv->config.dontRoute && (rc = set_dont_route (gv, sock, ipv6)) != DDS_RETCODE_OK)
goto fail_w_socket;
if ((rc = ddsrt_bind (sock, &socketname.a, ddsrt_sockaddr_get_size (&socketname.a))) != DDS_RETCODE_OK)
{
union {
struct sockaddr_storage x;
struct sockaddr_in a4;
struct sockaddr_in6 a6;
} socketname;
nn_locator_t ownloc_w_port = gv->ownloc;
assert (ownloc_w_port.port == NN_LOCATOR_PORT_INVALID);
if (port) {
/* PORT_INVALID maps to 0 in ipaddr_from_loc */
ownloc_w_port.port = port;
}
ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port);
if (bind_to_any)
{
switch (fact->m_kind)
{
case NN_LOCATOR_KIND_UDPv4:
socketname.a4.sin_addr.s_addr = htonl (INADDR_ANY);
break;
#if DDSRT_HAVE_IPV6
case NN_LOCATOR_KIND_UDPv6:
socketname.a6.sin6_addr = ddsrt_in6addr_any;
break;
#endif
default:
DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind);
}
}
if ((rc = ddsrt_bind (sock, (struct sockaddr *) &socketname, ddsrt_sockaddr_get_size ((struct sockaddr *) &socketname))) != DDS_RETCODE_OK)
{
/* PRECONDITION_NOT_MET (= EADDRINUSE) is expected if reuse_addr isn't set, should be handled at
a higher level and therefore needs to return a specific error message */
if (!reuse_addr && rc == DDS_RETCODE_PRECONDITION_NOT_MET)
goto fail_addrinuse;
char buf[DDSI_LOCATORSTRLEN];
if (bind_to_any)
snprintf (buf, sizeof (buf), "ANY:%"PRIu32, port);
else
ddsi_locator_to_string (buf, sizeof (buf), &ownloc_w_port);
GVERROR ("ddsi_udp_create_conn: failed to bind to %s: %s\n", buf, dds_strretcode (rc));
GVERROR ("ddsi_udp_create_conn: failed to bind to %s: %s\n", buf,
(rc == DDS_RETCODE_PRECONDITION_NOT_MET) ? "address in use" : dds_strretcode (rc));
goto fail_w_socket;
}
}
rc = ipv6 ? set_mc_options_transmit_ipv6 (gv, sock) : set_mc_options_transmit_ipv4 (gv, sock);
if (rc != DDS_RETCODE_OK)
goto fail_w_socket;
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
if ((qos->m_diffserv != 0) && (fact->m_kind == NN_LOCATOR_KIND_UDPv4))
if (qos->m_diffserv != 0 && fact->m_kind == NN_LOCATOR_KIND_UDPv4)
{
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_TOS, (char *) &qos->m_diffserv, sizeof (qos->m_diffserv))) != DDS_RETCODE_OK)
if ((rc = ddsrt_setsockopt (sock, IPPROTO_IP, IP_TOS, &qos->m_diffserv, sizeof (qos->m_diffserv))) != DDS_RETCODE_OK)
{
GVERROR ("ddsi_udp_create_conn: set diffserv retcode %"PRId32"\n", rc);
goto fail_w_socket;
@ -504,49 +514,52 @@ static ddsi_tran_conn_t ddsi_udp_create_conn (ddsi_tran_factory_t fact, uint32_t
}
#endif
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) ddsrt_malloc (sizeof (*uc));
memset (uc, 0, sizeof (*uc));
ddsi_udp_conn_t conn = ddsrt_malloc (sizeof (*conn));
memset (conn, 0, sizeof (*conn));
uc->m_sock = sock;
uc->m_diffserv = qos->m_diffserv;
conn->m_sock = sock;
conn->m_diffserv = qos->m_diffserv;
#if defined _WIN32 && !defined WINCE
uc->m_sockEvent = WSACreateEvent();
WSAEventSelect(uc->m_sock, uc->m_sockEvent, FD_WRITE);
conn->m_sockEvent = WSACreateEvent ();
WSAEventSelect (conn->m_sock, conn->m_sockEvent, FD_WRITE);
#endif
ddsi_factory_conn_init (fact, &uc->m_base);
uc->m_base.m_base.m_port = get_socket_port (gv, sock);
uc->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
uc->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
uc->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle;
ddsi_factory_conn_init (fact, &conn->m_base);
conn->m_base.m_base.m_port = get_socket_port (gv, sock);
conn->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
conn->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
conn->m_base.m_base.m_handle_fn = ddsi_udp_conn_handle;
uc->m_base.m_read_fn = ddsi_udp_conn_read;
uc->m_base.m_write_fn = ddsi_udp_conn_write;
uc->m_base.m_disable_multiplexing_fn = ddsi_udp_disable_multiplexing;
uc->m_base.m_locator_fn = ddsi_udp_conn_locator;
conn->m_base.m_read_fn = ddsi_udp_conn_read;
conn->m_base.m_write_fn = ddsi_udp_conn_write;
conn->m_base.m_disable_multiplexing_fn = ddsi_udp_disable_multiplexing;
conn->m_base.m_locator_fn = ddsi_udp_conn_locator;
GVTRACE ("ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", purpose_str, uc->m_sock, uc->m_base.m_base.m_port);
return &uc->m_base;
GVTRACE ("ddsi_udp_create_conn %s socket %"PRIdSOCK" port %"PRIu32"\n", purpose_str, conn->m_sock, conn->m_base.m_base.m_port);
*conn_out = &conn->m_base;
return DDS_RETCODE_OK;
fail_w_socket:
ddsrt_close (sock);
fail:
if (fact->gv->config.participantIndex != PARTICIPANT_INDEX_AUTO)
GVERROR ("ddsi_udp_create_conn: failed for %s port %"PRIu32"\n", purpose_str, port);
return NULL;
return DDS_RETCODE_ERROR;
fail_addrinuse:
ddsrt_close (sock);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
dds_return_t rc;
struct sockaddr_storage mcip;
ddsi_ipaddr_from_loc(&mcip, mcloc);
union addr mcip;
ddsi_ipaddr_from_loc (&mcip.x, mcloc);
#if DDSRT_HAVE_IPV6
if (mcloc->kind == NN_LOCATOR_KIND_UDPv6)
{
struct ipv6_mreq ipv6mreq;
memset (&ipv6mreq, 0, sizeof (ipv6mreq));
memcpy (&ipv6mreq.ipv6mr_multiaddr, &((struct sockaddr_in6 *) &mcip)->sin6_addr, sizeof (ipv6mreq.ipv6mr_multiaddr));
memcpy (&ipv6mreq.ipv6mr_multiaddr, &mcip.a6, sizeof (ipv6mreq.ipv6mr_multiaddr));
ipv6mreq.ipv6mr_interface = interf ? interf->if_index : 0;
rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, join ? IPV6_JOIN_GROUP : IPV6_LEAVE_GROUP, &ipv6mreq, sizeof (ipv6mreq));
}
@ -554,12 +567,12 @@ static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca
#endif
{
struct ip_mreq mreq;
mreq.imr_multiaddr = ((struct sockaddr_in *) &mcip)->sin_addr;
mreq.imr_multiaddr = mcip.a4.sin_addr;
if (interf)
memcpy (&mreq.imr_interface, interf->loc.address + 12, sizeof (mreq.imr_interface));
else
mreq.imr_interface.s_addr = htonl (INADDR_ANY);
rc = ddsrt_setsockopt (socket, IPPROTO_IP, join ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, (char *) &mreq, sizeof (mreq));
rc = ddsrt_setsockopt (socket, IPPROTO_IP, join ? IP_ADD_MEMBERSHIP : IP_DROP_MEMBERSHIP, &mreq, sizeof (mreq));
}
return (rc == DDS_RETCODE_OK) ? 0 : -1;
}
@ -568,17 +581,17 @@ static int joinleave_asm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca
static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
dds_return_t rc;
struct sockaddr_storage mcip, srcip;
ddsi_ipaddr_from_loc(&mcip, mcloc);
ddsi_ipaddr_from_loc(&srcip, srcloc);
union addr mcip, srcip;
ddsi_ipaddr_from_loc (&mcip.x, mcloc);
ddsi_ipaddr_from_loc (&srcip.x, srcloc);
#if DDSRT_HAVE_IPV6
if (mcloc->kind == NN_LOCATOR_KIND_UDPv6)
{
struct group_source_req gsr;
memset (&gsr, 0, sizeof (gsr));
gsr.gsr_interface = interf ? interf->if_index : 0;
memcpy (&gsr.gsr_group, &mcip, sizeof (gsr.gsr_group));
memcpy (&gsr.gsr_source, &srcip, sizeof (gsr.gsr_source));
memcpy (&gsr.gsr_group, &mcip.a6, sizeof (gsr.gsr_group));
memcpy (&gsr.gsr_source, &srcip.a6, sizeof (gsr.gsr_source));
rc = ddsrt_setsockopt (socket, IPPROTO_IPV6, join ? MCAST_JOIN_SOURCE_GROUP : MCAST_LEAVE_SOURCE_GROUP, &gsr, sizeof (gsr));
}
else
@ -586,8 +599,8 @@ static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca
{
struct ip_mreq_source mreq;
memset (&mreq, 0, sizeof (mreq));
mreq.imr_sourceaddr = ((struct sockaddr_in *) &srcip)->sin_addr;
mreq.imr_multiaddr = ((struct sockaddr_in *) &mcip)->sin_addr;
mreq.imr_sourceaddr = srcip.a4.sin_addr;
mreq.imr_multiaddr = mcip.a4.sin_addr;
if (interf)
memcpy (&mreq.imr_interface, interf->loc.address + 12, sizeof (mreq.imr_interface));
else
@ -598,43 +611,42 @@ static int joinleave_ssm_mcgroup (ddsrt_socket_t socket, int join, const nn_loca
}
#endif
static int ddsi_udp_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
static int ddsi_udp_join_mc (ddsi_tran_conn_t conn_cmn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn;
(void)srcloc;
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
(void) srcloc;
#ifdef DDSI_INCLUDE_SSM
if (srcloc)
return joinleave_ssm_mcgroup(uc->m_sock, 1, srcloc, mcloc, interf);
return joinleave_ssm_mcgroup (conn->m_sock, 1, srcloc, mcloc, interf);
else
#endif
return joinleave_asm_mcgroup(uc->m_sock, 1, mcloc, interf);
return joinleave_asm_mcgroup (conn->m_sock, 1, mcloc, interf);
}
static int ddsi_udp_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
static int ddsi_udp_leave_mc (ddsi_tran_conn_t conn_cmn, const nn_locator_t *srcloc, const nn_locator_t *mcloc, const struct nn_interface *interf)
{
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn;
(void)srcloc;
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
(void) srcloc;
#ifdef DDSI_INCLUDE_SSM
if (srcloc)
return joinleave_ssm_mcgroup(uc->m_sock, 0, srcloc, mcloc, interf);
return joinleave_ssm_mcgroup (conn->m_sock, 0, srcloc, mcloc, interf);
else
#endif
return joinleave_asm_mcgroup(uc->m_sock, 0, mcloc, interf);
return joinleave_asm_mcgroup (conn->m_sock, 0, mcloc, interf);
}
static void ddsi_udp_release_conn (ddsi_tran_conn_t conn)
static void ddsi_udp_release_conn (ddsi_tran_conn_t conn_cmn)
{
ddsi_udp_conn_t uc = (ddsi_udp_conn_t) conn;
DDS_CTRACE (&conn->m_base.gv->logconfig,
"ddsi_udp_release_conn %s socket %"PRIdSOCK" port %"PRIu32"\n",
conn->m_base.m_multicast ? "multicast" : "unicast",
uc->m_sock,
uc->m_base.m_base.m_port);
ddsrt_close (uc->m_sock);
ddsi_udp_conn_t conn = (ddsi_udp_conn_t) conn_cmn;
struct ddsi_domaingv const * const gv = conn->m_base.m_base.gv;
GVTRACE ("ddsi_udp_release_conn %s socket %"PRIdSOCK" port %"PRIu32"\n",
conn_cmn->m_base.m_multicast ? "multicast" : "unicast",
conn->m_sock, conn->m_base.m_base.m_port);
ddsrt_close (conn->m_sock);
#if defined _WIN32 && !defined WINCE
WSACloseEvent(uc->m_sockEvent);
WSACloseEvent (conn->m_sockEvent);
#endif
ddsrt_free (conn);
ddsrt_free (conn_cmn);
}
static int ddsi_udp_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc)
@ -720,7 +732,8 @@ static char *ddsi_udp_locator_to_string (char *dst, size_t sizeof_dst, const nn_
static void ddsi_udp_fini (ddsi_tran_factory_t fact)
{
DDS_CLOG (DDS_LC_CONFIG, &fact->gv->logconfig, "udp finalized\n");
struct ddsi_domaingv const * const gv = fact->gv;
GVLOG (DDS_LC_CONFIG, "udp finalized\n");
ddsrt_free (fact);
}
@ -730,7 +743,7 @@ static int ddsi_udp_is_valid_port (ddsi_tran_factory_t fact, uint32_t port)
return (port <= 65535);
}
int ddsi_udp_init (struct ddsi_domaingv *gv)
int ddsi_udp_init (struct ddsi_domaingv*gv)
{
struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact));
memset (fact, 0, sizeof (*fact));

View file

@ -369,8 +369,7 @@ struct debug_monitor *new_debug_monitor (struct ddsi_domaingv *gv, int32_t port)
goto err_invalid_port;
}
dm->servsock = ddsi_factory_create_listener (dm->tran_factory, (uint32_t) port, NULL);
if (dm->servsock == NULL)
if (ddsi_factory_create_listener (&dm->servsock, dm->tran_factory, (uint32_t) port, NULL) != DDS_RETCODE_OK)
{
GVWARNING ("debmon: can't create socket\n");
goto err_servsock;

View file

@ -587,6 +587,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
struct participant *pp;
ddsi_guid_t subguid, group_guid;
struct whc_writer_info *wrinfo;
ddsi_tran_conn_t ppconn;
/* no reserved bits may be set */
assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0);
@ -603,6 +604,18 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
if (entidx_lookup_participant_guid (gv->entity_index, ppguid) != NULL)
return DDS_RETCODE_PRECONDITION_NOT_MET;
if (gv->config.many_sockets_mode != MSM_MANY_UNICAST)
ppconn = NULL;
else
{
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
if (ddsi_factory_create_conn (&ppconn, gv->m_factory, 0, &qos) != DDS_RETCODE_OK)
{
GVERROR ("new_participant("PGUIDFMT", %x) failed: could not create network endpoint\n", PGUID (*ppguid), flags);
return DDS_RETCODE_OUT_OF_RESOURCES;
}
}
if (gv->config.max_participants == 0)
{
ddsrt_mutex_lock (&gv->participant_set_lock);
@ -621,6 +634,8 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
{
ddsrt_mutex_unlock (&gv->participant_set_lock);
GVERROR ("new_participant("PGUIDFMT", %x) failed: max participants reached\n", PGUID (*ppguid), flags);
if (ppconn)
ddsi_conn_free (ppconn);
return DDS_RETCODE_OUT_OF_RESOURCES;
}
}
@ -649,16 +664,9 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct ddsi_domain
GVLOGDISC ("}\n");
}
pp->m_conn = ppconn;
if (gv->config.many_sockets_mode == MSM_MANY_UNICAST)
{
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
pp->m_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
ddsi_conn_locator (pp->m_conn, &pp->m_locator);
}
else
{
pp->m_conn = NULL;
}
ddsrt_fibheap_init (&lease_fhdef_pp, &pp->leaseheap_man);
ddsrt_atomic_stvoidp (&pp->minl_man, NULL);

View file

@ -72,13 +72,16 @@ static void add_peer_addresses (const struct ddsi_domaingv *gv, struct addrset *
}
enum make_uc_sockets_ret {
MUSRET_SUCCESS,
MUSRET_INVALID_PORTS,
MUSRET_NOSOCKET
MUSRET_SUCCESS, /* unicast socket(s) created */
MUSRET_INVALID_PORTS, /* specified port numbers are invalid */
MUSRET_PORTS_IN_USE, /* ports were in use, keep trying */
MUSRET_ERROR /* generic error, no use continuing */
};
static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint32_t * pdisc, uint32_t * pdata, int ppid)
{
dds_return_t rc;
if (gv->config.many_sockets_mode == MSM_NO_UNICAST)
{
assert (ppid == PARTICIPANT_INDEX_NONE);
@ -97,33 +100,29 @@ static enum make_uc_sockets_ret make_uc_sockets (struct ddsi_domaingv *gv, uint3
return MUSRET_INVALID_PORTS;
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_RECV_UC, .m_diffserv = 0 };
gv->disc_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdisc, &qos);
if (gv->disc_conn_uc)
{
/* Check not configured to use same unicast port for data and discovery */
rc = ddsi_factory_create_conn (&gv->disc_conn_uc, gv->m_factory, *pdisc, &qos);
if (rc != DDS_RETCODE_OK)
goto fail_disc;
if (*pdata != 0 && (*pdata != *pdisc))
{
gv->data_conn_uc = ddsi_factory_create_conn (gv->m_factory, *pdata, &qos);
}
else
{
if (*pdata == 0 || *pdata == *pdisc)
gv->data_conn_uc = gv->disc_conn_uc;
}
if (gv->data_conn_uc == NULL)
{
ddsi_conn_free (gv->disc_conn_uc);
gv->disc_conn_uc = NULL;
}
else
{
/* Set unicast locators */
rc = ddsi_factory_create_conn (&gv->data_conn_uc, gv->m_factory, *pdata, &qos);
if (rc != DDS_RETCODE_OK)
goto fail_data;
}
ddsi_conn_locator (gv->disc_conn_uc, &gv->loc_meta_uc);
ddsi_conn_locator (gv->data_conn_uc, &gv->loc_default_uc);
}
}
return MUSRET_SUCCESS;
return gv->data_conn_uc ? MUSRET_SUCCESS : MUSRET_NOSOCKET;
fail_data:
ddsi_conn_free (gv->disc_conn_uc);
gv->disc_conn_uc = NULL;
fail_disc:
if (rc == DDS_RETCODE_PRECONDITION_NOT_MET)
return MUSRET_PORTS_IN_USE;
return MUSRET_ERROR;
}
static void make_builtin_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template)
@ -675,7 +674,7 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
if ((disc = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
if (ddsi_factory_create_conn (&disc, gv->m_factory, port, &qos) != DDS_RETCODE_OK)
goto err_disc;
if (gv->config.many_sockets_mode == MSM_NO_UNICAST)
{
@ -691,11 +690,9 @@ int create_multicast_sockets (struct ddsi_domaingv *gv)
gv->config.extDomainId.value, port);
goto err_disc;
}
if ((data = ddsi_factory_create_conn (gv->m_factory, port, &qos)) == NULL)
{
if (ddsi_factory_create_conn (&data, gv->m_factory, port, &qos) != DDS_RETCODE_OK)
goto err_data;
}
}
gv->disc_conn_mc = disc;
gv->data_conn_mc = data;
@ -1192,18 +1189,21 @@ int rtps_init (struct ddsi_domaingv *gv)
GVERROR ("Failed to create unicast sockets for domain %"PRIu32" participant index %d: resulting port numbers (%"PRIu32", %"PRIu32") are out of range\n",
gv->config.extDomainId.value, gv->config.participantIndex, port_disc_uc, port_data_uc);
goto err_unicast_sockets;
case MUSRET_NOSOCKET:
case MUSRET_PORTS_IN_USE:
GVERROR ("rtps_init: failed to create unicast sockets for domain %"PRId32" participant index %d (ports %"PRIu32", %"PRIu32")\n", gv->config.extDomainId.value, gv->config.participantIndex, port_disc_uc, port_data_uc);
goto err_unicast_sockets;
case MUSRET_ERROR:
/* something bad happened; assume make_uc_sockets logged the error */
goto err_unicast_sockets;
}
}
else if (gv->config.participantIndex == PARTICIPANT_INDEX_AUTO)
{
/* try to find a free one, and update gv->config.participantIndex */
enum make_uc_sockets_ret musret = MUSRET_NOSOCKET;
enum make_uc_sockets_ret musret = MUSRET_PORTS_IN_USE;
int ppid;
GVLOG (DDS_LC_CONFIG, "rtps_init: trying to find a free participant index\n");
for (ppid = 0; ppid <= gv->config.maxAutoParticipantIndex && musret == MUSRET_NOSOCKET; ppid++)
for (ppid = 0; ppid <= gv->config.maxAutoParticipantIndex && musret == MUSRET_PORTS_IN_USE; ppid++)
{
musret = make_uc_sockets (gv, &port_disc_uc, &port_data_uc, ppid);
switch (musret)
@ -1214,8 +1214,11 @@ int rtps_init (struct ddsi_domaingv *gv)
GVERROR ("Failed to create unicast sockets for domain %"PRIu32" participant index %d: resulting port numbers (%"PRIu32", %"PRIu32") are out of range\n",
gv->config.extDomainId.value, ppid, port_disc_uc, port_data_uc);
goto err_unicast_sockets;
case MUSRET_NOSOCKET: /* Try next one */
case MUSRET_PORTS_IN_USE: /* Try next one */
break;
case MUSRET_ERROR:
/* something bad happened; assume make_uc_sockets logged the error */
goto err_unicast_sockets;
}
}
if (ppid > gv->config.maxAutoParticipantIndex)
@ -1235,7 +1238,7 @@ int rtps_init (struct ddsi_domaingv *gv)
if (gv->config.pcap_file && *gv->config.pcap_file)
{
gv->pcap_fp = new_pcap_file (&gv->logconfig, gv->config.pcap_file);
gv->pcap_fp = new_pcap_file (gv, gv->config.pcap_file);
if (gv->pcap_fp)
{
ddsrt_mutex_init (&gv->pcap_lock);
@ -1286,8 +1289,9 @@ int rtps_init (struct ddsi_domaingv *gv)
}
else
{
gv->listener = ddsi_factory_create_listener (gv->m_factory, (uint32_t) gv->config.tcp_port, NULL);
if (gv->listener == NULL || ddsi_listener_listen (gv->listener) != 0)
dds_return_t rc;
rc = ddsi_factory_create_listener (&gv->listener, gv->m_factory, (uint32_t) gv->config.tcp_port, NULL);
if (rc != DDS_RETCODE_OK || ddsi_listener_listen (gv->listener) != 0)
{
GVERROR ("Failed to create %s listener\n", gv->m_factory->m_typename);
if (gv->listener)
@ -1308,7 +1312,10 @@ int rtps_init (struct ddsi_domaingv *gv)
/* Create shared transmit connection */
{
const ddsi_tran_qos_t qos = { .m_purpose = DDSI_TRAN_QOS_XMIT, .m_diffserv = 0 };
gv->xmit_conn = ddsi_factory_create_conn (gv->m_factory, 0, &qos);
dds_return_t rc;
rc = ddsi_factory_create_conn (&gv->xmit_conn, gv->m_factory, 0, &qos);
if (rc != DDS_RETCODE_OK)
goto err_mc_conn;
}
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS

View file

@ -76,7 +76,7 @@ static const ipv4_hdr_t ipv4_hdr_template = {
#define IPV4_HDR_SIZE 20
#define UDP_HDR_SIZE 8
FILE *new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name)
FILE *new_pcap_file (struct ddsi_domaingv *gv, const char *name)
{
DDSRT_WARNING_MSVC_OFF(4996);
FILE *fp;
@ -84,7 +84,7 @@ FILE *new_pcap_file (const struct ddsrt_log_cfg *logcfg, const char *name)
if ((fp = fopen (name, "wb")) == NULL)
{
DDS_CWARNING (logcfg, "packet capture disabled: file %s could not be opened for writing\n", name);
GVWARNING ("packet capture disabled: file %s could not be opened for writing\n", name);
return NULL;
}