Optionally include sample content in trace

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-09-01 11:18:18 +02:00 committed by eboasson
parent 572d2528f3
commit 801ae26872
10 changed files with 397 additions and 19 deletions

View file

@ -60,6 +60,10 @@ void dds_stream_extract_keyhash (dds_istream_t * __restrict is, dds_keyhash_t *
void dds_stream_read_key (dds_istream_t * __restrict is, char * __restrict sample, const struct ddsi_sertopic_default * __restrict topic);
size_t dds_stream_print_key (dds_istream_t * __restrict is, const struct ddsi_sertopic_default * __restrict topic, char * __restrict buf, size_t size);
size_t dds_stream_print_sample (dds_istream_t * __restrict is, const struct ddsi_sertopic_default * __restrict topic, char * __restrict buf, size_t size);
/* For marshalling op code handling */
#define DDS_OP_MASK 0xff000000

View file

@ -276,6 +276,12 @@ static void serdata_builtin_to_ser_unref (struct ddsi_serdata *serdata_common, c
(void)serdata_common; (void)ref;
}
static size_t serdata_builtin_topic_print (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, char *buf, size_t size)
{
(void)topic; (void)serdata_common;
return (size_t) snprintf (buf, size, "(blob)");
}
const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.get_size = serdata_builtin_get_size,
.eqkey = serdata_builtin_eqkey,
@ -288,5 +294,6 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.to_ser_ref = serdata_builtin_to_ser_ref,
.to_ser_unref = serdata_builtin_to_ser_unref,
.to_topicless = serdata_builtin_to_topicless,
.topicless_to_sample = serdata_builtin_topicless_to_sample
.topicless_to_sample = serdata_builtin_topicless_to_sample,
.print = serdata_builtin_topic_print
};

View file

@ -11,6 +11,7 @@
*/
#include <assert.h>
#include <string.h>
#include <ctype.h>
#include "dds/ddsrt/endian.h"
#include "dds/ddsrt/md5.h"
@ -1608,6 +1609,292 @@ void dds_stream_extract_keyhash (dds_istream_t * __restrict is, dds_keyhash_t *
}
}
/*******************************************************************************************
**
** Pretty-printing
**
*******************************************************************************************/
/* Returns true if buffer not yet exhausted, false otherwise */
static bool prtf (char * __restrict *buf, size_t * __restrict bufsize, const char *fmt, ...)
{
va_list ap;
if (*bufsize == 0)
return false;
va_start (ap, fmt);
int n = vsnprintf (*buf, *bufsize, fmt, ap);
va_end (ap);
if (n < 0)
{
**buf = 0;
return false;
}
else if ((size_t) n <= *bufsize)
{
*buf += (size_t) n;
*bufsize -= (size_t) n;
return (*bufsize > 0);
}
else
{
*buf += *bufsize;
*bufsize = 0;
return false;
}
}
static bool prtf_str (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is)
{
size_t sz = dds_is_get4 (is);
bool ret = prtf (buf, bufsize, "\"%s\"", is->m_buffer + is->m_index);
is->m_index += (uint32_t) sz;
return ret;
}
static size_t isprint_runlen (const unsigned char *s, size_t n)
{
size_t m;
for (m = 0; m < n && s[m] != '"' && isprint (s[m]); m++)
;
return m;
}
static bool prtf_simple (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is, enum dds_stream_typecode type)
{
switch (type)
{
case DDS_OP_VAL_1BY: return prtf (buf, bufsize, "%"PRIu8, dds_is_get1 (is));
case DDS_OP_VAL_2BY: return prtf (buf, bufsize, "%"PRIu16, dds_is_get2 (is));
case DDS_OP_VAL_4BY: return prtf (buf, bufsize, "%"PRIu32, dds_is_get4 (is));
case DDS_OP_VAL_8BY: return prtf (buf, bufsize, "%"PRIu64, dds_is_get8 (is));
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST: return prtf_str (buf, bufsize, is);
case DDS_OP_VAL_ARR: case DDS_OP_VAL_SEQ: case DDS_OP_VAL_UNI: case DDS_OP_VAL_STU:
abort ();
}
return false;
}
static bool prtf_simple_array (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is, uint32_t num, enum dds_stream_typecode type)
{
bool cont = prtf (buf, bufsize, "{");
switch (type)
{
case DDS_OP_VAL_1BY: {
size_t i = 0, j;
while (cont && i < num)
{
size_t m = isprint_runlen ((unsigned char *) (is->m_buffer + is->m_index), num - i);
if (m >= 4)
{
cont = prtf (buf, bufsize, "%s\"", i != 0 ? "," : "");
for (j = 0; cont && j < m; j++)
cont = prtf (buf, bufsize, "%c", is->m_buffer[is->m_index + j]);
cont = prtf (buf, bufsize, "\"");
is->m_index += (uint32_t) m;
i += m;
}
else
{
if (i != 0)
cont = prtf (buf, bufsize, ",");
cont = prtf_simple (buf, bufsize, is, type);
i++;
}
}
break;
}
case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST:
for (size_t i = 0; cont && i < num; i++)
{
if (i != 0)
cont = prtf (buf, bufsize, ",");
cont = prtf_simple (buf, bufsize, is, type);
}
break;
default:
abort ();
break;
}
return cont;
}
static bool dds_stream_print_sample1 (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, bool add_braces);
static const uint32_t *prtf_seq (char * __restrict *buf, size_t *bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, uint32_t insn)
{
const enum dds_stream_typecode subtype = DDS_OP_SUBTYPE (insn);
uint32_t num;
num = dds_is_get4 (is);
if (num == 0)
{
prtf (buf, bufsize, "{}");
return skip_sequence_insns (ops, insn);
}
switch (subtype)
{
case DDS_OP_VAL_1BY: case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
prtf_simple_array (buf, bufsize, is, num, subtype);
return ops + 2;
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST:
prtf_simple_array (buf, bufsize, is, num, subtype);
return ops + (subtype == DDS_OP_VAL_STR ? 2 : 3);
case DDS_OP_VAL_SEQ: case DDS_OP_VAL_ARR: case DDS_OP_VAL_UNI: case DDS_OP_VAL_STU: {
const uint32_t jmp = DDS_OP_ADR_JMP (ops[3]);
uint32_t const * const jsr_ops = ops + DDS_OP_ADR_JSR (ops[3]);
bool cont = prtf (buf, bufsize, "{");
for (uint32_t i = 0; cont && i < num; i++)
{
if (i > 0) prtf (buf, bufsize, ",");
cont = dds_stream_print_sample1 (buf, bufsize, is, jsr_ops, subtype == DDS_OP_VAL_STU);
}
prtf (buf, bufsize, "}");
return ops + (jmp ? jmp : 4); /* FIXME: why would jmp be 0? */
}
}
return NULL;
}
static const uint32_t *prtf_arr (char * __restrict *buf, size_t *bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, uint32_t insn)
{
const enum dds_stream_typecode subtype = DDS_OP_SUBTYPE (insn);
const uint32_t num = ops[2];
switch (subtype)
{
case DDS_OP_VAL_1BY: case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
prtf_simple_array (buf, bufsize, is, num, subtype);
return ops + 3;
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST:
prtf_simple_array (buf, bufsize, is, num, subtype);
return ops + (subtype == DDS_OP_VAL_STR ? 3 : 5);
case DDS_OP_VAL_SEQ: case DDS_OP_VAL_ARR: case DDS_OP_VAL_UNI: case DDS_OP_VAL_STU: {
const uint32_t *jsr_ops = ops + DDS_OP_ADR_JSR (ops[3]);
const uint32_t jmp = DDS_OP_ADR_JMP (ops[3]);
bool cont = prtf (buf, bufsize, "{");
for (uint32_t i = 0; cont && i < num; i++)
{
if (i > 0) prtf (buf, bufsize, ",");
cont = dds_stream_print_sample1 (buf, bufsize, is, jsr_ops, subtype == DDS_OP_VAL_STU);
}
prtf (buf, bufsize, "}");
return ops + (jmp ? jmp : 5);
}
}
return NULL;
}
static const uint32_t *prtf_uni (char * __restrict *buf, size_t *bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, uint32_t insn)
{
const uint32_t disc = read_union_discriminant (is, DDS_OP_SUBTYPE (insn));
uint32_t const * const jeq_op = find_union_case (ops, disc);
prtf (buf, bufsize, "%"PRIu32":", disc);
ops += DDS_OP_ADR_JMP (ops[3]);
if (jeq_op)
{
const enum dds_stream_typecode valtype = DDS_JEQ_TYPE (jeq_op[0]);
switch (valtype)
{
case DDS_OP_VAL_1BY: case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST:
prtf_simple (buf, bufsize, is, valtype);
break;
case DDS_OP_VAL_SEQ: case DDS_OP_VAL_ARR: case DDS_OP_VAL_UNI: case DDS_OP_VAL_STU:
dds_stream_print_sample1 (buf, bufsize, is, jeq_op + DDS_OP_ADR_JSR (jeq_op[0]), valtype == DDS_OP_VAL_STU);
break;
}
}
return ops;
}
static bool dds_stream_print_sample1 (char * __restrict *buf, size_t * __restrict bufsize, dds_istream_t * __restrict is, const uint32_t * __restrict ops, bool add_braces)
{
uint32_t insn;
bool cont = true;
bool needs_comma = false;
if (add_braces)
prtf (buf, bufsize, "{");
while (cont && (insn = *ops) != DDS_OP_RTS)
{
if (needs_comma)
prtf (buf, bufsize, ",");
needs_comma = true;
switch (DDS_OP (insn))
{
case DDS_OP_ADR: {
switch (DDS_OP_TYPE (insn))
{
case DDS_OP_VAL_1BY: case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
case DDS_OP_VAL_STR:
cont = prtf_simple (buf, bufsize, is, DDS_OP_TYPE (insn));
ops += 2;
break;
case DDS_OP_VAL_BST:
cont = prtf_simple (buf, bufsize, is, DDS_OP_TYPE (insn));
ops += 3;
break;
case DDS_OP_VAL_SEQ:
ops = prtf_seq (buf, bufsize, is, ops, insn);
break;
case DDS_OP_VAL_ARR:
ops = prtf_arr (buf, bufsize, is, ops, insn);
break;
case DDS_OP_VAL_UNI:
ops = prtf_uni (buf, bufsize, is, ops, insn);
break;
case DDS_OP_VAL_STU:
abort ();
break;
}
break;
}
case DDS_OP_JSR: {
cont = dds_stream_print_sample1 (buf, bufsize, is, ops + DDS_OP_JUMP (insn), true);
ops++;
break;
}
case DDS_OP_RTS: case DDS_OP_JEQ: {
abort ();
break;
}
}
}
if (add_braces)
prtf (buf, bufsize, "}");
return cont;
}
size_t dds_stream_print_sample (dds_istream_t * __restrict is, const struct ddsi_sertopic_default * __restrict topic, char * __restrict buf, size_t bufsize)
{
dds_stream_print_sample1 (&buf, &bufsize, is, topic->type->m_ops, true);
return bufsize;
}
size_t dds_stream_print_key (dds_istream_t * __restrict is, const struct ddsi_sertopic_default * __restrict topic, char * __restrict buf, size_t bufsize)
{
const dds_topic_descriptor_t *desc = topic->type;
bool cont = prtf (&buf, &bufsize, ":k:{");
for (uint32_t i = 0; cont && i < desc->m_nkeys; i++)
{
const uint32_t *op = desc->m_ops + desc->m_keys[i].m_index;
assert (insn_key_ok_p (*op));
switch (DDS_OP_TYPE (*op))
{
case DDS_OP_VAL_1BY: case DDS_OP_VAL_2BY: case DDS_OP_VAL_4BY: case DDS_OP_VAL_8BY:
case DDS_OP_VAL_STR: case DDS_OP_VAL_BST:
cont = prtf_simple (&buf, &bufsize, is, DDS_OP_TYPE (*op));
break;
case DDS_OP_VAL_ARR:
cont = prtf_simple_array (&buf, &bufsize, is, op[2], DDS_OP_SUBTYPE (*op));
break;
case DDS_OP_VAL_SEQ: case DDS_OP_VAL_UNI: case DDS_OP_VAL_STU:
abort ();
break;
}
}
prtf (&buf, &bufsize, "}");
return bufsize;
}
/*******************************************************************************************
**
** Stuff to make it possible to treat a ddsi_serdata_default as a stream

View file

@ -123,6 +123,15 @@ typedef bool (*ddsi_serdata_topicless_to_sample_t) (const struct ddsi_sertopic *
computing equijoins across topics much simpler). */
typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
/* Print a serdata into the provided buffer (truncating as necessary)
- topic is present for supporting printing of "topicless" samples
- buf != NULL, bufsize > 0 on input
- buf must always be terminated with a nul character on return
- returns the number of characters (excluding the terminating 0) needed to print it
in full (or, as an optimization, it may pretend that it has printed it in full,
returning bufsize-1) if it had to truncate) */
typedef size_t (*ddsi_serdata_print_t) (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, char *buf, size_t size);
struct ddsi_serdata_ops {
ddsi_serdata_eqkey_t eqkey;
ddsi_serdata_size_t get_size;
@ -136,8 +145,11 @@ struct ddsi_serdata_ops {
ddsi_serdata_to_topicless_t to_topicless;
ddsi_serdata_topicless_to_sample_t topicless_to_sample;
ddsi_serdata_free_t free;
ddsi_serdata_print_t print;
};
#define DDSI_SERDATA_HAS_PRINT 1
DDS_EXPORT void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, enum ddsi_serdata_kind kind);
DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const) {
@ -195,6 +207,20 @@ DDS_EXPORT inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const s
return a->ops->eqkey (a, b);
}
DDS_EXPORT inline bool ddsi_serdata_print (const struct ddsi_serdata *d, char *buf, size_t size) {
return d->ops->print (d->topic, d, buf, size);
}
DDS_EXPORT inline bool ddsi_serdata_print_topicless (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, char *buf, size_t size) {
if (d->ops->print)
return d->ops->print (topic, d, buf, size);
else
{
buf[0] = 0;
return 0;
}
}
#if defined (__cplusplus)
}
#endif

View file

@ -45,3 +45,5 @@ extern inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsr
extern inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
extern inline bool ddsi_serdata_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
extern inline bool ddsi_serdata_print (const struct ddsi_serdata *d, char *buf, size_t size);
extern inline bool ddsi_serdata_print_topicless (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, char *buf, size_t size);

View file

@ -629,6 +629,32 @@ static bool serdata_default_topicless_to_sample_cdr_nokey (const struct ddsi_ser
return true;
}
static size_t serdata_default_print_cdr (const struct ddsi_sertopic *sertopic_common, const struct ddsi_serdata *serdata_common, char *buf, size_t size)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)sertopic_common;
dds_istream_t is;
dds_istream_from_serdata_default (&is, d);
if (d->c.kind == SDK_KEY)
return dds_stream_print_key (&is, tp, buf, size);
else
return dds_stream_print_sample (&is, tp, buf, size);
}
static size_t serdata_default_print_plist (const struct ddsi_sertopic *sertopic_common, const struct ddsi_serdata *serdata_common, char *buf, size_t size)
{
/* FIXME: should change q_plist.c to print to a string instead of a log, and then drop the
logging of QoS in the rest of code, instead relying on this */
(void)sertopic_common; (void)serdata_common;
return (size_t) snprintf (buf, size, "(plist)");
}
static size_t serdata_default_print_raw (const struct ddsi_sertopic *sertopic_common, const struct ddsi_serdata *serdata_common, char *buf, size_t size)
{
(void)sertopic_common; (void)serdata_common;
return (size_t) snprintf (buf, size, "(blob)");
}
const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.get_size = serdata_default_get_size,
.eqkey = serdata_default_eqkey,
@ -641,7 +667,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = serdata_default_topicless_to_sample_cdr
.topicless_to_sample = serdata_default_topicless_to_sample_cdr,
.print = serdata_default_print_cdr
};
const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = {
@ -656,7 +683,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_cdr_nokey = {
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = serdata_default_topicless_to_sample_cdr_nokey
.topicless_to_sample = serdata_default_topicless_to_sample_cdr_nokey,
.print = serdata_default_print_cdr
};
const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
@ -671,7 +699,8 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = 0
.topicless_to_sample = 0,
.print = serdata_default_print_plist
};
const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = {
@ -686,5 +715,6 @@ const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = {
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref,
.to_topicless = serdata_default_to_topicless,
.topicless_to_sample = 0
.topicless_to_sample = 0,
.print = serdata_default_print_raw
};

View file

@ -1317,14 +1317,17 @@ static enum update_result do_uint32_bitset (struct cfgst *cfgst, uint32_t *cats,
char *copy = ddsrt_strdup (value), *cursor = copy, *tok;
while ((tok = ddsrt_strsep (&cursor, ",")) != NULL)
{
const int idx = list_index (names, tok);
const int idx = list_index (names, tok[0] == '-' ? tok+1 : tok);
if (idx < 0)
{
const enum update_result ret = cfg_error (cfgst, "'%s' in '%s' undefined", tok, value);
ddsrt_free (copy);
return ret;
}
*cats |= codes[idx];
if (tok[0] == '-')
*cats &= ~codes[idx];
else
*cats |= codes[idx];
}
ddsrt_free (copy);
return URES_SUCCESS;
@ -1517,10 +1520,10 @@ GENERIC_ENUM_CTYPE (standards_conformance, enum nn_standards_conformance)
/* "trace" is special: it enables (nearly) everything */
static const char *tracemask_names[] = {
"fatal", "error", "warning", "info", "config", "discovery", "data", "radmin", "timing", "traffic", "topic", "tcp", "plist", "whc", "throttle", "rhc", "trace", NULL
"fatal", "error", "warning", "info", "config", "discovery", "data", "radmin", "timing", "traffic", "topic", "tcp", "plist", "whc", "throttle", "rhc", "content", "trace", NULL
};
static const uint32_t tracemask_codes[] = {
DDS_LC_FATAL, DDS_LC_ERROR, DDS_LC_WARNING, DDS_LC_INFO, DDS_LC_CONFIG, DDS_LC_DISCOVERY, DDS_LC_DATA, DDS_LC_RADMIN, DDS_LC_TIMING, DDS_LC_TRAFFIC, DDS_LC_TOPIC, DDS_LC_TCP, DDS_LC_PLIST, DDS_LC_WHC, DDS_LC_THROTTLE, DDS_LC_RHC, DDS_LC_ALL
DDS_LC_FATAL, DDS_LC_ERROR, DDS_LC_WARNING, DDS_LC_INFO, DDS_LC_CONFIG, DDS_LC_DISCOVERY, DDS_LC_DATA, DDS_LC_RADMIN, DDS_LC_TIMING, DDS_LC_TRAFFIC, DDS_LC_TOPIC, DDS_LC_TCP, DDS_LC_PLIST, DDS_LC_WHC, DDS_LC_THROTTLE, DDS_LC_RHC, DDS_LC_CONTENT, DDS_LC_ALL
};
static enum update_result uf_tracemask (struct cfgst *cfgst, UNUSED_ARG (void *parent), UNUSED_ARG (struct cfgelem const * const cfgelem), UNUSED_ARG (int first), const char *value)

View file

@ -1818,6 +1818,21 @@ static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **t
ddsi_serdata_unref (sample);
sample = NULL;
}
else if (gv->logconfig.c.mask & DDS_LC_TRACE)
{
const struct proxy_writer *pwr = sampleinfo->pwr;
nn_guid_t guid;
char tmp[1024];
size_t res = 0;
tmp[0] = 0;
if (gv->logconfig.c.mask & DDS_LC_CONTENT)
res = ddsi_serdata_print (sample, tmp, sizeof (tmp));
if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
GVTRACE ("data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": ST%x %s/%s:%s%s",
sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
PGUID (guid), sampleinfo->seq, statusinfo, topic->name, topic->type_name,
tmp, res < sizeof (tmp) ? "" : "(trunc)");
}
}
return sample;
}
@ -1932,9 +1947,8 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
if ((plist_ret = nn_plist_init_frommsg (&qos, NULL, PP_STATUSINFO | PP_KEYHASH | PP_COHERENT_SET, 0, &src)) < 0)
{
if (plist_ret != DDS_RETCODE_UNSUPPORTED)
DDS_CWARNING (&gv->logconfig,
"data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": invalid inline qos\n",
src.vendorid.id[0], src.vendorid.id[1], PGUID (pwr->e.guid), sampleinfo->seq);
GVWARNING ("data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": invalid inline qos\n",
src.vendorid.id[0], src.vendorid.id[1], PGUID (pwr->e.guid), sampleinfo->seq);
return 0;
}
statusinfo = (qos.present & PP_STATUSINFO) ? qos.statusinfo : 0;
@ -1947,8 +1961,6 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
if (rdguid == NULL)
{
ETRACE (pwr, " %"PRId64"=>EVERYONE\n", sampleinfo->seq);
/* FIXME: Retry loop, for re-delivery of rejected reliable samples. Is a
temporary hack till throttling back of writer is implemented (with late
acknowledgement of sample and nack). */
@ -1963,9 +1975,9 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rdary[0]->topic)) != NULL)
{
ETRACE (pwr, " => EVERYONE\n");
uint32_t i = 0;
do {
ETRACE (pwr, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
if (!rhc_store (rdary[i]->rhc, &pwr_info, payload, tk))
{
if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
@ -2005,12 +2017,14 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
{
ETRACE (pwr, " =>");
do {
ETRACE (pwr, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
ETRACE (pwr, " "PGUIDFMT, PGUID (rd->e.guid));
(void) rhc_store (rd->rhc, &pwr_info, payload, tk);
rd = proxy_writer_next_in_sync_reader (pwr, &it);
} while (rd != NULL);
free_sample_after_store (gv, payload, tk);
ETRACE (pwr, "\n");
}
}
if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
@ -2021,13 +2035,13 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
else
{
struct reader *rd = ephash_lookup_reader_guid (gv->guid_hash, rdguid);
ETRACE (pwr, " %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?");
if (rd != NULL)
{
struct ddsi_serdata *payload;
struct ddsi_tkmap_instance *tk;
if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
{
ETRACE (pwr, " =>"PGUIDFMT"\n", PGUID (*rdguid));
/* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays
"awake" (although a delete can be initiated), and blocking like this is a stopgap
anyway -- quite possibly to abort once either is deleted */

View file

@ -859,6 +859,8 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
const char *ttname = wr->topic ? wr->topic->type_name : "(null)";
ppbuf[0] = '\0';
tmp = sizeof (ppbuf) - 1;
if (wr->e.gv->logconfig.c.mask & DDS_LC_CONTENT)
ddsi_serdata_print (serdata, ppbuf, sizeof (ppbuf));
ETRACE (wr, "write_sample "PGUIDFMT" #%"PRId64, PGUID (wr->e.guid), seq);
if (plist != 0 && (plist->present & PP_COHERENT_SET))
ETRACE (wr, " C#%"PRId64"", fromSN (plist->coherent_set_seqno));

View file

@ -73,13 +73,16 @@ extern "C" {
#define DDS_LC_WHC (16384u)
/** Debug/trace messages related to throttling. */
#define DDS_LC_THROTTLE (32768u)
/** All common trace categories. */
/** Reader history cache. */
#define DDS_LC_RHC (65536u)
/** Include content in traces. */
#define DDS_LC_CONTENT (131072u)
/** All common trace categories. */
#define DDS_LC_ALL \
(DDS_LC_FATAL | DDS_LC_ERROR | DDS_LC_WARNING | DDS_LC_INFO | \
DDS_LC_CONFIG | DDS_LC_DISCOVERY | DDS_LC_DATA | DDS_LC_TRACE | \
DDS_LC_TIMING | DDS_LC_TRAFFIC | DDS_LC_TCP | DDS_LC_THROTTLE)
DDS_LC_TIMING | DDS_LC_TRAFFIC | DDS_LC_TCP | DDS_LC_THROTTLE | \
DDS_LC_CONTENT)
/** @}*/
#define DDS_LOG_MASK \