ddsperf and throughput-test script improvements

* Bandwidth usage is now printed in Mb/s if no reference rate is given

* Trailing average rate over the last 10s (approximated as the last 10
  lines of output) is printed

* An option (ddsperf -Qinitwait:DUR) to wait until the expected number
  of peers is present before starting

* The test script now pushes data to the remotes, instead of using the
  first remote as the publisher

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-06-22 18:03:48 +02:00 committed by eboasson
parent 63b1a7179b
commit b116e6e41e
4 changed files with 217 additions and 105 deletions

View file

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
export nwif=eth0 export nwif=eth0
bandwidth=1e9 bandwidth=""
remotedir="$PWD" remotedir="$PWD"
provision=false provision=false
asynclist="sync async" asynclist="sync async"
@ -10,15 +10,19 @@ sizelist="0 20 50 100 200 500 1000 2000 5000 10000 20000 50000 100000 200000 500
timeout=30 timeout=30
loopback=true loopback=true
resultdir="throughput-result" resultdir="throughput-result"
force=false
netstats=false
usage () { usage () {
cat >&2 <<EOF cat >&2 <<EOF
usage: $0 [OPTIONS] user@remote [user@remote...] usage: $0 [OPTIONS] user@remote [user@remote...]
OPTIONS OPTIONS
-i IF use network interface IF (default: $nwif) -i IF use network interface IF (default: $nwif) local
-I IF use network interface IF remotely (default: same as local)
-b 100|1000|B network bandwidth (100Mbps/1000Gbps or B bits/sec) for -b 100|1000|B network bandwidth (100Mbps/1000Gbps or B bits/sec) for
calculating load % given load in bytes/second (default: 1000) calculating load % given load in bytes/second (default:
report Mb/s)
-d DIR use DIR on remote (default: PWD) -d DIR use DIR on remote (default: PWD)
-p provision required binaries in DIR (default: $provision) -p provision required binaries in DIR (default: $provision)
first ssh's in to try mkdir -p DIR, then follows up with scp first ssh's in to try mkdir -p DIR, then follows up with scp
@ -31,6 +35,8 @@ OPTIONS
if SIZELIST is empty, it uses ddsperf's OU topic instead if SIZELIST is empty, it uses ddsperf's OU topic instead
-l LOOPBACK enable/disable multicast loopback (true/false, default: $loopback) -l LOOPBACK enable/disable multicast loopback (true/false, default: $loopback)
-o DIR store results in dir (default: $resultdir) -o DIR store results in dir (default: $resultdir)
-f "force": first do "rm -rf" of output dir, then create it
-X run ping to first remote; on macOS: log interface stats
Local host runs "ddsperf" in subscriber mode, first remote runs it publisher Local host runs "ddsperf" in subscriber mode, first remote runs it publisher
mode, further remotes also run subcribers. It assumes these are available in mode, further remotes also run subcribers. It assumes these are available in
@ -40,13 +46,15 @@ EOF
exit 1 exit 1
} }
while getopts "i:b:d:pa:m:s:t:o:l:" opt ; do while getopts "fi:I:b:d:pa:m:s:t:o:l:X" opt ; do
case $opt in case $opt in
f) force=true ;;
i) nwif="$OPTARG" ;; i) nwif="$OPTARG" ;;
I) rnwif="$OPTARG" ;;
b) case "$OPTARG" in b) case "$OPTARG" in
100) bandwidth=1e8 ;; 100) bandwidth=:1e8 ;;
1000) bandwidth=1e9 ;; 1000) bandwidth=:1e9 ;;
*) bandwidth="$OPTARG" ;; *) bandwidth="${OPTARG:+:}$OPTARG" ;;
esac ;; esac ;;
d) remotedir="$OPTARG" ;; d) remotedir="$OPTARG" ;;
p) provision=true ;; p) provision=true ;;
@ -56,36 +64,32 @@ while getopts "i:b:d:pa:m:s:t:o:l:" opt ; do
l) loopback="OPTARG" ;; l) loopback="OPTARG" ;;
t) timeout="$OPTARG" ;; t) timeout="$OPTARG" ;;
o) resultdir="$OPTARG" ;; o) resultdir="$OPTARG" ;;
X) netstats=true ;;
h) usage ;; h) usage ;;
esac esac
done done
shift $((OPTIND-1)) shift $((OPTIND-1))
if [ $# -lt 1 ] ; then usage ; fi if [ $# -lt 1 ] ; then usage ; fi
pubremote=$1
shift
[ -z "$rnwif" ] && rnwif=$nwif
cfg=cdds-simple.xml cfg=cdds-simple.xml
cat >$cfg <<EOF cat >$cfg <<EOF
<CycloneDDS> <CycloneDDS>
<Domain> <Domain id="17">
<Id>17</Id>
</Domain>
<General> <General>
<NetworkInterfaceAddress>$nwif</NetworkInterfaceAddress> <NetworkInterfaceAddress>\${nwif}</NetworkInterfaceAddress>
<EnableMulticastLoopback>$loopback</EnableMulticastLoopback> <EnableMulticastLoopback>$loopback</EnableMulticastLoopback>
<MaxMessageSize>65500B</MaxMessageSize>
<FragmentSize>4000B</FragmentSize>
</General> </General>
<Internal> <Internal>
<Watermarks>
<WhcHigh>500kB</WhcHigh>
</Watermarks>
<SynchronousDeliveryPriorityThreshold>\${async:-0}</SynchronousDeliveryPriorityThreshold> <SynchronousDeliveryPriorityThreshold>\${async:-0}</SynchronousDeliveryPriorityThreshold>
<LeaseDuration>3s</LeaseDuration> <LeaseDuration>2s</LeaseDuration>
</Internal> </Internal>
<Tracing> <Tracing>
<Verbosity>config</Verbosity> <Verbosity>fine</Verbosity>
<Category>\${trace}</Category>
<OutputFile>\${logdir}/cdds.log</OutputFile>
</Tracing> </Tracing>
</Domain>
</CycloneDDS> </CycloneDDS>
EOF EOF
@ -94,11 +98,20 @@ if [ ! -x bin/ddsperf ] ; then
exit 1 exit 1
fi fi
if [ ! -d $resultdir ] ; then
mkdir $resultdir
elif $force ; then
rm -rf $resultdir
mkdir $resultdir
elif [ `ls $resultdir | wc -l` -gt 0 ] ; then
echo "output directory $resultdir is non-empty" >&2
exit 1
fi
[ -d $resultdir ] || { echo "output directory $resultdir doesn't exist" >&2 ; exit 1 ; } [ -d $resultdir ] || { echo "output directory $resultdir doesn't exist" >&2 ; exit 1 ; }
if $provision ; then if $provision ; then
echo "provisioning ..." echo "provisioning ..."
for r in $pubremote "$@" ; do for r in "$@" ; do
ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib
scp lib/libddsc.so.0 $r:$remotedir/lib scp lib/libddsc.so.0 $r:$remotedir/lib
scp bin/ddsperf $r:$remotedir/bin scp bin/ddsperf $r:$remotedir/bin
@ -109,7 +122,7 @@ topic=KS
[ -z "$sizelist" ] && topic=OU [ -z "$sizelist" ] && topic=OU
export CYCLONEDDS_URI=file://$PWD/$cfg export CYCLONEDDS_URI=file://$PWD/$cfg
for r in $pubremote "$@" ; do for r in "$@" ; do
scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; } scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; }
done done
@ -123,45 +136,70 @@ for async_mode in $asynclist ; do
for sub_mode in $modelist ; do for sub_mode in $modelist ; do
echo "======== ASYNC $async MODE $sub_mode =========" echo "======== ASYNC $async MODE $sub_mode ========="
cat > run-publisher.tmp <<EOF subpids=""
export CYCLONEDDS_URI=file://$remotedir/$cfg
export async=$async
cd $remotedir
for size in ${sizelist:-0} ; do
echo "size \$size"
bin/ddsperf -D $timeout -T $topic pub size \$size > pub.log
sleep 5
done
wait
EOF
scp run-publisher.tmp $pubremote:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 2 ; }
killremotesubs=""
if [ $# -gt 0 ] ; then
cat > run-subscriber.tmp <<EOF cat > run-subscriber.tmp <<EOF
export CYCLONEDDS_URI=file://$remotedir/$cfg export CYCLONEDDS_URI=file://$remotedir/$cfg
export async=$async export async=$async
export nwif=$rnwif
export logdir=.
#export trace=trace,-content
cd $remotedir cd $remotedir
nohup bin/ddsperf -T $topic sub $sub_mode > /dev/null & #/usr/sbin/tcpdump -c 20000 -s 0 -w /tmp/x.pcap -i eth0 -Z erik 'udp[8:4]=0x52545053' & tcpdumppid=\$!
echo \$! bin/ddsperf -1 -d $rnwif$bandwidth -c -T $topic sub $sub_mode > sub.log & pid=\$!
echo \$pid > throughput-test-sub-\$pid.pid
wait \$pid
#kill -INT \$tcpdumppid
EOF EOF
for r in "$@" ; do for r in "$@" ; do
scp run-subscriber.tmp $r:$remotedir scp run-subscriber.tmp $r:$remotedir
rsubpid=`ssh $r ". $remotedir/run-subscriber.tmp"` ssh $r ". $remotedir/run-subscriber.tmp" & subpids="$subpids $!"
killremotesubs="$killremotesubs ssh $r kill -9 $rsubpid &"
done done
fi
outdir=$resultdir/$async_mode-$sub_mode outdir=$resultdir/$async_mode-$sub_mode
mkdir $outdir mkdir $outdir
rm -f $outdir/pub.log
export logdir=$outdir
if $netstats ; then
ping `echo $1 | sed -e 's/.*@//'` > $outdir/ping.log & netstats_pids=$!
if [ "`uname -s`" = "Darwin" ] ; then
rm -f $outdir/netstat.log
bash -c "while true ; do /System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport -I >> $outdir/netstat.log ; sleep 1; done" & netstats_pids="$netstats_pids $!"
fi
fi
for size in ${sizelist:-0} ; do
echo "size $size"
#export trace=trace,-content
bin/ddsperf -Q minmatch:$# -Q initwait:3 \
-c -d $nwif$bandwidth \
-D $timeout -T $topic \
pub size $size | \
tee -a $outdir/pub.log
done
bin/ddsperf -d $nwif:$bandwidth -c -T $topic sub $sub_mode > $outdir/sub.log & spid=$! if $netstats ; then
tail -f $outdir/sub.log & xpid=$! kill $netstats_pids
ssh $pubremote ". $remotedir/run-publisher.tmp" fi
kill $spid for r in "$@" ; do
eval $killremotesubs ssh $r "kill -9 \`cat $remotedir/throughput-test-sub-*.pid\` ; rm $remotedir/throughput-test-sub-*.pid"
sleep 1 done
kill $xpid
wait wait
scp $pubremote:$remotedir/pub.log $outdir for r in "$@" ; do
scp $r:$remotedir/sub.log $outdir/sub-$r.log
done
# write a summary, one line per second
# col 0: raw network receive bandwidth (Mb/s)
# col 1: appl receive bandwidth, 1s trailing average (Mb/s)
# col 2: appl receive bandwidth, 10s trailing average (Mb/s)
# (this assumes the network interface name is eth, en, or lo, optionally followed by a 0)
perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){printf "%f %f %f\n", $1, $r, $r10;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt
perl -ne 'print "$1 $2\n" if /^(\d+)\s+(\d+)(\s+\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt
perl -ne 'print "$1 $2\n" if /^DISCARDED\s+(\d+)\s+(\d+)$/' $outdir/sub-$1.log > $resultdir/discarded.txt
if $netstats ; then
perl -ne 'print "$1\n" if /time=([0-9.]+)/' $outdir/ping.log > $resultdir/lat.log
if [ "`uname -s`" = "Darwin" ] ; then
perl -ne 'if(/CtlRSSI:\s*(-?\d+)/){$rssi=$1;}if(/CtlNoise:\s*(-?\d+)/){$noise=$1;}if(/maxRate:\s*(\d+)/){$max=$1;}if(/lastTxRate:\s*(\d+)/){$lasttx=$1;print "$max $lasttx $rssi $noise\n"}' $outdir/netstat.log > $resultdir/net.log
fi
fi
done done
done done

View file

@ -41,7 +41,6 @@ bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_hos
{ {
char line[512]; char line[512];
size_t pos = 0; size_t pos = 0;
assert (is_fresh || !print_host);
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, "%s", prefix); pos += (size_t) snprintf (line + pos, sizeof (line) - pos, "%s", prefix);
if (!is_fresh) if (!is_fresh)
pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " (stale)"); pos += (size_t) snprintf (line + pos, sizeof (line) - pos, " (stale)");

View file

@ -93,6 +93,9 @@ static enum topicsel topicsel = KS;
static enum submode submode = SM_LISTENER; static enum submode submode = SM_LISTENER;
static enum submode pingpongmode = SM_LISTENER; static enum submode pingpongmode = SM_LISTENER;
/* Whether to show "sub" stats every second even when nothing happens */
static bool substat_every_second = false;
/* Size of the sequence in KeyedSeq type in bytes */ /* Size of the sequence in KeyedSeq type in bytes */
static uint32_t baggagesize = 0; static uint32_t baggagesize = 0;
@ -105,6 +108,9 @@ static double dur = HUGE_VAL;
/* Minimum number of peers (if not met, exit status is 1) */ /* Minimum number of peers (if not met, exit status is 1) */
static uint32_t minmatch = 0; static uint32_t minmatch = 0;
/* Wait this long for MINMATCH peers before starting */
static double initmaxwait = 0;
/* Maximum time it may take to discover all MINMATCH peers */ /* Maximum time it may take to discover all MINMATCH peers */
static double maxwait = HUGE_VAL; static double maxwait = HUGE_VAL;
@ -189,9 +195,12 @@ struct eseq_stat {
uint32_t last_size; uint32_t last_size;
/* stats printer state */ /* stats printer state */
uint64_t nrecv_ref; struct {
uint64_t nlost_ref; uint64_t nrecv;
uint64_t nrecv_bytes_ref; uint64_t nlost;
uint64_t nrecv_bytes;
} ref[10];
unsigned refidx;
}; };
struct eseq_admin { struct eseq_admin {
@ -1376,30 +1385,39 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str
if (submode != SM_NONE) if (submode != SM_NONE)
{ {
struct eseq_admin * const ea = &eseq_admin; struct eseq_admin * const ea = &eseq_admin;
uint64_t tot_nrecv = 0, tot_nlost = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; uint64_t tot_nrecv = 0, tot_nlost = 0, nlost = 0;
uint64_t nrecv = 0, nrecv_bytes = 0;
uint64_t nrecv10s = 0, nrecv10s_bytes = 0;
uint32_t last_size = 0; uint32_t last_size = 0;
ddsrt_mutex_lock (&ea->lock); ddsrt_mutex_lock (&ea->lock);
for (uint32_t i = 0; i < ea->nph; i++) for (uint32_t i = 0; i < ea->nph; i++)
{ {
struct eseq_stat * const x = &ea->stats[i]; struct eseq_stat * const x = &ea->stats[i];
unsigned refidx1s = (x->refidx == 0) ? (unsigned) (sizeof (x->ref) / sizeof (x->ref[0]) - 1) : (x->refidx - 1);
unsigned refidx10s = x->refidx;
tot_nrecv += x->nrecv; tot_nrecv += x->nrecv;
tot_nlost += x->nlost; tot_nlost += x->nlost;
nrecv += x->nrecv - x->nrecv_ref; nrecv += x->nrecv - x->ref[refidx1s].nrecv;
nlost += x->nlost - x->nlost_ref; nlost += x->nlost - x->ref[refidx1s].nlost;
nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref; nrecv_bytes += x->nrecv_bytes - x->ref[refidx1s].nrecv_bytes;
nrecv10s += x->nrecv - x->ref[refidx10s].nrecv;
nrecv10s_bytes += x->nrecv_bytes - x->ref[refidx10s].nrecv_bytes;
last_size = x->last_size; last_size = x->last_size;
x->nrecv_ref = x->nrecv; x->ref[x->refidx].nrecv = x->nrecv;
x->nlost_ref = x->nlost; x->ref[x->refidx].nlost = x->nlost;
x->nrecv_bytes_ref = x->nrecv_bytes; x->ref[x->refidx].nrecv_bytes = x->nrecv_bytes;
if (++x->refidx == (unsigned) (sizeof (x->ref) / sizeof (x->ref[0])))
x->refidx = 0;
} }
ddsrt_mutex_unlock (&ea->lock); ddsrt_mutex_unlock (&ea->lock);
if (nrecv > 0) if (nrecv > 0 || substat_every_second)
{ {
const double dt = (double) (tnow - tprev); const double dt = (double) (tnow - tprev);
printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s\n", printf ("%s size %"PRIu32" total %"PRIu64" lost %"PRIu64" delta %"PRIu64" lost %"PRIu64" rate %.2f kS/s %.2f Mb/s (%.2f kS/s %.2f Mb/s)\n",
prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost, prefix, last_size, tot_nrecv, tot_nlost, nrecv, nlost,
(double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt); (double) nrecv * 1e6 / dt, (double) nrecv_bytes * 8 * 1e3 / dt,
(double) nrecv10s * 1e6 / (10 * dt), (double) nrecv10s_bytes * 8 * 1e3 / (10 * dt));
output = true; output = true;
} }
} }
@ -1577,10 +1595,22 @@ OPTIONS:\n\
samples:N min received messages by \"sub\"\n\ samples:N min received messages by \"sub\"\n\
roundtrips:N min roundtrips for \"pong\"\n\ roundtrips:N min roundtrips for \"pong\"\n\
minmatch:N require >= N matching participants\n\ minmatch:N require >= N matching participants\n\
initwait:DUR wait for those participants before\n\
starting, abort if not within DUR\n\
seconds\n\
maxwait:DUR require those participants to match\n\ maxwait:DUR require those participants to match\n\
within DUR seconds\n\ within DUR seconds\n\
-R TREF timestamps in the output relative to TREF instead of\n\ -R TREF timestamps in the output relative to TREF instead of\n\
process start\n\ process start\n\
-W DUR wait at most DUR seconds for the minimum required\n\
number of matching participants (set by -Qminmatch:N)\n\
to show up before starting reading/writing data,\n\
terminate with an error otherwise. (This differs\n\
from -Qmaxwait:DUR because that doesn't delay starting\n\
and doesn't terminate the process before doing\n\
anything.)\n\
-1 print \"sub\" stats every second, even when there is\n\
data\n\
-i ID use domain ID instead of the default domain\n\ -i ID use domain ID instead of the default domain\n\
\n\ \n\
MODE... is zero or more of:\n\ MODE... is zero or more of:\n\
@ -1884,24 +1914,30 @@ int main (int argc, char *argv[])
ddsrt_thread_t sigtid; ddsrt_thread_t sigtid;
#endif #endif
char netload_if[256]; char netload_if[256];
double netload_bw = 0; double netload_bw = -1;
double rss_init = 0.0, rss_final = 0.0;
ddsrt_threadattr_init (&attr); ddsrt_threadattr_init (&attr);
argv0 = argv[0]; argv0 = argv[0];
while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:Q:R:h")) != EOF) while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:h")) != EOF)
{ {
int pos; int pos;
switch (opt) switch (opt)
{ {
case '1': substat_every_second = true; break;
case 'c': collect_stats = true; break; case 'c': collect_stats = true; break;
case 'd': { case 'd': {
char *col; char *col;
(void) ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if)); (void) ddsrt_strlcpy (netload_if, optarg, sizeof (netload_if));
if ((col = strrchr (netload_if, ':')) == NULL || col == netload_if || if ((col = strrchr (netload_if, ':')) == NULL)
(sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0)) netload_bw = 0;
else
{
if (col == netload_if || (sscanf (col+1, "%lf%n", &netload_bw, &pos) != 1 || (col+1)[pos] != 0))
error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg); error3 ("-d %s: expected DEVICE:BANDWIDTH\n", optarg);
*col = 0; *col = 0;
}
break; break;
} }
case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break;
@ -1931,6 +1967,8 @@ int main (int argc, char *argv[])
min_roundtrips = (uint64_t) n; min_roundtrips = (uint64_t) n;
} else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) { } else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) {
maxwait = (maxwait <= 0) ? HUGE_VAL : maxwait; maxwait = (maxwait <= 0) ? HUGE_VAL : maxwait;
} else if (sscanf (optarg, "initwait:%lf%n", &initmaxwait, &pos) == 1 && optarg[pos] == 0) {
initmaxwait = (initmaxwait <= 0) ? 0 : initmaxwait;
} else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) {
minmatch = (uint32_t) n; minmatch = (uint32_t) n;
} else { } else {
@ -1972,7 +2010,7 @@ int main (int argc, char *argv[])
baggagesize -= 12; baggagesize -= 12;
struct record_netload_state *netload_state; struct record_netload_state *netload_state;
if (netload_bw <= 0) if (netload_bw < 0)
netload_state = NULL; netload_state = NULL;
else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL) else if ((netload_state = record_netload_new (netload_if, netload_bw)) == NULL)
error3 ("can't get network utilization information for device %s\n", netload_if); error3 ("can't get network utilization information for device %s\n", netload_if);
@ -2036,7 +2074,7 @@ int main (int argc, char *argv[])
snprintf (tpname_ping, sizeof (tpname_ping), "DDSPerf%cPing%s", reliable ? 'R' : 'U', tp_suf); snprintf (tpname_ping, sizeof (tpname_ping), "DDSPerf%cPing%s", reliable ? 'R' : 'U', tp_suf);
snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", reliable ? 'R' : 'U', tp_suf); snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", reliable ? 'R' : 'U', tp_suf);
qos = dds_create_qos (); qos = dds_create_qos ();
dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (1)); dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (10));
if ((tp_data = dds_create_topic (dp, tp_desc, tpname_data, qos, NULL)) < 0) if ((tp_data = dds_create_topic (dp, tp_desc, tpname_data, qos, NULL)) < 0)
error2 ("dds_create_topic(%s) failed: %d\n", tpname_data, (int) tp_data); error2 ("dds_create_topic(%s) failed: %d\n", tpname_data, (int) tp_data);
if ((tp_ping = dds_create_topic (dp, tp_desc, tpname_ping, qos, NULL)) < 0) if ((tp_ping = dds_create_topic (dp, tp_desc, tpname_ping, qos, NULL)) < 0)
@ -2157,20 +2195,6 @@ int main (int argc, char *argv[])
if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0)
error2 ("dds_waitset_attach(main, termcond) failed: %d\n", (int) rc); error2 ("dds_waitset_attach(main, termcond) failed: %d\n", (int) rc);
/* I hate Unix signals in multi-threaded processes ... */
#ifdef _WIN32
signal (SIGINT, signal_handler);
#elif !DDSRT_WITH_FREERTOS
sigemptyset (&sigset);
sigaddset (&sigset, SIGINT);
sigaddset (&sigset, SIGTERM);
sigprocmask (SIG_BLOCK, &sigset, &osigset);
ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset);
#if defined __APPLE__ || defined __linux
signal (SIGXFSZ, sigxfsz_handler);
#endif
#endif
/* Make publisher & subscriber thread arguments and start the threads we /* Make publisher & subscriber thread arguments and start the threads we
need (so what if we allocate memory for reading data even if we don't need (so what if we allocate memory for reading data even if we don't
have a reader or will never really be receiving data) */ have a reader or will never really be receiving data) */
@ -2191,6 +2215,34 @@ int main (int argc, char *argv[])
memset (&subtid, 0, sizeof (subtid)); memset (&subtid, 0, sizeof (subtid));
memset (&subpingtid, 0, sizeof (subpingtid)); memset (&subpingtid, 0, sizeof (subpingtid));
memset (&subpongtid, 0, sizeof (subpongtid)); memset (&subpongtid, 0, sizeof (subpongtid));
/* Just before starting the threads but after setting everything up, wait for
the required number of peers, if requested to do so */
if (initmaxwait > 0)
{
dds_time_t tnow = dds_time ();
const dds_time_t tendwait = tnow + (dds_duration_t) (initmaxwait * 1e9);
ddsrt_mutex_lock (&disc_lock);
while (matchcount < minmatch && tnow < tendwait)
{
ddsrt_mutex_unlock (&disc_lock);
dds_sleepfor (DDS_MSECS (100));
ddsrt_mutex_lock (&disc_lock);
tnow = dds_time ();
}
const bool ok = (matchcount >= minmatch);
if (!ok)
{
/* set minmatch to an impossible value to avoid a match occurring between now and
the determining of the exit status from causing a successful return */
minmatch = UINT32_MAX;
}
ddsrt_mutex_unlock (&disc_lock);
if (!ok)
goto err_minmatch_wait;
dds_sleepfor (DDS_MSECS (100));
}
if (pub_rate > 0) if (pub_rate > 0)
ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL); ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL);
if (subthread_func != 0) if (subthread_func != 0)
@ -2220,6 +2272,21 @@ int main (int argc, char *argv[])
struct record_cputime_state *cputime_state; struct record_cputime_state *cputime_state;
cputime_state = record_cputime_new (wr_stat); cputime_state = record_cputime_new (wr_stat);
/* I hate Unix signals in multi-threaded processes ... */
#ifdef _WIN32
signal (SIGINT, signal_handler);
#elif !DDSRT_WITH_FREERTOS
sigemptyset (&sigset);
sigaddset (&sigset, SIGHUP);
sigaddset (&sigset, SIGINT);
sigaddset (&sigset, SIGTERM);
sigprocmask (SIG_BLOCK, &sigset, &osigset);
ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset);
#if defined __APPLE__ || defined __linux
signal (SIGXFSZ, sigxfsz_handler);
#endif
#endif
/* Run until time limit reached or a signal received. (The time calculations /* Run until time limit reached or a signal received. (The time calculations
ignore the possibility of overflow around the year 2260.) */ ignore the possibility of overflow around the year 2260.) */
dds_time_t tnow = dds_time (); dds_time_t tnow = dds_time ();
@ -2231,7 +2298,6 @@ int main (int argc, char *argv[])
dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tnext = tstart + DDS_SECS (1);
dds_time_t tlast = tstart; dds_time_t tlast = tstart;
dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv;
double rss_init = 0.0, rss_final = 0.0;
while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop) while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop)
{ {
dds_time_t twakeup = DDS_NEVER; dds_time_t twakeup = DDS_NEVER;
@ -2350,6 +2416,7 @@ int main (int argc, char *argv[])
ddsrt_thread_join (subpongtid, NULL); ddsrt_thread_join (subpongtid, NULL);
} }
err_minmatch_wait:
/* stop the listeners before deleting the readers: otherwise they may /* stop the listeners before deleting the readers: otherwise they may
still try to access a reader that has already become inaccessible still try to access a reader that has already become inaccessible
(not quite good, but ...) */ (not quite good, but ...) */
@ -2420,7 +2487,7 @@ int main (int argc, char *argv[])
if (matchcount < minmatch) if (matchcount < minmatch)
{ {
printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32" instead of %"PRIu32")\n", ddsrt_getpid (), matchcount, minmatch); printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32")\n", ddsrt_getpid (), matchcount);
ok = false; ok = false;
} }
if (nlost > 0 && (reliable && histdepth == 0)) if (nlost > 0 && (reliable && histdepth == 0))

View file

@ -47,17 +47,25 @@ void record_netload (struct record_netload_state *st, const char *prefix, dds_ti
if (st->data_valid) if (st->data_valid)
{ {
/* interface speeds are in bits/s, so convert bytes to bits */ /* interface speeds are in bits/s, so convert bytes to bits */
const double dx = 8 * (double) (x.obytes - st->obytes);
const double dr = 8 * (double) (x.ibytes - st->ibytes);
const double dt = (double) (tnow - st->tprev) / 1e9; const double dt = (double) (tnow - st->tprev) / 1e9;
const double dxpct = 100.0 * dx / dt / st->bw; const double dx = 8 * (double) (x.obytes - st->obytes) / dt;
const double drpct = 100.0 * dr / dt / st->bw; const double dr = 8 * (double) (x.ibytes - st->ibytes) / dt;
if (st->bw > 0)
{
const double dxpct = 100.0 * dx / st->bw;
const double drpct = 100.0 * dr / st->bw;
if (dxpct >= 0.5 || drpct >= 0.5) if (dxpct >= 0.5 || drpct >= 0.5)
{ {
printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n", printf ("%s %s: xmit %.0f%% recv %.0f%% [%"PRIu64" %"PRIu64"]\n",
prefix, st->name, dxpct, drpct, x.obytes, x.ibytes); prefix, st->name, dxpct, drpct, x.obytes, x.ibytes);
} }
} }
else if (dx >= 1e5 || dr >= 1e5) // 100kb/s is arbitrary
{
printf ("%s %s: xmit %.2f Mb/s recv %.2f Mb/s [%"PRIu64" %"PRIu64"]\n",
prefix, st->name, dx / 1e6, dr / 1e6, x.obytes, x.ibytes);
}
}
st->obytes = x.obytes; st->obytes = x.obytes;
st->ibytes = x.ibytes; st->ibytes = x.ibytes;
st->tprev = tnow; st->tprev = tnow;