diff --git a/examples/perfscript/throughput-test b/examples/perfscript/throughput-test index d654056..845768c 100755 --- a/examples/perfscript/throughput-test +++ b/examples/perfscript/throughput-test @@ -1,7 +1,7 @@ #!/bin/bash export nwif=eth0 -bandwidth=1e9 +bandwidth="" remotedir="$PWD" provision=false asynclist="sync async" @@ -10,15 +10,19 @@ sizelist="0 20 50 100 200 500 1000 2000 5000 10000 20000 50000 100000 200000 500 timeout=30 loopback=true resultdir="throughput-result" +force=false +netstats=false usage () { cat >&2 <$cfg < - - 17 + + + \${nwif} + $loopback + + + \${async:-0} + 2s + + + fine + \${trace} + \${logdir}/cdds.log + - - $nwif - $loopback - 65500B - 4000B - - - - 500kB - - \${async:-0} - 3s - - - config - EOF @@ -94,11 +98,20 @@ if [ ! -x bin/ddsperf ] ; then exit 1 fi +if [ ! -d $resultdir ] ; then + mkdir $resultdir +elif $force ; then + rm -rf $resultdir + mkdir $resultdir +elif [ `ls $resultdir | wc -l` -gt 0 ] ; then + echo "output directory $resultdir is non-empty" >&2 + exit 1 +fi [ -d $resultdir ] || { echo "output directory $resultdir doesn't exist" >&2 ; exit 1 ; } if $provision ; then echo "provisioning ..." - for r in $pubremote "$@" ; do + for r in "$@" ; do ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib scp lib/libddsc.so.0 $r:$remotedir/lib scp bin/ddsperf $r:$remotedir/bin @@ -109,7 +122,7 @@ topic=KS [ -z "$sizelist" ] && topic=OU export CYCLONEDDS_URI=file://$PWD/$cfg -for r in $pubremote "$@" ; do +for r in "$@" ; do scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; } done @@ -123,45 +136,70 @@ for async_mode in $asynclist ; do for sub_mode in $modelist ; do echo "======== ASYNC $async MODE $sub_mode =========" - cat > run-publisher.tmp < run-subscriber.tmp < pub.log - sleep 5 -done -wait +#/usr/sbin/tcpdump -c 20000 -s 0 -w /tmp/x.pcap -i eth0 -Z erik 'udp[8:4]=0x52545053' & tcpdumppid=\$! +bin/ddsperf -1 -d $rnwif$bandwidth -c -T $topic sub $sub_mode > sub.log & pid=\$! 
+echo \$pid > throughput-test-sub-\$pid.pid +wait \$pid +#kill -INT \$tcpdumppid EOF - scp run-publisher.tmp $pubremote:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 2 ; } - killremotesubs="" - if [ $# -gt 0 ] ; then - cat > run-subscriber.tmp < /dev/null & -echo \$! -EOF - for r in "$@" ; do - scp run-subscriber.tmp $r:$remotedir - rsubpid=`ssh $r ". $remotedir/run-subscriber.tmp"` - killremotesubs="$killremotesubs ssh $r kill -9 $rsubpid &" - done - fi + for r in "$@" ; do + scp run-subscriber.tmp $r:$remotedir + ssh $r ". $remotedir/run-subscriber.tmp" & subpids="$subpids $!" + done outdir=$resultdir/$async_mode-$sub_mode mkdir $outdir + rm -f $outdir/pub.log + export logdir=$outdir + if $netstats ; then + ping `echo $1 | sed -e 's/.*@//'` > $outdir/ping.log & netstats_pids=$! + if [ "`uname -s`" = "Darwin" ] ; then + rm -f $outdir/netstat.log + bash -c "while true ; do /System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport -I >> $outdir/netstat.log ; sleep 1; done" & netstats_pids="$netstats_pids $!" + fi + fi + for size in ${sizelist:-0} ; do + echo "size $size" + #export trace=trace,-content + bin/ddsperf -Q minmatch:$# -Q initwait:3 \ + -c -d $nwif$bandwidth \ + -D $timeout -T $topic \ + pub size $size | \ + tee -a $outdir/pub.log + done - bin/ddsperf -d $nwif:$bandwidth -c -T $topic sub $sub_mode > $outdir/sub.log & spid=$! - tail -f $outdir/sub.log & xpid=$! - ssh $pubremote ". 
$remotedir/run-publisher.tmp" - kill $spid - eval $killremotesubs - sleep 1 - kill $xpid + if $netstats ; then + kill $netstats_pids + fi + for r in "$@" ; do + ssh $r "kill -9 \`cat $remotedir/throughput-test-sub-*.pid\` ; rm $remotedir/throughput-test-sub-*.pid" + done wait - scp $pubremote:$remotedir/pub.log $outdir + for r in "$@" ; do + scp $r:$remotedir/sub.log $outdir/sub-$r.log + done + + # write a summary, one line per second + # col 0: raw network receive bandwidth (Mb/s) + # col 1: appl receive bandwidth, 1s trailing average (Mb/s) + # col 2: appl receive bandwidth, 10s trailing average (Mb/s) + # (this assumes the network interface name is eth, en, or lo, optionally followed by a 0) + perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){printf "%f %f %f\n", $1, $r, $r10;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt + perl -ne 'print "$1 $2\n" if /^(\d+)\s+(\d+)(\s+\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt + perl -ne 'print "$1 $2\n" if /^DISCARDED\s+(\d+)\s+(\d+)$/' $outdir/sub-$1.log > $resultdir/discarded.txt + if $netstats ; then + perl -ne 'print "$1\n" if /time=([0-9.]+)/' $outdir/ping.log > $resultdir/lat.log + if [ "`uname -s`" = "Darwin" ] ; then + perl -ne 'if(/CtlRSSI:\s*(-?\d+)/){$rssi=$1;}if(/CtlNoise:\s*(-?\d+)/){$noise=$1;}if(/maxRate:\s*(\d+)/){$max=$1;}if(/lastTxRate:\s*(\d+)/){$lasttx=$1;print "$max $lasttx $rssi $noise\n"}' $outdir/netstat.log > $resultdir/net.log + fi + fi done done diff --git a/src/tools/ddsperf/cputime.c b/src/tools/ddsperf/cputime.c index 6788e4a..ce96708 100644 --- a/src/tools/ddsperf/cputime.c +++ b/src/tools/ddsperf/cputime.c @@ -41,7 +41,6 @@ bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_hos { char line[512]; size_t pos = 0; - assert (is_fresh || !print_host); pos += (size_t) snprintf (line + pos, sizeof (line) - pos, "%s", prefix); if (!is_fresh) pos += (size_t) 
snprintf (line + pos, sizeof (line) - pos, " (stale)"); diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index d755b49..e6e8e50 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -93,6 +93,9 @@ static enum topicsel topicsel = KS; static enum submode submode = SM_LISTENER; static enum submode pingpongmode = SM_LISTENER; +/* Whether to show "sub" stats every second even when nothing happens */ +static bool substat_every_second = false; + /* Size of the sequence in KeyedSeq type in bytes */ static uint32_t baggagesize = 0; @@ -105,6 +108,9 @@ static double dur = HUGE_VAL; /* Minimum number of peers (if not met, exit status is 1) */ static uint32_t minmatch = 0; +/* Wait this long for MINMATCH peers before starting */ +static double initmaxwait = 0; + /* Maximum time it may take to discover all MINMATCH peers */ static double maxwait = HUGE_VAL; @@ -189,9 +195,12 @@ struct eseq_stat { uint32_t last_size; /* stats printer state */ - uint64_t nrecv_ref; - uint64_t nlost_ref; - uint64_t nrecv_bytes_ref; + struct { + uint64_t nrecv; + uint64_t nlost; + uint64_t nrecv_bytes; + } ref[10]; + unsigned refidx; }; struct eseq_admin { @@ -1376,30 +1385,39 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str if (submode != SM_NONE) { struct eseq_admin * const ea = &eseq_admin; - uint64_t tot_nrecv = 0, tot_nlost = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; + uint64_t tot_nrecv = 0, tot_nlost = 0, nlost = 0; + uint64_t nrecv = 0, nrecv_bytes = 0; + uint64_t nrecv10s = 0, nrecv10s_bytes = 0; uint32_t last_size = 0; ddsrt_mutex_lock (&ea->lock); for (uint32_t i = 0; i < ea->nph; i++) { struct eseq_stat * const x = &ea->stats[i]; + unsigned refidx1s = (x->refidx == 0) ? 
(unsigned) (sizeof (x->ref) / sizeof (x->ref[0]) - 1) : (x->refidx - 1); + unsigned refidx10s = x->refidx; tot_nrecv += x->nrecv; tot_nlost += x->nlost; - nrecv += x->nrecv - x->nrecv_ref; - nlost += x->nlost - x->nlost_ref; - nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref; + nrecv += x->nrecv - x->ref[refidx1s].nrecv; + nlost += x->nlost - x->ref[refidx1s].nlost; + nrecv_bytes += x->nrecv_bytes - x->ref[refidx1s].nrecv_bytes; + nrecv10s += x->nrecv - x->ref[refidx10s].nrecv; + nrecv10s_bytes += x->nrecv_bytes - x->ref[refidx10s].nrecv_bytes; last_size = x->last_size; - x->nrecv_ref = x->nrecv; - x->nlost_ref = x->nlost; - x->nrecv_bytes_ref = x->nrecv_bytes; + x->ref[x->refidx].nrecv = x->nrecv; + x->ref[x->refidx].nlost = x->nlost; + x->ref[x->refidx].nrecv_bytes = x->nrecv_bytes; + if (++x->refidx == (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]))) + x->refidx = 0; } ddsrt_mutex_unlock (&ea->lock); - if (nrecv > 0) + if (nrecv > 0 || substat_every_second) { const double dt = (double) (tnow - tprev); - printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s\n", + printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s (%.2f kS/s %.2f Mb/s)\n", prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost, - (double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt); + (double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt, + (double) nrecv10s * 1e6 / (10 * dt), (double) nrecv10s_bytes * 8 * 1e3 / (10 * dt)); output = true; } } @@ -1577,10 +1595,22 @@ OPTIONS:\n\ samples:N min received messages by \"sub\"\n\ roundtrips:N min roundtrips for \"pong\"\n\ minmatch:N require >= N matching participants\n\ + initwait:DUR wait for those participants before\n\ + starting, abort if not within DUR\n\ + seconds\n\ maxwait:DUR require those participants to match\n\ within DUR seconds\n\ -R TREF timestamps in the output relative to TREF instead of\n\ 
process start\n\ + -W DUR wait at most DUR seconds for the minimum required\n\ + number of matching participants (set by -Qminmatch:N)\n\ + to show up before starting reading/writing data,\n\ + terminate with an error otherwise. (This differs\n\ + from -Qmaxwait:DUR because that doesn't delay starting\n\ + and doesn't terminate the process before doing\n\ + anything.)\n\ + -1 print \"sub\" stats every second, even when there is\n\ + no data\n\ -i ID use domain ID instead of the default domain\n\ \n\ MODE... is zero or more of:\n\ @@ -1884,24 +1914,30 @@ int main (int argc, char *argv[]) ddsrt_thread_t sigtid; #endif char netload_if[256]; - double netload_bw = 0; + double netload_bw = -1; + double rss_init = 0.0, rss_final = 0.0; ddsrt_threadattr_init (&attr); argv0 = argv[0]; - while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:Q:R:h")) != EOF) + while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:h")) != EOF) { int pos; switch (opt) { + case '1': substat_every_second = true; break; case 'c': collect_stats = true; break; case 'd': { char *col; (void) ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if)); - if ((col = strrchr (netload_if, ':')) == NULL || col == netload_if || - (sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0)) - error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg); - *col = 0; + if ((col = strrchr (netload_if, ':')) == NULL) + netload_bw = 0; + else + { + if (col == netload_if || (sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0)) + error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg); + *col = 0; + } break; } case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; @@ -1931,6 +1967,8 @@ int main (int argc, char *argv[]) min_roundtrips = (uint64_t) n; } else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) { maxwait = (maxwait <= 0) ? 
HUGE_VAL : maxwait; + } else if (sscanf (optarg, "initwait:%lf%n", &initmaxwait, &pos) == 1 && optarg[pos] == 0) { + initmaxwait = (initmaxwait <= 0) ? 0 : initmaxwait; } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { minmatch = (uint32_t) n; } else { @@ -1972,7 +2010,7 @@ int main (int argc, char *argv[]) baggagesize -= 12; struct record_netload_state *netload_state; - if (netload_bw <= 0) + if (netload_bw < 0) netload_state = NULL; else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL) error3 ("can't get network utilization information for device %s\n", netload_if); @@ -2036,7 +2074,7 @@ int main (int argc, char *argv[]) snprintf (tpname_ping, sizeof (tpname_ping), "DDSPerf%cPing%s", reliable ? 'R' : 'U', tp_suf); snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", reliable ? 'R' : 'U', tp_suf); qos = dds_create_qos (); - dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (1)); + dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10)); if ((tp_data = dds_create_topic (dp, tp_desc, tpname_data, qos, NULL)) < 0) error2 ("dds_create_topic(%s) failed: %d\n", tpname_data, (int) tp_data); if ((tp_ping = dds_create_topic (dp, tp_desc, tpname_ping, qos, NULL)) < 0) @@ -2157,20 +2195,6 @@ int main (int argc, char *argv[]) if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) error2 ("dds_waitset_attach(main, termcond) failed: %d\n", (int) rc); - /* I hate Unix signals in multi-threaded processes ... 
*/ -#ifdef _WIN32 - signal (SIGINT, signal_handler); -#elif !DDSRT_WITH_FREERTOS - sigemptyset (&sigset); - sigaddset (&sigset, SIGINT); - sigaddset (&sigset, SIGTERM); - sigprocmask (SIG_BLOCK, &sigset, &osigset); - ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset); -#if defined __APPLE__ || defined __linux - signal (SIGXFSZ, sigxfsz_handler); -#endif -#endif - /* Make publisher & subscriber thread arguments and start the threads we need (so what if we allocate memory for reading data even if we don't have a reader or will never really be receiving data) */ @@ -2191,6 +2215,34 @@ int main (int argc, char *argv[]) memset (&subtid, 0, sizeof (subtid)); memset (&subpingtid, 0, sizeof (subpingtid)); memset (&subpongtid, 0, sizeof (subpongtid)); + + /* Just before starting the threads but after setting everything up, wait for + the required number of peers, if requested to do so */ + if (initmaxwait > 0) + { + dds_time_t tnow = dds_time (); + const dds_time_t tendwait = tnow + (dds_duration_t) (initmaxwait * 1e9); + ddsrt_mutex_lock (&disc_lock); + while (matchcount < minmatch && tnow < tendwait) + { + ddsrt_mutex_unlock (&disc_lock); + dds_sleepfor (DDS_MSECS (100)); + ddsrt_mutex_lock (&disc_lock); + tnow = dds_time (); + } + const bool ok = (matchcount >= minmatch); + if (!ok) + { + /* set minmatch to an impossible value to avoid a match occurring between now and + the determining of the exit status from causing a successful return */ + minmatch = UINT32_MAX; + } + ddsrt_mutex_unlock (&disc_lock); + if (!ok) + goto err_minmatch_wait; + dds_sleepfor (DDS_MSECS (100)); + } + if (pub_rate > 0) ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL); if (subthread_func != 0) @@ -2220,6 +2272,21 @@ int main (int argc, char *argv[]) struct record_cputime_state *cputime_state; cputime_state = record_cputime_new (wr_stat); + /* I hate Unix signals in multi-threaded processes ... 
*/ +#ifdef _WIN32 + signal (SIGINT, signal_handler); +#elif !DDSRT_WITH_FREERTOS + sigemptyset (&sigset); + sigaddset (&sigset, SIGHUP); + sigaddset (&sigset, SIGINT); + sigaddset (&sigset, SIGTERM); + sigprocmask (SIG_BLOCK, &sigset, &osigset); + ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset); +#if defined __APPLE__ || defined __linux + signal (SIGXFSZ, sigxfsz_handler); +#endif +#endif + /* Run until time limit reached or a signal received. (The time calculations ignore the possibility of overflow around the year 2260.) */ dds_time_t tnow = dds_time (); @@ -2231,7 +2298,6 @@ int main (int argc, char *argv[]) dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tlast = tstart; dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; - double rss_init = 0.0, rss_final = 0.0; while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop) { dds_time_t twakeup = DDS_NEVER; @@ -2350,6 +2416,7 @@ int main (int argc, char *argv[]) ddsrt_thread_join (subpongtid, NULL); } +err_minmatch_wait: /* stop the listeners before deleting the readers: otherwise they may still try to access a reader that has already become inaccessible (not quite good, but ...) 
*/ @@ -2420,7 +2487,7 @@ int main (int argc, char *argv[]) if (matchcount < minmatch) { - printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32" instead of %"PRIu32")\n", ddsrt_getpid (), matchcount, minmatch); + printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32")\n", ddsrt_getpid (), matchcount); ok = false; } if (nlost > 0 && (reliable && histdepth == 0)) diff --git a/src/tools/ddsperf/netload.c b/src/tools/ddsperf/netload.c index f186e44..928adac 100644 --- a/src/tools/ddsperf/netload.c +++ b/src/tools/ddsperf/netload.c @@ -47,15 +47,23 @@ void record_netload (struct record_netload_state *st, const char *prefix, dds_ti if (st->data_valid) { /* interface speeds are in bits/s, so convert bytes to bits */ - const double dx = 8 * (double) (x.obytes - st->obytes); - const double dr = 8 * (double) (x.ibytes - st->ibytes); const double dt = (double) (tnow - st->tprev) / 1e9; - const double dxpct = 100.0 * dx / dt / st->bw; - const double drpct = 100.0 * dr / dt / st->bw; - if (dxpct >= 0.5 || drpct >= 0.5) + const double dx = 8 * (double) (x.obytes - st->obytes) / dt; + const double dr = 8 * (double) (x.ibytes - st->ibytes) / dt; + if (st->bw > 0) { - printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n", - prefix, st->name, dxpct, drpct, x.obytes, x.ibytes); + const double dxpct = 100.0 * dx / st->bw; + const double drpct = 100.0 * dr / st->bw; + if (dxpct >= 0.5 || drpct >= 0.5) + { + printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n", + prefix, st->name, dxpct, drpct, x.obytes, x.ibytes); + } + } + else if (dx >= 1e5 || dr >= 1e5) // 100kb/s is arbitrary + { + printf ("%s %s: xmit %.2f Mb/s recv %.2f Mb/s [%"PRIu64" %"PRIu64"]\n", + prefix, st->name, dx / 1e6, dr / 1e6, x.obytes, x.ibytes); } } st->obytes = x.obytes;