abstract the internal representation of a sample

besides the old state being in dire need of cleaning up, this also paves the way for having any number of different sample representations in the system

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-10-26 16:25:26 +08:00
parent 6e1df4c564
commit a25f683bcf
52 changed files with 1400 additions and 1125 deletions

View file

@ -11,7 +11,6 @@
#
PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_eth.c
ddsi_ser.c
ddsi_ssl.c
ddsi_tcp.c
ddsi_tran.c
@ -19,6 +18,10 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_raweth.c
ddsi_ipaddr.c
ddsi_mcgroup.c
ddsi_serdata.c
ddsi_serdata_default.c
ddsi_sertopic.c
ddsi_sertopic_default.c
ddsi_rhc_plugin.c
q_addrset.c
q_bitset_inlines.c
@ -60,7 +63,6 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
# The includes should reside close to the code. As long as that's not the case,
# pull them in from this CMakeLists.txt.
PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_ser.h
ddsi_ssl.h
ddsi_tcp.h
ddsi_tran.h
@ -68,6 +70,9 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_raweth.h
ddsi_ipaddr.h
ddsi_mcgroup.h
ddsi_serdata.h
ddsi_sertopic.h
ddsi_serdata_default.h
ddsi_rhc_plugin.h
probes-constants.h
q_addrset.h

View file

@ -15,8 +15,8 @@
struct rhc;
struct nn_xqos;
struct tkmap_instance;
struct serdata;
struct sertopic;
struct ddsi_serdata;
struct ddsi_sertopic;
struct entity_common;
struct proxy_writer_info
@ -33,13 +33,13 @@ struct ddsi_rhc_plugin
void (*rhc_fini_fn) (struct rhc *rhc);
bool (*rhc_store_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
struct serdata * __restrict sample, struct tkmap_instance * __restrict tk);
struct ddsi_serdata * __restrict sample, struct tkmap_instance * __restrict tk);
void (*rhc_unregister_wr_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
void (*rhc_relinquish_ownership_fn)
(struct rhc * __restrict rhc, const uint64_t wr_iid);
void (*rhc_set_qos_fn) (struct rhc * rhc, const struct nn_xqos * qos);
struct tkmap_instance * (*rhc_lookup_fn) (struct serdata *serdata);
struct tkmap_instance * (*rhc_lookup_fn) (struct ddsi_serdata *serdata);
void (*rhc_unref_fn) (struct tkmap_instance *tk);
};

View file

@ -0,0 +1,158 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_SERDATA_H
#define DDSI_SERDATA_H
#include "ddsi/q_time.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsi/sysdeps.h" /* for ddsi_iovec_t */
struct nn_rdata;
struct nn_keyhash;
enum ddsi_serdata_kind {
SDK_EMPTY,
SDK_KEY,
SDK_DATA
};
struct ddsi_serdata {
const struct ddsi_serdata_ops *ops; /* cached from topic->serdata_ops */
uint32_t hash;
os_atomic_uint32_t refc;
enum ddsi_serdata_kind kind;
const struct ddsi_sertopic *topic;
/* these get set by generic code after creating the serdata */
nn_wctime_t timestamp;
uint32_t statusinfo;
/* FIXME: can I get rid of this one? */
nn_mtime_t twrite; /* write time, not source timestamp, set post-throttling */
};
/* Serialised size of sample: uint32_t because the protocol can't handle samples larger than 4GB anyway */
typedef uint32_t (*ddsi_serdata_size_t) (const struct ddsi_serdata *d);
/* Free a serdata (called by unref when refcount goes to 0) */
typedef void (*ddsi_serdata_free_t) (struct ddsi_serdata *d);
/* Construct a serdata from a fragchain received over the network */
typedef struct ddsi_serdata * (*ddsi_serdata_from_ser_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size);
/* Construct a serdata from a keyhash (an SDK_KEY by definition) */
typedef struct ddsi_serdata * (*ddsi_serdata_from_keyhash_t) (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash);
/* Construct a serdata from an application sample */
typedef struct ddsi_serdata * (*ddsi_serdata_from_sample_t) (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample);
/* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <=
alignup4(size(d)) */
typedef void (*ddsi_serdata_to_ser_t) (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf);
/* Provide a pointer to 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <=
alignup4(size(d)); it must remain valid until the corresponding call to to_ser_unref. Multiple
calls to to_ser_ref() may be issued in parallel, the separate ref/unref bit is there to at least
have the option of lazily creating the serialised representation and freeing it when no one needs
it, while the sample itself remains valid */
typedef struct ddsi_serdata * (*ddsi_serdata_to_ser_ref_t) (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref);
/* Release a lock on serialised data, ref must be a pointer previously obtained by calling
to_ser_ref(d, off, sz) for some offset off. */
typedef void (*ddsi_serdata_to_ser_unref_t) (struct ddsi_serdata *d, const ddsi_iovec_t *ref);
/* Turn serdata into an application sample (or just the key values if only key values are
available); return false on error (typically out-of-memory, but if from_ser doesn't do any
validation it might be a deserialisation error, too).
If (bufptr != 0), then *bufptr .. buflim is space to be used from *bufptr up (with minimal
padding) for any data in the sample that needs to be allocated (e.g., strings, sequences);
otherwise malloc() is to be used for those. (This allows read/take to be given a block of memory
by the caller.) */
typedef bool (*ddsi_serdata_to_sample_t) (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
/* Compare key values of two serdatas (with the same ddsi_serdata_ops, but not necessarily of the
same topic) (FIXME: not sure I need this one) */
typedef int (*ddsi_serdata_cmpkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
/* Test key values of two serdatas for equality (with the same ddsi_serdata_ops, but not necessarily
of the same topic) */
typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
struct ddsi_serdata_ops {
ddsi_serdata_size_t get_size;
ddsi_serdata_from_ser_t from_ser;
ddsi_serdata_from_keyhash_t from_keyhash;
ddsi_serdata_from_sample_t from_sample;
ddsi_serdata_to_ser_t to_ser;
ddsi_serdata_to_ser_ref_t to_ser_ref;
ddsi_serdata_to_ser_unref_t to_ser_unref;
ddsi_serdata_to_sample_t to_sample;
ddsi_serdata_cmpkey_t cmpkey;
ddsi_serdata_eqkey_t eqkey;
ddsi_serdata_free_t free;
};
void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, enum ddsi_serdata_kind kind);
inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const) {
struct ddsi_serdata *serdata = (struct ddsi_serdata *)serdata_const;
os_atomic_inc32 (&serdata->refc);
return serdata;
}
inline void ddsi_serdata_unref (struct ddsi_serdata *serdata) {
if (os_atomic_dec32_ov (&serdata->refc) == 1)
serdata->ops->free (serdata);
}
inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d) {
return d->ops->get_size (d);
}
inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size) {
return topic->serdata_ops->from_ser (topic, kind, fragchain, size);
}
inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash) {
return topic->serdata_ops->from_keyhash (topic, keyhash);
}
inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample) {
return topic->serdata_ops->from_sample (topic, kind, sample);
}
inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf) {
return d->ops->to_ser (d, off, sz, buf);
}
inline struct ddsi_serdata *ddsi_serdata_to_ser_ref (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref) {
return d->ops->to_ser_ref (d, off, sz, ref);
}
inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsi_iovec_t *ref) {
d->ops->to_ser_unref (d, ref);
}
inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim) {
return d->ops->to_sample (d, sample, bufptr, buflim);
}
inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) {
return a->ops->cmpkey (a, b);
}
inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b) {
return a->ops->eqkey (a, b);
}
#endif

View file

@ -17,6 +17,8 @@
#include "ddsi/q_freelist.h"
#include "util/ut_avl.h"
#include "sysdeps.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsc/dds.h"
#include "dds__topic.h"
@ -39,47 +41,20 @@
#define CDR_LE 0x0001
#endif
typedef struct serstatepool * serstatepool_t;
typedef struct serstate * serstate_t;
typedef struct serdata * serdata_t;
typedef struct sertopic * sertopic_t;
struct CDRHeader
{
unsigned short identifier;
unsigned short options;
};
struct serdata_msginfo
{
unsigned statusinfo;
nn_wctime_t timestamp;
};
enum serstate_kind {
STK_EMPTY,
STK_KEY,
STK_DATA
};
struct serstate
{
serdata_t data;
nn_mtime_t twrite; /* write time, not source timestamp, set post-throttling */
os_atomic_uint32_t refcount;
size_t pos;
size_t size;
const struct sertopic * topic;
enum serstate_kind kind;
serstatepool_t pool;
struct serstate *next; /* in pool->freelist */
};
struct serstatepool
struct serstatepool /* FIXME: now a serdatapool */
{
struct nn_freelist freelist;
};
struct serstate {
struct ddsi_serdata_default *data;
};
#define DDS_KEY_SET 0x0001
#define DDS_KEY_HASH_SET 0x0002
@ -87,7 +62,7 @@ struct serstatepool
typedef struct dds_key_hash
{
char m_hash [16]; /* Key hash value. Also possibly key. */
char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */
uint32_t m_key_len; /* Length of key (may be in m_hash or m_key_buff) */
uint32_t m_key_buff_size; /* Size of allocated key buffer (m_key_buff) */
char * m_key_buff; /* Key buffer */
@ -95,57 +70,48 @@ typedef struct dds_key_hash
}
dds_key_hash_t;
struct serdata_base
struct ddsi_serdata_default
{
serstate_t st; /* back pointer to (opaque) serstate so RTPS impl only needs serdata */
struct serdata_msginfo msginfo;
int hash_valid; /* whether hash is valid or must be computed from key/data */
uint32_t hash; /* cached serdata hash, valid only if hash_valid != 0 */
struct ddsi_serdata c;
uint32_t pos;
uint32_t size;
bool bswap;
#ifndef NDEBUG
bool fixed;
#endif
dds_key_hash_t keyhash;
bool bswap; /* Whether state is native endian or requires swapping */
};
struct serdata
{
struct serdata_base v;
struct serstatepool *pool;
struct ddsi_serdata_default *next; /* in pool->freelist */
/* padding to ensure CDRHeader is at an offset 4 mod 8 from the
start of the memory, so that data is 8-byte aligned provided
serdata is 8-byte aligned */
char pad[8 - ((sizeof (struct serdata_base) + 4) % 8)];
char pad[8 - ((sizeof (struct ddsi_serdata) + 4) % 8)];
struct CDRHeader hdr;
char data[1];
};
struct dds_key_descriptor;
struct dds_topic;
typedef void (*topic_cb_t) (struct dds_topic * topic);
#ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED
#define DDS_TOPIC_INTERN_FILTER_FN_DEFINED
typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx);
#endif
struct sertopic
struct ddsi_sertopic_default
{
ut_avlNode_t avlnode;
char * name_typename;
char * name;
char * typename;
struct ddsi_sertopic c;
uint16_t native_encoding_identifier; /* (PL_)?CDR_(LE|BE) */
void * type;
unsigned nkeys;
uint32_t id;
uint32_t hash;
uint32_t flags;
size_t opt_size;
os_atomic_uint32_t refcount;
topic_cb_t status_cb;
dds_topic_intern_filter_fn filter_fn;
void * filter_sample;
void * filter_ctx;
struct dds_topic * status_cb_entity;
const struct dds_key_descriptor * keys;
/*
@ -162,29 +128,30 @@ struct sertopic
*/
};
serstatepool_t ddsi_serstatepool_new (void);
void ddsi_serstatepool_free (serstatepool_t pool);
struct ddsi_plist_sample {
void *blob;
size_t size;
nn_parameterid_t keyparam;
};
serdata_t ddsi_serdata_ref (serdata_t serdata);
OSAPI_EXPORT void ddsi_serdata_unref (serdata_t serdata);
int ddsi_serdata_refcount_is_1 (serdata_t serdata);
nn_mtime_t ddsi_serdata_twrite (const struct serdata * serdata);
void ddsi_serdata_set_twrite (struct serdata * serdata, nn_mtime_t twrite);
uint32_t ddsi_serdata_size (const struct serdata * serdata);
int ddsi_serdata_is_key (const struct serdata * serdata);
int ddsi_serdata_is_empty (const struct serdata * serdata);
struct ddsi_rawcdr_sample {
void *blob;
size_t size;
void *key;
size_t keysize;
};
OSAPI_EXPORT void ddsi_serstate_append_blob (serstate_t st, size_t align, size_t sz, const void *data);
OSAPI_EXPORT void ddsi_serstate_set_msginfo (serstate_t st, unsigned statusinfo, nn_wctime_t timestamp);
OSAPI_EXPORT serstate_t ddsi_serstate_new (const struct sertopic * topic);
OSAPI_EXPORT serdata_t ddsi_serstate_fix (serstate_t st);
nn_mtime_t ddsi_serstate_twrite (const struct serstate *serstate);
void ddsi_serstate_set_twrite (struct serstate *serstate, nn_mtime_t twrite);
void ddsi_serstate_release (serstate_t st);
void * ddsi_serstate_append (serstate_t st, size_t n);
void * ddsi_serstate_append_align (serstate_t st, size_t a);
void * ddsi_serstate_append_aligned (serstate_t st, size_t n, size_t a);
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_default;
OSAPI_EXPORT void ddsi_serdata_getblob (void **raw, size_t *sz, serdata_t serdata);
extern const struct ddsi_serdata_ops ddsi_serdata_ops_cdr;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_plist;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr;
struct serstatepool * ddsi_serstatepool_new (void);
void ddsi_serstatepool_free (struct serstatepool * pool);
OSAPI_EXPORT void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data);
void * ddsi_serstate_append (struct serstate * st, size_t n);
void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a);
#endif

View file

@ -0,0 +1,48 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_SERTOPIC_H
#define DDSI_SERTOPIC_H
#include "util/ut_avl.h"
struct ddsi_serdata;
struct ddsi_serdata_ops;
struct dds_topic;
typedef void (*topic_cb_t) (struct dds_topic * topic);
struct ddsi_sertopic_ops;
struct ddsi_sertopic {
ut_avlNode_t avlnode; /* index on name_typename */
const struct ddsi_sertopic_ops *ops;
const struct ddsi_serdata_ops *serdata_ops;
char *name_typename;
char *name;
char *typename;
uint64_t iid;
os_atomic_uint32_t refc; /* counts refs from entities, not from data */
topic_cb_t status_cb;
struct dds_topic * status_cb_entity;
};
typedef void (*ddsi_sertopic_deinit_t) (struct ddsi_sertopic *tp);
struct ddsi_sertopic_ops {
ddsi_sertopic_deinit_t deinit;
};
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
void ddsi_sertopic_unref (struct ddsi_sertopic *tp);
#endif

View file

@ -33,7 +33,7 @@ struct nn_reorder;
struct nn_defrag;
struct nn_dqueue;
struct addrset;
struct sertopic;
struct ddsi_sertopic;
struct whc;
struct nn_xqos;
struct nn_plist;
@ -234,7 +234,7 @@ struct writer
unsigned supports_ssm: 1;
struct addrset *ssm_as;
#endif
const struct sertopic * topic; /* topic, but may be NULL for built-ins */
const struct ddsi_sertopic * topic; /* topic, but may be NULL for built-ins */
struct addrset *as; /* set of addresses to publish to */
struct addrset *as_group; /* alternate case, used for SPDP, when using Cloud with multiple bootstrap locators */
struct xevent *heartbeat_xevent; /* timed event for "periodically" publishing heartbeats when unack'd data present, NULL <=> unreliable */
@ -276,7 +276,7 @@ struct reader
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
struct addrset *as;
#endif
const struct sertopic * topic; /* topic is NULL for built-in readers */
const struct ddsi_sertopic * topic; /* topic is NULL for built-in readers */
ut_avlTree_t writers; /* all matching PROXY writers, see struct rd_pwr_match */
ut_avlTree_t local_writers; /* all matching LOCAL writers, see struct rd_wr_match */
ddsi2direct_directread_cb_t ddsi2direct_cb;
@ -328,7 +328,7 @@ struct proxy_endpoint_common
struct proxy_endpoint_common *next_ep; /* next \ endpoint belonging to this proxy participant */
struct proxy_endpoint_common *prev_ep; /* prev / -- this is in arbitrary ordering */
struct nn_xqos *xqos; /* proxy endpoint QoS lives here; FIXME: local ones should have it moved to common as well */
const struct sertopic * topic; /* topic may be NULL: for built-ins, but also for never-yet matched proxies (so we don't have to know the topic; when we match, we certainly do know) */
const struct ddsi_sertopic * topic; /* topic may be NULL: for built-ins, but also for never-yet matched proxies (so we don't have to know the topic; when we match, we certainly do know) */
struct addrset *as; /* address set to use for communicating with this endpoint */
nn_guid_t group_guid; /* 0:0:0:0 if not available */
nn_vendorid_t vendor; /* cached from proxypp->vendor */
@ -473,9 +473,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
GUID "ppguid". May return NULL if participant unknown or
writer/reader already known. */
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg);
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg);
struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg);
struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg);
struct whc_node;
struct whc_state;

View file

@ -277,6 +277,8 @@ struct q_globals {
transmit queue*/
struct serstatepool *serpool;
struct nn_xmsgpool *xmsgpool;
struct ddsi_sertopic *plist_topic; /* used for all discovery data */
struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */
/* Network ID needed by v_groupWrite -- FIXME: might as well pass it
to the receive thread instead of making it global (and that would

View file

@ -200,7 +200,7 @@ typedef struct nn_plist_src {
nn_protocol_version_t protocol_version;
nn_vendorid_t vendorid;
int encoding;
unsigned char *buf;
const unsigned char *buf;
size_t bufsz;
} nn_plist_src_t;
@ -233,6 +233,7 @@ struct nn_rsample_info;
struct nn_rdata;
unsigned char *nn_plist_quickscan (struct nn_rsample_info *dest, const struct nn_rmsg *rmsg, const nn_plist_src_t *src);
const unsigned char *nn_plist_findparam_native_unchecked (const void *src, nn_parameterid_t pid);
#if defined (__cplusplus)
}

View file

@ -24,7 +24,7 @@ struct nn_xmsg;
struct writer;
struct whc_state;
struct proxy_reader;
struct serdata;
struct ddsi_serdata;
struct tkmap_instance;
/* Writing new data; serdata_twrite (serdata) is assumed to be really
@ -34,14 +34,14 @@ struct tkmap_instance;
"nogc": no GC may occur, so it may not block to throttle the writer if the high water mark of the WHC is reached, which implies true KEEP_LAST behaviour. This is true for all the DDSI built-in writers.
"gc": GC may occur, which means the writer history and watermarks can be anything. This must be used for all application data.
*/
int write_sample_gc (struct nn_xpack *xp, struct writer *wr, struct serdata *serdata, struct tkmap_instance *tk);
int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, struct serdata *serdata, struct tkmap_instance *tk);
int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, struct serdata *serdata);
int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct serdata *serdata);
int write_sample_gc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct tkmap_instance *tk);
int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct tkmap_instance *tk);
int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
/* When calling the following functions, wr->lock must be held */
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, struct proxy_reader *prd, int isnew);
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, nn_entityid_t dst, int issync);
#if defined (__cplusplus)

View file

@ -16,7 +16,7 @@
extern "C" {
#endif
struct serdata;
struct ddsi_serdata;
struct nn_plist;
struct tkmap_instance;
struct whc_node; /* opaque, but currently used for deferred free lists */
@ -24,7 +24,7 @@ struct whc;
struct whc_borrowed_sample {
seqno_t seq;
struct serdata *serdata;
struct ddsi_serdata *serdata;
struct nn_plist *plist;
bool unacked;
nn_mtime_t last_rexmit_ts;
@ -56,7 +56,7 @@ struct whc_sample_iter {
typedef seqno_t (*whc_next_seq_t)(const struct whc *whc, seqno_t seq);
typedef void (*whc_get_state_t)(const struct whc *whc, struct whc_state *st);
typedef bool (*whc_borrow_sample_t)(const struct whc *whc, seqno_t seq, struct whc_borrowed_sample *sample);
typedef bool (*whc_borrow_sample_key_t)(const struct whc *whc, const struct serdata *serdata_key, struct whc_borrowed_sample *sample);
typedef bool (*whc_borrow_sample_key_t)(const struct whc *whc, const struct ddsi_serdata *serdata_key, struct whc_borrowed_sample *sample);
typedef void (*whc_return_sample_t)(struct whc *whc, struct whc_borrowed_sample *sample, bool update_retransmit_info);
typedef void (*whc_sample_iter_init_t)(const struct whc *whc, struct whc_sample_iter *it);
typedef bool (*whc_sample_iter_borrow_next_t)(struct whc_sample_iter *it, struct whc_borrowed_sample *sample);
@ -66,7 +66,7 @@ typedef void (*whc_free_t)(struct whc *whc);
reliable readers that have not acknowledged all data */
/* max_drop_seq must go soon, it's way too ugly. */
/* plist may be NULL or os_malloc'd, WHC takes ownership of plist */
typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct serdata *serdata, struct tkmap_instance *tk);
typedef int (*whc_insert_t)(struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk);
typedef unsigned (*whc_downgrade_to_volatile_t)(struct whc *whc, struct whc_state *st);
typedef unsigned (*whc_remove_acked_messages_t)(struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list);
typedef void (*whc_free_deferred_free_list_t)(struct whc *whc, struct whc_node *deferred_free_list);

View file

@ -22,7 +22,7 @@
extern "C" {
#endif
struct serdata;
struct ddsi_serdata;
struct addrset;
struct proxy_reader;
struct proxy_writer;
@ -33,6 +33,7 @@ struct nn_xmsgpool;
struct nn_xmsg_data;
struct nn_xmsg;
struct nn_xpack;
struct ddsi_plist_sample;
struct nn_xmsg_marker {
size_t offset;
@ -107,13 +108,14 @@ int nn_xmsg_compare_fragid (const struct nn_xmsg *a, const struct nn_xmsg *b);
void nn_xmsg_free (struct nn_xmsg *msg);
size_t nn_xmsg_size (const struct nn_xmsg *m);
void *nn_xmsg_payload (size_t *sz, struct nn_xmsg *m);
void nn_xmsg_payload_to_plistsample (struct ddsi_plist_sample *dst, nn_parameterid_t keyparam, const struct nn_xmsg *m);
enum nn_xmsg_kind nn_xmsg_kind (const struct nn_xmsg *m);
void nn_xmsg_guid_seq_fragid (const struct nn_xmsg *m, nn_guid_t *wrguid, seqno_t *wrseq, nn_fragment_number_t *wrfragid);
void *nn_xmsg_submsg_from_marker (struct nn_xmsg *msg, struct nn_xmsg_marker marker);
void *nn_xmsg_append (struct nn_xmsg *m, struct nn_xmsg_marker *marker, size_t sz);
void nn_xmsg_shrink (struct nn_xmsg *m, struct nn_xmsg_marker marker, size_t sz);
void nn_xmsg_serdata (struct nn_xmsg *m, struct serdata *serdata, size_t off, size_t len);
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len);
void nn_xmsg_submsg_setnext (struct nn_xmsg *msg, struct nn_xmsg_marker marker);
void nn_xmsg_submsg_init (struct nn_xmsg *msg, struct nn_xmsg_marker marker, SubmessageKind_t smkind);
void nn_xmsg_add_timestamp (struct nn_xmsg *m, nn_wctime_t t);
@ -125,7 +127,7 @@ void nn_xmsg_addpar_stringseq (struct nn_xmsg *m, unsigned pid, const nn_strings
void nn_xmsg_addpar_guid (struct nn_xmsg *m, unsigned pid, const nn_guid_t *guid);
void nn_xmsg_addpar_BE4u (struct nn_xmsg *m, unsigned pid, unsigned x);
void nn_xmsg_addpar_4u (struct nn_xmsg *m, unsigned pid, unsigned x);
void nn_xmsg_addpar_keyhash (struct nn_xmsg *m, const struct serdata *serdata);
void nn_xmsg_addpar_keyhash (struct nn_xmsg *m, const struct ddsi_serdata *serdata);
void nn_xmsg_addpar_statusinfo (struct nn_xmsg *m, unsigned statusinfo);
void nn_xmsg_addpar_reliability (struct nn_xmsg *m, unsigned pid, const struct nn_reliability_qospolicy *rq);
void nn_xmsg_addpar_share (struct nn_xmsg *m, unsigned pid, const struct nn_share_qospolicy *rq);

View file

@ -1,242 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "ddsi/ddsi_ser.h"
#include <stddef.h>
#include <ctype.h>
#include <assert.h>
#include <string.h>
#include "os/os_stdlib.h"
#include "os/os_defs.h"
#include "os/os_thread.h"
#include "os/os_heap.h"
#include "os/os_atomics.h"
#include "ddsi/sysdeps.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_bswap.h"
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include "q__osplser.h"
#define MAX_POOL_SIZE 16384
#define CLEAR_PADDING 0
#ifndef NDEBUG
static int ispowerof2_size (size_t x)
{
return x > 0 && !(x & (x-1));
}
#endif
static size_t alignup_size (size_t x, size_t a);
static serstate_t serstate_allocnew (serstatepool_t pool, const struct sertopic * topic);
serstatepool_t ddsi_serstatepool_new (void)
{
serstatepool_t pool;
pool = os_malloc (sizeof (*pool));
nn_freelist_init (&pool->freelist, MAX_POOL_SIZE, offsetof (struct serstate, next));
return pool;
}
static void serstate_free_wrap (void *elem)
{
serstate_free (elem);
}
void ddsi_serstatepool_free (serstatepool_t pool)
{
TRACE (("ddsi_serstatepool_free(%p)\n", pool));
nn_freelist_fini (&pool->freelist, serstate_free_wrap);
os_free (pool);
}
int ddsi_serdata_refcount_is_1 (serdata_t serdata)
{
return (os_atomic_ld32 (&serdata->v.st->refcount) == 1);
}
serdata_t ddsi_serdata_ref (serdata_t serdata)
{
os_atomic_inc32 (&serdata->v.st->refcount);
return serdata;
}
void ddsi_serdata_unref (serdata_t serdata)
{
ddsi_serstate_release (serdata->v.st);
}
nn_mtime_t ddsi_serdata_twrite (const struct serdata *serdata)
{
return ddsi_serstate_twrite (serdata->v.st);
}
void ddsi_serdata_set_twrite (serdata_t serdata, nn_mtime_t twrite)
{
ddsi_serstate_set_twrite (serdata->v.st, twrite);
}
serstate_t ddsi_serstate_new (const struct sertopic * topic)
{
serstate_t st;
if ((st = nn_freelist_pop (&gv.serpool->freelist)) != NULL)
serstate_init (st, topic);
else
st = serstate_allocnew (gv.serpool, topic);
return st;
}
serdata_t ddsi_serstate_fix (serstate_t st)
{
/* see serialize_raw_private() */
ddsi_serstate_append_aligned (st, 0, 4);
return st->data;
}
nn_mtime_t ddsi_serstate_twrite (const struct serstate *serstate)
{
assert (serstate->twrite.v >= 0);
return serstate->twrite;
}
void ddsi_serstate_set_twrite (serstate_t st, nn_mtime_t twrite)
{
st->twrite = twrite;
}
void ddsi_serstate_append_blob (serstate_t st, size_t align, size_t sz, const void *data)
{
char *p = ddsi_serstate_append_aligned (st, sz, align);
memcpy (p, data, sz);
}
void ddsi_serstate_set_msginfo (serstate_t st, unsigned statusinfo, nn_wctime_t timestamp)
{
serdata_t d = st->data;
d->v.msginfo.statusinfo = statusinfo;
d->v.msginfo.timestamp = timestamp;
}
uint32_t ddsi_serdata_size (const struct serdata *serdata)
{
const struct serstate *st = serdata->v.st;
if (serdata->v.st->kind == STK_EMPTY)
return 0;
else
return (uint32_t) (sizeof (struct CDRHeader) + st->pos);
}
void ddsi_serdata_getblob (void **raw, size_t *sz, serdata_t serdata)
{
const struct serstate *st = serdata->v.st;
if (serdata->v.st->kind == STK_EMPTY)
{
*sz = 0;
*raw = NULL;
}
else
{
*sz = sizeof (struct CDRHeader) + st->pos;
*raw = &serdata->hdr;
}
}
int ddsi_serdata_is_key (const struct serdata * serdata)
{
return serdata->v.st->kind == STK_KEY;
}
int ddsi_serdata_is_empty (const struct serdata * serdata)
{
return serdata->v.st->kind == STK_EMPTY;
}
/* Internal static functions */
static serstate_t serstate_allocnew (serstatepool_t pool, const struct sertopic * topic)
{
serstate_t st = os_malloc (sizeof (*st));
size_t size;
memset (st, 0, sizeof (*st));
st->size = 128;
st->pool = pool;
size = offsetof (struct serdata, data) + st->size;
st->data = os_malloc (size);
memset (st->data, 0, sizeof (*st->data));
st->data->v.st = st;
serstate_init (st, topic);
return st;
}
void * ddsi_serstate_append (serstate_t st, size_t n)
{
char *p;
if (st->pos + n > st->size)
{
size_t size1 = alignup_size (st->pos + n, 128);
serdata_t data1 = os_realloc (st->data, offsetof (struct serdata, data) + size1);
st->data = data1;
st->size = size1;
}
assert (st->pos + n <= st->size);
p = st->data->data + st->pos;
st->pos += n;
return p;
}
void ddsi_serstate_release (serstate_t st)
{
if (os_atomic_dec32_ov (&st->refcount) == 1)
{
serstatepool_t pool = st->pool;
sertopic_free ((sertopic_t) st->topic);
if (!nn_freelist_push (&pool->freelist, st))
serstate_free (st);
}
}
void * ddsi_serstate_append_align (serstate_t st, size_t sz)
{
return ddsi_serstate_append_aligned (st, sz, sz);
}
void * ddsi_serstate_append_aligned (serstate_t st, size_t n, size_t a)
{
/* Simply align st->pos, without verifying it fits in the allocated
buffer: ddsi_serstate_append() is called immediately afterward and will
grow the buffer as soon as the end of the requested space no
longer fits. */
#if CLEAR_PADDING
size_t pos0 = st->pos;
#endif
char *p;
assert (ispowerof2_size (a));
st->pos = alignup_size (st->pos, a);
p = ddsi_serstate_append (st, n);
#if CLEAR_PADDING
if (p && st->pos > pos0)
memset (st->data->data + pos0, 0, st->pos - pos0);
#endif
return p;
}
static size_t alignup_size (size_t x, size_t a)
{
size_t m = a-1;
assert (ispowerof2_size (a));
return (x+m) & ~m;
}

View file

@ -0,0 +1,48 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stddef.h>
#include <ctype.h>
#include <assert.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/sysdeps.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_bswap.h"
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include "ddsi/ddsi_serdata.h"
void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, enum ddsi_serdata_kind kind)
{
d->topic = tp;
d->ops = tp->serdata_ops;
d->kind = kind;
d->hash = 0;
d->statusinfo = 0;
d->timestamp.v = INT64_MIN;
d->twrite.v = INT64_MIN;
os_atomic_st32 (&d->refc, 1);
}
extern inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const);
extern inline void ddsi_serdata_unref (struct ddsi_serdata *serdata);
extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d);
extern inline struct ddsi_serdata *ddsi_serdata_from_ser (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size);
extern inline struct ddsi_serdata *ddsi_serdata_from_keyhash (const struct ddsi_sertopic *topic, const struct nn_keyhash *keyhash);
extern inline struct ddsi_serdata *ddsi_serdata_from_sample (const struct ddsi_sertopic *topic, enum ddsi_serdata_kind kind, const void *sample);
extern inline void ddsi_serdata_to_ser (const struct ddsi_serdata *d, size_t off, size_t sz, void *buf);
extern inline struct ddsi_serdata *ddsi_serdata_to_ser_ref (const struct ddsi_serdata *d, size_t off, size_t sz, ddsi_iovec_t *ref);
extern inline void ddsi_serdata_to_ser_unref (struct ddsi_serdata *d, const ddsi_iovec_t *ref);
extern inline bool ddsi_serdata_to_sample (const struct ddsi_serdata *d, void *sample, void **bufptr, void *buflim);
extern inline int ddsi_serdata_cmpkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b);
extern inline bool ddsi_serdata_eqkey (const struct ddsi_serdata *a, const struct ddsi_serdata *b);

View file

@ -0,0 +1,510 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stddef.h>
#include <ctype.h>
#include <assert.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/sysdeps.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_bswap.h"
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include <assert.h>
#include <string.h>
#include "os/os.h"
#include "dds__key.h"
#include "dds__tkmap.h"
#include "dds__stream.h"
#include "ddsi/q_radmin.h"
#include "ddsi/ddsi_serdata_default.h"
#define MAX_POOL_SIZE 16384
#define CLEAR_PADDING 0
#ifndef NDEBUG
static int ispowerof2_size (size_t x)
{
return x > 0 && !(x & (x-1));
}
#endif
static size_t alignup_size (size_t x, size_t a);
struct serstatepool * ddsi_serstatepool_new (void)
{
struct serstatepool * pool;
pool = os_malloc (sizeof (*pool));
nn_freelist_init (&pool->freelist, MAX_POOL_SIZE, offsetof (struct ddsi_serdata_default, next));
return pool;
}
static void serstate_free_wrap (void *elem)
{
#ifndef NDEBUG
struct ddsi_serdata_default *d = elem;
assert(os_atomic_ld32(&d->c.refc) == 1);
#endif
ddsi_serdata_unref(elem);
}
void ddsi_serstatepool_free (struct serstatepool * pool)
{
TRACE (("ddsi_serstatepool_free(%p)\n", pool));
nn_freelist_fini (&pool->freelist, serstate_free_wrap);
os_free (pool);
}
void ddsi_serstate_append_blob (struct serstate * st, size_t align, size_t sz, const void *data)
{
char *p = ddsi_serstate_append_aligned (st, sz, align);
memcpy (p, data, sz);
}
static size_t alignup_size (size_t x, size_t a)
{
size_t m = a-1;
assert (ispowerof2_size (a));
return (x+m) & ~m;
}
static size_t alignup4 (size_t x)
{
return alignup_size (x, 4);
}
void * ddsi_serstate_append (struct serstate * st, size_t n)
{
char *p;
if (st->data->pos + n > st->data->size)
{
size_t size1 = alignup_size (st->data->pos + n, 128);
struct ddsi_serdata_default * data1 = os_realloc (st->data, offsetof (struct ddsi_serdata_default, data) + size1);
st->data = data1;
st->data->size = (uint32_t)size1;
}
assert (st->data->pos + n <= st->data->size);
p = st->data->data + st->data->pos;
st->data->pos += (uint32_t)n;
return p;
}
void * ddsi_serstate_append_aligned (struct serstate * st, size_t n, size_t a)
{
/* Simply align st->pos, without verifying it fits in the allocated
buffer: ddsi_serstate_append() is called immediately afterward and will
grow the buffer as soon as the end of the requested space no
longer fits. */
#if CLEAR_PADDING
size_t pos0 = st->pos;
#endif
char *p;
assert (ispowerof2_size (a));
st->data->pos = (uint32_t) alignup_size (st->data->pos, a);
p = ddsi_serstate_append (st, n);
#if CLEAR_PADDING
if (p && st->pos > pos0)
memset (st->data->data + pos0, 0, st->pos - pos0);
#endif
return p;
}
/* Fixed seed and length */
#define DDS_MH3_LEN 16
#define DDS_MH3_SEED 0
#define DDS_MH3_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r))))
/* Really
http://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp,
MurmurHash3_x86_32
*/
static uint32_t dds_mh3 (const void * key)
{
const uint8_t *data = (const uint8_t *) key;
const intptr_t nblocks = (intptr_t) (DDS_MH3_LEN / 4);
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;
uint32_t h1 = DDS_MH3_SEED;
const uint32_t *blocks = (const uint32_t *) (data + nblocks * 4);
register intptr_t i;
for (i = -nblocks; i; i++)
{
uint32_t k1 = blocks[i];
k1 *= c1;
k1 = DDS_MH3_ROTL32 (k1, 15);
k1 *= c2;
h1 ^= k1;
h1 = DDS_MH3_ROTL32 (h1, 13);
h1 = h1 * 5+0xe6546b64;
}
/* finalization */
h1 ^= DDS_MH3_LEN;
h1 ^= h1 >> 16;
h1 *= 0x85ebca6b;
h1 ^= h1 >> 13;
h1 *= 0xc2b2ae35;
h1 ^= h1 >> 16;
return h1;
}
static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint64_t tp_iid)
{
if (d->keyhash.m_flags & DDS_KEY_IS_HASH)
d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ (uint32_t)tp_iid;
else
d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ (uint32_t)tp_iid;
return &d->c;
}
static uint32_t serdata_default_get_size(const struct ddsi_serdata *dcmn)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *) dcmn;
return d->pos + (uint32_t)sizeof (struct CDRHeader);
}
static bool serdata_default_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{
const struct ddsi_serdata_default *a = (const struct ddsi_serdata_default *)acmn;
const struct ddsi_serdata_default *b = (const struct ddsi_serdata_default *)bcmn;
const struct ddsi_sertopic_default *tp;
assert(a->c.ops == b->c.ops);
tp = (struct ddsi_sertopic_default *)a->c.topic;
if (tp->nkeys == 0)
return true;
else
{
assert (a->keyhash.m_flags & DDS_KEY_HASH_SET);
return memcmp (a->keyhash.m_hash, b->keyhash.m_hash, 16) == 0;
}
}
static void serdata_default_free(struct ddsi_serdata *dcmn)
{
struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn;
dds_free (d->keyhash.m_key_buff);
dds_free (d);
}
static void serdata_default_init(struct ddsi_serdata_default *d, const struct ddsi_sertopic_default *tp, enum ddsi_serdata_kind kind)
{
ddsi_serdata_init (&d->c, &tp->c, kind);
d->pos = 0;
#ifndef NDEBUG
d->fixed = false;
#endif
d->hdr.identifier = tp->native_encoding_identifier;
d->hdr.options = 0;
d->bswap = false;
memset (d->keyhash.m_hash, 0, sizeof (d->keyhash.m_hash));
d->keyhash.m_key_len = 0;
d->keyhash.m_flags = 0;
d->keyhash.m_key_buff = NULL;
d->keyhash.m_key_buff_size = 0;
}
static struct ddsi_serdata_default *serdata_default_allocnew(struct serstatepool *pool)
{
const uint32_t init_size = 128;
struct ddsi_serdata_default *d = os_malloc(offsetof (struct ddsi_serdata_default, data) + init_size);
d->size = init_size;
d->pool = pool;
return d;
}
static struct ddsi_serdata_default *serdata_default_new(const struct ddsi_sertopic_default *tp, enum ddsi_serdata_kind kind)
{
struct ddsi_serdata_default *d;
if ((d = nn_freelist_pop (&gv.serpool->freelist)) == NULL)
d = serdata_default_allocnew(gv.serpool);
serdata_default_init(d, tp, kind);
return d;
}
/* Construct a serdata from a fragchain received over the network */
static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
uint32_t off = 4; /* must skip the CDR header */
struct serstate st = { .data = d };
assert (fragchain->min == 0);
assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */
(void)size;
memcpy (&d->hdr, NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)), sizeof (d->hdr));
switch (d->hdr.identifier) {
case CDR_LE:
case PL_CDR_LE:
d->bswap = ! PLATFORM_IS_LITTLE_ENDIAN;
break;
case CDR_BE:
case PL_CDR_BE:
d->bswap = PLATFORM_IS_LITTLE_ENDIAN;
break;
default:
/* must not ever try to use a serdata format for an unsupported encoding */
abort ();
}
while (fragchain)
{
assert (fragchain->min <= off);
assert (fragchain->maxp1 <= size);
if (fragchain->maxp1 > off)
{
/* only copy if this fragment adds data */
const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain));
ddsi_serstate_append_blob (&st, 1, fragchain->maxp1 - off, payload + off - fragchain->min);
off = fragchain->maxp1;
}
fragchain = fragchain->nextfrag;
}
/* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */
d = st.data;
dds_stream_t is;
dds_stream_from_serdata_default (&is, d);
dds_stream_read_keyhash (&is, &d->keyhash, (const dds_topic_descriptor_t *)tp->type, kind == SDK_KEY);
return fix_serdata_default (d, tp->c.iid);
}
struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY);
struct serstate st = { .data = d };
/* FIXME: not quite sure this is correct */
ddsi_serstate_append_blob (&st, 1, sizeof (keyhash->value), keyhash->value);
/* FIXME: assignment here is because of reallocs, but doing it this way is a bit hacky */
d = st.data;
return fix_serdata_default(d, tp->c.iid);
}
static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
dds_stream_t os;
dds_key_gen ((const dds_topic_descriptor_t *)tp->type, &d->keyhash, (char*)sample);
dds_stream_from_serdata_default (&os, d);
switch (kind)
{
case SDK_EMPTY:
break;
case SDK_KEY:
dds_stream_write_key (&os, sample, tp);
break;
case SDK_DATA:
dds_stream_write_sample (&os, sample, tp);
break;
}
dds_stream_add_to_serdata_default (&os, &d);
return fix_serdata_default (d, tp->c.iid);
}
static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample)
{
/* Currently restricted to DDSI discovery data (XTypes will need a rethink of the default representation and that may result in discovery data being moved to that new representation), and that means: keys are either GUIDs or an unbounded string for topics, for which MD5 is acceptable. Furthermore, these things don't get written very often, so scanning the parameter list to get the key value out is good enough for now. And at least it keeps the DDSI discovery data writing out of the internals of the sample representation */
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_plist_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
struct serstate st = { .data = d };
ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob);
d = st.data;
const unsigned char *rawkey = nn_plist_findparam_native_unchecked (sample->blob, sample->keyparam);
#ifndef NDEBUG
size_t keysize;
#endif
switch (sample->keyparam)
{
case PID_PARTICIPANT_GUID:
case PID_ENDPOINT_GUID:
case PID_GROUP_GUID:
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
d->keyhash.m_key_len = 16;
memcpy (&d->keyhash.m_hash, rawkey, d->keyhash.m_key_len);
#ifndef NDEBUG
keysize = d->keyhash.m_key_len;
#endif
break;
case PID_TOPIC_NAME: {
const char *topic_name = (const char *) (rawkey + sizeof(uint32_t));
uint32_t topic_name_sz;
uint32_t topic_name_sz_BE;
md5_state_t md5st;
md5_byte_t digest[16];
topic_name_sz = (uint32_t) strlen (topic_name) + 1;
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET;
d->keyhash.m_key_len = 16;
md5_init (&md5st);
md5_append (&md5st, (const md5_byte_t *) &topic_name_sz_BE, sizeof (topic_name_sz_BE));
md5_append (&md5st, (const md5_byte_t *) topic_name, topic_name_sz);
md5_finish (&md5st, digest);
memcpy (&d->keyhash.m_hash, digest, d->keyhash.m_key_len);
#ifndef NDEBUG
keysize = sizeof (uint32_t) + topic_name_sz;
#endif
break;
}
default:
abort();
}
/* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */
assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t));
assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup4 (keysize) + sizeof (nn_parameter_t));
return fix_serdata_default (d, tp->c.iid);
}
static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample)
{
/* Currently restricted to DDSI discovery data (XTypes will need a rethink of the default representation and that may result in discovery data being moved to that new representation), and that means: keys are either GUIDs or an unbounded string for topics, for which MD5 is acceptable. Furthermore, these things don't get written very often, so scanning the parameter list to get the key value out is good enough for now. And at least it keeps the DDSI discovery data writing out of the internals of the sample representation */
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_rawcdr_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
struct serstate st = { .data = d };
assert (sample->keysize <= 16);
ddsi_serstate_append_blob (&st, 1, sample->size, sample->blob);
d = st.data;
d->keyhash.m_flags = DDS_KEY_SET | DDS_KEY_HASH_SET | DDS_KEY_IS_HASH;
d->keyhash.m_key_len = (uint32_t) sample->keysize;
if (sample->keysize > 0)
memcpy (&d->keyhash.m_hash, sample->key, sample->keysize);
return fix_serdata_default (d, tp->c.iid);
}
/* Fill buffer with 'size' bytes of serialised data, starting from 'off'; 0 <= off < off+sz <= alignup4(size(d)) */
static void serdata_default_to_ser (const struct ddsi_serdata *serdata_common, size_t off, size_t sz, void *buf)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
assert (off < d->pos + sizeof(struct CDRHeader));
assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off);
/* FIXME: maybe I should pull the header out ... */
memcpy (buf, (char *)&d->hdr + off, sz);
}
static struct ddsi_serdata *serdata_default_to_ser_ref (const struct ddsi_serdata *serdata_common, size_t off, size_t sz, ddsi_iovec_t *ref)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
assert (off < d->pos + sizeof(struct CDRHeader));
assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off);
ref->iov_base = (char *)&d->hdr + off;
ref->iov_len = sz;
return ddsi_serdata_ref(serdata_common);
}
static void serdata_default_to_ser_unref (struct ddsi_serdata *serdata_common, const ddsi_iovec_t *ref)
{
(void)ref;
ddsi_serdata_unref(serdata_common);
}
static bool serdata_default_to_sample_cdr (const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
dds_stream_t is;
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
dds_stream_from_serdata_default(&is, d);
if (d->c.kind == SDK_KEY)
dds_stream_read_key (&is, sample, (const dds_topic_descriptor_t*) ((struct ddsi_sertopic_default *)d->c.topic)->type);
else
dds_stream_read_sample (&is, sample, (const struct ddsi_sertopic_default *)d->c.topic);
return true; /* FIXME: can't conversion to sample fail? */
}
static bool serdata_default_to_sample_plist (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim)
{
#if 0
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
struct ddsi_plist_sample *sample = vsample;
/* output of to_sample for normal samples is a copy, and so it should be for this one; only for native format (like the inverse) */
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
assert (d->hdr.identifier == PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE);
sample->size = d->pos;
sample->blob = os_malloc (sample->size);
memcpy (sample->blob, (char *)&d->hdr + sizeof(struct CDRHeader), sample->size);
sample->keyparam = PID_PAD;
return true;
#else
/* I don't think I need this */
(void)serdata_common; (void)vsample; (void)bufptr; (void)buflim;
abort();
return false;
#endif
}
static bool serdata_default_to_sample_rawcdr (const struct ddsi_serdata *serdata_common, void *vsample, void **bufptr, void *buflim)
{
/* I don't think I need this */
(void)serdata_common; (void)vsample; (void)bufptr; (void)buflim;
abort();
return false;
}
const struct ddsi_serdata_ops ddsi_serdata_ops_cdr = {
.get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_keyhash = ddsi_serdata_from_keyhash_cdr,
.from_sample = serdata_default_from_sample_cdr,
.to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_cdr,
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref
};
const struct ddsi_serdata_ops ddsi_serdata_ops_plist = {
.get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */
.from_sample = serdata_default_from_sample_plist,
.to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_plist,
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref
};
const struct ddsi_serdata_ops ddsi_serdata_ops_rawcdr = {
.get_size = serdata_default_get_size,
.cmpkey = 0,
.eqkey = serdata_default_eqkey,
.free = serdata_default_free,
.from_ser = serdata_default_from_ser,
.from_keyhash = 0, /* q_ddsi_discovery.c takes care of it internally */
.from_sample = serdata_default_from_sample_rawcdr,
.to_ser = serdata_default_to_ser,
.to_sample = serdata_default_to_sample_rawcdr,
.to_ser_ref = serdata_default_to_ser_ref,
.to_ser_unref = serdata_default_to_ser_unref
};

View file

@ -0,0 +1,46 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stddef.h>
#include <ctype.h>
#include <assert.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/sysdeps.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_bswap.h"
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include "ddsi/ddsi_sertopic.h"
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const)
{
struct ddsi_sertopic *sertopic = (struct ddsi_sertopic *)sertopic_const;
if (sertopic)
os_atomic_inc32 (&sertopic->refc);
return sertopic;
}
void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic)
{
if (sertopic)
{
if (os_atomic_dec32_ov (&sertopic->refc) == 1)
{
sertopic->ops->deinit (sertopic);
os_free (sertopic->name_typename);
os_free (sertopic->name);
os_free (sertopic->typename);
os_free (sertopic);
}
}
}

View file

@ -0,0 +1,35 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stddef.h>
#include <ctype.h>
#include <assert.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/sysdeps.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_bswap.h"
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsi/ddsi_serdata_default.h"
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
static void deinit_sertopic_default (struct ddsi_sertopic *tp)
{
(void)tp;
}
const struct ddsi_sertopic_ops ddsi_sertopic_ops_default = {
.deinit = deinit_sertopic_default
};

View file

@ -18,7 +18,6 @@
#include "os/os.h"
#include "util/ut_avl.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/q_protocol.h"
#include "ddsi/q_rtps.h"
#include "ddsi/q_misc.h"
@ -39,7 +38,7 @@
#include "ddsi/q_lease.h"
#include "ddsi/q_error.h"
#include "ddsi/q_builtin_topic.h"
#include "q__osplser.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_feature_check.h"
@ -173,21 +172,28 @@ static void maybe_add_pp_as_meta_to_as_disc (const struct addrset *as_meta)
}
}
static int write_mpayload (struct writer *wr, int alive, nn_parameterid_t keyparam, struct nn_xmsg *mpayload)
{
struct ddsi_plist_sample plist_sample;
struct ddsi_serdata *serdata;
nn_xmsg_payload_to_plistsample (&plist_sample, keyparam, mpayload);
serdata = ddsi_serdata_from_sample (gv.plist_topic, alive ? SDK_DATA : SDK_KEY, &plist_sample);
serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
serdata->timestamp = now ();
return write_sample_nogc_notk (NULL, wr, serdata);
}
int spdp_write (struct participant *pp)
{
static const nn_vendorid_t myvendorid = MY_VENDOR_ID;
serdata_t serdata;
serstate_t serstate;
struct nn_xmsg *mpayload;
size_t payload_sz;
char *payload_blob;
struct nn_locators_one def_uni_loc_one, def_multi_loc_one, meta_uni_loc_one, meta_multi_loc_one;
nn_plist_t ps;
nn_guid_t kh;
struct writer *wr;
size_t size;
char node[64];
uint64_t qosdiff;
int ret;
if (pp->e.onlylocal) {
/* This topic is only locally available. */
@ -317,32 +323,19 @@ int spdp_write (struct participant *pp)
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, 0);
nn_plist_addtomsg (mpayload, pp->plist, 0, qosdiff);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
/* A NULL topic implies a parameter list, now that we do PMD through
the serializer */
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (pp->e.guid);
serstate_set_key (serstate, 0, &kh);
ddsi_serstate_set_msginfo (serstate, 0, now ());
serdata = ddsi_serstate_fix (serstate);
nn_plist_fini(&ps);
ret = write_mpayload (wr, 1, PID_PARTICIPANT_GUID, mpayload);
nn_xmsg_free (mpayload);
return write_sample_nogc_notk (NULL, wr, serdata);
return ret;
}
int spdp_dispose_unregister (struct participant *pp)
{
struct nn_xmsg *mpayload;
size_t payload_sz;
char *payload_blob;
nn_plist_t ps;
serdata_t serdata;
serstate_t serstate;
nn_guid_t kh;
struct writer *wr;
int ret;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)) == NULL)
{
@ -356,17 +349,11 @@ int spdp_dispose_unregister (struct participant *pp)
ps.participant_guid = pp->e.guid;
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (pp->e.guid);
serstate_set_key (serstate, 1, &kh);
ddsi_serstate_set_msginfo (serstate, NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER, now ());
serdata = ddsi_serstate_fix (serstate);
ret = write_mpayload (wr, 0, PID_PARTICIPANT_GUID, mpayload);
nn_xmsg_free (mpayload);
return write_sample_nogc_notk (NULL, wr, serdata);
return ret;
}
static unsigned pseudo_random_delay (const nn_guid_t *x, const nn_guid_t *y, nn_mtime_t tnow)
@ -718,13 +705,17 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time
/* If unicast locators not present, then try to obtain from connection */
if (!config.tcp_use_peeraddr_for_unicast && (datap->present & PP_DEFAULT_UNICAST_LOCATOR) && (get_locator (&loc, &datap->default_unicast_locators, uc_same_subnet)))
add_to_addrset (as_default, &loc);
else
nn_log (LC_DISCOVERY, " (srclocD)"), add_to_addrset (as_default, &rst->srcloc);
else {
nn_log (LC_DISCOVERY, " (srclocD)");
add_to_addrset (as_default, &rst->srcloc);
}
if (!config.tcp_use_peeraddr_for_unicast && (datap->present & PP_METATRAFFIC_UNICAST_LOCATOR) && (get_locator (&loc, &datap->metatraffic_unicast_locators, uc_same_subnet)))
add_to_addrset (as_meta, &loc);
else
nn_log (LC_DISCOVERY, " (srclocM)"), add_to_addrset (as_meta, &rst->srcloc);
else {
nn_log (LC_DISCOVERY, " (srclocM)");
add_to_addrset (as_meta, &rst->srcloc);
}
nn_log_addrset (LC_DISCOVERY, " (data", as_default);
nn_log_addrset (LC_DISCOVERY, " meta", as_meta);
@ -880,22 +871,16 @@ static void add_locator_to_ps (const nn_locator_t *loc, void *arg)
static int sedp_write_endpoint
(
struct writer *wr, int end_of_life, const nn_guid_t *epguid,
struct writer *wr, int alive, const nn_guid_t *epguid,
const struct entity_common *common, const struct endpoint_common *epcommon,
const nn_xqos_t *xqos, struct addrset *as)
{
const nn_xqos_t *defqos = is_writer_entityid (epguid->entityid) ? &gv.default_xqos_wr : &gv.default_xqos_rd;
const nn_vendorid_t my_vendor_id = MY_VENDOR_ID;
const int just_key = end_of_life;
struct nn_xmsg *mpayload;
uint64_t qosdiff;
nn_guid_t kh;
nn_plist_t ps;
serstate_t serstate;
serdata_t serdata;
void *payload_blob;
size_t payload_sz;
unsigned statusinfo;
int ret;
nn_plist_init_empty (&ps);
ps.present |= PP_ENDPOINT_GUID;
@ -908,7 +893,7 @@ static int sedp_write_endpoint
ps.entity_name = common->name;
}
if (end_of_life)
if (!alive)
{
assert (xqos == NULL);
assert (epcommon == NULL);
@ -966,23 +951,10 @@ static int sedp_write_endpoint
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
/* Then we take the payload from the message and turn it into a
serdata, and then we can write it as normal data */
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (*epguid);
serstate_set_key (serstate, just_key, &kh);
if (end_of_life)
statusinfo = NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
else
statusinfo = 0;
ddsi_serstate_set_msginfo (serstate, statusinfo, now ());
serdata = ddsi_serstate_fix (serstate);
nn_xmsg_free (mpayload);
TRACE (("sedp: write for %x:%x:%x:%x via %x:%x:%x:%x\n", PGUID (*epguid), PGUID (wr->e.guid)));
return write_sample_nogc_notk (NULL, wr, serdata);
ret = write_mpayload (wr, alive, PID_ENDPOINT_GUID, mpayload);
nn_xmsg_free (mpayload);
return ret;
}
static struct writer *get_sedp_writer (const struct participant *pp, unsigned entityid)
@ -1003,7 +975,7 @@ int sedp_write_writer (struct writer *wr)
#else
struct addrset *as = NULL;
#endif
return sedp_write_endpoint (sedp_wr, 0, &wr->e.guid, &wr->e, &wr->c, wr->xqos, as);
return sedp_write_endpoint (sedp_wr, 1, &wr->e.guid, &wr->e, &wr->c, wr->xqos, as);
}
return 0;
}
@ -1018,7 +990,7 @@ int sedp_write_reader (struct reader *rd)
#else
struct addrset *as = NULL;
#endif
return sedp_write_endpoint (sedp_wr, 0, &rd->e.guid, &rd->e, &rd->c, rd->xqos, as);
return sedp_write_endpoint (sedp_wr, 1, &rd->e.guid, &rd->e, &rd->c, rd->xqos, as);
}
return 0;
}
@ -1028,7 +1000,7 @@ int sedp_dispose_unregister_writer (struct writer *wr)
if ((!is_builtin_entityid(wr->e.guid.entityid, ownvendorid)) && (!wr->e.onlylocal))
{
struct writer *sedp_wr = get_sedp_writer (wr->c.pp, NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER);
return sedp_write_endpoint (sedp_wr, 1, &wr->e.guid, NULL, NULL, NULL, NULL);
return sedp_write_endpoint (sedp_wr, 0, &wr->e.guid, NULL, NULL, NULL, NULL);
}
return 0;
}
@ -1038,7 +1010,7 @@ int sedp_dispose_unregister_reader (struct reader *rd)
if ((!is_builtin_entityid(rd->e.guid.entityid, ownvendorid)) && (!rd->e.onlylocal))
{
struct writer *sedp_wr = get_sedp_writer (rd->c.pp, NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
return sedp_write_endpoint (sedp_wr, 1, &rd->e.guid, NULL, NULL, NULL, NULL);
return sedp_write_endpoint (sedp_wr, 0, &rd->e.guid, NULL, NULL, NULL, NULL);
}
return 0;
}
@ -1255,9 +1227,14 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
if (!config.tcp_use_peeraddr_for_unicast && (datap->present & PP_UNICAST_LOCATOR) && get_locator (&loc, &datap->unicast_locators, 0))
add_to_addrset (as, &loc);
else if (config.tcp_use_peeraddr_for_unicast)
nn_log (LC_DISCOVERY, " (srcloc)"), add_to_addrset (as, &rst->srcloc);
{
nn_log (LC_DISCOVERY, " (srcloc)");
add_to_addrset (as, &rst->srcloc);
}
else
{
copy_addrset_into_addrset_uc (as, pp->as_default);
}
if ((datap->present & PP_MULTICAST_LOCATOR) && get_locator (&loc, &datap->multicast_locators, 0))
allowmulticast_aware_add_to_addrset (as, &loc);
else
@ -1404,15 +1381,8 @@ int sedp_write_topic (struct participant *pp, const struct nn_plist *datap)
{
struct writer *sedp_wr;
struct nn_xmsg *mpayload;
serstate_t serstate;
serdata_t serdata;
void *payload_blob;
size_t payload_sz;
uint32_t topic_name_sz;
uint32_t topic_name_sz_BE;
uint64_t delta;
unsigned char digest[16];
md5_state_t md5st;
int ret;
assert (datap->qos.present & QP_TOPIC_NAME);
@ -1430,25 +1400,10 @@ int sedp_write_topic (struct participant *pp, const struct nn_plist *datap)
nn_plist_addtomsg (mpayload, datap, ~(uint64_t)0, delta);
nn_xmsg_addpar_sentinel (mpayload);
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
topic_name_sz = (uint32_t) strlen (datap->qos.topic_name) + 1;
topic_name_sz_BE = toBE4u (topic_name_sz);
md5_init (&md5st);
md5_append (&md5st, (const md5_byte_t *) &topic_name_sz_BE, sizeof (topic_name_sz_BE));
md5_append (&md5st, (const md5_byte_t *) datap->qos.topic_name, topic_name_sz);
md5_finish (&md5st, digest);
serstate_set_key (serstate, 0, digest);
ddsi_serstate_set_msginfo (serstate, 0, now ());
serdata = ddsi_serstate_fix (serstate);
nn_xmsg_free (mpayload);
TRACE (("sedp: write topic %s via %x:%x:%x:%x\n", datap->qos.topic_name, PGUID (sedp_wr->e.guid)));
return write_sample_nogc_notk (NULL, sedp_wr, serdata);
ret = write_mpayload (sedp_wr, 1, PID_TOPIC_NAME, mpayload);
nn_xmsg_free (mpayload);
return ret;
}
@ -1462,13 +1417,8 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
{
struct writer * sedp_wr;
struct nn_xmsg *mpayload;
serstate_t serstate;
serdata_t serdata;
nn_plist_t ps;
nn_guid_t kh;
void *payload_blob;
size_t payload_sz;
unsigned statusinfo;
int ret;
if (pp->e.onlylocal) {
/* This topic is only locally available. */
@ -1487,9 +1437,10 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
nn_plist_init_empty (&ps);
ps.present = PP_PARTICIPANT_GUID;
ps.participant_guid = pp->e.guid;
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_plist_fini (&ps);
if (alive)
{
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_plist_addtomsg (mpayload, pp->plist,
PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID |
PP_PRISMTECH_WATCHDOG_SCHEDULING | PP_PRISMTECH_LISTENER_SCHEDULING |
@ -1498,23 +1449,11 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
}
nn_xmsg_addpar_sentinel (mpayload);
/* Then we take the payload from the message and turn it into a
serdata, and then we can write it as normal data */
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (pp->e.guid);
serstate_set_key (serstate, !alive, &kh);
if (!alive)
statusinfo = NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
else
statusinfo = 0;
ddsi_serstate_set_msginfo (serstate, statusinfo, now ());
serdata = ddsi_serstate_fix (serstate);
TRACE (("sedp: write CMParticipant ST%x for %x:%x:%x:%x via %x:%x:%x:%x\n",
alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER, PGUID (pp->e.guid), PGUID (sedp_wr->e.guid)));
ret = write_mpayload (sedp_wr, alive, PID_PARTICIPANT_GUID, mpayload);
nn_xmsg_free (mpayload);
TRACE (("sedp: write CMParticipant ST%x for %x:%x:%x:%x via %x:%x:%x:%x\n", statusinfo, PGUID (pp->e.guid), PGUID (sedp_wr->e.guid)));
return write_sample_nogc_notk (NULL, sedp_wr, serdata);
return ret;
}
static void handle_SEDP_CM (const struct receiver_state *rst, nn_entityid_t wr_entity_id, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, unsigned len)
@ -1576,13 +1515,8 @@ int sedp_write_cm_publisher (const struct nn_plist *datap, int alive)
struct participant *pp;
struct writer *sedp_wr;
struct nn_xmsg *mpayload;
serstate_t serstate;
serdata_t serdata;
nn_guid_t kh;
void *payload_blob;
size_t payload_sz;
unsigned statusinfo;
uint64_t delta;
int ret;
if ((pp = group_guid_to_participant (&datap->group_guid)) == NULL)
{
@ -1607,23 +1541,9 @@ int sedp_write_cm_publisher (const struct nn_plist *datap, int alive)
}
nn_plist_addtomsg (mpayload, datap, ~(uint64_t)0, delta);
nn_xmsg_addpar_sentinel (mpayload);
/* Then we take the payload from the message and turn it into a
serdata, and then we can write it as normal data */
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (datap->group_guid);
serstate_set_key (serstate, !alive, &kh);
if (!alive)
statusinfo = NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
else
statusinfo = 0;
ddsi_serstate_set_msginfo (serstate, statusinfo, now ());
serdata = ddsi_serstate_fix (serstate);
ret = write_mpayload (sedp_wr, alive, PID_GROUP_GUID ,mpayload);
nn_xmsg_free (mpayload);
return write_sample_nogc_notk (NULL, sedp_wr, serdata);
return ret;
}
int sedp_write_cm_subscriber (const struct nn_plist *datap, int alive)
@ -1631,13 +1551,8 @@ int sedp_write_cm_subscriber (const struct nn_plist *datap, int alive)
struct participant *pp;
struct writer *sedp_wr;
struct nn_xmsg *mpayload;
serstate_t serstate;
serdata_t serdata;
nn_guid_t kh;
void *payload_blob;
size_t payload_sz;
unsigned statusinfo;
uint64_t delta;
int ret;
if ((pp = group_guid_to_participant (&datap->group_guid)) == NULL)
{
@ -1662,23 +1577,9 @@ int sedp_write_cm_subscriber (const struct nn_plist *datap, int alive)
}
nn_plist_addtomsg (mpayload, datap, ~(uint64_t)0, delta);
nn_xmsg_addpar_sentinel (mpayload);
/* Then we take the payload from the message and turn it into a
serdata, and then we can write it as normal data */
serstate = ddsi_serstate_new (NULL);
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
kh = nn_hton_guid (datap->group_guid);
serstate_set_key (serstate, !alive, &kh);
if (!alive)
statusinfo = NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
else
statusinfo = 0;
ddsi_serstate_set_msginfo (serstate, statusinfo, now ());
serdata = ddsi_serstate_fix (serstate);
ret = write_mpayload (sedp_wr, alive, PID_GROUP_GUID, mpayload);
nn_xmsg_free (mpayload);
return write_sample_nogc_notk (NULL, sedp_wr, serdata);
return ret;
}
static void handle_SEDP_GROUP_alive (nn_plist_t *datap /* note: potentially modifies datap */, nn_wctime_t timestamp)

View file

@ -23,7 +23,6 @@
#include "ddsi/q_misc.h"
#include "ddsi/q_log.h"
#include "ddsi/q_plist.h"
#include "q__osplser.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_globals.h"
#include "ddsi/q_addrset.h"
@ -33,7 +32,7 @@
#include "ddsi/q_unused.h"
#include "ddsi/q_error.h"
#include "ddsi/q_debmon.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_tran.h"
#include "ddsi/ddsi_tcp.h"
@ -107,7 +106,7 @@ static int print_addrset_if_notempty (ddsi_tran_conn_t conn, const char *prefix,
}
static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e,
const struct nn_xqos *xqos, const struct sertopic *topic)
const struct nn_xqos *xqos, const struct ddsi_sertopic *topic)
{
int x = 0;
x += cpf (conn, " %s %x:%x:%x:%x ", label, PGUID (e->guid));
@ -126,7 +125,7 @@ static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label,
return x;
}
static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct nn_xqos *xqos, const struct sertopic *topic)
static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct nn_xqos *xqos, const struct ddsi_sertopic *topic)
{
OS_UNUSED_ARG (c);
return print_any_endpoint_common (conn, label, e, xqos, topic);

View file

@ -24,7 +24,6 @@
#include "util/ut_avl.h"
#include "ddsi/q_plist.h"
#include "ddsi/q_lease.h"
#include "q__osplser.h"
#include "ddsi/q_qosmatch.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_globals.h"
@ -37,7 +36,7 @@
#include "ddsi/q_unused.h"
#include "ddsi/q_error.h"
#include "ddsi/q_builtin_topic.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/q_receive.h"
@ -85,8 +84,8 @@ static const unsigned prismtech_builtin_writers_besmask =
NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER |
NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit);
@ -1611,7 +1610,7 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
while (wr->whc->ops->sample_iter_borrow_next(&it, &sample))
{
struct proxy_writer_info pwr_info;
serdata_t payload = sample.serdata;
struct ddsi_serdata *payload = sample.serdata;
/* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */
struct tkmap_instance *tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (payload);
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
@ -2297,7 +2296,7 @@ static void match_proxy_reader_with_writers (struct proxy_reader *prd, nn_mtime_
/* ENDPOINT --------------------------------------------------------- */
static void new_reader_writer_common (const struct nn_guid *guid, const struct sertopic * topic, const struct nn_xqos *xqos)
static void new_reader_writer_common (const struct nn_guid *guid, const struct ddsi_sertopic * topic, const struct nn_xqos *xqos)
{
const char *partition = "(default)";
const char *partition_suffix = "";
@ -2353,7 +2352,7 @@ static void endpoint_common_fini (struct entity_common *e, struct endpoint_commo
entity_common_fini (e);
}
static int set_topic_type_name (nn_xqos_t *xqos, const struct sertopic * topic)
static int set_topic_type_name (nn_xqos_t *xqos, const struct ddsi_sertopic * topic)
{
if (!(xqos->present & QP_TYPE_NAME) && topic)
{
@ -2556,7 +2555,7 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru
return n;
}
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
@ -2651,11 +2650,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
(wr->xqos->durability.kind == NN_VOLATILE_DURABILITY_QOS &&
wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
}
if (topic)
{
os_atomic_inc32 (&((struct sertopic *)topic)->refcount);
}
wr->topic = topic;
wr->topic = ddsi_sertopic_ref (topic);
wr->as = new_addrset ();
wr->as_group = NULL;
@ -2796,11 +2791,10 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
return wr;
}
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg)
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg)
{
struct participant *pp;
struct writer * wr;
unsigned entity_kind;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
{
@ -2810,9 +2804,8 @@ struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_
/* participant can't be freed while we're mucking around cos we are
awake and do not touch the thread's vtime (ephash_lookup already
verifies we're awake) */
entity_kind = (topic->nkeys ? NN_ENTITYID_KIND_WRITER_WITH_KEY : NN_ENTITYID_KIND_WRITER_NO_KEY);
wrguid->prefix = pp->e.guid.prefix;
if (pp_allocate_entityid (&wrguid->entityid, entity_kind, pp) < 0)
if (pp_allocate_entityid (&wrguid->entityid, NN_ENTITYID_KIND_WRITER_WITH_KEY, pp) < 0)
return NULL;
wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
return wr;
@ -2872,7 +2865,7 @@ static void gc_delete_writer (struct gcreq *gcreq)
local_reader_ary_fini (&wr->rdary);
os_condDestroy (&wr->throttle_cond);
sertopic_free ((struct sertopic *) wr->topic);
ddsi_sertopic_unref ((struct ddsi_sertopic *) wr->topic);
endpoint_common_fini (&wr->e, &wr->c);
os_free (wr);
}
@ -3117,7 +3110,7 @@ static struct reader * new_reader_guid
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
const struct sertopic *topic,
const struct ddsi_sertopic *topic,
const struct nn_xqos *xqos,
struct rhc *rhc,
status_cb_t status_cb,
@ -3155,11 +3148,7 @@ static struct reader * new_reader_guid
rd->reliable = (rd->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
assert (rd->xqos->present & QP_DURABILITY);
rd->handle_as_transient_local = (rd->xqos->durability.kind == NN_TRANSIENT_LOCAL_DURABILITY_QOS);
if (topic)
{
os_atomic_inc32 (&((struct sertopic *)topic)->refcount);
}
rd->topic = topic;
rd->topic = ddsi_sertopic_ref (topic);
rd->ddsi2direct_cb = 0;
rd->ddsi2direct_cbarg = 0;
rd->init_acknack_count = 0;
@ -3253,7 +3242,7 @@ struct reader * new_reader
struct nn_guid *rdguid,
const struct nn_guid *group_guid,
const struct nn_guid *ppguid,
const struct sertopic *topic,
const struct ddsi_sertopic *topic,
const struct nn_xqos *xqos,
struct rhc * rhc,
status_cb_t status_cb,
@ -3262,16 +3251,14 @@ struct reader * new_reader
{
struct participant * pp;
struct reader * rd;
unsigned entity_kind;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
{
nn_log (LC_DISCOVERY, "new_reader - participant %x:%x:%x:%x not found\n", PGUID (*ppguid));
return NULL;
}
entity_kind = (topic->nkeys ? NN_ENTITYID_KIND_READER_WITH_KEY : NN_ENTITYID_KIND_READER_NO_KEY);
rdguid->prefix = pp->e.guid.prefix;
if (pp_allocate_entityid (&rdguid->entityid, entity_kind, pp) < 0)
if (pp_allocate_entityid (&rdguid->entityid, NN_ENTITYID_KIND_READER_WITH_KEY, pp) < 0)
return NULL;
rd = new_reader_guid (rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
return rd;
@ -3312,7 +3299,7 @@ static void gc_delete_reader (struct gcreq *gcreq)
{
(rd->status_cb) (rd->status_cb_entity, NULL);
}
sertopic_free ((struct sertopic *) rd->topic);
ddsi_sertopic_unref ((struct ddsi_sertopic *) rd->topic);
nn_xqos_fini (rd->xqos);
os_free (rd->xqos);

View file

@ -50,12 +50,12 @@
#include "ddsi/sysdeps.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_tran.h"
#include "ddsi/ddsi_udp.h"
#include "ddsi/ddsi_tcp.h"
#include "ddsi/ddsi_raweth.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/ddsi_serdata_default.h"
#include "dds__tkmap.h"
#include "dds__whc.h"
@ -752,6 +752,39 @@ static void wait_for_receive_threads (void)
}
}
static struct ddsi_sertopic *make_special_topic (uint16_t enc_id, const struct ddsi_serdata_ops *ops)
{
/* FIXME: two things (at least)
- it claims there is a key, but the underlying type description is missing
that only works as long as it ends up comparing the keyhash field ...
the keyhash field should be eliminated; but this can simply be moved over to an alternate
topic class, it need not use the "default" one, that's mere expediency
- initialising/freeing them here, in this manner, is not very clean
it should be moved to somewhere in the topic implementation
(kinda natural if they stop being "default" ones) */
struct ddsi_sertopic_default *st = os_malloc (sizeof (*st));
memset (st, 0, sizeof (*st));
os_atomic_st32 (&st->c.refc, 1);
st->c.ops = &ddsi_sertopic_ops_default;
st->c.serdata_ops = ops;
st->native_encoding_identifier = enc_id;
st->c.iid = ddsi_plugin.iidgen_fn();
st->nkeys = 1;
return (struct ddsi_sertopic *)st;
}
static void make_special_topics (void)
{
gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
}
static void free_special_topics (void)
{
ddsi_sertopic_unref (gv.plist_topic);
ddsi_sertopic_unref (gv.rawcdr_topic);
}
static int setup_and_start_recv_threads (void)
{
unsigned i;
@ -998,6 +1031,8 @@ int rtps_init (void)
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_rd, &gv.default_xqos_rd);
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_wr, &gv.default_xqos_wr);
make_special_topics (); /* FIXME: leaking these for now */
os_mutexInit (&gv.participant_set_lock);
os_condInit (&gv.participant_set_cond, &gv.participant_set_lock);
lease_management_init ();
@ -1309,6 +1344,7 @@ err_unicast_sockets:
lease_management_term ();
os_condDestroy (&gv.participant_set_cond);
os_mutexDestroy (&gv.participant_set_lock);
free_special_topics ();
#ifdef DDSI_INCLUDE_ENCRYPTION
if (q_security_plugin.free_decoder)
q_security_plugin.free_decoder (gv.recvSecurityCodec);
@ -1586,6 +1622,7 @@ void rtps_term (void)
lease_management_term ();
os_mutexDestroy (&gv.participant_set_lock);
os_condDestroy (&gv.participant_set_cond);
free_special_topics ();
nn_xqos_fini (&gv.builtin_endpoint_xqos_wr);
nn_xqos_fini (&gv.builtin_endpoint_xqos_rd);

View file

@ -17,7 +17,7 @@
#include "util/ut_fibheap.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/q_protocol.h"
#include "ddsi/q_rtps.h"
#include "ddsi/q_misc.h"

View file

@ -3066,6 +3066,22 @@ int nn_plist_init_frommsg
return ERR_INVALID;
}
const unsigned char *nn_plist_findparam_native_unchecked (const void *src, nn_parameterid_t pid)
{
/* Scans the parameter list starting at src looking just for pid, returning NULL if not found;
no further checking is done and the input is assumed to valid and in native format. Clearly
this is only to be used for internally generated data -- to precise, for grabbing the key
value from discovery data that is being sent out. */
const nn_parameter_t *par = src;
while (par->parameterid != pid)
{
if (pid == PID_SENTINEL)
return NULL;
par = (const nn_parameter_t *) ((const char *) (par + 1) + par->length);
}
return (unsigned char *) (par + 1);
}
unsigned char *nn_plist_quickscan (struct nn_rsample_info *dest, const struct nn_rmsg *rmsg, const nn_plist_src_t *src)
{
/* Sets a few fields in dest, returns address of first byte

View file

@ -16,10 +16,8 @@
#include "os/os.h"
#include "ddsi/q_md5.h"
#include "util/ut_avl.h"
#include "q__osplser.h"
#include "dds__stream.h"
#include "ddsi/q_protocol.h"
#include "ddsi/q_rtps.h"
@ -49,6 +47,8 @@
#include "ddsi/q_static_assert.h"
#include "ddsi/q_init.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
@ -1784,70 +1784,28 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
return 1;
}
static serstate_t make_raw_serstate
(
struct sertopic const * const topic,
const struct nn_rdata *fragchain, uint32_t sz, int justkey,
unsigned statusinfo, nn_wctime_t tstamp
)
static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topic, const struct nn_rdata *fragchain, uint32_t sz, int justkey, unsigned statusinfo, nn_wctime_t tstamp)
{
serstate_t st = ddsi_serstate_new (topic);
ddsi_serstate_set_msginfo (st, statusinfo, tstamp);
st->kind = justkey ? STK_KEY : STK_DATA;
/* the CDR header is always fully contained in the first fragment
(see valid_DataFrag), so extracting it is easy */
assert (fragchain->min == 0);
(void)sz;
/* alignment at head-of-stream is guaranteed, requesting 1 byte
alignment is therefore fine for pasting together fragments of
data */
{
uint32_t off = 4; /* must skip the CDR header */
while (fragchain)
{
assert (fragchain->min <= off);
assert (fragchain->maxp1 <= sz);
if (fragchain->maxp1 > off)
{
/* only copy if this fragment adds data */
const unsigned char *payload = NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain));
ddsi_serstate_append_blob (st, 1, fragchain->maxp1 - off, payload + off - fragchain->min);
off = fragchain->maxp1;
}
fragchain = fragchain->nextfrag;
}
}
return st;
struct ddsi_serdata *sd = ddsi_serdata_from_ser (topic, justkey ? SDK_KEY : SDK_DATA, fragchain, sz);
sd->statusinfo = statusinfo;
sd->timestamp = tstamp;
return sd;
}
static serdata_t ddsi_serstate_fix_with_key (serstate_t st, const struct sertopic *topic, bool bswap)
{
serdata_t sample = ddsi_serstate_fix(st);
dds_stream_t is;
assert(sample->v.keyhash.m_flags == 0);
sample->v.bswap = bswap;
dds_stream_from_serstate (&is, sample->v.st);
/* FIXME: the relationship between dds_topic, topic_descriptor and sertopic clearly needs some work */
dds_stream_read_keyhash (&is, &sample->v.keyhash, topic->status_cb_entity->m_descriptor, sample->v.st->kind == STK_KEY);
return sample;
}
static serdata_t extract_sample_from_data
static struct ddsi_serdata *extract_sample_from_data
(
const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags,
const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo,
nn_wctime_t tstamp, struct sertopic const * const topic
nn_wctime_t tstamp, struct ddsi_sertopic const * const topic
)
{
static const nn_guid_t null_guid = {{{0,0,0,0,0,0,0,0,0,0,0,0}},{0}};
const char *failmsg = NULL;
serdata_t sample = NULL;
struct ddsi_serdata * sample = NULL;
if (statusinfo == 0)
{
/* normal write */
serstate_t st;
if (!(data_smhdr_flags & DATA_FLAG_DATAFLAG) || sampleinfo->size == 0)
{
const struct proxy_writer *pwr = sampleinfo->pwr;
@ -1859,25 +1817,21 @@ static serdata_t extract_sample_from_data
data_smhdr_flags, sampleinfo->size));
return NULL;
}
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
sample = get_serdata (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
}
else if (sampleinfo->size)
{
/* dispose or unregister with included serialized key or data
(data is a PrismTech extension) -- i.e., dispose or unregister
as one would expect to receive */
serstate_t st;
if (data_smhdr_flags & DATA_FLAG_KEYFLAG)
{
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 1, statusinfo, tstamp);
sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
sample = get_serdata (topic, fragchain, sampleinfo->size, 1, statusinfo, tstamp);
}
else
{
assert (data_smhdr_flags & DATA_FLAG_DATAFLAG);
st = make_raw_serstate (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
sample = get_serdata (topic, fragchain, sampleinfo->size, 0, statusinfo, tstamp);
}
}
else if (data_smhdr_flags & DATA_FLAG_INLINE_QOS)
@ -1890,12 +1844,9 @@ static serdata_t extract_sample_from_data
failmsg = "qos present but without keyhash";
else
{
serstate_t st;
st = ddsi_serstate_new (topic);
ddsi_serstate_set_msginfo (st, statusinfo, tstamp);
st->kind = STK_KEY;
ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value);
sample = ddsi_serstate_fix_with_key (st, topic, sampleinfo->bswap);
sample = ddsi_serdata_from_keyhash (topic, &qos->keyhash);
sample->statusinfo = statusinfo;
sample->timestamp = tstamp;
}
}
else
@ -1943,20 +1894,17 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr, in
}
}
static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const nn_guid_t *rdguid, int pwr_locked)
{
struct receiver_state const * const rst = sampleinfo->rst;
struct proxy_writer * const pwr = sampleinfo->pwr;
struct sertopic const * const topic = pwr->c.topic;
struct ddsi_sertopic const * const topic = pwr->c.topic;
unsigned statusinfo;
Data_DataFrag_common_t *msg;
unsigned char data_smhdr_flags;
nn_plist_t qos;
int need_keyhash;
serdata_t payload;
struct ddsi_serdata * payload;
if (pwr->ddsi2direct_cb)
{

View file

@ -32,7 +32,8 @@
#include "ddsi/q_hbcontrol.h"
#include "ddsi/q_static_assert.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsi/sysdeps.h"
#include "dds__whc.h"
@ -394,13 +395,26 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
nn_xmsg_submsg_setnext (msg, sm_marker);
}
static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct serdata *serdata, struct nn_xmsg **pmsg)
static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg)
{
const size_t expected_inline_qos_size = 4+8+20+4 + 32;
struct nn_xmsg_marker sm_marker;
const unsigned char contentflag = (ddsi_serdata_is_empty (serdata) ? 0 : ddsi_serdata_is_key (serdata) ? DATA_FLAG_KEYFLAG : DATA_FLAG_DATAFLAG);
unsigned char contentflag;
Data_t *data;
switch (serdata->kind)
{
case SDK_EMPTY:
contentflag = 0;
break;
case SDK_KEY:
contentflag = DATA_FLAG_KEYFLAG;
break;
case SDK_DATA:
contentflag = DATA_FLAG_DATAFLAG;
break;
}
ASSERT_MUTEX_HELD (&wr->e.lock);
if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL)
@ -413,7 +427,7 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
nn_xmsg_setdstN (*pmsg, wr->as, wr->as_group);
nn_xmsg_setmaxdelay (*pmsg, nn_from_ddsi_duration (wr->xqos->latency_budget.duration));
nn_xmsg_add_timestamp (*pmsg, serdata->v.msginfo.timestamp);
nn_xmsg_add_timestamp (*pmsg, serdata->timestamp);
data = nn_xmsg_append (*pmsg, &sm_marker, sizeof (Data_t));
nn_xmsg_submsg_init (*pmsg, sm_marker, SMID_DATA);
@ -430,8 +444,8 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
/* Adding parameters means potential reallocing, so sm, ddcmn now likely become invalid */
if (wr->include_keyhash)
nn_xmsg_addpar_keyhash (*pmsg, serdata);
if (serdata->v.msginfo.statusinfo)
nn_xmsg_addpar_statusinfo (*pmsg, serdata->v.msginfo.statusinfo);
if (serdata->statusinfo)
nn_xmsg_addpar_statusinfo (*pmsg, serdata->statusinfo);
if (nn_xmsg_addpar_sentinel_ifparam (*pmsg) > 0)
{
data = nn_xmsg_submsg_from_marker (*pmsg, sm_marker);
@ -443,7 +457,7 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
return 0;
}
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct serdata *serdata, unsigned fragnum, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew)
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd, struct nn_xmsg **pmsg, int isnew)
{
/* We always fragment into FRAGMENT_SIZEd fragments, which are near
the smallest allowed fragment size & can't be bothered (yet) to
@ -464,14 +478,15 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
void *sm;
Data_DataFrag_common_t *ddcmn;
int fragging;
unsigned fragstart, fraglen;
uint32_t fragstart, fraglen;
enum nn_xmsg_kind xmsg_kind = isnew ? NN_XMSG_KIND_DATA : NN_XMSG_KIND_DATA_REXMIT;
const uint32_t size = ddsi_serdata_size (serdata);
int ret = 0;
(void)plist;
ASSERT_MUTEX_HELD (&wr->e.lock);
if (fragnum * config.fragment_size >= ddsi_serdata_size (serdata) && ddsi_serdata_size (serdata) > 0)
if (fragnum * config.fragment_size >= size && size > 0)
{
/* This is the first chance to detect an attempt at retransmitting
an non-existent fragment, which a malicious (or buggy) remote
@ -480,7 +495,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
return ERR_INVALID;
}
fragging = (config.fragment_size < ddsi_serdata_size (serdata));
fragging = (config.fragment_size < size);
if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
return ERR_OUT_OF_MEMORY;
@ -509,7 +524,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
/* Timestamp only needed once, for the first fragment */
if (fragnum == 0)
{
nn_xmsg_add_timestamp (*pmsg, serdata->v.msginfo.timestamp);
nn_xmsg_add_timestamp (*pmsg, serdata->timestamp);
}
sm = nn_xmsg_append (*pmsg, &sm_marker, fragging ? sizeof (DataFrag_t) : sizeof (Data_t));
@ -517,13 +532,19 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
if (!fragging)
{
const unsigned char contentflag = (ddsi_serdata_is_empty (serdata) ? 0 : ddsi_serdata_is_key (serdata) ? DATA_FLAG_KEYFLAG : DATA_FLAG_DATAFLAG);
unsigned char contentflag = 0;
Data_t *data = sm;
switch (serdata->kind)
{
case SDK_EMPTY: contentflag = 0; break;
case SDK_KEY: contentflag = DATA_FLAG_KEYFLAG; break;
case SDK_DATA: contentflag = DATA_FLAG_DATAFLAG; break;
}
nn_xmsg_submsg_init (*pmsg, sm_marker, SMID_DATA);
ddcmn->smhdr.flags = (unsigned char) (ddcmn->smhdr.flags | contentflag);
fragstart = 0;
fraglen = ddsi_serdata_size (serdata);
fraglen = size;
ddcmn->octetsToInlineQos = (unsigned short) ((char*) (data+1) - ((char*) &ddcmn->octetsToInlineQos + 2));
if (wr->reliable)
@ -533,18 +554,18 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
{
const unsigned char contentflag =
set_smhdr_flags_asif_data
? (ddsi_serdata_is_key (serdata) ? DATA_FLAG_KEYFLAG : DATA_FLAG_DATAFLAG)
: (ddsi_serdata_is_key (serdata) ? DATAFRAG_FLAG_KEYFLAG : 0);
? (serdata->kind == SDK_KEY ? DATA_FLAG_KEYFLAG : DATA_FLAG_DATAFLAG)
: (serdata->kind == SDK_KEY ? DATAFRAG_FLAG_KEYFLAG : 0);
DataFrag_t *frag = sm;
/* empty means size = 0, which means it never needs fragmenting */
assert (!ddsi_serdata_is_empty (serdata));
assert (serdata->kind != SDK_EMPTY);
nn_xmsg_submsg_init (*pmsg, sm_marker, SMID_DATA_FRAG);
ddcmn->smhdr.flags = (unsigned char) (ddcmn->smhdr.flags | contentflag);
frag->fragmentStartingNum = fragnum + 1;
frag->fragmentsInSubmessage = 1;
frag->fragmentSize = (unsigned short) config.fragment_size;
frag->sampleSize = ddsi_serdata_size (serdata);
frag->sampleSize = (uint32_t)size;
fragstart = fragnum * config.fragment_size;
#if MULTIPLE_FRAGS_IN_SUBMSG /* ugly hack for testing only */
@ -555,8 +576,8 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
#endif
fraglen = config.fragment_size * frag->fragmentsInSubmessage;
if (fragstart + fraglen > ddsi_serdata_size (serdata))
fraglen = ddsi_serdata_size (serdata) - fragstart;
if (fragstart + fraglen > size)
fraglen = (uint32_t)(size - fragstart);
ddcmn->octetsToInlineQos = (unsigned short) ((char*) (frag+1) - ((char*) &ddcmn->octetsToInlineQos + 2));
if (wr->reliable && (!isnew || fragstart + fraglen == ddsi_serdata_size (serdata)))
@ -588,9 +609,9 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
{
nn_xmsg_addpar_keyhash (*pmsg, serdata);
}
if (serdata->v.msginfo.statusinfo)
if (serdata->statusinfo)
{
nn_xmsg_addpar_statusinfo (*pmsg, serdata->v.msginfo.statusinfo);
nn_xmsg_addpar_statusinfo (*pmsg, serdata->statusinfo);
}
rc = nn_xmsg_addpar_sentinel_ifparam (*pmsg);
if (rc > 0)
@ -670,7 +691,7 @@ static int must_skip_frag (const char *frags_to_skip, unsigned frag)
}
#endif
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew, unsigned nfrags)
static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew, unsigned nfrags)
{
unsigned i;
#if 0
@ -715,7 +736,7 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
struct nn_xmsg *msg = NULL;
int hbansreq;
os_mutexLock (&wr->e.lock);
msg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
msg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
os_mutexUnlock (&wr->e.lock);
if (msg)
{
@ -726,17 +747,17 @@ static void transmit_sample_lgmsg_unlocked (struct nn_xpack *xp, struct writer *
}
}
static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew)
static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr, const struct whc_state *whcst, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew)
{
/* on entry: &wr->e.lock held; on exit: lock no longer held */
struct nn_xmsg *fmsg;
unsigned sz;
uint32_t sz;
assert(xp);
sz = ddsi_serdata_size (serdata);
if (sz > config.fragment_size || !isnew || plist != NULL || prd != NULL)
{
unsigned nfrags;
uint32_t nfrags;
os_mutexUnlock (&wr->e.lock);
nfrags = (sz + config.fragment_size - 1) / config.fragment_size;
transmit_sample_lgmsg_unlocked (xp, wr, whcst, seq, plist, serdata, prd, isnew, nfrags);
@ -754,7 +775,7 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
/* Note: wr->heartbeat_xevent != NULL <=> wr is reliable */
if (wr->heartbeat_xevent)
hmsg = writer_hbcontrol_piggyback (wr, whcst, ddsi_serdata_twrite (serdata), nn_xpack_packetid (xp), &hbansreq);
hmsg = writer_hbcontrol_piggyback (wr, whcst, serdata->twrite, nn_xpack_packetid (xp), &hbansreq);
else
hmsg = NULL;
@ -767,9 +788,9 @@ static void transmit_sample_unlocks_wr (struct nn_xpack *xp, struct writer *wr,
}
}
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, serdata_t serdata, struct proxy_reader *prd, int isnew)
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew)
{
unsigned i, sz, nfrags;
uint32_t i, sz, nfrags;
int enqueued = 1;
ASSERT_MUTEX_HELD (&wr->e.lock);
@ -821,7 +842,7 @@ int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_
return enqueued ? 0 : -1;
}
static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk)
static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk)
{
/* returns: < 0 on error, 0 if no need to insert in whc, > 0 if inserted */
int do_insert, insres, res;
@ -839,9 +860,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
nn_log (LC_TRACE, "write_sample %x:%x:%x:%x #%"PRId64"", PGUID (wr->e.guid), seq);
if (plist != 0 && (plist->present & PP_COHERENT_SET))
nn_log (LC_TRACE, " C#%"PRId64"", fromSN (plist->coherent_set_seqno));
nn_log (LC_TRACE, ": ST%u %s/%s:%s%s\n",
serdata->v.msginfo.statusinfo, tname, ttname,
ppbuf, tmp < (int) sizeof (ppbuf) ? "" : " (trunc)");
nn_log (LC_TRACE, ": ST%u %s/%s:%s%s\n", serdata->statusinfo, tname, ttname, ppbuf, tmp < (int) sizeof (ppbuf) ? "" : " (trunc)");
}
assert (wr->reliable || have_reliable_subs (wr) == 0);
@ -999,7 +1018,7 @@ static int maybe_grow_whc (struct writer *wr)
return 0;
}
static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, serdata_t serdata, struct tkmap_instance *tk, int end_of_txn, int gc_allowed)
static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, struct ddsi_serdata *serdata, struct tkmap_instance *tk, int end_of_txn, int gc_allowed)
{
int r;
seqno_t seq;
@ -1061,7 +1080,7 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
/* Always use the current monotonic time */
tnow = now_mt ();
ddsi_serdata_set_twrite (serdata, tnow);
serdata->twrite = tnow;
seq = ++wr->seq;
if (wr->cs_seq != 0)
@ -1135,17 +1154,17 @@ drop:
return r;
}
int write_sample_gc (struct nn_xpack *xp, struct writer *wr, serdata_t serdata, struct tkmap_instance *tk)
int write_sample_gc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct tkmap_instance *tk)
{
return write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1);
}
int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, serdata_t serdata, struct tkmap_instance *tk)
int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct tkmap_instance *tk)
{
return write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0);
}
int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t serdata)
int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
{
struct tkmap_instance *tk;
int res;
@ -1155,7 +1174,7 @@ int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t serd
return res;
}
int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, serdata_t serdata)
int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
{
struct tkmap_instance *tk;
int res;

View file

@ -36,8 +36,8 @@
#include "ddsi/q_bitset.h"
#include "ddsi/q_lease.h"
#include "ddsi/q_xmsg.h"
#include "q__osplser.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_serdata_default.h"
#include "dds__whc.h"
#include "ddsi/sysdeps.h"
@ -966,9 +966,6 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
struct proxy_reader *prd;
struct writer *spdp_wr;
struct whc_borrowed_sample sample;
serstate_t st;
serdata_t sd;
nn_guid_t kh;
#ifndef NDEBUG
bool sample_found;
#endif
@ -977,6 +974,8 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
{
TRACE (("handle_xevk_spdp %x:%x:%x:%x - unknown guid\n",
PGUID (ev->u.spdp.pp_guid)));
if (ev->u.spdp.directed)
delete_xevent (ev);
return;
}
@ -984,6 +983,8 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
{
TRACE (("handle_xevk_spdp %x:%x:%x:%x - spdp writer of participant not found\n",
PGUID (ev->u.spdp.pp_guid)));
if (ev->u.spdp.directed)
delete_xevent (ev);
return;
}
@ -1005,15 +1006,21 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
}
}
/* Look up data in (transient-local) WHC by key value */
if ((st = ddsi_serstate_new (NULL)) == NULL)
{
TRACE (("xmit spdp: skip %x:%x:%x:%x: out of memory\n", PGUID (ev->u.spdp.pp_guid)));
goto skip;
}
kh = nn_hton_guid (ev->u.spdp.pp_guid);
serstate_set_key (st, 1, &kh);
sd = ddsi_serstate_fix (st);
/* Look up data in (transient-local) WHC by key value -- FIXME: clearly
a slightly more efficient and elegant way of looking up the key value
is to be preferred */
nn_plist_t ps;
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID;
ps.participant_guid = ev->u.spdp.pp_guid;
struct nn_xmsg *mpayload = nn_xmsg_new (gv.xmsgpool, &ev->u.spdp.pp_guid.prefix, 0, NN_XMSG_KIND_DATA);
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
struct ddsi_plist_sample plist_sample;
nn_xmsg_payload_to_plistsample (&plist_sample, PID_PARTICIPANT_GUID, mpayload);
struct ddsi_serdata *sd = ddsi_serdata_from_sample (gv.plist_topic, SDK_KEY, &plist_sample);
nn_xmsg_free (mpayload);
os_mutexLock (&spdp_wr->e.lock);
if (spdp_wr->whc->ops->borrow_sample_key (spdp_wr->whc, sd, &sample))
@ -1120,8 +1127,7 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi
ParticipantMessageData_t pmd;
char pad[offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH];
} u;
serdata_t serdata;
serstate_t serstate;
struct ddsi_serdata *serdata;
struct tkmap_instance *tk;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER)) == NULL)
@ -1135,16 +1141,14 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi
u.pmd.length = PMD_DATA_LENGTH;
memset (u.pmd.value, 0, u.pmd.length);
serstate = ddsi_serstate_new (NULL);
ddsi_serstate_append_blob (serstate, 4, sizeof (u.pad), &u.pmd);
serstate_set_key (serstate, 0, &u.pmd);
ddsi_serstate_set_msginfo (serstate, 0, now ());
serdata = ddsi_serstate_fix (serstate);
/* HORRIBLE HACK ALERT -- serstate/serdata looks at whether topic is
a null pointer to choose PL_CDR_x encoding or regular CDR_x
encoding. */
serdata->hdr.identifier = PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE;
struct ddsi_rawcdr_sample raw = {
.blob = &u,
.size = offsetof (ParticipantMessageData_t, value) + PMD_DATA_LENGTH,
.key = &u.pmd,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv.rawcdr_topic, SDK_DATA, &raw);
serdata->timestamp = now ();
tk = (ddsi_plugin.rhc_plugin.rhc_lookup_fn) (serdata);
write_sample_nogc (xp, wr, serdata, tk);

View file

@ -24,7 +24,6 @@
#include "util/ut_avl.h"
#include "util/ut_thread_pool.h"
#include "ddsi/ddsi_ser.h"
#include "ddsi/q_protocol.h"
#include "ddsi/q_xqos.h"
#include "ddsi/q_bswap.h"
@ -41,7 +40,7 @@
#include "ddsi/q_globals.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_freelist.h"
#include "q__osplser.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/sysdeps.h"
@ -73,7 +72,7 @@ struct nn_xmsg {
size_t maxsz;
size_t sz;
int have_params;
struct serdata *refd_payload;
struct ddsi_serdata *refd_payload;
ddsi_iovec_t refd_payload_iov;
int64_t maxdelay;
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
@ -345,16 +344,16 @@ void nn_xmsg_free (struct nn_xmsg *m)
{
struct nn_xmsgpool *pool = m->pool;
if (m->refd_payload)
{
ddsi_serdata_unref (m->refd_payload);
}
ddsi_serdata_to_ser_unref (m->refd_payload, &m->refd_payload_iov);
if (m->dstmode == NN_XMSG_DST_ALL)
{
unref_addrset (m->dstaddr.all.as);
unref_addrset (m->dstaddr.all.as_group);
}
if (!nn_freelist_push (&pool->freelist, m))
{
nn_xmsg_realfree (m);
}
}
/************************************************/
@ -468,6 +467,13 @@ void *nn_xmsg_payload (size_t *sz, struct nn_xmsg *m)
return m->data->payload;
}
void nn_xmsg_payload_to_plistsample (struct ddsi_plist_sample *dst, nn_parameterid_t keyparam, const struct nn_xmsg *m)
{
dst->blob = m->data->payload;
dst->size = m->sz;
dst->keyparam = keyparam;
}
void nn_xmsg_submsg_init (struct nn_xmsg *msg, struct nn_xmsg_marker marker, SubmessageKind_t smkind)
{
SubmessageHeader_t *hdr = (SubmessageHeader_t *) (msg->data->payload + marker.offset);
@ -554,15 +560,13 @@ void nn_xmsg_add_entityid (struct nn_xmsg * m)
nn_xmsg_submsg_setnext (m, sm);
}
void nn_xmsg_serdata (struct nn_xmsg *m, serdata_t serdata, size_t off, size_t len)
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len)
{
if (!ddsi_serdata_is_empty (serdata))
if (serdata->kind != SDK_EMPTY)
{
size_t len4 = align4u (len);
assert (m->refd_payload == NULL);
m->refd_payload = ddsi_serdata_ref (serdata);
m->refd_payload_iov.iov_base = (char *) &m->refd_payload->hdr + off;
m->refd_payload_iov.iov_len = (ddsi_iov_len_t) len4;
m->refd_payload = ddsi_serdata_to_ser_ref (serdata, off, len4, &m->refd_payload_iov);
}
}
@ -889,12 +893,13 @@ void nn_xmsg_addpar_stringseq (struct nn_xmsg *m, unsigned pid, const nn_strings
}
}
void nn_xmsg_addpar_keyhash (struct nn_xmsg *m, const struct serdata *serdata)
void nn_xmsg_addpar_keyhash (struct nn_xmsg *m, const struct ddsi_serdata *serdata)
{
if (!ddsi_serdata_is_empty (serdata))
if (serdata->kind != SDK_EMPTY)
{
const struct ddsi_serdata_default *serdata_def = (const struct ddsi_serdata_default *)serdata;
char *p = nn_xmsg_addpar (m, PID_KEYHASH, 16);
memcpy (p, serdata->v.keyhash.m_hash, 16);
memcpy (p, serdata_def->keyhash.m_hash, 16);
}
}