diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index 114e6ff..37e35ec 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -686,13 +686,12 @@ dds_return_loan( dds_readcond *cond; dds_return_t ret = DDS_RETCODE_OK; - if (!buf ) { + if (!buf) { DDS_ERROR("Argument buf is NULL\n"); ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto fail; } - if(*buf == NULL && bufsz > 0){ - DDS_ERROR("Argument buf is NULL\n"); + if (*buf == NULL && bufsz > 0) { ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto fail; } diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 7e615a1..9c2d51c 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -13,6 +13,7 @@ set(CMAKE_INSTALL_TOOLSDIR "${CMAKE_INSTALL_DATADIR}/${CMAKE_PROJECT_NAME}/tools add_subdirectory(pubsub) add_subdirectory(config) add_subdirectory(ddsls) +add_subdirectory(ddsperf) # VxWorks build machines use OpenJDK 8, which lack jfxrt.jar. Do not build launcher on that platform. # diff --git a/src/tools/ddsperf/CMakeLists.txt b/src/tools/ddsperf/CMakeLists.txt new file mode 100644 index 0000000..c1c12e0 --- /dev/null +++ b/src/tools/ddsperf/CMakeLists.txt @@ -0,0 +1,23 @@ +# +# Copyright(c) 2006 to 2018 ADLINK Technology Limited and others +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License v. 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License +# v. 1.0 which is available at +# http://www.eclipse.org/org/documents/edl-v10.php. +# +# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause +# +idlc_generate(ddsperf_types ddsperf_types.idl) +add_executable(ddsperf ddsperf.c) +target_link_libraries(ddsperf ddsperf_types ddsc) +if(WIN32) + target_compile_definitions(ddsperf PRIVATE _CRT_SECURE_NO_WARNINGS) +endif() + +install( + TARGETS ddsperf + DESTINATION "${CMAKE_INSTALL_BINDIR}" + COMPONENT dev +) diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c new file mode 100644 index 0000000..51874a4 --- /dev/null +++ b/src/tools/ddsperf/ddsperf.c @@ -0,0 +1,2036 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#define _ISOC99_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +#include +#endif +#include + +#include "dds/dds.h" +#include "ddsperf_types.h" + +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/sync.h" +#include "dds/ddsrt/sockets.h" +#include "dds/ddsrt/threads.h" +#include "dds/ddsrt/random.h" +#include "dds/ddsrt/avl.h" +#include "dds/ddsrt/fibheap.h" + +#define UDATA_MAGIC "DDSPerf:" +#define UDATA_MAGIC_SIZE (sizeof (UDATA_MAGIC) - 1) + +enum topicsel { + KS, /* KeyedSeq type: seq#, key, sequence-of-octet */ + K32, /* Keyed32 type: seq#, key, array-of-24-octet (sizeof = 32) */ + K256, /* Keyed256 type: seq#, key, array-of-248-octet (sizeof = 256) */ + OU /* OneULong type: seq# */ +}; + +enum submode { + SM_NONE, /* no subscriber at all */ + SM_WAITSET, /* subscriber using a waitset */ + SM_POLLING, /* ... using polling, sleeping for 1ms if no data */ + SM_LISTENER /* ... using a DATA_AVAILABLE listener */ +}; + +static const char *argv0; +static volatile sig_atomic_t termflag = 0; + +/* Domain participant, guard condition for termination, domain id */ +static dds_entity_t dp; +static dds_instance_handle_t dp_handle; +static dds_entity_t termcond; +static dds_domainid_t did = DDS_DOMAIN_DEFAULT; + +/* Readers for built-in topics to get discovery information */ +static dds_entity_t rd_participants, rd_subscriptions, rd_publications; + +/* Topics, readers, writers (except for pong writers: there are + many of those) */ +static dds_entity_t tp_data, tp_ping, tp_pong; +static char tpname_data[32], tpname_ping[32], tpname_pong[32]; +static dds_entity_t sub, pub, wr_data, wr_ping, rd_data, rd_ping, rd_pong; + +/* Number of different key values to use (must be 1 for OU type) */ +static unsigned nkeyvals = 1; + +/* Topic type to use */ +static enum topicsel topicsel = OU; + +/* Data and ping/pong subscriber triggering modes */ +static enum submode submode = SM_LISTENER; +static enum submode pingpongmode = SM_LISTENER; + +/* Size of the sequence in KeyedSeq type in bytes */ +static unsigned baggagesize = 0; + +/* Whether or not to register instances prior to writing */ +static bool register_instances = true; + +/* Maximum run time in seconds */ +static double dur = HUGE_VAL; + +/* Minimum number of peers (if not met, exit status is 1) */ +static uint32_t minmatch = 0; + +/* Maximum time it may take to discover all MINMATCH peers */ +static double maxwait = HUGE_VAL; + +/* Number of participants for which all expected endpoints + have been matched (this includes the local participant + if ignorelocal is DDS_IGNORELOCAL_NONE) [protected by + disc_lock] */ +static uint32_t matchcount = 0; + +/* An error is always signalled if not all endpoints of a + participant have been discovered within a set amount of + time (5s, currently) [protected by disc_lock] */ +static uint32_t matchtimeout = 0; + +/* Data is published in bursts of this many samples */ +static unsigned burstsize = 1; + +/* Whether to use reliable or best-effort readers/writers */ +static bool reliable = true; + +/* History depth for throughput data reader and writer; 0 is + KEEP_ALL, otherwise it is KEEP_LAST histdepth. Ping/pong + always uses KEEP_LAST 1. */ +static int32_t histdepth = 0; + +/* Publishing rate in Hz, HUGE_VAL means as fast as possible, + 0 means no throughput data is published at all */ +static double rate; + +/* Fraction of throughput data samples that double as a ping + message */ +static uint32_t ping_frac = 0; + +/* Setting for "ignore local" reader/writer QoS: whether or + not to ignore readers and writers in the same particiapnt + that would otherwise match */ +static dds_ignorelocal_kind_t ignorelocal = DDS_IGNORELOCAL_PARTICIPANT; + +/* Pinging interval for roundtrip testing, 0 means as fast as + possible, DDS_INFINITY means never */ +static dds_duration_t ping_intv; + +static ddsrt_mutex_t disc_lock; + +/* Publisher statistics and lock protecting it */ +struct hist { + unsigned nbins; + uint64_t binwidth; + uint64_t bin0; /* bins are [bin0,bin0+binwidth),[bin0+binwidth,bin0+2*binwidth) */ + uint64_t binN; /* bin0 + nbins*binwidth */ + uint64_t min, max; /* min and max observed since last reset */ + uint64_t under, over; /* < bin0, >= binN */ + uint64_t bins[]; +}; + +static ddsrt_mutex_t pubstat_lock; +static struct hist *pubstat_hist; + +/* Subscriber statistics for tracking number of samples received + and lost per source */ +struct eseq_stat { + /* totals */ + uint64_t nrecv; + uint64_t nlost; + uint64_t nrecv_bytes; + + /* stats printer state */ + uint64_t nrecv_ref; + uint64_t nlost_ref; + uint64_t nrecv_bytes_ref; +}; + +struct eseq_admin { + ddsrt_mutex_t lock; + unsigned nkeys; + unsigned nph; + dds_instance_handle_t *ph; + struct eseq_stat *stats; + uint32_t **eseq; +}; + +static struct eseq_admin eseq_admin; + +/* Entry for mapping ping/data publication handle to pong writer */ +struct subthread_arg_pongwr { + dds_instance_handle_t pubhandle; + dds_instance_handle_t pphandle; + dds_entity_t wr_pong; +}; + +/* Entry for mapping pong publication handle to latency statistics */ +struct subthread_arg_pongstat { + dds_instance_handle_t pubhandle; + dds_instance_handle_t pphandle; + uint64_t min, max; + uint64_t sum; + uint32_t cnt; +}; + +/* Pong statistics is stored in n array of npongstat entries + [protected by pongstat_lock] */ +static ddsrt_mutex_t pongstat_lock; +static uint32_t npongstat; +static struct subthread_arg_pongstat *pongstat; + +/* All topics have a sequence number, this is the one of the + latest ping sent and the number of pongs received for that + sequence number. Also the time at which it was sent for + generating new ping messages in the case of loss of topology + changes, and a timestamp after which a warning is printed + when a new ping is published. [All protected by + pongwr_lock] */ +static dds_time_t cur_ping_time; +static dds_time_t twarn_ping_timeout; +static uint32_t cur_ping_seq; +static uint32_t n_pong_seen; + +/* Number of pongs expected for each ping [protected by + pongwr_lock] */ +static uint32_t n_pong_expected; + +/* Table mapping data and ping publication handles to writers + of pongs (one per participant in a unique partition so that + a participant only receives responses to its own pings) is + a simply array of npongwr entries [protected by pongwr_lock] */ +static ddsrt_mutex_t pongwr_lock; +static uint32_t npongwr; +static struct subthread_arg_pongwr *pongwr; + +/* Each subscriber thread gets its own not-quite-pre-allocated + set of samples (it does use a loan, but that loan gets reused) */ +struct subthread_arg { + dds_entity_t rd; + uint32_t max_samples; + dds_sample_info_t *iseq; + void **mseq; +}; + +/* Type used for converting GUIDs to strings, used for generating + the per-participant partition names */ +struct guidstr { + char str[36]; +}; + +/* Endpoints that can be matched; all endpoints except for a data + subscriber always exist; the data subscriber is only created if + requested */ +#define MM_RD_DATA 1u +#define MM_RD_PING 2u +#define MM_RD_PONG 4u +#define MM_WR_DATA 8u +#define MM_WR_PING 16u +#define MM_WR_PONG 32u +#define MM_ALL (2 * MM_WR_PONG - 1) + +struct ppant { + ddsrt_avl_node_t avlnode; /* embedded AVL node for handle index */ + ddsrt_fibheap_node_t fhnode; /* prio queue for timeout handling */ + dds_instance_handle_t handle; /* participant instance handle */ + dds_builtintopic_guid_t guid; /* participant GUID */ + char *hostname; /* hostname is taken from user_data QoS */ + uint32_t pid; /* pid is also taken from user_data QoS */ + dds_time_t tdisc; /* time at which it was discovered */ + dds_time_t tdeadline; /* by what time must unmatched be 0 */ + uint32_t unmatched; /* expected but not yet detected endpoints */ +}; + +static int cmp_instance_handle (const void *va, const void *vb) +{ + const dds_instance_handle_t *a = va; + const dds_instance_handle_t *b = vb; + return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; +} + +/* AVL tree of ppant structures indexed on handle using cmp_instance_handle */ +static ddsrt_avl_treedef_t ppants_td = DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct ppant, avlnode), offsetof (struct ppant, handle), cmp_instance_handle, 0); +static ddsrt_avl_tree_t ppants; + +/* Priority queue (Fibonacci heap) of ppant structures with tdeadline as key */ +static int cmp_ppant_tdeadline (const void *va, const void *vb) +{ + const struct ppant *a = va; + const struct ppant *b = vb; + return (a->tdeadline == b->tdeadline) ? 0 : (a->tdeadline < b->tdeadline) ? -1 : 1; +} + +static ddsrt_fibheap_def_t ppants_to_match_fhd = DDSRT_FIBHEAPDEF_INITIALIZER (offsetof (struct ppant, fhnode), cmp_ppant_tdeadline); +static ddsrt_fibheap_t ppants_to_match; + +/* Printing error messages: error2 is for DDS errors, error3 is for usage errors */ +static void verrorx (int exitcode, const char *fmt, va_list ap) ddsrt_attribute_noreturn; +static void error2 (const char *fmt, ...) ddsrt_attribute_format ((printf, 1, 2)) ddsrt_attribute_noreturn; +static void error3 (const char *fmt, ...) ddsrt_attribute_format ((printf, 1, 2)) ddsrt_attribute_noreturn; + +static void publication_matched_listener (dds_entity_t wr, const dds_publication_matched_status_t status, void *arg); + +struct seq_keyval { + uint32_t seq; + int32_t keyval; +}; + +union data { + uint32_t seq; + struct seq_keyval seq_keyval; + KeyedSeq ks; + Keyed32 k32; + Keyed256 k256; + OneULong ou; +}; + +static void verrorx (int exitcode, const char *fmt, va_list ap) +{ + vprintf (fmt, ap); + fflush (stdout); + exit (exitcode); +} + +static void error2 (const char *fmt, ...) +{ + va_list ap; + va_start (ap, fmt); + verrorx (2, fmt, ap); +} + +static void error3 (const char *fmt, ...) +{ + va_list ap; + va_start (ap, fmt); + verrorx (3, fmt, ap); +} + +static char *make_guidstr (struct guidstr *buf, const dds_builtintopic_guid_t *guid) +{ + snprintf (buf->str, sizeof (buf->str), "%02x%02x%02x%02x_%02x%02x%02x%02x_%02x%02x%02x%02x_%02x%02x%02x%02x", + guid->v[0], guid->v[1], guid->v[2], guid->v[3], + guid->v[4], guid->v[5], guid->v[6], guid->v[7], + guid->v[8], guid->v[9], guid->v[10], guid->v[11], + guid->v[12], guid->v[13], guid->v[14], guid->v[15]); + return buf->str; +} + +static void hist_reset_minmax (struct hist *h) +{ + h->min = UINT64_MAX; + h->max = 0; +} + +static void hist_reset (struct hist *h) +{ + hist_reset_minmax (h); + h->under = 0; + h->over = 0; + memset (h->bins, 0, h->nbins * sizeof (*h->bins)); +} + +static struct hist *hist_new (unsigned nbins, uint64_t binwidth, uint64_t bin0) +{ + struct hist *h = malloc (sizeof (*h) + nbins * sizeof (*h->bins)); + h->nbins = nbins; + h->binwidth = binwidth; + h->bin0 = bin0; + h->binN = h->bin0 + h->nbins * h->binwidth; + hist_reset (h); + return h; +} + +static void hist_free (struct hist *h) +{ + free (h); +} + +static void hist_record (struct hist *h, uint64_t x, unsigned weight) +{ + if (x < h->min) + h->min = x; + if (x > h->max) + h->max = x; + if (x < h->bin0) + h->under += weight; + else if (x >= h->binN) + h->over += weight; + else + h->bins[(x - h->bin0) / h->binwidth] += weight; +} + +static void xsnprintf(char *buf, size_t bufsz, size_t *p, const char *fmt, ...) +{ + if (*p < bufsz) + { + int n; + va_list ap; + va_start (ap, fmt); + n = vsnprintf (buf + *p, bufsz - *p, fmt, ap); + va_end (ap); + *p += (size_t) n; + } +} + +static void hist_print (const char *prefix, struct hist *h, dds_time_t dt, int reset) +{ + const size_t l_size = sizeof(char) * h->nbins + 200 + strlen (prefix); + const size_t hist_size = sizeof(char) * h->nbins + 1; + char *l = (char *) malloc(l_size); + char *hist = (char *) malloc(hist_size); + double dt_s = (double)dt / 1e9, avg; + uint64_t peak = 0, cnt = h->under + h->over; + size_t p = 0; + + xsnprintf (l, l_size, &p, "%s", prefix); + + hist[h->nbins] = 0; + for (unsigned i = 0; i < h->nbins; i++) + { + cnt += h->bins[i]; + if (h->bins[i] > peak) + peak = h->bins[i]; + } + + const uint64_t p1 = peak / 100; + const uint64_t p10 = peak / 10; + const uint64_t p20 = 1 * peak / 5; + const uint64_t p40 = 2 * peak / 5; + const uint64_t p60 = 3 * peak / 5; + const uint64_t p80 = 4 * peak / 5; + for (unsigned i = 0; i < h->nbins; i++) + { + if (h->bins[i] == 0) hist[i] = ' '; + else if (h->bins[i] <= p1) hist[i] = '.'; + else if (h->bins[i] <= p10) hist[i] = '_'; + else if (h->bins[i] <= p20) hist[i] = '-'; + else if (h->bins[i] <= p40) hist[i] = '='; + else if (h->bins[i] <= p60) hist[i] = 'x'; + else if (h->bins[i] <= p80) hist[i] = 'X'; + else hist[i] = '@'; + } + + avg = (double) cnt / dt_s; + if (avg < 999.5) + xsnprintf (l, l_size, &p, "%5.3g", avg); + else if (avg < 1e6) + xsnprintf (l, l_size, &p, "%4.3gk", avg / 1e3); + else + xsnprintf (l, l_size, &p, "%4.3gM", avg / 1e6); + xsnprintf (l, l_size, &p, "/s "); + + if (h->min == UINT64_MAX) + xsnprintf (l, l_size, &p, " inf "); + else if (h->min < 1000) + xsnprintf (l, l_size, &p, "%3"PRIu64"n ", h->min); + else if (h->min + 500 < 1000000) + xsnprintf (l, l_size, &p, "%3"PRIu64"u ", (h->min + 500) / 1000); + else if (h->min + 500000 < 1000000000) + xsnprintf (l, l_size, &p, "%3"PRIu64"m ", (h->min + 500000) / 1000000); + else + xsnprintf (l, l_size, &p, "%3"PRIu64"s ", (h->min + 500000000) / 1000000000); + + if (h->bin0 > 0) { + int pct = (cnt == 0) ? 0 : 100 * (int) ((h->under + cnt/2) / cnt); + xsnprintf (l, l_size, &p, "%3d%% ", pct); + } + + { + int pct = (cnt == 0) ? 0 : 100 * (int) ((h->over + cnt/2) / cnt); + xsnprintf (l, l_size, &p, "|%s| %3d%%", hist, pct); + } + + if (h->max < 1000) + xsnprintf (l, l_size, &p, " %3"PRIu64"n", h->max); + else if (h->max + 500 < 1000000) + xsnprintf (l, l_size, &p, " %3"PRIu64"u", (h->max + 500) / 1000); + else if (h->max + 500000 < 1000000000) + xsnprintf (l, l_size, &p, " %3"PRIu64"m", (h->max + 500000) / 1000000); + else + xsnprintf (l, l_size, &p, " %3"PRIu64"s", (h->max + 500000000) / 1000000000); + + (void) p; + puts (l); + fflush (stdout); + free (l); + free (hist); + if (reset) + hist_reset (h); +} + +static void *make_baggage (dds_sequence_t *b, unsigned cnt) +{ + b->_maximum = b->_length = cnt; + if (cnt == 0) + b->_buffer = NULL; + else + { + b->_buffer = malloc (b->_maximum); + memset(b->_buffer, 0xee, b->_maximum); + } + return b->_buffer; +} + +static void *init_sample (union data *data, uint32_t seq) +{ + void *baggage = NULL; + switch (topicsel) + { + case KS: + data->ks.seq = seq; + data->ks.keyval = 0; + baggage = make_baggage (&data->ks.baggage, baggagesize); + break; + case K32: + data->k32.seq = seq; + data->k32.keyval = 0; + memset (data->k32.baggage, 0xee, sizeof (data->k32.baggage)); + break; + case K256: + data->k256.seq = seq; + data->k256.keyval = 0; + memset (data->k256.baggage, 0xee, sizeof (data->k256.baggage)); + break; + case OU: + data->ou.seq = seq; + break; + } + return baggage; +} + +static uint32_t pubthread (void *varg) +{ + int result; + dds_instance_handle_t *ihs; + dds_time_t ntot = 0, tfirst, tfirst0; + union data data; + uint64_t timeouts = 0; + void *baggage = NULL; + (void) varg; + + memset (&data, 0, sizeof (data)); + assert (nkeyvals > 0); + assert (topicsel != OU || nkeyvals == 1); + + baggage = init_sample (&data, 0); + ihs = malloc (nkeyvals * sizeof (dds_instance_handle_t)); + for (unsigned k = 0; k < nkeyvals; k++) + { + data.seq_keyval.keyval = (int32_t) k; + if (register_instances) + dds_register_instance (wr_data, &ihs[k], &data); + else + ihs[k] = 0; + } + data.seq_keyval.keyval = 0; + + tfirst0 = tfirst = dds_time(); + + unsigned bi = 0; + while (!termflag) + { + /* lsb of timestamp is abused to signal whether the sample is a ping requiring a response or not */ + bool reqresp = (ping_frac == 0) ? 0 : (ping_frac == UINT32_MAX) ? 1 : (ddsrt_random () <= ping_frac); + const dds_time_t t_write = (dds_time () & ~1) | reqresp; + if ((result = dds_write_ts (wr_data, &data, t_write)) != DDS_RETCODE_OK) + { + printf ("write error: %d\n", result); + fflush (stdout); + if (dds_err_nr (result) != DDS_RETCODE_TIMEOUT) + exit (2); + timeouts++; + /* retry with original timestamp, it really is just a way of reporting + blocking for an exceedingly long time */ + continue; + } + if (reqresp) + { + dds_write_flush (wr_data); + } + + const dds_time_t t_post_write = dds_time (); + dds_time_t t = t_post_write; + ddsrt_mutex_lock (&pubstat_lock); + hist_record (pubstat_hist, (uint64_t) ((t_post_write - t_write) / 1), 1); + ntot++; + ddsrt_mutex_unlock (&pubstat_lock); + + data.seq_keyval.keyval = (data.seq_keyval.keyval + 1) % (int32_t) nkeyvals; + data.seq++; + + if (rate < HUGE_VAL) + { + if (++bi == burstsize) + { + /* FIXME: should average rate over a short-ish period, rather than over the entire run */ + while (((double) (ntot / burstsize) / ((double) (t - tfirst0) / 1e9 + 5e-3)) > rate && !termflag) + { + /* FIXME: flushing manually because batching is not yet implemented properly */ + dds_write_flush (wr_data); + dds_sleepfor (DDS_MSECS (1)); + t = dds_time (); + } + bi = 0; + } + } + } + if (baggage) + free (baggage); + free (ihs); + return 0; +} + +static void init_eseq_admin (struct eseq_admin *ea, unsigned nkeys) +{ + ddsrt_mutex_init (&ea->lock); + ea->nkeys = nkeys; + ea->nph = 0; + ea->ph = NULL; + ea->stats = NULL; + ea->eseq = NULL; +} + +static void fini_eseq_admin (struct eseq_admin *ea) +{ + free (ea->ph); + free (ea->stats); + for (unsigned i = 0; i < ea->nph; i++) + free (ea->eseq[i]); + ddsrt_mutex_destroy (&ea->lock); + free (ea->eseq); +} + +static int check_eseq (struct eseq_admin *ea, uint32_t seq, uint32_t keyval, uint32_t size, const dds_instance_handle_t pubhandle) +{ + uint32_t *eseq; + if (keyval >= ea->nkeys) + { + printf ("received key %u >= nkeys %u\n", keyval, ea->nkeys); + exit (3); + } + ddsrt_mutex_lock (&ea->lock); + for (uint32_t i = 0; i < ea->nph; i++) + if (pubhandle == ea->ph[i]) + { + uint32_t e = ea->eseq[i][keyval]; + ea->eseq[i][keyval] = seq + ea->nkeys; + ea->stats[i].nrecv++; + ea->stats[i].nrecv_bytes += size; + ea->stats[i].nlost += seq - e; + ddsrt_mutex_unlock (&ea->lock); + return seq == e; + } + ea->ph = realloc (ea->ph, (ea->nph + 1) * sizeof (*ea->ph)); + ea->ph[ea->nph] = pubhandle; + ea->eseq = realloc (ea->eseq, (ea->nph + 1) * sizeof (*ea->eseq)); + ea->eseq[ea->nph] = malloc (ea->nkeys * sizeof (*ea->eseq[ea->nph])); + eseq = ea->eseq[ea->nph]; + for (unsigned i = 0; i < ea->nkeys; i++) + eseq[i] = seq + (i - keyval) + (i <= keyval ? ea->nkeys : 0); + ea->stats = realloc (ea->stats, (ea->nph + 1) * sizeof (*ea->stats)); + memset (&ea->stats[ea->nph], 0, sizeof (ea->stats[ea->nph])); + ea->stats[ea->nph].nrecv = 1; + ea->stats[ea->nph].nrecv_bytes = size; + ea->nph++; + ddsrt_mutex_unlock (&ea->lock); + return 1; +} + +static dds_instance_handle_t get_pphandle_for_pubhandle (dds_instance_handle_t pubhandle) +{ + /* FIXME: implement the get_matched_... interfaces so there's no need for keeping a reader + (and having to GC it, which I'm skipping here ...) */ + int32_t n; + void *msg = NULL; + dds_sample_info_t info; + if ((n = dds_read_instance (rd_publications, &msg, &info, 1, 1, pubhandle)) < 0) + error2 ("dds_read_instance(rd_publications, %"PRIx64") failed: %d\n", pubhandle, (int) n); + else if (n == 0 || !info.valid_data) + { + printf ("get_pong_writer: publication handle %"PRIx64" not found\n", pubhandle); + fflush (stdout); + return 0; + } + else + { + const dds_builtintopic_endpoint_t *sample = msg; + dds_instance_handle_t pphandle = sample->participant_instance_handle; + dds_return_loan (rd_publications, &msg, n); + return pphandle; + } +} + +static bool update_roundtrip (dds_instance_handle_t pubhandle, uint64_t tdelta, bool isping, uint32_t seq) +{ + bool allseen; + ddsrt_mutex_lock (&pongstat_lock); + if (isping && seq == cur_ping_seq) + allseen = (++n_pong_seen == n_pong_expected); + else + allseen = false; + for (uint32_t i = 0; i < npongstat; i++) + if (pongstat[i].pubhandle == pubhandle) + { + struct subthread_arg_pongstat * const x = &pongstat[i]; + if (tdelta < x->min) x->min = tdelta; + if (tdelta > x->max) x->max = tdelta; + x->sum += tdelta; + x->cnt++; + ddsrt_mutex_unlock (&pongstat_lock); + return allseen; + } + pongstat = realloc (pongstat, (npongstat + 1) * sizeof (*pongstat)); + struct subthread_arg_pongstat * const x = &pongstat[npongstat]; + x->pubhandle = pubhandle; + x->pphandle = get_pphandle_for_pubhandle (pubhandle); + x->min = x->max = x->sum = tdelta; + x->cnt = 1; + npongstat++; + ddsrt_mutex_unlock (&pongstat_lock); + return allseen; +} + +static dds_entity_t get_pong_writer_locked (dds_instance_handle_t pubhandle) +{ + dds_instance_handle_t pphandle; + + for (uint32_t j = 0; j < npongwr; j++) + if (pongwr[j].pubhandle == pubhandle) + return pongwr[j].wr_pong; + + /* FIXME: implement the get_matched_... interfaces so there's no need for keeping a reader + (and having to GC it, which I'm skipping here ...) */ + pphandle = get_pphandle_for_pubhandle (pubhandle); + + /* This gets called when no writer is associaed yet with pubhandle, but it may be that a writer + is associated already with pphandle (because there is the data writer and the ping writer) */ + for (uint32_t i = 0; i < npongwr; i++) + { + if (pongwr[i].pphandle == pphandle) + { + dds_entity_t wr_pong = pongwr[i].wr_pong; + if (pongwr[i].pubhandle == 0) + { + pongwr[i].pubhandle = pubhandle; + ddsrt_mutex_unlock (&pongwr_lock); + return wr_pong; + } + else + { + pongwr = realloc (pongwr, (npongwr + 1) * sizeof (*pongwr)); + pongwr[npongwr].pubhandle = pubhandle; + pongwr[npongwr].pphandle = pphandle; + pongwr[npongwr].wr_pong = wr_pong; + npongwr++; + ddsrt_mutex_unlock (&pongwr_lock); + return wr_pong; + } + } + } + printf ("get_pong_writer: participant handle %"PRIx64" not found\n", pphandle); + fflush (stdout); + return 0; +} + +static dds_entity_t get_pong_writer (dds_instance_handle_t pubhandle) +{ + dds_entity_t wr_pong = 0; + ddsrt_mutex_lock (&pongwr_lock); + wr_pong = get_pong_writer_locked (pubhandle); + ddsrt_mutex_unlock (&pongwr_lock); + return wr_pong; +} + +static bool process_data (dds_entity_t rd, struct subthread_arg *arg) +{ + uint32_t max_samples = arg->max_samples; + dds_sample_info_t *iseq = arg->iseq; + void **mseq = arg->mseq; + int32_t nread_data; + if ((nread_data = dds_take (rd, mseq, iseq, max_samples, max_samples)) < 0) + error2 ("dds_take (rd_data): %d\n", (int) nread_data); + for (int32_t i = 0; i < nread_data; i++) + { + if (iseq[i].valid_data) + { + uint32_t seq, keyval, size; + switch (topicsel) + { + case KS: { KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 12 + d->baggage._length; } break; + case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 32; } break; + case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 256; } break; + case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = 4; } break; + } + (void) check_eseq (&eseq_admin, seq, keyval, size, iseq[i].publication_handle); + if (iseq[i].source_timestamp & 1) + { + dds_entity_t wr_pong = 0; + if ((wr_pong = get_pong_writer (iseq[i].publication_handle)) != 0) + { + dds_return_t rc; + if ((rc = dds_write_ts (wr_pong, mseq[i], iseq[i].source_timestamp - 1)) < 0 && dds_err_nr(rc) != DDS_RETCODE_TIMEOUT) + error2 ("dds_write_ts (wr_pong, mseq[i], iseq[i].source_timestamp): %d\n", (int) rc); + dds_write_flush (wr_pong); + } + } + } + } + return (nread_data > 0); +} + +static bool process_ping (dds_entity_t rd, struct subthread_arg *arg) +{ + /* Ping sends back Pongs with the lsb 1; Data sends back Pongs with the lsb 0. This way, the Pong handler can + figure out whether to Ping again or not by looking at the lsb. If it is 1, another Ping is required */ + uint32_t max_samples = arg->max_samples; + dds_sample_info_t *iseq = arg->iseq; + void **mseq = arg->mseq; + int32_t nread_ping; + if ((nread_ping = dds_take (rd, mseq, iseq, max_samples, max_samples)) < 0) + error2 ("dds_take (rd_data): %d\n", (int) nread_ping); + for (int32_t i = 0; i < nread_ping; i++) + { + if (iseq[i].valid_data) + { + dds_entity_t wr_pong; + if ((wr_pong = get_pong_writer (iseq[i].publication_handle)) != 0) + { + dds_return_t rc; + if ((rc = dds_write_ts (wr_pong, mseq[i], iseq[i].source_timestamp | 1)) < 0 && dds_err_nr(rc) != DDS_RETCODE_TIMEOUT) + error2 ("dds_write_ts (wr_pong, mseq[i], iseq[i].source_timestamp): %d\n", (int) rc); + dds_write_flush (wr_pong); + } + } + } + return (nread_ping > 0); +} + +static bool process_pong (dds_entity_t rd, struct subthread_arg *arg) +{ + uint32_t max_samples = arg->max_samples; + dds_sample_info_t *iseq = arg->iseq; + void **mseq = arg->mseq; + int32_t nread_pong; + if ((nread_pong = dds_take (rd, mseq, iseq, max_samples, max_samples)) < 0) + error2 ("dds_take (rd_pong): %d\n", (int) nread_pong); + else if (nread_pong > 0) + { + dds_time_t tnow = dds_time (); + for (int32_t i = 0; i < nread_pong; i++) + if (iseq[i].valid_data) + { + uint32_t * const seq = mseq[i]; + const bool isping = (iseq[i].source_timestamp & 1) != 0; + const bool all = update_roundtrip (iseq[i].publication_handle, (uint64_t) (tnow - iseq[i].source_timestamp) / 2, isping, *seq); + if (isping && all && ping_intv == 0) + { + /* If it is a pong sent in response to a ping, and all known nodes have responded, send out a new ping */ + dds_return_t rc; + ddsrt_mutex_lock (&pongwr_lock); + n_pong_seen = 0; + cur_ping_time = dds_time (); + cur_ping_seq = ++(*seq); + ddsrt_mutex_unlock (&pongwr_lock); + if ((rc = dds_write_ts (wr_ping, mseq[i], dds_time () | 1)) < 0 && dds_err_nr(rc) != DDS_RETCODE_TIMEOUT) + error2 ("dds_write (wr_ping, mseq[i]): %d\n", (int) rc); + dds_write_flush (wr_ping); + } + } + } + return (nread_pong > 0); +} + +static void maybe_send_new_ping (dds_time_t tnow, dds_time_t *tnextping) +{ + void *baggage; + union data data; + int32_t rc; + assert (ping_intv != DDS_INFINITY); + ddsrt_mutex_lock (&pongwr_lock); + if (tnow < cur_ping_time + (ping_intv == 0 ? DDS_SECS (1) : ping_intv)) + { + if (ping_intv == 0) + *tnextping = cur_ping_time + DDS_SECS (1); + ddsrt_mutex_unlock (&pongwr_lock); + } + else + { + if (tnow > twarn_ping_timeout) + { + printf ("[%"PRIdPID"] ping timed out ... sending new ping\n", ddsrt_getpid ()); + fflush (stdout); + } + n_pong_seen = 0; + cur_ping_time = tnow; + if (ping_intv > 0) + *tnextping = cur_ping_time + ping_intv; + else if (ping_intv == 0) + *tnextping = cur_ping_time + DDS_SECS (1); + if (ping_intv > 0 && *tnextping > twarn_ping_timeout) + twarn_ping_timeout = *tnextping + ping_intv / 2; + cur_ping_seq++; + baggage = init_sample (&data, cur_ping_seq); + ddsrt_mutex_unlock (&pongwr_lock); + if ((rc = dds_write_ts (wr_ping, &data, dds_time () | 1)) < 0 && dds_err_nr (rc) != DDS_RETCODE_TIMEOUT) + error2 ("send_new_ping: dds_write (wr_ping, &data): %d\n", (int) rc); + dds_write_flush (wr_ping); + if (baggage) + free (baggage); + } +} + +static uint32_t subthread_waitset (void *varg) +{ + struct subthread_arg * const arg = varg; + dds_entity_t ws; + int32_t rc; + ws = dds_create_waitset (dp); + if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) + error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); + if ((rc = dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS)) < 0) + error2 ("dds_set_status_mask (rd_data, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); + if ((rc = dds_waitset_attach (ws, rd_data, 1)) < 0) + error2 ("dds_waitset_attach (ws, rd_data, 1): %d\n", (int) rc); + while (!termflag) + { + if (!process_data (rd_data, arg)) + { + /* when we use DATA_AVAILABLE, we must read until nothing remains, or we would deadlock + if more than max_samples were available and nothing further is received */ + int32_t nxs; + if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) + error2 ("dds_waitset_wait: %d\n", (int) nxs); + } + } + return 0; +} + +static uint32_t subpingthread_waitset (void *varg) +{ + struct subthread_arg * const arg = varg; + dds_entity_t ws; + int32_t rc; + ws = dds_create_waitset (dp); + if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) + error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); + if ((rc = dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS)) < 0) + error2 ("dds_set_status_mask (rd_ping, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); + if ((rc = dds_waitset_attach (ws, rd_ping, 1)) < 0) + error2 ("dds_waitset_attach (ws, rd_ping, 1): %d\n", (int) rc); + while (!termflag) + { + int32_t nxs; + if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) + error2 ("dds_waitset_wait: %d\n", (int) nxs); + process_ping (rd_ping, arg); + } + return 0; +} + +static uint32_t subpongthread_waitset (void *varg) +{ + struct subthread_arg * const arg = varg; + dds_entity_t ws; + int32_t rc; + ws = dds_create_waitset (dp); + if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) + error2 ("dds_waitset_attach (termcond, 0): %d\n", (int) rc); + if ((rc = dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS)) < 0) + error2 ("dds_set_status_mask (rd_pong, DDS_DATA_AVAILABLE_STATUS): %d\n", (int) rc); + if ((rc = dds_waitset_attach (ws, rd_pong, 1)) < 0) + error2 ("dds_waitset_attach (ws, rd_pong, 1): %d\n", (int) rc); + while (!termflag) + { + int32_t nxs; + if ((nxs = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY)) < 0) + error2 ("dds_waitset_wait: %d\n", (int) nxs); + process_pong (rd_pong, arg); + } + return 0; +} + +static uint32_t subthread_polling (void *varg) +{ + struct subthread_arg * const arg = varg; + while (!termflag) + { + if (!process_data (rd_data, arg)) + dds_sleepfor (DDS_MSECS (1)); + } + return 0; +} + +static void data_available_listener (dds_entity_t rd, void *arg) +{ + process_data (rd, arg); +} + +static void ping_available_listener (dds_entity_t rd, void *arg) +{ + process_ping (rd, arg); +} + +static void pong_available_listener (dds_entity_t rd, void *arg) +{ + process_pong (rd, arg); +} + +static dds_entity_t create_pong_writer (dds_instance_handle_t pphandle, const struct guidstr *guidstr) +{ + dds_qos_t *qos; + dds_listener_t *listener; + dds_entity_t pongpub; + dds_entity_t wr_pong; + + //printf ("[%"PRIdPID"] create_pong_writer: creating writer in partition %s pubhandle %"PRIx64"\n", ddsrt_getpid (), guidstr->str, pphandle); + //fflush (stdout); + + qos = dds_create_qos (); + dds_qset_partition1 (qos, guidstr->str); + if ((pongpub = dds_create_publisher (dp, qos, NULL)) < 0) + error2 ("dds_create_publisher failed: %d\n", (int) pongpub); + dds_delete_qos (qos); + + listener = dds_create_listener ((void *) (uintptr_t) MM_RD_PONG); + dds_lset_publication_matched (listener, publication_matched_listener); + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, ignorelocal); + if ((wr_pong = dds_create_writer (pongpub, tp_pong, qos, listener)) < 0) + error2 ("dds_create_writer(%s) failed: %d\n", tpname_pong, (int) wr_pong); + dds_delete_qos (qos); + dds_delete_listener (listener); + + ddsrt_mutex_lock (&pongwr_lock); + pongwr = realloc (pongwr, (npongwr + 1) * sizeof (*pongwr)); + pongwr[npongwr].pubhandle = 0; + pongwr[npongwr].pphandle = pphandle; + pongwr[npongwr].wr_pong = wr_pong; + npongwr++; + ddsrt_mutex_unlock (&pongwr_lock); + return wr_pong; +} + +static void delete_pong_writer (dds_instance_handle_t pphandle) +{ + uint32_t i = 0; + dds_entity_t wr_pong = 0; + ddsrt_mutex_lock (&pongwr_lock); + while (i < npongwr) + { + if (pongwr[i].pphandle != pphandle) + i++; + else + { + assert (wr_pong == 0 || wr_pong == pongwr[i].wr_pong); + memmove (&pongwr[i], &pongwr[i+1], (npongwr - i) * sizeof (pongwr[0])); + npongwr--; + } + } + ddsrt_mutex_unlock (&pongwr_lock); + if (wr_pong) + dds_delete (wr_pong); +} + +static void participant_data_listener (dds_entity_t rd, void *arg) +{ + dds_sample_info_t info; + void *msg = NULL; + uint32_t n_pong_expected_delta = 0; + int32_t n; + (void) arg; + while ((n = dds_take (rd, &msg, &info, 1, 1)) > 0) + { + struct ppant *pp; + assert (info.instance_state != DDS_ALIVE_INSTANCE_STATE || info.valid_data); + if (info.instance_state != DDS_ALIVE_INSTANCE_STATE) + { + ddsrt_avl_dpath_t dpath; + ddsrt_mutex_lock (&disc_lock); + if ((pp = ddsrt_avl_lookup_dpath (&ppants_td, &ppants, &info.instance_handle, &dpath)) != NULL) + { + printf ("[%"PRIdPID"] participant %s:%"PRIu32": gone\n", ddsrt_getpid (), pp->hostname, pp->pid); + fflush (stdout); + + if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) + { + delete_pong_writer (pp->handle); + n_pong_expected_delta--; + } + + ddsrt_avl_delete_dpath (&ppants_td, &ppants, pp, &dpath); + if (pp->tdeadline != DDS_NEVER) + ddsrt_fibheap_delete (&ppants_to_match_fhd, &ppants_to_match, pp); + free (pp); + } + ddsrt_mutex_unlock (&disc_lock); + } + else + { + const dds_builtintopic_participant_t *sample = msg; + void *vudata; + size_t usz; + ddsrt_avl_ipath_t ipath; + /* only add unknown participants with the magic user_data value: DDSPerf:X:HOSTNAME, where X is decimal */ + if (dds_qget_userdata (sample->qos, &vudata, &usz) && usz > 0) + { + const char *udata = vudata; + int has_reader, pos; + long pid; + if (sscanf (udata, UDATA_MAGIC "%d:%ld%n", &has_reader, &pid, &pos) == 2 && udata[pos] == ':' && strlen (udata + pos) == usz - (unsigned) pos) + { + size_t sz = usz - (unsigned) pos; + char *hostname = malloc (sz); + memcpy (hostname, udata + pos + 1, sz); + ddsrt_mutex_lock (&disc_lock); + if ((pp = ddsrt_avl_lookup_ipath (&ppants_td, &ppants, &info.instance_handle, &ipath)) != NULL) + free (hostname); + else + { + printf ("[%"PRIdPID"] participant %s:%"PRIu32": new%s\n", ddsrt_getpid (), hostname, (uint32_t) pid, (info.instance_handle == dp_handle) ? " (self)" : ""); + pp = malloc (sizeof (*pp)); + pp->handle = info.instance_handle; + pp->guid = sample->key; + pp->hostname = hostname; + pp->pid = (uint32_t) pid; + pp->tdisc = dds_time (); + pp->tdeadline = pp->tdisc + DDS_SECS (5); + if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) + pp->unmatched = MM_ALL & ~(has_reader ? 0 : MM_RD_DATA) & ~(rd_data ? 0 : MM_WR_DATA); + else + pp->unmatched = 0; + ddsrt_fibheap_insert (&ppants_to_match_fhd, &ppants_to_match, pp); + ddsrt_avl_insert_ipath (&ppants_td, &ppants, pp, &ipath); + + if (pp->handle != dp_handle || ignorelocal == DDS_IGNORELOCAL_NONE) + { + struct guidstr guidstr; + make_guidstr (&guidstr, &sample->key); + create_pong_writer (pp->handle, &guidstr); + n_pong_expected_delta++; + } + } + ddsrt_mutex_unlock (&disc_lock); + } + dds_free (vudata); + } + } + dds_return_loan (rd, &msg, n); + } + if (n < 0) + error2 ("dds_take(rd_participants): error %d\n", (int) n); + + if (n_pong_expected_delta) + { + ddsrt_mutex_lock (&pongstat_lock); + n_pong_expected += n_pong_expected_delta; + /* potential initial packet loss & lazy writer creation conspire against receiving + the expected number of responses, so allow for a few attempts before starting to + warn about timeouts */ + twarn_ping_timeout = dds_time () + DDS_MSECS (3333); + //printf ("[%"PRIdPID"] n_pong_expected = %u\n", ddsrt_getpid (), n_pong_expected); + ddsrt_mutex_unlock (&pongstat_lock); + } +} + +static void endpoint_matched_listener (uint32_t match_mask, dds_entity_t rd_epinfo, dds_instance_handle_t remote_endpoint) +{ + dds_sample_info_t info; + void *msg = NULL; + int32_t n; + + /* update participant data so this remote endpoint's participant will be known */ + participant_data_listener (rd_participants, NULL); + + /* FIXME: implement the get_matched_... interfaces so there's no need for keeping a reader + (and having to GC it, which I'm skipping here ...) */ + if ((n = dds_read_instance (rd_epinfo, &msg, &info, 1, 1, remote_endpoint)) < 0) + error2 ("dds_read_instance(rd_epinfo, %"PRIx64") failed: %d\n", remote_endpoint, (int) n); + else if (n == 0) + printf ("[%"PRIdPID"] endpoint %"PRIx64" not found\n", ddsrt_getpid (), remote_endpoint); + else + { + if (info.valid_data) + { + const dds_builtintopic_endpoint_t *sample = msg; + struct ppant *pp; + ddsrt_mutex_lock (&disc_lock); + if ((pp = ddsrt_avl_lookup (&ppants_td, &ppants, &sample->participant_instance_handle)) == NULL) + printf ("[%"PRIdPID"] participant %"PRIx64" no longer exists\n", ddsrt_getpid (), sample->participant_instance_handle); + else + { + pp->unmatched &= ~match_mask; + if (pp->unmatched == 0) + matchcount++; + } + ddsrt_mutex_unlock (&disc_lock); + } + dds_return_loan (rd_epinfo, &msg, n); + } + fflush (stdout); +} + +static const char *match_mask1_to_string (uint32_t mask) +{ + assert ((mask & ~MM_ALL) == 0); + switch (mask) + { + case MM_WR_DATA: return "data writer"; + case MM_RD_DATA: return "data reader"; + case MM_WR_PING: return "ping writer"; + case MM_RD_PING: return "ping reader"; + case MM_WR_PONG: return "pong writer"; + case MM_RD_PONG: return "pong reader"; + } + return "?"; +} + +static char *match_mask_to_string (char *buf, size_t size, uint32_t mask) +{ + size_t pos = 0; + while (pos < size && mask != 0) + { + uint32_t mask1 = mask & (~mask + 1u); + mask &= ~mask1; + int n = snprintf (buf + pos, size - (unsigned) pos, "%s%s", (pos > 0) ? ", " : "", match_mask1_to_string (mask1)); + if (n >= 0) pos += (size_t) n; + } + return buf; +} + +static void subscription_matched_listener (dds_entity_t rd, const dds_subscription_matched_status_t status, void *arg) +{ + /* this only works because the listener is called for every match; but I don't think that is something the + spec guarantees, and I don't think Cyclone should guarantee that either -- and if it isn't guaranteed + _really_ needs the get_matched_... interfaces to not have to implement the matching logic ... */ + (void) rd; + if (status.current_count_change > 0) + { + uint32_t mask = (uint32_t) (uintptr_t) arg; + //printf ("[%"PRIdPID"] subscription match: %s\n", ddsrt_getpid (), match_mask1_to_string (mask)); + endpoint_matched_listener (mask, rd_publications, status.last_publication_handle); + } +} + +static void publication_matched_listener (dds_entity_t wr, const dds_publication_matched_status_t status, void *arg) +{ + /* this only works because the listener is called for every match; but I don't think that is something the + spec guarantees, and I don't think Cyclone should guarantee that either -- and if it isn't guaranteed + _really_ needs the get_matched_... interfaces to not have to implement the matching logic ... */ + (void) wr; + if (status.current_count_change > 0) + { + uint32_t mask = (uint32_t) (uintptr_t) arg; + //printf ("[%"PRIdPID"] publication match: %s\n", ddsrt_getpid (), match_mask1_to_string (mask)); + endpoint_matched_listener (mask, rd_subscriptions, status.last_subscription_handle); + } +} + +static void set_data_available_listener (dds_entity_t rd, const char *rd_name, dds_on_data_available_fn fn, void *arg) +{ + /* This convoluted code is so that we leave all listeners unchanged, except the + data_available one. There is no real need for these complications, but it is + a nice exercise. */ + dds_listener_t *listener = dds_create_listener (arg); + dds_return_t rc; + dds_lset_data_available (listener, fn); + dds_listener_t *tmplistener = dds_create_listener (NULL); + if ((rc = dds_get_listener (rd, tmplistener)) < 0) + error2 ("dds_get_listener(%s) failed: %d\n", rd_name, (int) rc); + dds_merge_listener (listener, tmplistener); + dds_delete_listener (tmplistener); + + if ((rc = dds_set_listener (rd, listener)) < 0) + error2 ("dds_set_listener(%s) failed: %d\n", rd_name, (int) rc); + dds_delete_listener (listener); +} + +static void print_stats (dds_time_t tstart, dds_time_t tnow, dds_time_t tprev) +{ + char prefix[128]; + const double ts = (double) (tnow - tstart) / 1e9; + snprintf (prefix, sizeof (prefix), "[%"PRIdPID"] %.3f ", ddsrt_getpid (), ts); + + if (rate > 0) + { + ddsrt_mutex_lock (&pubstat_lock); + hist_print (prefix, pubstat_hist, tnow - tprev, 1); + ddsrt_mutex_unlock (&pubstat_lock); + } + + if (submode != SM_NONE) + { + struct eseq_admin * const ea = &eseq_admin; + uint64_t tot_nrecv = 0, nrecv = 0, nrecv_bytes = 0, nlost = 0; + ddsrt_mutex_lock (&ea->lock); + for (uint32_t i = 0; i < ea->nph; i++) + { + struct eseq_stat * const x = &ea->stats[i]; + tot_nrecv += x->nrecv; + nrecv += x->nrecv - x->nrecv_ref; + nlost += x->nlost - x->nlost_ref; + nrecv_bytes += x->nrecv_bytes - x->nrecv_bytes_ref; + x->nrecv_ref = x->nrecv; + x->nlost_ref = x->nlost; + x->nrecv_bytes_ref = x->nrecv_bytes; + } + ddsrt_mutex_unlock (&ea->lock); + + if (nrecv > 0) + { + printf ("%s ntot %"PRIu64" delta: %"PRIu64" lost %"PRIu64" rate %.2f Mb/s\n", + prefix, tot_nrecv, nrecv, nlost, (double) nrecv_bytes * 8 / 1e6 / (1e9 * (double) (tnow - tprev))); + } + } + + ddsrt_mutex_lock (&pongstat_lock); + for (uint32_t i = 0; i < npongstat; i++) + { + struct subthread_arg_pongstat * const x = &pongstat[i]; + if (x->cnt > 0) + { + struct ppant *pp; + ddsrt_mutex_lock (&disc_lock); + if ((pp = ddsrt_avl_lookup (&ppants_td, &ppants, &x->pphandle)) == NULL) + printf ("%s %"PRIx64" min %.3fus mean %.3fus max %.3fus cnt %"PRIu32"\n", + prefix, x->pubhandle, (double) x->min / 1e3, + (double) x->sum / (double) x->cnt / 1e3, (double) x->max / 1e3, x->cnt); + else + printf ("%s %s:%d min %.3fus mean %.3fus max %.3fus cnt %"PRIu32"\n", + prefix, pp->hostname, pp->pid, (double) x->min / 1e3, + (double) x->sum / (double) x->cnt / 1e3, (double) x->max / 1e3, x->cnt); + ddsrt_mutex_unlock (&disc_lock); + x->min = UINT64_MAX; + x->max = x->sum = x->cnt = 0; + } + } + ddsrt_mutex_unlock (&pongstat_lock); + fflush (stdout); +} + +static void subthread_arg_init (struct subthread_arg *arg, dds_entity_t rd, uint32_t max_samples) +{ + arg->rd = rd; + arg->max_samples = max_samples; + arg->mseq = malloc (arg->max_samples * sizeof (arg->mseq[0])); + arg->iseq = malloc (arg->max_samples * sizeof (arg->iseq[0])); + for (uint32_t i = 0; i < arg->max_samples; i++) + arg->mseq[i] = NULL; +} + +static void subthread_arg_fini (struct subthread_arg *arg) +{ + dds_return_loan(arg->rd, arg->mseq, (int32_t) arg->max_samples); + free (arg->mseq); + free (arg->iseq); +} + +static void signal_handler (int sig) +{ + (void) sig; + termflag = 1; + dds_set_guardcondition (termcond, true); +} + +#ifndef _WIN32 +static uint32_t sigthread (void *varg) +{ + sigset_t *set = varg; + int sig; + if (sigwait (set, &sig) == 0) + signal_handler (sig); + else + error2 ("sigwait failed: %d\n", errno); + return 0; +} +#endif + +/******************** + COMMAND LINE PARSING + ********************/ + +static void usage (void) +{ + printf ("\ +%s help\n\ +%s [OPTIONS] MODE...\n\ +\n\ +OPTIONS:\n\ + -T KS|K32|K256|OU topic:\n\ + KS seq num, key value, sequence-of-octets\n\ + K32 seq num, key value, array of 24 octets\n\ + K256 seq num, key value, array of 248 octets\n\ + OU seq num\n\ + -L allow matching with local endpoints\n\ + -u best-effort instead of reliable\n\ + -k all|N keep-all or keep-last-N for data (ping/pong is\n\ + always keep-last-1)\n\ + -n N number of key values to use for data (only for\n\ + topics with a key value)\n\ + -D DUR run for at most DUR seconds\n\ + -N COUNT require at least COUNT matching participants\n\ + -M DUR require those participants to match within DUR seconds\n\ +\n\ +MODE... is zero or more of:\n\ + ping [R[Hz]] [waitset|listener]\n\ + Send a ping upon receiving all expected pongs, or send a ping at\n\ + rate R (optionally suffixed with Hz). The triggering mode is either\n\ + a listener (default, unless -L has been specified) or a waitset.\n\ + pong [waitset|listener]\n\ + A \"dummy\" mode that serves two purposes: configuring the triggering.\n\ + mode (but it is shared with ping's mode), and suppressing the 1Hz ping\n\ + if no other options are selected. It always responds to pings.\n\ + sub [waitset|listener|polling]\n\ + Subscribe to data, with calls to take occurring either in a listener\n\ + (default), when a waitset is triggered, or by polling at 1kHz.\n\ + pub [R[Hz]] [burst N] [[ping] X%%]\n\ + Publish bursts of data at rate R, optionally suffixed with Hz. If\n\ + no rate is given or R is \"inf\", data is published as fast as\n\ + possible. Each burst is a single sample by default, but can be set\n\ + to larger value using \"burst N\".\n\ + If desired, a fraction of the samples can be treated as if it were a\n\ + ping, for this, specify a percentage either as \"ping X%%\" (the\n\ + \"ping\" keyword is optional, the %% sign is not).\n\ +\n\ +If no MODE specified, it defaults to a 1Hz ping + responding to any pings.\n\ +", argv0, argv0); + fflush (stdout); + exit (3); +} + +struct string_int_map_elem { + const char *name; + int value; +}; + +static const struct string_int_map_elem modestrings[] = { + { "ping", 1 }, + { "pong", 2 }, + { "sub", 3 }, + { "pub", 4 }, + { NULL, 0 } +}; + +static const struct string_int_map_elem pingpongmodes[] = { + { "waitset", SM_WAITSET }, + { "listener", SM_LISTENER }, + { NULL, 0 } +}; + +static int exact_string_int_map_lookup (const struct string_int_map_elem *elems, const char *label, const char *str, bool notfound_error) +{ + for (size_t i = 0; elems[i].name; i++) + if (strcmp (elems[i].name, str) == 0) + return elems[i].value; + if (notfound_error) + error3 ("%s: undefined %s", str, label); + return -1; +} + +static int string_int_map_lookup (const struct string_int_map_elem *elems, const char *label, const char *str, bool notfound_error) +{ + size_t match = SIZE_MAX; + size_t len = strlen (str); + bool ambiguous = false; + for (size_t i = 0; elems[i].name; i++) + { + if (strcmp (elems[i].name, str) == 0) + return elems[match].value; + else if (len >= 3 && strlen (elems[i].name) >= 3 && strncmp (elems[i].name, str, len) == 0) + { + if (match == SIZE_MAX) + match = i; + else + ambiguous = true; + } + } + if (ambiguous) + error3 ("%s: ambiguous %sspecification", str, label); + if (match == SIZE_MAX && notfound_error) + error3 ("%s: undefined %s", str, label); + return (match == SIZE_MAX) ? -1 : elems[match].value; +} + +static void set_mode_ping (int *xoptind, int xargc, char * const xargv[]) +{ + ping_intv = 0; + pingpongmode = SM_LISTENER; + while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) + { + int pos; + double r; + if (strcmp (xargv[*xoptind], "inf") == 0) + { + ping_intv = 0; + } + else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0)) + { + if (r == 0) ping_intv = DDS_INFINITY; + else if (r > 0) ping_intv = (dds_duration_t) (1e9 / rate + 0.5); + else error3 ("%s: invalid ping rate\n", xargv[*xoptind]); + } + else + { + pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "ping mode", xargv[*xoptind], true); + } + (*xoptind)++; + } +} + +static void set_mode_pong (int *xoptind, int xargc, char * const xargv[]) +{ + pingpongmode = SM_LISTENER; + while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) + { + pingpongmode = (enum submode) string_int_map_lookup (pingpongmodes, "pong mode", xargv[*xoptind], true); + (*xoptind)++; + } +} + +static void set_mode_sub (int *xoptind, int xargc, char * const xargv[]) +{ + static const struct string_int_map_elem submodes[] = { + { "waitset", SM_WAITSET }, + { "polling", SM_POLLING }, + { "listener", SM_LISTENER }, + { NULL, 0 } + }; + submode = SM_LISTENER; + while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) + { + submode = (enum submode) string_int_map_lookup (submodes, "subscription mode", xargv[*xoptind], true); + (*xoptind)++; + } +} + +static void set_mode_pub (int *xoptind, int xargc, char * const xargv[]) +{ + rate = HUGE_VAL; + burstsize = 1; + ping_frac = 0; + while (*xoptind < xargc && exact_string_int_map_lookup (modestrings, "mode string", xargv[*xoptind], false) == -1) + { + int pos = 0; + double r; + if (strcmp (xargv[*xoptind], "inf") == 0 || strcmp (xargv[*xoptind], "infHz") == 0) + { + rate = HUGE_VAL; + } + else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && (xargv[*xoptind][pos] == 0 || strcmp (xargv[*xoptind] + pos, "Hz") == 0)) + { + if (r < 0) error3 ("%s: invalid publish rate\n", xargv[*xoptind]); + rate = r; + } + else if (strcmp (xargv[*xoptind], "burst") == 0) + { + unsigned b; + if (++(*xoptind) == xargc) + error3 ("argument missing in burst size specification\n"); + if (sscanf (xargv[*xoptind], "%u%n", &b, &pos) == 1 && xargv[*xoptind][pos] == 0) + burstsize = b; + else + error3 ("%s: invalid burst size specification\n", xargv[*xoptind]); + } + else if (sscanf (xargv[*xoptind], "%lf%n", &r, &pos) == 1 && strcmp (xargv[*xoptind] + pos, "%") == 0) + { + if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); + ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); + } + else if (strcmp (xargv[*xoptind], "ping") == 0 && *xoptind + 1 < xargc && sscanf (xargv[*xoptind + 1], "%lf%%%n", &rate, &pos) == 1 && xargv[*xoptind + 1][pos] == 0) + { + ++(*xoptind); + if (r < 0 || r > 100) error3 ("%s: ping fraction out of range\n", xargv[*xoptind]); + ping_frac = (uint32_t) (UINT32_MAX * (r / 100.0) + 0.5); + } + else + { + error3 ("%s: unrecognised publish specification\n", xargv[*xoptind]); + } + (*xoptind)++; + } +} + +static void set_mode (int xoptind, int xargc, char * const xargv[]) +{ + int code; + rate = 0.0; + submode = SM_NONE; + pingpongmode = SM_LISTENER; + ping_intv = (xoptind == xargc) ? DDS_SECS (1) : DDS_INFINITY; + ping_frac = 0; + while (xoptind < xargc && (code = exact_string_int_map_lookup (modestrings, "mode string", xargv[xoptind], true)) != -1) + { + xoptind++; + switch (code) + { + case 1: set_mode_ping (&xoptind, xargc, xargv); break; + case 2: set_mode_pong (&xoptind, xargc, xargv); break; + case 3: set_mode_sub (&xoptind, xargc, xargv); break; + case 4: set_mode_pub (&xoptind, xargc, xargv); break; + } + } + if (xoptind != xargc) + { + error3 ("%s: unrecognized argument\n", xargv[xoptind]); + } +} + +int main (int argc, char *argv[]) +{ + dds_entity_t ws; + dds_return_t rc; + dds_qos_t *qos; + dds_listener_t *listener; + int opt; + ddsrt_threadattr_t attr; + ddsrt_thread_t pubtid, subtid, subpingtid, subpongtid; +#ifndef _WIN32 + sigset_t sigset, osigset; + ddsrt_thread_t sigtid; +#endif + ddsrt_threadattr_init (&attr); + + argv0 = argv[0]; + + if (argc == 2 && strcmp (argv[1], "help") == 0) + usage (); + while ((opt = getopt (argc, argv, "D:n:z:k:uLT:M:N:h")) != EOF) + { + switch (opt) + { + case 'D': dur = atof (optarg); if (dur <= 0) dur = HUGE_VAL; break; + case 'n': nkeyvals = (unsigned) atoi (optarg); break; + case 'u': reliable = false; break; + case 'k': histdepth = atoi (optarg); if (histdepth < 0) histdepth = 0; break; + case 'L': ignorelocal = DDS_IGNORELOCAL_NONE; break; + case 'T': + if (strcmp (optarg, "KS") == 0) topicsel = KS; + else if (strcmp (optarg, "K32") == 0) topicsel = K32; + else if (strcmp (optarg, "K256") == 0) topicsel = K256; + else if (strcmp (optarg, "OU") == 0) topicsel = OU; + else error3 ("%s: unknown topic\n", optarg); + break; + case 'M': maxwait = atof (optarg); if (maxwait <= 0) maxwait = HUGE_VAL; break; + case 'N': minmatch = (unsigned) atoi (optarg); break; + case 'z': baggagesize = (unsigned) atoi (optarg); break; + case 'h': usage (); break; + default: error3 ("-%c: unknown option\n", opt); break; + } + } + set_mode (optind, argc, argv); + + if (nkeyvals == 0) + nkeyvals = 1; + if (topicsel == OU && nkeyvals != 1) + error3 ("-n%u invalid: topic OU has no key\n", nkeyvals); + if (topicsel != KS && baggagesize != 0) + error3 ("-z%u invalid: only topic KS has a sequence\n", baggagesize); + + ddsrt_avl_init (&ppants_td, &ppants); + ddsrt_fibheap_init (&ppants_to_match_fhd, &ppants_to_match); + + ddsrt_mutex_init (&disc_lock); + ddsrt_mutex_init (&pongstat_lock); + ddsrt_mutex_init (&pongwr_lock); + ddsrt_mutex_init (&pubstat_lock); + + pubstat_hist = hist_new (30, 1000, 0); + + qos = dds_create_qos (); + /* set user data: magic cookie, whether we have a reader for the Data topic + (all other endpoints always exist), and our hostname */ + { + unsigned pos; + char udata[256]; + pos = (unsigned) snprintf (udata, sizeof (udata), UDATA_MAGIC"%d:%"PRIdPID":", submode != SM_NONE, ddsrt_getpid ()); + assert (pos < sizeof (udata)); + if (ddsrt_gethostname (udata + pos, sizeof (udata) - pos) != DDS_RETCODE_OK) + strcpy (udata + UDATA_MAGIC_SIZE, "?"); + dds_qset_userdata (qos, udata, strlen (udata)); + } + if ((dp = dds_create_participant (did, qos, NULL)) < 0) + error2 ("dds_create_participant(domain %d) failed: %d\n", (int) did, (int) dp); + dds_delete_qos (qos); + dds_write_set_batch (true); + if ((rc = dds_get_instance_handle (dp, &dp_handle)) < 0) + error2 ("dds_get_instance_handle(participant) failed: %d\n", (int) rc); + + qos = dds_create_qos (); + dds_qset_partition1 (qos, "DDSPerf"); + if ((sub = dds_create_subscriber (dp, NULL, NULL)) < 0) + error2 ("dds_create_subscriber failed: %d\n", (int) dp); + if ((pub = dds_create_publisher (dp, NULL, NULL)) < 0) + error2 ("dds_create_publisher failed: %d\n", (int) dp); + dds_delete_qos (qos); + + { + const char *tp_suf = ""; + const dds_topic_descriptor_t *tp_desc = NULL; + switch (topicsel) + { + case KS: tp_suf = "KS"; tp_desc = &KeyedSeq_desc; break; + case K32: tp_suf = "K32"; tp_desc = &Keyed32_desc; break; + case K256: tp_suf = "K256"; tp_desc = &Keyed256_desc; break; + case OU: tp_suf = "OU"; tp_desc = &OneULong_desc; break; + } + snprintf (tpname_data, sizeof (tpname_data), "DDSPerf%cData%s", reliable ? 'R' : 'U', tp_suf); + snprintf (tpname_ping, sizeof (tpname_ping), "DDSPerf%cPing%s", reliable ? 'R' : 'U', tp_suf); + snprintf (tpname_pong, sizeof (tpname_pong), "DDSPerf%cPong%s", reliable ? 'R' : 'U', tp_suf); + qos = dds_create_qos (); + dds_qset_reliability (qos, reliable ? DDS_RELIABILITY_RELIABLE : DDS_RELIABILITY_BEST_EFFORT, DDS_SECS (1)); + if ((tp_data = dds_create_topic (dp, tp_desc, tpname_data, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname_data, (int) tp_data); + if ((tp_ping = dds_create_topic (dp, tp_desc, tpname_ping, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname_ping, (int) tp_ping); + if ((tp_pong = dds_create_topic (dp, tp_desc, tpname_pong, qos, NULL)) < 0) + error2 ("dds_create_topic(%s) failed: %d\n", tpname_pong, (int) tp_pong); + dds_delete_qos (qos); + } + + /* participants reader must exist before the "publication matched" or "subscription matched" + listener is invoked, or it won't be able to get the details (FIXME: even the DDS spec + has convenience functions for that ...) */ + listener = dds_create_listener (NULL); + dds_lset_data_available (listener, participant_data_listener); + if ((rd_participants = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, NULL, listener)) < 0) + error2 ("dds_create_reader(participants) failed: %d\n", (int) rd_participants); + dds_delete_listener (listener); + if ((rd_subscriptions = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL)) < 0) + error2 ("dds_create_reader(subscriptions) failed: %d\n", (int) rd_subscriptions); + if ((rd_publications = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0) + error2 ("dds_create_reader(publications) failed: %d\n", (int) rd_publications); + + /* ping reader/writer uses keep-last-1 history; not checking matching on these (yet) */ + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, 1); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PING); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if ((rd_ping = dds_create_reader (sub, tp_ping, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", tpname_ping, (int) rd_ping); + dds_delete_listener (listener); + listener = dds_create_listener ((void *) (uintptr_t) MM_RD_PING); + dds_lset_publication_matched (listener, publication_matched_listener); + if ((wr_ping = dds_create_writer (pub, tp_ping, qos, listener)) < 0) + error2 ("dds_create_writer(%s) failed: %d\n", tpname_ping, (int) wr_ping); + dds_delete_listener (listener); + dds_delete_qos (qos); + + /* data reader/writer use a keep-all history with generous resource limits. */ + qos = dds_create_qos (); + if (histdepth == 0) + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 1); + else + dds_qset_history (qos, DDS_HISTORY_KEEP_LAST, histdepth); + dds_qset_resource_limits (qos, 10000, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + dds_qset_ignorelocal (qos, ignorelocal); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_DATA); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if (submode != SM_NONE && (rd_data = dds_create_reader (sub, tp_data, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", tpname_data, (int) rd_data); + dds_delete_listener (listener); + listener = dds_create_listener ((void *) (uintptr_t) MM_RD_DATA); + dds_lset_publication_matched (listener, publication_matched_listener); + if ((wr_data = dds_create_writer (pub, tp_data, qos, listener)) < 0) + error2 ("dds_create_writer(%s) failed: %d\n", tpname_data, (int) wr_data); + dds_delete_listener (listener); + + /* We only need a pong reader when sending data with a non-zero probability + of it being a "ping", or when sending "real" pings. I.e., if + rate > 0 && ping_frac > 0) || ping_intv != DDS_NEVER + but it doesn't really hurt to have the reader either, and always creating + it and futhermore eagerly creating the pong writers means we can do more + checking. */ + { + /* participant listener should have already been called for "dp", so we + can simply look up the details on ourself to get at the GUID of the + participant */ + struct guidstr guidstr; + struct ppant *pp; + dds_entity_t sub_pong; + ddsrt_mutex_lock (&disc_lock); + if ((pp = ddsrt_avl_lookup (&ppants_td, &ppants, &dp_handle)) == NULL) + { + printf ("participant %"PRIx64" (self) not found\n", dp_handle); + exit (2); + } + make_guidstr (&guidstr, &pp->guid); + ddsrt_mutex_unlock (&disc_lock); + dds_qos_t *subqos = dds_create_qos (); + dds_qset_partition1 (subqos, guidstr.str); + if ((sub_pong = dds_create_subscriber (dp, subqos, NULL)) < 0) + error2 ("dds_create_subscriber(pong) failed: %d\n", (int) sub_pong); + dds_delete_qos (subqos); + listener = dds_create_listener ((void *) (uintptr_t) MM_WR_PONG); + dds_lset_subscription_matched (listener, subscription_matched_listener); + if ((rd_pong = dds_create_reader (sub_pong, tp_pong, qos, listener)) < 0) + error2 ("dds_create_reader(%s) failed: %d\n", tpname_pong, (int) rd_pong); + dds_delete_listener (listener); + } + dds_delete_qos (qos); + + if ((termcond = dds_create_guardcondition (dp)) < 0) + error2 ("dds_create_guardcondition(termcond) failed: %d\n", (int) termcond); + if ((ws = dds_create_waitset (dp)) < 0) + error2 ("dds_create_waitset(main) failed: %d\n", (int) ws); + if ((rc = dds_waitset_attach (ws, termcond, 0)) < 0) + error2 ("dds_waitset_attach(main, termcond) failed: %d\n", (int) rc); + + /* I hate Unix signals in multi-threaded processes ... */ +#ifdef _WIN32 + signal (SIGINT, signal_handler); +#else + sigemptyset (&sigset); + sigaddset (&sigset, SIGINT); + sigaddset (&sigset, SIGTERM); + sigprocmask (SIG_BLOCK, &sigset, &osigset); + ddsrt_thread_create (&sigtid, "sigthread", &attr, sigthread, &sigset); +#endif + + /* Make publisher & subscriber thread arguments and start the threads we + need (so what if we allocate memory for reading data even if we don't + have a reader or will never really be receiving data) */ + struct subthread_arg subarg_data, subarg_ping, subarg_pong; + init_eseq_admin (&eseq_admin, nkeyvals); + subthread_arg_init (&subarg_data, rd_data, 100); + subthread_arg_init (&subarg_ping, rd_ping, 100); + subthread_arg_init (&subarg_pong, rd_pong, 100); + uint32_t (*subthread_func) (void *arg) = 0; + switch (submode) + { + case SM_NONE: break; + case SM_WAITSET: subthread_func = subthread_waitset; break; + case SM_POLLING: subthread_func = subthread_polling; break; + case SM_LISTENER: break; + } + memset (&pubtid, 0, sizeof (pubtid)); + memset (&subtid, 0, sizeof (subtid)); + memset (&subpingtid, 0, sizeof (subpingtid)); + memset (&subpongtid, 0, sizeof (subpongtid)); + if (rate > 0) + ddsrt_thread_create (&pubtid, "pub", &attr, pubthread, NULL); + if (subthread_func != 0) + ddsrt_thread_create (&subtid, "sub", &attr, subthread_func, &subarg_data); + else if (submode == SM_LISTENER) + set_data_available_listener (rd_data, "rd_data", data_available_listener, &subarg_data); + /* Need to handle incoming "pong"s only if we can be sending "ping"s (whether that + be pings from the "ping" mode (i.e. ping_intv != DDS_NEVER), or pings embedded + in the published data stream (i.e. rate > 0 && ping_frac > 0). The trouble with + the first category is that a publication/subscription pair in the same process + would result in a "thread awake nesting" overflow (and otherwise in a stack + overflow) because each sample triggers the next. So in that particular case we + had better create a waitset. */ + const bool pingpong_waitset = (ping_intv != DDS_NEVER && ignorelocal == DDS_IGNORELOCAL_NONE) || pingpongmode == SM_WAITSET; + if (pingpong_waitset) + { + ddsrt_thread_create (&subpingtid, "sub", &attr, subpingthread_waitset, &subarg_pong); + ddsrt_thread_create (&subpongtid, "sub", &attr, subpongthread_waitset, &subarg_pong); + } + else + { + set_data_available_listener (rd_ping, "rd_ping", ping_available_listener, &subarg_ping); + set_data_available_listener (rd_pong, "rd_pong", pong_available_listener, &subarg_pong); + } + + /* Run until time limit reached or a signal received. (The time calculations + ignore the possibility of overflow around the year 2260.) */ + dds_time_t tnow = dds_time (); + const dds_time_t tstart = tnow; + dds_time_t tmatch = (maxwait == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (maxwait * 1e9 + 0.5); + const dds_time_t tstop = (dur == HUGE_VAL) ? DDS_NEVER : tstart + (int64_t) (dur * 1e9 + 0.5); + dds_time_t tnext = tstart + DDS_SECS (1); + dds_time_t tlast = tstart; + dds_time_t tnextping = (ping_intv == DDS_INFINITY) ? DDS_NEVER : (ping_intv == 0) ? tstart + DDS_SECS (1) : tstart + ping_intv; + while (!termflag && tnow < tstop) + { + dds_time_t twakeup = DDS_NEVER; + int32_t nxs; + + /* bail out if too few readers discovered within the deadline */ + if (tnow >= tmatch) + { + bool ok; + ddsrt_mutex_lock (&disc_lock); + ok = (matchcount >= minmatch); + ddsrt_mutex_unlock (&disc_lock); + if (ok) + tmatch = DDS_NEVER; + else + { + /* set minmatch to an impossible value to avoid a match occurring between now and + the determining of the exit status from causing a successful return */ + minmatch = UINT32_MAX; + break; + } + } + + /* sometimes multicast only works one way, thus it makes sense to verify + reader/writer matching takes place within a set amount of time after + discovering the participant. */ + { + struct ppant *pp; + ddsrt_mutex_lock (&disc_lock); + while ((pp = ddsrt_fibheap_min (&ppants_to_match_fhd, &ppants_to_match)) != NULL && pp->tdeadline < tnow) + { + (void) ddsrt_fibheap_extract_min (&ppants_to_match_fhd, &ppants_to_match); + if (pp->unmatched != 0) + { + printf ("[%"PRIdPID"] participant %s:%"PRIu32": failed to match in %.3fs\n", ddsrt_getpid (), pp->hostname, pp->pid, (double) (pp->tdeadline - pp->tdisc) / 1e9); + fflush (stdout); + matchtimeout++; + } + /* keep the participant in the admin so we will never look at it again */ + pp->tdeadline = DDS_NEVER; + } + if (pp && pp->tdeadline < tnext) + { + twakeup = pp->tdeadline; + } + ddsrt_mutex_unlock (&disc_lock); + } + + /* next wakeup should be when the next event occurs */ + if (tnext < twakeup) + twakeup = tnext; + if (tstop < twakeup) + twakeup = tstop; + if (tmatch < twakeup) + twakeup = tmatch; + if (tnextping < twakeup) + twakeup = tnextping; + + if ((nxs = dds_waitset_wait_until (ws, NULL, 0, twakeup)) < 0) + error2 ("dds_waitset_wait_until(main): error %d\n", (int) nxs); + + /* try to print exactly once per second, but do gracefully handle a very late wakeup */ + tnow = dds_time (); + if (tnext <= tnow) + { + print_stats (tstart, tnow, tlast); + tlast = tnow; + if (tnow > tnext + DDS_MSECS (500)) + tnext = tnow + DDS_SECS (1); + else + tnext += DDS_SECS (1); + } + + /* If a "real" ping doesn't result in the expected number of pongs within a reasonable + time, send a new ping to restart the process. This can happen as a result of starting + or stopping a process, as a result of packet loss if best-effort reliability is + selected, or as a result of overwhelming the ping/pong from the data publishing thread + (as the QoS is a simple keep-last-1) */ + if (tnextping <= tnow) + { + maybe_send_new_ping (tnow, &tnextping); + } + } + +#if _WIN32 + signal_handler (SIGINT); +#else + { + /* get the attention of the signal handler thread */ + void (*osigint) (int); + void (*osigterm) (int); + kill (getpid (), SIGTERM); + ddsrt_thread_join (sigtid, NULL); + osigint = signal (SIGINT, SIG_IGN); + osigterm = signal (SIGTERM, SIG_IGN); + sigprocmask (SIG_SETMASK, &osigset, NULL); + signal (SIGINT, osigint); + signal (SIGINT, osigterm); + } +#endif + + if (rate > 0) + ddsrt_thread_join (pubtid, NULL); + if (subthread_func != 0) + ddsrt_thread_join (subtid, NULL); + if (pingpong_waitset) + { + ddsrt_thread_join (subpingtid, NULL); + ddsrt_thread_join (subpongtid, NULL); + } + + /* stop the listeners before deleting the readers: otherwise they may + still try to access a reader that has already become inaccessible + (not quite good, but ...) */ + dds_set_listener (rd_ping, NULL); + dds_set_listener (rd_pong, NULL); + dds_set_listener (rd_data, NULL); + dds_set_listener (rd_participants, NULL); + dds_set_listener (rd_subscriptions, NULL); + dds_set_listener (rd_publications, NULL); + + /* Delete rd_data early to workaround a deadlock deleting a reader + or writer while the receive thread (or a delivery thread) got + stuck trying to write into a reader that hit its resource limits. + + The deadlock is that the deleting of a reader/writer requires + waiting for the DDSI-level entity to be deleted (a multi-stage + GC process), and a "stuck" receive thread prevents the GC from + making progress. + + The fix is to eliminate the waiting and retrying, and instead + flip the reader's state to out-of-sync and rely on retransmits + to let it make progress once room is available again. */ + dds_delete (rd_data); + + uint64_t nlost = 0; + for (uint32_t i = 0; i < eseq_admin.nph; i++) + nlost += eseq_admin.stats[i].nlost; + fini_eseq_admin (&eseq_admin); + subthread_arg_fini (&subarg_data); + subthread_arg_fini (&subarg_ping); + subthread_arg_fini (&subarg_pong); + dds_delete (dp); + ddsrt_mutex_destroy (&disc_lock); + ddsrt_mutex_destroy (&pongwr_lock); + ddsrt_mutex_destroy (&pongstat_lock); + ddsrt_mutex_destroy (&pubstat_lock); + hist_free (pubstat_hist); + + bool ok = true; + + { + ddsrt_avl_iter_t it; + struct ppant *pp; + for (pp = ddsrt_avl_iter_first (&ppants_td, &ppants, &it); pp; pp = ddsrt_avl_iter_next (&it)) + if (pp->unmatched != 0) + { + char buf[256]; + printf ("[%"PRIdPID"] error: %s:%"PRIu32" failed to match %s\n", ddsrt_getpid (), pp->hostname, pp->pid, match_mask_to_string (buf, sizeof (buf), pp->unmatched)); + ok = false; + } + } + if (matchcount < minmatch) + { + printf ("[%"PRIdPID"] error: too few matching participants (%"PRIu32" instead of %"PRIu32")\n", ddsrt_getpid (), matchcount, minmatch); + ok = false; + } + if (nlost > 0 && (reliable && histdepth == 0)) + { + printf ("[%"PRIdPID"] error: %"PRIu64" samples lost\n", ddsrt_getpid (), nlost); + ok = false; + } + return ok ? 0 : 1; +} diff --git a/src/tools/ddsperf/ddsperf_types.idl b/src/tools/ddsperf/ddsperf_types.idl new file mode 100644 index 0000000..806f30d --- /dev/null +++ b/src/tools/ddsperf/ddsperf_types.idl @@ -0,0 +1,29 @@ +struct OneULong +{ + unsigned long seq; +}; +#pragma keylist OneULong + +struct Keyed32 +{ + unsigned long seq; + unsigned long keyval; + octet baggage[24]; +}; +#pragma keylist Keyed32 keyval + +struct Keyed256 +{ + unsigned long seq; + unsigned long keyval; + octet baggage[248]; +}; +#pragma keylist Keyed256 keyval + +struct KeyedSeq +{ + unsigned long seq; + unsigned long keyval; + sequence baggage; +}; +#pragma keylist KeyedSeq keyval