Quick-n-dirty statistics framework

This adds a set of functions:

* dds_create_statistics
* dds_refresh_statistics
* dds_delete_statistics
* dds_lookup_statistic

to poll entities for information on their state, returned as a set of
name-value pairs.  The interface and selection of statistics (and
naming) is all provisional, and for this reason the
dds/ddsc/dds_statistisc.h file is not included by dds.h.

Currently, the only statistics available relate to retansmits and are
optionally output by ddsperf.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-06-27 11:37:20 +02:00 committed by eboasson
parent b116e6e41e
commit dff08536c4
29 changed files with 575 additions and 36 deletions

View file

@ -12,6 +12,8 @@ loopback=true
resultdir="throughput-result"
force=false
netstats=false
watermarks=""
remotes=""
usage () {
cat >&2 <<EOF
@ -36,6 +38,7 @@ OPTIONS
-l LOOPBACK enable/disable multicast loopback (true/false, default: $loopback)
-o DIR store results in dir (default: $resultdir)
-f "force": first do "rm -rf" of output dir, then create it
-W set high water mark to 100kB
-X run ping to first remote; on macOS: log interface stats
Local host runs "ddsperf" in subscriber mode, first remote runs it publisher
@ -46,7 +49,19 @@ EOF
exit 1
}
while getopts "fi:I:b:d:pa:m:s:t:o:l:X" opt ; do
dokill () {
[ -n "$netstats_pids" ] && kill $netstats_pids
for r in $remotes ; do
ssh $r "kill -9 \`cat $remotedir/throughput-test-sub-*.pid\` ; rm $remotedir/throughput-test-sub-*.pid"
done
}
dokill_and_exit () {
dokill
exit 1
}
while getopts "fi:I:b:d:pa:m:s:t:o:l:WX" opt ; do
case $opt in
f) force=true ;;
i) nwif="$OPTARG" ;;
@ -64,12 +79,14 @@ while getopts "fi:I:b:d:pa:m:s:t:o:l:X" opt ; do
l) loopback="OPTARG" ;;
t) timeout="$OPTARG" ;;
o) resultdir="$OPTARG" ;;
W) watermarks="<Watermarks><WhcHigh>100kB</WhcHigh></Watermarks>" ;;
X) netstats=true ;;
h) usage ;;
esac
done
shift $((OPTIND-1))
if [ $# -lt 1 ] ; then usage ; fi
remotes="$@"
[ -z "$rnwif" ] && rnwif=$nwif
cfg=cdds-simple.xml
@ -83,6 +100,7 @@ cat >$cfg <<EOF
<Internal>
<SynchronousDeliveryPriorityThreshold>\${async:-0}</SynchronousDeliveryPriorityThreshold>
<LeaseDuration>2s</LeaseDuration>
$watermarks
</Internal>
<Tracing>
<Verbosity>fine</Verbosity>
@ -111,7 +129,7 @@ fi
if $provision ; then
echo "provisioning ..."
for r in "$@" ; do
for r in $remotes ; do
ssh $r mkdir -p $remotedir $remotedir/bin $remotedir/lib
scp lib/libddsc.so.0 $r:$remotedir/lib
scp bin/ddsperf $r:$remotedir/bin
@ -122,7 +140,7 @@ topic=KS
[ -z "$sizelist" ] && topic=OU
export CYCLONEDDS_URI=file://$PWD/$cfg
for r in "$@" ; do
for r in $remotes ; do
scp $cfg $r:$remotedir || { echo "failed to copy $cfg to $remote:$PWD" >&2 ; exit 1 ; }
done
@ -137,6 +155,8 @@ for async_mode in $asynclist ; do
echo "======== ASYNC $async MODE $sub_mode ========="
subpids=""
netstat_pids=""
trap dokill_and_exit SIGINT
cat > run-subscriber.tmp <<EOF
export CYCLONEDDS_URI=file://$remotedir/$cfg
export async=$async
@ -145,12 +165,13 @@ export logdir=.
#export trace=trace,-content
cd $remotedir
#/usr/sbin/tcpdump -c 20000 -s 0 -w /tmp/x.pcap -i eth0 -Z erik 'udp[8:4]=0x52545053' & tcpdumppid=\$!
bin/ddsperf -1 -d $rnwif$bandwidth -c -T $topic sub $sub_mode > sub.log & pid=\$!
#/usr/sbin/tcpdump -c 20000 -s 0 -w /tmp/x.pcap -i eth0 -Z erik 'udp' & tcpdumppid=\$!
bin/ddsperf -1 -X -d $rnwif$bandwidth -c -T $topic sub $sub_mode > sub.log & pid=\$!
echo \$pid > throughput-test-sub-\$pid.pid
wait \$pid
#kill -INT \$tcpdumppid
[ -n "\$tcpdumppid" ] && kill -INT \$tcpdumppid
EOF
for r in "$@" ; do
for r in $remotes ; do
scp run-subscriber.tmp $r:$remotedir
ssh $r ". $remotedir/run-subscriber.tmp" & subpids="$subpids $!"
done
@ -170,20 +191,15 @@ EOF
echo "size $size"
#export trace=trace,-content
bin/ddsperf -Q minmatch:$# -Q initwait:3 \
-c -d $nwif$bandwidth \
-X -c -d $nwif$bandwidth \
-D $timeout -T $topic \
pub size $size | \
tee -a $outdir/pub.log
done
if $netstats ; then
kill $netstats_pids
fi
for r in "$@" ; do
ssh $r "kill -9 \`cat $remotedir/throughput-test-sub-*.pid\` ; rm $remotedir/throughput-test-sub-*.pid"
done
dokill
wait
for r in "$@" ; do
for r in $remotes ; do
scp $r:$remotedir/sub.log $outdir/sub-$r.log
done
@ -192,9 +208,9 @@ EOF
# col 1: appl receive bandwidth, 1s trailing average (Mb/s)
# col 2: appl receive bandwidth, 10s trailing average (Mb/s)
# (this assumes the network interface name is eth, en, or lo, optionally followed by a 0)
perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){printf "%f %f %f\n", $1, $r, $r10;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt
perl -ne 'print "$1 $2\n" if /^(\d+)\s+(\d+)(\s+\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt
perl -ne 'print "$1 $2\n" if /^DISCARDED\s+(\d+)\s+(\d+)$/' $outdir/sub-$1.log > $resultdir/discarded.txt
perl -ne 'if(/size \d+ total.* (\d+\.\d+) Mb\/s.* (\d+\.\d+) Mb\/s/){$r=$1;$r10=$2;}if(/(?:eth|en|lo)0?: xmit.*? recv (\d+\.\d+)/){$rnet=$1;}if(/discarded\s+(\d+)/){printf "%f %f %f %u\n", $rnet, $r, $r10, $1;}' $outdir/sub-$1.log > $resultdir/summary-$async_mode-$sub_mode.txt
perl -ne 'if(/(?:eth|en|lo)0?: xmit (\d+\.\d+) Mb\/s/){printf "%f\n", $1}' $outdir/pub.log > $resultdir/net-xmit-bytes.txt
perl -ne 'print "$1 $2 $3 $4 $5\n" if /^(\d+)\s+(\d+)(\s+\d+)$/ || /^\[\d+\]\s+(\d+\.\d+)\s+discarded\s+\d+\s+rexmit\s+(\d+)\s+[A-Za-z_ ]+(\d+)[A-Za-z_ ]+(\d+)[A-Za-z_ ]+(\d+)$/' $outdir/pub.log > $resultdir/rexmit-bytes.txt
if $netstats ; then
perl -ne 'print "$1\n" if /time=([0-9.]+)/' $outdir/ping.log > $resultdir/lat.log
if [ "`uname -s`" = "Darwin" ] ; then

View file

@ -33,6 +33,7 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_waitset.c
dds_readcond.c
dds_guardcond.c
dds_statistics.c
dds_subscriber.c
dds_write.c
dds_whc.c
@ -49,6 +50,7 @@ PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/dd
ddsc/dds_public_qos.h
ddsc/dds_public_qosdefs.h
ddsc/dds_public_status.h
ddsc/dds_statistics.h
ddsc/dds_rhc.h
ddsc/dds_internal_api.h
)
@ -69,6 +71,7 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__guardcond.h
dds__reader.h
dds__rhc_default.h
dds__statistics.h
dds__subscriber.h
dds__topic.h
dds__types.h

View file

@ -0,0 +1,107 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDS_STATISTICS_H
#define DDS_STATISTICS_H
/* A quick-and-dirty provisional interface */
#include "dds/dds.h"
#include "dds/ddsrt/attributes.h"
#include "dds/export.h"
#if defined (__cplusplus)
extern "C" {
#endif
enum dds_stat_kind {
DDS_STAT_KIND_UINT32, ///< value is a 32-bit unsigned integer
DDS_STAT_KIND_UINT64, ///< value is a 64-bit unsigned integer
DDS_STAT_KIND_LENGTHTIME ///< value is integral(length(t) dt)
};
struct dds_stat_keyvalue {
const char *name; ///< name, memory owned by library
enum dds_stat_kind kind; ///< value type
union {
uint32_t u32;
uint64_t u64;
uint64_t lengthtime;
} u;
};
struct dds_statistics {
dds_entity_t entity; ///< handle of entity to which this set of values applies
uint64_t opaque; ///< internal data
dds_time_t time; ///< time stamp of latest call to `dds_refresh_statistics`
size_t count; ///< number of key-value pairs
struct dds_stat_keyvalue kv[]; ///< data
};
/** @brief Allocate a new statistics object for entity
*
* This allocates and populates a newly allocated `struct dds_statistics` for the
* specified entity.
*
* @param[in] entity the handle of the entity
*
* @returns a newly allocated and populated statistics structure or NULL if entity is
* invalid or doesn't support any statistics.
*/
DDS_EXPORT struct dds_statistics *dds_create_statistics (dds_entity_t entity);
/** @brief Update a previously created statistics structure with current values
*
* Only the time stamp and the values (and "opaque") may change. The set of keys and the
* types of the values do not change.
*
* @param[in,out] stat statistics structure to update the values of
*
* @returns success or an error indication
*
* @retval DDS_RETCODE_OK
* the data was successfully updated
* @retval DDS_RETCODE_BAD_PARAMETER
* stats is a null pointer or the referenced entity no longer exists
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
* library was deinitialized
*/
DDS_EXPORT dds_return_t dds_refresh_statistics (struct dds_statistics *stat);
/** @brief Free a previously created statistics object
*
* This frees the statistics object. Passing a null pointer is a no-op. The operation
* succeeds also if the referenced entity no longer exists.
*
* @param[in] stat statistics object to free
*/
DDS_EXPORT void dds_delete_statistics (struct dds_statistics *stat);
/** @brief Lookup a specific value by name
*
* This looks up the specified name in the list of keys in `stat` and returns the address
* of the key-value pair if present, a null pointer if not. If `stat` is a null pointer,
* it returns a null pointer.
*
* @param[in] stat statistics object to lookup a name in (or NULL)
* @param[in] name name to look for
*
* @returns The address of the key-value pair inside `stat`, or NULL if `stat` is NULL or
* `name` does not match a key in `stat.
*/
DDS_EXPORT const struct dds_stat_keyvalue *dds_lookup_statistic (const struct dds_statistics *stat, const char *name)
ddsrt_nonnull ((2));
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -0,0 +1,36 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef _DDS_STATISTICS_IMPL_H_
#define _DDS_STATISTICS_IMPL_H_
#include "dds/ddsc/dds_statistics.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct dds_stat_keyvalue_descriptor {
const char *name;
enum dds_stat_kind kind;
};
struct dds_stat_descriptor {
size_t count;
const struct dds_stat_keyvalue_descriptor *kv;
};
struct dds_statistics *dds_alloc_statistics (const struct dds_entity *e, const struct dds_stat_descriptor *d);
#if defined (__cplusplus)
}
#endif
#endif /* _DDS_STATISTICS_IMPL_H_ */

View file

@ -106,6 +106,8 @@ typedef struct dds_entity_deriver {
dds_return_t (*delete) (struct dds_entity *e) ddsrt_nonnull_all;
dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all;
dds_return_t (*validate_status) (uint32_t mask);
struct dds_statistics * (*create_statistics) (const struct dds_entity *e);
void (*refresh_statistics) (const struct dds_entity *e, struct dds_statistics *s);
} dds_entity_deriver;
struct dds_waitset;
@ -178,6 +180,8 @@ void dds_entity_deriver_dummy_close (struct dds_entity *e);
dds_return_t dds_entity_deriver_dummy_delete (struct dds_entity *e);
dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled);
dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask);
struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct dds_entity *e);
void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);
inline void dds_entity_deriver_interrupt (struct dds_entity *e) {
(dds_entity_deriver_table[e->m_kind]->interrupt) (e);
@ -200,6 +204,12 @@ inline bool dds_entity_supports_set_qos (struct dds_entity *e) {
inline bool dds_entity_supports_validate_status (struct dds_entity *e) {
return dds_entity_deriver_table[e->m_kind]->validate_status != dds_entity_deriver_dummy_validate_status;
}
inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e) {
return dds_entity_deriver_table[e->m_kind]->create_statistics (e);
}
inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s) {
dds_entity_deriver_table[e->m_kind]->refresh_statistics (e, s);
}
typedef struct dds_cyclonedds_entity {
struct dds_entity m_entity;

View file

@ -36,7 +36,9 @@ const struct dds_entity_deriver dds_entity_deriver_domain = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_domain_free,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
static int dds_domain_compare (const void *va, const void *vb)

View file

@ -62,6 +62,12 @@ dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_q
dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask) {
(void) mask; return DDS_RETCODE_ILLEGAL_OPERATION;
}
struct dds_statistics *dds_entity_deriver_dummy_create_statistics (const struct dds_entity *e) {
(void) e; return NULL;
}
void dds_entity_deriver_dummy_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s) {
(void) e; (void) s;
}
extern inline void dds_entity_deriver_interrupt (struct dds_entity *e);
extern inline void dds_entity_deriver_close (struct dds_entity *e);
@ -70,6 +76,8 @@ extern inline dds_return_t dds_entity_deriver_set_qos (struct dds_entity *e, con
extern inline dds_return_t dds_entity_deriver_validate_status (struct dds_entity *e, uint32_t mask);
extern inline bool dds_entity_supports_set_qos (struct dds_entity *e);
extern inline bool dds_entity_supports_validate_status (struct dds_entity *e);
extern inline struct dds_statistics *dds_entity_deriver_create_statistics (const struct dds_entity *e);
extern inline void dds_entity_deriver_refresh_statistics (const struct dds_entity *e, struct dds_statistics *s);
static int compare_instance_handle (const void *va, const void *vb)
{

View file

@ -27,7 +27,9 @@ const struct dds_entity_deriver dds_entity_deriver_guardcondition = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_entity_t dds_create_guardcondition (dds_entity_t owner)

View file

@ -41,7 +41,9 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
.close = dds_close,
.delete = dds_fini,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_cyclonedds_entity dds_global;

View file

@ -85,7 +85,9 @@ const struct dds_entity_deriver dds_entity_deriver_participant = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_participant_delete,
.set_qos = dds_participant_qos_set,
.validate_status = dds_participant_status_validate
.validate_status = dds_participant_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener)

View file

@ -43,7 +43,9 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_publisher_qos_set,
.validate_status = dds_publisher_status_validate
.validate_status = dds_publisher_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)

View file

@ -34,7 +34,9 @@ const struct dds_entity_deriver dds_entity_deriver_readcondition = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_readcond_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter)

View file

@ -28,9 +28,11 @@
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/ddsi_domaingv.h"
#include "dds__builtin.h"
#include "dds__statistics.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/ddsi_security_omg.h"
#include "dds/ddsi/ddsi_statistics.h"
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader)
@ -349,12 +351,35 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}
static const struct dds_stat_keyvalue_descriptor dds_reader_statistics_kv[] = {
{ "discarded_bytes", DDS_STAT_KIND_UINT64 }
};
static const struct dds_stat_descriptor dds_reader_statistics_desc = {
.count = sizeof (dds_reader_statistics_kv) / sizeof (dds_reader_statistics_kv[0]),
.kv = dds_reader_statistics_kv
};
static struct dds_statistics *dds_reader_create_statistics (const struct dds_entity *entity)
{
return dds_alloc_statistics (entity, &dds_reader_statistics_desc);
}
static void dds_reader_refresh_statistics (const struct dds_entity *entity, struct dds_statistics *stat)
{
const struct dds_reader *rd = (const struct dds_reader *) entity;
if (rd->m_rd)
ddsi_get_reader_stats (rd->m_rd, &stat->kv[0].u.u64);
}
const struct dds_entity_deriver dds_entity_deriver_reader = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_reader_close,
.delete = dds_reader_delete,
.set_qos = dds_reader_qos_set,
.validate_status = dds_reader_status_validate
.validate_status = dds_reader_status_validate,
.create_statistics = dds_reader_create_statistics,
.refresh_statistics = dds_reader_refresh_statistics
};
static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc)

View file

@ -0,0 +1,87 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <string.h>
#include "dds/ddsc/dds_statistics.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/log.h"
#include "dds__entity.h"
#include "dds__statistics.h"
struct dds_statistics *dds_alloc_statistics (const struct dds_entity *e, const struct dds_stat_descriptor *d)
{
struct dds_statistics *s = ddsrt_malloc (sizeof (*s) + d->count * sizeof (s->kv[0]));
s->entity = e->m_hdllink.hdl;
s->opaque = e->m_iid;
s->time = 0;
s->count = d->count;
memset (s->kv, 0, d->count * sizeof (s->kv[0]));
for (size_t i = 0; i < s->count; i++)
{
s->kv[i].kind = d->kv[i].kind;
s->kv[i].name = d->kv[i].name;
}
return s;
}
struct dds_statistics *dds_create_statistics (dds_entity_t entity)
{
dds_entity *e;
struct dds_statistics *s;
if (dds_entity_pin (entity, &e) != DDS_RETCODE_OK)
return NULL;
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, &e->m_domain->gv);
if ((s = dds_entity_deriver_create_statistics (e)) != NULL)
dds_entity_deriver_refresh_statistics (e, s);
thread_state_asleep (ts1);
dds_entity_unpin (e);
return s;
}
dds_return_t dds_refresh_statistics (struct dds_statistics *stat)
{
dds_return_t rc;
dds_entity *e;
if (stat == NULL)
return DDS_RETCODE_BAD_PARAMETER;
if ((rc = dds_entity_pin (stat->entity, &e)) != DDS_RETCODE_OK)
return rc;
if (stat->opaque != e->m_iid)
{
dds_entity_unpin (e);
return DDS_RETCODE_BAD_PARAMETER;
}
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, &e->m_domain->gv);
stat->time = dds_time ();
dds_entity_deriver_refresh_statistics (e, stat);
thread_state_asleep (ts1);
dds_entity_unpin (e);
return DDS_RETCODE_OK;
}
const struct dds_stat_keyvalue *dds_lookup_statistic (const struct dds_statistics *stat, const char *name)
{
if (stat == NULL)
return NULL;
for (size_t i = 0; i < stat->count; i++)
if (strcmp (stat->kv[i].name, name) == 0)
return &stat->kv[i];
return NULL;
}
void dds_delete_statistics (struct dds_statistics *stat)
{
ddsrt_free (stat);
}

View file

@ -42,7 +42,9 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_subscriber_qos_set,
.validate_status = dds_subscriber_status_validate
.validate_status = dds_subscriber_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)

View file

@ -191,7 +191,9 @@ const struct dds_entity_deriver dds_entity_deriver_topic = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_topic_delete,
.set_qos = dds_topic_qos_set,
.validate_status = dds_topic_status_validate
.validate_status = dds_topic_status_validate,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
/**

View file

@ -136,7 +136,9 @@ const struct dds_entity_deriver dds_entity_deriver_waitset = {
.close = dds_waitset_close,
.delete = dds_waitset_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
.validate_status = dds_entity_deriver_dummy_validate_status,
.create_statistics = dds_entity_deriver_dummy_create_statistics,
.refresh_statistics = dds_entity_deriver_dummy_refresh_statistics
};
dds_entity_t dds_create_waitset (dds_entity_t owner)

View file

@ -30,6 +30,8 @@
#include "dds__qos.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds__whc.h"
#include "dds__statistics.h"
#include "dds/ddsi/ddsi_statistics.h"
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_writer)
@ -252,12 +254,38 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo
return DDS_RETCODE_OK;
}
static const struct dds_stat_keyvalue_descriptor dds_writer_statistics_kv[] = {
{ "rexmit_bytes", DDS_STAT_KIND_UINT64 },
{ "throttle_count", DDS_STAT_KIND_UINT32 },
{ "time_throttle", DDS_STAT_KIND_UINT64 },
{ "time_rexmit", DDS_STAT_KIND_UINT64 }
};
static const struct dds_stat_descriptor dds_writer_statistics_desc = {
.count = sizeof (dds_writer_statistics_kv) / sizeof (dds_writer_statistics_kv[0]),
.kv = dds_writer_statistics_kv
};
static struct dds_statistics *dds_writer_create_statistics (const struct dds_entity *entity)
{
return dds_alloc_statistics (entity, &dds_writer_statistics_desc);
}
static void dds_writer_refresh_statistics (const struct dds_entity *entity, struct dds_statistics *stat)
{
const struct dds_writer *wr = (const struct dds_writer *) entity;
if (wr->m_wr)
ddsi_get_writer_stats (wr->m_wr, &stat->kv[0].u.u64, &stat->kv[1].u.u32, &stat->kv[2].u.u64, &stat->kv[3].u.u64);
}
const struct dds_entity_deriver dds_entity_deriver_writer = {
.interrupt = dds_writer_interrupt,
.close = dds_writer_close,
.delete = dds_writer_delete,
.set_qos = dds_writer_qos_set,
.validate_status = dds_writer_status_validate
.validate_status = dds_writer_status_validate,
.create_statistics = dds_writer_create_statistics,
.refresh_statistics = dds_writer_refresh_statistics
};
dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener)

View file

@ -30,6 +30,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_sertopic_default.c
ddsi_sertopic_pserop.c
ddsi_sertopic_plist.c
ddsi_statistics.c
ddsi_iid.c
ddsi_tkmap.c
ddsi_vendor.c
@ -98,6 +99,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
ddsi_serdata_default.h
ddsi_serdata_pserop.h
ddsi_serdata_plist.h
ddsi_statistics.h
ddsi_iid.h
ddsi_tkmap.h
ddsi_vendor.h

View file

@ -0,0 +1,30 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef _DDSI_STATISTICS_H_
#define _DDSI_STATISTICS_H_
#include <stdint.h>
#if defined (__cplusplus)
extern "C" {
#endif
struct reader;
struct writer;
void ddsi_get_writer_stats (struct writer *wr, uint64_t * __restrict rexmit_bytes, uint32_t * __restrict throttle_count, uint64_t * __restrict time_throttled, uint64_t * __restrict time_retransmit);
void ddsi_get_reader_stats (struct reader *rd, uint64_t * __restrict discarded_bytes);
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -312,6 +312,7 @@ struct writer
struct ldur_fhnode *lease_duration; /* fibheap node to keep lease duration for this writer, NULL in case of automatic liveliness with inifite duration */
struct whc *whc; /* WHC tracking history, T-L durability service history + samples by sequence number for retransmit */
uint32_t whc_low, whc_high; /* watermarks for WHC in bytes (counting only unack'd data) */
ddsrt_etime_t t_rexmit_start;
ddsrt_etime_t t_rexmit_end; /* time of last 1->0 transition of "retransmitting" */
ddsrt_etime_t t_whc_high_upd; /* time "whc_high" was last updated for controlled ramp-up of throughput */
uint32_t init_burst_size_limit; /* derived from reader's receive_buffer_size */
@ -329,6 +330,9 @@ struct writer
uint32_t throttle_tracing;
uint32_t rexmit_count; /* cum samples retransmitted (counting events; 1 sample can be counted many times) */
uint32_t rexmit_lost_count; /* cum samples lost but retransmit requested (also counting events) */
uint64_t rexmit_bytes; /* cum bytes queued for retransmit */
uint64_t time_throttled; /* cum time in throttled state */
uint64_t time_retransmit; /* cum time in retransmitting state */
struct xeventq *evq; /* timed event queue to be used by this writer */
struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */
struct lease *lease; /* for liveliness administration (writer can only become inactive when using manual liveliness) */
@ -759,7 +763,6 @@ void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_
void connect_writer_with_proxy_reader_secure(struct writer *wr, struct proxy_reader *prd, ddsrt_mtime_t tnow, int64_t crypto_handle);
void connect_reader_with_proxy_writer_secure(struct reader *rd, struct proxy_writer *pwr, ddsrt_mtime_t tnow, int64_t crypto_handle);
struct ddsi_writer_info;
DDS_EXPORT void ddsi_make_writer_info(struct ddsi_writer_info *wrinfo, const struct entity_common *e, const struct dds_qos *xqos, uint32_t statusinfo);

View file

@ -255,6 +255,9 @@ void nn_dqueue_enqueue_callback (struct nn_dqueue *q, nn_dqueue_callback_t cb, v
int nn_dqueue_is_full (struct nn_dqueue *q);
void nn_dqueue_wait_until_empty_if_full (struct nn_dqueue *q);
void nn_defrag_stats (struct nn_defrag *defrag, uint64_t *discarded_bytes);
void nn_reorder_stats (struct nn_reorder *reorder, uint64_t *discarded_bytes);
#if defined (__cplusplus)
}
#endif

View file

@ -0,0 +1,65 @@
/*
* Copyright(c) 2020 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <string.h>
#include "dds/ddsrt/sync.h"
#include "dds/ddsi/ddsi_domaingv.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/ddsi_statistics.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_radmin.h"
void ddsi_get_writer_stats (struct writer *wr, uint64_t * __restrict rexmit_bytes, uint32_t * __restrict throttle_count, uint64_t * __restrict time_throttled, uint64_t * __restrict time_retransmit)
{
ddsrt_mutex_lock (&wr->e.lock);
*rexmit_bytes = wr->rexmit_bytes;
*throttle_count = wr->throttle_count;
*time_throttled = wr->time_throttled;
*time_retransmit = wr->time_retransmit;
ddsrt_mutex_unlock (&wr->e.lock);
}
void ddsi_get_reader_stats (struct reader *rd, uint64_t * __restrict discarded_bytes)
{
struct rd_pwr_match *m;
ddsi_guid_t pwrguid;
memset (&pwrguid, 0, sizeof (pwrguid));
assert (thread_is_awake ());
*discarded_bytes = 0;
// collect for all matched proxy writers
ddsrt_mutex_lock (&rd->e.lock);
while ((m = ddsrt_avl_lookup_succ (&rd_writers_treedef, &rd->writers, &pwrguid)) != NULL)
{
struct proxy_writer *pwr;
pwrguid = m->pwr_guid;
ddsrt_mutex_unlock (&rd->e.lock);
if ((pwr = entidx_lookup_proxy_writer_guid (rd->e.gv->entity_index, &pwrguid)) != NULL)
{
uint64_t disc_frags, disc_samples;
ddsrt_mutex_lock (&pwr->e.lock);
struct pwr_rd_match *x = ddsrt_avl_lookup (&pwr_readers_treedef, &pwr->readers, &rd->e.guid);
if (x != NULL)
{
nn_defrag_stats (pwr->defrag, &disc_frags);
if (x->in_sync != PRMSS_OUT_OF_SYNC && !x->filtered)
nn_reorder_stats (pwr->reorder, &disc_samples);
else
nn_reorder_stats (x->u.not_in_sync.reorder, &disc_samples);
*discarded_bytes += disc_frags + disc_samples;
}
ddsrt_mutex_unlock (&pwr->e.lock);
}
ddsrt_mutex_lock (&rd->e.lock);
}
ddsrt_mutex_unlock (&rd->e.lock);
}

View file

@ -3494,6 +3494,7 @@ void writer_set_retransmitting (struct writer *wr)
{
assert (!wr->retransmitting);
wr->retransmitting = 1;
wr->t_rexmit_start = ddsrt_time_elapsed();
if (wr->e.gv->config.whc_adaptive && wr->whc_high > wr->whc_low)
{
uint32_t m = 8 * wr->whc_high / 10;
@ -3505,6 +3506,7 @@ void writer_clear_retransmitting (struct writer *wr)
{
wr->retransmitting = 0;
wr->t_whc_high_upd = wr->t_rexmit_end = ddsrt_time_elapsed();
wr->time_retransmit += (uint64_t) (wr->t_rexmit_end.v - wr->t_rexmit_start.v);
ddsrt_cond_broadcast (&wr->throttle_cond);
}
@ -3624,6 +3626,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
wr->throttling = 0;
wr->retransmitting = 0;
wr->t_rexmit_end.v = 0;
wr->t_rexmit_start.v = 0;
wr->t_whc_high_upd.v = 0;
wr->num_readers = 0;
wr->num_reliable_readers = 0;
@ -3633,6 +3636,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
wr->throttle_tracing = 0;
wr->rexmit_count = 0;
wr->rexmit_lost_count = 0;
wr->rexmit_bytes = 0;
wr->time_throttled = 0;
wr->time_retransmit = 0;
wr->force_md5_keyhash = 0;
wr->alive = 1;
wr->alive_vclock = 0;

View file

@ -862,6 +862,7 @@ struct nn_defrag {
uint32_t n_samples;
uint32_t max_samples;
enum nn_defrag_drop_mode drop_mode;
uint64_t discarded_bytes;
const struct ddsrt_log_cfg *logcfg;
bool trace;
};
@ -897,11 +898,17 @@ struct nn_defrag *nn_defrag_new (const struct ddsrt_log_cfg *logcfg, enum nn_def
d->max_samples = max_samples;
d->n_samples = 0;
d->max_sample = NULL;
d->discarded_bytes = 0;
d->logcfg = logcfg;
d->trace = (logcfg->c.mask & DDS_LC_RADMIN) != 0;
return d;
}
void nn_defrag_stats (struct nn_defrag *defrag, uint64_t *discarded_bytes)
{
*discarded_bytes = defrag->discarded_bytes;
}
void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust)
{
RDATATRACE (frag, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust);
@ -1156,7 +1163,7 @@ static void rsample_convert_defrag_to_reorder (struct nn_rsample *sample)
sample->u.reorder.n_samples = 1;
}
static struct nn_rsample *defrag_add_fragment (const struct nn_defrag *defrag, struct nn_rsample *sample, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo)
static struct nn_rsample *defrag_add_fragment (struct nn_defrag *defrag, struct nn_rsample *sample, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo)
{
struct nn_rsample_defrag *dfsample = &sample->u.defrag;
struct nn_defrag_iv *predeq, *succ;
@ -1202,6 +1209,7 @@ static struct nn_rsample *defrag_add_fragment (const struct nn_defrag *defrag, s
/* new is contained in predeq, discard new; rdata did not cause
completion of a sample */
TRACE (defrag, " new contained in predeq\n");
defrag->discarded_bytes += maxp1 - min;
return NULL;
}
else if (min <= predeq->maxp1)
@ -1654,6 +1662,7 @@ struct nn_reorder {
enum nn_reorder_mode mode;
uint32_t max_samples;
uint32_t n_samples;
uint64_t discarded_bytes;
const struct ddsrt_log_cfg *logcfg;
bool late_ack_mode;
bool trace;
@ -1673,12 +1682,18 @@ struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_r
r->mode = mode;
r->max_samples = max_samples;
r->n_samples = 0;
r->discarded_bytes = 0;
r->late_ack_mode = late_ack_mode;
r->logcfg = logcfg;
r->trace = (logcfg->c.mask & DDS_LC_RADMIN) != 0;
return r;
}
void nn_reorder_stats (struct nn_reorder *reorder, uint64_t *discarded_bytes)
{
*discarded_bytes = reorder->discarded_bytes;
}
void nn_fragchain_unref (struct nn_rdata *frag)
{
struct nn_rdata *frag1;
@ -1844,6 +1859,7 @@ static void delete_last_sample (struct nn_reorder *reorder)
/* Last sample is in an interval of its own - delete it, and
recalc max_sampleiv. */
TRACE (reorder, " delete_last_sample: in singleton interval\n");
reorder->discarded_bytes += last->sc.first->sampleinfo->size;
fragchain = last->sc.first->fragchain;
ddsrt_avl_delete (&reorder_sampleivtree_treedef, &reorder->sampleivtree, reorder->max_sampleiv);
reorder->max_sampleiv = ddsrt_avl_find_max (&reorder_sampleivtree_treedef, &reorder->sampleivtree);
@ -1867,6 +1883,7 @@ static void delete_last_sample (struct nn_reorder *reorder)
pe = e;
e = e->next;
} while (e != last->sc.last);
reorder->discarded_bytes += e->sampleinfo->size;
fragchain = e->fragchain;
pe->next = NULL;
assert (pe->sampleinfo->seq + 1 < last->maxp1);
@ -1928,6 +1945,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
if (delivery_queue_full_p)
{
TRACE (reorder, " discarding deliverable sample: delivery queue is full\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
@ -1960,6 +1978,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
/* we've moved beyond this one: discard it; no need to adjust
n_samples */
TRACE (reorder, " discard: too old\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_TOO_OLD; /* don't want refcount increment */
}
else if (ddsrt_avl_is_empty (&reorder->sampleivtree))
@ -1972,6 +1991,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
if (reorder->max_samples == 0)
{
TRACE (reorder, " NOT - max_samples hit\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
else
@ -1988,6 +2008,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
{
/* growing last inteval will not be accepted when this flag is set */
TRACE (reorder, " discarding sample: only accepting delayed samples due to backlog in delivery queue\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
@ -2001,6 +2022,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
else
{
TRACE (reorder, " discarding sample: max_samples reached and sample at end\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
}
@ -2010,6 +2032,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
{
/* new interval at the end will not be accepted when this flag is set */
TRACE (reorder, " discarding sample: only accepting delayed samples due to backlog in delivery queue\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
if (reorder->n_samples < reorder->max_samples)
@ -2022,6 +2045,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
else
{
TRACE (reorder, " discarding sample: max_samples reached and sample at end\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
}
@ -2040,6 +2064,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
if (reorder->late_ack_mode && delivery_queue_full_p)
{
TRACE (reorder, " discarding sample: delivery queue full\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}
@ -2053,6 +2078,7 @@ nn_reorder_result_t nn_reorder_rsample (struct nn_rsample_chain *sc, struct nn_r
{
/* contained in predeq */
TRACE (reorder, " discard: contained in predeq\n");
reorder->discarded_bytes += s->sc.first->sampleinfo->size;
return NN_REORDER_REJECT;
}

View file

@ -985,6 +985,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
uint32_t sent = ddsi_serdata_size (sample.serdata);
if (sent > wr->e.gv->config.fragment_size)
sent = wr->e.gv->config.fragment_size;
wr->rexmit_bytes += sent;
limit = (sent > limit) ? 0 : limit - sent;
}
}
@ -1015,6 +1016,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
uint32_t sent = ddsi_serdata_size (sample.serdata);
if (sent > wr->e.gv->config.fragment_size)
sent = wr->e.gv->config.fragment_size;
wr->rexmit_bytes += sent;
limit = (sent > limit) ? 0 : limit - sent;
}
}
@ -1612,6 +1614,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
{
sent = true;
nfrags_lim--;
wr->rexmit_bytes += wr->e.gv->config.fragment_size;
}
}
}

View file

@ -1035,8 +1035,9 @@ static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn
writer. */
struct ddsi_domaingv const * const gv = wr->e.gv;
dds_return_t result = DDS_RETCODE_OK;
ddsrt_mtime_t tnow = ddsrt_time_monotonic ();
const ddsrt_mtime_t abstimeout = ddsrt_mtime_add_duration (tnow, wr->xqos->reliability.max_blocking_time);
const ddsrt_mtime_t throttle_start = ddsrt_time_monotonic ();
const ddsrt_mtime_t abstimeout = ddsrt_mtime_add_duration (throttle_start, wr->xqos->reliability.max_blocking_time);
ddsrt_mtime_t tnow = throttle_start;
struct whc_state whcst;
whc_get_state (wr->whc, &whcst);
@ -1090,6 +1091,7 @@ static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn
}
wr->throttling--;
wr->time_throttled += (uint64_t) (ddsrt_time_monotonic().v - throttle_start.v);
if (wr->state != WRST_OPERATIONAL)
{
/* gc_delete_writer may be waiting */

View file

@ -156,6 +156,8 @@ struct xeventq {
ddsrt_cond_t cond;
ddsi_tran_conn_t tev_conn;
uint32_t auxiliary_bandwidth_limit;
size_t cum_rexmit_bytes;
};
static uint32_t xevent_thread (struct xeventq *xevq);
@ -185,6 +187,7 @@ static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev)
assert (evq->queued_rexmit_msgs > 0);
evq->queued_rexmit_bytes -= ev->u.msg_rexmit.queued_rexmit_bytes;
evq->queued_rexmit_msgs--;
evq->cum_rexmit_bytes += ev->u.msg_rexmit.queued_rexmit_bytes;
}
#if 0
@ -526,6 +529,8 @@ struct xeventq * xeventq_new
evq->gv = conn->m_base.gv;
ddsrt_mutex_init (&evq->lock);
ddsrt_cond_init (&evq->cond);
evq->cum_rexmit_bytes = 0;
return evq;
}

View file

@ -25,6 +25,7 @@
#endif
#include "dds/dds.h"
#include "dds/ddsc/dds_statistics.h"
#include "ddsperf_types.h"
#include "dds/ddsrt/process.h"
@ -96,6 +97,9 @@ static enum submode pingpongmode = SM_LISTENER;
/* Whether to show "sub" stats every second even when nothing happens */
static bool substat_every_second = false;
/* Whether to show extended statistics (currently just rexmit info) */
static bool extended_stats = false;
/* Size of the sequence in KeyedSeq type in bytes */
static uint32_t baggagesize = 0;
@ -1367,7 +1371,17 @@ static int cmp_uint64 (const void *va, const void *vb)
return (*a == *b) ? 0 : (*a < *b) ? -1 : 1;
}
static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state)
struct dds_stats {
struct dds_statistics *pubstat;
const struct dds_stat_keyvalue *rexmit_bytes;
const struct dds_stat_keyvalue *time_throttle;
const struct dds_stat_keyvalue *time_rexmit;
const struct dds_stat_keyvalue *throttle_count;
struct dds_statistics *substat;
const struct dds_stat_keyvalue *discarded_bytes;
};
static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, struct record_cputime_state *cputime_state, struct record_netload_state *netload_state, struct dds_stats *stats)
{
char prefix[128];
const double ts = (double) (tnow - tref) / 1e9;
@ -1501,6 +1515,14 @@ static bool print_stats (dds_time_t tref, dds_time_t tnow, dds_time_t tprev, str
if (output)
record_netload (netload_state, prefix, tnow);
if (extended_stats && output && stats)
{
(void) dds_refresh_statistics (stats->substat);
(void) dds_refresh_statistics (stats->pubstat);
printf ("%s discarded %"PRIu64" rexmit %"PRIu64" Trexmit %"PRIu64" Tthrottle %"PRIu64" Nthrottle %"PRIu32"\n", prefix, stats->discarded_bytes->u.u64, stats->rexmit_bytes->u.u64, stats->time_rexmit->u.u64, stats->time_throttle->u.u64, stats->throttle_count->u.u32);
}
fflush (stdout);
return output;
}
@ -1554,7 +1576,7 @@ static void sigxfsz_handler (int sig __attribute__ ((unused)))
if (write (2, msg, sizeof (msg) - 1) < 0) {
/* may not ignore return value according to Linux/gcc */
}
print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL);
print_stats (0, tnow, tnow - DDS_SECS (1), NULL, NULL, NULL);
kill (getpid (), 9);
}
}
@ -1611,6 +1633,7 @@ OPTIONS:\n\
anything.)\n\
-1 print \"sub\" stats every second, even when there is\n\
data\n\
-X output extended statistics\n\
-i ID use domain ID instead of the default domain\n\
\n\
MODE... is zero or more of:\n\
@ -1920,7 +1943,7 @@ int main (int argc, char *argv[])
argv0 = argv[0];
while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:h")) != EOF)
while ((opt = getopt (argc, argv, "1cd:D:i:n:k:uLK:T:Q:R:Xh")) != EOF)
{
int pos;
switch (opt)
@ -1976,6 +1999,7 @@ int main (int argc, char *argv[])
}
break;
}
case 'X': extended_stats = true; break;
case 'R': {
tref = 0;
if (sscanf (optarg, "%"SCNd64"%n", &tref, &pos) != 1 || optarg[pos] != 0)
@ -2272,6 +2296,35 @@ int main (int argc, char *argv[])
struct record_cputime_state *cputime_state;
cputime_state = record_cputime_new (wr_stat);
struct dds_stats stats;
const struct dds_stat_keyvalue dummy_u64 = { .name = "", .kind = DDS_STAT_KIND_UINT64, .u.u64 = 0 };
const struct dds_stat_keyvalue dummy_u32 = { .name = "", .kind = DDS_STAT_KIND_UINT32, .u.u32 = 0 };
stats.substat = dds_create_statistics (rd_data);
stats.discarded_bytes = dds_lookup_statistic (stats.substat, "discarded_bytes");
stats.pubstat = dds_create_statistics (wr_data);
stats.rexmit_bytes = dds_lookup_statistic (stats.pubstat, "rexmit_bytes");
stats.time_rexmit = dds_lookup_statistic (stats.pubstat, "time_rexmit");
stats.time_throttle = dds_lookup_statistic (stats.pubstat, "time_throttle");
stats.throttle_count = dds_lookup_statistic (stats.pubstat, "throttle_count");
if (stats.discarded_bytes == NULL)
stats.discarded_bytes = &dummy_u64;
if (stats.rexmit_bytes == NULL)
stats.rexmit_bytes = &dummy_u64;
if (stats.time_rexmit == NULL)
stats.time_rexmit = &dummy_u64;
if (stats.time_throttle == NULL)
stats.time_throttle = &dummy_u64;
if (stats.throttle_count == NULL)
stats.throttle_count = &dummy_u32;
if (stats.discarded_bytes->kind != DDS_STAT_KIND_UINT64 ||
stats.rexmit_bytes->kind != DDS_STAT_KIND_UINT64 ||
stats.time_rexmit->kind != DDS_STAT_KIND_UINT64 ||
stats.time_throttle->kind != DDS_STAT_KIND_UINT64 ||
stats.throttle_count->kind != DDS_STAT_KIND_UINT32)
{
abort ();
}
/* I hate Unix signals in multi-threaded processes ... */
#ifdef _WIN32
signal (SIGINT, signal_handler);
@ -2364,7 +2417,7 @@ int main (int argc, char *argv[])
if (tnext <= tnow)
{
bool output;
output = print_stats (tref, tnow, tlast, cputime_state, netload_state);
output = print_stats (tref, tnow, tlast, cputime_state, netload_state, &stats);
tlast = tnow;
if (tnow > tnext + DDS_MSECS (500))
tnext = tnow + DDS_SECS (1);
@ -2386,6 +2439,9 @@ int main (int argc, char *argv[])
maybe_send_new_ping (tnow, &tnextping);
}
}
dds_delete_statistics (stats.pubstat);
dds_delete_statistics (stats.substat);
record_netload_free (netload_state);
record_cputime_free (cputime_state);