Merge pull request #101 from eboasson/throughput-example-fixes
Throughput example fixes
This commit is contained in:
commit
acec84cf0b
6 changed files with 199 additions and 113 deletions
73
performance/throughput-fanout-test
Normal file
73
performance/throughput-fanout-test
Normal file
|
@ -0,0 +1,73 @@
|
|||
#!/bin/bash
|
||||
|
||||
usage () {
|
||||
cat >&2 <<EOF
|
||||
usage: $0 [OPTIONS] user@remote [user@remote...]
|
||||
|
||||
OPTIONS
|
||||
-i IF use network interface IF (default: eth0)
|
||||
-b 100|1000 network bandwidth (100Mbps/1000Gbps) for calculating load
|
||||
% given load in bytes/second (default: 1000)
|
||||
-d DIR use DIR on remote (default: PWD)
|
||||
-p provision required binaries in DIR (default: false)
|
||||
first ssh's in to try mkdir -p DIR, then follows up with scp
|
||||
-t DUR run for DUR seconds per size (default 20)
|
||||
-a ASYNCLIST run for delivery async settings ASYNCLIST (default: "0 1")
|
||||
-m MODELIST run with subscriber mode settings MODELIST (default: "-1 0 1")
|
||||
-s SIZELIST run for sizes in SIZELIST (default: "0 16 32 64 128 256")
|
||||
-l LOOPBACK enable multicast loopback (true/false, default: true)
|
||||
-o DIR store results in dir.N where N is number of nodes
|
||||
|
||||
Runs throughput-test with the specified options for each prefix of remote nodes.
|
||||
EOF
|
||||
exit 1
|
||||
}
|
||||
|
||||
export nwif=eth0
|
||||
bandwidth=1000
|
||||
remotedir="$PWD"
|
||||
provision=false
|
||||
asynclist="0 1"
|
||||
modelist="-1 0 1"
|
||||
sizelist="0 16 32 64 128 256"
|
||||
timeout=20
|
||||
loopback=true
|
||||
resultdir="throughput-result"
|
||||
while getopts "i:b:d:pa:m:s:t:o:l:" opt ; do
|
||||
case $opt in
|
||||
i) nwif="$OPTARG" ;;
|
||||
b) bandwidth="$OPTARG" ;;
|
||||
d) remotedir="$OPTARG" ;;
|
||||
p) provision=true ;;
|
||||
a) asynclist="$OPTARG" ;;
|
||||
m) modelist="$OPTARG" ;;
|
||||
s) sizelist="$OPTARG" ;;
|
||||
l) loopback="OPTARG" ;;
|
||||
t) timeout="$OPTARG" ;;
|
||||
o) resultdir="$OPTARG" ;;
|
||||
h) usage ;;
|
||||
esac
|
||||
done
|
||||
shift $((OPTIND-1))
|
||||
if [ $# -lt 1 ] ; then usage ; fi
|
||||
pubremote=$1
|
||||
shift
|
||||
|
||||
popt=
|
||||
$provision && popt=-p
|
||||
|
||||
n=0
|
||||
while [[ $n -le $# ]] ; do
|
||||
out=$resultdir.$(( $n + 1 ))
|
||||
mkdir $out
|
||||
|
||||
otherhosts=""
|
||||
j=1
|
||||
while [[ $j -le $n ]] ; do
|
||||
hostJ=`eval echo "\\$$j"`
|
||||
otherhosts="$otherhosts $hostJ"
|
||||
j=$(( $j + 1 ))
|
||||
done
|
||||
`dirname $0`/throughput-test -i "$nwif" -b "$bandwidth" -d "$remotedir" $popt -a "$asynclist" -m "$modelist" -s "$sizelist" -t "$timeout" -o $out $pubremote $otherhosts
|
||||
n=$(( $n + 1 ))
|
||||
done
|
|
@ -1,21 +1,61 @@
|
|||
#!/bin/bash
|
||||
|
||||
if [ $# -ne 1 ] ; then
|
||||
echo >&2 <<EOF
|
||||
usage: $0 user@remote
|
||||
usage () {
|
||||
cat >&2 <<EOF
|
||||
usage: $0 [OPTIONS] user@remote [user@remote...]
|
||||
|
||||
It assumes various things:
|
||||
- ssh user@remote succeeds without a password
|
||||
- network device to use is em2 (otherwise, change the config)
|
||||
- it assumes it is run from the build directory (actually, it verifies this)
|
||||
- it assumes ThroughputPublisher can be found in exactly the same place on the remote
|
||||
- probably some other things as well ... use with care.
|
||||
OPTIONS
|
||||
-i IF use network interface IF (default: eth0)
|
||||
-b 100|1000 network bandwidth (100Mbps/1000Gbps) for calculating load
|
||||
% given load in bytes/second (default: 1000)
|
||||
-d DIR use DIR on remote (default: PWD)
|
||||
-p provision required binaries in DIR (default: false)
|
||||
first ssh's in to try mkdir -p DIR, then follows up with scp
|
||||
-t DUR run for DUR seconds per size (default 20)
|
||||
-a ASYNCLIST run for delivery async settings ASYNCLIST (default: "0 1")
|
||||
-m MODELIST run with subscriber mode settings MODELIST (default: "-1 0 1")
|
||||
-s SIZELIST run for sizes in SIZELIST (default: "0 16 32 64 128 256")
|
||||
-l LOOPBACK enable/disable multicast loopback (true/false, default: true)
|
||||
-o DIR store results in dir (default: throughput-result)
|
||||
|
||||
Local host runs ThroughputSubscriber, first remote runs ThroughputPublisher,
|
||||
further remotes also run ThroughputSubscriber. It assumes these are
|
||||
available in DIR/bin. It also assumes that ssh user@remote works without
|
||||
requiring a password.
|
||||
EOF
|
||||
exit 1
|
||||
fi
|
||||
remote=$1
|
||||
dir=`dirname $0`
|
||||
ethload=$dir/ethload
|
||||
}
|
||||
|
||||
export nwif=eth0
|
||||
bandwidth=1000
|
||||
remotedir="$PWD"
|
||||
provision=false
|
||||
asynclist="0 1"
|
||||
modelist="-1 0 1"
|
||||
sizelist="0 16 32 64 128 256"
|
||||
timeout=20
|
||||
loopback=true
|
||||
resultdir="throughput-result"
|
||||
while getopts "i:b:d:pa:m:s:t:o:l:" opt ; do
|
||||
case $opt in
|
||||
i) nwif="$OPTARG" ;;
|
||||
b) bandwidth="$OPTARG" ;;
|
||||
d) remotedir="$OPTARG" ;;
|
||||
p) provision=true ;;
|
||||
a) asynclist="$OPTARG" ;;
|
||||
m) modelist="$OPTARG" ;;
|
||||
s) sizelist="$OPTARG" ;;
|
||||
l) loopback="OPTARG" ;;
|
||||
t) timeout="$OPTARG" ;;
|
||||
o) resultdir="$OPTARG" ;;
|
||||
h) usage ;;
|
||||
esac
|
||||
done
|
||||
shift $((OPTIND-1))
|
||||
if [ $# -lt 1 ] ; then usage ; fi
|
||||
ethload=`dirname $0`/ethload
|
||||
pubremote=$1
|
||||
shift
|
||||
|
||||
cfg=cdds-simple.xml
|
||||
cat >$cfg <<EOF
|
||||
|
@ -25,13 +65,15 @@ cat >$cfg <<EOF
|
|||
</Domain>
|
||||
<DDSI2E>
|
||||
<General>
|
||||
<NetworkInterfaceAddress>em2</NetworkInterfaceAddress>
|
||||
<NetworkInterfaceAddress>$nwif</NetworkInterfaceAddress>
|
||||
<EnableMulticastLoopback>$loopback</EnableMulticastLoopback>
|
||||
</General>
|
||||
<Internal>
|
||||
<Watermarks>
|
||||
<WhcHigh>500kB</WhcHigh>
|
||||
</Watermarks>
|
||||
<SynchronousDeliveryPriorityThreshold>${ASYNC:-0}</SynchronousDeliveryPriorityThreshold>
|
||||
<SynchronousDeliveryPriorityThreshold>${async:-0}</SynchronousDeliveryPriorityThreshold>
|
||||
<LeaseDuration>3s</LeaseDuration>
|
||||
</Internal>
|
||||
</DDSI2E>
|
||||
</CycloneDDS>
|
||||
|
@ -42,24 +84,37 @@ if [ ! -x bin/ThroughputPublisher -o ! -x bin/ThroughputSubscriber -o ! -x $ethl
|
|||
exit 1
|
||||
fi
|
||||
|
||||
mkdir throughput-result || { echo "failed to create throughput-result directory" >&2 ; exit 1 ; }
|
||||
[ -d $resultdir ] || { echo "output directory $resultdir doesn't exist" >&2 ; exit 1 ; }
|
||||
|
||||
if $provision ; then
|
||||
echo "provisioning ..."
|
||||
for r in $pubremote "$@" ; do
|
||||
ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib
|
||||
scp lib/libddsc.so.0 $r:$remotedir/lib
|
||||
scp bin/ThroughputPublisher bin/ThroughputSubscriber $r:$remotedir/bin
|
||||
done
|
||||
fi
|
||||
|
||||
export CYCLONEDDS_URI=file://$PWD/$cfg
|
||||
scp $cfg $remote:$PWD || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; }
|
||||
for r in $pubremote "$@" ; do
|
||||
scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; }
|
||||
done
|
||||
|
||||
for async in 0 1 ; do
|
||||
for mode in -1 0 1 ; do
|
||||
for async in $asynclist ; do
|
||||
export async
|
||||
for mode in $modelist ; do
|
||||
echo "======== ASYNC $async MODE $mode ========="
|
||||
|
||||
cat > run-publisher.tmp <<EOF
|
||||
export CYCLONEDDS_URI=$CYCLONEDDS_URI
|
||||
export ASYNC=$async
|
||||
cd $PWD
|
||||
export CYCLONEDDS_URI=file://$remotedir/$cfg
|
||||
export async=$async
|
||||
cd $remotedir
|
||||
rm -f pub-top.log
|
||||
for size in 0 16 32 64 128 256 ; do
|
||||
for size in $sizelist ; do
|
||||
echo "size \$size"
|
||||
bin/ThroughputPublisher \$size > pub.log & ppid=\$!
|
||||
top -b -d1 -p \$ppid >> pub-top.log & tpid=\$!
|
||||
sleep 20
|
||||
sleep $timeout
|
||||
kill \$tpid
|
||||
kill -2 \$ppid
|
||||
wait \$ppid
|
||||
|
@ -67,24 +122,38 @@ for size in 0 16 32 64 128 256 ; do
|
|||
done
|
||||
wait
|
||||
EOF
|
||||
scp run-publisher.tmp $remote:$PWD || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 2 ; }
|
||||
|
||||
export ASYNC=$async
|
||||
|
||||
outdir=throughput-result/data-async$async-mode$mode
|
||||
scp run-publisher.tmp $pubremote:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 2 ; }
|
||||
killremotesubs=""
|
||||
if [ $# -gt 0 ] ; then
|
||||
cat > run-subscriber.tmp <<EOF
|
||||
export CYCLONEDDS_URI=file://$remotedir/$cfg
|
||||
export async=$async
|
||||
cd $remotedir
|
||||
nohup bin/ThroughputSubscriber 0 $mode > /dev/null &
|
||||
echo \$!
|
||||
EOF
|
||||
for r in "$@" ; do
|
||||
scp run-subscriber.tmp $r:$remotedir
|
||||
rsubpid=`ssh $r ". $remotedir/run-subscriber.tmp"`
|
||||
killremotesubs="$killremotesubs ssh $r kill -9 $rsubpid &"
|
||||
done
|
||||
fi
|
||||
|
||||
outdir=$resultdir/data-async$async-mode$mode
|
||||
mkdir $outdir
|
||||
|
||||
rm -f sub-top.log
|
||||
$ethload em2 1000 > $outdir/sub-ethload.log & lpid=$!
|
||||
$ethload $nwif $bandwidth > $outdir/sub-ethload.log & lpid=$!
|
||||
bin/ThroughputSubscriber 0 $mode > $outdir/sub.log & spid=$!
|
||||
top -b -d1 -p $spid >> $outdir/sub-top.log & tpid=$!
|
||||
tail -f $outdir/sub.log & xpid=$!
|
||||
ssh $remote ". $PWD/run-publisher.tmp"
|
||||
ssh $pubremote ". $remotedir/run-publisher.tmp"
|
||||
kill $tpid
|
||||
kill -2 $spid
|
||||
eval $killremotesubs
|
||||
sleep 1
|
||||
kill $lpid $xpid
|
||||
wait
|
||||
scp $remote:$PWD/{pub-top.log,pub.log} $outdir
|
||||
scp $pubremote:$remotedir/{pub-top.log,pub.log} $outdir
|
||||
done
|
||||
done
|
||||
|
|
|
@ -7,16 +7,18 @@ my @dirs = ("async0-mode-1", "async0-mode0", "async0-mode1",
|
|||
|
||||
my $dataset = 0;
|
||||
my $basedir = "throughput-result";
|
||||
$basedir = $ARGV[0] if @ARGV== 1;
|
||||
my $load_threshold = 20;
|
||||
for my $dir (@dirs) {
|
||||
my @loads = ();
|
||||
|
||||
|
||||
{
|
||||
open LH, "< $basedir/data-$dir/sub-ethload.log" or die "can't open $basedir/data-$dir/sub-ethload.log";
|
||||
open LH, "< $basedir/data-$dir/sub-ethload.log" or next; # die "can't open $basedir/data-$dir/sub-ethload.log";
|
||||
my @curload = ();
|
||||
while (<LH>) {
|
||||
die unless /^r +([0-9.]+).*\( *(\d+)/;
|
||||
push @curload, $2 if $1 > 20;
|
||||
if (@curload && $1 < 20) {
|
||||
next unless /^r +([0-9.]+).*\( *(\d+)/;
|
||||
push @curload, $2 if $1 > $load_threshold;
|
||||
if (@curload && $1 < $load_threshold) {
|
||||
push @loads, median (@curload);
|
||||
@curload = ();
|
||||
}
|
||||
|
@ -24,8 +26,8 @@ for my $dir (@dirs) {
|
|||
push @loads, median (@curload) if @curload;
|
||||
close LH;
|
||||
}
|
||||
|
||||
open FH, "< $basedir/data-$dir/sub.log" or die "can't open $basedir/data-$dir/sub.log";
|
||||
|
||||
open FH, "< $basedir/data-$dir/sub.log" or next; # die "can't open $basedir/data-$dir/sub.log";
|
||||
print "\n\n" if $dataset++;
|
||||
print "# mode $dir\n";
|
||||
print "# payloadsize rate[samples/s] appl.bandwidth[Mb/s] raw.bandwidth[Mb/s]\n";
|
||||
|
@ -36,9 +38,6 @@ for my $dir (@dirs) {
|
|||
my $psz_cur = $1; my $rate_cur = $2;
|
||||
$psz = $psz_cur unless defined $psz;
|
||||
if ($psz != $psz_cur) {
|
||||
# this is a bit yucky: scan the ethload for the next set of substantially-above-zero numbers
|
||||
# where "substantially-above-zero" is defined as > 20%, cos that seems to work fine on a
|
||||
# quiescent network
|
||||
my $load = shift @loads;
|
||||
my $rate = median (@rate);
|
||||
printf "%d %f %f %f\n", $psz, $rate, $rate * (8 + $psz) / 125e3, $load / 125e3;
|
||||
|
|
|
@ -1945,10 +1945,9 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
|
|||
reorder->n_samples++;
|
||||
}
|
||||
}
|
||||
else if (s->min == reorder->max_sampleiv->u.reorder.maxp1)
|
||||
else if (((void) assert (reorder->max_sampleiv != NULL)), (s->min == reorder->max_sampleiv->u.reorder.maxp1))
|
||||
{
|
||||
/* note: sampleivtree not empty <=> max_sampleiv is set (compilers
|
||||
and static analyzers may warn) */
|
||||
/* (sampleivtree not empty) <=> (max_sampleiv is non-NULL), for which there is an assert at the beginning but compilers and static analyzers don't all quite get that ... the somewhat crazy assert shuts up Clang's static analyzer */
|
||||
if (delivery_queue_full_p)
|
||||
{
|
||||
/* growing last inteval will not be accepted when this flag is set */
|
||||
|
|
|
@ -29,22 +29,11 @@ static int parse_args(int argc, char **argv, uint32_t *payloadSize, int *burstIn
|
|||
static dds_entity_t prepare_dds(dds_entity_t *writer, const char *partitionName);
|
||||
static void finalize_dds(dds_entity_t participant, dds_entity_t writer, ThroughputModule_DataType sample);
|
||||
|
||||
/* Functions to handle Ctrl-C presses. */
|
||||
#ifdef _WIN32
|
||||
#include <Windows.h>
|
||||
static int CtrlHandler (DWORD fdwCtrlType)
|
||||
{
|
||||
done = true;
|
||||
return true; /* Don't let other handlers handle this key */
|
||||
}
|
||||
#else
|
||||
struct sigaction oldAction;
|
||||
static void CtrlHandler (int sig)
|
||||
static void sigint (int sig)
|
||||
{
|
||||
(void)sig;
|
||||
done = true;
|
||||
}
|
||||
#endif
|
||||
|
||||
int main (int argc, char **argv)
|
||||
{
|
||||
|
@ -57,6 +46,8 @@ int main (int argc, char **argv)
|
|||
dds_entity_t writer;
|
||||
ThroughputModule_DataType sample;
|
||||
|
||||
setvbuf (stdout, NULL, _IOLBF, 0);
|
||||
|
||||
if (parse_args(argc, argv, &payloadSize, &burstInterval, &burstSize, &timeOut, &partitionName) == EXIT_FAILURE) {
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
@ -80,25 +71,11 @@ int main (int argc, char **argv)
|
|||
}
|
||||
|
||||
/* Register handler for Ctrl-C */
|
||||
#ifdef _WIN32
|
||||
SetConsoleCtrlHandler ((PHANDLER_ROUTINE) CtrlHandler, true);
|
||||
#else
|
||||
struct sigaction sat;
|
||||
sat.sa_handler = CtrlHandler;
|
||||
sigemptyset (&sat.sa_mask);
|
||||
sat.sa_flags = 0;
|
||||
sigaction (SIGINT, &sat, &oldAction);
|
||||
#endif
|
||||
signal (SIGINT, sigint);
|
||||
|
||||
/* Register the sample instance and write samples repeatedly or until time out */
|
||||
start_writing(writer, &sample, burstInterval, burstSize, timeOut);
|
||||
|
||||
#ifdef _WIN32
|
||||
SetConsoleCtrlHandler (0, false);
|
||||
#else
|
||||
sigaction (SIGINT, &oldAction, 0);
|
||||
#endif
|
||||
|
||||
/* Cleanup */
|
||||
finalize_dds(participant, writer, sample);
|
||||
return EXIT_SUCCESS;
|
||||
|
|
|
@ -61,24 +61,11 @@ static void process_samples(dds_entity_t reader, unsigned long long maxCycles);
|
|||
static dds_entity_t prepare_dds(dds_entity_t *reader, const char *partitionName);
|
||||
static void finalize_dds(dds_entity_t participant);
|
||||
|
||||
/* Functions to handle Ctrl-C presses. */
|
||||
#ifdef _WIN32
|
||||
#include <Windows.h>
|
||||
static int CtrlHandler (DWORD fdwCtrlType)
|
||||
static void sigint (int sig)
|
||||
{
|
||||
dds_waitset_set_trigger (waitSet, true);
|
||||
done = true;
|
||||
return true; /* Don't let other handlers handle this key */
|
||||
}
|
||||
#else
|
||||
struct sigaction oldAction;
|
||||
static void CtrlHandler (int sig)
|
||||
{
|
||||
(void)sig;
|
||||
dds_waitset_set_trigger (waitSet, true);
|
||||
(void) sig;
|
||||
done = true;
|
||||
}
|
||||
#endif
|
||||
|
||||
int main (int argc, char **argv)
|
||||
{
|
||||
|
@ -88,16 +75,7 @@ int main (int argc, char **argv)
|
|||
dds_entity_t participant;
|
||||
dds_entity_t reader;
|
||||
|
||||
/* Register handler for Ctrl-C */
|
||||
#ifdef _WIN32
|
||||
SetConsoleCtrlHandler((PHANDLER_ROUTINE) CtrlHandler, true);
|
||||
#else
|
||||
struct sigaction sat;
|
||||
sat.sa_handler = CtrlHandler;
|
||||
sigemptyset(&sat.sa_mask);
|
||||
sat.sa_flags = 0;
|
||||
sigaction (SIGINT, &sat, &oldAction);
|
||||
#endif
|
||||
setvbuf (stdout, NULL, _IOLBF, 0);
|
||||
|
||||
if (parse_args(argc, argv, &maxCycles, &partitionName) == EXIT_FAILURE)
|
||||
{
|
||||
|
@ -112,21 +90,12 @@ int main (int argc, char **argv)
|
|||
|
||||
/* Process samples until Ctrl-C is pressed or until maxCycles */
|
||||
/* has been reached (0 = infinite) */
|
||||
signal (SIGINT, sigint);
|
||||
process_samples(reader, maxCycles);
|
||||
|
||||
/* Finished, disable callbacks */
|
||||
dds_set_status_mask (reader, 0);
|
||||
HandleMap__free (imap);
|
||||
|
||||
#ifdef _WIN32
|
||||
SetConsoleCtrlHandler (0, FALSE);
|
||||
#else
|
||||
sigaction (SIGINT, &oldAction, 0);
|
||||
#endif
|
||||
|
||||
/* Clean up */
|
||||
finalize_dds(participant);
|
||||
|
||||
finalize_dds (participant);
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -281,15 +250,15 @@ static void process_samples(dds_entity_t reader, unsigned long long maxCycles)
|
|||
while (!done && (maxCycles == 0 || cycles < maxCycles))
|
||||
{
|
||||
if (pollingDelay > 0)
|
||||
{
|
||||
dds_sleepfor (DDS_MSECS (pollingDelay));
|
||||
while (do_take (reader))
|
||||
;
|
||||
}
|
||||
else
|
||||
{
|
||||
status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_SECS(1));
|
||||
status = dds_waitset_wait (waitSet, wsresults, sizeof(wsresults)/sizeof(wsresults[0]), DDS_MSECS(100));
|
||||
DDS_ERR_CHECK (status, DDS_CHECK_REPORT | DDS_CHECK_EXIT);
|
||||
}
|
||||
|
||||
if (pollingDelay >= 0)
|
||||
{
|
||||
while (do_take (reader))
|
||||
;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue