Trivial changes for thread sanitizer

Thread sanitizer warns about reads and writes of variables that are
meant to be read without holding a lock:

* Global "keep_going" is now a ddsrt_atomic_uint32_t

* Thread "vtime" is now a ddsrt_atomic_uint32_t

Previously the code relied on the assumption that a 32-bit int would be
treated as atomic, now that is all wrapped in ddsrt_atomic_{ld,st}32.
These being inline functions doing exactly the same thing, there is no
functional change, but it does allow annotating the loads and stores for
via function attributes on the ddsrt_atomic_{ld,st}X.

The concurrent hashtable implementation is replaced by a locked version
of the non-concurrent implementation if thread sanitizer is used.  This
changes eliminates the scores of problems signalled by thread sanitizer
in the GUID-to-entity translation and the key-to-instance id lookups.

Other than that, this replaces a flag used in a waitset test case to be
a ddsrt_atomic_uint32_t.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-06-07 17:49:39 +02:00 committed by eboasson
parent 0356af470d
commit c6c5a872eb
17 changed files with 404 additions and 267 deletions

View file

@ -221,7 +221,7 @@ struct q_globals {
struct thread_state1 *listen_ts;
/* Flag cleared when stopping (receive threads). FIXME. */
int rtps_keepgoing;
ddsrt_atomic_uint32_t rtps_keepgoing;
/* Start time of the DDSI2 service, for logging relative time stamps,
should I ever so desire. */

View file

@ -57,13 +57,14 @@ enum thread_state {
struct logbuf;
/*
* watchdog indicates progress for the service lease liveliness mechsanism, while vtime
* indicates progress for the Garbage collection purposes.
* vtime even : thread awake
* vtime odd : thread asleep
* vtime indicates progress for the garbage collector and the liveliness monitoring.
*
* vtime is updated without using atomic operations: only the owning thread updates
* them, and the garbage collection mechanism and the liveliness monitoring only
* observe the value
*/
#define THREAD_BASE \
volatile vtime_t vtime; \
ddsrt_atomic_uint32_t vtime; \
ddsrt_thread_t tid; \
ddsrt_thread_t extTid; \
enum thread_state state; \
@ -131,17 +132,21 @@ DDS_EXPORT inline bool vtime_gt (vtime_t vtime1, vtime_t vtime0)
DDS_EXPORT inline bool thread_is_awake (void)
{
return vtime_awake_p (lookup_thread_state ()->vtime);
struct thread_state1 *ts = lookup_thread_state ();
vtime_t vt = ddsrt_atomic_ld32 (&ts->vtime);
return vtime_awake_p (vt);
}
DDS_EXPORT inline bool thread_is_asleep (void)
{
return vtime_asleep_p (lookup_thread_state ()->vtime);
struct thread_state1 *ts = lookup_thread_state ();
vtime_t vt = ddsrt_atomic_ld32 (&ts->vtime);
return vtime_asleep_p (vt);
}
DDS_EXPORT inline void thread_state_asleep (struct thread_state1 *ts1)
{
vtime_t vt = ts1->vtime;
vtime_t vt = ddsrt_atomic_ld32 (&ts1->vtime);
assert (vtime_awake_p (vt));
/* nested calls a rare and an extra fence doesn't break things */
ddsrt_atomic_fence_rel ();
@ -149,24 +154,24 @@ DDS_EXPORT inline void thread_state_asleep (struct thread_state1 *ts1)
vt += (1u << VTIME_TIME_SHIFT) - 1u;
else
vt -= 1u;
ts1->vtime = vt;
ddsrt_atomic_st32 (&ts1->vtime, vt);
}
DDS_EXPORT inline void thread_state_awake (struct thread_state1 *ts1)
{
vtime_t vt = ts1->vtime;
vtime_t vt = ddsrt_atomic_ld32 (&ts1->vtime);
assert ((vt & VTIME_NEST_MASK) < VTIME_NEST_MASK);
ts1->vtime = vt + 1u;
ddsrt_atomic_st32 (&ts1->vtime, vt + 1u);
/* nested calls a rare and an extra fence doesn't break things */
ddsrt_atomic_fence_acq ();
}
DDS_EXPORT inline void thread_state_awake_to_awake_no_nest (struct thread_state1 *ts1)
{
vtime_t vt = ts1->vtime;
vtime_t vt = ddsrt_atomic_ld32 (&ts1->vtime);
assert ((vt & VTIME_NEST_MASK) == 1);
ddsrt_atomic_fence_rel ();
ts1->vtime = vt + (1u << VTIME_TIME_SHIFT);
ddsrt_atomic_st32 (&ts1->vtime, vt + (1u << VTIME_TIME_SHIFT));
ddsrt_atomic_fence_acq ();
}

View file

@ -706,6 +706,7 @@ static ddsrt_socket_t ddsi_tcp_conn_handle (ddsi_tran_base_t base)
return ((ddsi_tcp_conn_t) base)->m_sock;
}
ddsrt_attribute_no_sanitize (("thread"))
static bool ddsi_tcp_supports (int32_t kind)
{
return kind == ddsi_tcp_factory_g.m_kind;
@ -770,7 +771,7 @@ static ddsi_tran_conn_t ddsi_tcp_accept (ddsi_tran_listener_t listener)
{
rc = ddsrt_accept(tl->m_sock, NULL, NULL, &sock);
}
if (! gv.rtps_keepgoing)
if (!ddsrt_atomic_ld32(&gv.rtps_keepgoing))
{
ddsi_tcp_sock_free (sock, NULL);
return NULL;

View file

@ -83,7 +83,7 @@ static uint32_t threadmon_thread (struct ddsi_threadmon *sl)
n_unused++;
else
{
vtime_t vt = thread_states.ts[i].vtime;
vtime_t vt = ddsrt_atomic_ld32 (&thread_states.ts[i].vtime);
bool alive = vtime_asleep_p (vt) || vtime_asleep_p (sl->av_ary[i].vt) || vtime_gt (vt, sl->av_ary[i].vt);
n_alive += (unsigned) alive;
DDS_TRACE(" %u(%s):%c:%"PRIx32"->%"PRIx32, i, thread_states.ts[i].name, alive ? 'a' : 'd', sl->av_ary[i].vt, vt);

View file

@ -91,6 +91,7 @@ static ddsi_tran_factory_t ddsi_factory_find_with_len (const char * type, size_t
return factory;
}
ddsrt_attribute_no_sanitize (("thread"))
ddsi_tran_factory_t ddsi_factory_find_supported_kind (int32_t kind)
{
/* FIXME: MUST speed up */
@ -124,7 +125,7 @@ void ddsi_conn_free (ddsi_tran_conn_t conn)
for (uint32_t i = 0; i < gv.n_recv_threads; i++)
{
if (!gv.recv_threads[i].ts)
assert (!gv.rtps_keepgoing);
assert (!ddsrt_atomic_ld32 (&gv.rtps_keepgoing));
else
{
switch (gv.recv_threads[i].arg.mode)

View file

@ -44,7 +44,7 @@ static void threads_vtime_gather_for_wait (unsigned *nivs, struct idx_vtime *ivs
uint32_t i, j;
for (i = j = 0; i < thread_states.nthreads; i++)
{
vtime_t vtime = thread_states.ts[i].vtime;
vtime_t vtime = ddsrt_atomic_ld32 (&thread_states.ts[i].vtime);
if (vtime_awake_p (vtime))
{
ivs[j].idx = i;
@ -63,7 +63,7 @@ static int threads_vtime_check (uint32_t *nivs, struct idx_vtime *ivs)
while (i < *nivs)
{
uint32_t thridx = ivs[i].idx;
vtime_t vtime = thread_states.ts[thridx].vtime;
vtime_t vtime = ddsrt_atomic_ld32 (&thread_states.ts[thridx].vtime);
assert (vtime_awake_p (ivs[i].vtime));
if (!vtime_gt (vtime, ivs[i].vtime))
++i;

View file

@ -679,9 +679,9 @@ static void rtps_term_prep (void)
{
/* Stop all I/O */
ddsrt_mutex_lock (&gv.lock);
if (gv.rtps_keepgoing)
if (ddsrt_atomic_ld32 (&gv.rtps_keepgoing))
{
gv.rtps_keepgoing = 0; /* so threads will stop once they get round to checking */
ddsrt_atomic_st32 (&gv.rtps_keepgoing, 0); /* so threads will stop once they get round to checking */
ddsrt_atomic_fence ();
/* can't wake up throttle_writer, currently, but it'll check every few seconds */
trigger_recv_threads ();
@ -1252,7 +1252,7 @@ int rtps_init (void)
gv.gcreq_queue = gcreq_queue_new ();
gv.rtps_keepgoing = 1;
ddsrt_atomic_st32 (&gv.rtps_keepgoing, 1);
ddsrt_rwlock_init (&gv.qoslock);
if (config.xpack_send_async)

View file

@ -3156,7 +3156,7 @@ uint32_t listen_thread (struct ddsi_tran_listener * listener)
{
ddsi_tran_conn_t conn;
while (gv.rtps_keepgoing)
while (ddsrt_atomic_ld32 (&gv.rtps_keepgoing))
{
/* Accept connection from listener */
@ -3310,7 +3310,7 @@ uint32_t recv_thread (void *vrecv_thread_arg)
nn_rbufpool_setowner (rbpool, ddsrt_thread_self ());
if (waitset == NULL)
{
while (gv.rtps_keepgoing)
while (ddsrt_atomic_ld32 (&gv.rtps_keepgoing))
{
LOG_THREAD_CPUTIME (next_thread_cputime);
(void) do_packet (ts1, recv_thread_arg->u.single.conn, NULL, rbpool);
@ -3343,7 +3343,7 @@ uint32_t recv_thread (void *vrecv_thread_arg)
num_fixed += (unsigned)rc;
}
while (gv.rtps_keepgoing)
while (ddsrt_atomic_ld32 (&gv.rtps_keepgoing))
{
int rebuildws;
LOG_THREAD_CPUTIME (next_thread_cputime);

View file

@ -75,7 +75,7 @@ static void ddsrt_free_aligned (void *ptr)
void thread_states_init_static (void)
{
static struct thread_state1 ts = {
.state = THREAD_STATE_ALIVE, .vtime = 0u, .name = "(anon)"
.state = THREAD_STATE_ALIVE, .vtime = DDSRT_ATOMIC_UINT32_INIT (0), .name = "(anon)"
};
tsd_thread_state = &ts;
}
@ -92,7 +92,7 @@ void thread_states_init (unsigned maxthreads)
for (uint32_t i = 0; i < thread_states.nthreads; i++)
{
thread_states.ts[i].state = THREAD_STATE_ZERO;
thread_states.ts[i].vtime = 0u;
ddsrt_atomic_st32 (&thread_states.ts[i].vtime, 0);
thread_states.ts[i].name = NULL;
}
DDSRT_WARNING_MSVC_ON(6386);
@ -113,6 +113,7 @@ void thread_states_fini (void)
thread_states.ts = NULL;
}
ddsrt_attribute_no_sanitize (("thread"))
static struct thread_state1 *find_thread_state (ddsrt_thread_t tid)
{
if (thread_states.ts) {
@ -132,7 +133,7 @@ static void cleanup_thread_state (void *data)
if (ts)
{
assert(ts->state == THREAD_STATE_LAZILY_CREATED);
assert(vtime_asleep_p(ts->vtime));
assert(vtime_asleep_p(ddsrt_atomic_ld32 (&ts->vtime)));
reset_thread_state(ts);
}
ddsrt_fini();
@ -207,7 +208,7 @@ void upgrade_main_thread (void)
abort ();
ts1 = &thread_states.ts[cand];
if (ts1->state == THREAD_STATE_ZERO)
assert (vtime_asleep_p (ts1->vtime));
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
ts1->state = THREAD_STATE_LAZILY_CREATED;
ts1->tid = ddsrt_thread_self ();
ts1->name = main_thread_name;
@ -233,7 +234,7 @@ static struct thread_state1 *init_thread_state (const char *tname, enum thread_s
return NULL;
ts = &thread_states.ts[cand];
assert (vtime_asleep_p (ts->vtime));
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts->vtime)));
ts->name = ddsrt_strdup (tname);
ts->state = state;
@ -300,7 +301,7 @@ dds_return_t join_thread (struct thread_state1 *ts1)
dds_return_t ret;
assert (ts1->state == THREAD_STATE_ALIVE);
ret = ddsrt_thread_join (ts1->extTid, NULL);
assert (vtime_asleep_p (ts1->vtime));
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
reap_thread_state (ts1, 1);
return ret;
}
@ -317,7 +318,7 @@ void reset_thread_state (struct thread_state1 *ts1)
void downgrade_main_thread (void)
{
struct thread_state1 *ts1 = lookup_thread_state ();
assert (vtime_asleep_p (ts1->vtime));
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
/* no need to sync with service lease: already stopped */
reap_thread_state (ts1, 0);
thread_states_init_static ();

View file

@ -968,7 +968,7 @@ static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn
whc_get_state (wr->whc, &whcst);
}
while (gv.rtps_keepgoing && !writer_may_continue (wr, &whcst))
while (ddsrt_atomic_ld32 (&gv.rtps_keepgoing) && !writer_may_continue (wr, &whcst))
{
int64_t reltimeout;
tnow = now_mt ();

View file

@ -597,8 +597,8 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
return;
}
assert (wr->reliable);
ddsrt_mutex_lock (&wr->e.lock);
assert (wr->reliable);
whc_get_state(wr->whc, &whcst);
if (!writer_must_have_hb_scheduled (wr, &whcst))
{