diff --git a/examples/perfscript/throughput-test b/examples/perfscript/throughput-test index 845768c..8cce52c 100755 --- a/examples/perfscript/throughput-test +++ b/examples/perfscript/throughput-test @@ -12,6 +12,8 @@ loopback=true resultdir="throughput-result" force=false netstats=false +watermarks="" +remotes="" usage () { cat >&2 <$cfg < \${async:-0} 2s + $watermarks fine @@ -111,7 +129,7 @@ fi if $provision ; then echo "provisioning ..." - for r in "$@" ; do + for r in $remotes ; do ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib scp lib/libddsc.so.0 $r:$remotedir/lib scp bin/ddsperf $r:$remotedir/bin @@ -122,7 +140,7 @@ topic=KS [ -z "$sizelist" ] && topic=OU export CYCLONEDDS_URI=file://$PWD/$cfg -for r in "$@" ; do +for r in $remotes ; do scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; } done @@ -137,6 +155,8 @@ for async_mode in $asynclist ; do echo "======== ASYNC $async MODE $sub_mode =========" subpids="" + netstat_pids="" + trap dokill_and_exit SIGINT cat > run-subscriber.tmp < sub.log & pid=\$! +#/usr/sbin/tcpdump -c 20000 -s 0 -w /tmp/x.pcap -i eth0 -Z erik 'udp' & tcpdumppid=\$! +bin/ddsperf -1 -X -d $rnwif$bandwidth -c -T $topic sub $sub_mode > sub.log & pid=\$! echo \$pid > throughput-test-sub-\$pid.pid wait \$pid -#kill -INT \$tcpdumppid +[ -n "\$tcpdumppid" ] && kill -INT \$tcpdumppid EOF - for r in "$@" ; do + for r in $remotes ; do scp run-subscriber.tmp $r:$remotedir ssh $r ". $remotedir/run-subscriber.tmp" & subpids="$subpids $!" done @@ -170,20 +191,15 @@ EOF echo "size $size" #export trace=trace,-content bin/ddsperf -Q minmatch:$# -Q initwait:3 \ - -c -d $nwif$bandwidth \ + -X -c -d $nwif$bandwidth \ -D $timeout -T $topic \ pub size $size | \ tee -a $outdir/pub.log done - if $netstats ; then - kill $netstats_pids - fi - for r in "$@" ; do - ssh $r "kill -9 \`cat $remotedir/throughput-test-sub-*.pid\` ; rm $remotedir/throughput-test-sub-*.pid" - done + dokill wait - for r in "$@" ; do + for r in $remotes ; do scp $r:$remotedir/sub.log $outdir/sub-$r.log done @@ -192,9 +208,9 @@ EOF # col 1: appl receive bandwidth, 1s trailing average (Mb/s) # col 2: appl receive bandwidth, 10s trailing average (Mb/s) # (this assumes the network interface name is eth, en, or lo, optionally followed by a 0) - perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){printf "%f %f %f\n", $1, $r, $r10;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt - perl -ne 'print "$1 $2\n" if /^(\d+)\s+(\d+)(\s+\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt - perl -ne 'print "$1 $2\n" if /^DISCARDED\s+(\d+)\s+(\d+)$/' $outdir/sub-$1.log > $resultdir/discarded.txt + perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){$rnet=$1;}if(/discarded\s+(\d+)/){printf "%f %f %f %u\n", $rnet, $r, $r10, $1;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt + perl -ne 'if(/(?:eth|en|lo)0?: xmit (\d+\.\d+) Mb\/s/){printf "%f\n", $1}' $outdir/pub.log > $resultdir/net-xmit-bytes.txt + perl -ne 'print "$1 $2 $3 $4 $5\n" if /^(\d+)\s+(\d+)(\s+\d+)$/ || /^\[\d+\]\s+(\d+\.\d+)\s+discarded\s+\d+\s+rexmit\s+(\d+)\s+[A-Za-z_ ]+(\d+)[A-Za-z_ ]+(\d+)[A-Za-z_ ]+(\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt if $netstats ; then perl -ne 'print "$1\n" if /time=([0-9.]+)/' $outdir/ping.log > $resultdir/lat.log if [ "`uname -s`" = "Darwin" ] ; then diff --git a/src/core/ddsc/CMakeLists.txt b/src/core/ddsc/CMakeLists.txt index d5bae0c..cfba755 100644 --- a/src/core/ddsc/CMakeLists.txt +++ b/src/core/ddsc/CMakeLists.txt @@ -33,6 +33,7 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds_waitset.c dds_readcond.c dds_guardcond.c + dds_statistics.c dds_subscriber.c dds_write.c dds_whc.c @@ -49,6 +50,7 @@ PREPEND(hdrs_public_ddsc "$m_kind]->interrupt) (e); @@ -200,6 +204,12 @@ inline bool dds_entity_supports_set_qos (struct dds_entity *e) { inline bool dds_entity_supports_validate_status (struct dds_entity *e) { return dds_entity_deriver_table[e->m_kind]->validate_status != dds_entity_deriver_dummy_validate_status; } +inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e) { + return dds_entity_deriver_table[e->m_kind]->create_statistics (e); +} +inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s) { + dds_entity_deriver_table[e->m_kind]->refresh_statistics (e, s); +} typedef struct dds_cyclonedds_entity { struct dds_entity m_entity; diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index eca5fdc..6de25fd 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -36,7 +36,9 @@ const struct dds_entity_deriver dds_entity_deriver_domain = { .close = dds_entity_deriver_dummy_close, .delete = dds_domain_free, .set_qos = dds_entity_deriver_dummy_set_qos, - .validate_status = dds_entity_deriver_dummy_validate_status + .validate_status = dds_entity_deriver_dummy_validate_status, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; static int dds_domain_compare (const void *va, const void *vb) diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 0e48251..1946ec0 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -62,6 +62,12 @@ dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_q dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask) { (void) mask; return DDS_RETCODE_ILLEGAL_OPERATION; } +struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct dds_entity *e) { + (void) e; return NULL; +} +void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s) { + (void) e; (void) s; +} extern inline void dds_entity_deriver_interrupt (struct dds_entity *e); extern inline void dds_entity_deriver_close (struct dds_entity *e); @@ -70,6 +76,8 @@ extern inline dds_return_t dds_entity_deriver_set_qos (struct dds_entity *e, con extern inline dds_return_t dds_entity_deriver_validate_status (struct dds_entity *e, uint32_t mask); extern inline bool dds_entity_supports_set_qos (struct dds_entity *e); extern inline bool dds_entity_supports_validate_status (struct dds_entity *e); +extern inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e); +extern inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s); static int compare_instance_handle (const void *va, const void *vb) { diff --git a/src/core/ddsc/src/dds_guardcond.c b/src/core/ddsc/src/dds_guardcond.c index 13e00f9..a76e920 100644 --- a/src/core/ddsc/src/dds_guardcond.c +++ b/src/core/ddsc/src/dds_guardcond.c @@ -27,7 +27,9 @@ const struct dds_entity_deriver dds_entity_deriver_guardcondition = { .close = dds_entity_deriver_dummy_close, .delete = dds_entity_deriver_dummy_delete, .set_qos = dds_entity_deriver_dummy_set_qos, - .validate_status = dds_entity_deriver_dummy_validate_status + .validate_status = dds_entity_deriver_dummy_validate_status, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_entity_t dds_create_guardcondition (dds_entity_t owner) diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 05f0d2c..ffe41f2 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -41,7 +41,9 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = { .close = dds_close, .delete = dds_fini, .set_qos = dds_entity_deriver_dummy_set_qos, - .validate_status = dds_entity_deriver_dummy_validate_status + .validate_status = dds_entity_deriver_dummy_validate_status, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_cyclonedds_entity dds_global; diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 5d84fb8..d089c1e 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -85,7 +85,9 @@ const struct dds_entity_deriver dds_entity_deriver_participant = { .close = dds_entity_deriver_dummy_close, .delete = dds_participant_delete, .set_qos = dds_participant_qos_set, - .validate_status = dds_participant_status_validate + .validate_status = dds_participant_status_validate, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index ca17999..17109b2 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -43,7 +43,9 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = { .close = dds_entity_deriver_dummy_close, .delete = dds_entity_deriver_dummy_delete, .set_qos = dds_publisher_qos_set, - .validate_status = dds_publisher_status_validate + .validate_status = dds_publisher_status_validate, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_readcond.c b/src/core/ddsc/src/dds_readcond.c index 3cd16fd..e0893ed 100644 --- a/src/core/ddsc/src/dds_readcond.c +++ b/src/core/ddsc/src/dds_readcond.c @@ -34,7 +34,9 @@ const struct dds_entity_deriver dds_entity_deriver_readcondition = { .close = dds_entity_deriver_dummy_close, .delete = dds_readcond_delete, .set_qos = dds_entity_deriver_dummy_set_qos, - .validate_status = dds_entity_deriver_dummy_validate_status + .validate_status = dds_entity_deriver_dummy_validate_status, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter) diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 5af47a0..4c1f999 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -28,9 +28,11 @@ #include "dds/ddsi/q_thread.h" #include "dds/ddsi/ddsi_domaingv.h" #include "dds__builtin.h" +#include "dds__statistics.h" #include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_entity_index.h" #include "dds/ddsi/ddsi_security_omg.h" +#include "dds/ddsi/ddsi_statistics.h" DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader) @@ -349,12 +351,35 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } +static const struct dds_stat_keyvalue_descriptor dds_reader_statistics_kv[] = { + { "discarded_bytes", DDS_STAT_KIND_UINT64 } +}; + +static const struct dds_stat_descriptor dds_reader_statistics_desc = { + .count = sizeof (dds_reader_statistics_kv) / sizeof (dds_reader_statistics_kv[0]), + .kv = dds_reader_statistics_kv +}; + +static struct dds_statistics *dds_reader_create_statistics (const struct dds_entity *entity) +{ + return dds_alloc_statistics (entity, &dds_reader_statistics_desc); +} + +static void dds_reader_refresh_statistics (const struct dds_entity *entity, struct dds_statistics *stat) +{ + const struct dds_reader *rd = (const struct dds_reader *) entity; + if (rd->m_rd) + ddsi_get_reader_stats (rd->m_rd, &stat->kv[0].u.u64); +} + const struct dds_entity_deriver dds_entity_deriver_reader = { .interrupt = dds_entity_deriver_dummy_interrupt, .close = dds_reader_close, .delete = dds_reader_delete, .set_qos = dds_reader_qos_set, - .validate_status = dds_reader_status_validate + .validate_status = dds_reader_status_validate, + .create_statistics = dds_reader_create_statistics, + .refresh_statistics = dds_reader_refresh_statistics }; static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc) diff --git a/src/core/ddsc/src/dds_statistics.c b/src/core/ddsc/src/dds_statistics.c new file mode 100644 index 0000000..4fe47fc --- /dev/null +++ b/src/core/ddsc/src/dds_statistics.c @@ -0,0 +1,87 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include + +#include "dds/ddsc/dds_statistics.h" +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/log.h" +#include "dds__entity.h" +#include "dds__statistics.h" + +struct dds_statistics *dds_alloc_statistics (const struct dds_entity *e, const struct dds_stat_descriptor *d) +{ + struct dds_statistics *s = ddsrt_malloc (sizeof (*s) + d->count * sizeof (s->kv[0])); + s->entity = e->m_hdllink.hdl; + s->opaque = e->m_iid; + s->time = 0; + s->count = d->count; + memset (s->kv, 0, d->count * sizeof (s->kv[0])); + for (size_t i = 0; i < s->count; i++) + { + s->kv[i].kind = d->kv[i].kind; + s->kv[i].name = d->kv[i].name; + } + return s; +} + +struct dds_statistics *dds_create_statistics (dds_entity_t entity) +{ + dds_entity *e; + struct dds_statistics *s; + if (dds_entity_pin (entity, &e) != DDS_RETCODE_OK) + return NULL; + struct thread_state1 * const ts1 = lookup_thread_state (); + thread_state_awake (ts1, &e->m_domain->gv); + if ((s = dds_entity_deriver_create_statistics (e)) != NULL) + dds_entity_deriver_refresh_statistics (e, s); + thread_state_asleep (ts1); + dds_entity_unpin (e); + return s; +} + +dds_return_t dds_refresh_statistics (struct dds_statistics *stat) +{ + dds_return_t rc; + dds_entity *e; + if (stat == NULL) + return DDS_RETCODE_BAD_PARAMETER; + if ((rc = dds_entity_pin (stat->entity, &e)) != DDS_RETCODE_OK) + return rc; + if (stat->opaque != e->m_iid) + { + dds_entity_unpin (e); + return DDS_RETCODE_BAD_PARAMETER; + } + struct thread_state1 * const ts1 = lookup_thread_state (); + thread_state_awake (ts1, &e->m_domain->gv); + stat->time = dds_time (); + dds_entity_deriver_refresh_statistics (e, stat); + thread_state_asleep (ts1); + dds_entity_unpin (e); + return DDS_RETCODE_OK; +} + +const struct dds_stat_keyvalue *dds_lookup_statistic (const struct dds_statistics *stat, const char *name) +{ + if (stat == NULL) + return NULL; + for (size_t i = 0; i < stat->count; i++) + if (strcmp (stat->kv[i].name, name) == 0) + return &stat->kv[i]; + return NULL; +} + +void dds_delete_statistics (struct dds_statistics *stat) +{ + ddsrt_free (stat); +} diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index e501f5b..3c9748b 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -42,7 +42,9 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = { .close = dds_entity_deriver_dummy_close, .delete = dds_entity_deriver_dummy_delete, .set_qos = dds_subscriber_qos_set, - .validate_status = dds_subscriber_status_validate + .validate_status = dds_subscriber_status_validate, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 0f48cbd..e2951f7 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -191,7 +191,9 @@ const struct dds_entity_deriver dds_entity_deriver_topic = { .close = dds_entity_deriver_dummy_close, .delete = dds_topic_delete, .set_qos = dds_topic_qos_set, - .validate_status = dds_topic_status_validate + .validate_status = dds_topic_status_validate, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; /** diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index 64e15d6..82c9b00 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -136,7 +136,9 @@ const struct dds_entity_deriver dds_entity_deriver_waitset = { .close = dds_waitset_close, .delete = dds_waitset_delete, .set_qos = dds_entity_deriver_dummy_set_qos, - .validate_status = dds_entity_deriver_dummy_validate_status + .validate_status = dds_entity_deriver_dummy_validate_status, + .create_statistics = dds_entity_deriver_dummy_create_statistics, + .refresh_statistics = dds_entity_deriver_dummy_refresh_statistics }; dds_entity_t dds_create_waitset (dds_entity_t owner) diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index cad0492..b539e22 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -30,6 +30,8 @@ #include "dds__qos.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds__whc.h" +#include "dds__statistics.h" +#include "dds/ddsi/ddsi_statistics.h" DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_writer) @@ -252,12 +254,38 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo return DDS_RETCODE_OK; } +static const struct dds_stat_keyvalue_descriptor dds_writer_statistics_kv[] = { + { "rexmit_bytes", DDS_STAT_KIND_UINT64 }, + { "throttle_count", DDS_STAT_KIND_UINT32 }, + { "time_throttle", DDS_STAT_KIND_UINT64 }, + { "time_rexmit", DDS_STAT_KIND_UINT64 } +}; + +static const struct dds_stat_descriptor dds_writer_statistics_desc = { + .count = sizeof (dds_writer_statistics_kv) / sizeof (dds_writer_statistics_kv[0]), + .kv = dds_writer_statistics_kv +}; + +static struct dds_statistics *dds_writer_create_statistics (const struct dds_entity *entity) +{ + return dds_alloc_statistics (entity, &dds_writer_statistics_desc); +} + +static void dds_writer_refresh_statistics (const struct dds_entity *entity, struct dds_statistics *stat) +{ + const struct dds_writer *wr = (const struct dds_writer *) entity; + if (wr->m_wr) + ddsi_get_writer_stats (wr->m_wr, &stat->kv[0].u.u64, &stat->kv[1].u.u32, &stat->kv[2].u.u64, &stat->kv[3].u.u64); +} + const struct dds_entity_deriver dds_entity_deriver_writer = { .interrupt = dds_writer_interrupt, .close = dds_writer_close, .delete = dds_writer_delete, .set_qos = dds_writer_qos_set, - .validate_status = dds_writer_status_validate + .validate_status = dds_writer_status_validate, + .create_statistics = dds_writer_create_statistics, + .refresh_statistics = dds_writer_refresh_statistics }; dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener) diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index e4bf0b2..63b8de2 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -30,6 +30,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_sertopic_default.c ddsi_sertopic_pserop.c ddsi_sertopic_plist.c + ddsi_statistics.c ddsi_iid.c ddsi_tkmap.c ddsi_vendor.c @@ -98,6 +99,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" ddsi_serdata_default.h ddsi_serdata_pserop.h ddsi_serdata_plist.h + ddsi_statistics.h ddsi_iid.h ddsi_tkmap.h ddsi_vendor.h diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_statistics.h b/src/core/ddsi/include/dds/ddsi/ddsi_statistics.h new file mode 100644 index 0000000..821e775 --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_statistics.h @@ -0,0 +1,30 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef _DDSI_STATISTICS_H_ +#define _DDSI_STATISTICS_H_ + +#include + +#if defined (__cplusplus) +extern "C" { +#endif + +struct reader; +struct writer; + +void ddsi_get_writer_stats (struct writer *wr, uint64_t * __restrict rexmit_bytes, uint32_t * __restrict throttle_count, uint64_t * __restrict time_throttled, uint64_t * __restrict time_retransmit); +void ddsi_get_reader_stats (struct reader *rd, uint64_t * __restrict discarded_bytes); + +#if defined (__cplusplus) +} +#endif +#endif diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index b87d36a..c0ca424 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -312,6 +312,7 @@ struct writer struct ldur_fhnode *lease_duration; /* fibheap node to keep lease duration for this writer, NULL in case of automatic liveliness with inifite duration */ struct whc *whc; /* WHC tracking history, T-L durability service history + samples by sequence number for retransmit */ uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */ + ddsrt_etime_t t_rexmit_start; ddsrt_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */ ddsrt_etime_t t_whc_high_upd; /* time "whc_high" was last updated for controlled ramp-up of throughput */ uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */ @@ -329,6 +330,9 @@ struct writer uint32_t throttle_tracing; uint32_t rexmit_count; /* cum samples retransmitted (counting events; 1 sample can be counted many times) */ uint32_t rexmit_lost_count; /* cum samples lost but retransmit requested (also counting events) */ + uint64_t rexmit_bytes; /* cum bytes queued for retransmit */ + uint64_t time_throttled; /* cum time in throttled state */ + uint64_t time_retransmit; /* cum time in retransmitting state */ struct xeventq *evq; /* timed event queue to be used by this writer */ struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */ struct lease *lease; /* for liveliness administration (writer can only become inactive when using manual liveliness) */ @@ -759,7 +763,6 @@ void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ void connect_writer_with_proxy_reader_secure(struct writer *wr, struct proxy_reader *prd, ddsrt_mtime_t tnow, int64_t crypto_handle); void connect_reader_with_proxy_writer_secure(struct reader *rd, struct proxy_writer *pwr, ddsrt_mtime_t tnow, int64_t crypto_handle); - struct ddsi_writer_info; DDS_EXPORT void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos, uint32_t statusinfo); diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index ba89518..a5edb2b 100644 --- a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -255,6 +255,9 @@ void nn_dqueue_enqueue_callback (struct nn_dqueue *q, nn_dqueue_callback_t cb, v int nn_dqueue_is_full (struct nn_dqueue *q); void nn_dqueue_wait_until_empty_if_full (struct nn_dqueue *q); +void nn_defrag_stats (struct nn_defrag *defrag, uint64_t *discarded_bytes); +void nn_reorder_stats (struct nn_reorder *reorder, uint64_t *discarded_bytes); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/src/ddsi_statistics.c b/src/core/ddsi/src/ddsi_statistics.c new file mode 100644 index 0000000..f715b59 --- /dev/null +++ b/src/core/ddsi/src/ddsi_statistics.c @@ -0,0 +1,65 @@ +/* + * Copyright(c) 2020 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include "dds/ddsrt/sync.h" +#include "dds/ddsi/ddsi_domaingv.h" +#include "dds/ddsi/ddsi_entity_index.h" +#include "dds/ddsi/ddsi_statistics.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_radmin.h" + +void ddsi_get_writer_stats (struct writer *wr, uint64_t * __restrict rexmit_bytes, uint32_t * __restrict throttle_count, uint64_t * __restrict time_throttled, uint64_t * __restrict time_retransmit) +{ + ddsrt_mutex_lock (&wr->e.lock); + *rexmit_bytes = wr->rexmit_bytes; + *throttle_count = wr->throttle_count; + *time_throttled = wr->time_throttled; + *time_retransmit = wr->time_retransmit; + ddsrt_mutex_unlock (&wr->e.lock); +} + +void ddsi_get_reader_stats (struct reader *rd, uint64_t * __restrict discarded_bytes) +{ + struct rd_pwr_match *m; + ddsi_guid_t pwrguid; + memset (&pwrguid, 0, sizeof (pwrguid)); + assert (thread_is_awake ()); + + *discarded_bytes = 0; + + // collect for all matched proxy writers + ddsrt_mutex_lock (&rd->e.lock); + while ((m = ddsrt_avl_lookup_succ (&rd_writers_treedef, &rd->writers, &pwrguid)) != NULL) + { + struct proxy_writer *pwr; + pwrguid = m->pwr_guid; + ddsrt_mutex_unlock (&rd->e.lock); + if ((pwr = entidx_lookup_proxy_writer_guid (rd->e.gv->entity_index, &pwrguid)) != NULL) + { + uint64_t disc_frags, disc_samples; + ddsrt_mutex_lock (&pwr->e.lock); + struct pwr_rd_match *x = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &rd->e.guid); + if (x != NULL) + { + nn_defrag_stats (pwr->defrag, &disc_frags); + if (x->in_sync != PRMSS_OUT_OF_SYNC && !x->filtered) + nn_reorder_stats (pwr->reorder, &disc_samples); + else + nn_reorder_stats (x->u.not_in_sync.reorder, &disc_samples); + *discarded_bytes += disc_frags + disc_samples; + } + ddsrt_mutex_unlock (&pwr->e.lock); + } + ddsrt_mutex_lock (&rd->e.lock); + } + ddsrt_mutex_unlock (&rd->e.lock); +} diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index b58c334..3035b64 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3494,6 +3494,7 @@ void writer_set_retransmitting (struct writer *wr) { assert (!wr->retransmitting); wr->retransmitting = 1; + wr->t_rexmit_start = ddsrt_time_elapsed(); if (wr->e.gv->config.whc_adaptive && wr->whc_high > wr->whc_low) { uint32_t m = 8 * wr->whc_high / 10; @@ -3505,6 +3506,7 @@ void writer_clear_retransmitting (struct writer *wr) { wr->retransmitting = 0; wr->t_whc_high_upd = wr->t_rexmit_end = ddsrt_time_elapsed(); + wr->time_retransmit += (uint64_t) (wr->t_rexmit_end.v - wr->t_rexmit_start.v); ddsrt_cond_broadcast (&wr->throttle_cond); } @@ -3624,6 +3626,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->throttling = 0; wr->retransmitting = 0; wr->t_rexmit_end.v = 0; + wr->t_rexmit_start.v = 0; wr->t_whc_high_upd.v = 0; wr->num_readers = 0; wr->num_reliable_readers = 0; @@ -3633,6 +3636,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->throttle_tracing = 0; wr->rexmit_count = 0; wr->rexmit_lost_count = 0; + wr->rexmit_bytes = 0; + wr->time_throttled = 0; + wr->time_retransmit = 0; wr->force_md5_keyhash = 0; wr->alive = 1; wr->alive_vclock = 0; diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 6aa3694..1a0cb0f 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -862,6 +862,7 @@ struct nn_defrag { uint32_t n_samples; uint32_t max_samples; enum nn_defrag_drop_mode drop_mode; + uint64_t discarded_bytes; const struct ddsrt_log_cfg *logcfg; bool trace; }; @@ -897,11 +898,17 @@ struct nn_defrag *nn_defrag_new (const struct ddsrt_log_cfg *logcfg, enum nn_def d->max_samples = max_samples; d->n_samples = 0; d->max_sample = NULL; + d->discarded_bytes = 0; d->logcfg = logcfg; d->trace = (logcfg->c.mask & DDS_LC_RADMIN) != 0; return d; } +void nn_defrag_stats (struct nn_defrag *defrag, uint64_t *discarded_bytes) +{ + *discarded_bytes = defrag->discarded_bytes; +} + void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust) { RDATATRACE (frag, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust); @@ -1156,7 +1163,7 @@ static void rsample_convert_defrag_to_reorder (struct nn_rsample *sample) sample->u.reorder.n_samples = 1; } -static struct nn_rsample *defrag_add_fragment (const struct nn_defrag *defrag, struct nn_rsample *sample, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo) +static struct nn_rsample *defrag_add_fragment (struct nn_defrag *defrag, struct nn_rsample *sample, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo) { struct nn_rsample_defrag *dfsample = &sample->u.defrag; struct nn_defrag_iv *predeq, *succ; @@ -1202,6 +1209,7 @@ static struct nn_rsample *defrag_add_fragment (const struct nn_defrag *defrag, s /* new is contained in predeq, discard new; rdata did not cause completion of a sample */ TRACE (defrag, " new contained in predeq\n"); + defrag->discarded_bytes += maxp1 - min; return NULL; } else if (min <= predeq->maxp1) @@ -1654,6 +1662,7 @@ struct nn_reorder { enum nn_reorder_mode mode; uint32_t max_samples; uint32_t n_samples; + uint64_t discarded_bytes; const struct ddsrt_log_cfg *logcfg; bool late_ack_mode; bool trace; @@ -1673,12 +1682,18 @@ struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_r r->mode = mode; r->max_samples = max_samples; r->n_samples = 0; + r->discarded_bytes = 0; r->late_ack_mode = late_ack_mode; r->logcfg = logcfg; r->trace = (logcfg->c.mask & DDS_LC_RADMIN) != 0; return r; } +void nn_reorder_stats (struct nn_reorder *reorder, uint64_t *discarded_bytes) +{ + *discarded_bytes = reorder->discarded_bytes; +} + void nn_fragchain_unref (struct nn_rdata *frag) { struct nn_rdata *frag1; @@ -1844,6 +1859,7 @@ static void delete_last_sample (struct nn_reorder *reorder) /* Last sample is in an interval of its own - delete it, and recalc max_sampleiv. */ TRACE (reorder, " delete_last_sample: in singleton interval\n"); + reorder->discarded_bytes += last->sc.first->sampleinfo->size; fragchain = last->sc.first->fragchain; ddsrt_avl_delete (&reorder_sampleivtree_treedef, &reorder->sampleivtree, reorder->max_sampleiv); reorder->max_sampleiv = ddsrt_avl_find_max (&reorder_sampleivtree_treedef, &reorder->sampleivtree); @@ -1867,6 +1883,7 @@ static void delete_last_sample (struct nn_reorder *reorder) pe = e; e = e->next; } while (e != last->sc.last); + reorder->discarded_bytes += e->sampleinfo->size; fragchain = e->fragchain; pe->next = NULL; assert (pe->sampleinfo->seq + 1 < last->maxp1); @@ -1928,6 +1945,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r if (delivery_queue_full_p) { TRACE (reorder, " discarding deliverable sample: delivery queue is full\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } @@ -1960,6 +1978,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r /* we've moved beyond this one: discard it; no need to adjust n_samples */ TRACE (reorder, " discard: too old\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_TOO_OLD; /* don't want refcount increment */ } else if (ddsrt_avl_is_empty (&reorder->sampleivtree)) @@ -1972,6 +1991,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r if (reorder->max_samples == 0) { TRACE (reorder, " NOT - max_samples hit\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } else @@ -1988,6 +2008,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r { /* growing last inteval will not be accepted when this flag is set */ TRACE (reorder, " discarding sample: only accepting delayed samples due to backlog in delivery queue\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } @@ -2001,6 +2022,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r else { TRACE (reorder, " discarding sample: max_samples reached and sample at end\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } } @@ -2010,6 +2032,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r { /* new interval at the end will not be accepted when this flag is set */ TRACE (reorder, " discarding sample: only accepting delayed samples due to backlog in delivery queue\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } if (reorder->n_samples < reorder->max_samples) @@ -2022,6 +2045,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r else { TRACE (reorder, " discarding sample: max_samples reached and sample at end\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } } @@ -2040,6 +2064,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r if (reorder->late_ack_mode && delivery_queue_full_p) { TRACE (reorder, " discarding sample: delivery queue full\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } @@ -2053,6 +2078,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r { /* contained in predeq */ TRACE (reorder, " discard: contained in predeq\n"); + reorder->discarded_bytes += s->sc.first->sampleinfo->size; return NN_REORDER_REJECT; } diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index a7d5403..4940dd7 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -985,6 +985,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const uint32_t sent = ddsi_serdata_size (sample.serdata); if (sent > wr->e.gv->config.fragment_size) sent = wr->e.gv->config.fragment_size; + wr->rexmit_bytes += sent; limit = (sent > limit) ? 0 : limit - sent; } } @@ -1015,6 +1016,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const uint32_t sent = ddsi_serdata_size (sample.serdata); if (sent > wr->e.gv->config.fragment_size) sent = wr->e.gv->config.fragment_size; + wr->rexmit_bytes += sent; limit = (sent > limit) ? 0 : limit - sent; } } @@ -1612,6 +1614,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons { sent = true; nfrags_lim--; + wr->rexmit_bytes += wr->e.gv->config.fragment_size; } } } diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index d2ec15f..a0e1b14 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -1035,8 +1035,9 @@ static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn writer. */ struct ddsi_domaingv const * const gv = wr->e.gv; dds_return_t result = DDS_RETCODE_OK; - ddsrt_mtime_t tnow = ddsrt_time_monotonic (); - const ddsrt_mtime_t abstimeout = ddsrt_mtime_add_duration (tnow, wr->xqos->reliability.max_blocking_time); + const ddsrt_mtime_t throttle_start = ddsrt_time_monotonic (); + const ddsrt_mtime_t abstimeout = ddsrt_mtime_add_duration (throttle_start, wr->xqos->reliability.max_blocking_time); + ddsrt_mtime_t tnow = throttle_start; struct whc_state whcst; whc_get_state (wr->whc, &whcst); @@ -1090,6 +1091,7 @@ static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn } wr->throttling--; + wr->time_throttled += (uint64_t) (ddsrt_time_monotonic().v - throttle_start.v); if (wr->state != WRST_OPERATIONAL) { /* gc_delete_writer may be waiting */ diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 9d80dfb..f17a93e 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -156,6 +156,8 @@ struct xeventq { ddsrt_cond_t cond; ddsi_tran_conn_t tev_conn; uint32_t auxiliary_bandwidth_limit; + + size_t cum_rexmit_bytes; }; static uint32_t xevent_thread (struct xeventq *xevq); @@ -185,6 +187,7 @@ static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev) assert (evq->queued_rexmit_msgs > 0); evq->queued_rexmit_bytes -= ev->u.msg_rexmit.queued_rexmit_bytes; evq->queued_rexmit_msgs--; + evq->cum_rexmit_bytes += ev->u.msg_rexmit.queued_rexmit_bytes; } #if 0 @@ -526,6 +529,8 @@ struct xeventq * xeventq_new evq->gv = conn->m_base.gv; ddsrt_mutex_init (&evq->lock); ddsrt_cond_init (&evq->cond); + + evq->cum_rexmit_bytes = 0; return evq; } diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index e6e8e50..c099978 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -25,6 +25,7 @@ #endif #include "dds/dds.h" +#include "dds/ddsc/dds_statistics.h" #include "ddsperf_types.h" #include "dds/ddsrt/process.h" @@ -96,6 +97,9 @@ static enum submode pingpongmode = SM_LISTENER; /* Whether to show "sub" stats every second even when nothing happens */ static bool substat_every_second = false; +/* Whether to show extended statistics (currently just rexmit info) */ +static bool extended_stats = false; + /* Size of the sequence in KeyedSeq type in bytes */ static uint32_t baggagesize = 0; @@ -1367,7 +1371,17 @@ static int cmp_uint64 (const void *va, const void *vb) return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; } -static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state) +struct dds_stats { + struct dds_statistics *pubstat; + const struct dds_stat_keyvalue *rexmit_bytes; + const struct dds_stat_keyvalue *time_throttle; + const struct dds_stat_keyvalue *time_rexmit; + const struct dds_stat_keyvalue *throttle_count; + struct dds_statistics *substat; + const struct dds_stat_keyvalue *discarded_bytes; +}; + +static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state, struct dds_stats *stats) { char prefix[128]; const double ts = (double) (tnow - tref) / 1e9; @@ -1501,6 +1515,14 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str if (output) record_netload (netload_state, prefix, tnow); + + if (extended_stats && output && stats) + { + (void) dds_refresh_statistics (stats->substat); + (void) dds_refresh_statistics (stats->pubstat); + printf ("%s discarded %"PRIu64" rexmit %"PRIu64" Trexmit %"PRIu64" Tthrottle %"PRIu64" Nthrottle %"PRIu32"\n", prefix, stats->discarded_bytes->u.u64, stats->rexmit_bytes->u.u64, stats->time_rexmit->u.u64, stats->time_throttle->u.u64, stats->throttle_count->u.u32); + } + fflush (stdout); return output; } @@ -1554,7 +1576,7 @@ static void sigxfsz_handler (int sig __attribute__ ((unused))) if (write (2, msg, sizeof (msg) - 1) < 0) { /* may not ignore return value according to Linux/gcc */ } - print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL); + print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL, NULL); kill (getpid (), 9); } } @@ -1611,6 +1633,7 @@ OPTIONS:\n\ anything.)\n\ -1 print \"sub\" stats every second, even when there is\n\ data\n\ + -X output extended statistics\n\ -i ID use domain ID instead of the default domain\n\ \n\ MODE... is zero or more of:\n\ @@ -1920,7 +1943,7 @@ int main (int argc, char *argv[]) argv0 = argv[0]; - while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:h")) != EOF) + while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:Xh")) != EOF) { int pos; switch (opt) @@ -1976,6 +1999,7 @@ int main (int argc, char *argv[]) } break; } + case 'X': extended_stats = true; break; case 'R': { tref = 0; if (sscanf (optarg, "%"SCNd64"%n", &tref, &pos) != 1 || optarg[pos] != 0) @@ -2272,6 +2296,35 @@ int main (int argc, char *argv[]) struct record_cputime_state *cputime_state; cputime_state = record_cputime_new (wr_stat); + struct dds_stats stats; + const struct dds_stat_keyvalue dummy_u64 = { .name = "", .kind = DDS_STAT_KIND_UINT64, .u.u64 = 0 }; + const struct dds_stat_keyvalue dummy_u32 = { .name = "", .kind = DDS_STAT_KIND_UINT32, .u.u32 = 0 }; + stats.substat = dds_create_statistics (rd_data); + stats.discarded_bytes = dds_lookup_statistic (stats.substat, "discarded_bytes"); + stats.pubstat = dds_create_statistics (wr_data); + stats.rexmit_bytes = dds_lookup_statistic (stats.pubstat, "rexmit_bytes"); + stats.time_rexmit = dds_lookup_statistic (stats.pubstat, "time_rexmit"); + stats.time_throttle = dds_lookup_statistic (stats.pubstat, "time_throttle"); + stats.throttle_count = dds_lookup_statistic (stats.pubstat, "throttle_count"); + if (stats.discarded_bytes == NULL) + stats.discarded_bytes = &dummy_u64; + if (stats.rexmit_bytes == NULL) + stats.rexmit_bytes = &dummy_u64; + if (stats.time_rexmit == NULL) + stats.time_rexmit = &dummy_u64; + if (stats.time_throttle == NULL) + stats.time_throttle = &dummy_u64; + if (stats.throttle_count == NULL) + stats.throttle_count = &dummy_u32; + if (stats.discarded_bytes->kind != DDS_STAT_KIND_UINT64 || + stats.rexmit_bytes->kind != DDS_STAT_KIND_UINT64 || + stats.time_rexmit->kind != DDS_STAT_KIND_UINT64 || + stats.time_throttle->kind != DDS_STAT_KIND_UINT64 || + stats.throttle_count->kind != DDS_STAT_KIND_UINT32) + { + abort (); + } + /* I hate Unix signals in multi-threaded processes ... */ #ifdef _WIN32 signal (SIGINT, signal_handler); @@ -2364,7 +2417,7 @@ int main (int argc, char *argv[]) if (tnext <= tnow) { bool output; - output = print_stats (tref, tnow, tlast, cputime_state, netload_state); + output = print_stats (tref, tnow, tlast, cputime_state, netload_state, &stats); tlast = tnow; if (tnow > tnext + DDS_MSECS (500)) tnext = tnow + DDS_SECS (1); @@ -2386,6 +2439,9 @@ int main (int argc, char *argv[]) maybe_send_new_ping (tnow, &tnextping); } } + + dds_delete_statistics (stats.pubstat); + dds_delete_statistics (stats.substat); record_netload_free (netload_state); record_cputime_free (cputime_state);