
* Add PropertyPolicy to QoS API for Security settings This commit adds the public API for PropertyQosPolicy including tests. This policy can be used to set the parameters for the DDS security implementation, as an alternative to using the xml configuration. Tests are also included for setting security properties and conflict resolution when both security configuration and qos properties are present. Finally, the pubsub tool is updated so that it handles this qos correctly. Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Init binary_value.props to fix failing qos merge and moved init code in qset_prop functions Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Add additional test and some validation improvements based on review comments Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Refactoring of qos property Refactored the qos property handling based on review comments. Setting and unsetting functions are simplified and now use helper functions for lookup, property initialisation is simplified. Added an additional check for required security properties when creating participant using security settings from qos, and added a test-case for this code. Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Check for qos flag before getting property index from qos Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Participant creation should fail on inconsistent security qos properties, and some minor code improvements in property qos api functions Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Update log message in test security_config_qos Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Fixed unused label compiler error in q_entity.c when security is disabled Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com> * Refactored qprop functions with macros to avoid duplicate code Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
2918 lines
110 KiB
C
2918 lines
110 KiB
C
/*
|
|
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
|
|
*
|
|
* This program and the accompanying materials are made available under the
|
|
* terms of the Eclipse Public License v. 2.0 which is available at
|
|
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
|
|
* v. 1.0 which is available at
|
|
* http://www.eclipse.org/org/documents/edl-v10.php.
|
|
*
|
|
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
|
*/
|
|
#ifdef __APPLE__
|
|
#define USE_EDITLINE 0
|
|
#endif
|
|
|
|
#define _ISOC99_SOURCE
|
|
#include <time.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <stdint.h>
|
|
#include <signal.h>
|
|
#include <limits.h>
|
|
#include <ctype.h>
|
|
#include <sys/types.h>
|
|
#include <errno.h>
|
|
#include <getopt.h>
|
|
|
|
#if USE_EDITLINE
|
|
#include <histedit.h>
|
|
#endif
|
|
|
|
#include "common.h"
|
|
#include "testtype.h"
|
|
#include "tglib.h"
|
|
#include "porting.h"
|
|
|
|
#include "dds/ddsrt/environ.h"
|
|
#include "dds/ddsrt/process.h"
|
|
#include "dds/ddsrt/string.h"
|
|
#include "dds/ddsrt/strtol.h"
|
|
#include "dds/ddsrt/sync.h"
|
|
#include "dds/ddsrt/threads.h"
|
|
|
|
//#define NUMSTR "0123456789"
|
|
//#define HOSTNAMESTR "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-." NUMSTR
|
|
|
|
/* Pointer to a write-style operation: takes a writer entity, a sample pointer and a timestamp, and returns a dds_return_t. */
typedef dds_return_t (*write_oper_t) (dds_entity_t wr, const void *d, const dds_time_t ts);
|
|
|
|
/* Topic type selector; the -K help text lists KS, K32..K256 and OU. */
enum topicsel {
    UNSPEC,
    KS,
    K32,
    K64,
    K128,
    K256,
    OU,
    ARB
};

/* Reader behaviour selector (see the -m option in the help text). */
enum readermode {
    MODE_PRINT,
    MODE_CHECK,
    MODE_ZEROLOAD,
    MODE_DUMP,
    MODE_NONE
};
|
|
|
|
/*
 * Bit flags OR-ed together in print_metadata; print_sampleinfo() tests each
 * one to decide which metadata field to print.
 */
#define PM_PID      (1u << 0)   /* process id */
#define PM_TOPIC    (1u << 1)   /* topic tag */
#define PM_TIME     (1u << 2)   /* time relative to the first printed sample */
#define PM_IHANDLE  (1u << 3)   /* instance handle */
#define PM_PHANDLE  (1u << 4)   /* publication handle */
#define PM_STIME    (1u << 5)   /* source timestamp */
#define PM_DGEN     (1u << 6)   /* disposed generation count */
#define PM_NWGEN    (1u << 7)   /* no-writers generation count */
#define PM_RANKS    (1u << 8)   /* sample / generation / absolute generation ranks */
#define PM_STATE    (1u << 9)   /* instance, sample and view state characters */
|
|
|
|
static volatile sig_atomic_t termflag = 0;
|
|
static int flushflag = 0;
|
|
static int pid;
|
|
static dds_entity_t termcond;
|
|
static unsigned nkeyvals = 1;
|
|
static int once_mode = 0;
|
|
static int wait_hist_data = 0;
|
|
static dds_duration_t wait_hist_data_timeout = 0;
|
|
static double dur = 0.0;
|
|
//static int sigpipe[2]; // TODO signal handling support
|
|
//static int termpipe[2];
|
|
//static int fdin = 0; // TODO ARB type support
|
|
static enum tgprint_mode printmode = TGPM_FIELDS;
|
|
static unsigned print_metadata = PM_STATE;
|
|
static int printtype = 0;
|
|
|
|
/* Scale factor applied to the integer read after '@' before it is stored in tstamp_t.t. */
#define T_SECOND ((int64_t) 1000000000)

/* Timestamp parsed from the "@T" / "@=T" input syntax. */
struct tstamp_t {
    int isabs;      /* 1 when the value was introduced with '=', otherwise 0 */
    int64_t t;      /* parsed value multiplied by T_SECOND */
};
|
|
|
|
struct readerspec {
|
|
dds_entity_t rd;
|
|
dds_entity_t sub;
|
|
enum topicsel topicsel;
|
|
struct tgtopic *tgtp;
|
|
enum readermode mode;
|
|
int use_take;
|
|
dds_duration_t sleep_ns;
|
|
int polling;
|
|
uint32_t read_maxsamples;
|
|
int print_match_pre_read;
|
|
unsigned idx;
|
|
};
|
|
|
|
/* Writer input selection; def_writerspec starts out as WRM_INPUT. */
enum writermode {
    WRM_NONE,
    WRM_AUTO,
    WRM_INPUT
};
|
|
|
|
struct writerspec {
|
|
dds_entity_t wr;
|
|
dds_entity_t dupwr;
|
|
dds_entity_t pub;
|
|
enum topicsel topicsel;
|
|
char *tpname;
|
|
struct tgtopic *tgtp;
|
|
double writerate;
|
|
unsigned baggagesize;
|
|
int register_instances;
|
|
int duplicate_writer_flag;
|
|
unsigned burstsize;
|
|
enum writermode mode;
|
|
};
|
|
|
|
static const struct readerspec def_readerspec = {
|
|
.rd = 0,
|
|
.sub = 0,
|
|
.topicsel = UNSPEC,
|
|
.tgtp = NULL,
|
|
.mode = MODE_PRINT,
|
|
.use_take = 1,
|
|
.sleep_ns = 0,
|
|
.polling = 0,
|
|
.read_maxsamples = INT16_MAX,
|
|
.print_match_pre_read = 0
|
|
};
|
|
|
|
static const struct writerspec def_writerspec = {
|
|
.wr = 0,
|
|
.dupwr = 0,
|
|
.pub = 0,
|
|
.topicsel = UNSPEC,
|
|
.tpname = NULL,
|
|
.tgtp = NULL,
|
|
.writerate = 0.0,
|
|
.baggagesize = 0,
|
|
.register_instances = 0,
|
|
.duplicate_writer_flag = 0,
|
|
.burstsize = 1,
|
|
.mode = WRM_INPUT
|
|
};
|
|
|
|
/* Node of a circular, doubly linked list of writer specifications. */
struct wrspeclist {
    struct writerspec *spec;
    struct wrspeclist *prev;    /* circular */
    struct wrspeclist *next;    /* circular */
};
|
|
|
|
/* Held by print_K() and print_seq_OU() while they write one record to stdout. */
static ddsrt_mutex_t output_mutex;
|
|
|
|
static void terminate(void) {
|
|
// const char c = 0;
|
|
termflag = 1;
|
|
// os_write(termpipe[1], &c, 1); // TODO: signal handling support; for abstraction layer
|
|
dds_waitset_set_trigger(termcond, true);
|
|
}
|
|
|
|
static void usage(const char *argv0) {
|
|
fprintf (stderr, "\
|
|
usage: %s [OPTIONS] PARTITION...\n\
|
|
\n\
|
|
OPTIONS:\n\
|
|
-T TOPIC set topic name to TOPIC\n\
|
|
specifying a topic name when one has already been given\n\
|
|
introduces a new reader/writer pair\n\
|
|
-K TYPE select type (KS is default),\n\
|
|
(default topic name in parenthesis):\n\
|
|
KS - key, seq, octet sequence (PubSub)\n\
|
|
K32,K64,K128,K256 - key, seq, octet array (PubSub<N>)\n\
|
|
OU - one ulong, keyless (PubSubOU)\n\
|
|
specifying a type when one has already been given introduces\n\
|
|
a new reader/writer pair\n\
|
|
-q FS:QOS set QoS for entities indicated by FS, which must be one or\n\
|
|
more of: t (topic), p (publisher), s (subscriber),\n\
|
|
w (writer), r (reader), or a (all of them). For QoS syntax,\n\
|
|
see below. Inapplicable QoS's are ignored.\n\
|
|
-m [0|p[p]|c[p][:N]|z|d[p]] no reader, print values, check sequence numbers\n\
|
|
(expecting N keys), \"zero-load\" mode or \"dump\" mode (which\n\
|
|
is differs from \"print\" primarily because it uses a data-\n\
|
|
available trigger and reads all samples in read-mode(default:\n\
|
|
p; pp, cp, dp are polling modes); set per-reader\n\
|
|
-D DUR run for DUR seconds\n\
|
|
-n N limit take/read to N samples\n\
|
|
-O take/read once then exit 0 if samples present, or 1 if not\n\
|
|
-P MODES printing control (prefixing with \"no\" disables):\n\
|
|
meta enable printing of all metadata\n\
|
|
trad pid, time, phandle, stime, state\n\
|
|
pid process id of pubsub\n\
|
|
topic which topic (topic is def. for multi-topic)\n\
|
|
time read time relative to program start\n\
|
|
phandle publication handle\n\
|
|
ihandle instance handle\n\
|
|
stime source timestamp\n\
|
|
rtime reception timestamp\n\
|
|
dgen disposed generation count\n\
|
|
nwgen no-writers generation count\n\
|
|
ranks sample, generation, absolute generation ranks\n\
|
|
state instance/sample/view states\n\
|
|
additionally, the following have effect for sample data values:\n\
|
|
dense no additional white space, no field names\n\
|
|
fields field names, some white space\n\
|
|
multiline field names, one field per line\n\
|
|
default is \"nometa,state,fields\".\n\
|
|
For K* types, the .baggage field is omitted from the data output\n\
|
|
-r register instances (-wN mode only)\n\
|
|
-R use 'read' instead of 'take'\n\
|
|
-s MS sleep MS ms after each read/take (default: 0)\n\
|
|
-w F writer mode/input selection, F:\n\
|
|
- stdin (default)\n\
|
|
N cycle through N keys as fast as possible\n\
|
|
N:R*B cycle through N keys at R bursts/second, each burst\n\
|
|
consisting of B samples\n\
|
|
N:R as above, B=1\n\
|
|
no writer is created if -w0 and no writer listener\n\
|
|
automatic specifications can be given per writer; final\n\
|
|
interactive specification determines input used for non-\n\
|
|
automatic ones\n\
|
|
-S EVENTS monitor status events (comma separated; default: none)\n\
|
|
reader (abbreviated and full form):\n\
|
|
pr pre-read (virtual event)\n\
|
|
sl sample-lost\n\
|
|
sr sample-rejected\n\
|
|
lc liveliness-changed\n\
|
|
sm subscription-matched\n\
|
|
riq requested-incompatible-qos\n\
|
|
rdm requested-deadline-missed\n\
|
|
writer:\n\
|
|
ll liveliness-lost\n\
|
|
pm publication-matched\n\
|
|
oiq offered-incompatible-qos\n\
|
|
odm offered-deadline-missed\n\
|
|
-z N topic size (affects KeyedSeq only)\n\
|
|
-@ echo everything on duplicate writer (only for interactive)\n\
|
|
-* N sleep for N seconds just before returning from main()\n\
|
|
\n\
|
|
%s\n\
|
|
Note: defaults above are overridden as follows:\n\
|
|
r:k=all,R=10000/inf/inf\n\
|
|
w:k=all,R=100/inf/inf\n\
|
|
\n\
|
|
Input format is a white-space separated sequence (K* and OU, newline\n\
|
|
separated for ARB) of:\n\
|
|
N write next sample, key value N\n\
|
|
wN synonym for plain N; w@T N same with timestamp of T\n\
|
|
T is absolute of prefixed with \"=\", T currently in seconds\n\
|
|
dN dispose, key value N; d@T N as above\n\
|
|
DN write dispose, key value N; D@T N as above\n\
|
|
uN unregister, key value N; u@T N as above\n\
|
|
rN register, key value N; u@T N as above\n\
|
|
sN sleep for N seconds\n\
|
|
zN set topic size to N (affects KeyedSeq only)\n\
|
|
Note: for K*, OU types, in the above N is always a decimal\n\
|
|
integer (possibly negative); because the OneULong type has no key\n\
|
|
the actual key value is irrelevant. For ARB types, N must be a\n\
|
|
valid initializer. X must always be a list of names.\n\
|
|
\n\
|
|
PARTITION:\n\
|
|
If partition name contains spaces, then wrap it inside quotation marks.\n\
|
|
Use \"\" for default partition.\n",
|
|
argv0, qos_arg_usagestr);
|
|
exit (1);
|
|
}
|
|
|
|
/*
 * Append character c to the growable buffer *dst.
 *
 * *sz is the current capacity and *pos the number of bytes used; when the
 * buffer is full it is enlarged by 1024 bytes with dds_realloc() before the
 * byte is stored. *pos is advanced by one.
 */
static void expand_append(char **dst, size_t *sz, size_t *pos, char c) {
    const size_t at = *pos;
    if (at == *sz) {
        *sz += 1024;
        *dst = dds_realloc(*dst, *sz);
    }
    (*dst)[at] = c;
    *pos = at + 1;
}
|
|
|
|
static char *expand_envvars(const char *src0);
|
|
|
|
// FIXME: This is the same as the expand function in util. Merge.
|
|
/*
 * Produce the expansion of environment variable `name` for operator `op`:
 *   0    value, or "" when unset
 *   '-'  value, or the expansion of alt when unset
 *   '+'  expansion of alt when set, otherwise ""
 *   '?'  value, or report "name: <expanded alt>" through error_exit() when unset
 * Any other operator terminates the process with exit(2).
 * The result is a newly allocated string (NULL only on the '?' error path).
 */
static char *expand_env(const char *name, char op, const char *alt) {
    char *value = NULL;
    char *msg;

    ddsrt_getenv(name, &value);

    if (op == 0)
        return dds_string_dup(value ? value : "");
    if (op == '-')
        return value ? dds_string_dup(value) : expand_envvars(alt);
    if (op == '+')
        return value ? expand_envvars(alt) : dds_string_dup("");
    if (op != '?')
        exit(2);

    if (value)
        return dds_string_dup(value);
    msg = expand_envvars(alt);
    error_exit("%s: %s\n", name, msg);
    dds_free(msg);
    return NULL;
}
|
|
|
|
/*
 * Expand a "{NAME}" or "{NAME:<op>ALT}" form, where <op> is '-', '+' or '?'.
 *
 * On entry *src points at the '{'; on success *src is left just past the
 * closing '}' and the expansion produced by expand_env() is returned.
 * A backslash inside ALT skips the following character while scanning for
 * the closing brace. Malformed input is reported through error_exit().
 */
static char *expand_envbrace(const char **src) {
    const char *p = *src;
    const char *start = p + 1;
    const char *altstart;
    size_t namelen, altlen;
    char *name, *alt, *result;
    char op;

    assert(*p == '{');
    p++;
    while (*p != 0 && *p != ':' && *p != '}')
        p++;
    if (*p == 0)
        goto err;

    namelen = (size_t) (p - start);
    name = dds_alloc(namelen + 1);
    memcpy(name, start, namelen);
    name[namelen] = 0;

    if (*p == '}') {
        /* plain ${NAME} */
        *src = p + 1;
        result = expand_env(name, 0, NULL);
        dds_free(name);
        return result;
    }

    assert(*p == ':');
    p++;
    op = *p;
    if (op != '-' && op != '+' && op != '?')
        goto err;
    p++;

    altstart = p;
    while (*p != 0 && *p != '}') {
        if (*p == '\\') {
            p++;
            if (*p == 0)
                goto err;
        }
        p++;
    }
    if (*p == 0)
        goto err;
    assert(*p == '}');

    altlen = (size_t) (p - altstart);
    alt = dds_alloc(altlen + 1);
    memcpy(alt, altstart, altlen);
    alt[altlen] = 0;

    *src = p + 1;
    result = expand_env(name, op, alt);
    dds_free(alt);
    dds_free(name);
    return result;

err:
    /* leave *src where scanning stopped, then report the text consumed so far */
    *src = p;
    error_exit("%*.*s: invalid expansion\n", (int) (p - start), (int) (p - start), start);
    return NULL;
}
|
|
|
|
/*
 * Expand a "$NAME" form where NAME is a run of alphanumerics and '_'.
 * *src is advanced past the name; the result comes from expand_env(name, 0, NULL).
 */
static char *expand_envsimple(const char **src) {
    const char *begin = *src;
    const char *end = begin;
    size_t len;
    char *name, *value;

    while (*end && (isalnum((unsigned char) *end) || *end == '_'))
        end++;
    *src = end;
    assert(end > begin);

    len = (size_t) (end - begin);
    name = dds_alloc(len + 1);
    memcpy(name, begin, len);
    name[len] = 0;

    value = expand_env(name, 0, NULL);
    dds_free(name);
    return value;
}
|
|
|
|
/*
 * Expand a "$c" form whose variable name is the single character at **src.
 * *src is advanced by one character.
 */
static char *expand_envchar(const char **src) {
    char name[2] = { 0, 0 };
    assert(**src);
    name[0] = *(*src)++;
    return expand_env(name, 0, NULL);
}
|
|
|
|
/*
 * Return a newly allocated copy of src0 with $X, ${X}, ${X:-Y}, ${X:+Y} and
 * ${X:?Y} forms replaced by their expansions. A backslash copies the next
 * character literally.
 *
 * Errors are reported through error_exit(). Should error_exit() return, every
 * error path releases the partial result and returns NULL, so the function
 * never reads past the end of src0 nor dereferences a NULL expansion.
 */
static char *expand_envvars(const char *src0) {
    const char *src = src0;
    size_t sz = strlen(src) + 1, pos = 0;
    char *dst = dds_alloc(sz);
    while (*src) {
        if (*src == '\\') {
            src++;
            if (*src == 0) {
                error_exit("%s: incomplete escape at end of string\n", src0);
                dds_free(dst);
                return NULL;
            }
            expand_append(&dst, &sz, &pos, *src++);
        } else if (*src == '$') {
            char *x, *xp;
            src++;
            if (*src == 0) {
                error_exit("%s: incomplete variable expansion at end of string\n", src0);
                dds_free(dst);
                return NULL;
            } else if (*src == '{') {
                x = expand_envbrace(&src);
            } else if (isalnum((unsigned char)*src) || *src == '_') {
                x = expand_envsimple(&src);
            } else {
                x = expand_envchar(&src);
            }
            if (x == NULL) {
                /* the helper has already reported the problem */
                dds_free(dst);
                return NULL;
            }
            for (xp = x; *xp; xp++)
                expand_append(&dst, &sz, &pos, *xp);
            dds_free(x);
        } else {
            expand_append(&dst, &sz, &pos, *src++);
        }
    }
    expand_append(&dst, &sz, &pos, 0);
    return dst;
}
|
|
|
|
/*
 * Split a comma-separated partition list.
 *
 * buf is first run through expand_envvars(); the expanded copy is then cut at
 * each ',' in place. On return *p_ps is an allocated array of pointers into
 * the copy, *p_bufcopy is the copy itself, and the return value is the number
 * of entries. The caller releases both with dds_free().
 *
 * The entry count is taken from the expanded text, not from buf: expansion
 * can introduce commas (e.g. from an environment variable's value), and the
 * array must be sized for what is actually split.
 */
static unsigned split_partitions(const char ***p_ps, char **p_bufcopy, const char *buf) {
    const char **ps;
    char *bufcopy, *bc;
    unsigned i, nps;

    bufcopy = expand_envvars(buf);
    nps = 1;
    for (bc = bufcopy; *bc; bc++) {
        nps += (*bc == ',');
    }
    ps = dds_alloc(nps * sizeof(*ps));

    i = 0; bc = bufcopy;
    while (1) {
        ps[i++] = bc;
        while (*bc && *bc != ',') bc++;
        if (*bc == 0) break;
        *bc++ = 0;
    }
    assert(i == nps);
    *p_ps = ps;
    *p_bufcopy = bufcopy;
    return nps;
}
|
|
|
|
/*
 * Apply the comma-separated partition list in buf to publisher pub.
 * The outcome of change_publisher_partitions() is handed to error_report();
 * the function itself always returns 0.
 */
static int set_pub_partition(dds_entity_t pub, const char *buf) {
    const char **parts;
    char *storage;
    dds_return_t rc;
    unsigned count;

    count = split_partitions(&parts, &storage, buf);
    rc = change_publisher_partitions(pub, count, parts);
    error_report(rc, "set_pub_partition failed: ");
    dds_free(storage);
    dds_free((char **)parts);
    return 0;
}
|
|
|
|
#if 0
/*
 * Disabled counterpart of set_pub_partition() for subscribers.
 * Kept in step with set_pub_partition(): the message passed to error_report()
 * carries no conversion specifiers (none are supplied with arguments), and
 * the partition array is released through the same cast.
 */
static int set_sub_partition(dds_entity_t sub, const char *buf) {
    const char **ps;
    char *bufcopy;
    unsigned nps = split_partitions(&ps, &bufcopy, buf);
    dds_return_t rc = change_subscriber_partitions(sub, nps, ps);
    error_report(rc, "set_sub_partition failed: ");
    dds_free(bufcopy);
    dds_free((char **)ps);
    return 0;
}
#endif
|
|
|
|
/*
 * Read decimal digits from stdin into buf starting at index pos, stopping
 * at the first other character or when only the terminator slot is left.
 * A single '-' is accepted as the first character read when accept_minus is
 * non-zero. buf is always NUL-terminated.
 *
 * Returns:
 *   (pos > 0)  when input ended at EOF or white space
 *   0          after printing a message for an unexpected character or an
 *              over-long number
 *   1          otherwise
 * The terminating character is consumed.
 */
static int read_int(char *buf, int bufsize, int pos, int accept_minus) {
    const int limit = bufsize - 1;
    int ch = EOF;

    for (;;) {
        if (pos >= limit)
            break;
        ch = getc(stdin);
        if (ch == EOF)
            break;
        if (!isdigit((unsigned char) ch) && !(ch == '-' && accept_minus))
            break;
        accept_minus = 0;
        buf[pos++] = (char) ch;
    }
    buf[pos] = 0;

    if (ch == EOF || isspace((unsigned char) ch))
        return (pos > 0);
    if (!isdigit((unsigned char) ch)) {
        fprintf (stderr, "%c: unexpected character\n", ch);
        return 0;
    }
    if (pos == limit) {
        fprintf (stderr, "integer too long\n");
        return 0;
    }
    return 1;
}
|
|
|
|
static int read_int_w_tstamp(struct tstamp_t *tstamp, char *buf, int bufsize, int pos) {
|
|
int c;
|
|
assert(pos < bufsize - 2);
|
|
c = getc(stdin);
|
|
if (c == EOF)
|
|
return 0;
|
|
else if (c == '@') {
|
|
int posoff = 0;
|
|
c = getc(stdin);
|
|
if (c == EOF)
|
|
return 0;
|
|
else if (c == '=')
|
|
tstamp->isabs = 1;
|
|
else {
|
|
buf[pos] = (char) c;
|
|
posoff = 1;
|
|
}
|
|
if (read_int(buf, bufsize, pos + posoff, 1))
|
|
tstamp->t = atoi(buf + pos) * T_SECOND;
|
|
else
|
|
return 0;
|
|
while ((c = getc(stdin)) != EOF && isspace((unsigned char) c))
|
|
;
|
|
if (!isdigit((unsigned char) c))
|
|
return 0;
|
|
}
|
|
buf[pos++] = (char) c;
|
|
while (pos < bufsize-1 && (c = getc(stdin)) != EOF && isdigit((unsigned char) c))
|
|
buf[pos++] = (char) c;
|
|
buf[pos] = 0;
|
|
if (c == EOF || isspace((unsigned char) c))
|
|
return (pos > 0);
|
|
else if (!isdigit((unsigned char) c)) {
|
|
fprintf (stderr, "%c: unexpected character\n", c);
|
|
return 0;
|
|
} else if (pos == bufsize-1) {
|
|
fprintf (stderr, "integer too long\n");
|
|
return 0;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
static int read_value(char *command, int *key, struct tstamp_t *tstamp, char **arg) {
|
|
char buf[1024];
|
|
int c;
|
|
if (*arg) { dds_free(*arg); *arg = NULL; }
|
|
tstamp->isabs = 0;
|
|
tstamp->t = 0;
|
|
do {
|
|
while ((c = getc(stdin)) != EOF && isspace((unsigned char) c))
|
|
;
|
|
if (c == EOF)
|
|
return 0;
|
|
switch (c) {
|
|
case '-':
|
|
case '0': case '1': case '2': case '3': case '4':
|
|
case '5': case '6': case '7': case '8': case '9':
|
|
buf[0] = (char) c;
|
|
if (read_int(buf, sizeof(buf), 1, 0)) {
|
|
*command = 'w';
|
|
*key = atoi(buf);
|
|
return 1;
|
|
}
|
|
break;
|
|
case 'w': case 'd': case 'D': case 'u': case 'r':
|
|
*command = (char) c;
|
|
if (read_int_w_tstamp(tstamp, buf, sizeof(buf), 0)) {
|
|
*key = atoi(buf);
|
|
return 1;
|
|
}
|
|
break;
|
|
case 'z': case 's':
|
|
*command = (char) c;
|
|
if (read_int(buf, sizeof(buf), 0, 0)) {
|
|
*key = atoi(buf);
|
|
return 1;
|
|
}
|
|
break;
|
|
case 'p': case 'S': case ':': case 'Q': {
|
|
int i = 0;
|
|
*command = (char) c;
|
|
while ((c = getc(stdin)) != EOF && !isspace((unsigned char) c)) {
|
|
assert(i < (int) sizeof(buf) - 1);
|
|
buf[i++] = (char) c;
|
|
}
|
|
buf[i] = 0;
|
|
*arg = dds_string_dup(buf);
|
|
ungetc(c, stdin);
|
|
return 1;
|
|
}
|
|
case 'Y': case 'B': case 'E': case 'W':
|
|
*command = (char) c;
|
|
return 1;
|
|
default:
|
|
fprintf (stderr, "'%c': unexpected character\n", c);
|
|
break;
|
|
}
|
|
while ((c = getc(stdin)) != EOF && !isspace((unsigned char) c))
|
|
;
|
|
} while (c != EOF);
|
|
return 0;
|
|
}
|
|
|
|
// TODO Upon support for ARB types, resolve the declaration of fdin
|
|
//static void getl_init_simple(struct getl_arg *arg, int fd) {
|
|
// arg->use_editline = 0;
|
|
// arg->u.s.fd = fd;
|
|
// arg->u.s.lastline = NULL;
|
|
//}
|
|
//
|
|
//static char *getl_simple(int fd, int *count) {
|
|
// size_t sz = 0, n = 0;
|
|
// char *line;
|
|
// int c;
|
|
//
|
|
// if ((c = getc(stdin)) == EOF) {
|
|
// *count = 0;
|
|
// return NULL;
|
|
// }
|
|
//
|
|
// line = NULL;
|
|
// do {
|
|
// if (n == sz) line = dds_realloc(line, sz += 256);
|
|
// line[n++] = (char) c;
|
|
// } while ((c = getc(stdin)) != EOF && c != '\n');
|
|
// if (n == sz) line = dds_realloc(line, sz += 256);
|
|
// line[n++] = 0;
|
|
// *count = (int) (n-1);
|
|
// return line;
|
|
//}
|
|
//
|
|
//struct getl_arg {
|
|
// int use_editline;
|
|
// union {
|
|
//#if USE_EDITLINE
|
|
// struct {
|
|
// FILE *el_fp;
|
|
// EditLine *el;
|
|
// History *hist;
|
|
// HistEvent ev;
|
|
// } el;
|
|
//#endif
|
|
// struct {
|
|
// int fd;
|
|
// char *lastline;
|
|
// } s;
|
|
// } u;
|
|
//};
|
|
|
|
#if USE_EDITLINE
|
|
/*
 * Character-read callback installed with EL_GETCFN.
 * Fetches the client data registered on el, treats it as a pointer to an int
 * file descriptor and reads one character from it with fd_getc().
 * Returns 1 with *c set, or 0 when fd_getc() yields EOF.
 */
static int el_getc_wrapper(EditLine *el, char *c) {
    void *clientdata;
    int ch;

    el_get(el, EL_CLIENTDATA, &clientdata);
    ch = fd_getc(*(int *)clientdata);
    if (ch == EOF)
        return 0;
    *c = (char) ch;
    return 1;
}
|
|
|
|
/* Prompt callback installed with EL_PROMPT: always yields an empty prompt. */
static const char *prompt(EditLine *el __attribute__ ((unused))) {
    static const char *const empty = "";
    return empty;
}
|
|
|
|
static void getl_init_editline(struct getl_arg *arg, int fd) {
|
|
if (isatty (fdin)) {
|
|
arg->use_editline = 1;
|
|
arg->u.el.el_fp = fdopen(fd, "r");
|
|
arg->u.el.hist = history_init();
|
|
history(arg->u.el.hist, &arg->u.el.ev, H_SETSIZE, 800);
|
|
arg->u.el.el = el_init("pubsub", arg->u.el.el_fp, stdout, stderr);
|
|
el_source(arg->u.el.el, NULL);
|
|
el_set(arg->u.el.el, EL_EDITOR, "emacs");
|
|
el_set(arg->u.el.el, EL_PROMPT, prompt);
|
|
el_set(arg->u.el.el, EL_SIGNAL, 1);
|
|
el_set(arg->u.el.el, EL_CLIENTDATA, &fdin);
|
|
el_set(arg->u.el.el, EL_GETCFN, el_getc_wrapper);
|
|
el_set(arg->u.el.el, EL_HIST, history, arg->u.el.hist);
|
|
} else {
|
|
getl_init_simple(arg, fd);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
// TODO ARB type support
|
|
//static void getl_fini(struct getl_arg *arg) {
|
|
// if (arg->use_editline) {
|
|
//#if USE_EDITLINE
|
|
// el_end(arg->u.el.el);
|
|
// history_end(arg->u.el.hist);
|
|
// fclose(arg->u.el.el_fp);
|
|
//#endif
|
|
// } else {
|
|
// dds_free(arg->u.s.lastline);
|
|
// }
|
|
//}
|
|
//
|
|
//static const char *getl(struct getl_arg *arg, int *count) {
|
|
// if (arg->use_editline) {
|
|
//#if USE_EDITLINE
|
|
// return el_gets(arg->u.el.el, count);
|
|
//#else
|
|
// abort();
|
|
// return NULL;
|
|
//#endif
|
|
// } else {
|
|
// dds_free(arg->u.s.lastline);
|
|
// return arg->u.s.lastline = getl_simple(arg->u.s.fd, count);
|
|
// }
|
|
//}
|
|
//
|
|
//static void getl_enter_hist(struct getl_arg *arg, const char *line) {
|
|
//#if USE_EDITLINE
|
|
// if (arg->use_editline)
|
|
// history(arg->u.el.hist, &arg->u.el.ev, H_ENTER, line);
|
|
//#endif
|
|
//}
|
|
//
|
|
//static char *skipspaces(const char *s) {
|
|
// while (*s && isspace((unsigned char) *s))
|
|
// s++;
|
|
// return (char *) s;
|
|
//}
|
|
|
|
static char si2isc(const dds_sample_info_t *si) {
|
|
switch (si->instance_state) {
|
|
case DDS_IST_ALIVE: return 'A';
|
|
case DDS_IST_NOT_ALIVE_DISPOSED: return 'D';
|
|
case DDS_IST_NOT_ALIVE_NO_WRITERS: return 'U';
|
|
default: return '?';
|
|
}
|
|
}
|
|
|
|
static char si2ssc(const dds_sample_info_t *si) {
|
|
switch (si->sample_state) {
|
|
case DDS_SST_READ: return 'R';
|
|
case DDS_SST_NOT_READ: return 'N';
|
|
default: return '?';
|
|
}
|
|
}
|
|
|
|
static char si2vsc(const dds_sample_info_t *si) {
|
|
switch (si->view_state) {
|
|
case DDS_VST_NEW: return 'N';
|
|
case DDS_VST_OLD: return 'O';
|
|
default: return '?';
|
|
}
|
|
}
|
|
|
|
/*
 * Look up the key of instance ih on reader rd (KeyedSeq type).
 * *key receives the keyval on DDS_RETCODE_OK and 0 otherwise; the result of
 * dds_instance_get_key() is returned.
 */
static int getkeyval_KS(dds_entity_t rd, int32_t *key, dds_instance_handle_t ih) {
    KeyedSeq sample;
    const int rc = dds_instance_get_key(rd, ih, &sample);
    *key = (rc == DDS_RETCODE_OK) ? sample.keyval : 0;
    return rc;
}
|
|
|
|
/*
 * Look up the key of instance ih on reader rd (Keyed32 type).
 * *key receives the keyval on DDS_RETCODE_OK and 0 otherwise; the result of
 * dds_instance_get_key() is returned.
 */
static int getkeyval_K32(dds_entity_t rd, int32_t *key, dds_instance_handle_t ih) {
    Keyed32 sample;
    const int rc = dds_instance_get_key(rd, ih, &sample);
    *key = (rc == DDS_RETCODE_OK) ? sample.keyval : 0;
    return rc;
}
|
|
|
|
/*
 * Look up the key of instance ih on reader rd (Keyed64 type).
 * *key receives the keyval on DDS_RETCODE_OK and 0 otherwise; the result of
 * dds_instance_get_key() is returned.
 */
static int getkeyval_K64(dds_entity_t rd, int32_t *key, dds_instance_handle_t ih) {
    Keyed64 sample;
    const int rc = dds_instance_get_key(rd, ih, &sample);
    *key = (rc == DDS_RETCODE_OK) ? sample.keyval : 0;
    return rc;
}
|
|
|
|
/*
 * Look up the key of instance ih on reader rd (Keyed128 type).
 * *key receives the keyval on DDS_RETCODE_OK and 0 otherwise; the result of
 * dds_instance_get_key() is returned.
 */
static int getkeyval_K128(dds_entity_t rd, int32_t *key, dds_instance_handle_t ih) {
    Keyed128 sample;
    const int rc = dds_instance_get_key(rd, ih, &sample);
    *key = (rc == DDS_RETCODE_OK) ? sample.keyval : 0;
    return rc;
}
|
|
|
|
/*
 * Look up the key of instance ih on reader rd (Keyed256 type).
 * *key receives the keyval on DDS_RETCODE_OK and 0 otherwise; the result of
 * dds_instance_get_key() is returned.
 */
static int getkeyval_K256(dds_entity_t rd, int32_t *key, dds_instance_handle_t ih) {
    Keyed256 sample;
    const int rc = dds_instance_get_key(rd, ih, &sample);
    *key = (rc == DDS_RETCODE_OK) ? sample.keyval : 0;
    return rc;
}
|
|
|
|
// TODO Determine encoding of dds_instance_handle_t, and see what sort of value can be extracted from it, if any
|
|
//static void instancehandle_to_id(uint32_t *systemId, uint32_t *localId, dds_instance_handle_t h) {
|
|
// /* Undocumented and unsupported trick */
|
|
// union { struct { uint32_t systemId, localId; } s; dds_instance_handle_t h; } u;
|
|
// u.h = h;
|
|
// *systemId = u.s.systemId & ~0x80000000;
|
|
// *localId = u.s.localId;
|
|
//}
|
|
|
|
/*
 * Print the metadata fields selected in print_metadata for one sample, in the
 * fixed order pid, topic tag, relative time, publication/instance handle,
 * source timestamp, disposed/no-writers generation counts, ranks, states.
 *
 * *tstart is set to tnow on first use (while it is still 0) and the time
 * field is printed relative to it. Field groups are separated by " : ",
 * members of a group by a single space; when anything was printed a trailing
 * " : " is added so the sample value can follow directly.
 */
static void print_sampleinfo(dds_time_t *tstart, dds_time_t tnow, const dds_sample_info_t *si, const char *tag) {
    const char istate = si2isc(si);
    const char sstate = si2ssc(si);
    const char vstate = si2vsc(si);
    const char *sep;
    dds_time_t elapsed;
    int printed = 0;    /* running total of the printf() results */

    if (*tstart == 0)
        *tstart = tnow;
    elapsed = tnow - *tstart;

    if (print_metadata & PM_PID)
        printed += printf ("%d", pid);
    if (print_metadata & PM_TOPIC)
        printed += printf ("%s", tag);
    if (print_metadata & PM_TIME)
        printed += printf ("%s%"PRId64".%09"PRId64, printed > 0 ? " " : "", (elapsed / DDS_NSECS_IN_SEC), (elapsed % DDS_NSECS_IN_SEC));

    /* handles: publication first, instance second */
    sep = " : ";
    if (print_metadata & PM_PHANDLE) {
        printed += printf ("%s%" PRIu64, printed > 0 ? sep : "", si->publication_handle);
        sep = " ";
    }
    if (print_metadata & PM_IHANDLE)
        printed += printf ("%s%" PRIu64, printed > 0 ? sep : "", si->instance_handle);

    sep = " : ";
    if (print_metadata & PM_STIME)
        printed += printf ("%s%"PRId64".%09"PRId64, printed > 0 ? sep : "", (si->source_timestamp/DDS_NSECS_IN_SEC), (si->source_timestamp%DDS_NSECS_IN_SEC));

    /* generation counts */
    sep = " : ";
    if (print_metadata & PM_DGEN) {
        printed += printf ("%s%"PRIu32, printed > 0 ? sep : "", si->disposed_generation_count);
        sep = " ";
    }
    if (print_metadata & PM_NWGEN)
        printed += printf ("%s%"PRIu32, printed > 0 ? sep : "", si->no_writers_generation_count);

    sep = " : ";
    if (print_metadata & PM_RANKS)
        printed += printf ("%s%"PRIu32" %"PRIu32" %"PRIu32, printed > 0 ? sep : "", si->sample_rank, si->generation_rank, si->absolute_generation_rank);

    sep = " : ";
    if (print_metadata & PM_STATE)
        printed += printf ("%s%c%c%c", printed > 0 ? sep : "", istate, sstate, vstate);

    if (printed > 0)
        printf(" : ");
}
|
|
|
|
/*
 * Print one sample of a keyed type: metadata via print_sampleinfo(), followed
 * by the sequence number and key value in the layout selected by printmode.
 * Output is serialised with output_mutex and flushed when flushflag is set.
 *
 * For a sample without valid data, keyval and seq are not used; the key is
 * obtained from the instance handle through the getkeyval callback and
 * printed with "NA" in place of the sequence number. If the lookup fails, the
 * error string is printed instead.
 */
static void print_K(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *si, int32_t keyval, uint32_t seq, int (*getkeyval) (dds_entity_t rd, int32_t *key, dds_instance_handle_t ih)) {
    ddsrt_mutex_lock(&output_mutex);
    print_sampleinfo(tstart, tnow, si, tag);
    if (si->valid_data) {
        if(printmode == TGPM_MULTILINE) {
            printf ("{\n%*.*s.seq = %"PRIu32",\n%*.*s.keyval = %"PRId32" }\n", 4, 4, "", seq, 4, 4, "", keyval);
        } else if(printmode == TGPM_DENSE) {
            printf ("{%"PRIu32",%"PRId32"}\n", seq, keyval);
        } else {
            printf ("{ .seq = %"PRIu32", .keyval = %"PRId32" }\n", seq, keyval);
        }
    } else {
        /* The sample contents may not be inspected when valid_data is clear,
           but the key value is still wanted, so it is looked up from the
           instance handle. The lookup can fail: if the instance is NOT_ALIVE
           and the middleware has released its resources after the sample was
           taken, the key is no longer available. */
        int32_t d_key;
        int result = getkeyval(rd, &d_key, si->instance_handle);
        if (result == DDS_RETCODE_OK) {
            /* print the key that was just looked up, not the keyval argument */
            if(printmode == TGPM_MULTILINE) {
                printf ("{\n%*.*s.seq = NA,\n%*.*s.keyval = %"PRId32" }\n", 4, 4, "", 4, 4, "", d_key);
            } else if(printmode == TGPM_DENSE) {
                printf ("{NA,%"PRId32"}\n", d_key);
            } else {
                printf ("{ .seq = NA, .keyval = %"PRId32" }\n", d_key);
            }
        } else {
            printf ("get_key_value: error (%s)\n", dds_err_str(result));
        }
    }
    if (flushflag) {
        fflush (stdout);
    }
    ddsrt_mutex_unlock(&output_mutex);
}
|
|
|
|
/* Print `count` KeyedSeq samples, pairing mseq[i] with iseq[i], through print_K(). */
static void print_seq_KS(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *iseq, KeyedSeq **mseq, int count) {
    for (int k = 0; k < count; k++) {
        const KeyedSeq *sample = mseq[k];
        print_K(tstart, tnow, rd, tag, &iseq[k], sample->keyval, sample->seq, getkeyval_KS);
    }
}
|
|
|
|
/* Print `count` Keyed32 samples, pairing mseq[i] with iseq[i], through print_K(). */
static void print_seq_K32(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *iseq, Keyed32 **mseq, int count) {
    for (int k = 0; k < count; k++) {
        const Keyed32 *sample = mseq[k];
        print_K(tstart, tnow, rd, tag, &iseq[k], sample->keyval, sample->seq, getkeyval_K32);
    }
}
|
|
|
|
/* Print `count` Keyed64 samples, pairing mseq[i] with iseq[i], through print_K(). */
static void print_seq_K64(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *iseq, Keyed64 **mseq, int count) {
    for (int k = 0; k < count; k++) {
        const Keyed64 *sample = mseq[k];
        print_K(tstart, tnow, rd, tag, &iseq[k], sample->keyval, sample->seq, getkeyval_K64);
    }
}
|
|
|
|
/* Print `count` Keyed128 samples, pairing mseq[i] with iseq[i], through print_K(). */
static void print_seq_K128(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *iseq, Keyed128 **mseq, int count) {
    for (int k = 0; k < count; k++) {
        const Keyed128 *sample = mseq[k];
        print_K(tstart, tnow, rd, tag, &iseq[k], sample->keyval, sample->seq, getkeyval_K128);
    }
}
|
|
|
|
/* Print `count` Keyed256 samples, pairing mseq[i] with iseq[i], through print_K(). */
static void print_seq_K256(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd, const char *tag, const dds_sample_info_t *iseq, Keyed256 **mseq, int count) {
    for (int k = 0; k < count; k++) {
        const Keyed256 *sample = mseq[k];
        print_K(tstart, tnow, rd, tag, &iseq[k], sample->keyval, sample->seq, getkeyval_K256);
    }
}
|
|
|
|
/*
 * Print `count` OneULong samples. Each sample mseq[i] is printed together
 * with its own sample info si[i]: metadata via print_sampleinfo(), then the
 * sequence number in the layout selected by printmode, or "NA" when the
 * sample carries no valid data. Each record is written under output_mutex
 * and flushed when flushflag is set.
 */
static void print_seq_OU(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd __attribute__ ((unused)), const char *tag, const dds_sample_info_t *si, const OneULong **mseq, int count) {
    int i;
    for (i = 0; i < count; i++)
    {
        /* si is an array parallel to mseq; use the entry for this sample */
        const dds_sample_info_t *info = &si[i];
        ddsrt_mutex_lock(&output_mutex);
        print_sampleinfo(tstart, tnow, info, tag);
        if (info->valid_data) {
            if(printmode == TGPM_MULTILINE) {
                printf ("{\n%*.*s.seq = %"PRIu32" }\n", 4, 4, "", mseq[i]->seq);
            } else if(printmode == TGPM_DENSE) {
                printf ("{%"PRIu32"}\n", mseq[i]->seq);
            } else {
                printf ("{ .seq = %"PRIu32" }\n", mseq[i]->seq);
            }
        } else {
            printf ("NA\n");
        }
        if (flushflag) {
            fflush (stdout);
        }
        ddsrt_mutex_unlock(&output_mutex);
    }
}
|
|
|
|
/*
 * Placeholder for printing samples of an arbitrary type: currently does
 * nothing and only marks its parameters as used. The intended implementation
 * is kept below for reference.
 */
static void print_seq_ARB(dds_time_t *tstart, dds_time_t tnow, dds_entity_t rd __attribute__ ((unused)), const char *tag, const dds_sample_info_t *iseq, const void **mseq, const struct tgtopic *tgtp) {
    (void)tstart;
    (void)tnow;
    (void)tag;
    (void)iseq;
    (void)mseq;
    (void)tgtp;
    // TODO ARB type support
    //  unsigned i;
    //  for (i = 0; i < mseq->_length; i++)
    //  {
    //    dds_sample_info_t const * const si = &iseq->_buffer[i];
    //    flockfile(stdout);
    //    print_sampleinfo(tstart, tnow, si, tag);
    //    if (si->valid_data)
    //      tgprint(stdout, tgtp, (char *) mseq->_buffer + i * tgtp->size, printmode);
    //    else
    //      tgprintkey(stdout, tgtp, (char *) mseq->_buffer + i * tgtp->size, printmode);
    //    printf ("\n");
    //    funlockfile(stdout);
    //  }
}
|
|
|
|
/* Report a reader's liveliness-changed status on stdout (alive and not-alive
   counts with their deltas, plus the last publication handle), flushing
   stdout when flushflag is set. */
static void rd_on_liveliness_changed(dds_entity_t rd __attribute__ ((unused)), const dds_liveliness_changed_status_t status, void* arg __attribute__ ((unused))) {
    const dds_liveliness_changed_status_t *st = &status;

    printf ("[liveliness-changed: alive=(%"PRIu32" change %"PRId32") not_alive=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
            st->alive_count, st->alive_count_change,
            st->not_alive_count, st->not_alive_count_change,
            st->last_publication_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a reader's sample-lost status (total count and its delta) on
   stdout, flushing stdout when flushflag is set. */
static void rd_on_sample_lost(dds_entity_t rd __attribute__ ((unused)), const dds_sample_lost_status_t status, void* arg __attribute__ ((unused))) {
    const dds_sample_lost_status_t *st = &status;

    printf ("[sample-lost: total=(%"PRIu32" change %"PRId32")]\n", st->total_count, st->total_count_change);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a reader's sample-rejected status on stdout: total count with its
   delta, a textual form of the last rejection reason ("?" when the reason is
   not one of the four known values) and the last instance handle.  Flushes
   stdout when flushflag is set. */
static void rd_on_sample_rejected(dds_entity_t rd __attribute__ ((unused)), const dds_sample_rejected_status_t status, void* arg __attribute__ ((unused))) {
    const char *reasonstr;

    if (status.last_reason == DDS_NOT_REJECTED)
        reasonstr = "not_rejected";
    else if (status.last_reason == DDS_REJECTED_BY_INSTANCES_LIMIT)
        reasonstr = "instances";
    else if (status.last_reason == DDS_REJECTED_BY_SAMPLES_LIMIT)
        reasonstr = "samples";
    else if (status.last_reason == DDS_REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT)
        reasonstr = "samples_per_instance";
    else
        reasonstr = "?";

    printf ("[sample-rejected: total=(%"PRIu32" change %"PRId32") reason=%s handle=%"PRIu64"]\n",
            status.total_count, status.total_count_change,
            reasonstr,
            status.last_instance_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a reader's subscription-matched status on stdout (total and current
   match counts with their deltas, plus the last publication handle),
   flushing stdout when flushflag is set. */
static void rd_on_subscription_matched(dds_entity_t rd __attribute__((unused)), const dds_subscription_matched_status_t status, void* arg __attribute__((unused))) {
    const dds_subscription_matched_status_t *st = &status;

    printf ("[subscription-matched: total=(%"PRIu32" change %"PRId32") current=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
            st->total_count, st->total_count_change,
            st->current_count, st->current_count_change,
            st->last_publication_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a reader's requested-deadline-missed status on stdout (total count
   with its delta and the last instance handle), flushing stdout when
   flushflag is set. */
static void rd_on_requested_deadline_missed(dds_entity_t rd __attribute__((unused)), const dds_requested_deadline_missed_status_t status, void* arg __attribute__ ((unused))) {
    const dds_requested_deadline_missed_status_t *st = &status;

    printf ("[requested-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
            st->total_count, st->total_count_change,
            st->last_instance_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
static const char *policystr(uint32_t id) {
|
|
switch (id) {
|
|
case DDS_USERDATA_QOS_POLICY_ID: return DDS_USERDATA_QOS_POLICY_NAME;
|
|
case DDS_DURABILITY_QOS_POLICY_ID: return DDS_DURABILITY_QOS_POLICY_NAME;
|
|
case DDS_PRESENTATION_QOS_POLICY_ID: return DDS_PRESENTATION_QOS_POLICY_NAME;
|
|
case DDS_DEADLINE_QOS_POLICY_ID: return DDS_DEADLINE_QOS_POLICY_NAME;
|
|
case DDS_LATENCYBUDGET_QOS_POLICY_ID: return DDS_LATENCYBUDGET_QOS_POLICY_NAME;
|
|
case DDS_OWNERSHIP_QOS_POLICY_ID: return DDS_OWNERSHIP_QOS_POLICY_NAME;
|
|
case DDS_OWNERSHIPSTRENGTH_QOS_POLICY_ID: return DDS_OWNERSHIPSTRENGTH_QOS_POLICY_NAME;
|
|
case DDS_LIVELINESS_QOS_POLICY_ID: return DDS_LIVELINESS_QOS_POLICY_NAME;
|
|
case DDS_TIMEBASEDFILTER_QOS_POLICY_ID: return DDS_TIMEBASEDFILTER_QOS_POLICY_NAME;
|
|
case DDS_PARTITION_QOS_POLICY_ID: return DDS_PARTITION_QOS_POLICY_NAME;
|
|
case DDS_RELIABILITY_QOS_POLICY_ID: return DDS_RELIABILITY_QOS_POLICY_NAME;
|
|
case DDS_DESTINATIONORDER_QOS_POLICY_ID: return DDS_DESTINATIONORDER_QOS_POLICY_NAME;
|
|
case DDS_HISTORY_QOS_POLICY_ID: return DDS_HISTORY_QOS_POLICY_NAME;
|
|
case DDS_RESOURCELIMITS_QOS_POLICY_ID: return DDS_RESOURCELIMITS_QOS_POLICY_NAME;
|
|
case DDS_ENTITYFACTORY_QOS_POLICY_ID: return DDS_ENTITYFACTORY_QOS_POLICY_NAME;
|
|
case DDS_WRITERDATALIFECYCLE_QOS_POLICY_ID: return DDS_WRITERDATALIFECYCLE_QOS_POLICY_NAME;
|
|
case DDS_READERDATALIFECYCLE_QOS_POLICY_ID: return DDS_READERDATALIFECYCLE_QOS_POLICY_NAME;
|
|
case DDS_TOPICDATA_QOS_POLICY_ID: return DDS_TOPICDATA_QOS_POLICY_NAME;
|
|
case DDS_GROUPDATA_QOS_POLICY_ID: return DDS_GROUPDATA_QOS_POLICY_NAME;
|
|
case DDS_TRANSPORTPRIORITY_QOS_POLICY_ID: return DDS_TRANSPORTPRIORITY_QOS_POLICY_NAME;
|
|
case DDS_LIFESPAN_QOS_POLICY_ID: return DDS_LIFESPAN_QOS_POLICY_NAME;
|
|
case DDS_DURABILITYSERVICE_QOS_POLICY_ID: return DDS_DURABILITYSERVICE_QOS_POLICY_NAME;
|
|
case DDS_SUBSCRIPTIONKEY_QOS_POLICY_ID: return DDS_SUBSCRIPTIONKEY_QOS_POLICY_NAME;
|
|
case DDS_VIEWKEY_QOS_POLICY_ID: return DDS_VIEWKEY_QOS_POLICY_NAME;
|
|
case DDS_READERLIFESPAN_QOS_POLICY_ID: return DDS_READERLIFESPAN_QOS_POLICY_NAME;
|
|
case DDS_SHARE_QOS_POLICY_ID: return DDS_SHARE_QOS_POLICY_NAME;
|
|
case DDS_SCHEDULING_QOS_POLICY_ID: return DDS_SCHEDULING_QOS_POLICY_NAME;
|
|
case DDS_PROPERTY_QOS_POLICY_ID: return DDS_PROPERTY_QOS_POLICY_NAME;
|
|
default: return "?";
|
|
}
|
|
}
|
|
|
|
// TODO Decide on whether to work around the lack of DDS_QosPolicyCount, or get rid of this bit.
|
|
//static void format_policies(char *polstr, size_t polsz, const DDS_QosPolicyCount *xs, unsigned nxs) {
|
|
// char *ps = polstr;
|
|
// unsigned i;
|
|
// for (i = 0; i < nxs && ps < polstr + polsz; i++)
|
|
// {
|
|
// const DDS_QosPolicyCount *x = &xs[i];
|
|
// int n = snprintf (ps, polstr + polsz - ps, "%s%s:%d", i == 0 ? "" : ", ", policystr(x->policy_id), x->count);
|
|
// ps += n;
|
|
// }
|
|
//}
|
|
|
|
/* Report a reader's requested-incompatible-qos status on stdout: total count
   with its delta and the name of the last offending policy as given by
   policystr().  Flushes stdout when flushflag is set. */
static void rd_on_requested_incompatible_qos(dds_entity_t rd __attribute__((unused)), const dds_requested_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
    const char *last_policy = policystr(status.last_policy_id);

    printf ("[requested-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
            status.total_count, status.total_count_change, last_policy);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a writer's offered-incompatible-qos status on stdout: total count
   with its delta and the name of the last offending policy as given by
   policystr().  Flushes stdout when flushflag is set. */
static void wr_on_offered_incompatible_qos(dds_entity_t wr __attribute__((unused)), const dds_offered_incompatible_qos_status_t status, void* arg __attribute__((unused))) {
    const char *last_policy = policystr(status.last_policy_id);

    printf ("[offered-incompatible-qos: total=(%"PRIu32" change %"PRId32") last_policy=%s]\n",
            status.total_count, status.total_count_change, last_policy);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a writer's liveliness-lost status (total count and its delta) on
   stdout, flushing stdout when flushflag is set. */
static void wr_on_liveliness_lost(dds_entity_t wr __attribute__((unused)), const dds_liveliness_lost_status_t status, void* arg __attribute__ ((unused))) {
    const dds_liveliness_lost_status_t *st = &status;

    printf ("[liveliness-lost: total=(%"PRIu32" change %"PRId32")]\n",
            st->total_count, st->total_count_change);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a writer's offered-deadline-missed status on stdout (total count
   with its delta and the last instance handle), flushing stdout when
   flushflag is set. */
static void wr_on_offered_deadline_missed(dds_entity_t wr __attribute__((unused)), const dds_offered_deadline_missed_status_t status, void* arg __attribute__((unused))) {
    const dds_offered_deadline_missed_status_t *st = &status;

    printf ("[offered-deadline-missed: total=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
            st->total_count, st->total_count_change, st->last_instance_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Report a writer's publication-matched status on stdout (total and current
   match counts with their deltas, plus the last subscription handle),
   flushing stdout when flushflag is set. */
static void wr_on_publication_matched(dds_entity_t wr __attribute__((unused)), const dds_publication_matched_status_t status, void* arg __attribute__((unused))) {
    const dds_publication_matched_status_t *st = &status;

    printf ("[publication-matched: total=(%"PRIu32" change %"PRId32") current=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
            st->total_count, st->total_count_change,
            st->current_count, st->current_count_change,
            st->last_subscription_handle);
    if (!flushflag)
        return;
    fflush (stdout);
}
|
|
|
|
/* Adapter giving dds_register_instance() the same (writer, data, timestamp)
   signature as the other operations returned by get_write_oper().  The
   timestamp is ignored and the resulting instance handle is discarded; only
   the return code of dds_register_instance() is passed back. */
static dds_return_t register_instance_wrapper(dds_entity_t wr, const void *d, const dds_time_t tstamp) {
    dds_instance_handle_t discarded_handle;
    dds_return_t rc;

    (void)tstamp;
    rc = dds_register_instance(wr, &discarded_handle, d);
    return rc;
}
|
|
|
|
static write_oper_t get_write_oper(char command) {
|
|
switch (command) {
|
|
case 'w': return dds_write_ts;
|
|
case 'd': return dds_dispose_ts;
|
|
case 'D': return dds_writedispose_ts;
|
|
case 'u': return dds_unregister_instance_ts;
|
|
case 'r': return register_instance_wrapper;
|
|
default: return 0;
|
|
}
|
|
}
|
|
|
|
/* Human-readable name of the operation selected by a command character
   ('w', 'd', 'D', 'u' or 'r').  Returns a null pointer for any other
   character. */
static const char *get_write_operstr(char command) {
    if (command == 'w')
        return "write";
    if (command == 'd')
        return "dispose";
    if (command == 'D')
        return "writedispose";
    if (command == 'u')
        return "unregister_instance";
    if (command == 'r')
        return "register_instance";
    return NULL;
}
|
|
|
|
/* Execute a writer command that carries no sample data:
     'Y'  dispose-all (not supported: only prints a notice)
     'B'  begin a coherent set on wr
     'E'  end a coherent set on wr
     'W'  wait for acknowledgements on wr with an infinite timeout
   Return codes are passed to error_report().  Any other command aborts the
   process. */
static void non_data_operation(char command, dds_entity_t wr) {
    dds_return_t rc = 0;

    if (command == 'Y') {
        printf ("Dispose all: not supported\n");
        if (flushflag) {
            fflush (stdout);
        }
        // TODO Implement application side tracking of alive instances for use with a 'dispose all' function
        // if ((result = DDS_Topic_dispose_all_data(DDS_DataWriter_get_topic(wr))) != DDS_RETCODE_OK)
        //   error ("DDS_Topic_dispose_all: error %d\n", (int) result);
    } else if (command == 'B') {
        rc = dds_begin_coherent(wr);
        error_report(rc, "dds_begin_coherent:");
    } else if (command == 'E') {
        rc = dds_end_coherent(wr);
        error_report(rc, "dds_end_coherent:");
    } else if (command == 'W') {
        dds_duration_t inf = DDS_INFINITY;
        rc = dds_wait_for_acks(wr, inf);
        error_report(rc, "dds_wait_for_acks:");
    } else {
        abort();
    }
}
|
|
|
|
static int accept_error(char command, int retcode) {
|
|
if (retcode == DDS_RETCODE_TIMEOUT)
|
|
return 1;
|
|
if ((command == 'd' || command == 'u') && retcode == DDS_RETCODE_PRECONDITION_NOT_MET)
|
|
return 1;
|
|
return 0;
|
|
}
|
|
|
|
union data {
|
|
uint32_t seq;
|
|
struct { uint32_t seq; int32_t keyval; } seq_keyval;
|
|
KeyedSeq ks;
|
|
Keyed32 k32;
|
|
Keyed64 k64;
|
|
Keyed128 k128;
|
|
Keyed256 k256;
|
|
OneULong ou;
|
|
};
|
|
|
|
static void pub_do_auto(const struct writerspec *spec) {
|
|
int result;
|
|
dds_instance_handle_t *handle = (dds_instance_handle_t*) dds_alloc(sizeof(dds_instance_handle_t)*nkeyvals);
|
|
dds_time_t ntot = 0, tfirst, tlast, tprev, tfirst0, tstop;
|
|
struct hist *hist = hist_new(30, 1000, 0);
|
|
int k = 0;
|
|
union data d;
|
|
memset(&d, 0, sizeof(d));
|
|
|
|
switch (spec->topicsel) {
|
|
case UNSPEC:
|
|
assert(0);
|
|
case KS:
|
|
d.ks.baggage._maximum = d.ks.baggage._length = spec->baggagesize;
|
|
d.ks.baggage._buffer = (uint8_t *) dds_alloc(spec->baggagesize);
|
|
memset(d.ks.baggage._buffer, 0xee, spec->baggagesize);
|
|
break;
|
|
case K32:
|
|
memset(d.k32.baggage, 0xee, sizeof(d.k32.baggage));
|
|
break;
|
|
case K64:
|
|
memset(d.k64.baggage, 0xee, sizeof(d.k64.baggage));
|
|
break;
|
|
case K128:
|
|
memset(d.k128.baggage, 0xee, sizeof(d.k128.baggage));
|
|
break;
|
|
case K256:
|
|
memset(d.k256.baggage, 0xee, sizeof(d.k256.baggage));
|
|
break;
|
|
case OU:
|
|
break;
|
|
case ARB:
|
|
break;
|
|
}
|
|
|
|
for (k = 0; (uint32_t) k < nkeyvals; k++) {
|
|
d.seq_keyval.keyval = k;
|
|
if(spec->register_instances) {
|
|
dds_register_instance(spec->wr, &handle[k], &d);
|
|
}
|
|
}
|
|
|
|
dds_sleepfor(DDS_SECS(1)); // TODO is this sleep necessary?
|
|
d.seq_keyval.keyval = 0;
|
|
tfirst0 = tfirst = tprev = dds_time();
|
|
if (dur != 0.0) {
|
|
dds_duration_t dds_dur = 0;
|
|
(void) double_to_dds_duration(&dds_dur, dur);
|
|
tstop = tfirst0 + dds_dur;
|
|
} else
|
|
tstop = INT64_MAX;
|
|
|
|
if (nkeyvals == 0) {
|
|
while (!termflag && tprev < tstop) {
|
|
dds_sleepfor(DDS_MSECS(100));
|
|
}
|
|
} else if (spec->writerate <= 0) {
|
|
while (!termflag && tprev < tstop) {
|
|
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
|
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
if (result != DDS_RETCODE_TIMEOUT)
|
|
break;
|
|
} else {
|
|
d.seq_keyval.keyval = (d.seq_keyval.keyval + 1) % (int32_t)nkeyvals;
|
|
d.seq++;
|
|
ntot++;
|
|
if ((d.seq % 16) == 0) {
|
|
dds_time_t t = dds_time();
|
|
hist_record(hist, (uint64_t)((t - tprev) / 16), 16);
|
|
if (t < tfirst + DDS_SECS(4)) {
|
|
tprev = t;
|
|
} else {
|
|
tlast = t;
|
|
hist_print(hist, tlast - tfirst, 1);
|
|
tfirst = tprev;
|
|
tprev = dds_time();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
unsigned bi = 0;
|
|
while (!termflag && tprev < tstop) {
|
|
if ((result = dds_write(spec->wr, &d)) != DDS_RETCODE_OK) {
|
|
printf ("write: error %d (%s)\n", (int) result, dds_err_str(result));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
if (result != DDS_RETCODE_TIMEOUT)
|
|
break;
|
|
}
|
|
|
|
{
|
|
dds_time_t t = dds_time();
|
|
d.seq_keyval.keyval = (d.seq_keyval.keyval + 1) % (int32_t)nkeyvals;
|
|
d.seq++;
|
|
ntot++;
|
|
hist_record(hist, (uint64_t)(t - tprev), 1);
|
|
if (t >= tfirst + DDS_SECS(4)) {
|
|
tlast = t;
|
|
hist_print(hist, tlast - tfirst, 1);
|
|
tfirst = tprev;
|
|
t = dds_time();
|
|
}
|
|
if (++bi == spec->burstsize) {
|
|
while (((double)(ntot / spec->burstsize) / ((double)(t - tfirst0) / 1e9 + 5e-3)) > spec->writerate && !termflag) {
|
|
/* FIXME: only doing this manually because batching is not yet implemented properly */
|
|
dds_write_flush(spec->wr);
|
|
dds_sleepfor(DDS_MSECS(10));
|
|
t = dds_time();
|
|
}
|
|
bi = 0;
|
|
}
|
|
tprev = t;
|
|
}
|
|
}
|
|
}
|
|
tlast = dds_time();
|
|
hist_print(hist, tlast - tfirst, 0);
|
|
hist_free(hist);
|
|
printf ("total writes: %" PRId64 " (%e/s)\n", ntot, (double)ntot * 1e9 / (double)(tlast - tfirst0));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
if (spec->topicsel == KS) {
|
|
dds_free(d.ks.baggage._buffer);
|
|
}
|
|
dds_free(handle);
|
|
}
|
|
|
|
static char *pub_do_nonarb(const struct writerspec *spec, uint32_t *seq) {
|
|
struct tstamp_t tstamp_spec = { .isabs = 0, .t = 0 };
|
|
int result;
|
|
union data d;
|
|
char command;
|
|
char *arg = NULL;
|
|
int k = 0;
|
|
memset(&d, 0, sizeof(d));
|
|
switch (spec->topicsel) {
|
|
case UNSPEC:
|
|
assert(0);
|
|
case KS:
|
|
d.ks.baggage._maximum = d.ks.baggage._length = spec->baggagesize;
|
|
d.ks.baggage._buffer = (uint8_t *) dds_alloc(spec->baggagesize);
|
|
memset(d.ks.baggage._buffer, 0xee, spec->baggagesize);
|
|
break;
|
|
case K32:
|
|
memset(d.k32.baggage, 0xee, sizeof(d.k32.baggage));
|
|
break;
|
|
case K64:
|
|
memset(d.k64.baggage, 0xee, sizeof(d.k64.baggage));
|
|
break;
|
|
case K128:
|
|
memset(d.k128.baggage, 0xee, sizeof(d.k128.baggage));
|
|
break;
|
|
case K256:
|
|
memset(d.k256.baggage, 0xee, sizeof(d.k256.baggage));
|
|
break;
|
|
case OU:
|
|
break;
|
|
case ARB:
|
|
break;
|
|
}
|
|
d.seq = *seq;
|
|
command = 0;
|
|
while (command != ':' && read_value(&command, &k, &tstamp_spec, &arg)) {
|
|
d.seq_keyval.keyval = k;
|
|
switch (command) {
|
|
case 'w': case 'd': case 'D': case 'u': case 'r': {
|
|
write_oper_t fn = get_write_oper(command);
|
|
dds_time_t tstamp = 0;
|
|
if (!tstamp_spec.isabs) {
|
|
tstamp = dds_time();
|
|
tstamp_spec.t += tstamp;
|
|
}
|
|
tstamp = (tstamp_spec.t % T_SECOND) + ((int) (tstamp_spec.t / T_SECOND) * DDS_NSECS_IN_SEC);
|
|
if ((result = fn(spec->wr, &d, tstamp)) != DDS_RETCODE_OK) {
|
|
printf ("%s %d: error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
if (!accept_error(command, result))
|
|
exit(1);
|
|
}
|
|
/* FIXME: only doing this manually because batching is not yet implemented properly */
|
|
dds_write_flush(spec->wr);
|
|
if (spec->dupwr && (result = fn(spec->dupwr, &d, tstamp)) != DDS_RETCODE_OK) {
|
|
printf ("%s %d(dup): error %d (%s)\n", get_write_operstr(command), k, (int) result, dds_err_str(result));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
if (!accept_error(command, result))
|
|
exit(1);
|
|
}
|
|
if (spec->dupwr) {
|
|
/* FIXME: only doing this manually because batching is not yet implemented properly */
|
|
dds_write_flush(spec->wr);
|
|
}
|
|
d.seq++;
|
|
break;
|
|
}
|
|
case 'z':
|
|
if (spec->topicsel != KS) {
|
|
printf ("payload size cannot be set for selected type\n");
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else if (k < 12 && k != 0) {
|
|
printf ("invalid payload size: %d\n", k);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else {
|
|
uint32_t baggagesize = (k != 0) ? (uint32_t) (k - 12) : 0;
|
|
if (d.ks.baggage._buffer)
|
|
dds_free (d.ks.baggage._buffer);
|
|
d.ks.baggage._maximum = d.ks.baggage._length = baggagesize;
|
|
d.ks.baggage._buffer = (uint8_t *) dds_alloc(baggagesize);
|
|
memset(d.ks.baggage._buffer, 0xee, d.ks.baggage._length);
|
|
}
|
|
break;
|
|
case 'p':
|
|
set_pub_partition(spec->pub, arg);
|
|
break;
|
|
case 's':
|
|
if (k < 0) {
|
|
printf ("invalid sleep duration: %ds\n", k);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else {
|
|
dds_sleepfor(DDS_SECS(k));
|
|
}
|
|
break;
|
|
case 'Q': {
|
|
dds_qos_t *qos = dds_create_qos ();
|
|
setqos_from_args (DDS_KIND_PARTICIPANT, qos, 1, (const char **) &arg);
|
|
dds_set_qos (dp, qos);
|
|
dds_delete_qos (qos);
|
|
break;
|
|
}
|
|
case 'Y': case 'B': case 'E': case 'W':
|
|
non_data_operation(command, spec->wr);
|
|
break;
|
|
case ':':
|
|
break;
|
|
default:
|
|
abort();
|
|
}
|
|
}
|
|
if (spec->topicsel == KS)
|
|
dds_free(d.ks.baggage._buffer);
|
|
*seq = d.seq;
|
|
if (command == ':')
|
|
return arg;
|
|
else {
|
|
dds_free(arg);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
// TODO ARB type support
|
|
//static char *pub_do_arb_line(const struct writerspec *spec, const char *line) {
|
|
// int result;
|
|
// struct tstamp_t tstamp_spec;
|
|
// char *ret = NULL;
|
|
// char command;
|
|
// int k, pos;
|
|
// while (line && *(line = skipspaces(line)) != 0) {
|
|
// tstamp_spec.isabs = 0; tstamp_spec.t = 0;
|
|
// command = 'w';
|
|
// switch (*line) {
|
|
// case 'w': case 'd': case 'D': case 'u': case 'r':
|
|
// command = *line++;
|
|
// if (*line == '@') {
|
|
// if (*++line == '=') { ++line; tstamp_spec.isabs = 1; }
|
|
// tstamp_spec.t = T_SECOND * strtol(line, (char **) &line, 10);
|
|
// }
|
|
// case '{': {
|
|
// write_oper_t fn = get_write_oper(command);
|
|
// void *arb;
|
|
// char *endp;
|
|
// if ((arb = tgscan(spec->tgtp, line, &endp)) == NULL) {
|
|
// line = NULL;
|
|
// } else {
|
|
// dds_time_t tstamp;
|
|
// int diddodup = 0;
|
|
// if (!tstamp_spec.isabs) {
|
|
// DDS_DomainParticipant_get_current_time(dp, &tstamp);
|
|
// tstamp_spec.t += tstamp.sec * T_SECOND + tstamp.nanosec;
|
|
// }
|
|
// tstamp.sec = (int) (tstamp_spec.t / T_SECOND);
|
|
// tstamp.nanosec = (unsigned) (tstamp_spec.t % T_SECOND);
|
|
// line = endp;
|
|
// result = fn(spec->wr, arb, DDS_HANDLE_NIL, &tstamp);
|
|
// if (result == DDS_RETCODE_OK && spec->dupwr) {
|
|
// diddodup = 1;
|
|
// result = fn(spec->dupwr, arb, DDS_HANDLE_NIL, &tstamp);
|
|
// }
|
|
// tgfreedata(spec->tgtp, arb);
|
|
// if (result != DDS_RETCODE_OK) {
|
|
// printf ("%s%s: error %d (%s)\n", get_write_operstr(command), diddodup ? "(dup)" : "", (int) result, dds_err_str(result));
|
|
// if (!accept_error(command, result)) {
|
|
// line = NULL;
|
|
// if (!isatty(fdin))
|
|
// exit(1);
|
|
// break;
|
|
// }
|
|
// }
|
|
// }
|
|
// break;
|
|
// }
|
|
// case 'p':
|
|
// set_pub_partition(DDS_DataWriter_get_publisher(spec->wr), line+1);
|
|
// line = NULL;
|
|
// break;
|
|
// case 's':
|
|
// if (sscanf(line+1, "%d%n", &k, &pos) != 1 || k < 0) {
|
|
// printf ("invalid sleep duration: %ds\n", k);
|
|
// line = NULL;
|
|
// } else {
|
|
// sleep((unsigned) k);
|
|
// line += 1 + pos;
|
|
// }
|
|
// break;
|
|
// case 'Y': case 'B': case 'E': case 'W':
|
|
// non_data_operation(*line, spec->wr);
|
|
// break;
|
|
// case 'S':
|
|
// make_persistent_snapshot(line+1);
|
|
// line = NULL;
|
|
// break;
|
|
// case ':':
|
|
// ret = dds_string_dup(line+1);
|
|
// line = NULL;
|
|
// break;
|
|
// default:
|
|
// printf ("unrecognised command: %s\n", line);
|
|
// line = NULL;
|
|
// break;
|
|
// }
|
|
// }
|
|
// return ret;
|
|
//}
|
|
//
|
|
//static char *pub_do_arb(const struct writerspec *spec, struct getl_arg *getl_arg) {
|
|
// const char *orgline;
|
|
// char *ret = NULL;
|
|
// int count;
|
|
// while (ret == NULL && (orgline = getl(getl_arg, &count)) != NULL) {
|
|
// const char *line = skipspaces(orgline);
|
|
// if (*line) getl_enter_hist(getl_arg, orgline);
|
|
// ret = pub_do_arb_line(spec, line);
|
|
// }
|
|
// return ret;
|
|
//}
|
|
|
|
static uint32_t pubthread_auto(void *vspec) {
|
|
const struct writerspec *spec = vspec;
|
|
assert(spec->topicsel != UNSPEC && spec->topicsel != ARB);
|
|
pub_do_auto(spec);
|
|
return 0;
|
|
}
|
|
|
|
static uint32_t pubthread(void *vwrspecs) {
|
|
struct wrspeclist *wrspecs = vwrspecs;
|
|
uint32_t seq = 0;
|
|
// TODO Upon support for ARB types, resolve the declaration of fdin
|
|
// struct getl_arg getl_arg;
|
|
//#if USE_EDITLINE
|
|
// getl_init_editline(&getl_arg, fdin);
|
|
//#else
|
|
// getl_init_simple(&getl_arg, fdin);
|
|
//#endif
|
|
|
|
struct wrspeclist *cursor = wrspecs;
|
|
struct writerspec *spec = cursor->spec;
|
|
char *nextspec = NULL;
|
|
do {
|
|
if (spec->topicsel != ARB)
|
|
nextspec = pub_do_nonarb(spec, &seq);
|
|
// else
|
|
// nextspec = pub_do_arb(spec, &getl_arg);
|
|
if (nextspec == NULL)
|
|
spec = NULL;
|
|
else
|
|
{
|
|
int cnt, pos;
|
|
char *tmp = nextspec + strlen(nextspec);
|
|
while (tmp > nextspec && isspace((unsigned char)tmp[-1]))
|
|
*--tmp = 0;
|
|
if ((sscanf(nextspec, "+%d%n", &cnt, &pos) == 1 && nextspec[pos] == 0) || ((void)(cnt = 1), strcmp(nextspec, "+") == 0)) {
|
|
while (cnt--) cursor = cursor->next;
|
|
} else if ((sscanf(nextspec, "-%d%n", &cnt, &pos) == 1 && nextspec[pos] == 0) || ((void)(cnt = 1), strcmp(nextspec, "-") == 0)) {
|
|
while (cnt--) cursor = cursor->prev;
|
|
} else if (sscanf(nextspec, "%d%n", &cnt, &pos) == 1 && nextspec[pos] == 0) {
|
|
cursor = wrspecs; while (cnt--) cursor = cursor->next;
|
|
} else {
|
|
struct wrspeclist *endm = cursor, *cand = NULL;
|
|
do {
|
|
if (strncmp (cursor->spec->tpname, nextspec, strlen(nextspec)) == 0) {
|
|
if (cand == NULL)
|
|
cand = cursor;
|
|
else {
|
|
printf ("%s: ambiguous writer specification\n", nextspec);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
cursor = cursor->next;
|
|
} while (cursor != endm);
|
|
if (cand == NULL) {
|
|
printf ("%s: no matching writer specification\n", nextspec);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else if (cursor != endm) { /* ambiguous case */
|
|
cursor = endm;
|
|
} else {
|
|
cursor = cand;
|
|
}
|
|
}
|
|
spec = cursor != NULL ? cursor->spec : NULL;
|
|
}
|
|
} while (spec);
|
|
|
|
return 0;
|
|
}
|
|
|
|
struct eseq_admin {
|
|
unsigned nkeys;
|
|
unsigned nph;
|
|
dds_instance_handle_t *ph;
|
|
unsigned **eseq;
|
|
};
|
|
|
|
static void init_eseq_admin(struct eseq_admin *ea, unsigned nkeys) {
|
|
ea->nkeys = nkeys;
|
|
ea->nph = 0;
|
|
ea->ph = NULL;
|
|
ea->eseq = NULL;
|
|
}
|
|
|
|
static void fini_eseq_admin(struct eseq_admin *ea) {
|
|
dds_free(ea->ph);
|
|
for (unsigned i = 0; i < ea->nph; i++)
|
|
dds_free(ea->eseq[i]);
|
|
dds_free(ea->eseq);
|
|
}
|
|
|
|
static int check_eseq(struct eseq_admin *ea, unsigned seq, unsigned keyval, const dds_instance_handle_t pubhandle) {
|
|
unsigned *eseq;
|
|
if (keyval >= ea->nkeys)
|
|
{
|
|
printf ("received key %u >= nkeys %u\n", keyval, ea->nkeys);
|
|
exit(2);
|
|
}
|
|
for (unsigned i = 0; i < ea->nph; i++)
|
|
if (pubhandle == ea->ph[i])
|
|
{
|
|
unsigned e = ea->eseq[i][keyval];
|
|
ea->eseq[i][keyval] = seq + ea->nkeys;
|
|
return seq == e;
|
|
}
|
|
ea->ph = dds_realloc(ea->ph, (ea->nph + 1) * sizeof(*ea->ph));
|
|
ea->ph[ea->nph] = pubhandle;
|
|
ea->eseq = dds_realloc(ea->eseq, (ea->nph + 1) * sizeof(*ea->eseq));
|
|
ea->eseq[ea->nph] = dds_alloc(ea->nkeys * sizeof(*ea->eseq[ea->nph]));
|
|
eseq = ea->eseq[ea->nph];
|
|
for (unsigned i = 0; i < ea->nkeys; i++)
|
|
eseq[i] = seq + (i - keyval) + (i <= keyval ? ea->nkeys : 0);
|
|
ea->nph++;
|
|
return 1;
|
|
}
|
|
|
|
// TODO coherency - Reintroduce this into application logic where needed. dds.h has this, but returns UNSUPPORTED, so expect that for now
|
|
//static int subscriber_needs_access(dds_entity_t sub) {
|
|
// dds_qos_t *qos;
|
|
// int x;
|
|
// if ((qos = dds_create_qos()) == NULL)
|
|
// return DDS_RETCODE_OUT_OF_RESOURCES;
|
|
// dds_qos_get(sub, qos);
|
|
// if (qos == NULL)
|
|
// error ("DDS_Subscriber_get_qos: error\n");
|
|
//
|
|
// dds_presentation_access_scope_kind_t access_scope;
|
|
// bool coherent_access;
|
|
// bool ordered_access;
|
|
// dds_qget_presentation(qos, &access_scope, &coherent_access, &ordered_access);
|
|
// x = (access_scope == DDS_PRESENTATION_GROUP && coherent_access);
|
|
// dds_free(qos);
|
|
// return x;
|
|
//}
|
|
|
|
static uint32_t subthread(void *vspec) {
|
|
const struct readerspec *spec = vspec;
|
|
dds_entity_t rd = spec->rd;
|
|
// TODO coherency support
|
|
// dds_entity_t sub = spec->sub;
|
|
// const int need_access = subscriber_needs_access(sub);
|
|
dds_entity_t ws;
|
|
dds_entity_t rdcondA = 0, rdcondD = 0;
|
|
dds_entity_t stcond = 0;
|
|
dds_return_t rc;
|
|
uintptr_t exitcode = 0;
|
|
char tag[270];
|
|
char tn[256];
|
|
size_t nxs = 0;
|
|
|
|
rc = dds_get_name(dds_get_topic(rd), tn, sizeof(tn));
|
|
error_report(rc, "dds_get_name failed");
|
|
(void)snprintf(tag, sizeof(tag), "[%u:%s]", spec->idx, tn);
|
|
|
|
if (wait_hist_data) {
|
|
rc = dds_reader_wait_for_historical_data(rd, wait_hist_data_timeout);
|
|
error_report(rc, "dds_reader_wait_for_historical_data");
|
|
}
|
|
|
|
ws = dds_create_waitset(dp);
|
|
rc = dds_waitset_attach(ws, termcond, termcond);
|
|
error_abort(rc, "dds_waitset_attach(termcond)");
|
|
nxs++;
|
|
switch (spec->mode) {
|
|
case MODE_NONE:
|
|
case MODE_ZEROLOAD:
|
|
/* no triggers */
|
|
break;
|
|
case MODE_PRINT:
|
|
/* complicated triggers */
|
|
rdcondA = dds_create_readcondition(rd, spec->use_take ? (DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ALIVE_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE)
|
|
: (DDS_NOT_READ_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ALIVE_INSTANCE_STATE | DDS_NOT_ALIVE_NO_WRITERS_INSTANCE_STATE));
|
|
error_abort(rdcondA, "dds_readcondition_create(rdcondA)");
|
|
|
|
rc = dds_waitset_attach(ws, rdcondA, rdcondA);
|
|
error_abort(rc, "dds_waitset_attach(rdcondA)");
|
|
nxs++;
|
|
|
|
rdcondD = dds_create_readcondition(rd, (DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_NOT_ALIVE_DISPOSED_INSTANCE_STATE));
|
|
error_abort(rdcondD, "dds_readcondition_create(rdcondD)");
|
|
|
|
rc = dds_waitset_attach(ws, rdcondD, rdcondD);
|
|
error_abort(rc, "dds_waitset_attach(rdcondD)");
|
|
nxs++;
|
|
break;
|
|
case MODE_CHECK:
|
|
case MODE_DUMP:
|
|
if (!spec->polling) {
|
|
/* fastest trigger we have */
|
|
rc = dds_set_status_mask(rd, DDS_DATA_AVAILABLE_STATUS);
|
|
error_abort(rc, "dds_set_status_mask(stcond)");
|
|
rc = dds_waitset_attach(ws, rd, rd);
|
|
error_abort(rc, "dds_waitset_attach(rd)");
|
|
nxs++;
|
|
}
|
|
break;
|
|
}
|
|
|
|
{
|
|
void **mseq = (void **) dds_alloc(sizeof(void*) * (spec->read_maxsamples));
|
|
|
|
dds_sample_info_t *iseq = (dds_sample_info_t *) dds_alloc(sizeof(dds_sample_info_t) * spec->read_maxsamples);
|
|
dds_attach_t *xs = dds_alloc(sizeof(dds_attach_t) * nxs);
|
|
|
|
dds_time_t tstart = 0, tfirst = 0, tprint = 0;
|
|
long long out_of_seq = 0, nreceived = 0, last_nreceived = 0;
|
|
long long nreceived_bytes = 0, last_nreceived_bytes = 0;
|
|
struct eseq_admin eseq_admin;
|
|
init_eseq_admin(&eseq_admin, nkeyvals);
|
|
|
|
int ii = 0;
|
|
for(ii = 0; ii < (int32_t) spec->read_maxsamples; ii++) {
|
|
mseq[ii] = NULL;
|
|
}
|
|
|
|
while (!termflag && !once_mode) {
|
|
dds_time_t tnow;
|
|
unsigned gi;
|
|
|
|
if (spec->polling) {
|
|
dds_sleepfor(DDS_MSECS(1)); /* 1ms sleep interval, so a bit less than 1kHz poll freq */
|
|
} else {
|
|
rc = dds_waitset_wait(ws, xs, nxs, DDS_INFINITY);
|
|
if (rc < DDS_RETCODE_OK) {
|
|
printf ("wait: error %d\n", (int) rc);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
break;
|
|
} else if (rc == DDS_RETCODE_OK) {
|
|
continue;
|
|
}
|
|
}
|
|
|
|
tnow = dds_time();
|
|
for (gi = 0; gi < (spec->polling ? 1 : nxs); gi++) {
|
|
dds_entity_t cond = !spec->polling && xs[gi] != 0 ? (dds_entity_t) xs[gi] : 0;
|
|
dds_return_t nread;
|
|
int32_t i;
|
|
|
|
if (cond == termcond)
|
|
continue;
|
|
if (cond == 0 && !spec->polling) {
|
|
break;
|
|
}
|
|
|
|
if (spec->print_match_pre_read) {
|
|
dds_subscription_matched_status_t status;
|
|
rc = dds_get_subscription_matched_status(rd, &status);
|
|
error_report(rc, "dds_get_subscription_matched_status failed");
|
|
if (rc == DDS_RETCODE_OK) {
|
|
printf("[pre-read: subscription-matched: total=(%"PRIu32" change %"PRId32") current=(%"PRIu32" change %"PRId32") handle=%"PRIu64"]\n",
|
|
status.total_count, status.total_count_change,
|
|
status.current_count,
|
|
status.current_count_change,
|
|
status.last_publication_handle);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Always take NOT_ALIVE_DISPOSED data because it means the
|
|
instance has reached its end-of-life.
|
|
|
|
NO_WRITERS I usually don't care for (though there certainly
|
|
are situations in which it is useful information). But you
|
|
can't have a NO_WRITERS with invalid_data set:
|
|
|
|
- either the reader contains the instance without data in
|
|
the disposed state, but in that case it stays in the
|
|
NOT_ALIVE_DISPOSED state;
|
|
|
|
- or the reader doesn't have the instance yet, in which
|
|
case the unregister is silently discarded.
|
|
|
|
However, receiving an unregister doesn't turn the sample
|
|
into a NEW one, though. So HOW AM I TO TRIGGER ON IT
|
|
without triggering CONTINUOUSLY?
|
|
*/
|
|
// TODO coherency support
|
|
// if (need_access && (result = DDS_Subscriber_begin_access(sub)) != DDS_RETCODE_OK)
|
|
// error ("DDS_Subscriber_begin_access: %d (%s)\n", (int) result, dds_err_str(result));
|
|
|
|
if (spec->mode == MODE_CHECK || (spec->mode == MODE_DUMP && spec->use_take) || spec->polling) {
|
|
nread = dds_take_mask(rd, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples, DDS_ANY_STATE);
|
|
} else if (spec->mode == MODE_DUMP) {
|
|
nread = dds_read_mask(rd, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples, DDS_ANY_STATE);
|
|
} else if (spec->use_take || cond == rdcondD) {
|
|
nread = dds_take(cond, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples);
|
|
} else {
|
|
nread = dds_read(cond, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples);
|
|
}
|
|
|
|
if (nread < 1) {
|
|
if (spec->polling && nread == 0) {
|
|
; /* expected */
|
|
} else if (spec->mode == MODE_CHECK || spec->mode == MODE_DUMP || spec->polling) {
|
|
printf ("%s: %d (%s) on %s\n", (!spec->use_take && spec->mode == MODE_DUMP) ? "read" : "take", (int) nread, dds_err_str(nread), spec->polling ? "poll" : "stcond");
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else {
|
|
printf ("%s: %d (%s) on rdcond%s\n", spec->use_take ? "take" : "read", (int) nread, dds_err_str(nread), (cond == rdcondA) ? "A" : (cond == rdcondD) ? "D" : "?");
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
|
|
// TODO coherency support
|
|
// if (need_access && (result = DDS_Subscriber_end_access(sub)) != DDS_RETCODE_OK)
|
|
// error ("DDS_Subscriber_end_access: %d (%s)\n", (int) result, dds_err_str(result));
|
|
|
|
switch (spec->mode) {
|
|
case MODE_PRINT:
|
|
case MODE_DUMP:
|
|
switch (spec->topicsel) {
|
|
case UNSPEC: assert(0);
|
|
case KS: print_seq_KS(&tstart, tnow, rd, tag, iseq, (KeyedSeq **)mseq, nread); break;
|
|
case K32: print_seq_K32(&tstart, tnow, rd, tag, iseq, (Keyed32 **)mseq, nread); break;
|
|
case K64: print_seq_K64(&tstart, tnow, rd, tag, iseq, (Keyed64 **)mseq, nread); break;
|
|
case K128: print_seq_K128(&tstart, tnow, rd, tag, iseq, (Keyed128 **)mseq, nread); break;
|
|
case K256: print_seq_K256(&tstart, tnow, rd, tag, iseq, (Keyed256 **)mseq, nread); break;
|
|
case OU: print_seq_OU(&tstart, tnow, rd, tag, iseq, (const OneULong **)mseq, nread); break;
|
|
case ARB: print_seq_ARB(&tstart, tnow, rd, tag, iseq, (const void **)mseq, spec->tgtp); break;
|
|
}
|
|
break;
|
|
|
|
case MODE_CHECK:
|
|
for (i = 0; i < nread; i++) {
|
|
int keyval = 0;
|
|
unsigned seq = 0;
|
|
unsigned size = 0;
|
|
if (!iseq[i].valid_data)
|
|
continue;
|
|
switch (spec->topicsel) {
|
|
case UNSPEC: assert(0);
|
|
case KS: { KeyedSeq *d = (KeyedSeq *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 12 + d->baggage._length; } break;
|
|
case K32: { Keyed32 *d = (Keyed32 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 32; } break;
|
|
case K64: { Keyed64 *d = (Keyed64 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 64; } break;
|
|
case K128: { Keyed128 *d = (Keyed128 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 128; } break;
|
|
case K256: { Keyed256 *d = (Keyed256 *) mseq[i]; keyval = d->keyval; seq = d->seq; size = 256; } break;
|
|
case OU: { OneULong *d = (OneULong *) mseq[i]; keyval = 0; seq = d->seq; size = 4; } break;
|
|
case ARB: assert(0); break; /* can't check what we don't know */
|
|
}
|
|
if (!check_eseq(&eseq_admin, seq, (unsigned)keyval, iseq[i].publication_handle))
|
|
out_of_seq++;
|
|
if (nreceived == 0) {
|
|
tfirst = tnow;
|
|
tprint = tfirst;
|
|
}
|
|
nreceived++;
|
|
nreceived_bytes += size;
|
|
if (tnow - tprint >= DDS_SECS(1)) {
|
|
const dds_time_t tdelta_ns = tnow - tfirst;
|
|
const dds_time_t tdelta_s = tdelta_ns / DDS_NSECS_IN_SEC;
|
|
const dds_time_t tdelta_ms = ((tdelta_ns % DDS_NSECS_IN_SEC) + 500000) / DDS_NSECS_IN_MSEC;
|
|
const long long ndelta = nreceived - last_nreceived;
|
|
const double rate_Mbps = (double)(nreceived_bytes - last_nreceived_bytes) * 8 / 1e6;
|
|
printf ("%"PRId64".%03"PRId64" ntot %lld nseq %lld ndelta %lld rate %.2f Mb/s\n",
|
|
tdelta_s, tdelta_ms, nreceived, out_of_seq, ndelta, rate_Mbps);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
last_nreceived = nreceived;
|
|
last_nreceived_bytes = nreceived_bytes;
|
|
tprint = tnow;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case MODE_NONE:
|
|
case MODE_ZEROLOAD:
|
|
break;
|
|
}
|
|
rc = dds_return_loan(rd, mseq, nread);
|
|
error_report(rc, "dds_return_loan failed");
|
|
if (spec->sleep_ns) {
|
|
dds_sleepfor(spec->sleep_ns);
|
|
}
|
|
}
|
|
}
|
|
dds_free(xs);
|
|
|
|
if (spec->mode == MODE_PRINT || spec->mode == MODE_DUMP || once_mode) {
|
|
// TODO coherency support
|
|
// if (need_access && (result = DDS_Subscriber_begin_access (sub)) != DDS_RETCODE_OK)
|
|
// error ("DDS_Subscriber_begin_access: %d (%s)\n", (int) result, dds_err_str (result));
|
|
|
|
/* This is the final Read/Take */
|
|
dds_return_t nread;
|
|
nread = dds_take_mask(rd, mseq, iseq, spec->read_maxsamples, spec->read_maxsamples, DDS_ANY_STATE);
|
|
if (nread == 0) {
|
|
if (!once_mode) {
|
|
printf ("-- final take: data reader empty --\n");
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
} else {
|
|
exitcode = 1;
|
|
}
|
|
} else if (nread < DDS_RETCODE_OK) {
|
|
if (!once_mode) {
|
|
error_report(rc, "-- final take --\n");
|
|
} else {
|
|
error_report(rc, "read/take");
|
|
}
|
|
} else {
|
|
if (!once_mode)
|
|
printf ("-- final contents of data reader --\n");
|
|
if (spec->mode == MODE_PRINT || spec->mode == MODE_DUMP) {
|
|
switch (spec->topicsel) {
|
|
case UNSPEC: assert(0);
|
|
case KS: print_seq_KS(&tstart, dds_time(), rd, tag, iseq, (KeyedSeq **) mseq, nread); break;
|
|
case K32: print_seq_K32(&tstart, dds_time(), rd, tag, iseq, (Keyed32 **) mseq, nread); break;
|
|
case K64: print_seq_K64(&tstart, dds_time(), rd, tag, iseq, (Keyed64 **) mseq, nread); break;
|
|
case K128: print_seq_K128(&tstart, dds_time(), rd, tag, iseq, (Keyed128 **) mseq, nread); break;
|
|
case K256: print_seq_K256(&tstart, dds_time(), rd, tag, iseq, (Keyed256 **) mseq, nread); break;
|
|
case OU: print_seq_OU(&tstart, dds_time(), rd, tag, iseq, (const OneULong **) mseq, nread); break;
|
|
case ARB: print_seq_ARB(&tstart, dds_time(), rd, tag, iseq, (const void **) mseq, spec->tgtp); break;
|
|
}
|
|
}
|
|
}
|
|
// TODO coherency support
|
|
// if (need_access && (result = DDS_Subscriber_end_access(sub)) != DDS_RETCODE_OK)
|
|
// error ("DDS_Subscriber_end_access: %d (%s)\n", (int) result, dds_err_str(result));
|
|
rc = dds_return_loan(rd, mseq, nread);
|
|
error_report(rc, "dds_return_loan failed");
|
|
}
|
|
dds_free(iseq);
|
|
dds_free(mseq);
|
|
if (spec->mode == MODE_CHECK) {
|
|
printf ("received: %lld, out of seq: %lld\n", nreceived, out_of_seq);
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
}
|
|
fini_eseq_admin(&eseq_admin);
|
|
}
|
|
|
|
switch (spec->mode) {
|
|
case MODE_NONE:
|
|
case MODE_ZEROLOAD:
|
|
break;
|
|
case MODE_PRINT:
|
|
dds_waitset_detach(ws, rdcondA);
|
|
dds_delete(rdcondA);
|
|
dds_waitset_detach(ws, rdcondD);
|
|
dds_delete(rdcondD);
|
|
break;
|
|
case MODE_CHECK:
|
|
case MODE_DUMP:
|
|
if (!spec->polling)
|
|
dds_waitset_detach(ws, stcond);
|
|
break;
|
|
}
|
|
|
|
// TODO Confirm that dds_delete(participant) takes care of this
|
|
// ret = dds_waitset_detach(ws, termcond);
|
|
// ret = dds_delete(ws);
|
|
|
|
if (once_mode) {
|
|
/* trigger EOF for writer side, so we actually do terminate */
|
|
terminate();
|
|
}
|
|
return (uint32_t)exitcode;
|
|
}
|
|
|
|
static uint32_t autotermthread(void *varg __attribute__((unused))) {
|
|
dds_time_t tstop, tnow;
|
|
dds_return_t rc;
|
|
dds_entity_t ws;
|
|
|
|
dds_attach_t wsresults[1];
|
|
size_t wsresultsize = 1u;
|
|
|
|
assert(dur > 0);
|
|
|
|
tnow = dds_time();
|
|
dds_duration_t dds_dur = 0;
|
|
(void) double_to_dds_duration(&dds_dur, dur);
|
|
tstop = tnow + dds_dur;
|
|
|
|
ws = dds_create_waitset(dp);
|
|
rc = dds_waitset_attach(ws, termcond, termcond);
|
|
error_abort(rc, "dds_waitset_attach(termcomd)");
|
|
|
|
tnow = dds_time();
|
|
while (!termflag && tnow < tstop) {
|
|
dds_time_t dt = tstop - tnow;
|
|
dds_duration_t timeout;
|
|
int64_t xsec = dt / DDS_NSECS_IN_SEC;
|
|
int64_t xnanosec = dt % DDS_NSECS_IN_SEC;
|
|
timeout = DDS_SECS(xsec)+xnanosec;
|
|
|
|
if ((rc = dds_waitset_wait(ws, wsresults, wsresultsize, timeout)) < DDS_RETCODE_OK) {
|
|
printf ("wait: error %s\n", dds_err_str(rc));
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
break;
|
|
}
|
|
tnow = dds_time();
|
|
}
|
|
|
|
dds_waitset_detach(ws, termcond);
|
|
dds_delete(ws);
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Returns the program name without its directory part: the text following
 * the last '/' in argv[0], or argv[0] itself when it contains no '/'.
 * Yields the empty string when no argv[0] is available.  The result points
 * into argv[0] (or at a string literal) and must not be freed.
 */
static const char *execname(int argc, char *argv[]) {
  if (argc == 0 || argv[0] == NULL) {
    return "";
  }

  const char *last_slash = strrchr(argv[0], '/');
  return (last_slash != NULL) ? last_slash + 1 : argv[0];
}
|
|
|
|
/*
 * Reads one line from fp, up to (not including) the next '\n' or end of
 * file, into a buffer obtained from dds_realloc and NUL-terminates it.
 *
 * Returns the line (an empty line yields an empty string), or NULL when end
 * of file is hit before any character was read.  In that NULL case a stream
 * error is reported through error_exit.
 */
static char *read_line_from_textfile(FILE *fp) {
  char *line = NULL;
  size_t cap = 0, len = 0;
  int ch;

  for (ch = fgetc(fp); ch != EOF && ch != '\n'; ch = fgetc(fp)) {
    if (len == cap) {
      /* buffer full: grow in 256-byte steps */
      cap += 256;
      line = dds_realloc(line, cap);
    }
    line[len] = (char) ch;
    len++;
  }

  if (ch == EOF && len == 0) {
    /* nothing read at all: distinguish a read error from plain EOF */
    if (ferror(fp)) {
      error_exit("error reading file, errno = %d\n", errno);
    }
    return line;
  }

  if (len == cap) {
    /* make room for the terminator */
    cap += 1;
    line = dds_realloc(line, cap);
  }
  line[len] = 0;
  return line;
}
|
|
|
|
/*
 * Loads a type description from a text file consisting of three lines, in
 * this order: type name, key list, metadata.
 *
 * metadata, typename, keylist: out parameters receiving the three lines as
 *   returned by read_line_from_textfile.
 * file: path of the metadata file.
 *
 * Returns 1.  A file that cannot be opened, or that lacks any of the three
 * lines, is reported through error_exit.
 */
static int get_metadata(char **metadata, char **typename, char **keylist, const char *file) {
  FILE *fp = fopen(file, "r");
  if (fp == NULL) {
    error_exit("%s: can't open for reading metadata\n", file);
  }

  *typename = read_line_from_textfile(fp);
  *keylist = read_line_from_textfile(fp);
  *metadata = read_line_from_textfile(fp);
  fclose(fp);

  /* All three lines are required (the metadata line used to go unchecked
     because the type name was tested twice). */
  if (*typename == NULL || *keylist == NULL || *metadata == NULL) {
    error_exit("%s: invalid metadata file\n", file);
  }
  return 1;
}
|
|
|
|
/*
 * Looks up an existing topic by name on the given participant.
 *
 * dpFindTopic: participant on which to look up the topic.
 * name:        topic name.
 * timeout:     currently ignored.
 *
 * Returns the value produced by dds_find_topic unchanged.  A non-positive
 * value means the lookup did not yield a topic handle; in that case a
 * "not found" line is printed to stdout.
 *
 * TODO ARB type support: the earlier implementation (not yet ported to this
 * API) special-cased the built-in topics DCPSParticipant, DCPSTopic,
 * DCPSSubscription and DCPSPublication to work around issues with a generic
 * reader for them, and for every other topic registered a type support built
 * from the found topic's type name, key list and metadescription, then
 * deleted the topic and looked it up a second time to avoid a
 * double-free-at-shutdown.
 */
static dds_entity_t find_topic(dds_entity_t dpFindTopic, const char *name, const dds_duration_t *timeout) {
  dds_entity_t tp;
  (void) timeout;

  // TODO Note: the implementation for dds_topic_find blocks infinitely if the topic does not exist in the domain
  tp = dds_find_topic(dpFindTopic, name);
  if (tp <= 0) {
    /* Failure is signalled by a negative return code, which a plain
       zero test would miss. */
    printf ("topic %s not found\n", name);
    if (flushflag) {
      fflush (stdout);
    }
  }

  return tp;
}
|
|
|
|
/*
 * Intentionally a no-op at present.
 *
 * The earlier implementation derived a system id from the participant's
 * instance handle and exported it through the SYSTEMID and
 * NODE_BUILTIN_PARTITION ("__NODE%08x BUILT-IN PARTITION__") environment
 * variables.
 *
 * TODO Determine the encoding of dds_instance_handle_t and whether a usable
 * value can be extracted from it, then restore that behaviour.
 */
static void set_systemid_env(void) {
  /* Unsupported */
}
|
|
|
|
struct spec {
|
|
dds_entity_t tp;
|
|
dds_entity_t cftp;
|
|
const char *topicname;
|
|
const char *cftp_expr;
|
|
char *metadata;
|
|
char *typename;
|
|
char *keylist;
|
|
dds_duration_t findtopic_timeout;
|
|
struct readerspec rd;
|
|
struct writerspec wr;
|
|
ddsrt_thread_t rdtid;
|
|
ddsrt_thread_t wrtid;
|
|
};
|
|
|
|
static void addspec(unsigned whatfor, unsigned *specsofar, unsigned *specidx, struct spec **spec, int want_reader) {
|
|
if (*specsofar & whatfor)
|
|
{
|
|
struct spec *s;
|
|
(*specidx)++;
|
|
*spec = dds_realloc(*spec, (*specidx + 1) * sizeof(**spec));
|
|
s = &(*spec)[*specidx];
|
|
s->tp = 0;
|
|
s->cftp = 0;
|
|
s->topicname = NULL;
|
|
s->cftp_expr = NULL;
|
|
s->metadata = NULL;
|
|
s->typename = NULL;
|
|
s->keylist = NULL;
|
|
s->findtopic_timeout = 10;
|
|
s->rd = def_readerspec;
|
|
s->wr = def_writerspec;
|
|
|
|
// TODO Upon support for ARB types, resolve the declaration of fdin
|
|
// if (fdin == -1 && fdservsock == -1)
|
|
// s->wr.mode = WRM_NONE;
|
|
if (!want_reader)
|
|
s->rd.mode = MODE_NONE;
|
|
*specsofar = 0;
|
|
}
|
|
*specsofar |= whatfor;
|
|
}
|
|
|
|
static void set_print_mode(const char *modestr) {
|
|
char *copy = dds_string_dup(modestr), *cursor = copy, *tok;
|
|
while ((tok = ddsrt_strsep(&cursor, ",")) != NULL) {
|
|
int enable;
|
|
if (strncmp(tok, "no", 2) == 0) {
|
|
enable = 0; tok += 2;
|
|
} else {
|
|
enable = 1;
|
|
}
|
|
if (strcmp(tok, "type") == 0)
|
|
printtype = enable;
|
|
else if (strcmp(tok, "dense") == 0)
|
|
printmode = TGPM_DENSE;
|
|
else if (strcmp(tok, "space") == 0)
|
|
printmode = TGPM_SPACE;
|
|
else if (strcmp(tok, "fields") == 0)
|
|
printmode = TGPM_FIELDS;
|
|
else if (strcmp(tok, "multiline") == 0)
|
|
printmode = TGPM_MULTILINE;
|
|
else
|
|
{
|
|
static struct { const char *name; unsigned flag; } tab[] = {
|
|
{ "meta", ~0u },
|
|
{ "trad", PM_PID | PM_TIME | PM_PHANDLE | PM_STIME | PM_STATE },
|
|
{ "pid", PM_PID },
|
|
{ "topic", PM_TOPIC },
|
|
{ "time", PM_TIME },
|
|
{ "phandle", PM_PHANDLE },
|
|
{ "ihandle", PM_IHANDLE },
|
|
{ "stime", PM_STIME },
|
|
{ "dgen", PM_DGEN },
|
|
{ "nwgen", PM_NWGEN },
|
|
{ "ranks", PM_RANKS },
|
|
{ "state", PM_STATE }
|
|
};
|
|
size_t i;
|
|
for (i = 0; i < sizeof(tab)/sizeof(tab[0]); i++)
|
|
if (strcmp(tok, tab[i].name) == 0)
|
|
break;
|
|
if (i < sizeof(tab)/sizeof(tab[0])) {
|
|
if (enable)
|
|
print_metadata |= tab[i].flag;
|
|
else
|
|
print_metadata &= ~tab[i].flag;
|
|
} else {
|
|
fprintf (stderr, "-P %s: invalid print mode\n", modestr);
|
|
exit(2);
|
|
}
|
|
}
|
|
}
|
|
dds_free(copy);
|
|
}
|
|
|
|
int main(int argc, char *argv[]) {
|
|
dds_entity_t sub = 0;
|
|
dds_entity_t pub = 0;
|
|
dds_listener_t *rdlistener = dds_create_listener(NULL);
|
|
dds_listener_t *wrlistener = dds_create_listener(NULL);
|
|
|
|
dds_qos_t *qos;
|
|
const char **qtopic = (const char **) dds_alloc(sizeof(char *) * (unsigned)argc);
|
|
const char **qreader = (const char **) dds_alloc(sizeof(char *) * (2+(unsigned)argc));
|
|
const char **qwriter = (const char **) dds_alloc(sizeof(char *) * (2+(unsigned)argc));
|
|
const char **qpublisher = (const char **) dds_alloc(sizeof(char *) * (2+(unsigned)argc));
|
|
const char **qsubscriber = (const char **) dds_alloc(sizeof(char *) * (2+(unsigned)argc));
|
|
int nqtopic = 0, nqreader = 0, nqwriter = 0;
|
|
int nqpublisher = 0, nqsubscriber = 0;
|
|
int opt, pos;
|
|
uintptr_t exitcode = 0;
|
|
int want_reader = 1;
|
|
int want_writer = 1;
|
|
bool isWriterListenerSet = false;
|
|
// int disable_signal_handlers = 0; // TODO signal handler support
|
|
long long sleep_at_end = 0;
|
|
ddsrt_thread_t sigtid;
|
|
ddsrt_thread_t inptid;
|
|
#define SPEC_TOPICSEL 1
|
|
#define SPEC_TOPICNAME 2
|
|
unsigned spec_sofar = 0;
|
|
unsigned specidx = 0;
|
|
unsigned i;
|
|
double wait_for_matching_reader_timeout = 0.0;
|
|
const char *wait_for_matching_reader_arg = NULL;
|
|
struct spec *spec = NULL;
|
|
struct wrspeclist *wrspecs = NULL;
|
|
memset (&sigtid, 0, sizeof(sigtid));
|
|
memset (&inptid, 0, sizeof(inptid));
|
|
ddsrt_mutex_init(&output_mutex);
|
|
|
|
if (ddsrt_strcasecmp(execname(argc, argv), "sub") == 0)
|
|
want_writer = 0;
|
|
else if(ddsrt_strcasecmp(execname(argc, argv), "pub") == 0)
|
|
want_reader = 0;
|
|
|
|
save_argv0 (argv[0]);
|
|
pid = (int) ddsrt_getpid();
|
|
|
|
qreader[0] = "k=all";
|
|
qreader[1] = "R=10000/inf/inf";
|
|
nqreader = 2;
|
|
|
|
qwriter[0] = "k=all";
|
|
qwriter[1] = "R=100/inf/inf";
|
|
nqwriter = 2;
|
|
|
|
spec_sofar = SPEC_TOPICSEL;
|
|
specidx--;
|
|
addspec(SPEC_TOPICSEL, &spec_sofar, &specidx, &spec, want_reader);
|
|
spec_sofar = 0;
|
|
assert(specidx == 0);
|
|
|
|
while ((opt = getopt(argc, argv, "!@*:FK:T:D:q:m:M:n:OP:rRs:S:U:W:w:z:")) != EOF) {
|
|
switch (opt) {
|
|
case '!':
|
|
// disable_signal_handlers = 1; // TODO signal handler support
|
|
break;
|
|
case '@':
|
|
spec[specidx].wr.duplicate_writer_flag = 1;
|
|
break;
|
|
case '*':
|
|
{
|
|
sleep_at_end = 0;
|
|
(void)ddsrt_atoll(optarg, &sleep_at_end);
|
|
}
|
|
break;
|
|
case 'M':
|
|
if (sscanf(optarg, "%lf:%n", &wait_for_matching_reader_timeout, &pos) != 1) {
|
|
fprintf (stderr, "-M %s: invalid timeout\n", optarg);
|
|
exit(2);
|
|
}
|
|
wait_for_matching_reader_arg = optarg + pos;
|
|
break;
|
|
case 'F':
|
|
flushflag = 1;
|
|
break;
|
|
case 'K':
|
|
addspec(SPEC_TOPICSEL, &spec_sofar, &specidx, &spec, want_reader);
|
|
if (ddsrt_strcasecmp(optarg, "KS") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = KS;
|
|
else if (ddsrt_strcasecmp(optarg, "K32") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = K32;
|
|
else if (ddsrt_strcasecmp(optarg, "K64") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = K64;
|
|
else if (ddsrt_strcasecmp(optarg, "K128") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = K128;
|
|
else if (ddsrt_strcasecmp(optarg, "K256") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = K256;
|
|
else if (ddsrt_strcasecmp(optarg, "OU") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = OU;
|
|
else if (ddsrt_strcasecmp(optarg, "ARB") == 0)
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = ARB;
|
|
else if (get_metadata(&spec[specidx].metadata, &spec[specidx].typename, &spec[specidx].keylist, optarg))
|
|
spec[specidx].rd.topicsel = spec[specidx].wr.topicsel = ARB;
|
|
else {
|
|
fprintf (stderr, "-K %s: unknown type\n", optarg);
|
|
exit(2);
|
|
}
|
|
break;
|
|
case 'T': {
|
|
char *p;
|
|
addspec(SPEC_TOPICNAME, &spec_sofar, &specidx, &spec, want_reader);
|
|
spec[specidx].topicname = (const char *) dds_string_dup(optarg);
|
|
if ((p = strchr(spec[specidx].topicname, ':')) != NULL) {
|
|
double d;
|
|
int dpos, have_to = 0;
|
|
*p++ = 0;
|
|
if (strcmp (p, "inf") == 0 || strncmp (p, "inf:", 4) == 0) {
|
|
have_to = 1;
|
|
set_infinite_dds_duration(&spec[specidx].findtopic_timeout);
|
|
} else if (sscanf(p, "%lf%n", &d, &dpos) == 1 && (p[dpos] == 0 || p[dpos] == ':')) {
|
|
if (double_to_dds_duration(&spec[specidx].findtopic_timeout, d) < 0)
|
|
error_exit("-T %s: %s: duration invalid\n", optarg, p);
|
|
have_to = 1;
|
|
} else {
|
|
/* assume content filter */
|
|
}
|
|
if (have_to && (p = strchr(p, ':')) != NULL) {
|
|
p++;
|
|
}
|
|
}
|
|
if (p != NULL) {
|
|
spec[specidx].cftp_expr = p;
|
|
}
|
|
break;
|
|
}
|
|
case 'q':
|
|
if (strncmp(optarg, "provider=", 9) == 0) {
|
|
set_qosprovider(optarg+9);
|
|
} else {
|
|
size_t n = strspn(optarg, "atrwps");
|
|
const char *colon = strchr(optarg, ':');
|
|
if (colon == NULL || n == 0 || n != (size_t) (colon - optarg)) {
|
|
fprintf (stderr, "-q %s: flags indicating to which entities QoS's apply must match regex \"[^atrwps]+:\"\n", optarg);
|
|
exit(2);
|
|
} else {
|
|
const char *q = colon+1;
|
|
for (const char *flag = optarg; flag != colon; flag++)
|
|
switch (*flag) {
|
|
case 't': qtopic[nqtopic++] = q; break;
|
|
case 'r': qreader[nqreader++] = q; break;
|
|
case 'w': qwriter[nqwriter++] = q; break;
|
|
case 'p': qpublisher[nqpublisher++] = q; break;
|
|
case 's': qsubscriber[nqsubscriber++] = q; break;
|
|
case 'a':
|
|
qtopic[nqtopic++] = q;
|
|
qreader[nqreader++] = q;
|
|
qwriter[nqwriter++] = q;
|
|
qpublisher[nqpublisher++] = q;
|
|
qsubscriber[nqsubscriber++] = q;
|
|
break;
|
|
default:
|
|
assert(0);
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
case 'D':
|
|
dur = atof(optarg);
|
|
break;
|
|
case 'm':
|
|
spec[specidx].rd.polling = 0;
|
|
if (strcmp(optarg, "0") == 0) {
|
|
spec[specidx].rd.mode = MODE_NONE;
|
|
} else if (strcmp(optarg, "p") == 0) {
|
|
spec[specidx].rd.mode = MODE_PRINT;
|
|
} else if (strcmp(optarg, "pp") == 0) {
|
|
spec[specidx].rd.mode = MODE_PRINT; spec[specidx].rd.polling = 1;
|
|
} else if (strcmp(optarg, "c") == 0) {
|
|
spec[specidx].rd.mode = MODE_CHECK;
|
|
} else if (sscanf(optarg, "c:%u%n", &nkeyvals, &pos) == 1 && optarg[pos] == 0) {
|
|
spec[specidx].rd.mode = MODE_CHECK;
|
|
} else if (strcmp(optarg, "cp") == 0) {
|
|
spec[specidx].rd.mode = MODE_CHECK; spec[specidx].rd.polling = 1;
|
|
} else if (sscanf(optarg, "cp:%u%n", &nkeyvals, &pos) == 1 && optarg[pos] == 0) {
|
|
spec[specidx].rd.mode = MODE_CHECK; spec[specidx].rd.polling = 1;
|
|
} else if (strcmp(optarg, "z") == 0) {
|
|
spec[specidx].rd.mode = MODE_ZEROLOAD;
|
|
} else if (strcmp(optarg, "d") == 0) {
|
|
spec[specidx].rd.mode = MODE_DUMP;
|
|
} else if (strcmp(optarg, "dp") == 0) {
|
|
spec[specidx].rd.mode = MODE_DUMP; spec[specidx].rd.polling = 1;
|
|
} else {
|
|
fprintf (stderr, "-m %s: invalid mode\n", optarg);
|
|
exit(2);
|
|
}
|
|
break;
|
|
case 'w': {
|
|
int port;
|
|
spec[specidx].wr.writerate = 0.0;
|
|
spec[specidx].wr.burstsize = 1;
|
|
if (strcmp(optarg, "-") == 0) {
|
|
spec[specidx].wr.mode = WRM_INPUT;
|
|
} else if (sscanf(optarg, "%u%n", &nkeyvals, &pos) == 1 && optarg[pos] == 0) {
|
|
spec[specidx].wr.mode = (nkeyvals == 0) ? WRM_NONE : WRM_AUTO;
|
|
} else if (sscanf(optarg, "%u:%lf*%u%n", &nkeyvals, &spec[specidx].wr.writerate, &spec[specidx].wr.burstsize, &pos) == 3
|
|
&& optarg[pos] == 0) {
|
|
spec[specidx].wr.mode = (nkeyvals == 0) ? WRM_NONE : WRM_AUTO;
|
|
} else if (sscanf(optarg, "%u:%lf%n", &nkeyvals, &spec[specidx].wr.writerate, &pos) == 2
|
|
&& optarg[pos] == 0) {
|
|
spec[specidx].wr.mode = (nkeyvals == 0) ? WRM_NONE : WRM_AUTO;
|
|
} else if (sscanf(optarg, ":%d%n", &port, &pos) == 1 && optarg[pos] == 0) {
|
|
fprintf (stderr, "listen on TCP port P: not supported\n");
|
|
exit(1);
|
|
} else {
|
|
spec[specidx].wr.mode = WRM_INPUT;
|
|
fprintf (stderr, "%s: can't open\n", optarg);
|
|
exit(1);
|
|
}
|
|
break;
|
|
}
|
|
case 'n':
|
|
spec[specidx].rd.read_maxsamples = (uint32_t)atoi(optarg);
|
|
break;
|
|
case 'O':
|
|
once_mode = 1;
|
|
break;
|
|
case 'P':
|
|
set_print_mode(optarg);
|
|
break;
|
|
case 'R':
|
|
spec[specidx].rd.use_take = 0;
|
|
break;
|
|
case 'r':
|
|
spec[specidx].wr.register_instances = 1;
|
|
break;
|
|
case 's':
|
|
spec[specidx].rd.sleep_ns = DDS_MSECS((int64_t) atoi(optarg));
|
|
break;
|
|
case 'W': {
|
|
double t;
|
|
wait_hist_data = 1;
|
|
if (strcmp(optarg, "inf") == 0)
|
|
set_infinite_dds_duration(&wait_hist_data_timeout);
|
|
else if (sscanf(optarg, "%lf%n", &t, &pos) == 1 && optarg[pos] == 0 && t >= 0)
|
|
double_to_dds_duration(&wait_hist_data_timeout, t);
|
|
else {
|
|
fprintf (stderr, "-W %s: invalid duration\n", optarg);
|
|
exit(2);
|
|
}
|
|
}
|
|
break;
|
|
case 'S': {
|
|
char *copy = dds_string_dup(optarg), *tok, *cursor = copy;
|
|
if (copy == NULL)
|
|
abort();
|
|
tok = ddsrt_strsep(&cursor, ",");
|
|
while (tok) {
|
|
if (strcmp(tok, "pr") == 0 || strcmp(tok, "pre-read") == 0)
|
|
spec[specidx].rd.print_match_pre_read = 1;
|
|
else if (strcmp(tok, "sl") == 0 || strcmp(tok, "sample-lost") == 0)
|
|
dds_lset_sample_lost(rdlistener, rd_on_sample_lost);
|
|
else if (strcmp(tok, "sr") == 0 || strcmp(tok, "sample-rejected") == 0)
|
|
dds_lset_sample_rejected(rdlistener, rd_on_sample_rejected);
|
|
else if (strcmp(tok, "lc") == 0 || strcmp(tok, "liveliness-changed") == 0)
|
|
dds_lset_liveliness_changed(rdlistener, rd_on_liveliness_changed);
|
|
else if (strcmp(tok, "sm") == 0 || strcmp(tok, "subscription-matched") == 0)
|
|
dds_lset_subscription_matched(rdlistener, rd_on_subscription_matched);
|
|
else if (strcmp(tok, "ll") == 0 || strcmp(tok, "liveliness-lost") == 0) {
|
|
dds_lset_liveliness_lost(wrlistener, wr_on_liveliness_lost);
|
|
isWriterListenerSet = true;
|
|
} else if (strcmp(tok, "odm") == 0 || strcmp(tok, "offered-deadline-missed") == 0) {
|
|
dds_lset_offered_deadline_missed(wrlistener, wr_on_offered_deadline_missed);
|
|
isWriterListenerSet = true;
|
|
} else if (strcmp(tok, "pm") == 0 || strcmp(tok, "publication-matched") == 0) {
|
|
dds_lset_publication_matched(wrlistener, wr_on_publication_matched);
|
|
isWriterListenerSet = true;
|
|
} else if (strcmp(tok, "rdm") == 0 || strcmp(tok, "requested-deadline-missed") == 0)
|
|
dds_lset_requested_deadline_missed(rdlistener, rd_on_requested_deadline_missed);
|
|
else if (strcmp(tok, "riq") == 0 || strcmp(tok, "requested-incompatible-qos") == 0)
|
|
dds_lset_requested_incompatible_qos(rdlistener, rd_on_requested_incompatible_qos);
|
|
else if (strcmp(tok, "oiq") == 0 || strcmp(tok, "offered-incompatible-qos") == 0) {
|
|
dds_lset_offered_incompatible_qos(wrlistener, wr_on_offered_incompatible_qos);
|
|
isWriterListenerSet = true;
|
|
} else {
|
|
fprintf (stderr, "-S %s: invalid event\n", tok);
|
|
exit(2);
|
|
}
|
|
tok = ddsrt_strsep(&cursor, ",");
|
|
}
|
|
dds_free(copy);
|
|
}
|
|
break;
|
|
case 'z': {
|
|
/* payload is int32 int32 seq<octet>, which we count as 16+N,
|
|
for a 4 byte sequence length */
|
|
int tmp = atoi(optarg);
|
|
if (tmp != 0 && tmp < 12) {
|
|
fprintf (stderr, "-z %s: minimum is 12\n", optarg);
|
|
exit(1);
|
|
} else if (tmp == 0)
|
|
spec[specidx].wr.baggagesize = 0;
|
|
else
|
|
spec[specidx].wr.baggagesize = (unsigned) (tmp - 12);
|
|
break;
|
|
}
|
|
default:
|
|
usage(argv[0]);
|
|
}
|
|
}
|
|
|
|
if (argc - optind < 1) {
|
|
usage(argv[0]);
|
|
}
|
|
|
|
for (i = 0; i <= specidx; i++) {
|
|
assert(spec[i].rd.topicsel == spec[i].wr.topicsel);
|
|
|
|
if (spec[i].rd.topicsel == UNSPEC)
|
|
spec[i].rd.topicsel = spec[i].wr.topicsel = KS;
|
|
|
|
if (spec[i].topicname == NULL) {
|
|
switch (spec[i].rd.topicsel) {
|
|
case UNSPEC: assert(0);
|
|
case KS: spec[i].topicname = dds_string_dup("PubSub"); break;
|
|
case K32: spec[i].topicname = dds_string_dup("PubSub32"); break;
|
|
case K64: spec[i].topicname = dds_string_dup("PubSub64"); break;
|
|
case K128: spec[i].topicname = dds_string_dup("PubSub128"); break;
|
|
case K256: spec[i].topicname = dds_string_dup("PubSub256"); break;
|
|
case OU: spec[i].topicname = dds_string_dup("PubSubOU"); break;
|
|
case ARB: error_exit("-K ARB requires specifying a topic name\n"); break;
|
|
}
|
|
assert(spec[i].topicname != NULL);
|
|
}
|
|
assert(spec[i].rd.topicsel != UNSPEC && spec[i].rd.topicsel == spec[i].wr.topicsel);
|
|
}
|
|
|
|
if (!isWriterListenerSet) {
|
|
want_writer = 0;
|
|
want_reader = 0;
|
|
for (i = 0; i <= specidx; i++) {
|
|
if (spec[i].rd.mode != MODE_NONE)
|
|
want_reader = 1;
|
|
switch(spec[i].wr.mode) {
|
|
case WRM_NONE:
|
|
break;
|
|
case WRM_AUTO:
|
|
want_writer = 1;
|
|
if (spec[i].wr.topicsel == ARB)
|
|
error_exit("auto-write mode requires non-ARB topic\n");
|
|
break;
|
|
case WRM_INPUT:
|
|
want_writer = 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
for (i = 0; i <= specidx; i++) {
|
|
if (spec[i].rd.topicsel == OU) {
|
|
/* by definition only 1 instance for OneULong type */
|
|
nkeyvals = 1;
|
|
if (spec[i].rd.topicsel == ARB) {
|
|
// TODO ARB type support
|
|
// if (((spec[i].rd.mode != MODE_PRINT || spec[i].rd.mode != MODE_DUMP) && spec[i].rd.mode != MODE_NONE) || (fdin == -1 && fdservsock == -1))
|
|
// error("-K ARB requires readers in PRINT or DUMP mode and writers in interactive mode\n");
|
|
// if (nqtopic != 0 && spec[i].metadata == NULL)
|
|
// error("-K ARB disallows specifying topic QoS when using find_topic\n");
|
|
}
|
|
}
|
|
if (spec[i].rd.mode == MODE_ZEROLOAD)
|
|
{
|
|
/* need to change to keep-last-1 (unless overridden by user) */
|
|
qreader[0] = "k=1";
|
|
}
|
|
}
|
|
|
|
common_init(argv[0]);
|
|
set_systemid_env();
|
|
dds_write_set_batch(true); // FIXME: hack (the global batching flag is a hack anyway)
|
|
|
|
{
|
|
char **ps = (char **) dds_alloc(sizeof(char *) * (uint32_t)(argc - optind));
|
|
for (i = 0; i < (unsigned) (argc - optind); i++)
|
|
ps[i] = expand_envvars(argv[(unsigned) optind + i]);
|
|
if (want_reader) {
|
|
qos = dds_create_qos();
|
|
setqos_from_args(DDS_KIND_SUBSCRIBER, qos, nqsubscriber, qsubscriber);
|
|
sub = new_subscriber(qos, (unsigned) (argc - optind), (const char **) ps);
|
|
dds_delete_qos(qos);
|
|
}
|
|
if (want_writer) {
|
|
qos = dds_create_qos();
|
|
setqos_from_args(DDS_KIND_PUBLISHER, qos, nqpublisher, qpublisher);
|
|
pub = new_publisher(qos, (unsigned) (argc - optind), (const char **) ps);
|
|
dds_delete_qos(qos);
|
|
}
|
|
for (i = 0; i < (unsigned) (argc - optind); i++)
|
|
dds_free(ps[i]);
|
|
dds_free(ps);
|
|
}
|
|
|
|
|
|
for (i = 0; i <= specidx; i++) {
|
|
qos = new_tqos();
|
|
setqos_from_args(DDS_KIND_TOPIC, qos, nqtopic, qtopic);
|
|
switch (spec[i].rd.topicsel) {
|
|
case UNSPEC: assert(0); break;
|
|
case KS: spec[i].tp = new_topic(spec[i].topicname, ts_KeyedSeq, qos); break;
|
|
case K32: spec[i].tp = new_topic(spec[i].topicname, ts_Keyed32, qos); break;
|
|
case K64: spec[i].tp = new_topic(spec[i].topicname, ts_Keyed64, qos); break;
|
|
case K128: spec[i].tp = new_topic(spec[i].topicname, ts_Keyed128, qos); break;
|
|
case K256: spec[i].tp = new_topic(spec[i].topicname, ts_Keyed256, qos); break;
|
|
case OU: spec[i].tp = new_topic(spec[i].topicname, ts_OneULong, qos); break;
|
|
case ARB:
|
|
// TODO ARB type support
|
|
error_exit("Currently doesn't support ARB type\n");
|
|
if (spec[i].metadata == NULL) {
|
|
if (!(spec[i].tp = find_topic(dp, spec[i].topicname, &spec[i].findtopic_timeout)))
|
|
error_exit("topic %s not found\n", spec[i].topicname);
|
|
} else {
|
|
// const dds_topic_descriptor_t* ts = dds_topic_descriptor_create(spec[i].typename, spec[i].keylist, spec[i].metadata); //Todo: Not available in cham dds.h
|
|
const dds_topic_descriptor_t* ts = NULL;
|
|
if(ts == NULL)
|
|
error_exit("dds_topic_descriptor_create(%s) failed\n",spec[i].typename);
|
|
spec[i].tp = new_topic(spec[i].topicname, ts, qos);
|
|
// dds_topic_descriptor_delete((dds_topic_descriptor_t*) ts);
|
|
}
|
|
// spec[i].rd.tgtp = spec[i].wr.tgtp = tgnew(spec[i].tp, printtype);
|
|
break;
|
|
}
|
|
assert(spec[i].tp);
|
|
// assert(spec[i].rd.topicsel != ARB || spec[i].rd.tgtp != NULL);
|
|
// assert(spec[i].wr.topicsel != ARB || spec[i].wr.tgtp != NULL);
|
|
dds_delete_qos(qos);
|
|
|
|
if (spec[i].cftp_expr == NULL)
|
|
spec[i].cftp = spec[i].tp;
|
|
else {
|
|
fprintf (stderr,"C99 API doesn't support the creation of content filtered topic.\n");
|
|
spec[i].cftp = spec[i].tp;
|
|
// TODO Content Filtered Topic support
|
|
// char name[40], *expr = expand_envvars(spec[i].cftp_expr);
|
|
// DDS_StringSeq *params = DDS_StringSeq__alloc();
|
|
// snprintf (name, sizeof (name), "cft%u", i);
|
|
// if ((spec[i].cftp = DDS_DomainParticipant_create_contentfilteredtopic(dp, name, spec[i].tp, expr, params)) == NULL)
|
|
// error("DDS_DomainParticipant_create_contentfiltered_topic failed\n");
|
|
// DDS_free(params);
|
|
// free(expr);
|
|
}
|
|
|
|
if (spec[i].rd.mode != MODE_NONE) {
|
|
qos = new_rdqos(spec[i].cftp);
|
|
setqos_from_args(DDS_KIND_READER, qos, nqreader, qreader);
|
|
spec[i].rd.rd = new_datareader_listener(sub, spec[i].cftp, qos, rdlistener);
|
|
spec[i].rd.sub = sub;
|
|
dds_delete_qos(qos);
|
|
}
|
|
|
|
if (spec[i].wr.mode != WRM_NONE) {
|
|
qos = new_wrqos(spec[i].tp);
|
|
setqos_from_args(DDS_KIND_WRITER, qos, nqwriter, qwriter);
|
|
spec[i].wr.wr = new_datawriter_listener(pub, spec[i].tp, qos, wrlistener);
|
|
spec[i].wr.pub = pub;
|
|
if (spec[i].wr.duplicate_writer_flag) {
|
|
spec[i].wr.dupwr = dds_create_writer(pub, spec[i].tp, qos, NULL);
|
|
error_abort(spec[i].wr.dupwr, "dds_writer_create failed");
|
|
}
|
|
dds_delete_qos(qos);
|
|
}
|
|
}
|
|
|
|
if (want_writer && wait_for_matching_reader_arg) {
|
|
printf("Wait for matching reader: unsupported\n");
|
|
if (flushflag) {
|
|
fflush (stdout);
|
|
}
|
|
// TODO Reimplement wait_for_matching_reader functionality via wait on status subscription matched
|
|
// struct qos *q = NULL;
|
|
// uint64_t tnow = dds_time();
|
|
// uint64_t tend = tnow + (uint64_t) (wait_for_matching_reader_timeout >= 0 ? (wait_for_matching_reader_timeout * 1e9 + 0.5) : 0);
|
|
// DDS_InstanceHandleSeq *sh = DDS_InstanceHandleSeq__alloc();
|
|
// dds_instance_handle_t pphandle;
|
|
// DDS_ReturnCode_t ret;
|
|
// DDS_ParticipantBuiltinTopicData *ppdata = DDS_ParticipantBuiltinTopicData__alloc();
|
|
// const DDS_UserDataQosPolicy *udqos;
|
|
// unsigned m;
|
|
// if ((pphandle = DDS_DomainParticipant_get_instance_handle(dp)) == 0)
|
|
// error("DDS_DomainParticipant_get_instance_handle failed\n");
|
|
// if ((ret = DDS_DomainParticipant_get_discovered_participant_data(dp, ppdata, pphandle)) != DDS_RETCODE_OK)
|
|
// error("DDS_DomainParticipant_get_discovered_participant_data failed: %d (%s)\n", (int) ret, dds_err_str(ret));
|
|
// q = new_wrqos(pub, spec[0].tp);
|
|
// qos_user_data(q, wait_for_matching_reader_arg);
|
|
// udqos = &qos_datawriter(q)->user_data;
|
|
// do {
|
|
// for (i = 0, m = specidx + 1; i <= specidx; i++) {
|
|
// if (spec[i].wr.mode == WM_NONE)
|
|
// --m;
|
|
// else if ((ret = DDS_DataWriter_get_matched_subscriptions(spec[i].wr.wr, sh)) != DDS_RETCODE_OK)
|
|
// error("DDS_DataWriter_get_matched_subscriptions failed: %d (%s)\n", (int) ret, dds_err_str(ret));
|
|
// else {
|
|
// unsigned j;
|
|
// for(j = 0; j < sh->_length; j++) {
|
|
// DDS_SubscriptionBuiltinTopicData *d = DDS_SubscriptionBuiltinTopicData__alloc();
|
|
// if ((ret = DDS_DataWriter_get_matched_subscription_data(spec[i].wr.wr, d, sh->_buffer[j])) != DDS_RETCODE_OK)
|
|
// error("DDS_DataWriter_get_matched_subscription_data(wr %u ih %llx) failed: %d (%s)\n", specidx, sh->_buffer[j], (int) ret, dds_err_str(ret));
|
|
// if (memcmp(d->participant_key, ppdata->key, sizeof(ppdata->key)) != 0 &&
|
|
// d->user_data.value._length == udqos->value._length &&
|
|
// (d->user_data.value._length == 0 || memcmp(d->user_data.value._buffer, udqos->value._buffer, udqos->value._length) == 0)) {
|
|
// --m;
|
|
// DDS_free(d);
|
|
// break;
|
|
// }
|
|
// DDS_free(d);
|
|
// }
|
|
// }
|
|
// }
|
|
// tnow = dds_time();
|
|
// if (m != 0 && tnow < tend) {
|
|
// uint64_t tdelta = (tend-tnow) < T_SECOND/10 ? tend-tnow : T_SECOND/10;
|
|
// os_time delay = { (os_timeSec) (tdelta / T_SECOND), (os_int32) (tdelta % T_SECOND)};
|
|
// os_nanoSleep(delay);
|
|
// tnow = dds_time();
|
|
// }
|
|
// } while(m != 0 && tnow < tend);
|
|
// free_qos(q);
|
|
// DDS_free(ppdata);
|
|
// DDS_free(sh);
|
|
// if (m != 0)
|
|
// error("timed out waiting for matching subscriptions\n");
|
|
}
|
|
|
|
termcond = dds_create_waitset(dp); // Waitset serves as GuardCondition here.
|
|
error_abort(termcond, "dds_create_waitset failed");
|
|
|
|
ddsrt_threadattr_t attr;
|
|
ddsrt_threadattr_init(&attr);
|
|
dds_return_t osres;
|
|
|
|
if (want_writer) {
|
|
for (i = 0; i <= specidx; i++) {
|
|
struct wrspeclist *wsl;
|
|
switch (spec[i].wr.mode) {
|
|
case WRM_NONE:
|
|
break;
|
|
case WRM_AUTO:
|
|
osres = ddsrt_thread_create(&spec[i].wrtid, "pubthread_auto", &attr, pubthread_auto, &spec[i].wr);
|
|
os_error_exit(osres, "Error: cannot create thread pubthread_auto");
|
|
break;
|
|
case WRM_INPUT:
|
|
wsl = dds_alloc(sizeof(*wsl));
|
|
spec[i].wr.tpname = dds_string_dup(spec[i].topicname);
|
|
wsl->spec = &spec[i].wr;
|
|
if (wrspecs) {
|
|
wsl->next = wrspecs->next;
|
|
wrspecs->next = wsl;
|
|
} else {
|
|
wsl->next = wsl;
|
|
}
|
|
wrspecs = wsl;
|
|
break;
|
|
}
|
|
}
|
|
if (wrspecs) { /* start with first wrspec */
|
|
wrspecs = wrspecs->next;
|
|
osres = ddsrt_thread_create(&inptid, "pubthread", &attr, pubthread, wrspecs);
|
|
os_error_exit(osres, "Error: cannot create thread pubthread");
|
|
}
|
|
} else if (dur > 0) { /* note: abusing inptid */
|
|
osres = ddsrt_thread_create(&inptid, "autotermthread", &attr, autotermthread, NULL);
|
|
os_error_exit(osres, "Error: cannot create thread autotermthread");
|
|
}
|
|
|
|
for (i = 0; i <= specidx; i++) {
|
|
if (spec[i].rd.mode != MODE_NONE) {
|
|
spec[i].rd.idx = i;
|
|
osres = ddsrt_thread_create(&spec[i].rdtid, "subthread", &attr, subthread, &spec[i].rd);
|
|
os_error_exit(osres, "Error: cannot create thread subthread");
|
|
}
|
|
}
|
|
|
|
if (want_writer || dur > 0) {
|
|
int term_called = 0;
|
|
if (!want_writer || wrspecs) {
|
|
(void)ddsrt_thread_join(inptid, NULL);
|
|
term_called = 1;
|
|
terminate();
|
|
}
|
|
for (i = 0; i <= specidx; i++) {
|
|
if (spec[i].wr.mode == WRM_AUTO)
|
|
(void)ddsrt_thread_join(spec[i].wrtid, NULL);
|
|
}
|
|
if (!term_called)
|
|
terminate();
|
|
}
|
|
|
|
if (want_reader) {
|
|
uint32_t ret;
|
|
exitcode = 0;
|
|
for (i = 0; i <= specidx; i++) {
|
|
if (spec[i].rd.mode != MODE_NONE) {
|
|
(void)ddsrt_thread_join(spec[i].rdtid, &ret);
|
|
if ((uintptr_t) ret > exitcode)
|
|
exitcode = (uintptr_t) ret;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (wrspecs) {
|
|
struct wrspeclist *m;
|
|
m = wrspecs->next;
|
|
wrspecs->next = NULL;
|
|
wrspecs = m;
|
|
while ((m = wrspecs) != NULL) {
|
|
wrspecs = wrspecs->next;
|
|
dds_free(m);
|
|
}
|
|
}
|
|
|
|
dds_delete_listener(wrlistener);
|
|
dds_delete_listener(rdlistener);
|
|
|
|
dds_free((char **) qtopic);
|
|
dds_free((char **) qpublisher);
|
|
dds_free((char **) qsubscriber);
|
|
dds_free((char **) qreader);
|
|
dds_free((char **) qwriter);
|
|
|
|
for (i = 0; i <= specidx; i++) {
|
|
if(spec[i].topicname) dds_free((char *)spec[i].topicname);
|
|
if(spec[i].cftp_expr) dds_free((char *)spec[i].cftp_expr);
|
|
if(spec[i].metadata) dds_free(spec[i].metadata);
|
|
if(spec[i].typename) dds_free(spec[i].typename);
|
|
if(spec[i].keylist) dds_free(spec[i].keylist);
|
|
assert(spec[i].wr.tgtp == spec[i].rd.tgtp); /* so no need to free both */
|
|
// TODO ARB type support
|
|
// if (spec[i].rd.tgtp)
|
|
// tgfree(spec[i].rd.tgtp);
|
|
// if (spec[i].wr.tgtp)
|
|
// tgfree(spec[i].wr.tgtp);
|
|
if (spec[i].wr.tpname)
|
|
dds_string_free(spec[i].wr.tpname);
|
|
}
|
|
dds_free(spec);
|
|
|
|
// dds_delete(termcond);
|
|
common_fini ();
|
|
if (sleep_at_end) {
|
|
dds_sleepfor(DDS_SECS(sleep_at_end));
|
|
}
|
|
ddsrt_mutex_destroy(&output_mutex);
|
|
return (int) exitcode;
|
|
}
|