ddsperf enhancements

* per-thread CPU usage (only those threads where the load is over 0.5%,
  if the sum of threads below that threshold exceeds 0.5%, it prints an
  aggregate for those threads);

* also report RSS;

* network load (only on request, as percentage of specified network
  bandwidth and actual bytes in/out, with the output suppressed if it is
  0%);

* publish CPU usage so a ddsperf instance can display CPU loads for
  its peers;

* handle SIGXFSZ (file size exceeded) by displaying one last line of
  statistics before killing itself; this simply a debugging tool to make
  it easier to get a trace covering a high sample-rate start-up issue;

* default topic changed to "KS" because that allows all the options to
  be used, this has a negative impact on performance (both latency and
  small-sample throughput) but it should be less surprising to users;

* specifying a size is now done by appending "size N" (where N is the
  size in bytes) after a "ping" or "pub" command, rather than it having
  to set it via a command-line option;

Note that some of this is platform-dependent -- SIGXFSZ is currently
only on Linux and macOS, and CPU and network load reporting is currently
only on Linux, macOS and Windows.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-08-02 08:58:18 +02:00 committed by eboasson
parent ecb77d481c
commit 952029dba0
7 changed files with 627 additions and 42 deletions

View file

@ -11,7 +11,7 @@
# #
idlc_generate(ddsperf_types ddsperf_types.idl) idlc_generate(ddsperf_types ddsperf_types.idl)
add_executable(ddsperf ddsperf.c) add_executable(ddsperf ddsperf.c cputime.c cputime.h netload.c netload.h)
target_link_libraries(ddsperf ddsperf_types ddsc) target_link_libraries(ddsperf ddsperf_types ddsc)
if(WIN32) if(WIN32)

250
src/tools/ddsperf/cputime.c Normal file
View file

@ -0,0 +1,250 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#define _ISOC99_SOURCE
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include "dds/dds.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/string.h"
#include "dds/ddsrt/rusage.h"
#include "cputime.h"
#include "ddsperf_types.h"
static void print (char *line, size_t sz, size_t *pos, const char *name, double du, double ds)
{
if (*pos < sz)
*pos += (size_t) snprintf (line + *pos, sz - *pos, " %s:%.0f%%+%.0f%%", name, 100.0 * du, 100.0 * ds);
}
bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh)
{
if (!s->some_above)
return false;
else
{
char line[512];
size_t pos = 0;
assert (is_fresh || !print_host);
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, "%s", prefix);
if (!is_fresh)
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " (stale)");
if (print_host)
{
int n = (int) strlen (s->hostname);
if (n > 100) n = 100;
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " @%*.*s:%"PRId32, n, n, s->hostname, s->pid);
}
if (s->maxrss > 1048576)
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " rss:%.1fMB", s->maxrss / 1048576.0);
else if (s->maxrss > 1024)
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " rss:%.0fkB", s->maxrss / 1024.0);
else {
/* non-sensical value -- presumably maxrss is not available */
}
const size_t init_pos = pos;
for (uint32_t i = 0; i < s->cpu._length; i++)
{
struct CPUStatThread * const thr = &s->cpu._buffer[i];
print (line, sizeof (line), &pos, thr->name, thr->u_pct / 100.0, thr->s_pct / 100.0);
}
if (pos > init_pos)
puts (line);
return true;
}
}
#if DDSRT_HAVE_RUSAGE && DDSRT_HAVE_THREAD_LIST
struct record_cputime_state_thr {
ddsrt_thread_list_id_t tid;
char name[32];
double ut, st;
};
struct record_cputime_state {
bool supported;
dds_time_t tprev;
size_t nthreads;
struct record_cputime_state_thr *threads;
dds_entity_t wr;
struct CPUStats s;
};
static void update (double *ut_old, double *st_old, double dt, double ut_new, double st_new, double *du, double *ds)
{
*du = (ut_new - *ut_old) / dt;
*ds = (st_new - *st_old) / dt;
*ut_old = ut_new;
*st_old = st_new;
}
static bool above_threshold (double *max, double *du_skip, double *ds_skip, double du, double ds)
{
if (*max < du) *max = du;
if (*max < ds) *max = ds;
if (du >= 0.005 || ds >= 0.005)
return true;
else if (du_skip == NULL || ds_skip == NULL)
return false;
else
{
*du_skip += du;
*ds_skip += ds;
return false;
}
}
bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow)
{
if (state == NULL)
return false;
ddsrt_rusage_t usage;
if (ddsrt_getrusage (DDSRT_RUSAGE_SELF, &usage) < 0)
usage.maxrss = 0;
double max = 0;
double du_skip = 0.0, ds_skip = 0.0;
const double dt = (double) (tnow - state->tprev) / 1e9;
bool some_above = false;
state->s.maxrss = (double) usage.maxrss;
state->s.cpu._length = 0;
for (size_t i = 0; i < state->nthreads; i++)
{
struct record_cputime_state_thr * const thr = &state->threads[i];
if (ddsrt_getrusage_anythread (thr->tid, &usage) < 0)
continue;
const double ut = (double) usage.utime / 1e9;
const double st = (double) usage.stime / 1e9;
double du, ds;
update (&thr->ut, &thr->st, dt, ut, st, &du, &ds);
if (above_threshold (&max, &du_skip, &ds_skip, du, ds))
{
some_above = true;
/* Thread names are often set by thread itself immediately after creation,
and so it depends on the scheduling whether there is still a default
name or the name we are interested in. Lazily retrieving the name the
first time the thread pops up in the CPU usage works around the timing
problem. */
if (thr->name[0] == 0)
{
if (ddsrt_thread_getname_anythread (thr->tid, thr->name, sizeof (thr->name)) < 0)
{
du_skip += du;
ds_skip += ds;
continue;
}
}
struct CPUStatThread * const x = &state->s.cpu._buffer[state->s.cpu._length++];
x->name = thr->name;
x->u_pct = (int) (100.0 * du + 0.5);
x->s_pct = (int) (100.0 * ds + 0.5);
}
}
if (above_threshold (&max, NULL, NULL, du_skip, ds_skip))
{
struct CPUStatThread * const x = &state->s.cpu._buffer[state->s.cpu._length++];
some_above = true;
x->name = "others";
x->u_pct = (int) (100.0 * du_skip + 0.5);
x->s_pct = (int) (100.0 * ds_skip + 0.5);
}
state->tprev = tnow;
state->s.some_above = some_above;
dds_write (state->wr, &state->s);
return print_cputime (&state->s, prefix, false, true);
}
struct record_cputime_state *record_cputime_new (dds_entity_t wr)
{
ddsrt_thread_list_id_t tids[100];
dds_return_t n;
if ((n = ddsrt_thread_list (tids, sizeof (tids) / sizeof (tids[0]))) <= 0)
return NULL;
else if (n > (dds_return_t) (sizeof (tids) / sizeof (tids[0])))
{
fprintf (stderr, "way more threads than expected\n");
return NULL;
}
struct record_cputime_state *state = malloc (sizeof (*state));
state->tprev = dds_time ();
state->wr = wr;
state->threads = malloc ((size_t) n * sizeof (*state->threads));
state->nthreads = 0;
for (int32_t i = 0; i < n; i++)
{
struct record_cputime_state_thr * const thr = &state->threads[state->nthreads];
ddsrt_rusage_t usage;
if (ddsrt_getrusage_anythread (tids[i], &usage) < 0)
continue;
thr->tid = tids[i];
thr->name[0] = 0;
thr->ut = (double) usage.utime / 1e9;
thr->st = (double) usage.stime / 1e9;
state->nthreads++;
}
char hostname[128];
if (ddsrt_gethostname (hostname, sizeof (hostname)) != DDS_RETCODE_OK)
strcpy (hostname, "?");
state->s.hostname = ddsrt_strdup (hostname);
state->s.pid = (uint32_t) ddsrt_getpid ();
state->s.cpu._length = 0;
state->s.cpu._maximum = (uint32_t) state->nthreads;
state->s.cpu._buffer = malloc (state->s.cpu._maximum * sizeof (*state->s.cpu._buffer));
state->s.cpu._release = false;
return state;
}
void record_cputime_free (struct record_cputime_state *state)
{
if (state)
{
free (state->threads);
ddsrt_free (state->s.hostname);
/* we alias thread names in state->s->cpu._buffer, so no need to free */
free (state->s.cpu._buffer);
free (state);
}
}
#else
bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow)
{
(void) state;
(void) prefix;
(void) tnow;
}
struct record_cputime_state *record_cputime_new (dds_entity_t wr)
{
(void) wr;
}
void record_cputime_free (struct record_cputime_state *state)
{
(void) state;
}
#endif

View file

@ -0,0 +1,24 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef CPUTIME_H
#define CPUTIME_H
#include "ddsperf_types.h"
struct record_cputime_state;
struct record_cputime_state *record_cputime_new (dds_entity_t wr);
void record_cputime_free (struct record_cputime_state *state);
bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow);
bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh);
#endif

View file

@ -37,6 +37,9 @@
#include "dds/ddsrt/fibheap.h" #include "dds/ddsrt/fibheap.h"
#include "dds/ddsrt/atomics.h" #include "dds/ddsrt/atomics.h"
#include "cputime.h"
#include "netload.h"
#if !defined(_WIN32) && !defined(LWIP_SOCKET) #if !defined(_WIN32) && !defined(LWIP_SOCKET)
#include <errno.h> #include <errno.h>
#endif #endif
@ -74,22 +77,22 @@ static dds_entity_t rd_participants, rd_subscriptions, rd_publications;
/* Topics, readers, writers (except for pong writers: there are /* Topics, readers, writers (except for pong writers: there are
many of those) */ many of those) */
static dds_entity_t tp_data, tp_ping, tp_pong; static dds_entity_t tp_data, tp_ping, tp_pong, tp_stat;
static char tpname_data[32], tpname_ping[32], tpname_pong[32]; static char tpname_data[32], tpname_ping[32], tpname_pong[32];
static dds_entity_t sub, pub, wr_data, wr_ping, rd_data, rd_ping, rd_pong; static dds_entity_t sub, pub, wr_data, wr_ping, wr_stat, rd_data, rd_ping, rd_pong, rd_stat;
/* Number of different key values to use (must be 1 for OU type) */ /* Number of different key values to use (must be 1 for OU type) */
static unsigned nkeyvals = 1; static unsigned nkeyvals = 1;
/* Topic type to use */ /* Topic type to use */
static enum topicsel topicsel = OU; static enum topicsel topicsel = KS;
/* Data and ping/pong subscriber triggering modes */ /* Data and ping/pong subscriber triggering modes */
static enum submode submode = SM_LISTENER; static enum submode submode = SM_LISTENER;
static enum submode pingpongmode = SM_LISTENER; static enum submode pingpongmode = SM_LISTENER;
/* Size of the sequence in KeyedSeq type in bytes */ /* Size of the sequence in KeyedSeq type in bytes */
static unsigned baggagesize = 0; static uint32_t baggagesize = 0;
/* Whether or not to register instances prior to writing */ /* Whether or not to register instances prior to writing */
static bool register_instances = true; static bool register_instances = true;
@ -115,7 +118,7 @@ static uint32_t matchcount = 0;
static uint32_t matchtimeout = 0; static uint32_t matchtimeout = 0;
/* Data is published in bursts of this many samples */ /* Data is published in bursts of this many samples */
static unsigned burstsize = 1; static uint32_t burstsize = 1;
/* Whether to use reliable or best-effort readers/writers */ /* Whether to use reliable or best-effort readers/writers */
static bool reliable = true; static bool reliable = true;
@ -490,7 +493,7 @@ static void hist_print (const char *prefix, struct hist *h, dds_time_t dt, int r
hist_reset (h); hist_reset (h);
} }
static void *make_baggage (dds_sequence_t *b, unsigned cnt) static void *make_baggage (dds_sequence_t *b, uint32_t cnt)
{ {
b->_maximum = b->_length = cnt; b->_maximum = b->_length = cnt;
if (cnt == 0) if (cnt == 0)
@ -558,7 +561,7 @@ static uint32_t pubthread (void *varg)
tfirst0 = tfirst = dds_time(); tfirst0 = tfirst = dds_time();
unsigned bi = 0; uint32_t bi = 0;
while (!ddsrt_atomic_ld32 (&termflag)) while (!ddsrt_atomic_ld32 (&termflag))
{ {
/* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */ /* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */
@ -783,6 +786,19 @@ static dds_entity_t get_pong_writer (dds_instance_handle_t pubhandle)
return wr_pong; return wr_pong;
} }
static uint32_t topic_payload_size (enum topicsel tp, uint32_t bgsize)
{
uint32_t size = 0;
switch (tp)
{
case KS: size = 12 + bgsize; break;
case K32: size = 32; break;
case K256: size = 256; break;
case OU: size = 4; break;
}
return size;
}
static bool process_data (dds_entity_t rd, struct subthread_arg *arg) static bool process_data (dds_entity_t rd, struct subthread_arg *arg)
{ {
uint32_t max_samples = arg->max_samples; uint32_t max_samples = arg->max_samples;
@ -798,10 +814,13 @@ static bool process_data (dds_entity_t rd, struct subthread_arg *arg)
uint32_t seq = 0, keyval = 0, size = 0; uint32_t seq = 0, keyval = 0, size = 0;
switch (topicsel) switch (topicsel)
{ {
case KS: { KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 12 + d->baggage._length; } break; case KS: {
case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 32; } break; KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, d->baggage._length);
case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 256; } break; break;
case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = 4; } break; }
case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break;
case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break;
case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = topic_payload_size (topicsel, 0); } break;
} }
(void) check_eseq (&eseq_admin, seq, keyval, size, iseq[i].publication_handle); (void) check_eseq (&eseq_admin, seq, keyval, size, iseq[i].publication_handle);
if (iseq[i].source_timestamp & 1) if (iseq[i].source_timestamp & 1)
@ -1314,10 +1333,11 @@ static int cmp_uint64 (const void *va, const void *vb)
return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; return (*a == *b) ? 0 : (*a < *b) ? -1 : 1;
} }
static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state)
{ {
char prefix[128]; char prefix[128];
const double ts = (double) (tnow - tstart) / 1e9; const double ts = (double) (tnow - tref) / 1e9;
bool output = false;
snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts); snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts);
if (pub_rate > 0) if (pub_rate > 0)
@ -1325,18 +1345,20 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
ddsrt_mutex_lock (&pubstat_lock); ddsrt_mutex_lock (&pubstat_lock);
hist_print (prefix, pubstat_hist, tnow - tprev, 1); hist_print (prefix, pubstat_hist, tnow - tprev, 1);
ddsrt_mutex_unlock (&pubstat_lock); ddsrt_mutex_unlock (&pubstat_lock);
output = true;
} }
if (submode != SM_NONE) if (submode != SM_NONE)
{ {
struct eseq_admin * const ea = &eseq_admin; struct eseq_admin * const ea = &eseq_admin;
uint64_t tot_nrecv = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; uint64_t tot_nrecv = 0, tot_nlost = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0;
uint32_t last_size = 0; uint32_t last_size = 0;
ddsrt_mutex_lock (&ea->lock); ddsrt_mutex_lock (&ea->lock);
for (uint32_t i = 0; i < ea->nph; i++) for (uint32_t i = 0; i < ea->nph; i++)
{ {
struct eseq_stat * const x = &ea->stats[i]; struct eseq_stat * const x = &ea->stats[i];
tot_nrecv += x->nrecv; tot_nrecv += x->nrecv;
tot_nlost += x->nlost;
nrecv += x->nrecv - x->nrecv_ref; nrecv += x->nrecv - x->nrecv_ref;
nlost += x->nlost - x->nlost_ref; nlost += x->nlost - x->nlost_ref;
nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref; nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref;
@ -1349,8 +1371,11 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
if (nrecv > 0) if (nrecv > 0)
{ {
printf ("%s size %"PRIu32" ntot %"PRIu64" delta: %"PRIu64" lost %"PRIu64" rate %.2f Mb/s\n", const double dt = (double) (tnow - tprev);
prefix, last_size, tot_nrecv, nrecv, nlost, (double) nrecv_bytes * 8 * 1e3 / (double) (tnow - tprev)); printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s\n",
prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost,
(double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt);
output = true;
} }
} }
@ -1380,8 +1405,8 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
ddsrt_mutex_unlock (&disc_lock); ddsrt_mutex_unlock (&disc_lock);
qsort (y.raw, rawcnt, sizeof (*y.raw), cmp_uint64); qsort (y.raw, rawcnt, sizeof (*y.raw), cmp_uint64);
printf ("%s %s mean %.3fus min %.3fus 50%% %.3fus 90%% %.3fus 99%% %.3fus max %.3fus cnt %"PRIu32"\n", printf ("%s %s size %"PRIu32" mean %.3fus min %.3fus 50%% %.3fus 90%% %.3fus 99%% %.3fus max %.3fus cnt %"PRIu32"\n",
prefix, ppinfo, prefix, ppinfo, topic_payload_size (topicsel, baggagesize),
(double) y.sum / (double) y.cnt / 1e3, (double) y.sum / (double) y.cnt / 1e3,
(double) y.min / 1e3, (double) y.min / 1e3,
(double) y.raw[rawcnt - (rawcnt + 1) / 2] / 1e3, (double) y.raw[rawcnt - (rawcnt + 1) / 2] / 1e3,
@ -1389,6 +1414,7 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
(double) y.raw[rawcnt - (rawcnt + 99) / 100] / 1e3, (double) y.raw[rawcnt - (rawcnt + 99) / 100] / 1e3,
(double) y.max / 1e3, (double) y.max / 1e3,
y.cnt); y.cnt);
output = true;
} }
newraw = y.raw; newraw = y.raw;
@ -1396,6 +1422,42 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev)
} }
ddsrt_mutex_unlock (&pongstat_lock); ddsrt_mutex_unlock (&pongstat_lock);
free (newraw); free (newraw);
if (record_cputime (cputime_state, prefix, tnow))
output = true;
if (rd_stat)
{
#define MAXS 40 /* 40 participants is enough for everyone! */
void *raw[MAXS];
dds_sample_info_t si[MAXS];
int32_t n;
/* Read everything using a keep-last-1 reader: effectively latching the
most recent value. While not entirely correct, the nature of the process
is such that things should be stable, and this allows printing the stats
always in the same way despite the absence of synchronization. */
raw[0] = NULL;
if ((n = dds_take_mask (rd_stat, raw, si, MAXS, MAXS, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE)) > 0)
{
for (int32_t i = 0; i < n; i++)
if (si[i].valid_data && si[i].sample_state == DDS_SST_NOT_READ)
if (print_cputime (raw[i], prefix, true, true))
output = true;
dds_return_loan (rd_stat, raw, n);
}
if ((n = dds_read (rd_stat, raw, si, MAXS, MAXS)) > 0)
{
for (int32_t i = 0; i < n; i++)
if (si[i].valid_data)
if (print_cputime (raw[i], prefix, true, si[i].sample_state == DDS_SST_NOT_READ))
output = true;
dds_return_loan (rd_stat, raw, n);
}
#undef MAXS
}
if (output)
record_netload (netload_state, prefix, tnow);
fflush (stdout); fflush (stdout);
} }
@ -1436,6 +1498,23 @@ static uint32_t sigthread (void *varg)
error2 ("sigwait failed: %d\n", errno); error2 ("sigwait failed: %d\n", errno);
return 0; return 0;
} }
#if defined __APPLE__ || defined __linux
static void sigxfsz_handler (int sig __attribute__ ((unused)))
{
static const char msg[] = "file size limit reached\n";
static ddsrt_atomic_uint32_t seen = DDSRT_ATOMIC_UINT32_INIT (0);
if (!ddsrt_atomic_or32_ov (&seen, 1))
{
dds_time_t tnow = dds_time ();
if (write (2, msg, sizeof (msg) - 1) < 0) {
/* may not ignore return value according to Linux/gcc */
}
print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL);
kill (getpid (), 9);
}
}
#endif
#endif #endif
/******************** /********************
@ -1449,7 +1528,7 @@ static void usage (void)
%s [OPTIONS] MODE...\n\ %s [OPTIONS] MODE...\n\
\n\ \n\
OPTIONS:\n\ OPTIONS:\n\
-T KS|K32|K256|OU topic:\n\ -T KS|K32|K256|OU topic (KS is default):\n\
KS seq num, key value, sequence-of-octets\n\ KS seq num, key value, sequence-of-octets\n\
K32 seq num, key value, array of 24 octets\n\ K32 seq num, key value, array of 24 octets\n\
K256 seq num, key value, array of 248 octets\n\ K256 seq num, key value, array of 248 octets\n\
@ -1465,7 +1544,7 @@ OPTIONS:\n\
-M DUR require those participants to match within DUR seconds\n\ -M DUR require those participants to match within DUR seconds\n\
\n\ \n\
MODE... is zero or more of:\n\ MODE... is zero or more of:\n\
ping [R[Hz]] [waitset|listener]\n\ ping [R[Hz]] [size N] [waitset|listener]\n\
Send a ping upon receiving all expected pongs, or send a ping at\n\ Send a ping upon receiving all expected pongs, or send a ping at\n\
rate R (optionally suffixed with Hz). The triggering mode is either\n\ rate R (optionally suffixed with Hz). The triggering mode is either\n\
a listener (default, unless -L has been specified) or a waitset.\n\ a listener (default, unless -L has been specified) or a waitset.\n\
@ -1476,7 +1555,7 @@ MODE... is zero or more of:\n\
sub [waitset|listener|polling]\n\ sub [waitset|listener|polling]\n\
Subscribe to data, with calls to take occurring either in a listener\n\ Subscribe to data, with calls to take occurring either in a listener\n\
(default), when a waitset is triggered, or by polling at 1kHz.\n\ (default), when a waitset is triggered, or by polling at 1kHz.\n\
pub [R[Hz]] [burst N] [[ping] X%%]\n\ pub [R[Hz]] [size N] [burst N] [[ping] X%%]\n\
Publish bursts of data at rate R, optionally suffixed with Hz. If\n\ Publish bursts of data at rate R, optionally suffixed with Hz. If\n\
no rate is given or R is \"inf\", data is published as fast as\n\ no rate is given or R is \"inf\", data is published as fast as\n\
possible. Each burst is a single sample by default, but can be set\n\ possible. Each burst is a single sample by default, but can be set\n\
@ -1484,6 +1563,11 @@ MODE... is zero or more of:\n\
If desired, a fraction of the samples can be treated as if it were a\n\ If desired, a fraction of the samples can be treated as if it were a\n\
ping, for this, specify a percentage either as \"ping X%%\" (the\n\ ping, for this, specify a percentage either as \"ping X%%\" (the\n\
\"ping\" keyword is optional, the %% sign is not).\n\ \"ping\" keyword is optional, the %% sign is not).\n\
\n\
Payload size (including fixed part of topic) may be set as part of a\n\
\"ping\" or \"pub\" specification for topic KS (there is only size,\n\
the last one given determines it for all) and should be either 0 (minimal,\n\
equivalent to 12) or >= 12.\n\
\n\ \n\
If no MODE specified, it defaults to a 1Hz ping + responding to any pings.\n\ If no MODE specified, it defaults to a 1Hz ping + responding to any pings.\n\
", argv0, argv0); ", argv0, argv0);
@ -1516,7 +1600,7 @@ static int exact_string_int_map_lookup (const struct string_int_map_elem *elems,
if (strcmp (elems[i].name, str) == 0) if (strcmp (elems[i].name, str) == 0)
return elems[i].value; return elems[i].value;
if (notfound_error) if (notfound_error)
error3 ("%s: undefined %s", str, label); error3 ("%s: undefined %s\n", str, label);
return -1; return -1;
} }
@ -1538,12 +1622,30 @@ static int string_int_map_lookup (const struct string_int_map_elem *elems, const
} }
} }
if (ambiguous) if (ambiguous)
error3 ("%s: ambiguous %sspecification", str, label); error3 ("%s: ambiguous %sspecification\n", str, label);
if (match == SIZE_MAX && notfound_error) if (match == SIZE_MAX && notfound_error)
error3 ("%s: undefined %s", str, label); error3 ("%s: undefined %s\n", str, label);
return (match == SIZE_MAX) ? -1 : elems[match].value; return (match == SIZE_MAX) ? -1 : elems[match].value;
} }
static bool set_simple_uint32 (int *xoptind, int xargc, char * const xargv[], const char *token, uint32_t *val)
{
if (strcmp (xargv[*xoptind], token) != 0)
return false;
else
{
unsigned x;
int pos;
if (++(*xoptind) == xargc)
error3 ("argument missing in %s specification\n", token);
if (sscanf (xargv[*xoptind], "%u%n", &x, &pos) == 1 && xargv[*xoptind][pos] == 0)
*val = x;
else
error3 ("%s: invalid %s specification\n", xargv[*xoptind], token);
return true;
}
}
static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) static void set_mode_ping (int *xoptind, int xargc, char * const xargv[])
{ {
ping_intv = 0; ping_intv = 0;
@ -1562,6 +1664,10 @@ static void set_mode_ping (int *xoptind, int xargc, char * const xargv[])
else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5); else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5);
else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); else error3 ("%s: invalid ping rate\n", xargv[*xoptind]);
} }
else if (set_simple_uint32 (xoptind, xargc, xargv, "size", &baggagesize))
{
/* no further work needed */
}
else else
{ {
pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true); pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true);
@ -1614,15 +1720,13 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[])
if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]); if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]);
pub_rate = r; pub_rate = r;
} }
else if (strcmp (xargv[*xoptind], "burst") == 0) else if (set_simple_uint32 (xoptind, xargc, xargv, "burst", &burstsize))
{ {
unsigned b; /* no further work needed */
if (++(*xoptind) == xargc) }
error3 ("argument missing in burst size specification\n"); else if (set_simple_uint32 (xoptind, xargc, xargv, "size", &baggagesize))
if (sscanf (xargv[*xoptind], "%u%n", &b, &pos) == 1 && xargv[*xoptind][pos] == 0) {
burstsize = b; /* no further work needed */
else
error3 ("%s: invalid burst size specification\n", xargv[*xoptind]);
} }
else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && strcmp (xargv[*xoptind] + pos, "%") == 0) else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && strcmp (xargv[*xoptind] + pos, "%") == 0)
{ {
@ -1675,28 +1779,43 @@ int main (int argc, char *argv[])
dds_qos_t *qos; dds_qos_t *qos;
dds_listener_t *listener; dds_listener_t *listener;
int opt; int opt;
bool collect_stats = false;
dds_time_t tref = DDS_INFINITY;
ddsrt_threadattr_t attr; ddsrt_threadattr_t attr;
ddsrt_thread_t pubtid, subtid, subpingtid, subpongtid; ddsrt_thread_t pubtid, subtid, subpingtid, subpongtid;
#if !_WIN32 && !DDSRT_WITH_FREERTOS #if !_WIN32 && !DDSRT_WITH_FREERTOS
sigset_t sigset, osigset; sigset_t sigset, osigset;
ddsrt_thread_t sigtid; ddsrt_thread_t sigtid;
#endif #endif
char netload_if[256];
double netload_bw = 0;
ddsrt_threadattr_init (&attr); ddsrt_threadattr_init (&attr);
argv0 = argv[0]; argv0 = argv[0];
if (argc == 2 && strcmp (argv[1], "help") == 0) if (argc == 2 && strcmp (argv[1], "help") == 0)
usage (); usage ();
while ((opt = getopt (argc, argv, "D:n:z:k:uLT:M:N:h")) != EOF) while ((opt = getopt (argc, argv, "cd:D:n:k:uLK:T:M:N:R:h")) != EOF)
{ {
switch (opt) switch (opt)
{ {
case 'c': collect_stats = true; break;
case 'd': {
char *col;
int pos;
ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if));
if ((col = strrchr (netload_if, ':')) == NULL || col == netload_if ||
(sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0))
error3 ("-d%s: expected DEVICE:BANDWIDTH\n", optarg);
*col = 0;
break;
}
case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break;
case 'n': nkeyvals = (unsigned) atoi (optarg); break; case 'n': nkeyvals = (unsigned) atoi (optarg); break;
case 'u': reliable = false; break; case 'u': reliable = false; break;
case 'k': histdepth = atoi (optarg); if (histdepth < 0) histdepth = 0; break; case 'k': histdepth = atoi (optarg); if (histdepth < 0) histdepth = 0; break;
case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break; case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break;
case 'T': case 'T': case 'K': /* 'K' because of my muscle memory with pubsub ... */
if (strcmp (optarg, "KS") == 0) topicsel = KS; if (strcmp (optarg, "KS") == 0) topicsel = KS;
else if (strcmp (optarg, "K32") == 0) topicsel = K32; else if (strcmp (optarg, "K32") == 0) topicsel = K32;
else if (strcmp (optarg, "K256") == 0) topicsel = K256; else if (strcmp (optarg, "K256") == 0) topicsel = K256;
@ -1705,7 +1824,7 @@ int main (int argc, char *argv[])
break; break;
case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break; case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break;
case 'N': minmatch = (unsigned) atoi (optarg); break; case 'N': minmatch = (unsigned) atoi (optarg); break;
case 'z': baggagesize = (unsigned) atoi (optarg); break; case 'R': tref = 0; sscanf (optarg, "%"SCNd64, &tref); break;
case 'h': usage (); break; case 'h': usage (); break;
default: error3 ("-%c: unknown option\n", opt); break; default: error3 ("-%c: unknown option\n", opt); break;
} }
@ -1717,12 +1836,18 @@ int main (int argc, char *argv[])
if (topicsel == OU && nkeyvals != 1) if (topicsel == OU && nkeyvals != 1)
error3 ("-n%u invalid: topic OU has no key\n", nkeyvals); error3 ("-n%u invalid: topic OU has no key\n", nkeyvals);
if (topicsel != KS && baggagesize != 0) if (topicsel != KS && baggagesize != 0)
error3 ("-z%u invalid: only topic KS has a sequence\n", baggagesize); error3 ("size %"PRIu32" invalid: only topic KS has a sequence\n", baggagesize);
if (baggagesize != 0 && baggagesize < 12) if (baggagesize != 0 && baggagesize < 12)
error3 ("-z%u invalid: too small to allow for overhead\n", baggagesize); error3 ("size %"PRIu32" invalid: too small to allow for overhead\n", baggagesize);
else if (baggagesize > 0) else if (baggagesize > 0)
baggagesize -= 12; baggagesize -= 12;
struct record_netload_state *netload_state;
if (netload_bw <= 0)
netload_state = NULL;
else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL)
error3 ("can't get network utilization information for device %s\n", netload_if);
ddsrt_avl_init (&ppants_td, &ppants); ddsrt_avl_init (&ppants_td, &ppants);
ddsrt_fibheap_init (&ppants_to_match_fhd, &ppants_to_match); ddsrt_fibheap_init (&ppants_to_match_fhd, &ppants_to_match);
@ -1735,7 +1860,7 @@ int main (int argc, char *argv[])
qos = dds_create_qos (); qos = dds_create_qos ();
/* set user data: magic cookie, whether we have a reader for the Data topic /* set user data: magic cookie, whether we have a reader for the Data topic
(all other endpoints always exist), and our hostname */ (all other endpoints always exist), pid and hostname */
{ {
unsigned pos; unsigned pos;
char udata[256]; char udata[256];
@ -1760,6 +1885,12 @@ int main (int argc, char *argv[])
error2 ("dds_create_publisher failed: %d\n", (int) dp); error2 ("dds_create_publisher failed: %d\n", (int) dp);
dds_delete_qos (qos); dds_delete_qos (qos);
qos = dds_create_qos ();
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100));
if ((tp_stat = dds_create_topic (dp, &CPUStats_desc, "DDSPerfCPUStats", qos, NULL)) < 0)
error2 ("dds_create_topic(%s) failed: %d\n", "DDSPerfCPUStats", (int) tp_stat);
dds_delete_qos (qos);
{ {
const char *tp_suf = ""; const char *tp_suf = "";
const dds_topic_descriptor_t *tp_desc = NULL; const dds_topic_descriptor_t *tp_desc = NULL;
@ -1797,6 +1928,19 @@ int main (int argc, char *argv[])
if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0)
error2 ("dds_create_reader(publications) failed: %d\n", (int) rd_publications); error2 ("dds_create_reader(publications) failed: %d\n", (int) rd_publications);
/* stats writer always exists, reader only when we were requested to collect & print stats */
qos = dds_create_qos ();
dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1);
dds_qset_ignorelocal (qos, DDS_IGNORELOCAL_PARTICIPANT);
if ((wr_stat = dds_create_writer (pub, tp_stat, qos, NULL)) < 0)
error2 ("dds_create_writer(statistics) failed: %d\n", (int) wr_stat);
if (collect_stats)
{
if ((rd_stat = dds_create_reader (sub, tp_stat, qos, NULL)) < 0)
error2 ("dds_create_reader(statistics) failed: %d\n", (int) rd_stat);
}
dds_delete_qos (qos);
/* ping reader/writer uses keep-last-1 history; not checking matching on these (yet) */ /* ping reader/writer uses keep-last-1 history; not checking matching on these (yet) */
qos = dds_create_qos (); qos = dds_create_qos ();
dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1);
@ -1882,6 +2026,9 @@ int main (int argc, char *argv[])
sigaddset (&sigset, SIGTERM); sigaddset (&sigset, SIGTERM);
sigprocmask (SIG_BLOCK, &sigset, &osigset); sigprocmask (SIG_BLOCK, &sigset, &osigset);
ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset); ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset);
#if defined __APPLE__ || defined __linux
signal (SIGXFSZ, sigxfsz_handler);
#endif
#endif #endif
/* Make publisher & subscriber thread arguments and start the threads we /* Make publisher & subscriber thread arguments and start the threads we
@ -1920,8 +2067,8 @@ int main (int argc, char *argv[])
const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET; const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET;
if (pingpong_waitset) if (pingpong_waitset)
{ {
ddsrt_thread_create (&subpingtid, "sub", &attr, subpingthread_waitset, &subarg_pong); ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_pong);
ddsrt_thread_create (&subpongtid, "sub", &attr, subpongthread_waitset, &subarg_pong); ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &subarg_pong);
} }
else else
{ {
@ -1929,10 +2076,16 @@ int main (int argc, char *argv[])
set_data_available_listener (rd_pong, "rd_pong", pong_available_listener, &subarg_pong); set_data_available_listener (rd_pong, "rd_pong", pong_available_listener, &subarg_pong);
} }
/* Have to do this after all threads have been created because it caches the list */
struct record_cputime_state *cputime_state;
cputime_state = record_cputime_new (wr_stat);
/* Run until time limit reached or a signal received. (The time calculations /* Run until time limit reached or a signal received. (The time calculations
ignore the possibility of overflow around the year 2260.) */ ignore the possibility of overflow around the year 2260.) */
dds_time_t tnow = dds_time (); dds_time_t tnow = dds_time ();
const dds_time_t tstart = tnow; const dds_time_t tstart = tnow;
if (tref == DDS_INFINITY)
tref = tstart;
dds_time_t tmatch = (maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (maxwait * 1e9 + 0.5); dds_time_t tmatch = (maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (maxwait * 1e9 + 0.5);
const dds_time_t tstop = (dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (dur * 1e9 + 0.5); const dds_time_t tstop = (dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (dur * 1e9 + 0.5);
dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tnext = tstart + DDS_SECS (1);
@ -2003,7 +2156,7 @@ int main (int argc, char *argv[])
tnow = dds_time (); tnow = dds_time ();
if (tnext <= tnow) if (tnext <= tnow)
{ {
print_stats (tstart, tnow, tlast); print_stats (tref, tnow, tlast, cputime_state, netload_state);
tlast = tnow; tlast = tnow;
if (tnow > tnext + DDS_MSECS (500)) if (tnow > tnext + DDS_MSECS (500))
tnext = tnow + DDS_SECS (1); tnext = tnow + DDS_SECS (1);
@ -2021,6 +2174,8 @@ int main (int argc, char *argv[])
maybe_send_new_ping (tnow, &tnextping); maybe_send_new_ping (tnow, &tnextping);
} }
} }
record_netload_free (netload_state);
record_cputime_free (cputime_state);
#if _WIN32 #if _WIN32
signal_handler (SIGINT); signal_handler (SIGINT);

View file

@ -27,3 +27,20 @@ struct KeyedSeq
sequence<octet> baggage; sequence<octet> baggage;
}; };
#pragma keylist KeyedSeq keyval #pragma keylist KeyedSeq keyval
struct CPUStatThread
{
string name;
long u_pct;
long s_pct;
};
struct CPUStats
{
string hostname;
unsigned long pid;
double maxrss;
boolean some_above;
sequence<CPUStatThread> cpu;
};
#pragma keylist CPUStats hostname pid

116
src/tools/ddsperf/netload.c Normal file
View file

@ -0,0 +1,116 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include "dds/dds.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/string.h"
#include "dds/ddsrt/netstat.h"
#include "netload.h"
#if DDSRT_HAVE_NETSTAT
struct record_netload_state {
struct ddsrt_netstat_control *ctrl;
char *name;
double bw;
bool errored;
bool data_valid;
dds_time_t tprev;
uint64_t ibytes;
uint64_t obytes;
};
void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow)
{
if (st && !st->errored)
{
struct ddsrt_netstat x;
dds_return_t ret = ddsrt_netstat_get (st->ctrl, &x);
st->errored = (ret == DDS_RETCODE_ERROR);
if (ret == DDS_RETCODE_OK)
{
if (st->data_valid)
{
/* interface speeds are in bits/s, so convert bytes to bits */
const double dx = 8 * (double) (x.obytes - st->obytes);
const double dr = 8 * (double) (x.ibytes - st->ibytes);
const double dt = (double) (tnow - st->tprev) / 1e9;
const double dxpct = 100.0 * dx / dt / st->bw;
const double drpct = 100.0 * dr / dt / st->bw;
if (dxpct >= 0.5 || drpct >= 0.5)
{
printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n",
prefix, st->name, dxpct, drpct, x.obytes, x.ibytes);
}
}
st->obytes = x.obytes;
st->ibytes = x.ibytes;
st->tprev = tnow;
st->data_valid = true;
}
}
}
struct record_netload_state *record_netload_new (const char *dev, double bw)
{
struct record_netload_state *st = ddsrt_malloc (sizeof (*st));
if (ddsrt_netstat_new (&st->ctrl, dev) != DDS_RETCODE_OK)
{
ddsrt_free (st);
return NULL;
}
st->name = ddsrt_strdup (dev);
st->bw = bw;
st->data_valid = false;
st->errored = false;
record_netload (st, NULL, dds_time ());
return st;
}
void record_netload_free (struct record_netload_state *st)
{
if (st)
{
ddsrt_netstat_free (st->ctrl);
ddsrt_free (st->name);
ddsrt_free (st);
}
}
#else
void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow)
{
(void) st;
(void) prefix;
(void ) tnow;
}
struct record_netload_state *record_netload_new (const char *dev, double bw)
{
(void) dev;
(void) bw;
return NULL;
}
void record_netload_free (struct record_netload_state *st)
{
(void) st;
}
#endif

View file

@ -0,0 +1,23 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef NETLOAD_H
#define NETLOAD_H
#include <dds/dds.h>
struct record_netload_state;
void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow);
struct record_netload_state *record_netload_new (const char *dev, double bw);
void record_netload_free (struct record_netload_state *st);
#endif