various ddsperf fixes

* As a simple matter of code hygiene, in particular to aid in checking for
  leaks, ddsperf should free all memory it allocates on exit.

* Remove spurious mutex unlock in ddsperf

* Removing a participant means removing one or two entries from the "pong
  writers" array ("pong wr"), and while moving the remaining elements
  forward the code read one element beyond the end of the array.

* Constant-rate pinging was broken for two reasons, one worse than the
  other:

  * setting the rate mixed up two variables (the publication rate and the
    command-line argument), resulting in a completely wrong ping interval;
    the code now uses clearer variable names.

  * the timing of the pings was relative to the current time, but the
    wakeup is always a little delayed, resulting in a lower rate than
    requested.  It now simply adds the ping interval to the scheduled ping
    time, rather than to the time at which the ping is being sent.  To guard
    against really late wakeups, rates that are too high, suspending the
    machine, &c., it will in extremis delay the next ping.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-07 14:09:26 +08:00 committed by eboasson
parent e3428ad1d8
commit c2cf340a1b

View file

@ -123,7 +123,7 @@ static int32_t histdepth = 0;
/* Publishing rate in Hz, HUGE_VAL means as fast as possible,
0 means no throughput data is published at all */
static double rate;
static double pub_rate;
/* Fraction of throughput data samples that double as a ping
message */
@ -138,6 +138,10 @@ static dds_ignorelocal_kind_t ignorelocal = DDS_IGNORELOCAL_PARTICIPANT;
possible, DDS_INFINITY means never */
static dds_duration_t ping_intv;
/* Number of times a new ping was sent before all expected
pongs had been received */
static uint32_t ping_timeouts = 0;
static ddsrt_mutex_t disc_lock;
/* Publisher statistics and lock protecting it */
@ -582,12 +586,12 @@ static uint32_t pubthread (void *varg)
data.seq_keyval.keyval = (data.seq_keyval.keyval + 1) % (int32_t) nkeyvals;
data.seq++;
if (rate < HUGE_VAL)
if (pub_rate < HUGE_VAL)
{
if (++bi == burstsize)
{
/* FIXME: should average rate over a short-ish period, rather than over the entire run */
while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > rate && !termflag)
while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > pub_rate && !termflag)
{
/* FIXME: flushing manually because batching is not yet implemented properly */
dds_write_flush (wr_data);
@ -742,7 +746,6 @@ static dds_entity_t get_pong_writer_locked (dds_instance_handle_t pubhandle)
if (pongwr[i].pubhandle == 0)
{
pongwr[i].pubhandle = pubhandle;
ddsrt_mutex_unlock (&pongwr_lock);
return wr_pong;
}
else
@ -752,7 +755,6 @@ static dds_entity_t get_pong_writer_locked (dds_instance_handle_t pubhandle)
pongwr[npongwr].pphandle = pphandle;
pongwr[npongwr].wr_pong = wr_pong;
npongwr++;
ddsrt_mutex_unlock (&pongwr_lock);
return wr_pong;
}
}
@ -885,19 +887,32 @@ static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping)
}
else
{
if (tnow > twarn_ping_timeout)
if (n_pong_seen < n_pong_expected)
{
printf ("[%"PRIdPID"] ping timed out ... sending new ping\n", ddsrt_getpid ());
fflush (stdout);
ping_timeouts++;
if (tnow > twarn_ping_timeout)
{
printf ("[%"PRIdPID"] ping timed out (total %"PRIu32" times) ... sending new ping\n", ddsrt_getpid (), ping_timeouts);
twarn_ping_timeout = tnow + DDS_SECS (1);
fflush (stdout);
}
}
n_pong_seen = 0;
cur_ping_time = tnow;
if (ping_intv > 0)
if (ping_intv == 0)
{
*tnextping = tnow + DDS_SECS (1);
cur_ping_time = tnow;
}
else
{
/* tnow should be ~ cur_ping_time + ping_intv, but it won't be if the
wakeup was delayed significantly, the machine was suspended in the
meantime, so slow down if we can't keep up */
cur_ping_time += ping_intv;
if (cur_ping_time < tnow - ping_intv / 2)
cur_ping_time = tnow;
*tnextping = cur_ping_time + ping_intv;
else if (ping_intv == 0)
*tnextping = cur_ping_time + DDS_SECS (1);
if (ping_intv > 0 && *tnextping > twarn_ping_timeout)
twarn_ping_timeout = *tnextping + ping_intv / 2;
}
cur_ping_seq++;
baggage = init_sample (&data, cur_ping_seq);
ddsrt_mutex_unlock (&pongwr_lock);
@ -1053,7 +1068,7 @@ static void delete_pong_writer (dds_instance_handle_t pphandle)
else
{
assert (wr_pong == 0 || wr_pong == pongwr[i].wr_pong);
memmove (&pongwr[i], &pongwr[i+1], (npongwr - i) * sizeof (pongwr[0]));
memmove (&pongwr[i], &pongwr[i+1], (npongwr - i - 1) * sizeof (pongwr[0]));
npongwr--;
}
}
@ -1062,6 +1077,13 @@ static void delete_pong_writer (dds_instance_handle_t pphandle)
dds_delete (wr_pong);
}
/* Release one participant record: first the hostname it holds, then the
   record itself.  The argument is an untyped pointer so that the function
   can be handed to ddsrt_avl_free as the per-node free callback as well as
   being called directly. */
static void free_ppant (void *vpp)
{
  struct ppant * const entry = vpp;

  free (entry->hostname);
  free (entry);
}
static void participant_data_listener (dds_entity_t rd, void *arg)
{
dds_sample_info_t info;
@ -1091,7 +1113,7 @@ static void participant_data_listener (dds_entity_t rd, void *arg)
ddsrt_avl_delete_dpath (&ppants_td, &ppants, pp, &dpath);
if (pp->tdeadline != DDS_NEVER)
ddsrt_fibheap_delete (&ppants_to_match_fhd, &ppants_to_match, pp);
free (pp);
free_ppant (pp);
}
ddsrt_mutex_unlock (&disc_lock);
}
@ -1288,7 +1310,7 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
const double ts = (double) (tnow - tstart) / 1e9;
snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts);
if (rate > 0)
if (pub_rate > 0)
{
ddsrt_mutex_lock (&pubstat_lock);
hist_print (prefix, pubstat_hist, tnow - tprev, 1);
@ -1519,15 +1541,15 @@ static void set_mode_ping (int *xoptind, int xargc, char * const xargv[])
while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1)
{
int pos;
double r;
double ping_rate;
if (strcmp (xargv[*xoptind], "inf") == 0)
{
ping_intv = 0;
}
else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0))
else if (sscanf (xargv[*xoptind], "%lf%n", &ping_rate, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0))
{
if (r == 0) ping_intv = DDS_INFINITY;
else if (r > 0) ping_intv = (dds_duration_t) (1e9 / rate + 0.5);
if (ping_rate == 0) ping_intv = DDS_INFINITY;
else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5);
else error3 ("%s: invalid ping rate\n", xargv[*xoptind]);
}
else
@ -1566,7 +1588,7 @@ static void set_mode_sub (int *xoptind, int xargc, char * const xargv[])
static void set_mode_pub (int *xoptind, int xargc, char * const xargv[])
{
rate = HUGE_VAL;
pub_rate = HUGE_VAL;
burstsize = 1;
ping_frac = 0;
while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1)
@ -1575,12 +1597,12 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[])
double r;
if (strcmp (xargv[*xoptind], "inf") == 0 || strcmp (xargv[*xoptind], "infHz") == 0)
{
rate = HUGE_VAL;
pub_rate = HUGE_VAL;
}
else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0))
{
if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]);
rate = r;
pub_rate = r;
}
else if (strcmp (xargv[*xoptind], "burst") == 0)
{
@ -1597,7 +1619,7 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[])
if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]);
ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5);
}
else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0)
else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &pub_rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0)
{
++(*xoptind);
if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]);
@ -1614,7 +1636,7 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[])
static void set_mode (int xoptind, int xargc, char * const xargv[])
{
int code;
rate = 0.0;
pub_rate = 0.0;
submode = SM_NONE;
pingpongmode = SM_LISTENER;
ping_intv = (xoptind == xargc) ? DDS_SECS (1) : DDS_INFINITY;
@ -1872,7 +1894,7 @@ int main (int argc, char *argv[])
memset (&subtid, 0, sizeof (subtid));
memset (&subpingtid, 0, sizeof (subpingtid));
memset (&subpongtid, 0, sizeof (subpongtid));
if (rate > 0)
if (pub_rate > 0)
ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL);
if (subthread_func != 0)
ddsrt_thread_create (&subtid, "sub", &attr, subthread_func, &subarg_data);
@ -2007,7 +2029,7 @@ int main (int argc, char *argv[])
}
#endif
if (rate > 0)
if (pub_rate > 0)
ddsrt_thread_join (pubtid, NULL);
if (subthread_func != 0)
ddsrt_thread_join (subtid, NULL);
@ -2054,6 +2076,10 @@ int main (int argc, char *argv[])
ddsrt_mutex_destroy (&pongstat_lock);
ddsrt_mutex_destroy (&pubstat_lock);
hist_free (pubstat_hist);
free (pongwr);
for (uint32_t i = 0; i < npongstat; i++)
free (pongstat[i].raw);
free (pongstat);
bool ok = true;
@ -2068,6 +2094,9 @@ int main (int argc, char *argv[])
ok = false;
}
}
ddsrt_avl_free (&ppants_td, &ppants, free_ppant);
if (matchcount < minmatch)
{
printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32" instead of %"PRIu32")\n", ddsrt_getpid (), matchcount, minmatch);