From 6ad99463ce87e6d9cbf92c5ae789e67379da6980 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 7 Jun 2019 17:52:14 +0200 Subject: [PATCH] Fix ddsperf race conditions Tracking pings and expected number of pongs was done without holding the correct locks. Terminate flag was also not a ddsrt_atomic... and hence flagged by thread sanitizer as a race condition. Signed-off-by: Erik Boasson --- src/tools/ddsperf/ddsperf.c | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index eaaefc9..8d51e5b 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -32,6 +32,7 @@ #include "dds/ddsrt/random.h" #include "dds/ddsrt/avl.h" #include "dds/ddsrt/fibheap.h" +#include "dds/ddsrt/atomics.h" #if !defined(_WIN32) && !defined(LWIP_SOCKET) #include @@ -57,7 +58,7 @@ enum submode { }; static const char *argv0; -static volatile sig_atomic_t termflag = 0; +static ddsrt_atomic_uint32_t termflag = DDSRT_ATOMIC_UINT32_INIT (0); /* Domain participant, guard condition for termination, domain id */ static dds_entity_t dp; @@ -555,7 +556,7 @@ static uint32_t pubthread (void *varg) tfirst0 = tfirst = dds_time(); unsigned bi = 0; - while (!termflag) + while (!ddsrt_atomic_ld32 (&termflag)) { /* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */ bool reqresp = (ping_frac == 0) ? 0 : (ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= ping_frac); @@ -591,7 +592,7 @@ static uint32_t pubthread (void *varg) if (++bi == burstsize) { /* FIXME: should average rate over a short-ish period, rather than over the entire run */ - while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !termflag) + while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !ddsrt_atomic_ld32 (&termflag)) { /* FIXME: flushing manually because batching is not yet implemented properly */ dds_write_flush (wr_data); @@ -695,9 +696,15 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta, bool allseen; ddsrt_mutex_lock (&pongstat_lock); if (isping && seq == cur_ping_seq) + { + ddsrt_mutex_lock (&pongwr_lock); allseen = (++n_pong_seen == n_pong_expected); + ddsrt_mutex_unlock (&pongwr_lock); + } else + { allseen = false; + } for (uint32_t i = 0; i < npongstat; i++) if (pongstat[i].pubhandle == pubhandle) { @@ -936,7 +943,7 @@ static uint32_t subthread_waitset (void *varg) error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0) error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc); - while (!termflag) + while (!ddsrt_atomic_ld32 (&termflag)) { if (!process_data (rd_data, arg)) { @@ -962,7 +969,7 @@ static uint32_t subpingthread_waitset (void *varg) error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0) error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc); - while (!termflag) + while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) @@ -984,7 +991,7 @@ static uint32_t subpongthread_waitset (void *varg) error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0) error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc); - while (!termflag) + while (!ddsrt_atomic_ld32 (&termflag)) { int32_t nxs; if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) @@ -997,7 +1004,7 @@ static uint32_t subpongthread_waitset (void *varg) static uint32_t subthread_polling (void *varg) { struct subthread_arg * const arg = varg; - while (!termflag) + while (!ddsrt_atomic_ld32 (&termflag)) { if (!process_data (rd_data, arg)) dds_sleepfor (DDS_MSECS (1)); @@ -1174,14 +1181,14 @@ static void participant_data_listener (dds_entity_t rd, void *arg) if (n_pong_expected_delta) { - ddsrt_mutex_lock (&pongstat_lock); + ddsrt_mutex_lock (&pongwr_lock); n_pong_expected += n_pong_expected_delta; /* potential initial packet loss & lazy writer creation conspire against receiving the expected number of responses, so allow for a few attempts before starting to warn about timeouts */ twarn_ping_timeout = dds_time () + DDS_MSECS (3333); //printf ("[%"PRIdPID"] n_pong_expected = %u\n", ddsrt_getpid (), n_pong_expected); - ddsrt_mutex_unlock (&pongstat_lock); + ddsrt_mutex_unlock (&pongwr_lock); } } @@ -1410,7 +1417,7 @@ static void subthread_arg_fini (struct subthread_arg *arg) static void signal_handler (int sig) { (void) sig; - termflag = 1; + ddsrt_atomic_st32 (&termflag, 1); dds_set_guardcondition (termcond, true); } #endif @@ -1928,7 +1935,7 @@ int main (int argc, char *argv[]) dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tlast = tstart; dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; - while (!termflag && tnow < tstop) + while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop) { dds_time_t twakeup = DDS_NEVER; int32_t nxs;