From 952029dba0bbf888e5dcfc540fffdc01b9f27b5d Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 2 Aug 2019 08:58:18 +0200 Subject: [PATCH] ddsperf enhancements * per-thread CPU usage (only those threads where the load is over 0.5%, if the sum of threads below that threshold exceeds 0.5%, it prints an aggregate for those threads); * also report RSS; * network load (only on request, as percentage of specified network bandwidth and actual bytes in/out, with the output suppressed if it is 0%); * publish CPU usage so a ddsperf instance can display CPU loads for its peers; * handle SIGXFSZ (file size exceeded) by displaying one last line of statistics before killing itself; this simply a debugging tool to make it easier to get a trace covering a high sample-rate start-up issue; * default topic changed to "KS" because that allows all the options to be used, this has a negative impact on performance (both latency and small-sample throughput) but it should be less surprising to users; * specifying a size is now done by appending "size N" (where N is the size in bytes) after a "ping" or "pub" command, rather than it having to set it via a command-line option; Note that some of this is platform-dependent -- SIGXFSZ is currently only on Linux and macOS, and CPU and network load reporting is currently only on Linux, macOS and Windows. Signed-off-by: Erik Boasson --- src/tools/ddsperf/CMakeLists.txt | 2 +- src/tools/ddsperf/cputime.c | 250 ++++++++++++++++++++++++++++ src/tools/ddsperf/cputime.h | 24 +++ src/tools/ddsperf/ddsperf.c | 237 +++++++++++++++++++++----- src/tools/ddsperf/ddsperf_types.idl | 17 ++ src/tools/ddsperf/netload.c | 116 +++++++++++++ src/tools/ddsperf/netload.h | 23 +++ 7 files changed, 627 insertions(+), 42 deletions(-) create mode 100644 src/tools/ddsperf/cputime.c create mode 100644 src/tools/ddsperf/cputime.h create mode 100644 src/tools/ddsperf/netload.c create mode 100644 src/tools/ddsperf/netload.h diff --git a/src/tools/ddsperf/CMakeLists.txt b/src/tools/ddsperf/CMakeLists.txt index c09c8e9..25374b3 100644 --- a/src/tools/ddsperf/CMakeLists.txt +++ b/src/tools/ddsperf/CMakeLists.txt @@ -11,7 +11,7 @@ # idlc_generate(ddsperf_types ddsperf_types.idl) -add_executable(ddsperf ddsperf.c) +add_executable(ddsperf ddsperf.c cputime.c cputime.h netload.c netload.h) target_link_libraries(ddsperf ddsperf_types ddsc) if(WIN32) diff --git a/src/tools/ddsperf/cputime.c b/src/tools/ddsperf/cputime.c new file mode 100644 index 0000000..396580c --- /dev/null +++ b/src/tools/ddsperf/cputime.c @@ -0,0 +1,250 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#define _ISOC99_SOURCE +#include +#include +#include +#include + +#include "dds/dds.h" + +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/sockets.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/rusage.h" + +#include "cputime.h" +#include "ddsperf_types.h" + +static void print (char *line, size_t sz, size_t *pos, const char *name, double du, double ds) +{ + if (*pos < sz) + *pos += (size_t) snprintf (line + *pos, sz - *pos, " %s:%.0f%%+%.0f%%", name, 100.0 * du, 100.0 * ds); +} + +bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh) +{ + if (!s->some_above) + return false; + else + { + char line[512]; + size_t pos = 0; + assert (is_fresh || !print_host); + pos += (size_t) snprintf (line + pos, sizeof (line) - pos, "%s", prefix); + if (!is_fresh) + pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " (stale)"); + if (print_host) + { + int n = (int) strlen (s->hostname); + if (n > 100) n = 100; + pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " @%*.*s:%"PRId32, n, n, s->hostname, s->pid); + } + if (s->maxrss > 1048576) + pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " rss:%.1fMB", s->maxrss / 1048576.0); + else if (s->maxrss > 1024) + pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " rss:%.0fkB", s->maxrss / 1024.0); + else { + /* non-sensical value -- presumably maxrss is not available */ + } + const size_t init_pos = pos; + for (uint32_t i = 0; i < s->cpu._length; i++) + { + struct CPUStatThread * const thr = &s->cpu._buffer[i]; + print (line, sizeof (line), &pos, thr->name, thr->u_pct / 100.0, thr->s_pct / 100.0); + } + if (pos > init_pos) + puts (line); + return true; + } +} + +#if DDSRT_HAVE_RUSAGE && DDSRT_HAVE_THREAD_LIST + +struct record_cputime_state_thr { + ddsrt_thread_list_id_t tid; + char name[32]; + double ut, st; +}; + +struct record_cputime_state { + bool supported; + dds_time_t tprev; + size_t nthreads; + struct record_cputime_state_thr *threads; + dds_entity_t wr; + struct CPUStats s; +}; + +static void update (double *ut_old, double *st_old, double dt, double ut_new, double st_new, double *du, double *ds) +{ + *du = (ut_new - *ut_old) / dt; + *ds = (st_new - *st_old) / dt; + *ut_old = ut_new; + *st_old = st_new; +} + +static bool above_threshold (double *max, double *du_skip, double *ds_skip, double du, double ds) +{ + if (*max < du) *max = du; + if (*max < ds) *max = ds; + if (du >= 0.005 || ds >= 0.005) + return true; + else if (du_skip == NULL || ds_skip == NULL) + return false; + else + { + *du_skip += du; + *ds_skip += ds; + return false; + } +} + +bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow) +{ + if (state == NULL) + return false; + + ddsrt_rusage_t usage; + if (ddsrt_getrusage (DDSRT_RUSAGE_SELF, &usage) < 0) + usage.maxrss = 0; + double max = 0; + double du_skip = 0.0, ds_skip = 0.0; + const double dt = (double) (tnow - state->tprev) / 1e9; + bool some_above = false; + + state->s.maxrss = (double) usage.maxrss; + state->s.cpu._length = 0; + for (size_t i = 0; i < state->nthreads; i++) + { + struct record_cputime_state_thr * const thr = &state->threads[i]; + if (ddsrt_getrusage_anythread (thr->tid, &usage) < 0) + continue; + + const double ut = (double) usage.utime / 1e9; + const double st = (double) usage.stime / 1e9; + double du, ds; + update (&thr->ut, &thr->st, dt, ut, st, &du, &ds); + if (above_threshold (&max, &du_skip, &ds_skip, du, ds)) + { + some_above = true; + /* Thread names are often set by thread itself immediately after creation, + and so it depends on the scheduling whether there is still a default + name or the name we are interested in. Lazily retrieving the name the + first time the thread pops up in the CPU usage works around the timing + problem. */ + if (thr->name[0] == 0) + { + if (ddsrt_thread_getname_anythread (thr->tid, thr->name, sizeof (thr->name)) < 0) + { + du_skip += du; + ds_skip += ds; + continue; + } + } + + struct CPUStatThread * const x = &state->s.cpu._buffer[state->s.cpu._length++]; + x->name = thr->name; + x->u_pct = (int) (100.0 * du + 0.5); + x->s_pct = (int) (100.0 * ds + 0.5); + } + } + if (above_threshold (&max, NULL, NULL, du_skip, ds_skip)) + { + struct CPUStatThread * const x = &state->s.cpu._buffer[state->s.cpu._length++]; + some_above = true; + x->name = "others"; + x->u_pct = (int) (100.0 * du_skip + 0.5); + x->s_pct = (int) (100.0 * ds_skip + 0.5); + } + state->tprev = tnow; + state->s.some_above = some_above; + dds_write (state->wr, &state->s); + return print_cputime (&state->s, prefix, false, true); +} + +struct record_cputime_state *record_cputime_new (dds_entity_t wr) +{ + ddsrt_thread_list_id_t tids[100]; + dds_return_t n; + if ((n = ddsrt_thread_list (tids, sizeof (tids) / sizeof (tids[0]))) <= 0) + return NULL; + else if (n > (dds_return_t) (sizeof (tids) / sizeof (tids[0]))) + { + fprintf (stderr, "way more threads than expected\n"); + return NULL; + } + + struct record_cputime_state *state = malloc (sizeof (*state)); + state->tprev = dds_time (); + state->wr = wr; + state->threads = malloc ((size_t) n * sizeof (*state->threads)); + state->nthreads = 0; + for (int32_t i = 0; i < n; i++) + { + struct record_cputime_state_thr * const thr = &state->threads[state->nthreads]; + ddsrt_rusage_t usage; + if (ddsrt_getrusage_anythread (tids[i], &usage) < 0) + continue; + thr->tid = tids[i]; + thr->name[0] = 0; + thr->ut = (double) usage.utime / 1e9; + thr->st = (double) usage.stime / 1e9; + state->nthreads++; + } + + char hostname[128]; + if (ddsrt_gethostname (hostname, sizeof (hostname)) != DDS_RETCODE_OK) + strcpy (hostname, "?"); + state->s.hostname = ddsrt_strdup (hostname); + state->s.pid = (uint32_t) ddsrt_getpid (); + state->s.cpu._length = 0; + state->s.cpu._maximum = (uint32_t) state->nthreads; + state->s.cpu._buffer = malloc (state->s.cpu._maximum * sizeof (*state->s.cpu._buffer)); + state->s.cpu._release = false; + return state; +} + +void record_cputime_free (struct record_cputime_state *state) +{ + if (state) + { + free (state->threads); + ddsrt_free (state->s.hostname); + /* we alias thread names in state->s->cpu._buffer, so no need to free */ + free (state->s.cpu._buffer); + free (state); + } +} + +#else + +bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow) +{ + (void) state; + (void) prefix; + (void) tnow; +} + +struct record_cputime_state *record_cputime_new (dds_entity_t wr) +{ + (void) wr; +} + +void record_cputime_free (struct record_cputime_state *state) +{ + (void) state; +} + +#endif diff --git a/src/tools/ddsperf/cputime.h b/src/tools/ddsperf/cputime.h new file mode 100644 index 0000000..3216692 --- /dev/null +++ b/src/tools/ddsperf/cputime.h @@ -0,0 +1,24 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef CPUTIME_H +#define CPUTIME_H + +#include "ddsperf_types.h" + +struct record_cputime_state; + +struct record_cputime_state *record_cputime_new (dds_entity_t wr); +void record_cputime_free (struct record_cputime_state *state); +bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow); +bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh); + +#endif diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index b9f3487..e9e6fad 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -37,6 +37,9 @@ #include "dds/ddsrt/fibheap.h" #include "dds/ddsrt/atomics.h" +#include "cputime.h" +#include "netload.h" + #if !defined(_WIN32) && !defined(LWIP_SOCKET) #include #endif @@ -74,22 +77,22 @@ static dds_entity_t rd_participants, rd_subscriptions, rd_publications; /* Topics, readers, writers (except for pong writers: there are many of those) */ -static dds_entity_t tp_data, tp_ping, tp_pong; +static dds_entity_t tp_data, tp_ping, tp_pong, tp_stat; static char tpname_data[32], tpname_ping[32], tpname_pong[32]; -static dds_entity_t sub, pub, wr_data, wr_ping, rd_data, rd_ping, rd_pong; +static dds_entity_t sub, pub, wr_data, wr_ping, wr_stat, rd_data, rd_ping, rd_pong, rd_stat; /* Number of different key values to use (must be 1 for OU type) */ static unsigned nkeyvals = 1; /* Topic type to use */ -static enum topicsel topicsel = OU; +static enum topicsel topicsel = KS; /* Data and ping/pong subscriber triggering modes */ static enum submode submode = SM_LISTENER; static enum submode pingpongmode = SM_LISTENER; /* Size of the sequence in KeyedSeq type in bytes */ -static unsigned baggagesize = 0; +static uint32_t baggagesize = 0; /* Whether or not to register instances prior to writing */ static bool register_instances = true; @@ -115,7 +118,7 @@ static uint32_t matchcount = 0; static uint32_t matchtimeout = 0; /* Data is published in bursts of this many samples */ -static unsigned burstsize = 1; +static uint32_t burstsize = 1; /* Whether to use reliable or best-effort readers/writers */ static bool reliable = true; @@ -490,7 +493,7 @@ static void hist_print (const char *prefix, struct hist *h, dds_time_t dt, int r hist_reset (h); } -static void *make_baggage (dds_sequence_t *b, unsigned cnt) +static void *make_baggage (dds_sequence_t *b, uint32_t cnt) { b->_maximum = b->_length = cnt; if (cnt == 0) @@ -558,7 +561,7 @@ static uint32_t pubthread (void *varg) tfirst0 = tfirst = dds_time(); - unsigned bi = 0; + uint32_t bi = 0; while (!ddsrt_atomic_ld32 (&termflag)) { /* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */ @@ -783,6 +786,19 @@ static dds_entity_t get_pong_writer (dds_instance_handle_t pubhandle) return wr_pong; } +static uint32_t topic_payload_size (enum topicsel tp, uint32_t bgsize) +{ + uint32_t size = 0; + switch (tp) + { + case KS: size = 12 + bgsize; break; + case K32: size = 32; break; + case K256: size = 256; break; + case OU: size = 4; break; + } + return size; +} + static bool process_data (dds_entity_t rd, struct subthread_arg *arg) { uint32_t max_samples = arg->max_samples; @@ -798,10 +814,13 @@ static bool process_data (dds_entity_t rd, struct subthread_arg *arg) uint32_t seq = 0, keyval = 0, size = 0; switch (topicsel) { - case KS: { KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 12 + d->baggage._length; } break; - case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 32; } break; - case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 256; } break; - case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = 4; } break; + case KS: { + KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, d->baggage._length); + break; + } + case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break; + case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = topic_payload_size (topicsel, 0); } break; + case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = topic_payload_size (topicsel, 0); } break; } (void) check_eseq (&eseq_admin, seq, keyval, size, iseq[i].publication_handle); if (iseq[i].source_timestamp & 1) @@ -1314,10 +1333,11 @@ static int cmp_uint64 (const void *va, const void *vb) return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; } -static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) +static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state) { char prefix[128]; - const double ts = (double) (tnow - tstart) / 1e9; + const double ts = (double) (tnow - tref) / 1e9; + bool output = false; snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts); if (pub_rate > 0) @@ -1325,18 +1345,20 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) ddsrt_mutex_lock (&pubstat_lock); hist_print (prefix, pubstat_hist, tnow - tprev, 1); ddsrt_mutex_unlock (&pubstat_lock); + output = true; } if (submode != SM_NONE) { struct eseq_admin * const ea = &eseq_admin; - uint64_t tot_nrecv = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; + uint64_t tot_nrecv = 0, tot_nlost = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; uint32_t last_size = 0; ddsrt_mutex_lock (&ea->lock); for (uint32_t i = 0; i < ea->nph; i++) { struct eseq_stat * const x = &ea->stats[i]; tot_nrecv += x->nrecv; + tot_nlost += x->nlost; nrecv += x->nrecv - x->nrecv_ref; nlost += x->nlost - x->nlost_ref; nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref; @@ -1349,8 +1371,11 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) if (nrecv > 0) { - printf ("%s size %"PRIu32" ntot %"PRIu64" delta: %"PRIu64" lost %"PRIu64" rate %.2f Mb/s\n", - prefix, last_size, tot_nrecv, nrecv, nlost, (double) nrecv_bytes * 8 * 1e3 / (double) (tnow - tprev)); + const double dt = (double) (tnow - tprev); + printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s\n", + prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost, + (double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt); + output = true; } } @@ -1380,8 +1405,8 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) ddsrt_mutex_unlock (&disc_lock); qsort (y.raw, rawcnt, sizeof (*y.raw), cmp_uint64); - printf ("%s %s mean %.3fus min %.3fus 50%% %.3fus 90%% %.3fus 99%% %.3fus max %.3fus cnt %"PRIu32"\n", - prefix, ppinfo, + printf ("%s %s size %"PRIu32" mean %.3fus min %.3fus 50%% %.3fus 90%% %.3fus 99%% %.3fus max %.3fus cnt %"PRIu32"\n", + prefix, ppinfo, topic_payload_size (topicsel, baggagesize), (double) y.sum / (double) y.cnt / 1e3, (double) y.min / 1e3, (double) y.raw[rawcnt - (rawcnt + 1) / 2] / 1e3, @@ -1389,6 +1414,7 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) (double) y.raw[rawcnt - (rawcnt + 99) / 100] / 1e3, (double) y.max / 1e3, y.cnt); + output = true; } newraw = y.raw; @@ -1396,6 +1422,42 @@ static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) } ddsrt_mutex_unlock (&pongstat_lock); free (newraw); + + if (record_cputime (cputime_state, prefix, tnow)) + output = true; + + if (rd_stat) + { +#define MAXS 40 /* 40 participants is enough for everyone! */ + void *raw[MAXS]; + dds_sample_info_t si[MAXS]; + int32_t n; + /* Read everything using a keep-last-1 reader: effectively latching the + most recent value. While not entirely correct, the nature of the process + is such that things should be stable, and this allows printing the stats + always in the same way despite the absence of synchronization. */ + raw[0] = NULL; + if ((n = dds_take_mask (rd_stat, raw, si, MAXS, MAXS, DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE)) > 0) + { + for (int32_t i = 0; i < n; i++) + if (si[i].valid_data && si[i].sample_state == DDS_SST_NOT_READ) + if (print_cputime (raw[i], prefix, true, true)) + output = true; + dds_return_loan (rd_stat, raw, n); + } + if ((n = dds_read (rd_stat, raw, si, MAXS, MAXS)) > 0) + { + for (int32_t i = 0; i < n; i++) + if (si[i].valid_data) + if (print_cputime (raw[i], prefix, true, si[i].sample_state == DDS_SST_NOT_READ)) + output = true; + dds_return_loan (rd_stat, raw, n); + } +#undef MAXS + } + + if (output) + record_netload (netload_state, prefix, tnow); fflush (stdout); } @@ -1436,6 +1498,23 @@ static uint32_t sigthread (void *varg) error2 ("sigwait failed: %d\n", errno); return 0; } + +#if defined __APPLE__ || defined __linux +static void sigxfsz_handler (int sig __attribute__ ((unused))) +{ + static const char msg[] = "file size limit reached\n"; + static ddsrt_atomic_uint32_t seen = DDSRT_ATOMIC_UINT32_INIT (0); + if (!ddsrt_atomic_or32_ov (&seen, 1)) + { + dds_time_t tnow = dds_time (); + if (write (2, msg, sizeof (msg) - 1) < 0) { + /* may not ignore return value according to Linux/gcc */ + } + print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL); + kill (getpid (), 9); + } +} +#endif #endif /******************** @@ -1449,7 +1528,7 @@ static void usage (void) %s [OPTIONS] MODE...\n\ \n\ OPTIONS:\n\ - -T KS|K32|K256|OU topic:\n\ + -T KS|K32|K256|OU topic (KS is default):\n\ KS seq num, key value, sequence-of-octets\n\ K32 seq num, key value, array of 24 octets\n\ K256 seq num, key value, array of 248 octets\n\ @@ -1465,7 +1544,7 @@ OPTIONS:\n\ -M DUR require those participants to match within DUR seconds\n\ \n\ MODE... is zero or more of:\n\ - ping [R[Hz]] [waitset|listener]\n\ + ping [R[Hz]] [size N] [waitset|listener]\n\ Send a ping upon receiving all expected pongs, or send a ping at\n\ rate R (optionally suffixed with Hz). The triggering mode is either\n\ a listener (default, unless -L has been specified) or a waitset.\n\ @@ -1476,7 +1555,7 @@ MODE... is zero or more of:\n\ sub [waitset|listener|polling]\n\ Subscribe to data, with calls to take occurring either in a listener\n\ (default), when a waitset is triggered, or by polling at 1kHz.\n\ - pub [R[Hz]] [burst N] [[ping] X%%]\n\ + pub [R[Hz]] [size N] [burst N] [[ping] X%%]\n\ Publish bursts of data at rate R, optionally suffixed with Hz. If\n\ no rate is given or R is \"inf\", data is published as fast as\n\ possible. Each burst is a single sample by default, but can be set\n\ @@ -1484,6 +1563,11 @@ MODE... is zero or more of:\n\ If desired, a fraction of the samples can be treated as if it were a\n\ ping, for this, specify a percentage either as \"ping X%%\" (the\n\ \"ping\" keyword is optional, the %% sign is not).\n\ +\n\ + Payload size (including fixed part of topic) may be set as part of a\n\ + \"ping\" or \"pub\" specification for topic KS (there is only size,\n\ + the last one given determines it for all) and should be either 0 (minimal,\n\ + equivalent to 12) or >= 12.\n\ \n\ If no MODE specified, it defaults to a 1Hz ping + responding to any pings.\n\ ", argv0, argv0); @@ -1516,7 +1600,7 @@ static int exact_string_int_map_lookup (const struct string_int_map_elem *elems, if (strcmp (elems[i].name, str) == 0) return elems[i].value; if (notfound_error) - error3 ("%s: undefined %s", str, label); + error3 ("%s: undefined %s\n", str, label); return -1; } @@ -1538,12 +1622,30 @@ static int string_int_map_lookup (const struct string_int_map_elem *elems, const } } if (ambiguous) - error3 ("%s: ambiguous %sspecification", str, label); + error3 ("%s: ambiguous %sspecification\n", str, label); if (match == SIZE_MAX && notfound_error) - error3 ("%s: undefined %s", str, label); + error3 ("%s: undefined %s\n", str, label); return (match == SIZE_MAX) ? -1 : elems[match].value; } +static bool set_simple_uint32 (int *xoptind, int xargc, char * const xargv[], const char *token, uint32_t *val) +{ + if (strcmp (xargv[*xoptind], token) != 0) + return false; + else + { + unsigned x; + int pos; + if (++(*xoptind) == xargc) + error3 ("argument missing in %s specification\n", token); + if (sscanf (xargv[*xoptind], "%u%n", &x, &pos) == 1 && xargv[*xoptind][pos] == 0) + *val = x; + else + error3 ("%s: invalid %s specification\n", xargv[*xoptind], token); + return true; + } +} + static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) { ping_intv = 0; @@ -1562,6 +1664,10 @@ static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) else if (ping_rate > 0) ping_intv = (dds_duration_t) (1e9 / ping_rate + 0.5); else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); } + else if (set_simple_uint32 (xoptind, xargc, xargv, "size", &baggagesize)) + { + /* no further work needed */ + } else { pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true); @@ -1614,15 +1720,13 @@ static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]); pub_rate = r; } - else if (strcmp (xargv[*xoptind], "burst") == 0) + else if (set_simple_uint32 (xoptind, xargc, xargv, "burst", &burstsize)) { - unsigned b; - if (++(*xoptind) == xargc) - error3 ("argument missing in burst size specification\n"); - if (sscanf (xargv[*xoptind], "%u%n", &b, &pos) == 1 && xargv[*xoptind][pos] == 0) - burstsize = b; - else - error3 ("%s: invalid burst size specification\n", xargv[*xoptind]); + /* no further work needed */ + } + else if (set_simple_uint32 (xoptind, xargc, xargv, "size", &baggagesize)) + { + /* no further work needed */ } else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && strcmp (xargv[*xoptind] + pos, "%") == 0) { @@ -1675,28 +1779,43 @@ int main (int argc, char *argv[]) dds_qos_t *qos; dds_listener_t *listener; int opt; + bool collect_stats = false; + dds_time_t tref = DDS_INFINITY; ddsrt_threadattr_t attr; ddsrt_thread_t pubtid, subtid, subpingtid, subpongtid; #if !_WIN32 && !DDSRT_WITH_FREERTOS sigset_t sigset, osigset; ddsrt_thread_t sigtid; #endif + char netload_if[256]; + double netload_bw = 0; ddsrt_threadattr_init (&attr); argv0 = argv[0]; if (argc == 2 && strcmp (argv[1], "help") == 0) usage (); - while ((opt = getopt (argc, argv, "D:n:z:k:uLT:M:N:h")) != EOF) + while ((opt = getopt (argc, argv, "cd:D:n:k:uLK:T:M:N:R:h")) != EOF) { switch (opt) { + case 'c': collect_stats = true; break; + case 'd': { + char *col; + int pos; + ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if)); + if ((col = strrchr (netload_if, ':')) == NULL || col == netload_if || + (sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0)) + error3 ("-d%s: expected DEVICE:BANDWIDTH\n", optarg); + *col = 0; + break; + } case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; case 'n': nkeyvals = (unsigned) atoi (optarg); break; case 'u': reliable = false; break; case 'k': histdepth = atoi (optarg); if (histdepth < 0) histdepth = 0; break; case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break; - case 'T': + case 'T': case 'K': /* 'K' because of my muscle memory with pubsub ... */ if (strcmp (optarg, "KS") == 0) topicsel = KS; else if (strcmp (optarg, "K32") == 0) topicsel = K32; else if (strcmp (optarg, "K256") == 0) topicsel = K256; @@ -1705,7 +1824,7 @@ int main (int argc, char *argv[]) break; case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break; case 'N': minmatch = (unsigned) atoi (optarg); break; - case 'z': baggagesize = (unsigned) atoi (optarg); break; + case 'R': tref = 0; sscanf (optarg, "%"SCNd64, &tref); break; case 'h': usage (); break; default: error3 ("-%c: unknown option\n", opt); break; } @@ -1717,12 +1836,18 @@ int main (int argc, char *argv[]) if (topicsel == OU && nkeyvals != 1) error3 ("-n%u invalid: topic OU has no key\n", nkeyvals); if (topicsel != KS && baggagesize != 0) - error3 ("-z%u invalid: only topic KS has a sequence\n", baggagesize); + error3 ("size %"PRIu32" invalid: only topic KS has a sequence\n", baggagesize); if (baggagesize != 0 && baggagesize < 12) - error3 ("-z%u invalid: too small to allow for overhead\n", baggagesize); + error3 ("size %"PRIu32" invalid: too small to allow for overhead\n", baggagesize); else if (baggagesize > 0) baggagesize -= 12; + struct record_netload_state *netload_state; + if (netload_bw <= 0) + netload_state = NULL; + else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL) + error3 ("can't get network utilization information for device %s\n", netload_if); + ddsrt_avl_init (&ppants_td, &ppants); ddsrt_fibheap_init (&ppants_to_match_fhd, &ppants_to_match); @@ -1735,7 +1860,7 @@ int main (int argc, char *argv[]) qos = dds_create_qos (); /* set user data: magic cookie, whether we have a reader for the Data topic - (all other endpoints always exist), and our hostname */ + (all other endpoints always exist), pid and hostname */ { unsigned pos; char udata[256]; @@ -1760,6 +1885,12 @@ int main (int argc, char *argv[]) error2 ("dds_create_publisher failed: %d\n", (int) dp); dds_delete_qos (qos); + qos = dds_create_qos (); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS (100)); + if ((tp_stat = dds_create_topic (dp, &CPUStats_desc, "DDSPerfCPUStats", qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", "DDSPerfCPUStats", (int) tp_stat); + dds_delete_qos (qos); + { const char *tp_suf = ""; const dds_topic_descriptor_t *tp_desc = NULL; @@ -1797,6 +1928,19 @@ int main (int argc, char *argv[]) if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) error2 ("dds_create_reader(publications) failed: %d\n", (int) rd_publications); + /* stats writer always exists, reader only when we were requested to collect & print stats */ + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, DDS_IGNORELOCAL_PARTICIPANT); + if ((wr_stat = dds_create_writer (pub, tp_stat, qos, NULL)) < 0) + error2 ("dds_create_writer(statistics) failed: %d\n", (int) wr_stat); + if (collect_stats) + { + if ((rd_stat = dds_create_reader (sub, tp_stat, qos, NULL)) < 0) + error2 ("dds_create_reader(statistics) failed: %d\n", (int) rd_stat); + } + dds_delete_qos (qos); + /* ping reader/writer uses keep-last-1 history; not checking matching on these (yet) */ qos = dds_create_qos (); dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); @@ -1882,6 +2026,9 @@ int main (int argc, char *argv[]) sigaddset (&sigset, SIGTERM); sigprocmask (SIG_BLOCK, &sigset, &osigset); ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset); +#if defined __APPLE__ || defined __linux + signal (SIGXFSZ, sigxfsz_handler); +#endif #endif /* Make publisher & subscriber thread arguments and start the threads we @@ -1920,8 +2067,8 @@ int main (int argc, char *argv[]) const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET; if (pingpong_waitset) { - ddsrt_thread_create (&subpingtid, "sub", &attr, subpingthread_waitset, &subarg_pong); - ddsrt_thread_create (&subpongtid, "sub", &attr, subpongthread_waitset, &subarg_pong); + ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_pong); + ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &subarg_pong); } else { @@ -1929,10 +2076,16 @@ int main (int argc, char *argv[]) set_data_available_listener (rd_pong, "rd_pong", pong_available_listener, &subarg_pong); } + /* Have to do this after all threads have been created because it caches the list */ + struct record_cputime_state *cputime_state; + cputime_state = record_cputime_new (wr_stat); + /* Run until time limit reached or a signal received. (The time calculations ignore the possibility of overflow around the year 2260.) */ dds_time_t tnow = dds_time (); const dds_time_t tstart = tnow; + if (tref == DDS_INFINITY) + tref = tstart; dds_time_t tmatch = (maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (maxwait * 1e9 + 0.5); const dds_time_t tstop = (dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (dur * 1e9 + 0.5); dds_time_t tnext = tstart + DDS_SECS (1); @@ -2003,7 +2156,7 @@ int main (int argc, char *argv[]) tnow = dds_time (); if (tnext <= tnow) { - print_stats (tstart, tnow, tlast); + print_stats (tref, tnow, tlast, cputime_state, netload_state); tlast = tnow; if (tnow > tnext + DDS_MSECS (500)) tnext = tnow + DDS_SECS (1); @@ -2021,6 +2174,8 @@ int main (int argc, char *argv[]) maybe_send_new_ping (tnow, &tnextping); } } + record_netload_free (netload_state); + record_cputime_free (cputime_state); #if _WIN32 signal_handler (SIGINT); diff --git a/src/tools/ddsperf/ddsperf_types.idl b/src/tools/ddsperf/ddsperf_types.idl index 806f30d..eb2efcd 100644 --- a/src/tools/ddsperf/ddsperf_types.idl +++ b/src/tools/ddsperf/ddsperf_types.idl @@ -27,3 +27,20 @@ struct KeyedSeq sequence baggage; }; #pragma keylist KeyedSeq keyval + +struct CPUStatThread +{ + string name; + long u_pct; + long s_pct; +}; + +struct CPUStats +{ + string hostname; + unsigned long pid; + double maxrss; + boolean some_above; + sequence cpu; +}; +#pragma keylist CPUStats hostname pid diff --git a/src/tools/ddsperf/netload.c b/src/tools/ddsperf/netload.c new file mode 100644 index 0000000..ae716a5 --- /dev/null +++ b/src/tools/ddsperf/netload.c @@ -0,0 +1,116 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include +#include + +#include "dds/dds.h" + +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/netstat.h" + +#include "netload.h" + +#if DDSRT_HAVE_NETSTAT + +struct record_netload_state { + struct ddsrt_netstat_control *ctrl; + char *name; + double bw; + bool errored; + bool data_valid; + dds_time_t tprev; + uint64_t ibytes; + uint64_t obytes; +}; + +void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow) +{ + if (st && !st->errored) + { + struct ddsrt_netstat x; + dds_return_t ret = ddsrt_netstat_get (st->ctrl, &x); + st->errored = (ret == DDS_RETCODE_ERROR); + if (ret == DDS_RETCODE_OK) + { + if (st->data_valid) + { + /* interface speeds are in bits/s, so convert bytes to bits */ + const double dx = 8 * (double) (x.obytes - st->obytes); + const double dr = 8 * (double) (x.ibytes - st->ibytes); + const double dt = (double) (tnow - st->tprev) / 1e9; + const double dxpct = 100.0 * dx / dt / st->bw; + const double drpct = 100.0 * dr / dt / st->bw; + if (dxpct >= 0.5 || drpct >= 0.5) + { + printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n", + prefix, st->name, dxpct, drpct, x.obytes, x.ibytes); + } + } + st->obytes = x.obytes; + st->ibytes = x.ibytes; + st->tprev = tnow; + st->data_valid = true; + } + } +} + +struct record_netload_state *record_netload_new (const char *dev, double bw) +{ + struct record_netload_state *st = ddsrt_malloc (sizeof (*st)); + if (ddsrt_netstat_new (&st->ctrl, dev) != DDS_RETCODE_OK) + { + ddsrt_free (st); + return NULL; + } + st->name = ddsrt_strdup (dev); + st->bw = bw; + st->data_valid = false; + st->errored = false; + record_netload (st, NULL, dds_time ()); + return st; +} + +void record_netload_free (struct record_netload_state *st) +{ + if (st) + { + ddsrt_netstat_free (st->ctrl); + ddsrt_free (st->name); + ddsrt_free (st); + } +} + +#else + +void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow) +{ + (void) st; + (void) prefix; + (void ) tnow; +} + +struct record_netload_state *record_netload_new (const char *dev, double bw) +{ + (void) dev; + (void) bw; + return NULL; +} + +void record_netload_free (struct record_netload_state *st) +{ + (void) st; +} + +#endif diff --git a/src/tools/ddsperf/netload.h b/src/tools/ddsperf/netload.h new file mode 100644 index 0000000..27a896d --- /dev/null +++ b/src/tools/ddsperf/netload.h @@ -0,0 +1,23 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef NETLOAD_H +#define NETLOAD_H + +#include + +struct record_netload_state; + +void record_netload (struct record_netload_state *st, const char *prefix, dds_time_t tnow); +struct record_netload_state *record_netload_new (const char *dev, double bw); +void record_netload_free (struct record_netload_state *st); + +#endif