Fix ddsperf race conditions
Tracking pings and expected number of pongs was done without holding the correct locks. Terminate flag was also not a ddsrt_atomic... and hence flagged by thread sanitizer as a race condition. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
c6c5a872eb
commit
6ad99463ce
1 changed files with 18 additions and 11 deletions
|
@ -32,6 +32,7 @@
|
||||||
#include "dds/ddsrt/random.h"
|
#include "dds/ddsrt/random.h"
|
||||||
#include "dds/ddsrt/avl.h"
|
#include "dds/ddsrt/avl.h"
|
||||||
#include "dds/ddsrt/fibheap.h"
|
#include "dds/ddsrt/fibheap.h"
|
||||||
|
#include "dds/ddsrt/atomics.h"
|
||||||
|
|
||||||
#if !defined(_WIN32) && !defined(LWIP_SOCKET)
|
#if !defined(_WIN32) && !defined(LWIP_SOCKET)
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
|
@ -57,7 +58,7 @@ enum submode {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const char *argv0;
|
static const char *argv0;
|
||||||
static volatile sig_atomic_t termflag = 0;
|
static ddsrt_atomic_uint32_t termflag = DDSRT_ATOMIC_UINT32_INIT (0);
|
||||||
|
|
||||||
/* Domain participant, guard condition for termination, domain id */
|
/* Domain participant, guard condition for termination, domain id */
|
||||||
static dds_entity_t dp;
|
static dds_entity_t dp;
|
||||||
|
@ -555,7 +556,7 @@ static uint32_t pubthread (void *varg)
|
||||||
tfirst0 = tfirst = dds_time();
|
tfirst0 = tfirst = dds_time();
|
||||||
|
|
||||||
unsigned bi = 0;
|
unsigned bi = 0;
|
||||||
while (!termflag)
|
while (!ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
/* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */
|
/* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */
|
||||||
bool reqresp = (ping_frac == 0) ? 0 : (ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= ping_frac);
|
bool reqresp = (ping_frac == 0) ? 0 : (ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= ping_frac);
|
||||||
|
@ -591,7 +592,7 @@ static uint32_t pubthread (void *varg)
|
||||||
if (++bi == burstsize)
|
if (++bi == burstsize)
|
||||||
{
|
{
|
||||||
/* FIXME: should average rate over a short-ish period, rather than over the entire run */
|
/* FIXME: should average rate over a short-ish period, rather than over the entire run */
|
||||||
while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !termflag)
|
while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
/* FIXME: flushing manually because batching is not yet implemented properly */
|
/* FIXME: flushing manually because batching is not yet implemented properly */
|
||||||
dds_write_flush (wr_data);
|
dds_write_flush (wr_data);
|
||||||
|
@ -695,9 +696,15 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta,
|
||||||
bool allseen;
|
bool allseen;
|
||||||
ddsrt_mutex_lock (&pongstat_lock);
|
ddsrt_mutex_lock (&pongstat_lock);
|
||||||
if (isping && seq == cur_ping_seq)
|
if (isping && seq == cur_ping_seq)
|
||||||
|
{
|
||||||
|
ddsrt_mutex_lock (&pongwr_lock);
|
||||||
allseen = (++n_pong_seen == n_pong_expected);
|
allseen = (++n_pong_seen == n_pong_expected);
|
||||||
|
ddsrt_mutex_unlock (&pongwr_lock);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
allseen = false;
|
allseen = false;
|
||||||
|
}
|
||||||
for (uint32_t i = 0; i < npongstat; i++)
|
for (uint32_t i = 0; i < npongstat; i++)
|
||||||
if (pongstat[i].pubhandle == pubhandle)
|
if (pongstat[i].pubhandle == pubhandle)
|
||||||
{
|
{
|
||||||
|
@ -936,7 +943,7 @@ static uint32_t subthread_waitset (void *varg)
|
||||||
error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
||||||
if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0)
|
if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0)
|
||||||
error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc);
|
error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc);
|
||||||
while (!termflag)
|
while (!ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
if (!process_data (rd_data, arg))
|
if (!process_data (rd_data, arg))
|
||||||
{
|
{
|
||||||
|
@ -962,7 +969,7 @@ static uint32_t subpingthread_waitset (void *varg)
|
||||||
error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
||||||
if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0)
|
if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0)
|
||||||
error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc);
|
error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc);
|
||||||
while (!termflag)
|
while (!ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
int32_t nxs;
|
int32_t nxs;
|
||||||
if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0)
|
if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0)
|
||||||
|
@ -984,7 +991,7 @@ static uint32_t subpongthread_waitset (void *varg)
|
||||||
error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc);
|
||||||
if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0)
|
if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0)
|
||||||
error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc);
|
error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc);
|
||||||
while (!termflag)
|
while (!ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
int32_t nxs;
|
int32_t nxs;
|
||||||
if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0)
|
if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0)
|
||||||
|
@ -997,7 +1004,7 @@ static uint32_t subpongthread_waitset (void *varg)
|
||||||
static uint32_t subthread_polling (void *varg)
|
static uint32_t subthread_polling (void *varg)
|
||||||
{
|
{
|
||||||
struct subthread_arg * const arg = varg;
|
struct subthread_arg * const arg = varg;
|
||||||
while (!termflag)
|
while (!ddsrt_atomic_ld32 (&termflag))
|
||||||
{
|
{
|
||||||
if (!process_data (rd_data, arg))
|
if (!process_data (rd_data, arg))
|
||||||
dds_sleepfor (DDS_MSECS (1));
|
dds_sleepfor (DDS_MSECS (1));
|
||||||
|
@ -1174,14 +1181,14 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
|
||||||
|
|
||||||
if (n_pong_expected_delta)
|
if (n_pong_expected_delta)
|
||||||
{
|
{
|
||||||
ddsrt_mutex_lock (&pongstat_lock);
|
ddsrt_mutex_lock (&pongwr_lock);
|
||||||
n_pong_expected += n_pong_expected_delta;
|
n_pong_expected += n_pong_expected_delta;
|
||||||
/* potential initial packet loss & lazy writer creation conspire against receiving
|
/* potential initial packet loss & lazy writer creation conspire against receiving
|
||||||
the expected number of responses, so allow for a few attempts before starting to
|
the expected number of responses, so allow for a few attempts before starting to
|
||||||
warn about timeouts */
|
warn about timeouts */
|
||||||
twarn_ping_timeout = dds_time () + DDS_MSECS (3333);
|
twarn_ping_timeout = dds_time () + DDS_MSECS (3333);
|
||||||
//printf ("[%"PRIdPID"] n_pong_expected = %u\n", ddsrt_getpid (), n_pong_expected);
|
//printf ("[%"PRIdPID"] n_pong_expected = %u\n", ddsrt_getpid (), n_pong_expected);
|
||||||
ddsrt_mutex_unlock (&pongstat_lock);
|
ddsrt_mutex_unlock (&pongwr_lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1410,7 +1417,7 @@ static void subthread_arg_fini (struct subthread_arg *arg)
|
||||||
static void signal_handler (int sig)
|
static void signal_handler (int sig)
|
||||||
{
|
{
|
||||||
(void) sig;
|
(void) sig;
|
||||||
termflag = 1;
|
ddsrt_atomic_st32 (&termflag, 1);
|
||||||
dds_set_guardcondition (termcond, true);
|
dds_set_guardcondition (termcond, true);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -1928,7 +1935,7 @@ int main (int argc, char *argv[])
|
||||||
dds_time_t tnext = tstart + DDS_SECS (1);
|
dds_time_t tnext = tstart + DDS_SECS (1);
|
||||||
dds_time_t tlast = tstart;
|
dds_time_t tlast = tstart;
|
||||||
dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv;
|
dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv;
|
||||||
while (!termflag && tnow < tstop)
|
while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop)
|
||||||
{
|
{
|
||||||
dds_time_t twakeup = DDS_NEVER;
|
dds_time_t twakeup = DDS_NEVER;
|
||||||
int32_t nxs;
|
int32_t nxs;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue