From 7fbbc13e654d90d07ef746f0705825df3190b596 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 22 Jun 2020 16:37:49 +0200 Subject: [PATCH] Include receive buffer sizes in discovery Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/ddsi_plist.h | 2 + src/core/ddsi/include/dds/ddsi/ddsi_tran.h | 5 ++ src/core/ddsi/include/dds/ddsi/q_entity.h | 2 + src/core/ddsi/include/dds/ddsi/q_protocol.h | 6 +- src/core/ddsi/src/ddsi_plist.c | 3 +- src/core/ddsi/src/ddsi_raweth.c | 7 ++ src/core/ddsi/src/ddsi_tcp.c | 7 ++ src/core/ddsi/src/ddsi_tran.c | 1 + src/core/ddsi/src/ddsi_udp.c | 92 ++++++++++++++------- src/core/ddsi/src/q_ddsi_discovery.c | 10 +++ src/core/ddsi/src/q_entity.c | 10 +++ 11 files changed, 110 insertions(+), 35 deletions(-) diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_plist.h b/src/core/ddsi/include/dds/ddsi/ddsi_plist.h index 635b480..04fbd60 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_plist.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_plist.h @@ -61,6 +61,7 @@ extern "C" { #define PP_PARTICIPANT_SECURITY_INFO ((uint64_t)1 << 35) #define PP_IDENTITY_STATUS_TOKEN ((uint64_t)1 << 36) #define PP_DATA_TAGS ((uint64_t)1 << 37) +#define PP_CYCLONE_RECEIVE_BUFFER_SIZE ((uint64_t)1 << 38) /* Set for unrecognized parameters that are in the reserved space or in our own vendor-specific space that have the PID_UNRECOGNIZED_INCOMPATIBLE_FLAG set (see DDSI 2.1 9.6.2.2.1) */ @@ -227,6 +228,7 @@ typedef struct ddsi_plist { #endif uint32_t domain_id; char *domain_tag; + uint32_t cyclone_receive_buffer_size; } ddsi_plist_t; diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h index 00c6e04..bd3ab22 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_tran.h @@ -71,6 +71,7 @@ typedef int (*ddsi_tran_leave_mc_fn_t) (ddsi_tran_conn_t, const nn_locator_t *sr typedef int (*ddsi_is_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const nn_locator_t *loc); typedef int (*ddsi_is_ssm_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const nn_locator_t *loc); typedef int (*ddsi_is_valid_port_fn_t) (const struct ddsi_tran_factory *tran, uint32_t port); +typedef uint32_t (*ddsi_receive_buffer_size_fn_t) (const struct ddsi_tran_factory *fact); enum ddsi_nearby_address_result { DNAR_DISTANT, @@ -174,6 +175,7 @@ struct ddsi_tran_factory ddsi_locator_to_string_fn_t m_locator_to_string_fn; ddsi_enumerate_interfaces_fn_t m_enumerate_interfaces_fn; ddsi_is_valid_port_fn_t m_is_valid_port_fn; + ddsi_receive_buffer_size_fn_t m_receive_buffer_size_fn; /* Data */ @@ -214,6 +216,9 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3 inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t port) { return factory->m_is_valid_port_fn (factory, port); } +inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory) { + return factory->m_receive_buffer_size_fn (factory); +} inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) { *conn = NULL; if (!ddsi_is_valid_port (factory, port)) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 7b6ac19..ce0e19d 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -380,6 +380,7 @@ struct proxy_participant struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */ ddsrt_avl_tree_t groups; /* table of all groups (publisher, subscriber), see struct proxy_group */ seqno_t seq; /* sequence number of most recent SPDP message */ + uint32_t receive_buffer_size; /* assumed size of receive buffer, used to limit bursts involving this proxypp */ unsigned implicitly_created : 1; /* participants are implicitly created for Cloud/Fog discovered endpoints */ unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */ unsigned minimal_bes_mode: 1; @@ -470,6 +471,7 @@ struct proxy_reader { unsigned favours_ssm: 1; /* iff 1, this proxy reader favours SSM when available */ #endif ddsrt_avl_tree_t writers; /* matching LOCAL writers */ + uint32_t receive_buffer_size; /* assumed receive buffer size inherited from proxypp */ filter_fn_t filter; }; diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index c26ed10..fbb87cb 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -446,8 +446,10 @@ typedef union Submessage { #define PID_ADLINK_ENDPOINT_GID (PID_VENDORSPECIFIC_FLAG | 0x14u) #define PID_ADLINK_GROUP_GID (PID_VENDORSPECIFIC_FLAG | 0x15u) #define PID_ADLINK_EOTINFO (PID_VENDORSPECIFIC_FLAG | 0x16u) -#define PID_ADLINK_PART_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x17u); -#define PID_ADLINK_LAN_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x18u); +#define PID_ADLINK_PART_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x17u) +#define PID_ADLINK_LAN_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x18u) + +#define PID_CYCLONE_RECEIVE_BUFFER_SIZE (PID_VENDORSPECIFIC_FLAG | 0x19u) #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/ddsi_plist.c b/src/core/ddsi/src/ddsi_plist.c index b5b2a4a..e8e1cfa 100644 --- a/src/core/ddsi/src/ddsi_plist.c +++ b/src/core/ddsi/src/ddsi_plist.c @@ -1646,6 +1646,7 @@ static const struct piddesc piddesc_eclipse[] = { { .desc = { XE2, XSTOP } }, 0 }, PP (ADLINK_PARTICIPANT_VERSION_INFO, adlink_participant_version_info, Xux5, XS), PP (ADLINK_TYPE_DESCRIPTION, type_description, XS), + PP (CYCLONE_RECEIVE_BUFFER_SIZE, cyclone_receive_buffer_size, Xu), { PID_SENTINEL, 0, 0, NULL, 0, 0, { .desc = { XSTOP } }, 0 } }; @@ -1720,7 +1721,7 @@ struct piddesc_index { #endif static const struct piddesc *piddesc_omg_index[DEFAULT_OMG_PIDS_ARRAY_SIZE + SECURITY_OMG_PIDS_ARRAY_SIZE]; -static const struct piddesc *piddesc_eclipse_index[19]; +static const struct piddesc *piddesc_eclipse_index[26]; static const struct piddesc *piddesc_adlink_index[19]; #define INDEX_ANY(vendorid_, tab_) [vendorid_] = { \ diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index 27e6b27..ebf6f6f 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -362,6 +362,12 @@ static int ddsi_raweth_is_valid_port (const struct ddsi_tran_factory *fact, uint return (port >= 1 && port <= 65535); } +static uint32_t ddsi_raweth_receive_buffer_size (const struct ddsi_tran_factory *fact) +{ + (void) fact; + return 0; +} + int ddsi_raweth_init (struct ddsi_domaingv *gv) { struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact)); @@ -384,6 +390,7 @@ int ddsi_raweth_init (struct ddsi_domaingv *gv) fact->m_locator_to_string_fn = ddsi_raweth_to_string; fact->m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces; fact->m_is_valid_port_fn = ddsi_raweth_is_valid_port; + fact->m_receive_buffer_size_fn = ddsi_raweth_receive_buffer_size; ddsi_factory_add (gv, fact); GVLOG (DDS_LC_CONFIG, "raweth initialized\n"); return 0; diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index e890cc4..42627ca 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -1144,6 +1144,12 @@ static int ddsi_tcp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_ return (port <= 65535); } +static uint32_t ddsi_tcp_receive_buffer_size (const struct ddsi_tran_factory *fact) +{ + (void) fact; + return 0; +} + int ddsi_tcp_init (struct ddsi_domaingv *gv) { struct ddsi_tran_factory_tcp *fact = ddsrt_malloc (sizeof (*fact)); @@ -1169,6 +1175,7 @@ int ddsi_tcp_init (struct ddsi_domaingv *gv) fact->fact.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr; fact->fact.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address; fact->fact.m_is_valid_port_fn = ddsi_tcp_is_valid_port; + fact->fact.m_receive_buffer_size_fn = ddsi_tcp_receive_buffer_size; ddsi_factory_add (gv, &fact->fact); #if DDSRT_HAVE_IPV6 diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index df3aefc..65ed57d 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -27,6 +27,7 @@ extern inline uint32_t ddsi_conn_port (const struct ddsi_tran_conn *conn); extern inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos); extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind); extern inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t port); +extern inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory); extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn); extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base); diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index af410d6..eae15e3 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -45,6 +45,14 @@ typedef struct ddsi_udp_conn { int m_diffserv; } *ddsi_udp_conn_t; +typedef struct ddsi_udp_tran_factory { + struct ddsi_tran_factory fact; + + // actual minimum receive buffer size in use + // atomically loaded/stored so we don't have to lie about constness + ddsrt_atomic_uint32_t receive_buf_size; +} *ddsi_udp_tran_factory_t; + static void addr_to_loc (const struct ddsi_tran_factory *tran, nn_locator_t *dst, const union addr *src) { ddsi_ipaddr_to_loc (tran, dst, &src->a, (src->a.sa_family == AF_INET) ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6); @@ -305,7 +313,7 @@ static dds_return_t set_rcvbuf (struct ddsi_domaingv const * const gv, ddsrt_soc } } - return rc; + return (rc < 0) ? rc : (size > (uint32_t) INT32_MAX) ? INT32_MAX : (int32_t) size; } static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, uint32_t min_size) @@ -408,9 +416,10 @@ static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv const * c return DDS_RETCODE_OK; } -static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos) +static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact_cmn, uint32_t port, const ddsi_tran_qos_t *qos) { - struct ddsi_domaingv const * const gv = fact->gv; + struct ddsi_udp_tran_factory *fact = (struct ddsi_udp_tran_factory *) fact_cmn; + struct ddsi_domaingv const * const gv = fact->fact.gv; const int one = 1; dds_return_t rc; @@ -446,7 +455,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_ ownloc_w_port.port = port; } ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port); - switch (fact->m_kind) + switch (fact->fact.m_kind) { case NN_LOCATOR_KIND_UDPv4: if (bind_to_any) @@ -465,7 +474,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_ break; #endif default: - DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind); + DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->fact.m_kind); } if ((rc = ddsrt_socket (&sock, socketname.a.sa_family, SOCK_DGRAM, 0)) != DDS_RETCODE_OK) { @@ -484,8 +493,18 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_ } } - if (set_rcvbuf (gv, sock, &gv->config.socket_min_rcvbuf_size) != DDS_RETCODE_OK) + if ((rc = set_rcvbuf (gv, sock, &gv->config.socket_min_rcvbuf_size)) < 0) goto fail_w_socket; + if (rc > 0) { + // set fact->receive_buf_size to the smallest observed value + uint32_t old; + do { + old = ddsrt_atomic_ld32 (&fact->receive_buf_size); + if ((uint32_t) rc >= old) + break; + } while (!ddsrt_atomic_cas32 (&fact->receive_buf_size, old, (uint32_t) rc)); + } + if (set_sndbuf (gv, sock, gv->config.socket_min_sndbuf_size) != DDS_RETCODE_OK) goto fail_w_socket; if (gv->config.dontRoute && set_dont_route (gv, sock, ipv6) != DDS_RETCODE_OK) @@ -533,7 +552,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_ WSAEventSelect (conn->m_sock, conn->m_sockEvent, FD_WRITE); #endif - ddsi_factory_conn_init (fact, &conn->m_base); + ddsi_factory_conn_init (&fact->fact, &conn->m_base); conn->m_base.m_base.m_port = get_socket_port (gv, sock); conn->m_base.m_base.m_trantype = DDSI_TRAN_CONN; conn->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC); @@ -791,9 +810,10 @@ static char *ddsi_udp_locator_to_string (char *dst, size_t sizeof_dst, const nn_ } } -static void ddsi_udp_fini (ddsi_tran_factory_t fact) +static void ddsi_udp_fini (ddsi_tran_factory_t fact_cmn) { - struct ddsi_domaingv const * const gv = fact->gv; + struct ddsi_udp_tran_factory *fact = (struct ddsi_udp_tran_factory *) fact_cmn; + struct ddsi_domaingv const * const gv = fact->fact.gv; GVLOG (DDS_LC_CONFIG, "udp finalized\n"); ddsrt_free (fact); } @@ -804,40 +824,48 @@ static int ddsi_udp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_ return (port <= 65535); } +static uint32_t ddsi_udp_receive_buffer_size (const struct ddsi_tran_factory *fact_cmn) +{ + const struct ddsi_udp_tran_factory *fact = (const struct ddsi_udp_tran_factory *) fact_cmn; + return ddsrt_atomic_ld32 (&fact->receive_buf_size); +} + int ddsi_udp_init (struct ddsi_domaingv*gv) { - struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact)); + struct ddsi_udp_tran_factory *fact = ddsrt_malloc (sizeof (*fact)); memset (fact, 0, sizeof (*fact)); - fact->gv = gv; - fact->m_free_fn = ddsi_udp_fini; - fact->m_kind = NN_LOCATOR_KIND_UDPv4; - fact->m_typename = "udp"; - fact->m_default_spdp_address = "udp/239.255.0.1"; - fact->m_connless = true; - fact->m_supports_fn = ddsi_udp_supports; - fact->m_create_conn_fn = ddsi_udp_create_conn; - fact->m_release_conn_fn = ddsi_udp_release_conn; - fact->m_join_mc_fn = ddsi_udp_join_mc; - fact->m_leave_mc_fn = ddsi_udp_leave_mc; - fact->m_is_mcaddr_fn = ddsi_udp_is_mcaddr; + fact->fact.gv = gv; + fact->fact.m_free_fn = ddsi_udp_fini; + fact->fact.m_kind = NN_LOCATOR_KIND_UDPv4; + fact->fact.m_typename = "udp"; + fact->fact.m_default_spdp_address = "udp/239.255.0.1"; + fact->fact.m_connless = true; + fact->fact.m_supports_fn = ddsi_udp_supports; + fact->fact.m_create_conn_fn = ddsi_udp_create_conn; + fact->fact.m_release_conn_fn = ddsi_udp_release_conn; + fact->fact.m_join_mc_fn = ddsi_udp_join_mc; + fact->fact.m_leave_mc_fn = ddsi_udp_leave_mc; + fact->fact.m_is_mcaddr_fn = ddsi_udp_is_mcaddr; #ifdef DDSI_INCLUDE_SSM - fact->m_is_ssm_mcaddr_fn = ddsi_udp_is_ssm_mcaddr; + fact->fact.m_is_ssm_mcaddr_fn = ddsi_udp_is_ssm_mcaddr; #endif - fact->m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address; - fact->m_locator_from_string_fn = ddsi_udp_address_from_string; - fact->m_locator_to_string_fn = ddsi_udp_locator_to_string; - fact->m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces; - fact->m_is_valid_port_fn = ddsi_udp_is_valid_port; + fact->fact.m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address; + fact->fact.m_locator_from_string_fn = ddsi_udp_address_from_string; + fact->fact.m_locator_to_string_fn = ddsi_udp_locator_to_string; + fact->fact.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces; + fact->fact.m_is_valid_port_fn = ddsi_udp_is_valid_port; + fact->fact.m_receive_buffer_size_fn = ddsi_udp_receive_buffer_size; #if DDSRT_HAVE_IPV6 if (gv->config.transport_selector == TRANS_UDP6) { - fact->m_kind = NN_LOCATOR_KIND_UDPv6; - fact->m_typename = "udp6"; - fact->m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1"; + fact->fact.m_kind = NN_LOCATOR_KIND_UDPv6; + fact->fact.m_typename = "udp6"; + fact->fact.m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1"; } #endif + ddsrt_atomic_st32 (&fact->receive_buf_size, UINT32_MAX); - ddsi_factory_add (gv, fact); + ddsi_factory_add (gv, &fact->fact); GVLOG (DDS_LC_CONFIG, "udp initialized\n"); return 0; } diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index 1f031d2..3b3be71 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -287,6 +287,16 @@ void get_participant_builtin_topic_data (const struct participant *pp, ddsi_plis ETRACE (pp, "spdp_write("PGUIDFMT") - internals: %s\n", PGUID (pp->e.guid), dst->adlink_participant_version_info.internals); } + /* Add Cyclone specific information */ + { + const uint32_t bufsz = ddsi_receive_buffer_size (pp->e.gv->m_factory); + if (bufsz > 0) + { + dst->present |= PP_CYCLONE_RECEIVE_BUFFER_SIZE; + dst->cyclone_receive_buffer_size = bufsz; + } + } + #ifdef DDSI_INCLUDE_SECURITY /* Add Security specific information. */ if (q_omg_get_participant_security_info(pp, &(dst->participant_security_info))) { diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index a47a840..d0b6c8e 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -4933,6 +4933,15 @@ bool new_proxy_participant (struct ddsi_domaingv *gv, const struct ddsi_guid *pp proxypp->minimal_bes_mode = 0; proxypp->implicitly_created = ((custom_flags & CF_IMPLICITLY_CREATED_PROXYPP) != 0); proxypp->proxypp_have_spdp = ((custom_flags & CF_PROXYPP_NO_SPDP) == 0); + if (plist->present & PP_CYCLONE_RECEIVE_BUFFER_SIZE) + proxypp->receive_buffer_size = plist->cyclone_receive_buffer_size; + else /* default to what we use */ + proxypp->receive_buffer_size = ddsi_receive_buffer_size (gv->m_factory); + if (proxypp->receive_buffer_size < 131072) + { + /* if we don't know anything, or if it is implausibly tiny, use 128kB */ + proxypp->receive_buffer_size = 131072; + } { struct proxy_participant *privpp; @@ -5793,6 +5802,7 @@ int new_proxy_reader (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid, prd->favours_ssm = (favours_ssm && gv->config.allowMulticast & AMC_SSM) ? 1 : 0; #endif prd->is_fict_trans_reader = 0; + prd->receive_buffer_size = proxypp->receive_buffer_size; ddsrt_avl_init (&prd_writers_treedef, &prd->writers);