validate and normalize received CDR data

The CDR deserializer failed to check it was staying within the bounds of
the received data, and it turns out it also was inconsistent in its
interpretation of the (undocumented) serializer instructions.  This
commit adds some information on the instruction format obtained by
reverse engineering the code and studying the output of the IDL
preprocessor, and furthermore changes a lot of the types used in the
(de)serializer code to have some more compiler support.  The IDL
preprocessor is untouched and the generated instructinos do exactly the
same thing (except where change was needed).

The bulk of this commit replaces the implementation of the
(de)serializer.  It is still rather ugly, but at least the very long
functions with several levels of nested conditions and switch statements
have been split out into multiple functions.  Most of these have single
call-sites, so the compiler hopefully inlines them nicely.

The other important thing is that it adds a "normalize" function that
validates the structure of the CDR and performs byteswapping if
necessary.  This means the deserializer can now assume a well-formed
input in native byte-order.  Checks and conditional byteswaps have been
removed accordingly.

It changes some types to make a compile-time distinction between
read-only, native-endianness input, a native-endianness output, and a
big-endian output for dealing with key hashes.  This should reduce the
risk of accidentally mixing endianness or modifying an input stream.

The preprocessor has been modified to indicate the presence of unions in
a topic type in the descriptor flags.  If a union is present, any
memory allocated in a sample is freed first and the sample is zero'd out
prior to deserializing the new value.  This is to prevent reading
garbage pointers for strings and sequences when switching union cases.

The test tool has been included in the commit but it does not get run by
itself.  Firstly, it requires the presence of OpenSplice DDS as an
alternative implementation to check the CDR processing against.
Secondly, it takes quite a while to run and is of no interest unless one
changes something in the (de)serialization.

Finally, I have no idea why there was a "CDR stream" interface among the
public functions.  The existing interfaces are fundamentally broken by
the removal of arbitrary-endianness streams, and the interfaces were
already incapable of proper error notification.  So, they have been
removed.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-10 17:59:06 +08:00 committed by eboasson
parent d91e7b34c9
commit 3067a69c92
25 changed files with 2315 additions and 1941 deletions

View file

@ -24,7 +24,6 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_qos.c
dds_handles.c
dds_entity.c
dds_key.c
dds_querycond.c
dds_topic.c
dds_listener.c
@ -48,7 +47,6 @@ PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/dd
ddsc/dds_public_listener.h
ddsc/dds_public_qos.h
ddsc/dds_public_status.h
ddsc/dds_public_stream.h
)
PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
@ -58,7 +56,6 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__handles.h
dds__entity.h
dds__init.h
dds__key.h
dds__listener.h
dds__participant.h
dds__publisher.h

View file

@ -45,7 +45,6 @@ typedef int32_t dds_entity_t;
#include "dds/ddsrt/time.h"
#include "dds/ddsrt/retcode.h"
#include "dds/ddsrt/log.h"
#include "dds/ddsc/dds_public_stream.h"
#include "dds/ddsc/dds_public_impl.h"
#include "dds/ddsc/dds_public_alloc.h"
#include "dds/ddsc/dds_public_qos.h"

View file

@ -21,9 +21,10 @@
#ifndef DDS_IMPL_H
#define DDS_IMPL_H
#include <stdint.h>
#include <stdbool.h>
#include "dds/export.h"
#include "dds/ddsc/dds_public_alloc.h"
#include "dds/ddsc/dds_public_stream.h"
#if defined (__cplusplus)
extern "C" {
@ -71,6 +72,7 @@ dds_topic_descriptor_t;
#define DDS_TOPIC_NO_OPTIMIZE 0x0001
#define DDS_TOPIC_FIXED_KEY 0x0002
#define DDS_TOPIC_CONTAINS_UNION 0x0004
/*
Masks for read condition, read, take: there is only one mask here,
@ -119,89 +121,119 @@ typedef int32_t dds_domainid_t;
/* Topic encoding instruction types */
#define DDS_OP_RTS 0x00000000
#define DDS_OP_ADR 0x01000000
#define DDS_OP_JSR 0x02000000
#define DDS_OP_JEQ 0x03000000
enum dds_stream_opcode {
/* return from subroutine, exits top-level
[RTS, 0, 0, 0] */
DDS_OP_RTS = 0x00 << 24,
/* data field
[ADR, nBY, 0, k] [offset]
[ADR, STR, 0, k] [offset]
[ADR, BST, 0, k] [offset] [bound]
[ADR, SEQ, nBY, 0] [offset]
[ADR, SEQ, STR, 0] [offset]
[ADR, SEQ, BST, 0] [offset] [bound]
[ADR, SEQ, s, 0] [offset] [elem-size] [next-insn, elem-insn]
where s = {SEQ,ARR,UNI,STU}
[ADR, ARR, nBY, k] [offset] [alen]
[ADR, ARR, STR, 0] [offset] [alen]
[ADR, ARR, BST, 0] [offset] [alen] [0] [bound]
[ADR, ARR, s, 0] [offset] [alen] [next-insn, elem-insn] [elem-size]
where s = {SEQ,ARR,UNI,STU}
[ADR, UNI, d, z] [offset] [alen] [next-insn, cases]
where
d = discriminant type of {1BY,2BY,4BY}
z = default present/not present (DDS_OP_FLAG_DEF)
offset = discriminant offset
followed by alen case labels: in JEQ format
note: [ADR, STU, ...] is illegal
where
s = subtype
k = key/not key (DDS_OP_FLAG_KEY)
[offset] = field offset from start of element in memory
[elem-size] = element size in memory
[bound] = string bound + 1
[alen] = array length, number of cases
[next-insn] = (unsigned 16 bits) offset to instruction for next field, from start of insn
[elem-insn] = (unsigned 16 bits) offset to first instruction for element, from start of insn
[cases] = (unsigned 16 bits) offset to first case label, from start of insn
*/
DDS_OP_ADR = 0x01 << 24,
/* jump-to-subroutine (apparently not used at the moment)
[JSR, 0, e]
where
e = (signed 16 bits) offset to first instruction in subroutine, from start of insn
instruction sequence must end in RTS, execution resumes at instruction
following JSR */
DDS_OP_JSR = 0x02 << 24,
/* union case
[JEQ, nBY, 0] [disc] [offset]
[JEQ, STR, 0] [disc] [offset]
[JEQ, s, e] [disc] [offset]
where
s = subtype other than {nBY,STR}
e = (unsigned 16 bits) offset to first instruction for case, from start of insn
instruction sequence must end in RTS, at which point executes continues
at the next field's instruction as specified by the union */
DDS_OP_JEQ = 0x03 << 24
};
/* Core type flags
1BY : One byte simple type
2BY : Two byte simple type
4BY : Four byte simple type
8BY : Eight byte simple type
STR : String
BST : Bounded string
SEQ : Sequence
ARR : Array
UNI : Union
STU : Struct
*/
#define DDS_OP_VAL_1BY 0x01
#define DDS_OP_VAL_2BY 0x02
#define DDS_OP_VAL_4BY 0x03
#define DDS_OP_VAL_8BY 0x04
#define DDS_OP_VAL_STR 0x05
#define DDS_OP_VAL_BST 0x06
#define DDS_OP_VAL_SEQ 0x07
#define DDS_OP_VAL_ARR 0x08
#define DDS_OP_VAL_UNI 0x09
#define DDS_OP_VAL_STU 0x0a
#define DDS_OP_TYPE_1BY (DDS_OP_VAL_1BY << 16)
#define DDS_OP_TYPE_2BY (DDS_OP_VAL_2BY << 16)
#define DDS_OP_TYPE_4BY (DDS_OP_VAL_4BY << 16)
#define DDS_OP_TYPE_8BY (DDS_OP_VAL_8BY << 16)
#define DDS_OP_TYPE_STR (DDS_OP_VAL_STR << 16)
#define DDS_OP_TYPE_SEQ (DDS_OP_VAL_SEQ << 16)
#define DDS_OP_TYPE_ARR (DDS_OP_VAL_ARR << 16)
#define DDS_OP_TYPE_UNI (DDS_OP_VAL_UNI << 16)
#define DDS_OP_TYPE_STU (DDS_OP_VAL_STU << 16)
#define DDS_OP_TYPE_BST (DDS_OP_VAL_BST << 16)
enum dds_stream_typecode {
DDS_OP_VAL_1BY = 0x01, /* one byte simple type (char, octet, boolean) */
DDS_OP_VAL_2BY = 0x02, /* two byte simple type ((unsigned) short) */
DDS_OP_VAL_4BY = 0x03, /* four byte simple type ((unsigned) long, enums, float) */
DDS_OP_VAL_8BY = 0x04, /* eight byte simple type ((unsigned) long long, double) */
DDS_OP_VAL_STR = 0x05, /* string */
DDS_OP_VAL_BST = 0x06, /* bounded string */
DDS_OP_VAL_SEQ = 0x07, /* sequence */
DDS_OP_VAL_ARR = 0x08, /* array */
DDS_OP_VAL_UNI = 0x09, /* union */
DDS_OP_VAL_STU = 0x0a /* struct */
};
/* primary type code for DDS_OP_ADR, DDS_OP_JEQ */
enum dds_stream_typecode_primary {
DDS_OP_TYPE_1BY = DDS_OP_VAL_1BY << 16,
DDS_OP_TYPE_2BY = DDS_OP_VAL_2BY << 16,
DDS_OP_TYPE_4BY = DDS_OP_VAL_4BY << 16,
DDS_OP_TYPE_8BY = DDS_OP_VAL_8BY << 16,
DDS_OP_TYPE_STR = DDS_OP_VAL_STR << 16,
DDS_OP_TYPE_BST = DDS_OP_VAL_BST << 16,
DDS_OP_TYPE_SEQ = DDS_OP_VAL_SEQ << 16,
DDS_OP_TYPE_ARR = DDS_OP_VAL_ARR << 16,
DDS_OP_TYPE_UNI = DDS_OP_VAL_UNI << 16,
DDS_OP_TYPE_STU = DDS_OP_VAL_STU << 16
};
#define DDS_OP_TYPE_BOO DDS_OP_TYPE_1BY
/* sub-type code:
- encodes element type for DDS_OP_TYPE_{SEQ,ARR},
- discriminant type for DDS_OP_TYPE_UNI */
enum dds_stream_typecode_subtype {
DDS_OP_SUBTYPE_1BY = DDS_OP_VAL_1BY << 8,
DDS_OP_SUBTYPE_2BY = DDS_OP_VAL_2BY << 8,
DDS_OP_SUBTYPE_4BY = DDS_OP_VAL_4BY << 8,
DDS_OP_SUBTYPE_8BY = DDS_OP_VAL_8BY << 8,
DDS_OP_SUBTYPE_STR = DDS_OP_VAL_STR << 8,
DDS_OP_SUBTYPE_BST = DDS_OP_VAL_BST << 8,
DDS_OP_SUBTYPE_SEQ = DDS_OP_VAL_SEQ << 8,
DDS_OP_SUBTYPE_ARR = DDS_OP_VAL_ARR << 8,
DDS_OP_SUBTYPE_UNI = DDS_OP_VAL_UNI << 8,
DDS_OP_SUBTYPE_STU = DDS_OP_VAL_STU << 8
};
#define DDS_OP_SUBTYPE_BOO DDS_OP_SUBTYPE_1BY
#define DDS_OP_SUBTYPE_1BY (DDS_OP_VAL_1BY << 8)
#define DDS_OP_SUBTYPE_2BY (DDS_OP_VAL_2BY << 8)
#define DDS_OP_SUBTYPE_4BY (DDS_OP_VAL_4BY << 8)
#define DDS_OP_SUBTYPE_8BY (DDS_OP_VAL_8BY << 8)
#define DDS_OP_SUBTYPE_STR (DDS_OP_VAL_STR << 8)
#define DDS_OP_SUBTYPE_SEQ (DDS_OP_VAL_SEQ << 8)
#define DDS_OP_SUBTYPE_ARR (DDS_OP_VAL_ARR << 8)
#define DDS_OP_SUBTYPE_UNI (DDS_OP_VAL_UNI << 8)
#define DDS_OP_SUBTYPE_STU (DDS_OP_VAL_STU << 8)
#define DDS_OP_SUBTYPE_BST (DDS_OP_VAL_BST << 8)
#define DDS_OP_FLAG_KEY 0x01
#define DDS_OP_FLAG_DEF 0x02
#define DDS_OP_FLAG_KEY 0x01 /* key field: applicable to {1,2,4,8}BY, STR, BST, ARR-of-{1,2,4,8}BY */
#define DDS_OP_FLAG_DEF 0x02 /* union has a default case (for DDS_OP_ADR | DDS_OP_TYPE_UNI) */
/**
* Description : Enable or disable write batching. Overrides default configuration
* setting for write batching (DDSI2E/Internal/WriteBatch).
* setting for write batching (Internal/WriteBatch).
*
* Arguments :
* -# enable Enables or disables write batching for all writers.
*/
DDS_EXPORT void dds_write_set_batch (bool enable);
/**
* Description : Install tcp/ssl and encryption support. Depends on openssl.
*
* Arguments :
* -# None
*/
DDS_EXPORT void dds_ssl_plugin (void);
/**
* Description : Install client durability support. Depends on OSPL server.
*
* Arguments :
* -# None
*/
DDS_EXPORT void dds_durability_plugin (void);
#if defined (__cplusplus)
}
#endif

View file

@ -1,108 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
/** @file
*
* @brief DDS C Stream API
*
* This header file defines the public API of the Streams in the
* Eclipse Cyclone DDS C language binding.
*/
#ifndef DDS_STREAM_H
#define DDS_STREAM_H
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include "dds/export.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct dds_sequence;
typedef union
{
uint8_t * p8;
uint16_t * p16;
uint32_t * p32;
uint64_t * p64;
float * pf;
double * pd;
void * pv;
}
dds_uptr_t;
typedef struct dds_stream
{
dds_uptr_t m_buffer; /* Union of pointers to start of buffer */
uint32_t m_size; /* Buffer size */
uint32_t m_index; /* Read/write offset from start of buffer */
bool m_endian; /* Endian: big (false) or little (true) */
bool m_failed; /* Attempt made to read beyond end of buffer */
}
dds_stream_t;
#define DDS_STREAM_BE false
#define DDS_STREAM_LE true
DDS_EXPORT dds_stream_t * dds_stream_create (uint32_t size);
DDS_EXPORT dds_stream_t * dds_stream_from_buffer (const void *buf, size_t sz, int bswap);
DDS_EXPORT void dds_stream_delete (dds_stream_t * st);
DDS_EXPORT void dds_stream_fini (dds_stream_t * st);
DDS_EXPORT void dds_stream_reset (dds_stream_t * st);
DDS_EXPORT void dds_stream_init (dds_stream_t * st, uint32_t size);
DDS_EXPORT void dds_stream_grow (dds_stream_t * st, uint32_t size);
DDS_EXPORT bool dds_stream_endian (void);
struct dds_topic_descriptor;
DDS_EXPORT void dds_stream_read_sample_w_desc (dds_stream_t * is, void * data, const struct dds_topic_descriptor * desc);
DDS_EXPORT bool dds_stream_read_bool (dds_stream_t * is);
DDS_EXPORT uint8_t dds_stream_read_uint8 (dds_stream_t * is);
DDS_EXPORT uint16_t dds_stream_read_uint16 (dds_stream_t * is);
DDS_EXPORT uint32_t dds_stream_read_uint32 (dds_stream_t * is);
DDS_EXPORT uint64_t dds_stream_read_uint64 (dds_stream_t * is);
DDS_EXPORT float dds_stream_read_float (dds_stream_t * is);
DDS_EXPORT double dds_stream_read_double (dds_stream_t * is);
DDS_EXPORT char * dds_stream_read_string (dds_stream_t * is);
DDS_EXPORT void dds_stream_read_buffer (dds_stream_t * is, uint8_t * buffer, uint32_t len);
inline char dds_stream_read_char (dds_stream_t *is) { return (char) dds_stream_read_uint8 (is); }
inline int8_t dds_stream_read_int8 (dds_stream_t *is) { return (int8_t) dds_stream_read_uint8 (is); }
inline int16_t dds_stream_read_int16 (dds_stream_t *is) { return (int16_t) dds_stream_read_uint16 (is); }
inline int32_t dds_stream_read_int32 (dds_stream_t *is) { return (int32_t) dds_stream_read_uint32 (is); }
inline int64_t dds_stream_read_int64 (dds_stream_t *is) { return (int64_t) dds_stream_read_uint64 (is); }
DDS_EXPORT void dds_stream_write_bool (dds_stream_t * os, bool val);
DDS_EXPORT void dds_stream_write_uint8 (dds_stream_t * os, uint8_t val);
DDS_EXPORT void dds_stream_write_uint16 (dds_stream_t * os, uint16_t val);
DDS_EXPORT void dds_stream_write_uint32 (dds_stream_t * os, uint32_t val);
DDS_EXPORT void dds_stream_write_uint64 (dds_stream_t * os, uint64_t val);
DDS_EXPORT void dds_stream_write_float (dds_stream_t * os, float val);
DDS_EXPORT void dds_stream_write_double (dds_stream_t * os, double val);
DDS_EXPORT void dds_stream_write_string (dds_stream_t * os, const char * val);
DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const uint8_t * buffer);
DDS_EXPORT void *dds_stream_address (dds_stream_t * s);
DDS_EXPORT void *dds_stream_alignto (dds_stream_t * s, uint32_t a);
inline void dds_stream_write_char (dds_stream_t * os, char val) { dds_stream_write_uint8 (os, (uint8_t) val); }
inline void dds_stream_write_int8 (dds_stream_t * os, int8_t val) { dds_stream_write_uint8 (os, (uint8_t) val); }
inline void dds_stream_write_int16 (dds_stream_t * os, int16_t val) { dds_stream_write_uint16 (os, (uint16_t) val); }
inline void dds_stream_write_int32 (dds_stream_t * os, int32_t val) { dds_stream_write_uint32 (os, (uint32_t) val); }
inline void dds_stream_write_int64 (dds_stream_t * os, int64_t val) { dds_stream_write_uint64 (os, (uint64_t) val); }
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -1,35 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef _DDS_KEY_H_
#define _DDS_KEY_H_
#include "dds__types.h"
struct dds_key_hash;
#if defined (__cplusplus)
extern "C" {
#endif
void dds_key_md5 (struct dds_key_hash * kh);
void dds_key_gen
(
const dds_topic_descriptor_t * const desc,
struct dds_key_hash * kh,
const char * sample
);
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -19,47 +19,46 @@
extern "C" {
#endif
void dds_stream_write_sample
(
dds_stream_t * os,
const void * data,
const struct ddsi_sertopic_default * topic
);
void dds_stream_read_sample
(
dds_stream_t * is,
void * data,
const struct ddsi_sertopic_default * topic
);
typedef struct dds_istream {
const unsigned char *m_buffer;
uint32_t m_size; /* Buffer size */
uint32_t m_index; /* Read/write offset from start of buffer */
} dds_istream_t;
size_t dds_stream_check_optimize (const dds_topic_descriptor_t * desc);
void dds_stream_from_serdata_default (dds_stream_t * s, const struct ddsi_serdata_default *d);
void dds_stream_add_to_serdata_default (dds_stream_t * s, struct ddsi_serdata_default **d);
typedef struct dds_ostream {
unsigned char *m_buffer;
uint32_t m_size; /* Buffer size */
uint32_t m_index; /* Read/write offset from start of buffer */
} dds_ostream_t;
void dds_stream_write_key (dds_stream_t * os, const char * sample, const struct ddsi_sertopic_default * topic);
uint32_t dds_stream_extract_key (dds_stream_t *is, dds_stream_t *os, const uint32_t *ops, const bool just_key);
void dds_stream_read_key
(
dds_stream_t * is,
char * sample,
const dds_topic_descriptor_t * desc
);
void dds_stream_read_keyhash
(
dds_stream_t * is,
dds_key_hash_t * kh,
const dds_topic_descriptor_t * desc,
const bool just_key
);
char * dds_stream_reuse_string
(
dds_stream_t * is,
char * str,
const uint32_t bound
);
DDS_EXPORT void dds_stream_swap (void * buff, uint32_t size, uint32_t num);
typedef struct dds_ostreamBE {
dds_ostream_t x;
} dds_ostreamBE_t;
extern const uint32_t dds_op_size[5];
DDS_EXPORT void dds_ostream_init (dds_ostream_t * __restrict st, uint32_t size);
DDS_EXPORT void dds_ostream_fini (dds_ostream_t * __restrict st);
DDS_EXPORT void dds_ostreamBE_init (dds_ostreamBE_t * __restrict st, uint32_t size);
DDS_EXPORT void dds_ostreamBE_fini (dds_ostreamBE_t * __restrict st);
bool dds_stream_normalize (void * __restrict data, uint32_t size, bool bswap, const struct ddsi_sertopic_default * __restrict topic, bool just_key);
void dds_stream_write_sample (dds_ostream_t * __restrict os, const void * __restrict data, const struct ddsi_sertopic_default * __restrict topic);
void dds_stream_read_sample (dds_istream_t * __restrict is, void * __restrict data, const struct ddsi_sertopic_default * __restrict topic);
size_t dds_stream_check_optimize (const dds_topic_descriptor_t * __restrict desc);
void dds_istream_from_serdata_default (dds_istream_t * __restrict s, const struct ddsi_serdata_default * __restrict d);
void dds_ostream_from_serdata_default (dds_ostream_t * __restrict s, struct ddsi_serdata_default * __restrict d);
void dds_ostream_add_to_serdata_default (dds_ostream_t * __restrict s, struct ddsi_serdata_default ** __restrict d);
void dds_ostreamBE_from_serdata_default (dds_ostreamBE_t * __restrict s, struct ddsi_serdata_default * __restrict d);
void dds_ostreamBE_add_to_serdata_default (dds_ostreamBE_t * __restrict s, struct ddsi_serdata_default ** __restrict d);
void dds_stream_write_key (dds_ostream_t * __restrict os, const char * __restrict sample, const struct ddsi_sertopic_default * __restrict topic);
void dds_stream_write_keyBE (dds_ostreamBE_t * __restrict os, const char * __restrict sample, const struct ddsi_sertopic_default * __restrict topic);
void dds_stream_extract_key_from_data (dds_istream_t * __restrict is, dds_ostream_t * __restrict os, const struct ddsi_sertopic_default * __restrict topic);
void dds_stream_extract_keyBE_from_data (dds_istream_t * __restrict is, dds_ostreamBE_t * __restrict os, const struct ddsi_sertopic_default * __restrict topic);
void dds_stream_extract_keyhash (dds_istream_t * __restrict is, dds_keyhash_t * __restrict kh, const struct ddsi_sertopic_default * __restrict topic, const bool just_key);
void dds_stream_read_key (dds_istream_t * __restrict is, char * __restrict sample, const struct ddsi_sertopic_default * __restrict topic);
/* For marshalling op code handling */
@ -70,14 +69,14 @@ extern const uint32_t dds_op_size[5];
#define DDS_OP_FLAGS_MASK 0x000000ff
#define DDS_JEQ_TYPE_MASK 0x00ff0000
#define DDS_OP(o) ((o) & DDS_OP_MASK)
#define DDS_OP_TYPE(o) (((o) & DDS_OP_TYPE_MASK) >> 16)
#define DDS_OP_SUBTYPE(o) (((o) & DDS_OP_SUBTYPE_MASK) >> 8)
#define DDS_OP_FLAGS(o) ((o) & DDS_OP_FLAGS_MASK)
#define DDS_OP(o) ((enum dds_stream_opcode) ((o) & DDS_OP_MASK))
#define DDS_OP_TYPE(o) ((enum dds_stream_typecode) (((o) & DDS_OP_TYPE_MASK) >> 16))
#define DDS_OP_SUBTYPE(o) ((enum dds_stream_typecode) (((o) & DDS_OP_SUBTYPE_MASK) >> 8))
#define DDS_OP_FLAGS(o) ((o) & DDS_OP_FLAGS_MASK)
#define DDS_OP_ADR_JSR(o) ((o) & DDS_OP_JMP_MASK)
#define DDS_OP_JUMP(o) ((int16_t) ((o) & DDS_OP_JMP_MASK))
#define DDS_OP_JUMP(o) ((int16_t) ((o) & DDS_OP_JMP_MASK))
#define DDS_OP_ADR_JMP(o) ((o) >> 16)
#define DDS_JEQ_TYPE(o) (((o) & DDS_JEQ_TYPE_MASK) >> 16)
#define DDS_JEQ_TYPE(o) ((enum dds_stream_typecode) (((o) & DDS_JEQ_TYPE_MASK) >> 16))
#if defined (__cplusplus)
}

View file

@ -179,6 +179,8 @@ void dds_sample_free_contents (char * data, const uint32_t * ops)
if (seq->_release)
{
dds_free (seq->_buffer);
seq->_maximum = 0;
seq->_length = 0;
seq->_buffer = NULL;
}
break;

View file

@ -1,126 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <string.h>
#include "dds/ddsrt/md5.h"
#include "dds__key.h"
#include "dds__stream.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/q_bswap.h"
#ifndef NDEBUG
static bool keyhash_is_reset(const dds_key_hash_t *kh)
{
return !kh->m_set;
}
#endif
/*
dds_key_gen: Generates key and keyhash for a sample.
See section 9.6.3.3 of DDSI spec.
*/
static void dds_key_gen_stream (const dds_topic_descriptor_t * const desc, dds_stream_t *os, const char *sample)
{
const char * src;
const uint32_t * op;
uint32_t i;
uint32_t len = 0;
for (i = 0; i < desc->m_nkeys; i++)
{
op = desc->m_ops + desc->m_keys[i].m_index;
src = sample + op[1];
assert ((*op & DDS_OP_FLAG_KEY) && ((DDS_OP_MASK & *op) == DDS_OP_ADR));
switch (DDS_OP_TYPE (*op))
{
case DDS_OP_VAL_1BY:
{
dds_stream_write_uint8 (os, *((const uint8_t *) src));
break;
}
case DDS_OP_VAL_2BY:
{
dds_stream_write_uint16 (os, *((const uint16_t *) src));
break;
}
case DDS_OP_VAL_4BY:
{
dds_stream_write_uint32 (os, *((const uint32_t *) src));
break;
}
case DDS_OP_VAL_8BY:
{
dds_stream_write_uint64 (os, *((const uint64_t *) src));
break;
}
case DDS_OP_VAL_STR:
{
src = *((char**) src);
}
/* FALLS THROUGH */
case DDS_OP_VAL_BST:
{
len = (uint32_t) (strlen (src) + 1);
dds_stream_write_uint32 (os, len);
dds_stream_write_buffer (os, len, (const uint8_t *) src);
break;
}
case DDS_OP_VAL_ARR:
{
uint32_t size = dds_op_size[DDS_OP_SUBTYPE (*op)];
char *dst;
len = size * op[2];
dst = dds_stream_alignto (os, op[2]);
dds_stream_write_buffer (os, len, (const uint8_t *) src);
if (dds_stream_endian () && (size != 1u))
dds_stream_swap (dst, size, op[2]);
break;
}
default: assert (0);
}
}
}
void dds_key_gen (const dds_topic_descriptor_t * const desc, dds_key_hash_t * kh, const char * sample)
{
assert(keyhash_is_reset(kh));
kh->m_set = 1;
if (desc->m_nkeys == 0)
kh->m_iskey = 1;
else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{
dds_stream_t os;
kh->m_iskey = 1;
dds_stream_init(&os, 0);
os.m_endian = 0;
os.m_buffer.pv = kh->m_hash;
os.m_size = 16;
dds_key_gen_stream (desc, &os, sample);
}
else
{
dds_stream_t os;
ddsrt_md5_state_t md5st;
kh->m_iskey = 0;
dds_stream_init(&os, 64);
os.m_endian = 0;
dds_key_gen_stream (desc, &os, sample);
ddsrt_md5_init (&md5st);
ddsrt_md5_append (&md5st, os.m_buffer.p8, os.m_index);
ddsrt_md5_finish (&md5st, (unsigned char *) kh->m_hash);
dds_stream_fini (&os);
}
}

View file

@ -19,7 +19,6 @@
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_freelist.h"
#include "dds__key.h"
#include "dds__stream.h"
#include "dds__serdata_builtintopic.h"
#include "dds/ddsi/ddsi_tkmap.h"

File diff suppressed because it is too large Load diff

View file

@ -43,12 +43,11 @@ struct serdatapool {
struct nn_freelist freelist;
};
typedef struct dds_key_hash {
char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */
typedef struct dds_keyhash {
unsigned char m_hash [16]; /* Key hash value. Also possibly key. Suitably aligned for accessing as uint32_t's */
unsigned m_set : 1; /* has it been initialised? */
unsigned m_iskey : 1; /* m_hash is key value */
}
dds_key_hash_t;
} dds_keyhash_t;
struct ddsi_serdata_default {
struct ddsi_serdata c;
@ -57,7 +56,7 @@ struct ddsi_serdata_default {
#ifndef NDEBUG
bool fixed;
#endif
dds_key_hash_t keyhash;
dds_keyhash_t keyhash;
struct serdatapool *pool;
struct ddsi_serdata_default *next; /* in pool->freelist */

View file

@ -20,20 +20,30 @@
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_freelist.h"
#include "dds__key.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds__stream.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#if DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN
#define NATIVE_ENCODING CDR_LE
#define NATIVE_ENCODING_PL PL_CDR_LE
#elif DDSRT_ENDIAN == DDSRT_BIG_ENDIAN
#define NATIVE_ENCODING CDR_BE
#define NATIVE_ENCODING_PL PL_CDR_BE
#else
#error "DDSRT_ENDIAN neither LITTLE nor BIG"
#endif
/* 8k entries in the freelist seems to be roughly the amount needed to send
minimum-size (well, 4 bytes) samples as fast as possible over loopback
while using large messages -- actually, it stands to reason that this would
be the same as the WHC node pool size */
#define MAX_POOL_SIZE 8192
#define MAX_SIZE_FOR_POOL 256
#define CLEAR_PADDING 0
#define DEFAULT_NEW_SIZE 128
#define CHUNK_SIZE 128
#ifndef NDEBUG
static int ispowerof2_size (size_t x)
@ -80,7 +90,7 @@ static void *serdata_default_append (struct ddsi_serdata_default **d, size_t n)
char *p;
if ((*d)->pos + n > (*d)->size)
{
size_t size1 = alignup_size ((*d)->pos + n, 128);
size_t size1 = alignup_size ((*d)->pos + n, CHUNK_SIZE);
*d = ddsrt_realloc (*d, offsetof (struct ddsi_serdata_default, data) + size1);
(*d)->size = (uint32_t)size1;
}
@ -228,36 +238,49 @@ static void serdata_default_init(struct ddsi_serdata_default *d, const struct dd
d->keyhash.m_iskey = 0;
}
static struct ddsi_serdata_default *serdata_default_allocnew(struct serdatapool *pool)
static struct ddsi_serdata_default *serdata_default_allocnew (struct serdatapool *pool, uint32_t init_size)
{
const uint32_t init_size = 128;
struct ddsi_serdata_default *d = ddsrt_malloc(offsetof (struct ddsi_serdata_default, data) + init_size);
struct ddsi_serdata_default *d = ddsrt_malloc (offsetof (struct ddsi_serdata_default, data) + init_size);
d->size = init_size;
d->pool = pool;
return d;
}
static struct ddsi_serdata_default *serdata_default_new(const struct ddsi_sertopic_default *tp, enum ddsi_serdata_kind kind)
static struct ddsi_serdata_default *serdata_default_new_size (const struct ddsi_sertopic_default *tp, enum ddsi_serdata_kind kind, uint32_t size)
{
struct ddsi_serdata_default *d;
if ((d = nn_freelist_pop (&gv.serpool->freelist)) == NULL)
d = serdata_default_allocnew(gv.serpool);
else
ddsrt_atomic_st32(&d->c.refc, 1);
serdata_default_init(d, tp, kind);
if (size <= MAX_SIZE_FOR_POOL && (d = nn_freelist_pop (&gv.serpool->freelist)) != NULL)
ddsrt_atomic_st32 (&d->c.refc, 1);
else if ((d = serdata_default_allocnew (gv.serpool, size)) == NULL)
return NULL;
serdata_default_init (d, tp, kind);
return d;
}
static struct ddsi_serdata_default *serdata_default_new (const struct ddsi_sertopic_default *tp, enum ddsi_serdata_kind kind)
{
return serdata_default_new_size (tp, kind, DEFAULT_NEW_SIZE);
}
/* Construct a serdata from a fragchain received over the network */
static struct ddsi_serdata_default *serdata_default_from_ser_common (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
/* FIXME: check whether this really is the correct maximum: offsets are relative
to the CDR header, but there are also some places that use a serdata as-if it
were a stream, and those use offsets (m_index) relative to the start of the
serdata */
if (size > UINT32_MAX - offsetof (struct ddsi_serdata_default, hdr))
return NULL;
struct ddsi_serdata_default *d = serdata_default_new_size (tp, kind, (uint32_t) size);
if (d == NULL)
return NULL;
uint32_t off = 4; /* must skip the CDR header */
assert (fragchain->min == 0);
assert (fragchain->maxp1 >= off); /* CDR header must be in first fragment */
(void)size;
memcpy (&d->hdr, NN_RMSG_PAYLOADOFF (fragchain->rmsg, NN_RDATA_PAYLOAD_OFF (fragchain)), sizeof (d->hdr));
assert (d->hdr.identifier == CDR_LE || d->hdr.identifier == CDR_BE);
@ -276,20 +299,36 @@ static struct ddsi_serdata_default *serdata_default_from_ser_common (const struc
fragchain = fragchain->nextfrag;
}
dds_stream_t is;
dds_stream_from_serdata_default (&is, d);
dds_stream_read_keyhash (&is, &d->keyhash, (const dds_topic_descriptor_t *)tp->type, kind == SDK_KEY);
return d;
const bool needs_bswap = (d->hdr.identifier != NATIVE_ENCODING);
d->hdr.identifier = NATIVE_ENCODING;
if (!dds_stream_normalize (d->data, d->pos, needs_bswap, tp, kind == SDK_KEY))
{
ddsi_serdata_unref (&d->c);
return NULL;
}
else
{
dds_istream_t is;
dds_istream_from_serdata_default (&is, d);
dds_stream_extract_keyhash (&is, &d->keyhash, tp, kind == SDK_KEY);
return d;
}
}
static struct ddsi_serdata *serdata_default_from_ser (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
return fix_serdata_default (serdata_default_from_ser_common (tpcmn, kind, fragchain, size), tpcmn->serdata_basehash);
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_ser_common (tpcmn, kind, fragchain, size)) == NULL)
return NULL;
return fix_serdata_default (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_ser_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const struct nn_rdata *fragchain, size_t size)
{
return fix_serdata_default_nokey (serdata_default_from_ser_common (tpcmn, kind, fragchain, size), tpcmn->serdata_basehash);
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_ser_common (tpcmn, kind, fragchain, size)) == NULL)
return NULL;
return fix_serdata_default_nokey (d, tpcmn->serdata_basehash);
}
struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
@ -304,8 +343,14 @@ struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr (const struct ddsi_sertopic *
else
{
struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY);
d->hdr.identifier = CDR_BE;
if (d == NULL)
return NULL;
serdata_default_append_blob (&d, 1, sizeof (keyhash->value), keyhash->value);
if (!dds_stream_normalize (d->data, d->pos, (NATIVE_ENCODING != CDR_BE), tp, true))
{
ddsi_serdata_unref (&d->c);
return NULL;
}
memcpy (d->keyhash.m_hash, keyhash->value, sizeof (d->keyhash.m_hash));
d->keyhash.m_set = 1;
d->keyhash.m_iskey = 1;
@ -317,19 +362,52 @@ struct ddsi_serdata *ddsi_serdata_from_keyhash_cdr_nokey (const struct ddsi_sert
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, SDK_KEY);
if (d == NULL)
return NULL;
(void)keyhash;
d->keyhash.m_set = 1;
d->keyhash.m_iskey = 1;
return fix_serdata_default_nokey(d, tp->c.serdata_basehash);
}
static void gen_keyhash_from_sample (const struct ddsi_sertopic_default *topic, dds_keyhash_t *kh, const char *sample)
{
const struct dds_topic_descriptor *desc = (const struct dds_topic_descriptor *) topic->type;
kh->m_set = 1;
if (desc->m_nkeys == 0)
kh->m_iskey = 1;
else if (desc->m_flagset & DDS_TOPIC_FIXED_KEY)
{
dds_ostreamBE_t os;
kh->m_iskey = 1;
dds_ostreamBE_init (&os, 0);
os.x.m_buffer = kh->m_hash;
os.x.m_size = 16;
dds_stream_write_keyBE (&os, sample, topic);
}
else
{
dds_ostreamBE_t os;
ddsrt_md5_state_t md5st;
kh->m_iskey = 0;
dds_ostreamBE_init (&os, 64);
dds_stream_write_keyBE (&os, sample, topic);
ddsrt_md5_init (&md5st);
ddsrt_md5_append (&md5st, os.x.m_buffer, os.x.m_index);
ddsrt_md5_finish (&md5st, kh->m_hash);
dds_ostreamBE_fini (&os);
}
}
static struct ddsi_serdata_default *serdata_default_from_sample_cdr_common (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
{
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
dds_stream_t os;
dds_key_gen ((const dds_topic_descriptor_t *)tp->type, &d->keyhash, (char*)sample);
dds_stream_from_serdata_default (&os, d);
if (d == NULL)
return NULL;
dds_ostream_t os;
gen_keyhash_from_sample (tp, &d->keyhash, sample);
dds_ostream_from_serdata_default (&os, d);
switch (kind)
{
case SDK_EMPTY:
@ -341,18 +419,24 @@ static struct ddsi_serdata_default *serdata_default_from_sample_cdr_common (cons
dds_stream_write_sample (&os, sample, tp);
break;
}
dds_stream_add_to_serdata_default (&os, &d);
dds_ostream_add_to_serdata_default (&os, &d);
return d;
}
static struct ddsi_serdata *serdata_default_from_sample_cdr (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
{
return fix_serdata_default (serdata_default_from_sample_cdr_common (tpcmn, kind, sample), tpcmn->serdata_basehash);
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_sample_cdr_common (tpcmn, kind, sample)) == NULL)
return NULL;
return fix_serdata_default (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_sample_cdr_nokey (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *sample)
{
return fix_serdata_default_nokey (serdata_default_from_sample_cdr_common (tpcmn, kind, sample), tpcmn->serdata_basehash);
struct ddsi_serdata_default *d;
if ((d = serdata_default_from_sample_cdr_common (tpcmn, kind, sample)) == NULL)
return NULL;
return fix_serdata_default_nokey (d, tpcmn->serdata_basehash);
}
static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi_sertopic *tpcmn, enum ddsi_serdata_kind kind, const void *vsample)
@ -361,6 +445,8 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_plist_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
if (d == NULL)
return NULL;
serdata_default_append_blob (&d, 1, sample->size, sample->blob);
const unsigned char *rawkey = nn_plist_findparam_native_unchecked (sample->blob, sample->keyparam);
#ifndef NDEBUG
@ -416,6 +502,8 @@ static struct ddsi_serdata *serdata_default_from_sample_rawcdr (const struct dds
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)tpcmn;
const struct ddsi_rawcdr_sample *sample = vsample;
struct ddsi_serdata_default *d = serdata_default_new(tp, kind);
if (d == NULL)
return NULL;
assert (sample->keysize <= 16);
serdata_default_append_blob (&d, 1, sample->size, sample->blob);
d->keyhash.m_set = 1;
@ -433,7 +521,10 @@ static struct ddsi_serdata *serdata_default_to_topicless (const struct ddsi_serd
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)d->c.topic;
assert (d->hdr.identifier == NATIVE_ENCODING || d->hdr.identifier == NATIVE_ENCODING_PL);
struct ddsi_serdata_default *d_tl = serdata_default_new(tp, SDK_KEY);
if (d_tl == NULL)
return NULL;
d_tl->c.topic = NULL;
d_tl->c.hash = d->c.hash;
d_tl->c.timestamp.v = INT64_MIN;
@ -443,31 +534,31 @@ static struct ddsi_serdata *serdata_default_to_topicless (const struct ddsi_serd
the payload is of interest. */
if (d->c.ops == &ddsi_serdata_ops_cdr)
{
assert (d->hdr.identifier == NATIVE_ENCODING);
if (d->c.kind == SDK_KEY)
{
d_tl->hdr.identifier = d->hdr.identifier;
serdata_default_append_blob (&d_tl, 1, d->pos, d->data);
}
else if (d->keyhash.m_iskey)
{
d_tl->hdr.identifier = CDR_BE;
serdata_default_append_blob (&d_tl, 1, sizeof (d->keyhash.m_hash), d->keyhash.m_hash);
#if NATIVE_ENCODING != CDR_BE
bool ok = dds_stream_normalize (d_tl->data, d_tl->pos, true, tp, true);
assert (ok);
(void) ok;
#endif
}
else
{
const struct dds_topic_descriptor *desc = tp->type;
dds_stream_t is, os;
uint32_t nbytes;
dds_stream_from_serdata_default (&is, d);
dds_stream_from_serdata_default (&os, d_tl);
nbytes = dds_stream_extract_key (&is, &os, desc->m_ops, false);
os.m_index += nbytes;
dds_istream_t is;
dds_ostream_t os;
dds_istream_from_serdata_default (&is, d);
dds_ostream_from_serdata_default (&os, d_tl);
dds_stream_extract_key_from_data (&is, &os, tp);
if (os.m_index < os.m_size)
{
os.m_buffer.p8 = dds_realloc (os.m_buffer.p8, os.m_index);
os.m_buffer = dds_realloc (os.m_buffer, os.m_index);
os.m_size = os.m_index;
}
dds_stream_add_to_serdata_default (&os, &d_tl);
dds_ostream_add_to_serdata_default (&os, &d_tl);
}
}
return (struct ddsi_serdata *)d_tl;
@ -501,26 +592,30 @@ static void serdata_default_to_ser_unref (struct ddsi_serdata *serdata_common, c
static bool serdata_default_to_sample_cdr (const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
dds_stream_t is;
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *) d->c.topic;
dds_istream_t is;
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
dds_stream_from_serdata_default(&is, d);
assert (d->hdr.identifier == NATIVE_ENCODING);
dds_istream_from_serdata_default(&is, d);
if (d->c.kind == SDK_KEY)
dds_stream_read_key (&is, sample, (const dds_topic_descriptor_t*) ((struct ddsi_sertopic_default *)d->c.topic)->type);
dds_stream_read_key (&is, sample, tp);
else
dds_stream_read_sample (&is, sample, (const struct ddsi_sertopic_default *)d->c.topic);
dds_stream_read_sample (&is, sample, tp);
return true; /* FIXME: can't conversion to sample fail? */
}
static bool serdata_default_topicless_to_sample_cdr (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
dds_stream_t is;
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *) topic;
dds_istream_t is;
assert (d->c.topic == NULL);
assert (d->c.kind == SDK_KEY);
assert (d->c.ops == topic->serdata_ops);
assert (d->hdr.identifier == NATIVE_ENCODING);
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
dds_stream_from_serdata_default(&is, d);
dds_stream_read_key (&is, sample, (const dds_topic_descriptor_t*) ((struct ddsi_sertopic_default *)topic)->type);
dds_istream_from_serdata_default(&is, d);
dds_stream_read_key (&is, sample, tp);
return true; /* FIXME: can't conversion to sample fail? */
}

View file

@ -1766,8 +1766,11 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topic, const struct nn_rdata *fragchain, uint32_t sz, int justkey, unsigned statusinfo, nn_wctime_t tstamp)
{
struct ddsi_serdata *sd = ddsi_serdata_from_ser (topic, justkey ? SDK_KEY : SDK_DATA, fragchain, sz);
sd->statusinfo = statusinfo;
sd->timestamp = tstamp;
if (sd)
{
sd->statusinfo = statusinfo;
sd->timestamp = tstamp;
}
return sd;
}

View file

@ -0,0 +1,46 @@
#
# Copyright(c) 2019 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
cmake_minimum_required(VERSION 3.5)
if (NOT TARGET CycloneDDS::ddsc)
# Find the CycloneDDS package. If it is not in a default location, try
# finding it relative to the example where it most likely resides.
find_package(CycloneDDS REQUIRED PATHS ../../)
endif()
add_compile_options("-I${PROJECT_SOURCE_DIR}/ddsrt/include")
add_compile_options("-I${PROJECT_SOURCE_DIR}/core/ddsc/include")
add_compile_options("-I${PROJECT_SOURCE_DIR}/core/ddsc/src")
add_compile_options("-I${PROJECT_SOURCE_DIR}/core/ddsi/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/abstraction/os/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/database/database/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/database/serialization/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/utilities/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/kernel/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/kernel/bld/$ENV{SPLICE_TARGET}")
add_compile_options("-I$ENV{OSPL_HOME}/src/user/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/api/dcps/sac/include")
add_compile_options("-I$ENV{OSPL_HOME}/src/api/dcps/sac/code")
# This is a convenience function, provided by the CycloneDDS package,
# that will supply a library target related the the given idl file.
# In short, it takes the idl file, generates the source files with
# the proper data types and compiles them into a library.
idlc_generate(xxx_lib "xxx.idl")
# Both executables have only one related source file.
add_executable(xxx xxx-cyc.c)
# Both executables need to be linked to the idl data type library and
# the ddsc API library.
target_link_libraries(xxx xxx_lib CycloneDDS::ddsc -L$ENV{OSPL_HOME}/lib/$ENV{SPLICE_TARGET} -Wl,-rpath,$ENV{OSPL_HOME}/lib/$ENV{SPLICE_TARGET} ddskernel dcpssac)

View file

@ -0,0 +1,403 @@
#
# Copyright(c) 2019 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
#use strict;
use Data::Dumper;
$Data::Dumper::Terse = 1;
$Data::Dumper::Useqq = 1;
my $outfn = "xxx";
local $nextident = "a0000";
my @types = qw(u0 u1 u2 u3 u4 seq ary str uni);
my @idltype = ("octet", "unsigned short", "unsigned long", "unsigned long long", "string");
# unions cannot have an octet as a discriminator ...
my @idltype_unidisc = ("char", "unsigned short", "unsigned long", "unsigned long long", "string");
my @ctype = ("uint8_t", "uint16_t", "uint32_t", "uint64_t", "char *");
my @probs = do {
my @ps = qw(0.3 0.3 0.3 0.3 0.3 1 1 1 1);
my (@xs, $sum);
for (@ps) { $sum += $_; push @xs, $sum; }
@xs;
};
my @noaryprobs = do {
my @ps = qw(0.3 0.3 0.3 0.3 0.3 1 0 1 1);
my (@xs, $sum);
for (@ps) { $sum += $_; push @xs, $sum; }
@xs;
};
my @unicaseprobs = do {
my @ps = qw(0.3 0.3 0.3 0.3 0.3 1 0 1 0);
my (@xs, $sum);
for (@ps) { $sum += $_; push @xs, $sum; }
@xs;
};
open IDL, ">${outfn}.idl" or die "can't open ${outfn}.idl";
open CYC, ">${outfn}-cyc.c" or die "can't open ${outfn}-cyc.c";
print CYC <<EOF;
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#include "dds/dds.h"
#include "dds/ddsrt/random.h"
#include "dds/ddsrt/sockets.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds__stream.h"
#include "c_base.h"
#include "sd_cdr.h"
#include "sd_serializerXMLTypeinfo.h"
#include "v_copyIn.h"
#include "sac_genericCopyIn.h"
#include "xxx.h"
int main()
{
unsigned char garbage[1000];
struct ddsi_sertopic_default ddd;
uint32_t deser_garbage = 0;
memset (&ddd, 0, sizeof (ddd));
dds_istream_t is;
c_base base = c_create ("X", NULL, 0, 0);
dds_entity_t dp = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
if (dp < 0) abort ();
EOF
;
for (1 .. 300) {
my $t = genstr (0);
my $idl = genidltd ($t);
print IDL $idl;
(my $idlcmt = $idl) =~ s,^,//,mg;
print CYC $idlcmt;
gencyc ($t);
}
print CYC <<EOF;
dds_delete (dp);
printf ("deserialized %"PRIu32" pieces of garbage\\n", deser_garbage);
return 0;
}
EOF
;
close CYC;
close IDL;
sub gencyc {
my ($t) = @_;
print CYC <<EOF;
{
dds_entity_t tp = dds_create_topic (dp, &$t->[1]_desc, \"$t->[1]\", NULL, NULL);
if (tp < 0) abort ();
dds_entity_t rd = dds_create_reader (dp, tp, NULL, NULL);
if (rd < 0) abort ();
dds_entity_t wr = dds_create_writer (dp, tp, NULL, NULL);
if (wr < 0) abort ();
EOF
;
print CYC geninit ($t);
print CYC <<EOF;
if (dds_write (wr, &v$t->[1]) < 0) abort ();
void *msg = NULL;
dds_sample_info_t info;
if (dds_take (rd, &msg, &info, 1, 1) != 1) abort ();
const $t->[1] *b = msg;
EOF
;
print CYC gencmp ($t);
print CYC <<EOF;
ddd.type = (struct dds_topic_descriptor *) &$t->[1]_desc;
for (uint32_t i = 0; i < 1000; i++) {
for (size_t j = 0; j < sizeof (garbage); j++)
garbage[j] = (unsigned char) ddsrt_random ();
if (dds_stream_normalize (garbage, (uint32_t) sizeof (garbage), false, &ddd, false)) {
is.m_buffer = garbage;
is.m_size = 1000;
is.m_index = 0;
dds_stream_read_sample (&is, msg, &ddd);
deser_garbage++;
}
}
sd_serializer serializer = sd_serializerXMLTypeinfoNew (base, 0);
sd_serializedData meta_data = sd_serializerFromString (serializer, $t->[1]_desc.m_meta);
if (sd_serializerDeserialize (serializer, meta_data) == NULL) abort ();
c_type type = c_resolve (base, "$t->[1]"); if (!type) abort ();
sd_serializedDataFree (meta_data);
sd_serializerFree (serializer);
struct sd_cdrInfo *ci = sd_cdrInfoNew (type);
if (sd_cdrCompile (ci) < 0) abort ();
DDS_copyCache cc = DDS_copyCacheNew ((c_metaObject) type);
struct DDS_srcInfo_s src = { .src = &v$t->[1], cc };
void *samplecopy = c_new (type);
DDS_copyInStruct (base, &src, samplecopy);
struct sd_cdrSerdata *sd = sd_cdrSerializeBSwap (ci, samplecopy);
const void *blob;
uint32_t blobsz = sd_cdrSerdataBlob (&blob, sd);
/* hack alert: modifying read-only blob ...*/
if (!dds_stream_normalize ((void *) blob, blobsz, true, &ddd, false)) abort ();
is.m_buffer = blob;
is.m_size = blobsz;
is.m_index = 0;
dds_stream_read_sample (&is, msg, &ddd);
sd_cdrSerdataFree (sd);
sd = sd_cdrSerialize (ci, samplecopy);
blobsz = sd_cdrSerdataBlob (&blob, sd);
if (!dds_stream_normalize ((void *) blob, blobsz, false, &ddd, false)) abort ();
for (uint32_t i = 1; i < blobsz && i <= 16; i++) {
if (dds_stream_normalize ((void *) blob, blobsz - i, false, &ddd, false)) abort ();
}
sd_cdrSerdataFree (sd);
EOF
;
print CYC gencmp ($t);
print CYC <<EOF;
sd_cdrInfoFree (ci);
dds_return_loan (rd, &msg, 1);
dds_delete (rd);
dds_delete (wr);
dds_delete (tp);
}
EOF
;
}
sub geninit {
my ($t) = @_;
my @out;
my $res = geninit1 (" ", \@out, $t, "");
return (join "", @out) . " $t->[1] v$t->[1] = $res;\n";
}
sub gencmp {
my ($t) = @_;
my $res = gencmp1 ($t, "v$t->[1]", "");
return $res;
}
sub geninit1 {
my ($ind, $out, $t, $idxsuf) = @_;
if ($t->[0] =~ /^u([0-3])$/) {
return int (rand (10));
} elsif ($t->[0] eq "u4") {
return "\"".("x"x(int (rand (8))))."\"";
} elsif ($t->[0] eq "seq") {
my $len = int (rand (10));
my $bufref;
if ($len == 0) {
$bufref = "0";
} else {
my $buf = "vb$t->[1]_$idxsuf";
$bufref = "$buf";
my $ctype = ($t->[2]->[0] =~ /^u(\d+)$/) ? $ctype[$1] : $t->[2]->[1];
my $tmp = " $ctype $buf\[\] = {";
for (1..$len) {
$tmp .= geninit1 ("$ind", $out, $t->[2], "${idxsuf}_$_");
$tmp .= "," if $_ < $len;
}
$tmp .= "};\n";
push @$out, $tmp;
}
return "{$len,$len,$bufref,0}";
} elsif ($t->[0] eq "ary") {
my $len = $t->[3]; die unless $len > 0;
my $tmp = "{";
for (1..$len) {
$tmp .= geninit1 ("$ind", $out, $t->[2], "${idxsuf}_$_");
$tmp .= "," if $_ < $len;
}
$tmp .= "}";
return $tmp;
} elsif ($t->[0] eq "str") {
my $tmp = "{";
for (my $i = 2; $i < @$t; $i++) {
my ($name, $st) = @{$t->[$i]};
$tmp .= geninit1 ("", $out, $st, "${idxsuf}_");
$tmp .= "," if $i + 1 < @$t;
}
$tmp .= "}";
return $tmp;
} elsif ($t->[0] eq "uni") { # uni name disctype hasdef case...
my $discval = int(rand(@$t - 3)); # -3 so we generate values outside label range as well
my $hasdef = $t->[3];
my $case = (4 + $discval < @$t) ? $discval : $hasdef ? @$t-1 : 0;
$discval = ("'".chr ($discval + ord ("A"))."'") if $t->[2] eq "u0";
# $case matches have a label or default; if no default generate an initializer for the
# first case to avoid compiler warnings
my ($name, $st) = @{$t->[4+$case]};
my $tmp = "{$discval,{.$name=";
$tmp .= geninit1 ("", $out, $st, "${idxsuf}_");
$tmp .= "}}";
return $tmp;
} else {
die;
}
}
sub gencmp1 {
my ($t, $toplevel, $path) = @_;
if ($t->[0] =~ /^u([0-3])$/) {
return " if ($toplevel.$path != b->$path) abort ();\n";
} elsif ($t->[0] eq "u4") {
return " if (strcmp ($toplevel.$path, b->$path) != 0) abort ();\n";
} elsif ($t->[0] eq "seq") {
my $idx = "i".length $path;
return ("if ($toplevel.$path._length != b->$path._length) abort ();\n" .
"for (uint32_t $idx = 0; $idx < $toplevel.$path._length; $idx++) {\n" .
gencmp1 ($t->[2], $toplevel, "$path._buffer[$idx]") .
"}\n");
} elsif ($t->[0] eq "ary") {
my $len = $t->[3]; die unless $len > 0;
my $idx = "i".length $path;
return ("for (uint32_t $idx = 0; $idx < $len; $idx++) {\n" .
gencmp1 ($t->[2], $toplevel, "$path\[$idx]") .
"}\n");
} elsif ($t->[0] eq "str") {
my $sep = length $path == 0 ? "" : ".";
my $tmp = "";
for (my $i = 2; $i < @$t; $i++) {
my ($name, $st) = @{$t->[$i]};
$tmp .= gencmp1 ($st, $toplevel, "$path$sep$name");
}
return $tmp;
} elsif ($t->[0] eq "uni") { # uni name disctype hasdef case...
my $tmp = "if ($toplevel.$path._d != b->$path._d) abort ();\n";
my $hasdef = $t->[3];
$tmp .= "switch ($toplevel.$path._d) {\n";
for (my $i = 4; $i < @$t; $i++) {
my ($name, $st) = @{$t->[$i]};
my $discval = $i - 4;
$discval = "'".chr ($discval + ord ("A"))."'" if $t->[2] eq "u0";
$tmp .= ($i == @$t && $hasdef) ? " default:\n" : " case $discval:\n";
$tmp .= gencmp1 ($st, $toplevel, "$path._u.$name");
$tmp .= "break;\n";
}
$tmp .= "}\n";
return $tmp;
} else {
die;
}
}
sub genidltd {
my ($t) = @_;
my @out = ();
my $res = genidl1td ("", \@out, $t);
return (join "", @out) . $res . "#pragma keylist $t->[1]\n//------------\n";
};
sub genidl1 {
my ($ind, $out, $name, $t) = @_;
my $res = "";
if ($t->[0] =~ /^u(\d+)$/) {
$res = "${ind}$idltype[$1] $name;\n";
} elsif ($t->[0] eq "seq") {
push @$out, genidl1td ("", $out, $t);
$res = "${ind}$t->[1] $name;\n";
} elsif ($t->[0] eq "ary") {
if ($t->[2]->[0] =~ /^u(\d+)$/) {
$res = "${ind}$idltype[$1] ${name}[$t->[3]];\n";
} else {
push @$out, genidl1td ("", $out, $t->[2]);
$res = "${ind}$t->[2]->[1] ${name}[$t->[3]];\n";
}
} elsif ($t->[0] eq "str") {
push @$out, genidl1td ("", $out, $t);
$res = "${ind}$t->[1] $name;\n";
} elsif ($t->[0] eq "uni") {
push @$out, genidl1td ("", $out, $t);
$res = "${ind}$t->[1] $name;\n";
} else {
die;
}
return $res;
}
sub genidl1td {
my ($ind, $out, $t) = @_;
if ($t->[0] eq "seq") {
if ($t->[2]->[0] =~ /^u(\d+)$/) {
return "${ind}typedef sequence<$idltype[$1]> $t->[1];\n";
} else {
push @$out, genidl1td ("", $out, $t->[2]);
return "${ind}typedef sequence<$t->[2]->[1]> $t->[1];\n";
}
} elsif ($t->[0] eq "ary") {
if ($t->[2]->[0] =~ /^u(\d+)$/) {
return "${ind}typedef ${idltype[$1]} $t->[1]"."[$t->[3]];\n";
} else {
push @$out, genidl1td ("", $out, $t->[2]);
return "${ind}typedef $t->[2]->[1] $t->[1]"."[$t->[3]];\n";
}
} elsif ($t->[0] eq "str") {
my $res = "struct $t->[1] {\n";
for (my $i = 2; $i < @$t; $i++) {
$res .= genidl1 ($ind." ", $out, @{$t->[$i]});
}
$res .= "};\n";
return $res;
} elsif ($t->[0] eq "uni") {
my $hasdef = $t->[3];
die unless $t->[2] =~ /^u([0-2])$/;
my $res = "${ind}union $t->[1] switch ($idltype_unidisc[$1]) {\n";
for (my $i = 4; $i < @$t; $i++) {
my $discval = $i - 4;
$discval = "'".(chr ($discval + ord ("A")))."'" if $t->[2] eq "u0";
$res .= ($i == @$t && $hasdef) ? "$ind default: " : "$ind case $discval: ";
$res .= genidl1 ($ind." ", $out, @{$t->[$i]});
}
$res .= "};\n";
return $res;
} else {
die;
}
};
sub genu0 { return ["u0"]; }
sub genu1 { return ["u1"]; }
sub genu2 { return ["u2"]; }
sub genu3 { return ["u3"]; }
sub genu4 { return ["u4"]; }
sub genseq { return ["seq", nextident (), gentype ($_[0] + 1, @probs)]; }
sub genary { return ["ary", nextident (), gentype ($_[0] + 1, @noaryprobs), 1 + int (rand (4))]; }
sub genstr {
my @ts = ("str", nextident ());
my $n = 1 + int (rand (4));
push @ts, [ nextident (), gentype ($_[0] + 1, @probs) ] while $n--;
return \@ts;
}
sub genuni {
my @ts = ("uni", nextident (), "u".(int (rand (2))), int (rand (1))); # uni name disctype hasdef case...
my $ncases = 1 + int (rand (4));
push @ts, [ nextident (), gentype ($_[0] + 1, @unicaseprobs) ] while $ncases--;
return \@ts;
}
sub gentype {
my $t = choosetype (@_);
my $f = "gen$t";
return &$f (@_);
}
sub choosetype {
my ($lev, @probs) = @_;
my $r = rand ($_[0] == 4 ? $probs[3] : $probs[$#probs]);
my $i;
for ($i = 0; $i < $#probs; $i++) {
last if $r < $probs[$i];
}
return $types[$i];
}
sub nextident {
return $nextident++;
}

View file

@ -48,6 +48,16 @@ public class ArrayType extends AbstractType
return new ArrayType (dimensions, subtype);
}
public boolean containsUnion ()
{
Type t = subtype;
while (t instanceof TypedefType)
{
t = ((TypedefType)t).getRef ();
}
return t.containsUnion ();
}
public ArrayList <String> getMetaOp (String myname, String structname)
{
ArrayList <String> result = new ArrayList <String> ();

View file

@ -57,6 +57,11 @@ public class BasicType extends AbstractType
return new BasicType (type);
}
public boolean containsUnion ()
{
return false;
}
public ArrayList <String> getMetaOp (String myname, String structname)
{
ArrayList <String> result = new ArrayList <String> (1);

View file

@ -36,6 +36,11 @@ public class BoundedStringType extends AbstractType
return size;
}
public boolean containsUnion ()
{
return false;
}
public ArrayList <String> getMetaOp (String myname, String structname)
{
ArrayList <String> result = new ArrayList <String> ();

View file

@ -40,6 +40,11 @@ public class EnumType extends BasicType implements NamedType
return SN.toString ("_");
}
public boolean containsUnion ()
{
return false;
}
public void addEnumerand (String val)
{
vals.add (val);

View file

@ -1207,6 +1207,10 @@ public class GenVisitor extends org.eclipse.cyclonedds.parser.IDLBaseVisitor <Vo
{
topicST.add ("flags", "DDS_TOPIC_NO_OPTIMIZE");
}
if (topicmeta.containsUnion ())
{
topicST.add ("flags", "DDS_TOPIC_CONTAINS_UNION");
}
topicST.add ("alignment", topicmeta.getAlignment ());
}

View file

@ -48,6 +48,16 @@ public class SequenceType extends AbstractType
return new SequenceType (subtype.dup (), name);
}
public boolean containsUnion ()
{
Type t = subtype;
while (t instanceof TypedefType)
{
t = ((TypedefType)t).getRef ();
}
return t.containsUnion ();
}
public ArrayList <String> getMetaOp (String myname, String structname)
{
ArrayList <String> result = new ArrayList <String> ();

View file

@ -176,6 +176,23 @@ public class StructType extends AbstractType implements NamedType
return false;
}
public boolean containsUnion ()
{
for (Member m : members)
{
Type mtype = m.type;
while (mtype instanceof TypedefType)
{
mtype = ((TypedefType)mtype).getRef ();
}
if (mtype.containsUnion ())
{
return true;
}
}
return false;
}
public long getKeySize ()
{
Type mtype;

View file

@ -29,5 +29,6 @@ public interface Type
public int getMetaOpSize ();
public Alignment getAlignment ();
public Type dup ();
public boolean containsUnion ();
}

View file

@ -27,6 +27,11 @@ public class TypedefType extends AbstractType implements NamedType
return new TypedefType (name, ref);
}
public boolean containsUnion ()
{
return ref.containsUnion ();
}
public ArrayList <String> getMetaOp (String myname, String structname)
{
return ref.getMetaOp (myname, structname);

View file

@ -207,6 +207,11 @@ public class UnionType extends AbstractType implements NamedType
less than the alignment of the members. */
}
public boolean containsUnion ()
{
return true;
}
public Alignment getAlignment ()
{
Alignment result = discriminant.getAlignment ();