Include receive buffer sizes in discovery

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-06-22 16:37:49 +02:00 committed by eboasson
parent eb2a273c1e
commit 7fbbc13e65
11 changed files with 110 additions and 35 deletions

View file

@ -61,6 +61,7 @@ extern "C" {
#define PP_PARTICIPANT_SECURITY_INFO ((uint64_t)1 << 35)
#define PP_IDENTITY_STATUS_TOKEN ((uint64_t)1 << 36)
#define PP_DATA_TAGS ((uint64_t)1 << 37)
#define PP_CYCLONE_RECEIVE_BUFFER_SIZE ((uint64_t)1 << 38)
/* Set for unrecognized parameters that are in the reserved space or
in our own vendor-specific space that have the
PID_UNRECOGNIZED_INCOMPATIBLE_FLAG set (see DDSI 2.1 9.6.2.2.1) */
@ -227,6 +228,7 @@ typedef struct ddsi_plist {
#endif
uint32_t domain_id;
char *domain_tag;
uint32_t cyclone_receive_buffer_size;
} ddsi_plist_t;

View file

@ -71,6 +71,7 @@ typedef int (*ddsi_tran_leave_mc_fn_t) (ddsi_tran_conn_t, const nn_locator_t *sr
typedef int (*ddsi_is_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const nn_locator_t *loc);
typedef int (*ddsi_is_ssm_mcaddr_fn_t) (const struct ddsi_tran_factory *tran, const nn_locator_t *loc);
typedef int (*ddsi_is_valid_port_fn_t) (const struct ddsi_tran_factory *tran, uint32_t port);
typedef uint32_t (*ddsi_receive_buffer_size_fn_t) (const struct ddsi_tran_factory *fact);
enum ddsi_nearby_address_result {
DNAR_DISTANT,
@ -174,6 +175,7 @@ struct ddsi_tran_factory
ddsi_locator_to_string_fn_t m_locator_to_string_fn;
ddsi_enumerate_interfaces_fn_t m_enumerate_interfaces_fn;
ddsi_is_valid_port_fn_t m_is_valid_port_fn;
ddsi_receive_buffer_size_fn_t m_receive_buffer_size_fn;
/* Data */
@ -214,6 +216,9 @@ inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int3
inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t port) {
return factory->m_is_valid_port_fn (factory, port);
}
inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory) {
return factory->m_receive_buffer_size_fn (factory);
}
inline dds_return_t ddsi_factory_create_conn (ddsi_tran_conn_t *conn, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos) {
*conn = NULL;
if (!ddsi_is_valid_port (factory, port))

View file

@ -380,6 +380,7 @@ struct proxy_participant
struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */
ddsrt_avl_tree_t groups; /* table of all groups (publisher, subscriber), see struct proxy_group */
seqno_t seq; /* sequence number of most recent SPDP message */
uint32_t receive_buffer_size; /* assumed size of receive buffer, used to limit bursts involving this proxypp */
unsigned implicitly_created : 1; /* participants are implicitly created for Cloud/Fog discovered endpoints */
unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */
unsigned minimal_bes_mode: 1;
@ -470,6 +471,7 @@ struct proxy_reader {
unsigned favours_ssm: 1; /* iff 1, this proxy reader favours SSM when available */
#endif
ddsrt_avl_tree_t writers; /* matching LOCAL writers */
uint32_t receive_buffer_size; /* assumed receive buffer size inherited from proxypp */
filter_fn_t filter;
};

View file

@ -446,8 +446,10 @@ typedef union Submessage {
#define PID_ADLINK_ENDPOINT_GID (PID_VENDORSPECIFIC_FLAG | 0x14u)
#define PID_ADLINK_GROUP_GID (PID_VENDORSPECIFIC_FLAG | 0x15u)
#define PID_ADLINK_EOTINFO (PID_VENDORSPECIFIC_FLAG | 0x16u)
#define PID_ADLINK_PART_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x17u);
#define PID_ADLINK_LAN_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x18u);
#define PID_ADLINK_PART_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x17u)
#define PID_ADLINK_LAN_CERT_NAME (PID_VENDORSPECIFIC_FLAG | 0x18u)
#define PID_CYCLONE_RECEIVE_BUFFER_SIZE (PID_VENDORSPECIFIC_FLAG | 0x19u)
#if defined (__cplusplus)
}

View file

@ -1646,6 +1646,7 @@ static const struct piddesc piddesc_eclipse[] = {
{ .desc = { XE2, XSTOP } }, 0 },
PP (ADLINK_PARTICIPANT_VERSION_INFO, adlink_participant_version_info, Xux5, XS),
PP (ADLINK_TYPE_DESCRIPTION, type_description, XS),
PP (CYCLONE_RECEIVE_BUFFER_SIZE, cyclone_receive_buffer_size, Xu),
{ PID_SENTINEL, 0, 0, NULL, 0, 0, { .desc = { XSTOP } }, 0 }
};
@ -1720,7 +1721,7 @@ struct piddesc_index {
#endif
static const struct piddesc *piddesc_omg_index[DEFAULT_OMG_PIDS_ARRAY_SIZE + SECURITY_OMG_PIDS_ARRAY_SIZE];
static const struct piddesc *piddesc_eclipse_index[19];
static const struct piddesc *piddesc_eclipse_index[26];
static const struct piddesc *piddesc_adlink_index[19];
#define INDEX_ANY(vendorid_, tab_) [vendorid_] = { \

View file

@ -362,6 +362,12 @@ static int ddsi_raweth_is_valid_port (const struct ddsi_tran_factory *fact, uint
return (port >= 1 && port <= 65535);
}
static uint32_t ddsi_raweth_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}
int ddsi_raweth_init (struct ddsi_domaingv *gv)
{
struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact));
@ -384,6 +390,7 @@ int ddsi_raweth_init (struct ddsi_domaingv *gv)
fact->m_locator_to_string_fn = ddsi_raweth_to_string;
fact->m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces;
fact->m_is_valid_port_fn = ddsi_raweth_is_valid_port;
fact->m_receive_buffer_size_fn = ddsi_raweth_receive_buffer_size;
ddsi_factory_add (gv, fact);
GVLOG (DDS_LC_CONFIG, "raweth initialized\n");
return 0;

View file

@ -1144,6 +1144,12 @@ static int ddsi_tcp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (port <= 65535);
}
static uint32_t ddsi_tcp_receive_buffer_size (const struct ddsi_tran_factory *fact)
{
(void) fact;
return 0;
}
int ddsi_tcp_init (struct ddsi_domaingv *gv)
{
struct ddsi_tran_factory_tcp *fact = ddsrt_malloc (sizeof (*fact));
@ -1169,6 +1175,7 @@ int ddsi_tcp_init (struct ddsi_domaingv *gv)
fact->fact.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr;
fact->fact.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address;
fact->fact.m_is_valid_port_fn = ddsi_tcp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_tcp_receive_buffer_size;
ddsi_factory_add (gv, &fact->fact);
#if DDSRT_HAVE_IPV6

View file

@ -27,6 +27,7 @@ extern inline uint32_t ddsi_conn_port (const struct ddsi_tran_conn *conn);
extern inline dds_return_t ddsi_factory_create_listener (ddsi_tran_listener_t *listener, ddsi_tran_factory_t factory, uint32_t port, const struct ddsi_tran_qos *qos);
extern inline bool ddsi_factory_supports (const struct ddsi_tran_factory *factory, int32_t kind);
extern inline int ddsi_is_valid_port (const struct ddsi_tran_factory *factory, uint32_t port);
extern inline uint32_t ddsi_receive_buffer_size (const struct ddsi_tran_factory *factory);
extern inline ddsrt_socket_t ddsi_conn_handle (ddsi_tran_conn_t conn);
extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
extern inline ddsrt_socket_t ddsi_tran_handle (ddsi_tran_base_t base);

View file

@ -45,6 +45,14 @@ typedef struct ddsi_udp_conn {
int m_diffserv;
} *ddsi_udp_conn_t;
typedef struct ddsi_udp_tran_factory {
struct ddsi_tran_factory fact;
// actual minimum receive buffer size in use
// atomically loaded/stored so we don't have to lie about constness
ddsrt_atomic_uint32_t receive_buf_size;
} *ddsi_udp_tran_factory_t;
static void addr_to_loc (const struct ddsi_tran_factory *tran, nn_locator_t *dst, const union addr *src)
{
ddsi_ipaddr_to_loc (tran, dst, &src->a, (src->a.sa_family == AF_INET) ? NN_LOCATOR_KIND_UDPv4 : NN_LOCATOR_KIND_UDPv6);
@ -305,7 +313,7 @@ static dds_return_t set_rcvbuf (struct ddsi_domaingv const * const gv, ddsrt_soc
}
}
return rc;
return (rc < 0) ? rc : (size > (uint32_t) INT32_MAX) ? INT32_MAX : (int32_t) size;
}
static dds_return_t set_sndbuf (struct ddsi_domaingv const * const gv, ddsrt_socket_t sock, uint32_t min_size)
@ -408,9 +416,10 @@ static dds_return_t set_mc_options_transmit_ipv4 (struct ddsi_domaingv const * c
return DDS_RETCODE_OK;
}
static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact, uint32_t port, const ddsi_tran_qos_t *qos)
static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_factory_t fact_cmn, uint32_t port, const ddsi_tran_qos_t *qos)
{
struct ddsi_domaingv const * const gv = fact->gv;
struct ddsi_udp_tran_factory *fact = (struct ddsi_udp_tran_factory *) fact_cmn;
struct ddsi_domaingv const * const gv = fact->fact.gv;
const int one = 1;
dds_return_t rc;
@ -446,7 +455,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_
ownloc_w_port.port = port;
}
ddsi_ipaddr_from_loc (&socketname.x, &ownloc_w_port);
switch (fact->m_kind)
switch (fact->fact.m_kind)
{
case NN_LOCATOR_KIND_UDPv4:
if (bind_to_any)
@ -465,7 +474,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_
break;
#endif
default:
DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->m_kind);
DDS_FATAL ("ddsi_udp_create_conn: unsupported kind %"PRId32"\n", fact->fact.m_kind);
}
if ((rc = ddsrt_socket (&sock, socketname.a.sa_family, SOCK_DGRAM, 0)) != DDS_RETCODE_OK)
{
@ -484,8 +493,18 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_
}
}
if (set_rcvbuf (gv, sock, &gv->config.socket_min_rcvbuf_size) != DDS_RETCODE_OK)
if ((rc = set_rcvbuf (gv, sock, &gv->config.socket_min_rcvbuf_size)) < 0)
goto fail_w_socket;
if (rc > 0) {
// set fact->receive_buf_size to the smallest observed value
uint32_t old;
do {
old = ddsrt_atomic_ld32 (&fact->receive_buf_size);
if ((uint32_t) rc >= old)
break;
} while (!ddsrt_atomic_cas32 (&fact->receive_buf_size, old, (uint32_t) rc));
}
if (set_sndbuf (gv, sock, gv->config.socket_min_sndbuf_size) != DDS_RETCODE_OK)
goto fail_w_socket;
if (gv->config.dontRoute && set_dont_route (gv, sock, ipv6) != DDS_RETCODE_OK)
@ -533,7 +552,7 @@ static dds_return_t ddsi_udp_create_conn (ddsi_tran_conn_t *conn_out, ddsi_tran_
WSAEventSelect (conn->m_sock, conn->m_sockEvent, FD_WRITE);
#endif
ddsi_factory_conn_init (fact, &conn->m_base);
ddsi_factory_conn_init (&fact->fact, &conn->m_base);
conn->m_base.m_base.m_port = get_socket_port (gv, sock);
conn->m_base.m_base.m_trantype = DDSI_TRAN_CONN;
conn->m_base.m_base.m_multicast = (qos->m_purpose == DDSI_TRAN_QOS_RECV_MC);
@ -791,9 +810,10 @@ static char *ddsi_udp_locator_to_string (char *dst, size_t sizeof_dst, const nn_
}
}
static void ddsi_udp_fini (ddsi_tran_factory_t fact)
static void ddsi_udp_fini (ddsi_tran_factory_t fact_cmn)
{
struct ddsi_domaingv const * const gv = fact->gv;
struct ddsi_udp_tran_factory *fact = (struct ddsi_udp_tran_factory *) fact_cmn;
struct ddsi_domaingv const * const gv = fact->fact.gv;
GVLOG (DDS_LC_CONFIG, "udp finalized\n");
ddsrt_free (fact);
}
@ -804,40 +824,48 @@ static int ddsi_udp_is_valid_port (const struct ddsi_tran_factory *fact, uint32_
return (port <= 65535);
}
static uint32_t ddsi_udp_receive_buffer_size (const struct ddsi_tran_factory *fact_cmn)
{
const struct ddsi_udp_tran_factory *fact = (const struct ddsi_udp_tran_factory *) fact_cmn;
return ddsrt_atomic_ld32 (&fact->receive_buf_size);
}
int ddsi_udp_init (struct ddsi_domaingv*gv)
{
struct ddsi_tran_factory *fact = ddsrt_malloc (sizeof (*fact));
struct ddsi_udp_tran_factory *fact = ddsrt_malloc (sizeof (*fact));
memset (fact, 0, sizeof (*fact));
fact->gv = gv;
fact->m_free_fn = ddsi_udp_fini;
fact->m_kind = NN_LOCATOR_KIND_UDPv4;
fact->m_typename = "udp";
fact->m_default_spdp_address = "udp/239.255.0.1";
fact->m_connless = true;
fact->m_supports_fn = ddsi_udp_supports;
fact->m_create_conn_fn = ddsi_udp_create_conn;
fact->m_release_conn_fn = ddsi_udp_release_conn;
fact->m_join_mc_fn = ddsi_udp_join_mc;
fact->m_leave_mc_fn = ddsi_udp_leave_mc;
fact->m_is_mcaddr_fn = ddsi_udp_is_mcaddr;
fact->fact.gv = gv;
fact->fact.m_free_fn = ddsi_udp_fini;
fact->fact.m_kind = NN_LOCATOR_KIND_UDPv4;
fact->fact.m_typename = "udp";
fact->fact.m_default_spdp_address = "udp/239.255.0.1";
fact->fact.m_connless = true;
fact->fact.m_supports_fn = ddsi_udp_supports;
fact->fact.m_create_conn_fn = ddsi_udp_create_conn;
fact->fact.m_release_conn_fn = ddsi_udp_release_conn;
fact->fact.m_join_mc_fn = ddsi_udp_join_mc;
fact->fact.m_leave_mc_fn = ddsi_udp_leave_mc;
fact->fact.m_is_mcaddr_fn = ddsi_udp_is_mcaddr;
#ifdef DDSI_INCLUDE_SSM
fact->m_is_ssm_mcaddr_fn = ddsi_udp_is_ssm_mcaddr;
fact->fact.m_is_ssm_mcaddr_fn = ddsi_udp_is_ssm_mcaddr;
#endif
fact->m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address;
fact->m_locator_from_string_fn = ddsi_udp_address_from_string;
fact->m_locator_to_string_fn = ddsi_udp_locator_to_string;
fact->m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces;
fact->m_is_valid_port_fn = ddsi_udp_is_valid_port;
fact->fact.m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address;
fact->fact.m_locator_from_string_fn = ddsi_udp_address_from_string;
fact->fact.m_locator_to_string_fn = ddsi_udp_locator_to_string;
fact->fact.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces;
fact->fact.m_is_valid_port_fn = ddsi_udp_is_valid_port;
fact->fact.m_receive_buffer_size_fn = ddsi_udp_receive_buffer_size;
#if DDSRT_HAVE_IPV6
if (gv->config.transport_selector == TRANS_UDP6)
{
fact->m_kind = NN_LOCATOR_KIND_UDPv6;
fact->m_typename = "udp6";
fact->m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1";
fact->fact.m_kind = NN_LOCATOR_KIND_UDPv6;
fact->fact.m_typename = "udp6";
fact->fact.m_default_spdp_address = "udp6/ff02::ffff:239.255.0.1";
}
#endif
ddsrt_atomic_st32 (&fact->receive_buf_size, UINT32_MAX);
ddsi_factory_add (gv, fact);
ddsi_factory_add (gv, &fact->fact);
GVLOG (DDS_LC_CONFIG, "udp initialized\n");
return 0;
}

View file

@ -287,6 +287,16 @@ void get_participant_builtin_topic_data (const struct participant *pp, ddsi_plis
ETRACE (pp, "spdp_write("PGUIDFMT") - internals: %s\n", PGUID (pp->e.guid), dst->adlink_participant_version_info.internals);
}
/* Add Cyclone specific information */
{
const uint32_t bufsz = ddsi_receive_buffer_size (pp->e.gv->m_factory);
if (bufsz > 0)
{
dst->present |= PP_CYCLONE_RECEIVE_BUFFER_SIZE;
dst->cyclone_receive_buffer_size = bufsz;
}
}
#ifdef DDSI_INCLUDE_SECURITY
/* Add Security specific information. */
if (q_omg_get_participant_security_info(pp, &(dst->participant_security_info))) {

View file

@ -4933,6 +4933,15 @@ bool new_proxy_participant (struct ddsi_domaingv *gv, const struct ddsi_guid *pp
proxypp->minimal_bes_mode = 0;
proxypp->implicitly_created = ((custom_flags & CF_IMPLICITLY_CREATED_PROXYPP) != 0);
proxypp->proxypp_have_spdp = ((custom_flags & CF_PROXYPP_NO_SPDP) == 0);
if (plist->present & PP_CYCLONE_RECEIVE_BUFFER_SIZE)
proxypp->receive_buffer_size = plist->cyclone_receive_buffer_size;
else /* default to what we use */
proxypp->receive_buffer_size = ddsi_receive_buffer_size (gv->m_factory);
if (proxypp->receive_buffer_size < 131072)
{
/* if we don't know anything, or if it is implausibly tiny, use 128kB */
proxypp->receive_buffer_size = 131072;
}
{
struct proxy_participant *privpp;
@ -5793,6 +5802,7 @@ int new_proxy_reader (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid,
prd->favours_ssm = (favours_ssm && gv->config.allowMulticast & AMC_SSM) ? 1 : 0;
#endif
prd->is_fict_trans_reader = 0;
prd->receive_buffer_size = proxypp->receive_buffer_size;
ddsrt_avl_init (&prd_writers_treedef, &prd->writers);