improve performance by allowing multiple receive threads and avoiding multiplexing I/O

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-08-05 13:28:58 +02:00 committed by eboasson
parent 508d56b427
commit 4f0b9cb471
11 changed files with 354 additions and 152 deletions

View file

@ -260,6 +260,8 @@ struct config
int noprogress_log_stacktraces; int noprogress_log_stacktraces;
int prioritize_retransmit; int prioritize_retransmit;
int xpack_send_async; int xpack_send_async;
int multiple_recv_threads;
unsigned recv_thread_stop_maxretries;
unsigned primary_reorder_maxsamples; unsigned primary_reorder_maxsamples;
unsigned secondary_reorder_maxsamples; unsigned secondary_reorder_maxsamples;

View file

@ -65,6 +65,25 @@ enum recvips_mode {
#define N_LEASE_LOCKS_LG2 4 #define N_LEASE_LOCKS_LG2 4
#define N_LEASE_LOCKS ((int) (1 << N_LEASE_LOCKS_LG2)) #define N_LEASE_LOCKS ((int) (1 << N_LEASE_LOCKS_LG2))
enum recv_thread_mode {
RTM_SINGLE,
RTM_MANY
};
struct recv_thread_arg {
enum recv_thread_mode mode;
struct nn_rbufpool *rbpool;
union {
struct {
const nn_locator_t *loc;
struct ddsi_tran_conn *conn;
} single;
struct {
os_sockWaitset ws;
} many;
} u;
};
struct q_globals { struct q_globals {
volatile int terminate; volatile int terminate;
volatile int exception; volatile int exception;
@ -113,13 +132,6 @@ struct q_globals {
struct ut_thread_pool_s * thread_pool; struct ut_thread_pool_s * thread_pool;
/* Receive thread triggering: must have a socket per receive thread
because all receive threads must be triggered, even though each
receive thread takes the trigger message from the socket. With one
trigger socket, we can only have one receive thread (which enables
other optimisations that we don't currently do). */
os_sockWaitset waitset;
/* In many sockets mode, the receive threads maintain a local array /* In many sockets mode, the receive threads maintain a local array
with participant GUIDs and sockets, participant_set_generation is with participant GUIDs and sockets, participant_set_generation is
used to notify them. */ used to notify them. */
@ -192,10 +204,15 @@ struct q_globals {
/* Receive thread. (We can only has one for now, cos of the signal /* Receive thread. (We can only has one for now, cos of the signal
trigger socket.) Receive buffer pool is per receive thread, trigger socket.) Receive buffer pool is per receive thread,
practical considerations led to it being a global variable it is only a global variable because it needs to be freed way later
TEMPORARILY. */ than the receive thread itself terminates */
struct thread_state1 *recv_ts; #define MAX_RECV_THREADS 3
struct nn_rbufpool *rbufpool; unsigned n_recv_threads;
struct recv_thread {
const char *name;
struct thread_state1 *ts;
struct recv_thread_arg arg;
} recv_threads[MAX_RECV_THREADS];
/* Listener thread for connection based transports */ /* Listener thread for connection based transports */
struct thread_state1 *listen_ts; struct thread_state1 *listen_ts;

View file

@ -20,8 +20,10 @@ struct nn_rbufpool;
struct nn_rsample_info; struct nn_rsample_info;
struct nn_rdata; struct nn_rdata;
struct ddsi_tran_listener; struct ddsi_tran_listener;
struct recv_thread_arg;
uint32_t recv_thread (struct nn_rbufpool *rbpool); void trigger_recv_threads (void);
uint32_t recv_thread (void *vrecv_thread_arg);
uint32_t listen_thread (struct ddsi_tran_listener * listener); uint32_t listen_thread (struct ddsi_tran_listener * listener);
int user_dqueue_handler (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const nn_guid_t *rdguid, void *qarg); int user_dqueue_handler (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const nn_guid_t *rdguid, void *qarg);

View file

@ -80,7 +80,6 @@ int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void); int rtps_config_open (void);
int rtps_init (void); int rtps_init (void);
void ddsi_plugin_init (void); void ddsi_plugin_init (void);
void rtps_term_prep (void);
void rtps_term (void); void rtps_term (void);
#if defined (__cplusplus) #if defined (__cplusplus)

View file

@ -263,8 +263,11 @@ static void ddsi_tcp_conn_connect (ddsi_tcp_conn_t conn, const struct msghdr * m
/* Also may need to receive on connection so add to waitset */ /* Also may need to receive on connection so add to waitset */
os_sockSetNonBlocking (conn->m_sock, true); os_sockSetNonBlocking (conn->m_sock, true);
os_sockWaitsetAdd (gv.waitset, &conn->m_base);
os_sockWaitsetTrigger (gv.waitset); assert (gv.n_recv_threads > 0);
assert (gv.recv_threads[0].arg.mode == RTM_MANY);
os_sockWaitsetAdd (gv.recv_threads[0].arg.u.many.ws, &conn->m_base);
os_sockWaitsetTrigger (gv.recv_threads[0].arg.u.many.ws);
} }
} }

View file

@ -98,7 +98,27 @@ void ddsi_conn_free (ddsi_tran_conn_t conn)
conn->m_closed = true; conn->m_closed = true;
/* FIXME: rethink the socket waitset & the deleting of entries; the biggest issue is TCP handling that can open & close sockets at will and yet expects the waitset to wake up at the apprioriate times. (This pretty much works with the select-based version, but not the kqueue-based one.) TCP code can also have connections without a socket ... Calling sockWaitsetRemove here (where there shouldn't be any knowledge of it) at least ensures that it is removed in time and that there can't be aliasing of connections and sockets. */ /* FIXME: rethink the socket waitset & the deleting of entries; the biggest issue is TCP handling that can open & close sockets at will and yet expects the waitset to wake up at the apprioriate times. (This pretty much works with the select-based version, but not the kqueue-based one.) TCP code can also have connections without a socket ... Calling sockWaitsetRemove here (where there shouldn't be any knowledge of it) at least ensures that it is removed in time and that there can't be aliasing of connections and sockets. */
if (ddsi_conn_handle (conn) != Q_INVALID_SOCKET) if (ddsi_conn_handle (conn) != Q_INVALID_SOCKET)
os_sockWaitsetRemove (gv.waitset, conn); {
unsigned i;
for (i = 0; i < gv.n_recv_threads; i++)
{
if (!gv.recv_threads[i].ts)
assert (!gv.rtps_keepgoing);
else
{
switch (gv.recv_threads[i].arg.mode)
{
case RTM_MANY:
os_sockWaitsetRemove (gv.recv_threads[i].arg.u.many.ws, conn);
break;
case RTM_SINGLE:
if (gv.recv_threads[i].arg.u.single.conn == conn)
abort();
break;
}
}
}
}
if (conn->m_factory->m_close_conn_fn) if (conn->m_factory->m_close_conn_fn)
{ {
(conn->m_factory->m_close_conn_fn) (conn); (conn->m_factory->m_close_conn_fn) (conn);

View file

@ -505,6 +505,12 @@ static const struct cfgelem liveliness_monitoring_attrs[] = {
END_MARKER END_MARKER
}; };
static const struct cfgelem multiple_recv_threads_attrs[] = {
{ ATTR("maxretries"), 1, "4294967295", ABSOFF(recv_thread_stop_maxretries), 0, uf_uint, 0, pf_uint,
"<p>Receive threads dedicated to a single socket can only be triggered for termination by sending a packet. Reception of any packet will do, so termination failure due to packet loss is exceedingly unlikely, but to eliminate all risks, it will retry as many times as specified by this attribute before aborting.</p>" },
END_MARKER
};
static const struct cfgelem unsupp_cfgelems[] = { static const struct cfgelem unsupp_cfgelems[] = {
{ MOVED("MaxMessageSize", "General/MaxMessageSize") }, { MOVED("MaxMessageSize", "General/MaxMessageSize") },
{ MOVED("FragmentSize", "General/FragmentSize") }, { MOVED("FragmentSize", "General/FragmentSize") },
@ -614,6 +620,8 @@ static const struct cfgelem unsupp_cfgelems[] = {
"<p>This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.</p>" }, "<p>This element controls whether the actual sending of packets occurs on the same thread that prepares them, or is done asynchronously by another thread.</p>" },
{ LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration, { LEAF_W_ATTRS("RediscoveryBlacklistDuration", rediscovery_blacklist_duration_attrs), 1, "10s", ABSOFF(prune_deleted_ppant.delay), 0, uf_duration_inf, 0, pf_duration,
"<p>This element controls for how long a remote participant that was previously deleted will remain on a blacklist to prevent rediscovery, giving the software on a node time to perform any cleanup actions it needs to do. To some extent this delay is required internally by DDSI2E, but in the default configuration with the 'enforce' attribute set to false, DDSI2E will reallow rediscovery as soon as it has cleared its internal administration. Setting it to too small a value may result in the entry being pruned from the blacklist before DDSI2E is ready, it is therefore recommended to set it to at least several seconds.</p>" }, "<p>This element controls for how long a remote participant that was previously deleted will remain on a blacklist to prevent rediscovery, giving the software on a node time to perform any cleanup actions it needs to do. To some extent this delay is required internally by DDSI2E, but in the default configuration with the 'enforce' attribute set to false, DDSI2E will reallow rediscovery as soon as it has cleared its internal administration. Setting it to too small a value may result in the entry being pruned from the blacklist before DDSI2E is ready, it is therefore recommended to set it to at least several seconds.</p>" },
{ LEAF_W_ATTRS("MultipleReceiveThreads", multiple_recv_threads_attrs), 1, "true", ABSOFF(multiple_recv_threads), 0, uf_boolean, 0, pf_boolean,
"<p>This element controls whether all traffic is handled by a single receive thread or whether multiple receive threads may be used to improve latency. Currently multiple receive threads are only used for connectionless transport (e.g., UDP) and ManySocketsMode not set to single (the default).</p>" },
{ MGROUP("ControlTopic", control_topic_cfgelems, control_topic_cfgattrs), 1, 0, 0, 0, 0, 0, 0, 0, { MGROUP("ControlTopic", control_topic_cfgelems, control_topic_cfgattrs), 1, 0, 0, 0, 0, 0, 0, 0,
"<p>The ControlTopic element allows configured whether DDSI2E provides a special control interface via a predefined topic or not.<p>" }, "<p>The ControlTopic element allows configured whether DDSI2E provides a special control interface via a predefined topic or not.<p>" },
{ GROUP("Test", unsupp_test_cfgelems), { GROUP("Test", unsupp_test_cfgelems),

View file

@ -39,6 +39,7 @@
#include "ddsi/q_builtin_topic.h" #include "ddsi/q_builtin_topic.h"
#include "ddsi/ddsi_ser.h" #include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_mcgroup.h" #include "ddsi/ddsi_mcgroup.h"
#include "ddsi/q_receive.h"
#include "ddsi/sysdeps.h" #include "ddsi/sysdeps.h"
#include "dds__whc.h" #include "dds__whc.h"
@ -575,7 +576,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{ {
os_atomic_fence (); os_atomic_fence ();
os_atomic_inc32 (&gv.participant_set_generation); os_atomic_inc32 (&gv.participant_set_generation);
os_sockWaitsetTrigger (gv.waitset); trigger_recv_threads ();
} }
/* SPDP periodic broadcast uses the retransmit path, so the initial /* SPDP periodic broadcast uses the retransmit path, so the initial
@ -784,7 +785,6 @@ static void unref_participant (struct participant *pp, const struct nn_guid *gui
/* Deleting the socket will usually suffice to wake up the /* Deleting the socket will usually suffice to wake up the
receiver threads, but in general, no one cares if it takes a receiver threads, but in general, no one cares if it takes a
while longer for it to wakeup. */ while longer for it to wakeup. */
ddsi_conn_free (pp->m_conn); ddsi_conn_free (pp->m_conn);
} }
nn_plist_fini (pp->plist); nn_plist_fini (pp->plist);

View file

@ -596,10 +596,10 @@ int rtps_config_prep (struct cfgst *cfgst)
} }
/* Thread admin: need max threads, which is currently (2 or 3) for each /* Thread admin: need max threads, which is currently (2 or 3) for each
configured channel plus 7: main, recv, dqueue.builtin, configured channel plus 9: main, recv (up to 3x), dqueue.builtin,
lease, gc, debmon; once thread state admin has been inited, upgrade the lease, gc, debmon; once thread state admin has been inited, upgrade the
main thread one participating in the thread tracking stuff as main thread one participating in the thread tracking stuff as
if it had been created using create_thread(). */ if it had been created using create_thread(). */
{ {
/* For Lite - Temporary /* For Lite - Temporary
@ -608,9 +608,9 @@ int rtps_config_prep (struct cfgst *cfgst)
#define USER_MAX_THREADS 50 #define USER_MAX_THREADS 50
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS #ifdef DDSI_INCLUDE_NETWORK_CHANNELS
const unsigned max_threads = 7 + USER_MAX_THREADS + num_channel_threads + config.ddsi2direct_max_threads; const unsigned max_threads = 9 + USER_MAX_THREADS + num_channel_threads + config.ddsi2direct_max_threads;
#else #else
const unsigned max_threads = 9 + USER_MAX_THREADS + config.ddsi2direct_max_threads; const unsigned max_threads = 11 + USER_MAX_THREADS + config.ddsi2direct_max_threads;
#endif #endif
thread_states_init (max_threads); thread_states_init (max_threads);
} }
@ -705,6 +705,141 @@ err_disc:
return 0; return 0;
} }
static void rtps_term_prep (void)
{
/* Stop all I/O */
os_mutexLock (&gv.lock);
if (gv.rtps_keepgoing)
{
gv.rtps_keepgoing = 0; /* so threads will stop once they get round to checking */
os_atomic_fence ();
/* can't wake up throttle_writer, currently, but it'll check every few seconds */
trigger_recv_threads ();
}
os_mutexUnlock (&gv.lock);
}
struct wait_for_receive_threads_helper_arg {
unsigned count;
};
static void wait_for_receive_threads_helper (struct xevent *xev, void *varg, nn_mtime_t tnow)
{
struct wait_for_receive_threads_helper_arg * const arg = varg;
if (arg->count++ == config.recv_thread_stop_maxretries)
abort ();
trigger_recv_threads ();
resched_xevent_if_earlier (xev, add_duration_to_mtime (tnow, T_SECOND));
}
static void wait_for_receive_threads (void)
{
struct xevent *trigev;
unsigned i;
struct wait_for_receive_threads_helper_arg cbarg;
cbarg.count = 0;
if ((trigev = qxev_callback (add_duration_to_mtime (now_mt (), T_SECOND), wait_for_receive_threads_helper, &cbarg)) == NULL)
{
/* retrying is to deal a packet geting lost because the socket buffer is full or because the
macOS firewall (and perhaps others) likes to ask if the process is allowed to receive data,
dropping the packets until the user approves. */
NN_WARNING ("wait_for_receive_threads: failed to schedule periodic triggering of the receive threads to deal with packet loss\n");
}
for (i = 0; i < gv.n_recv_threads; i++)
{
if (gv.recv_threads[i].ts)
{
join_thread (gv.recv_threads[i].ts);
/* setting .ts to NULL helps in sanity checking */
gv.recv_threads[i].ts = NULL;
}
}
if (trigev)
{
delete_xevent (trigev);
}
}
static int setup_and_start_recv_threads (void)
{
unsigned i;
for (i = 0; i < MAX_RECV_THREADS; i++)
{
gv.recv_threads[i].ts = NULL;
gv.recv_threads[i].arg.mode = RTM_SINGLE;
gv.recv_threads[i].arg.rbpool = NULL;
gv.recv_threads[i].arg.u.single.loc = NULL;
gv.recv_threads[i].arg.u.single.conn = NULL;
}
/* First thread always uses a waitset and gobbles up all sockets not handled by dedicated threads - FIXME: MSM_NO_UNICAST mode with UDP probably doesn't even need this one to use a waitset */
gv.n_recv_threads = 1;
gv.recv_threads[0].name = "recv";
gv.recv_threads[0].arg.mode = RTM_MANY;
if (gv.m_factory->m_connless && config.many_sockets_mode != MSM_NO_UNICAST && config.multiple_recv_threads)
{
if (ddsi_is_mcaddr (&gv.loc_default_mc) && !ddsi_is_ssm_mcaddr (&gv.loc_default_mc))
{
/* Multicast enabled, but it isn't an SSM address => handle data multicasts on a separate thread (the trouble with SSM addresses is that we only join matching writers, which our own sockets typically would not be) */
gv.recv_threads[gv.n_recv_threads].name = "recvMC";
gv.recv_threads[gv.n_recv_threads].arg.mode = RTM_SINGLE;
gv.recv_threads[gv.n_recv_threads].arg.u.single.conn = gv.data_conn_mc;
gv.recv_threads[gv.n_recv_threads].arg.u.single.loc = &gv.loc_default_mc;
gv.n_recv_threads++;
}
if (config.many_sockets_mode == MSM_SINGLE_UNICAST)
{
/* No per-participant sockets => handle data unicasts on a separate thread as well */
gv.recv_threads[gv.n_recv_threads].name = "recvUC";
gv.recv_threads[gv.n_recv_threads].arg.mode = RTM_SINGLE;
gv.recv_threads[gv.n_recv_threads].arg.u.single.conn = gv.data_conn_uc;
gv.recv_threads[gv.n_recv_threads].arg.u.single.loc = &gv.loc_default_uc;
gv.n_recv_threads++;
}
}
assert (gv.n_recv_threads <= MAX_RECV_THREADS);
/* For each thread, create rbufpool and waitset if needed, then start it */
for (i = 0; i < gv.n_recv_threads; i++)
{
/* We create the rbufpool for the receive thread, and so we'll
become the initial owner thread. The receive thread will change
it before it does anything with it. */
if ((gv.recv_threads[i].arg.rbpool = nn_rbufpool_new (config.rbuf_size, config.rmsg_chunk_size)) == NULL)
{
NN_ERROR ("rtps_init: can't allocate receive buffer pool for thread %s\n", gv.recv_threads[i].name);
goto fail;
}
if (gv.recv_threads[i].arg.mode == RTM_MANY)
{
if ((gv.recv_threads[i].arg.u.many.ws = os_sockWaitsetNew ()) == NULL)
{
NN_ERROR ("rtps_init: can't allocate sock waitset for thread %s\n", gv.recv_threads[i].name);
goto fail;
}
}
if ((gv.recv_threads[i].ts = create_thread (gv.recv_threads[i].name, recv_thread, &gv.recv_threads[i].arg)) == NULL)
{
NN_ERROR ("rtps_init: failed to start thread %s\n", gv.recv_threads[i].name);
goto fail;
}
}
return 0;
fail:
/* to trigger any threads we already started to stop - xevent thread has already been started */
rtps_term_prep ();
wait_for_receive_threads ();
for (i = 0; i < gv.n_recv_threads; i++)
{
if (gv.recv_threads[i].arg.mode == RTM_MANY && gv.recv_threads[i].arg.u.many.ws)
os_sockWaitsetFree (gv.recv_threads[i].arg.u.many.ws);
if (gv.recv_threads[i].arg.rbpool)
nn_rbufpool_free (gv.recv_threads[i].arg.rbpool);
}
return -1;
}
int rtps_init (void) int rtps_init (void)
{ {
uint32_t port_disc_uc = 0; uint32_t port_disc_uc = 0;
@ -942,8 +1077,6 @@ int rtps_init (void)
} }
nn_log (LC_CONFIG, "rtps_init: domainid %d participantid %d\n", config.domainId, config.participantIndex); nn_log (LC_CONFIG, "rtps_init: domainid %d participantid %d\n", config.domainId, config.participantIndex);
gv.waitset = os_sockWaitsetNew ();
if (config.pcap_file && *config.pcap_file) if (config.pcap_file && *config.pcap_file)
{ {
gv.pcap_fp = new_pcap_file (config.pcap_file); gv.pcap_fp = new_pcap_file (config.pcap_file);
@ -1105,14 +1238,6 @@ int rtps_init (void)
gv.gcreq_queue = gcreq_queue_new (); gv.gcreq_queue = gcreq_queue_new ();
/* We create the rbufpool for the receive thread, and so we'll
become the initial owner thread. The receive thread will change
it before it does anything with it. */
if ((gv.rbufpool = nn_rbufpool_new (config.rbuf_size, config.rmsg_chunk_size)) == NULL)
{
NN_FATAL ("rtps_init: can't allocate receive buffer pool\n");
}
gv.rtps_keepgoing = 1; gv.rtps_keepgoing = 1;
os_rwlockInit (&gv.qoslock); os_rwlockInit (&gv.qoslock);
@ -1151,7 +1276,11 @@ int rtps_init (void)
gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL); gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
#endif #endif
gv.recv_ts = create_thread ("recv", (uint32_t (*) (void *)) recv_thread, gv.rbufpool); if (setup_and_start_recv_threads () < 0)
{
NN_FATAL ("failed to start receive threads\n");
}
if (gv.listener) if (gv.listener)
{ {
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener); gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
@ -1176,7 +1305,6 @@ err_mc_conn:
ddsi_conn_free (gv.data_conn_mc); ddsi_conn_free (gv.data_conn_mc);
if (gv.pcap_fp) if (gv.pcap_fp)
os_mutexDestroy (&gv.pcap_lock); os_mutexDestroy (&gv.pcap_lock);
os_sockWaitsetFree (gv.waitset);
if (gv.disc_conn_uc != gv.disc_conn_mc) if (gv.disc_conn_uc != gv.disc_conn_mc)
ddsi_conn_free (gv.data_conn_uc); ddsi_conn_free (gv.data_conn_uc);
if (gv.data_conn_uc != gv.disc_conn_uc) if (gv.data_conn_uc != gv.disc_conn_uc)
@ -1251,20 +1379,6 @@ static void builtins_dqueue_ready_cb (void *varg)
os_mutexUnlock (&arg->lock); os_mutexUnlock (&arg->lock);
} }
void rtps_term_prep (void)
{
/* Stop all I/O */
os_mutexLock (&gv.lock);
if (gv.rtps_keepgoing)
{
gv.rtps_keepgoing = 0; /* so threads will stop once they get round to checking */
os_atomic_fence ();
/* can't wake up throttle_writer, currently, but it'll check every few seconds */
os_sockWaitsetTrigger (gv.waitset);
}
os_mutexUnlock (&gv.lock);
}
void rtps_term (void) void rtps_term (void)
{ {
struct thread_state1 *self = lookup_thread_state (); struct thread_state1 *self = lookup_thread_state ();
@ -1280,7 +1394,7 @@ void rtps_term (void)
/* Stop all I/O */ /* Stop all I/O */
rtps_term_prep (); rtps_term_prep ();
join_thread (gv.recv_ts); wait_for_receive_threads ();
if (gv.listener) if (gv.listener)
{ {
@ -1443,22 +1557,17 @@ void rtps_term (void)
(void) joinleave_spdp_defmcip (0); (void) joinleave_spdp_defmcip (0);
ddsi_conn_free (gv.disc_conn_mc); ddsi_conn_free (gv.disc_conn_mc);
ddsi_conn_free (gv.data_conn_mc); if (gv.data_conn_mc != gv.disc_conn_mc)
if (gv.disc_conn_uc == gv.data_conn_uc) ddsi_conn_free (gv.data_conn_mc);
{ if (gv.disc_conn_uc != gv.disc_conn_mc)
ddsi_conn_free (gv.data_conn_uc);
}
else
{
ddsi_conn_free (gv.data_conn_uc);
ddsi_conn_free (gv.disc_conn_uc); ddsi_conn_free (gv.disc_conn_uc);
} if (gv.data_conn_uc != gv.disc_conn_uc)
ddsi_conn_free (gv.data_conn_uc);
/* Not freeing gv.tev_conn: it aliases data_conn_uc */ /* Not freeing gv.tev_conn: it aliases data_conn_uc */
free_group_membership(gv.mship); free_group_membership(gv.mship);
ddsi_tran_factories_fini (); ddsi_tran_factories_fini ();
os_sockWaitsetFree (gv.waitset);
if (gv.pcap_fp) if (gv.pcap_fp)
{ {
@ -1473,7 +1582,16 @@ void rtps_term (void)
been dropped, which only happens once all receive threads have been dropped, which only happens once all receive threads have
stopped, defrags and reorders have been freed, and all delivery stopped, defrags and reorders have been freed, and all delivery
queues been drained. I.e., until very late in the game. */ queues been drained. I.e., until very late in the game. */
nn_rbufpool_free (gv.rbufpool); {
unsigned i;
for (i = 0; i < gv.n_recv_threads; i++)
{
if (gv.recv_threads[i].arg.mode == RTM_MANY)
os_sockWaitsetFree (gv.recv_threads[i].arg.u.many.ws);
nn_rbufpool_free (gv.recv_threads[i].arg.rbpool);
}
}
dds_tkmap_free (gv.m_tkmap); dds_tkmap_free (gv.m_tkmap);
ephash_free (gv.guid_hash); ephash_free (gv.guid_hash);

View file

@ -3237,13 +3237,24 @@ uint32_t listen_thread (struct ddsi_tran_listener * listener)
conn = ddsi_listener_accept (listener); conn = ddsi_listener_accept (listener);
if (conn) if (conn)
{ {
os_sockWaitsetAdd (gv.waitset, conn); os_sockWaitsetAdd (gv.recv_threads[0].arg.u.many.ws, conn);
os_sockWaitsetTrigger (gv.waitset); os_sockWaitsetTrigger (gv.recv_threads[0].arg.u.many.ws);
} }
} }
return 0; return 0;
} }
static int recv_thread_waitset_add_conn (os_sockWaitset ws, ddsi_tran_conn_t conn)
{
unsigned i;
if (conn == NULL)
return 0;
for (i = 0; i < gv.n_recv_threads; i++)
if (gv.recv_threads[i].arg.mode == RTM_SINGLE && gv.recv_threads[i].arg.u.single.conn == conn)
return 0;
return os_sockWaitsetAdd (ws, conn);
}
enum local_deaf_state_recover { enum local_deaf_state_recover {
LDSR_NORMAL = 0, /* matches gv.deaf for normal operation */ LDSR_NORMAL = 0, /* matches gv.deaf for normal operation */
LDSR_DEAF = 1, /* matches gv.deaf for "deaf" state */ LDSR_DEAF = 1, /* matches gv.deaf for "deaf" state */
@ -3255,7 +3266,7 @@ struct local_deaf_state {
nn_mtime_t tnext; nn_mtime_t tnext;
}; };
static int check_and_handle_deafness_recover(struct local_deaf_state *st) static int check_and_handle_deafness_recover(struct local_deaf_state *st, unsigned num_fixed_uc)
{ {
int rebuildws = 0; int rebuildws = 0;
if (now_mt().v < st->tnext.v) if (now_mt().v < st->tnext.v)
@ -3278,9 +3289,11 @@ static int check_and_handle_deafness_recover(struct local_deaf_state *st)
ddsi_transfer_group_membership(data, gv.data_conn_mc); ddsi_transfer_group_membership(data, gv.data_conn_mc);
TRACE(("check_and_handle_deafness_recover: state %d drop from waitset and add new\n", (int)st->state)); TRACE(("check_and_handle_deafness_recover: state %d drop from waitset and add new\n", (int)st->state));
/* see waitset construction code in recv_thread */ /* see waitset construction code in recv_thread */
os_sockWaitsetPurge (gv.waitset, 1 + (gv.disc_conn_uc != gv.data_conn_uc)); os_sockWaitsetPurge (gv.recv_threads[0].arg.u.many.ws, num_fixed_uc);
os_sockWaitsetAdd (gv.waitset, gv.disc_conn_mc); if (recv_thread_waitset_add_conn (gv.recv_threads[0].arg.u.many.ws, gv.disc_conn_mc) < 0)
os_sockWaitsetAdd (gv.waitset, gv.data_conn_mc); NN_FATAL("check_and_handle_deafness_recover: failed to add disc_conn_mc to waitset\n");
if (recv_thread_waitset_add_conn (gv.recv_threads[0].arg.u.many.ws, gv.data_conn_mc) < 0)
NN_FATAL("check_and_handle_deafness_recover: failed to add data_conn_mc to waitset\n");
TRACE(("check_and_handle_deafness_recover: state %d close sockets\n", (int)st->state)); TRACE(("check_and_handle_deafness_recover: state %d close sockets\n", (int)st->state));
ddsi_conn_free(disc); ddsi_conn_free(disc);
ddsi_conn_free(data); ddsi_conn_free(data);
@ -3308,7 +3321,7 @@ error:
return rebuildws; return rebuildws;
} }
static int check_and_handle_deafness(struct local_deaf_state *st) static int check_and_handle_deafness(struct local_deaf_state *st, unsigned num_fixed_uc)
{ {
const int gv_deaf = gv.deaf; const int gv_deaf = gv.deaf;
assert (gv_deaf == 0 || gv_deaf == 1); assert (gv_deaf == 0 || gv_deaf == 1);
@ -3329,108 +3342,129 @@ static int check_and_handle_deafness(struct local_deaf_state *st)
} }
else else
{ {
return check_and_handle_deafness_recover(st); return check_and_handle_deafness_recover(st, num_fixed_uc);
} }
} }
uint32_t recv_thread (struct nn_rbufpool * rbpool) void trigger_recv_threads (void)
{ {
struct thread_state1 *self = lookup_thread_state ();
struct local_participant_set lps;
unsigned num_fixed = 0;
nn_mtime_t next_thread_cputime = { 0 };
os_sockWaitsetCtx ctx;
unsigned i; unsigned i;
struct local_deaf_state lds; for (i = 0; i < gv.n_recv_threads; i++)
{
if (gv.recv_threads[i].ts == NULL)
continue;
switch (gv.recv_threads[i].arg.mode)
{
case RTM_SINGLE: {
char buf[DDSI_LOCSTRLEN];
char dummy = 0;
const nn_locator_t *dst = gv.recv_threads[i].arg.u.single.loc;
ddsi_iovec_t iov;
iov.iov_base = &dummy;
iov.iov_len = 1;
TRACE(("trigger_recv_threads: %d single %s\n", i, ddsi_locator_to_string (buf, sizeof (buf), dst)));
ddsi_conn_write (gv.data_conn_uc, dst, 1, &iov, 0);
break;
}
case RTM_MANY: {
TRACE(("trigger_recv_threads: %d many %p\n", i, gv.recv_threads[i].arg.u.many.ws));
os_sockWaitsetTrigger (gv.recv_threads[i].arg.u.many.ws);
break;
}
}
}
}
lds.state = gv.deaf ? LDSR_DEAF : LDSR_NORMAL; uint32_t recv_thread (void *vrecv_thread_arg)
lds.tnext = now_mt(); {
struct recv_thread_arg *recv_thread_arg = vrecv_thread_arg;
struct thread_state1 *self = lookup_thread_state ();
struct nn_rbufpool *rbpool = recv_thread_arg->rbpool;
os_sockWaitset waitset = recv_thread_arg->mode == RTM_MANY ? recv_thread_arg->u.many.ws : NULL;
nn_mtime_t next_thread_cputime = { 0 };
local_participant_set_init (&lps);
nn_rbufpool_setowner (rbpool, os_threadIdSelf ()); nn_rbufpool_setowner (rbpool, os_threadIdSelf ());
if (waitset == NULL)
if (gv.m_factory->m_connless)
{ {
if (config.many_sockets_mode == MSM_NO_UNICAST) while (gv.rtps_keepgoing)
{ {
/* we only have one - disc, data, uc and mc all alias each other */ LOG_THREAD_CPUTIME (next_thread_cputime);
os_sockWaitsetAdd (gv.waitset, gv.disc_conn_uc); (void) do_packet (self, recv_thread_arg->u.single.conn, NULL, rbpool);
num_fixed = 1;
}
else
{
os_sockWaitsetAdd (gv.waitset, gv.disc_conn_uc);
os_sockWaitsetAdd (gv.waitset, gv.data_conn_uc);
num_fixed = 1 + (gv.disc_conn_uc != gv.data_conn_uc);
if (config.allowMulticast)
{
os_sockWaitsetAdd (gv.waitset, gv.disc_conn_mc);
os_sockWaitsetAdd (gv.waitset, gv.data_conn_mc);
num_fixed += 2;
}
} }
} }
else
while (gv.rtps_keepgoing)
{ {
int rebuildws; struct local_participant_set lps;
unsigned num_fixed = 0, num_fixed_uc = 0;
LOG_THREAD_CPUTIME (next_thread_cputime); os_sockWaitsetCtx ctx;
unsigned i;
rebuildws = check_and_handle_deafness(&lds); struct local_deaf_state lds;
lds.state = gv.deaf ? LDSR_DEAF : LDSR_NORMAL;
if (config.many_sockets_mode != MSM_MANY_UNICAST) lds.tnext = now_mt();
local_participant_set_init (&lps);
if (gv.m_factory->m_connless)
{ {
/* no other sockets to check */ int rc;
} if ((rc = recv_thread_waitset_add_conn (waitset, gv.disc_conn_uc)) < 0)
else if (os_atomic_ld32 (&gv.participant_set_generation) != lps.gen) NN_FATAL ("recv_thread: failed to add disc_conn_uc to waitset\n");
{ num_fixed_uc += (unsigned)rc;
rebuildws = 1; if ((rc = recv_thread_waitset_add_conn (waitset, gv.data_conn_uc)) < 0)
NN_FATAL ("recv_thread: failed to add data_conn_uc to waitset\n");
num_fixed_uc += (unsigned)rc;
num_fixed += num_fixed_uc;
if ((rc = recv_thread_waitset_add_conn (waitset, gv.disc_conn_mc)) < 0)
NN_FATAL ("recv_thread: failed to add disc_conn_mc to waitset\n");
num_fixed += (unsigned)rc;
if ((rc = recv_thread_waitset_add_conn (waitset, gv.data_conn_mc)) < 0)
NN_FATAL ("recv_thread: failed to add data_conn_mc to waitset\n");
num_fixed += (unsigned)rc;
} }
if (rebuildws && config.many_sockets_mode == MSM_MANY_UNICAST) while (gv.rtps_keepgoing)
{ {
/* first rebuild local participant set - unless someone's toggling "deafness", this int rebuildws;
LOG_THREAD_CPUTIME (next_thread_cputime);
rebuildws = check_and_handle_deafness(&lds, num_fixed_uc);
if (config.many_sockets_mode != MSM_MANY_UNICAST)
{
/* no other sockets to check */
}
else if (os_atomic_ld32 (&gv.participant_set_generation) != lps.gen)
{
rebuildws = 1;
}
if (rebuildws && waitset && config.many_sockets_mode == MSM_MANY_UNICAST)
{
/* first rebuild local participant set - unless someone's toggling "deafness", this
only happens when the participant set has changed, so might as well rebuild it */ only happens when the participant set has changed, so might as well rebuild it */
rebuild_local_participant_set (self, &lps); rebuild_local_participant_set (self, &lps);
os_sockWaitsetPurge (gv.waitset, num_fixed); os_sockWaitsetPurge (waitset, num_fixed);
for (i = 0; i < lps.nps; i++) for (i = 0; i < lps.nps; i++)
{
if (lps.ps[i].m_conn)
{ {
os_sockWaitsetAdd (gv.waitset, lps.ps[i].m_conn); if (lps.ps[i].m_conn)
} os_sockWaitsetAdd (waitset, lps.ps[i].m_conn);
} }
} }
ctx = os_sockWaitsetWait (gv.waitset); if ((ctx = os_sockWaitsetWait (waitset)) != NULL)
if (ctx) {
{ int idx;
int idx; ddsi_tran_conn_t conn;
ddsi_tran_conn_t conn; while ((idx = os_sockWaitsetNextEvent (ctx, &conn)) >= 0)
{
while ((idx = os_sockWaitsetNextEvent (ctx, &conn)) >= 0) const nn_guid_prefix_t *guid_prefix;
{ if (((unsigned)idx < num_fixed) || config.many_sockets_mode != MSM_MANY_UNICAST)
bool ret; guid_prefix = NULL;
if (((unsigned)idx < num_fixed) || config.many_sockets_mode != MSM_MANY_UNICAST) else
{ guid_prefix = &lps.ps[(unsigned)idx - num_fixed].guid_prefix;
ret = do_packet (self, conn, NULL, rbpool); /* Process message and clean out connection if failed or closed */
} if (!do_packet (self, conn, guid_prefix, rbpool) && !conn->m_connless)
else ddsi_conn_free (conn);
{
ret = do_packet (self, conn, &lps.ps[(unsigned)idx - num_fixed].guid_prefix, rbpool);
}
/* Clean out connection if failed or closed */
if (! ret && ! conn->m_connless)
{
os_sockWaitsetRemove (gv.waitset, conn);
ddsi_conn_free (conn);
} }
} }
} }
local_participant_set_fini (&lps);
} }
local_participant_set_fini (&lps);
return 0; return 0;
} }

View file

@ -21,6 +21,7 @@
#include "ddsi/q_unused.h" #include "ddsi/q_unused.h"
#include "ddsi/q_error.h" #include "ddsi/q_error.h"
#include "ddsi/q_globals.h" /* for mattr, cattr */ #include "ddsi/q_globals.h" /* for mattr, cattr */
#include "ddsi/q_receive.h"
#include "ddsi/sysdeps.h" /* for getrusage() */ #include "ddsi/sysdeps.h" /* for getrusage() */
@ -172,9 +173,7 @@ static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
every now and then to try recreating sockets & rejoining multicast every now and then to try recreating sockets & rejoining multicast
groups */ groups */
if (gv.deaf) if (gv.deaf)
{ trigger_recv_threads ();
os_sockWaitsetTrigger(gv.waitset);
}
} }
os_mutexUnlock (&sl->lock); os_mutexUnlock (&sl->lock);
return 0; return 0;