diff --git a/.travis.yml b/.travis.yml index 1be51fc..3d2f4cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -216,6 +216,9 @@ script: ;; esac - CYCLONEDDS_URI='alltrueconfigstderr' ctest -j 4 --output-on-failure -T test -E '^CUnit_ddsrt_random_default_random$' -C ${BUILD_TYPE} + - if [ "${ASAN}" = "none" ]; then + ${SHELL} ../src/tools/ddsperf/sanity.bash; + fi - if [ "${ASAN}" != "none" ]; then CMAKE_LINKER_FLAGS="-DCMAKE_LINKER_FLAGS=-fsanitize=${USE_SANITIZER}"; CMAKE_C_FLAGS="-DCMAKE_C_FLAGS=-fsanitize=${USE_SANITIZER}"; diff --git a/src/tools/ddsperf/cputime.c b/src/tools/ddsperf/cputime.c index b94a473..c737c93 100644 --- a/src/tools/ddsperf/cputime.c +++ b/src/tools/ddsperf/cputime.c @@ -184,6 +184,11 @@ bool record_cputime (struct record_cputime_state *state, const char *prefix, dds return print_cputime (&state->s, prefix, false, true); } +double record_cputime_read_rss (const struct record_cputime_state *state) +{ + return state->s.maxrss; +} + struct record_cputime_state *record_cputime_new (dds_entity_t wr) { ddsrt_thread_list_id_t tids[100]; @@ -251,6 +256,12 @@ bool record_cputime (struct record_cputime_state *state, const char *prefix, dds (void) tnow; } +double record_cputime_read_rss (const struct record_cputime_state *state) +{ + (void) state; + return 0.0. +} + struct record_cputime_state *record_cputime_new (dds_entity_t wr) { (void) wr; diff --git a/src/tools/ddsperf/cputime.h b/src/tools/ddsperf/cputime.h index 3216692..7574356 100644 --- a/src/tools/ddsperf/cputime.h +++ b/src/tools/ddsperf/cputime.h @@ -19,6 +19,7 @@ struct record_cputime_state; struct record_cputime_state *record_cputime_new (dds_entity_t wr); void record_cputime_free (struct record_cputime_state *state); bool record_cputime (struct record_cputime_state *state, const char *prefix, dds_time_t tnow); +double record_cputime_read_rss (const struct record_cputime_state *state); bool print_cputime (const struct CPUStats *s, const char *prefix, bool print_host, bool is_fresh); #endif diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index 628778b..18eaad7 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -151,6 +151,18 @@ static dds_duration_t ping_intv; pongs had been received */ static uint32_t ping_timeouts = 0; +/* Maximum allowed increase in RSS between 2nd RSS sample and + final RSS sample: final one must be <= + init * (1 + rss_factor/100) + rss_term */ +static bool rss_check = false; +static double rss_factor = 0; +static double rss_term = 0; + +/* Minimum number of samples, minimum number of roundtrips to + declare the run a success */ +static uint64_t min_received = 0; +static uint64_t min_roundtrips = 0; + static ddsrt_mutex_t disc_lock; /* Publisher statistics and lock protecting it */ @@ -207,6 +219,7 @@ struct subthread_arg_pongstat { uint64_t min, max; uint64_t sum; uint32_t cnt; + uint64_t totcnt; uint64_t *raw; }; @@ -733,6 +746,7 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta, if (x->cnt < PINGPONG_RAWSIZE) x->raw[x->cnt] = tdelta; x->cnt++; + x->totcnt++; ddsrt_mutex_unlock (&pongstat_lock); return allseen; } @@ -742,6 +756,7 @@ static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta, x->pphandle = get_pphandle_for_pubhandle (pubhandle); x->min = x->max = x->sum = tdelta; x->cnt = 1; + x->totcnt = 1; x->raw = malloc (PINGPONG_RAWSIZE * sizeof (*x->raw)); x->raw[0] = tdelta; npongstat++; @@ -1343,7 +1358,7 @@ static int cmp_uint64 (const void *va, const void *vb) return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; } -static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state) +static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state) { char prefix[128]; const double ts = (double) (tnow - tref) / 1e9; @@ -1469,6 +1484,7 @@ static void print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str if (output) record_netload (netload_state, prefix, tnow); fflush (stdout); + return output; } static void subthread_arg_init (struct subthread_arg *arg, dds_entity_t rd, uint32_t max_samples) @@ -1555,8 +1571,14 @@ OPTIONS:\n\ -d DEV:BW report network load for device DEV with nominal\n\ bandwidth BW in bits/s (e.g., eth0:1e9)\n\ -D DUR run for at most DUR seconds\n\ - -N COUNT require at least COUNT matching participants\n\ - -M DUR require those participants to match within DUR seconds\n\ + -Q KEY:VAL set success criteria\n\ + rss:X%% max allowed increase in RSS, in %%\n\ + rss:X max allowed increase in RSS, in MB\n\ + samples:N min received messages by \"sub\"\n\ + roundtrips:N min roundtrips for \"pong\"\n\ + minmatch:N require >= N matching participants\n\ + maxwait:DUR require those participants to match\n\ + within DUR seconds\n\ -R TREF timestamps in the output relative to TREF instead of\n\ process start\n\ -i ID use domain ID instead of the default domain\n\ @@ -1867,7 +1889,7 @@ int main (int argc, char *argv[]) argv0 = argv[0]; - while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:M:N:R:h")) != EOF) + while ((opt = getopt (argc, argv, "cd:D:i:n:k:uLK:T:Q:R:h")) != EOF) { switch (opt) { @@ -1897,11 +1919,28 @@ int main (int argc, char *argv[]) else if (strcmp (optarg, "UK1024") == 0) topicsel = UK1024; else error3 ("%s: unknown topic\n", optarg); break; - case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break; - case 'N': minmatch = (unsigned) atoi (optarg); break; + case 'Q': { + int pos; + double d; + unsigned long n; + if (sscanf (optarg, "rss:%lf%n", &d, &pos) == 1 && (optarg[pos] == 0 || optarg[pos] == '%')) { + if (optarg[pos] == 0) rss_term = d * 1048576.0; else rss_factor = 1.0 + d / 100.0; + rss_check = true; + } else if (sscanf (optarg, "samples:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + min_received = (uint64_t) n; + } else if (sscanf (optarg, "roundtrips:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + min_roundtrips = (uint64_t) n; + } else if (sscanf (optarg, "maxwait:%lf%n", &maxwait, &pos) == 1 && optarg[pos] == 0) { + maxwait = (maxwait <= 0) ? HUGE_VAL : maxwait; + } else if (sscanf (optarg, "minmatch:%lu%n", &n, &pos) == 1 && optarg[pos] == 0) { + minmatch = (uint32_t) n; + } else { + error3 ("-Q%s: invalid success criterium\n", optarg); + } + break; + } case 'R': tref = 0; sscanf (optarg, "%"SCNd64, &tref); break; - case 'h': usage (); break; - default: error3 ("-%c: unknown option\n", opt); break; + case 'h': default: usage (); break; } } @@ -2006,11 +2045,17 @@ int main (int argc, char *argv[]) /* participants reader must exist before the "publication matched" or "subscription matched" listener is invoked, or it won't be able to get the details (FIXME: even the DDS spec has convenience functions for that ...) */ + if ((rd_participants = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, NULL)) < 0) + error2 ("dds_create_reader(participants) failed: %d\n", (int) rd_participants); + /* set listener later: DATA_AVAILABLE still has the nasty habit of potentially triggering + before the reader is accessible to the application via its handle */ listener = dds_create_listener (NULL); dds_lset_data_available (listener, participant_data_listener); - if ((rd_participants = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, listener)) < 0) - error2 ("dds_create_reader(participants) failed: %d\n", (int) rd_participants); + dds_set_listener (rd_participants, listener); dds_delete_listener (listener); + /* then there is the matter of data arriving prior to setting the listener ... this state + of affairs is undoubtedly a bug */ + participant_data_listener (rd_participants, NULL); if ((rd_subscriptions = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL)) < 0) error2 ("dds_create_reader(subscriptions) failed: %d\n", (int) rd_subscriptions); if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) @@ -2155,7 +2200,7 @@ int main (int argc, char *argv[]) const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET; if (pingpong_waitset) { - ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_pong); + ddsrt_thread_create (&subpingtid, "ping", &attr, subpingthread_waitset, &subarg_ping); ddsrt_thread_create (&subpongtid, "pong", &attr, subpongthread_waitset, &subarg_pong); } else @@ -2179,6 +2224,7 @@ int main (int argc, char *argv[]) dds_time_t tnext = tstart + DDS_SECS (1); dds_time_t tlast = tstart; dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; + double rss_init = 0.0, rss_final = 0.0; while (!ddsrt_atomic_ld32 (&termflag) && tnow < tstop) { dds_time_t twakeup = DDS_NEVER; @@ -2244,12 +2290,17 @@ int main (int argc, char *argv[]) tnow = dds_time (); if (tnext <= tnow) { - print_stats (tref, tnow, tlast, cputime_state, netload_state); + bool output; + output = print_stats (tref, tnow, tlast, cputime_state, netload_state); tlast = tnow; if (tnow > tnext + DDS_MSECS (500)) tnext = tnow + DDS_SECS (1); else tnext += DDS_SECS (1); + + if (rss_init == 0.0 && matchcount >= minmatch && output) + rss_init = record_cputime_read_rss (cputime_state); + rss_final = record_cputime_read_rss (cputime_state); } /* If a "real" ping doesn't result in the expected number of pongs within a reasonable @@ -2317,8 +2368,13 @@ int main (int argc, char *argv[]) dds_delete (rd_data); uint64_t nlost = 0; + bool received_ok = true; for (uint32_t i = 0; i < eseq_admin.nph; i++) + { nlost += eseq_admin.stats[i].nlost; + if (eseq_admin.stats[i].nrecv < (uint64_t) min_received) + received_ok = false; + } fini_eseq_admin (&eseq_admin); subthread_arg_fini (&subarg_data); subthread_arg_fini (&subarg_ping); @@ -2330,8 +2386,13 @@ int main (int argc, char *argv[]) ddsrt_mutex_destroy (&pubstat_lock); hist_free (pubstat_hist); free (pongwr); + bool roundtrips_ok = true; for (uint32_t i = 0; i < npongstat; i++) + { + if (pongstat[i].totcnt < min_roundtrips) + roundtrips_ok = false; free (pongstat[i].raw); + } free (pongstat); bool ok = true; @@ -2360,5 +2421,20 @@ int main (int argc, char *argv[]) printf ("[%"PRIdPID"] error: %"PRIu64" samples lost\n", ddsrt_getpid (), nlost); ok = false; } + if (!roundtrips_ok) + { + printf ("[%"PRIdPID"] error: too few roundtrips for some peers\n", ddsrt_getpid ()); + ok = false; + } + if (!received_ok) + { + printf ("[%"PRIdPID"] error: too few samples received from some peers\n", ddsrt_getpid ()); + ok = false; + } + if (rss_check && rss_final >= rss_init * rss_factor + rss_term) + { + printf ("[%"PRIdPID"] error: RSS grew too much (%f -> %f)\n", ddsrt_getpid (), rss_init, rss_final); + ok = false; + } return ok ? 0 : 1; } diff --git a/src/tools/ddsperf/sanity.bash b/src/tools/ddsperf/sanity.bash new file mode 100755 index 0000000..45f425e --- /dev/null +++ b/src/tools/ddsperf/sanity.bash @@ -0,0 +1,23 @@ +exitcode=0 +# RSS/samples/roundtrip numbers are based on experimentation on Travis +bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 -Qsamples:300000 -Qroundtrips:3000 sub ping & ddsperf_pids=$! +bin/ddsperf -L -D10 -n10 -Qminmatch:2 -Qrss:10% -Qrss:0.5 pub & ddsperf_pids="$ddsperf_pids $!" +sleep 11 +for pid in $ddsperf_pids ; do + if kill -0 $pid 2>/dev/null ; then + echo "killing process $pid" + kill -9 $pid + exitcode=2 + fi + wait $pid + x=$? + if [[ $x -gt $exitcode ]] ; then + exitcode=$x + fi +done +if [[ $exitcode -gt 0 ]] ; then + echo "** FAILED **" +else + echo "** OK **" +fi +exit $exitcode