Encoding preparations (#329)

* Payload encoding/decoding preparations.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>

* Sub-message encoding/decoding preparations.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>

* RTPS message encoding/decoding preparations.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>

* Removed redundant destination acquiring.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>

* Refactored secure writing of RTPS messages slightly.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>
This commit is contained in:
martinbremmer 2019-12-06 16:50:31 +01:00 committed by eboasson
parent 30bd6e4c1c
commit 66c0d87886
13 changed files with 2134 additions and 121 deletions

View file

@ -12,16 +12,40 @@
#ifndef DDSI_OMG_SECURITY_H
#define DDSI_OMG_SECURITY_H
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_xmsg.h"
#if defined (__cplusplus)
extern "C" {
#endif
typedef enum {
NN_RTPS_MSG_STATE_ERROR,
NN_RTPS_MSG_STATE_PLAIN,
NN_RTPS_MSG_STATE_ENCODED
} nn_rtps_msg_state_t;
#ifdef DDSI_INCLUDE_SECURITY
typedef struct nn_msg_sec_info {
int64_t src_pp_handle;
int64_t dst_pp_handle;
bool use_rtps_encoding;
} nn_msg_sec_info_t;
/**
* @brief Check if any participant has security enabled.
*
* @returns bool
* @retval true Some participant is secure
* @retval false No participant is not secure
*/
bool q_omg_security_enabled(void);
/**
* @brief Check if security is enabled for the participant.
*
@ -33,6 +57,26 @@ extern "C" {
*/
bool q_omg_participant_is_secure(const struct participant *pp);
/**
* @brief Get the security handle of the given local participant.
*
* @param[in] pp Participant to check if it is secure.
*
* @returns int64_t
* @retval Local participant security handle
*/
int64_t q_omg_security_get_local_participant_handle(struct participant *pp);
/**
* @brief Get the security handle of the given remote participant.
*
* @param[in] proxypp Participant to check if it is secure.
*
* @returns int64_t
* @retval Remote participant security handle
*/
int64_t q_omg_security_get_remote_participant_handle(struct proxy_participant *proxypp);
/**
* @brief Get security info flags of the given writer.
*
@ -107,12 +151,355 @@ unsigned determine_publication_writer(const struct writer *wr);
* @retval true The proxy participant may be deleted.
* @retval false The proxy participant may not be deleted by this writer.
*/
bool allow_proxy_participant_deletion(struct q_globals * const gv, const struct ddsi_guid *guid, const ddsi_entityid_t pwr_entityid);
bool
is_proxy_participant_deletion_allowed(
struct q_globals * const gv,
const struct ddsi_guid *guid,
const ddsi_entityid_t pwr_entityid);
/**
* @brief Determine if the messages, related to the given remote
* entity, are RTPS protected or not.
*
* @param[in] proxy_pp Related proxy participant.
* @param[in] entityid ID of the entity to check.
*
* @returns bool
* @retval true The entity messages are RTPS protected.
* @retval false The entity messages are not RTPS protected.
*/
bool
q_omg_security_is_remote_rtps_protected(
struct proxy_participant *proxy_pp,
ddsi_entityid_t entityid);
/**
* @brief Determine if the messages, related to the given local
* entity, are RTPS protected or not.
*
* @param[in] pp Related participant.
* @param[in] entityid ID of the entity to check.
*
* @returns bool
* @retval true The entity messages are RTPS protected.
* @retval false The entity messages are not RTPS protected.
*/
bool
q_omg_security_is_local_rtps_protected(
struct participant *pp,
ddsi_entityid_t entityid);
/**
* @brief Set security information, depending on plist, into the given
* proxy participant.
*
* @param[in] proxypp Proxy participant to set security info on.
* @param[in] plist Paramater list, possibly contains security info.
*/
void
set_proxy_participant_security_info(
struct proxy_participant *proxypp,
const nn_plist_t *plist);
/**
* @brief Set security information, depending on plist and proxy participant,
* into the given proxy reader.
*
* @param[in] prd Proxy reader to set security info on.
* @param[in] plist Paramater list, possibly contains security info.
*/
void
set_proxy_reader_security_info(
struct proxy_reader *prd,
const nn_plist_t *plist);
/**
* @brief Set security information, depending on plist and proxy participant,
* into the given proxy writer.
*
* @param[in] pwr Proxy writer to set security info on.
* @param[in] plist Paramater list, possibly contains security info.
*/
void
set_proxy_writer_security_info(
struct proxy_writer *pwr,
const nn_plist_t *plist);
/**
* @brief Encode RTPS message.
*
* @param[in] src_handle Security handle of data source.
* @param[in] src_guid GUID of the entity data source.
* @param[in] src_buf Original RTPS message.
* @param[in] src_len Original RTPS message size.
* @param[out] dst_buf Encoded RTPS message.
* @param[out] dst_len Encoded RTPS message size.
* @param[in] dst_handle Security handle of data destination.
*
* @returns bool
* @retval true Encoding succeeded.
* @retval false Encoding failed.
*/
bool
q_omg_security_encode_rtps_message(
int64_t src_handle,
ddsi_guid_t *src_guid,
const unsigned char *src_buf,
const unsigned int src_len,
unsigned char **dst_buf,
unsigned int *dst_len,
int64_t dst_handle);
/**
* @brief Encode payload when necessary.
*
* When encoding is necessary, *buf will be allocated and the vec contents
* will change to point to that buffer.
* It is expected that the vec contents is always aliased.
*
* If no encoding is necessary, nothing changes.
*
* encoding( not needed) -> return( true), vec(untouched), buf(NULL)
* encoding(needed&success) -> return( true), vec( buf(new))
* encoding(needed&failure) -> return(false), vec(untouched), buf(NULL)
*
* @param[in] wr Writer that writes the payload.
* @param[in,out] vec An iovec that contains the payload.
* @param[out] buf Buffer to contain the encoded payload.
*
* @returns bool
* @retval true Encoding succeeded or not necessary. Either way, vec
* contains the payload that should be send.
* @retval false Encoding was necessary, but failed.
*/
bool
encode_payload(
struct writer *wr,
ddsrt_iovec_t *vec,
unsigned char **buf);
/**
* @brief Decode the payload of a Data submessage.
*
* When decoding is necessary, the payloadp memory will be replaced
* by the decoded payload. This means that the original submessage
* now contains payload that can be deserialized.
*
* If no decoding is necessary, nothing changes.
*
* @param[in] gv Global information.
* @param[in] sampleinfo Sample information.
* @param[in,out] payloadp Pointer to payload memory.
* @param[in] payloadsz Size of payload.
* @param[in,out] submsg_len Size of submessage.
*
* @returns bool
* @retval true Decoding succeeded or not necessary. Either way, payloadp
* contains the data that should be deserialized.
* @retval false Decoding was necessary, but failed.
*/
bool
decode_Data(
const struct q_globals *gv,
struct nn_rsample_info *sampleinfo,
unsigned char *payloadp,
uint32_t payloadsz,
size_t *submsg_len);
/**
* @brief Decode the payload of a DataFrag submessage.
*
* When decoding is necessary, the payloadp memory will be replaced
* by the decoded payload. This means that the original submessage
* now contains payload that can be deserialized.
*
* If no decoding is necessary, nothing changes.
*
* @param[in] gv Global information.
* @param[in] sampleinfo Sample information.
* @param[in,out] payloadp Pointer to payload memory.
* @param[in] payloadsz Size of payload.
* @param[in,out] submsg_len Size of submessage.
*
* @returns bool
* @retval true Decoding succeeded or not necessary. Either way, payloadp
* contains the data that should be deserialized.
* @retval false Decoding was necessary, but failed.
*/
bool
decode_DataFrag(
const struct q_globals *gv,
struct nn_rsample_info *sampleinfo,
unsigned char *payloadp,
uint32_t payloadsz,
size_t *submsg_len);
/**
* @brief Encode datareader submessage when necessary.
*
* When encoding is necessary, the original submessage will be replaced
* by a new encoded submessage.
* If the encoding fails, the original submessage will be removed.
*
* If no encoding is necessary, nothing changes.
*
* @param[in,out] msg Complete message.
* @param[in,out] sm_marker Submessage location within message.
* @param[in] pwr Writer for which the message is intended.
* @param[in] rd_guid Origin reader guid.
*/
void
encode_datareader_submsg(
struct nn_xmsg *msg,
struct nn_xmsg_marker sm_marker,
struct proxy_writer *pwr,
const struct ddsi_guid *rd_guid);
/**
* @brief Encode datawriter submessage when necessary.
*
* When encoding is necessary, the original submessage will be replaced
* by a new encoded submessage.
* If the encoding fails, the original submessage will be removed.
*
* If no encoding is necessary, nothing changes.
*
* @param[in,out] msg Complete message.
* @param[in,out] sm_marker Submessage location within message.
* @param[in] wr Origin writer guid.
*/
void
encode_datawriter_submsg(
struct nn_xmsg *msg,
struct nn_xmsg_marker sm_marker,
struct writer *wr);
/**
* @brief Check if given submessage is properly decoded.
*
* When decoding is necessary, it should be checked if a plain submessage was
* actually decoded. Otherwise data can be injected just by inserting a plain
* submessage directly.
*
* @param[in] e Entity information.
* @param[in] c Proxy endpoint information.
* @param[in] proxypp Related proxy participant.
* @param[in] rst Receiver information.
* @param[in] prev_smid Previously handled submessage ID.
*
* @returns bool
* @retval true Decoding succeeded or was not necessary.
* @retval false Decoding was necessary, but not detected.
*/
bool
validate_msg_decoding(
const struct entity_common *e,
const struct proxy_endpoint_common *c,
struct proxy_participant *proxypp,
struct receiver_state *rst,
SubmessageKind_t prev_smid);
/**
* @brief Decode not only SecPrefix, but also the SecBody and SecPostfix
* sub-messages.
*
* When encrypted, the original SecBody will be replaced by the decrypted
* submessage. Then the normal sequence can continue as if there was no
* encrypted data.
*
* @param[in] rst Receiver information.
* @param[in,out] submsg Pointer to SecPrefix/(SecBody|Submsg)/SecPostfix.
* @param[in] submsg_size Size of SecPrefix submessage.
* @param[in] msg_end End of the complete message.
* @param[in] src_prefix Prefix of the source entity.
* @param[in] dst_prefix Prefix of the destination entity.
* @param[in] byteswap Do the bytes need swapping?
*
* @returns int
* @retval >= 0 Decoding succeeded.
* @retval < 0 Decoding failed.
*/
int
decode_SecPrefix(
struct receiver_state *rst,
unsigned char *submsg,
size_t submsg_size,
unsigned char * const msg_end,
const ddsi_guid_prefix_t * const src_prefix,
const ddsi_guid_prefix_t * const dst_prefix,
int byteswap);
/**
* @brief Decode the RTPS message.
*
* When encrypted, the original buffers and information will be replaced
* by the decrypted RTPS message. Then the normal sequence can continue
* as if there was no encrypted data.
*
* @param[in] ts1 Thread information.
* @param[in] gv Global information.
* @param[in,out] rmsg Message information.
* @param[in,out] hdr Message header.
* @param[in,out] buff Message buffer.
* @param[in,out] sz Message size.
* @param[in] rbpool Buffers pool.
* @param[in] isstream Is message a stream variant?
*
* @returns nn_rtps_msg_state_t
* @retval NN_RTPS_MSG_STATE_PLAIN No decoding was necessary.
* @retval NN_RTPS_MSG_STATE_ENCODED Decoding succeeded.
* @retval NN_RTPS_MSG_STATE_ERROR Decoding failed.
*/
nn_rtps_msg_state_t
decode_rtps_message(
struct thread_state1 * const ts1,
struct q_globals *gv,
struct nn_rmsg **rmsg,
Header_t **hdr,
unsigned char **buff,
ssize_t *sz,
struct nn_rbufpool *rbpool,
bool isstream);
/**
* @brief Send the RTPS message securely.
*
* @param[in] conn Connection to use.
* @param[in] dst Possible destination information.
* @param[in] niov Number of io vectors.
* @param[in] iov Array of io vectors.
* @param[in] flags Connection write flags.
* @param[in,out] msg_len Submessage containing length.
* @param[in] dst_one Is there only one specific destination?
* @param[in] sec_info Security information for handles.
* @param[in] conn_write_cb Function to call to do the actual writing.
*
* @returns ssize_t
* @retval negative/zero Something went wrong.
* @retval positive Secure writing succeeded.
*/
ssize_t
secure_conn_write(
ddsi_tran_conn_t conn,
const nn_locator_t *dst,
size_t niov,
const ddsrt_iovec_t *iov,
uint32_t flags,
MsgLen_t *msg_len,
bool dst_one,
nn_msg_sec_info_t *sec_info,
ddsi_tran_write_fn_t conn_write_cb);
#else /* DDSI_INCLUDE_SECURITY */
#include "dds/ddsi/q_unused.h"
inline bool
q_omg_security_enabled(void)
{
return false;
}
inline bool
q_omg_participant_is_secure(
UNUSED_ARG(const struct participant *pp))
@ -135,7 +522,7 @@ determine_publication_writer(
}
inline bool
allow_proxy_participant_deletion(
is_proxy_participant_deletion_allowed(
UNUSED_ARG(struct q_globals * const gv),
UNUSED_ARG(const struct ddsi_guid *guid),
UNUSED_ARG(const ddsi_entityid_t pwr_entityid))
@ -143,6 +530,106 @@ allow_proxy_participant_deletion(
return true;
}
inline void
set_proxy_participant_security_info(
UNUSED_ARG(struct proxy_participant *prd),
UNUSED_ARG(const nn_plist_t *plist))
{
}
inline void
set_proxy_reader_security_info(
UNUSED_ARG(struct proxy_reader *prd),
UNUSED_ARG(const nn_plist_t *plist))
{
}
inline void
set_proxy_writer_security_info(
UNUSED_ARG(struct proxy_writer *pwr),
UNUSED_ARG(const nn_plist_t *plist))
{
}
inline bool
decode_Data(
UNUSED_ARG(const struct q_globals *gv),
UNUSED_ARG(struct nn_rsample_info *sampleinfo),
UNUSED_ARG(unsigned char *payloadp),
UNUSED_ARG(uint32_t payloadsz),
UNUSED_ARG(size_t *submsg_len))
{
return true;
}
inline bool
decode_DataFrag(
UNUSED_ARG(const struct q_globals *gv),
UNUSED_ARG(struct nn_rsample_info *sampleinfo),
UNUSED_ARG(unsigned char *payloadp),
UNUSED_ARG(uint32_t payloadsz),
UNUSED_ARG(size_t *submsg_len))
{
return true;
}
inline void
encode_datareader_submsg(
UNUSED_ARG(struct nn_xmsg *msg),
UNUSED_ARG(struct nn_xmsg_marker sm_marker),
UNUSED_ARG(struct proxy_writer *pwr),
UNUSED_ARG(const struct ddsi_guid *rd_guid))
{
}
inline void
encode_datawriter_submsg(
UNUSED_ARG(struct nn_xmsg *msg),
UNUSED_ARG(struct nn_xmsg_marker sm_marker),
UNUSED_ARG(struct writer *wr))
{
}
inline bool
validate_msg_decoding(
UNUSED_ARG(const struct entity_common *e),
UNUSED_ARG(const struct proxy_endpoint_common *c),
UNUSED_ARG(struct proxy_participant *proxypp),
UNUSED_ARG(struct receiver_state *rst),
UNUSED_ARG(SubmessageKind_t prev_smid))
{
return true;
}
inline int
decode_SecPrefix(
UNUSED_ARG(struct receiver_state *rst),
UNUSED_ARG(unsigned char *submsg),
UNUSED_ARG(size_t submsg_size),
UNUSED_ARG(unsigned char * const msg_end),
UNUSED_ARG(const ddsi_guid_prefix_t * const src_prefix),
UNUSED_ARG(const ddsi_guid_prefix_t * const dst_prefix),
UNUSED_ARG(int byteswap))
{
/* Just let the parsing ignore the security sub-messages. */
return true;
}
inline nn_rtps_msg_state_t
decode_rtps_message(
UNUSED_ARG(struct thread_state1 * const ts1),
UNUSED_ARG(struct q_globals *gv),
UNUSED_ARG(struct nn_rmsg **rmsg),
UNUSED_ARG(Header_t **hdr),
UNUSED_ARG(unsigned char **buff),
UNUSED_ARG(ssize_t *sz),
UNUSED_ARG(struct nn_rbufpool *rbpool),
UNUSED_ARG(bool isstream))
{
return NN_RTPS_MSG_STATE_PLAIN;
}
#endif /* DDSI_INCLUDE_SECURITY */
#if defined (__cplusplus)

View file

@ -16,6 +16,7 @@
#include "dds/ddsrt/avl.h"
#include "dds/ddsrt/sync.h"
#include "dds/ddsi/q_rtps.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_protocol.h"
#include "dds/ddsi/q_lat_estim.h"
#include "dds/ddsi/q_ephash.h"
@ -314,6 +315,9 @@ struct proxy_participant
unsigned proxypp_have_spdp: 1;
unsigned proxypp_have_cm: 1;
unsigned owns_lease: 1;
#ifdef DDSI_INCLUDE_SECURITY
nn_security_info_t security_info;
#endif
};
/* Representing proxy subscriber & publishers as "groups": until DDSI2
@ -341,6 +345,9 @@ struct proxy_endpoint_common
ddsi_guid_t group_guid; /* 0:0:0:0 if not available */
nn_vendorid_t vendor; /* cached from proxypp->vendor */
seqno_t seq; /* sequence number of most recent SEDP message */
#ifdef DDSI_INCLUDE_SECURITY
nn_security_info_t security_info;
#endif
};
struct proxy_writer {

View file

@ -162,6 +162,12 @@ typedef enum SubmessageKind {
SMID_HEARTBEAT_FRAG = 0x13,
SMID_DATA = 0x15,
SMID_DATA_FRAG = 0x16,
/* security-specific sub messages */
SMID_SEC_BODY = 0x30,
SMID_SEC_PREFIX = 0x31,
SMID_SEC_POSTFIX = 0x32,
SMID_SRTPS_PREFIX = 0x33,
SMID_SRTPS_POSTFIX = 0x34,
/* vendor-specific sub messages (0x80 .. 0xff) */
SMID_PT_INFO_CONTAINER = 0x80,
SMID_PT_MSG_LEN = 0x81,

View file

@ -102,10 +102,11 @@ struct nn_rmsg {
#define NN_RMSG_PAYLOADOFF(m, o) (NN_RMSG_PAYLOAD (m) + (o))
struct receiver_state {
ddsi_guid_prefix_t src_guid_prefix; /* 12 */
ddsi_guid_prefix_t dst_guid_prefix; /* 12 */
ddsi_guid_prefix_t src_guid_prefix; /* 12 */
ddsi_guid_prefix_t dst_guid_prefix; /* 12 */
struct addrset *reply_locators; /* 4/8 */
int forme; /* 4 */
uint32_t forme:1; /* 4 */
uint32_t rtps_encoded:1; /* - */
nn_vendorid_t vendor; /* 2 */
nn_protocol_version_t protocol_version; /* 2 => 44/48 */
ddsi_tran_conn_t conn; /* Connection for request */

View file

@ -46,8 +46,8 @@ DDS_EXPORT dds_return_t xeventq_start (struct xeventq *evq, const char *name); /
DDS_EXPORT void xeventq_stop (struct xeventq *evq);
DDS_EXPORT void qxev_msg (struct xeventq *evq, struct nn_xmsg *msg);
DDS_EXPORT void qxev_pwr_entityid (struct proxy_writer * pwr, ddsi_guid_prefix_t * id);
DDS_EXPORT void qxev_prd_entityid (struct proxy_reader * prd, ddsi_guid_prefix_t * id);
DDS_EXPORT void qxev_pwr_entityid (struct proxy_writer * pwr, const ddsi_guid_t *guid);
DDS_EXPORT void qxev_prd_entityid (struct proxy_reader * prd, const ddsi_guid_t *guid);
/* Returns 1 if queued, 0 otherwise (no point in returning the
event, you can't do anything with it anyway) */

View file

@ -26,6 +26,8 @@ struct ddsi_serdata;
struct addrset;
struct proxy_reader;
struct proxy_writer;
struct writer;
struct participant;
struct nn_prismtech_participant_version_info;
struct nn_xmsgpool;
@ -54,10 +56,11 @@ void nn_xmsgpool_free (struct nn_xmsgpool *pool);
/* To allocate a new xmsg from the pool; if expected_size is NOT
exceeded, no reallocs will be performed, else the address of the
xmsg may change because of reallocing when appending to it. */
struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_prefix_t *src_guid_prefix, size_t expected_size, enum nn_xmsg_kind kind);
struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_t *src_guid, struct participant *pp, size_t expected_size, enum nn_xmsg_kind kind);
/* For sending to a particular destination (participant) */
void nn_xmsg_setdst1 (struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *addr);
void nn_xmsg_setdst1 (struct q_globals *gv, struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *addr);
bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp);
/* For sending to a particular proxy reader; this is a convenience
routine that extracts a suitable address from the proxy reader's
@ -114,7 +117,13 @@ void nn_xmsg_guid_seq_fragid (const struct nn_xmsg *m, ddsi_guid_t *wrguid, seqn
void *nn_xmsg_submsg_from_marker (struct nn_xmsg *msg, struct nn_xmsg_marker marker);
void *nn_xmsg_append (struct nn_xmsg *m, struct nn_xmsg_marker *marker, size_t sz);
void nn_xmsg_shrink (struct nn_xmsg *m, struct nn_xmsg_marker marker, size_t sz);
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len);
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len, struct writer *wr);
#ifdef DDSI_INCLUDE_SECURITY
size_t nn_xmsg_submsg_size (struct nn_xmsg *msg, struct nn_xmsg_marker marker);
void nn_xmsg_submsg_remove (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker);
void nn_xmsg_submsg_replace (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, unsigned char *new_submsg, size_t new_len);
void nn_xmsg_submsg_append_refd_payload (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker);
#endif
void nn_xmsg_submsg_setnext (struct nn_xmsg *msg, struct nn_xmsg_marker marker);
void nn_xmsg_submsg_init (struct nn_xmsg *msg, struct nn_xmsg_marker marker, SubmessageKind_t smkind);
void nn_xmsg_add_timestamp (struct nn_xmsg *m, nn_wctime_t t);

File diff suppressed because it is too large Load diff

View file

@ -213,7 +213,7 @@ int spdp_write (struct participant *pp)
terribly important, the msg will grow as needed, address space is
essentially meaningless because we only use the message to
construct the payload. */
mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid.prefix, 0, NN_XMSG_KIND_DATA);
mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid, NULL, 0, NN_XMSG_KIND_DATA);
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID | PP_BUILTIN_ENDPOINT_SET |
@ -343,7 +343,7 @@ static int spdp_dispose_unregister_with_wr (struct participant *pp, unsigned ent
return 0;
}
mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid.prefix, 0, NN_XMSG_KIND_DATA);
mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid, NULL, 0, NN_XMSG_KIND_DATA);
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID;
ps.participant_guid = pp->e.guid;
@ -437,7 +437,7 @@ static int handle_SPDP_dead (const struct receiver_state *rst, ddsi_entityid_t p
guid = datap->participant_guid;
GVLOGDISC (" %"PRIx32":%"PRIx32":%"PRIx32":%"PRIx32, PGUID (guid));
assert (guid.entityid.u == NN_ENTITYID_PARTICIPANT);
if (allow_proxy_participant_deletion(gv, &guid, pwr_entityid))
if (is_proxy_participant_deletion_allowed(gv, &guid, pwr_entityid))
{
if (delete_proxy_participant_by_guid (gv, &guid, timestamp, 0) < 0)
{
@ -999,7 +999,7 @@ static int sedp_write_endpoint
the QoS and other settings. So the header fields aren't really
important, except that they need to be set to reasonable things
or it'll crash */
mpayload = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA);
mpayload = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA);
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
if (xqos) nn_xqos_addtomsg (mpayload, xqos, qosdiff);
nn_xmsg_addpar_sentinel (mpayload);
@ -1467,7 +1467,7 @@ int sedp_write_topic (struct participant *pp, const struct nn_plist *datap)
sedp_wr = get_sedp_writer (pp, NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER);
mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA);
mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA);
delta = nn_xqos_delta (&datap->qos, &sedp_wr->e.gv->default_xqos_tp, ~(uint64_t)0);
if (sedp_wr->e.gv->config.explicitly_publish_qos_set_to_default)
delta |= ~QP_UNRECOGNIZED_INCOMPATIBLE_MASK;
@ -1505,7 +1505,7 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
the QoS and other settings. So the header fields aren't really
important, except that they need to be set to reasonable things
or it'll crash */
mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA);
mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA);
nn_plist_init_empty (&ps);
ps.present = PP_PARTICIPANT_GUID;
ps.participant_guid = pp->e.guid;

View file

@ -2119,7 +2119,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
ddsrt_avl_insert_ipath (&pwr_readers_treedef, &pwr->readers, m, &path);
local_reader_ary_insert(&pwr->rdary, rd);
ddsrt_mutex_unlock (&pwr->e.lock);
qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
qxev_pwr_entityid (pwr, &rd->e.guid);
ELOGDISC (pwr, "\n");
@ -2163,7 +2163,7 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer
PGUID (wr->e.guid), PGUID (prd->e.guid));
ddsrt_avl_insert_ipath (&prd_writers_treedef, &prd->writers, m, &path);
ddsrt_mutex_unlock (&prd->e.lock);
qxev_prd_entityid (prd, &wr->e.guid.prefix);
qxev_prd_entityid (prd, &wr->e.guid);
}
}
@ -3974,6 +3974,8 @@ void new_proxy_participant
nn_xqos_mergein_missing (&proxypp->plist->qos, &gv->default_plist_pp.qos, ~(uint64_t)0);
ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups);
set_proxy_participant_security_info(proxypp, plist);
if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS)
proxypp->kernel_sequence_numbers = 1;
else
@ -4289,6 +4291,11 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
else
memset (&c->group_guid, 0, sizeof (c->group_guid));
#ifdef DDSI_INCLUDE_SECURITY
c->security_info.security_attributes = 0;
c->security_info.plugin_security_attributes = 0;
#endif
ref_proxy_participant (proxypp, c);
}
@ -4394,6 +4401,8 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
pwr->ddsi2direct_cb = 0;
pwr->ddsi2direct_cbarg = 0;
set_proxy_writer_security_info(pwr, plist);
local_reader_ary_init (&pwr->rdary);
/* locking the entity prevents matching while the built-in topic hasn't been published yet */
@ -4437,7 +4446,7 @@ void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset
rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid);
if (rd)
{
qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
qxev_pwr_entityid (pwr, &rd->e.guid);
}
m = ddsrt_avl_iter_next (&iter);
}
@ -4494,7 +4503,7 @@ void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset
ddsrt_mutex_lock (&wr->e.lock);
rebuild_writer_addrset (wr);
ddsrt_mutex_unlock (&wr->e.lock);
qxev_prd_entityid (prd, &wr->e.guid.prefix);
qxev_prd_entityid (prd, &wr->e.guid);
}
wrguid = guid_next;
ddsrt_mutex_lock (&prd->e.lock);
@ -4582,6 +4591,8 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
#endif
prd->is_fict_trans_reader = 0;
set_proxy_reader_security_info(prd, plist);
ddsrt_avl_init (&prd_writers_treedef, &prd->writers);
/* locking the entity prevents matching while the built-in topic hasn't been published yet */

View file

@ -53,6 +53,7 @@
#include "dds/ddsi/ddsi_mcgroup.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */
#include "dds/ddsi/ddsi_security_omg.h"
#include "dds/ddsi/sysdeps.h"
#include "dds__whc.h"
@ -278,7 +279,34 @@ static void set_sampleinfo_proxy_writer (struct nn_rsample_info *sampleinfo, dds
sampleinfo->pwr = pwr;
}
static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp)
static int set_sampleinfo_bswap (struct nn_rsample_info *sampleinfo, struct CDRHeader *hdr)
{
if (hdr)
{
switch (hdr->identifier)
{
case CDR_BE:
case PL_CDR_BE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) ? 1 : 0;
break;
}
case CDR_LE:
case PL_CDR_LE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) ? 0 : 1;
break;
}
default:
{
return 0;
}
}
}
return 1;
}
static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp, uint32_t *payloadsz)
{
/* on success: sampleinfo->{seq,rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */
ddsi_guid_t pwr_guid;
@ -354,6 +382,7 @@ static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, D
{
/*TRACE (("no payload\n"));*/
*payloadp = NULL;
*payloadsz = 0;
sampleinfo->size = 0;
}
else if ((size_t) ((char *) ptr + 4 - (char *) msg) > size)
@ -363,35 +392,16 @@ static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, D
}
else
{
struct CDRHeader *hdr;
sampleinfo->size = (uint32_t) ((char *) msg + size - (char *) ptr);
*payloadsz = sampleinfo->size;
*payloadp = ptr;
hdr = (struct CDRHeader *) ptr;
switch (hdr->identifier)
{
case CDR_BE:
case PL_CDR_BE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 1 : 0);
break;
}
case CDR_LE:
case PL_CDR_LE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 0 : 1);
break;
}
default:
return 0;
}
}
return 1;
}
static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp)
static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp, uint32_t *payloadsz)
{
/* on success: sampleinfo->{rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */
uint32_t payloadsz;
ddsi_guid_t pwr_guid;
unsigned char *ptr;
@ -473,18 +483,17 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms
}
*payloadp = ptr;
payloadsz = (uint32_t) ((char *) msg + size - (char *) ptr);
if ((uint32_t) msg->fragmentsInSubmessage * msg->fragmentSize <= payloadsz)
*payloadsz = (uint32_t) ((char *) msg + size - (char *) ptr);
if ((uint32_t) msg->fragmentsInSubmessage * msg->fragmentSize <= (*payloadsz))
; /* all spec'd fragments fit in payload */
else if ((uint32_t) (msg->fragmentsInSubmessage - 1) * msg->fragmentSize >= payloadsz)
else if ((uint32_t) (msg->fragmentsInSubmessage - 1) * msg->fragmentSize >= (*payloadsz))
return 0; /* I can live with a short final fragment, but _only_ the final one */
else if ((uint32_t) (msg->fragmentStartingNum - 1) * msg->fragmentSize + payloadsz >= msg->sampleSize)
else if ((uint32_t) (msg->fragmentStartingNum - 1) * msg->fragmentSize + (*payloadsz) >= msg->sampleSize)
; /* final fragment is long enough to cover rest of sample */
else
return 0;
if (msg->fragmentStartingNum == 1)
{
struct CDRHeader *hdr = (struct CDRHeader *) ptr;
if ((size_t) ((char *) ptr + 4 - (char *) msg) > size)
{
/* no space for the header -- technically, allowing small
@ -492,23 +501,6 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms
prefer this */
return 0;
}
switch (hdr->identifier)
{
case CDR_BE:
case PL_CDR_BE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 1 : 0);
break;
}
case CDR_LE:
case PL_CDR_LE:
{
sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 0 : 1);
break;
}
default:
return 0;
}
}
return 1;
}
@ -527,6 +519,7 @@ static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader
gap->gapList.numbits = numbits;
memcpy (gap->bits, bits, NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits));
nn_xmsg_submsg_setnext (msg, sm_marker);
encode_datawriter_submsg(msg, sm_marker, wr);
return 0;
}
@ -537,7 +530,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
ASSERT_MUTEX_HELD (&wr->e.lock);
assert (wr->reliable);
m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL);
m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPRD (m, prd) < 0)
{
/* If we don't have an address, give up immediately */
@ -610,7 +603,7 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou
return 1;
}
static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp)
static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid)
{
struct proxy_reader *prd;
struct wr_prd_match *rn;
@ -666,6 +659,12 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
return 1;
}
if (!validate_msg_decoding(&(prd->e), &(prd->c), prd->c.proxypp, rst, prev_smid))
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
@ -934,7 +933,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
RSTTRACE (" XGAP%"PRId64"..%"PRId64"/%u:", gapstart, gapend, gapnumbits);
for (uint32_t i = 0; i < gapnumbits; i++)
RSTTRACE ("%c", nn_bitset_isset (gapnumbits, gapbits, i) ? '1' : '0');
m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL);
m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (m, wr->partition_id);
#endif
@ -1094,7 +1093,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
}
}
static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, nn_wctime_t timestamp)
static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid)
{
/* We now cheat: and process the heartbeat for _all_ readers,
always, regardless of the destination address in the Heartbeat
@ -1132,6 +1131,12 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
return 1;
}
if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid))
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC,
BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
@ -1244,7 +1249,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
return 1;
}
static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime_t tnow), const HeartbeatFrag_t *msg)
static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime_t tnow), const HeartbeatFrag_t *msg, SubmessageKind_t prev_smid)
{
const seqno_t seq = fromSN (msg->writerSN);
const nn_fragment_number_t fragnum = msg->lastFragmentNum - 1; /* we do 0-based */
@ -1269,6 +1274,12 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
return 1;
}
if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid))
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
@ -1355,7 +1366,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
return 1;
}
static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const NackFrag_t *msg)
static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid)
{
struct proxy_reader *prd;
struct wr_prd_match *rn;
@ -1396,6 +1407,12 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
return 1;
}
if (!validate_msg_decoding(&(prd->e), &(prd->c), prd->c.proxypp, rst, prev_smid))
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow);
@ -1446,7 +1463,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
static uint32_t zero = 0;
struct nn_xmsg *m;
RSTTRACE (" msg not available: scheduling Gap\n");
m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL);
m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (m, wr->partition_id);
#endif
@ -1590,7 +1607,7 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se
return gap_was_valuable;
}
static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Gap_t *msg)
static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Gap_t *msg, SubmessageKind_t prev_smid)
{
/* Option 1: Process the Gap for the proxy writer and all
out-of-sync readers: what do I care which reader is being
@ -1643,6 +1660,12 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
return 1;
}
if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid))
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst));
return 1;
}
/* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */
lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow);
@ -2346,7 +2369,7 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con
}
}
static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup)
static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup, SubmessageKind_t prev_smid)
{
RSTTRACE ("DATA("PGUIDFMT" -> "PGUIDFMT" #%"PRId64,
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u,
@ -2358,6 +2381,15 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r
return 1;
}
if (sampleinfo->pwr)
{
if (!validate_msg_decoding(&(sampleinfo->pwr->e), &(sampleinfo->pwr->c), sampleinfo->pwr->c.proxypp, rst, prev_smid))
{
RSTTRACE (" clear submsg from protected src "PGUIDFMT")", PGUID (sampleinfo->pwr->e.guid));
return 1;
}
}
if (sampleinfo->size > rst->gv->config.max_sample_size)
drop_oversize (rst, rmsg, &msg->x, sampleinfo);
else
@ -2391,7 +2423,7 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r
return 1;
}
static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup)
static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup, SubmessageKind_t prev_smid)
{
RSTTRACE ("DATAFRAG("PGUIDFMT" -> "PGUIDFMT" #%"PRId64"/[%u..%u]",
PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u,
@ -2404,6 +2436,15 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
return 1;
}
if (sampleinfo->pwr)
{
if (!validate_msg_decoding(&(sampleinfo->pwr->e), &(sampleinfo->pwr->c), sampleinfo->pwr->c.proxypp, rst, prev_smid))
{
RSTTRACE (" clear submsg from protected src "PGUIDFMT")", PGUID (sampleinfo->pwr->e.guid));
return 1;
}
}
if (sampleinfo->size > rst->gv->config.max_sample_size)
drop_oversize (rst, rmsg, &msg->x, sampleinfo);
else
@ -2606,7 +2647,8 @@ static int handle_submsg_sequence
unsigned char * const msg /* NOT const - we may byteswap it */,
const size_t len,
unsigned char * submsg /* aliases somewhere in msg */,
struct nn_rmsg * const rmsg
struct nn_rmsg * const rmsg,
bool rtps_encoded /* indicate if the message was rtps encoded */
)
{
const char *state;
@ -2618,6 +2660,7 @@ static int handle_submsg_sequence
size_t submsg_size = 0;
unsigned char * end = msg + len;
struct nn_dqueue *deferred_wakeup = NULL;
SubmessageKind_t prev_smid = SMID_PAD;
/* Receiver state is dynamically allocated with lifetime bound to
the message. Updates cause a new copy to be created if the
@ -2639,6 +2682,7 @@ static int handle_submsg_sequence
false at any time. That's ok: it's real purpose is to filter out
discovery data accidentally sent by Cloud */
rst->forme = 1;
rst->rtps_encoded = rtps_encoded;
rst->vendor = hdr->vendorid;
rst->protocol_version = hdr->version;
rst->srcloc = *srcloc;
@ -2700,14 +2744,14 @@ static int handle_submsg_sequence
state = "parse:acknack";
if (!valid_AckNack (rst, &sm->acknack, submsg_size, byteswap))
goto malformed;
handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID);
handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID, prev_smid);
ts_for_latmeas = 0;
break;
case SMID_HEARTBEAT:
state = "parse:heartbeat";
if (!valid_Heartbeat (&sm->heartbeat, submsg_size, byteswap))
goto malformed;
handle_Heartbeat (rst, tnowE, rmsg, &sm->heartbeat, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID);
handle_Heartbeat (rst, tnowE, rmsg, &sm->heartbeat, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID, prev_smid);
ts_for_latmeas = 0;
break;
case SMID_GAP:
@ -2719,7 +2763,7 @@ static int handle_submsg_sequence
rst after inserting the gap in the admin. */
if (!valid_Gap (&sm->gap, submsg_size, byteswap))
goto malformed;
handle_Gap (rst, tnowE, rmsg, &sm->gap);
handle_Gap (rst, tnowE, rmsg, &sm->gap, prev_smid);
ts_for_latmeas = 0;
break;
case SMID_INFO_TS:
@ -2763,27 +2807,37 @@ static int handle_submsg_sequence
state = "parse:nackfrag";
if (!valid_NackFrag (&sm->nackfrag, submsg_size, byteswap))
goto malformed;
handle_NackFrag (rst, tnowE, &sm->nackfrag);
handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid);
ts_for_latmeas = 0;
break;
case SMID_HEARTBEAT_FRAG:
state = "parse:heartbeatfrag";
if (!valid_HeartbeatFrag (&sm->heartbeatfrag, submsg_size, byteswap))
goto malformed;
handle_HeartbeatFrag (rst, tnowE, &sm->heartbeatfrag);
handle_HeartbeatFrag (rst, tnowE, &sm->heartbeatfrag, prev_smid);
ts_for_latmeas = 0;
break;
case SMID_DATA_FRAG:
state = "parse:datafrag";
{
struct nn_rsample_info sampleinfo;
uint32_t datasz = 0;
unsigned char *datap;
size_t submsg_len = submsg_size;
/* valid_DataFrag does not validate the payload */
if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap))
if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap, &datasz))
goto malformed;
/* This only decodes the payload when needed (possibly reducing the submsg size). */
if (!decode_DataFrag (rst->gv, &sampleinfo, datap, datasz, &submsg_len))
goto malformed;
/* Set the sample bswap according to the payload info (only first fragment has proper header). */
if (sm->datafrag.fragmentStartingNum == 1) {
if (!set_sampleinfo_bswap(&sampleinfo, (struct CDRHeader *)datap))
goto malformed;
}
sampleinfo.timestamp = timestamp;
sampleinfo.reception_timestamp = tnowWC;
handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap, &deferred_wakeup);
handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_len, &sampleinfo, datap, &deferred_wakeup, prev_smid);
rst_live = 1;
ts_for_latmeas = 0;
}
@ -2793,12 +2847,20 @@ static int handle_submsg_sequence
{
struct nn_rsample_info sampleinfo;
unsigned char *datap;
uint32_t datasz = 0;
size_t submsg_len = submsg_size;
/* valid_Data does not validate the payload */
if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap))
if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap, &datasz))
goto malformed;
/* This only decodes the payload when needed (possibly reducing the submsg size). */
if (!decode_Data (rst->gv, &sampleinfo, datap, datasz, &submsg_len))
goto malformed;
/* Set the sample bswap according to the payload info. */
if (!set_sampleinfo_bswap(&sampleinfo, (struct CDRHeader *)datap))
goto malformed;
sampleinfo.timestamp = timestamp;
sampleinfo.reception_timestamp = tnowWC;
handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap, &deferred_wakeup);
handle_Data (rst, tnowE, rmsg, &sm->data, submsg_len, &sampleinfo, datap, &deferred_wakeup, prev_smid);
rst_live = 1;
ts_for_latmeas = 0;
}
@ -2819,6 +2881,38 @@ static int handle_submsg_sequence
GVTRACE ("ENTITY_ID");
break;
}
case SMID_SEC_PREFIX:
state = "parse:sec_prefix";
{
GVTRACE ("SEC_PREFIX");
if (decode_SecPrefix(rst, submsg, submsg_size, end, &rst->src_guid_prefix, &rst->dst_guid_prefix, byteswap) < 0)
goto malformed;
}
break;
case SMID_SEC_BODY:
{
/* Ignore: because it should have been handled by SMID_SEC_PREFIX. */
GVTRACE ("SEC_BODY");
}
break;
case SMID_SEC_POSTFIX:
{
/* Ignore: because it should have been handled by SMID_SEC_PREFIX. */
GVTRACE ("SEC_POSTFIX");
}
break;
case SMID_SRTPS_PREFIX:
{
/* Ignore: it should already have been handled. */
GVTRACE ("SRTPS_PREFIX");
}
break;
case SMID_SRTPS_POSTFIX:
{
/* Ignore: it should already have been handled. */
GVTRACE ("SRTPS_POSTFIX");
}
break;
default:
state = "parse:undefined";
GVTRACE ("UNDEFINED(%x)", sm->smhdr.submessageId);
@ -2849,6 +2943,7 @@ static int handle_submsg_sequence
ts_for_latmeas = 0;
break;
}
prev_smid = state_smkind;
submsg += submsg_size;
GVTRACE ("\n");
}
@ -2985,8 +3080,36 @@ static bool do_packet (struct thread_state1 * const ts1, struct q_globals *gv, d
GVTRACE ("HDR(%"PRIx32":%"PRIx32":%"PRIx32" vendor %d.%d) len %lu from %s\n",
PGUIDPREFIX (hdr->guid_prefix), hdr->vendorid.id[0], hdr->vendorid.id[1], (unsigned long) sz, addrstr);
}
nn_rtps_msg_state_t res = decode_rtps_message(ts1,
gv,
&rmsg,
&hdr,
&buff,
&sz,
rbpool,
conn->m_stream);
handle_submsg_sequence (ts1, gv, conn, &srcloc, now (), now_et (), &hdr->guid_prefix, guidprefix, buff, (size_t) sz, buff + RTPS_MESSAGE_HEADER_SIZE, rmsg);
if (res != NN_RTPS_MSG_STATE_ERROR)
{
handle_submsg_sequence (ts1,
gv,
conn,
&srcloc,
now (),
now_et (),
&hdr->guid_prefix,
guidprefix,
buff,
(size_t) sz,
buff + RTPS_MESSAGE_HEADER_SIZE,
rmsg,
res == NN_RTPS_MSG_STATE_ENCODED);
}
else
{
/* drop message */
sz = 1;
}
}
}
nn_rmsg_commit (rmsg);

View file

@ -34,6 +34,7 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/ddsi_security_omg.h"
#include "dds/ddsi/sysdeps.h"
#include "dds__whc.h"
@ -140,7 +141,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
assert (wr->reliable);
assert (hbansreq >= 0);
if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
/* out of memory at worst slows down traffic */
return NULL;
@ -218,6 +219,13 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync);
}
/* It is possible that the encoding removed the submessage(s). */
if (nn_xmsg_size(msg) == 0)
{
nn_xmsg_free (msg);
msg = NULL;
}
writer_hbcontrol_note_hb (wr, tnow, hbansreq);
return msg;
}
@ -379,6 +387,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
hb->count = ++wr->hbcount;
nn_xmsg_submsg_setnext (msg, sm_marker);
encode_datawriter_submsg(msg, sm_marker, wr);
}
static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg)
@ -411,7 +420,7 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s
ASSERT_MUTEX_HELD (&wr->e.lock);
/* INFO_TS: 12 bytes, Data_t: 24 bytes, expected inline QoS: 32 => should be single chunk */
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL)
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL)
return DDS_RETCODE_OUT_OF_RESOURCES;
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
@ -448,9 +457,9 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s
#if TEST_KEYHASH
if (serdata->kind != SDK_KEY || !wr->include_keyhash)
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata));
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata), wr);
#else
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata));
nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata), wr);
#endif
nn_xmsg_submsg_setnext (*pmsg, sm_marker);
return 0;
@ -497,7 +506,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
fragging = (gv->config.fragment_size < size);
/* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
return DDS_RETCODE_OUT_OF_RESOURCES;
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
@ -618,7 +627,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
}
}
nn_xmsg_serdata (*pmsg, serdata, fragstart, fraglen);
nn_xmsg_serdata (*pmsg, serdata, fragstart, fraglen, wr);
nn_xmsg_submsg_setnext (*pmsg, sm_marker);
#if 0
GVTRACE ("queue data%s "PGUIDFMT" #%lld/%u[%u..%u)\n",
@ -626,6 +635,15 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
seq, fragnum+1, fragstart, fragstart + fraglen);
#endif
encode_datawriter_submsg(*pmsg, sm_marker, wr);
/* It is possible that the encoding removed the submessage.
* If there is no content, free the message. */
if (nn_xmsg_size(*pmsg) == 0) {
nn_xmsg_free (*pmsg);
*pmsg = NULL;
}
return ret;
}
@ -635,7 +653,7 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
struct nn_xmsg_marker sm_marker;
HeartbeatFrag_t *hbf;
ASSERT_MUTEX_HELD (&wr->e.lock);
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (HeartbeatFrag_t), NN_XMSG_KIND_CONTROL)) == NULL)
if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (HeartbeatFrag_t), NN_XMSG_KIND_CONTROL)) == NULL)
return; /* ignore out-of-memory: HeartbeatFrag is only advisory anyway */
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (*pmsg, wr->partition_id);
@ -664,6 +682,15 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
hbf->count = ++wr->hbfragcount;
nn_xmsg_submsg_setnext (*pmsg, sm_marker);
encode_datawriter_submsg(*pmsg, sm_marker, wr);
/* It is possible that the encoding removed the submessage.
* If there is no content, free the message. */
if (nn_xmsg_size(*pmsg) == 0)
{
nn_xmsg_free(*pmsg);
*pmsg = NULL;
}
}
#if 0

View file

@ -39,6 +39,7 @@
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_security_omg.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds__whc.h"
@ -182,7 +183,7 @@ static void trace_msg (struct xeventq *evq, const char *func, const struct nn_xm
{
if (dds_get_log_mask() & DDS_LC_TRACE)
{
nn_guid_t wrguid;
ddsi_guid_t wrguid;
seqno_t wrseq;
nn_fragment_number_t wrfragid;
nn_xmsg_guid_seq_fragid (m, &wrguid, &wrseq, &wrfragid);
@ -805,6 +806,9 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
base, an->readerSNState.numbits);
for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0');
/* Encode the sub-message when needed. */
encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid);
}
if (nackfrag_numbits > 0)
@ -835,12 +839,15 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0');
}
/* Encode the sub-message when needed. */
encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid);
}
ETRACE (pwr, "\n");
}
static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, nn_mtime_t tnow)
static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow)
{
/* FIXME: ought to keep track of which NACKs are being generated in
response to a Heartbeat. There is no point in having multiple
@ -872,9 +879,18 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent
if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc))
{
seqno_t nack_seq;
if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid.prefix, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL)
struct participant *pp = NULL;
if (q_omg_security_enabled())
{
struct reader *rd = ephash_lookup_reader_guid(pwr->e.gv->guid_hash, &ev->u.acknack.rd_guid);
if (rd)
pp = rd->c.pp;
}
if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL)
goto outofmem;
nn_xmsg_setdst1 (msg, &ev->u.acknack.pwr_guid.prefix, &loc);
nn_xmsg_setdst1 (gv, msg, &ev->u.acknack.pwr_guid.prefix, &loc);
if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v)
{
/* If HB->ACK latency measurement is enabled, and we have a
@ -886,7 +902,13 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent
rwn->hb_timestamp.v = 0;
}
add_AckNack (msg, pwr, rwn, &nack_seq);
if (nack_seq)
if (nn_xmsg_size(msg) == 0)
{
/* No AckNack added. */
nn_xmsg_free(msg);
msg = NULL;
}
else if (nack_seq)
{
rwn->t_last_nack = tnow;
rwn->seq_last_nack = nack_seq;
@ -948,7 +970,7 @@ static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID;
ps.participant_guid = *guid;
struct nn_xmsg *mpayload = nn_xmsg_new (gv->xmsgpool, &guid->prefix, 0, NN_XMSG_KIND_DATA);
struct nn_xmsg *mpayload = nn_xmsg_new (gv->xmsgpool, guid, wr->c.pp, 0, NN_XMSG_KIND_DATA);
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
@ -1387,7 +1409,7 @@ void qxev_msg (struct xeventq *evq, struct nn_xmsg *msg)
ddsrt_mutex_unlock (&evq->lock);
}
void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id)
void qxev_prd_entityid (struct proxy_reader *prd, const ddsi_guid_t *guid)
{
struct q_globals * const gv = prd->e.gv;
struct nn_xmsg *msg;
@ -1397,10 +1419,10 @@ void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id)
if (! gv->xevents->tev_conn->m_connless)
{
msg = nn_xmsg_new (gv->xmsgpool, id, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPRD (msg, prd) == 0)
{
GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (*id));
GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix));
nn_xmsg_add_entityid (msg);
ddsrt_mutex_lock (&gv->xevents->lock);
ev = qxev_common_nt (gv->xevents, XEVK_ENTITYID);
@ -1415,7 +1437,7 @@ void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id)
}
}
void qxev_pwr_entityid (struct proxy_writer *pwr, ddsi_guid_prefix_t *id)
void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid)
{
struct q_globals * const gv = pwr->e.gv;
struct nn_xmsg *msg;
@ -1425,10 +1447,10 @@ void qxev_pwr_entityid (struct proxy_writer *pwr, ddsi_guid_prefix_t *id)
if (! pwr->evq->tev_conn->m_connless)
{
msg = nn_xmsg_new (gv->xmsgpool, id, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPWR (msg, pwr) == 0)
{
GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (*id));
GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix));
nn_xmsg_add_entityid (msg);
ddsrt_mutex_lock (&pwr->evq->lock);
ev = qxev_common_nt (pwr->evq, XEVK_ENTITYID);

View file

@ -41,6 +41,7 @@
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/q_freelist.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_security_omg.h"
#define NN_XMSG_MAX_ALIGN 8
#define NN_XMSG_CHUNK_SIZE 128
@ -72,6 +73,11 @@ struct nn_xmsg {
int have_params;
struct ddsi_serdata *refd_payload;
ddsrt_iovec_t refd_payload_iov;
#ifdef DDSI_INCLUDE_SECURITY
/* Used as pointer to contain encoded payload to which iov can alias. */
unsigned char *refd_payload_encoded;
nn_msg_sec_info_t sec_info;
#endif
int64_t maxdelay;
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
uint32_t encoderid;
@ -223,6 +229,9 @@ struct nn_xpack
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
uint32_t encoderId;
#endif /* DDSI_INCLUDE_NETWORK_PARTITIONS */
#ifdef DDSI_INCLUDE_SECURITY
nn_msg_sec_info_t sec_info;
#endif
};
static size_t align4u (size_t x)
@ -283,6 +292,12 @@ static void nn_xmsg_reinit (struct nn_xmsg *m, enum nn_xmsg_kind kind)
m->dstmode = NN_XMSG_DST_UNSET;
m->kind = kind;
m->maxdelay = 0;
#ifdef DDSI_INCLUDE_SECURITY
m->refd_payload_encoded = NULL;
m->sec_info.use_rtps_encoding = 0;
m->sec_info.src_pp_handle = 0;
m->sec_info.dst_pp_handle = 0;
#endif
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
m->encoderid = 0;
#endif
@ -322,14 +337,29 @@ static struct nn_xmsg *nn_xmsg_allocnew (struct nn_xmsgpool *pool, size_t expect
return m;
}
struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_prefix_t *src_guid_prefix, size_t expected_size, enum nn_xmsg_kind kind)
struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_t *src_guid, struct participant *pp, size_t expected_size, enum nn_xmsg_kind kind)
{
struct nn_xmsg *m;
if ((m = nn_freelist_pop (&pool->freelist)) != NULL)
nn_xmsg_reinit (m, kind);
else if ((m = nn_xmsg_allocnew (pool, expected_size, kind)) == NULL)
return NULL;
m->data->src.guid_prefix = nn_hton_guid_prefix (*src_guid_prefix);
m->data->src.guid_prefix = nn_hton_guid_prefix (src_guid->prefix);
#ifdef DDSI_INCLUDE_SECURITY
m->sec_info.use_rtps_encoding = 0;
if (pp && q_omg_participant_is_secure(pp))
{
if (q_omg_security_is_local_rtps_protected(pp, src_guid->entityid))
{
m->sec_info.use_rtps_encoding = 1;
m->sec_info.src_pp_handle = q_omg_security_get_local_participant_handle(pp);
}
}
#else
DDSRT_UNUSED_ARG(pp);
#endif
return m;
}
@ -344,6 +374,9 @@ void nn_xmsg_free (struct nn_xmsg *m)
struct nn_xmsgpool *pool = m->pool;
if (m->refd_payload)
ddsi_serdata_to_ser_unref (m->refd_payload, &m->refd_payload_iov);
#ifdef DDSI_INCLUDE_SECURITY
ddsrt_free(m->refd_payload_encoded);
#endif
if (m->dstmode == NN_XMSG_DST_ALL)
{
unref_addrset (m->dstaddr.all.as);
@ -385,6 +418,13 @@ static int submsg_is_compatible (const struct nn_xmsg *msg, SubmessageKind_t smk
case SMID_DATA: case SMID_DATA_FRAG:
/* but data is strictly verboten */
return 0;
case SMID_SEC_BODY:
case SMID_SEC_PREFIX:
case SMID_SEC_POSTFIX:
case SMID_SRTPS_PREFIX:
case SMID_SRTPS_POSTFIX:
/* and the security sm are basically data. */
return 0;
}
assert (0);
break;
@ -406,6 +446,13 @@ static int submsg_is_compatible (const struct nn_xmsg *msg, SubmessageKind_t smk
won't work for initial transmits, but those currently
don't allow a readerId */
return msg->kindspecific.data.readerId_off == 0;
case SMID_SEC_BODY:
case SMID_SEC_PREFIX:
case SMID_SEC_POSTFIX:
case SMID_SRTPS_PREFIX:
case SMID_SRTPS_POSTFIX:
/* Just do the same as 'normal' data sm. */
return msg->kindspecific.data.readerId_off == 0;
case SMID_ACKNACK:
case SMID_HEARTBEAT:
case SMID_GAP:
@ -494,6 +541,80 @@ void nn_xmsg_submsg_setnext (struct nn_xmsg *msg, struct nn_xmsg_marker marker)
((unsigned)(msg->data->payload + msg->sz + plsize - (char *) hdr) - RTPS_SUBMESSAGE_HEADER_SIZE);
}
#ifdef DDSI_INCLUDE_SECURITY
size_t nn_xmsg_submsg_size (struct nn_xmsg *msg, struct nn_xmsg_marker marker)
{
SubmessageHeader_t *hdr = (SubmessageHeader_t*)nn_xmsg_submsg_from_marker(msg, marker);
return align4u(hdr->octetsToNextHeader + sizeof(SubmessageHeader_t));
}
void nn_xmsg_submsg_remove(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker)
{
/* Just reset the message size to the start of the current sub-message. */
msg->sz = sm_marker.offset;
}
void nn_xmsg_submsg_replace(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, unsigned char *new_submsg, size_t new_len)
{
/* Size of current sub-message. */
size_t old_len = msg->sz - sm_marker.offset;
/* Adjust the message size to the new sub-message. */
if (old_len < new_len)
{
nn_xmsg_append(msg, NULL, new_len - old_len);
}
else if (old_len > new_len)
{
nn_xmsg_shrink(msg, sm_marker, new_len);
}
/* Just a sanity check: assert(msg_end == submsg_end) */
assert((msg->data->payload + msg->sz) == (msg->data->payload + sm_marker.offset + new_len));
/* Replace the sub-message. */
memcpy(msg->data->payload + sm_marker.offset, new_submsg, new_len);
}
void nn_xmsg_submsg_append_refd_payload(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker)
{
DDSRT_UNUSED_ARG(sm_marker);
/*
* Normally, the refd payload pointer is moved around until it is added to
* the iov of the socket. This reduces the amount of allocations and copies.
*
* However, in a few cases (like security), the sub-message should be one
* complete blob.
* Appending the payload will just do that.
*/
if (msg->refd_payload)
{
void *dst;
/* Get payload information. */
char *payload_ptr = msg->refd_payload_iov.iov_base;
size_t payload_len = msg->refd_payload_iov.iov_len;
/* Make space for the payload (dst points to the start of the appended space). */
dst = nn_xmsg_append(msg, NULL, payload_len);
/* Copy the payload into the submessage. */
memcpy(dst, payload_ptr, payload_len);
/* No need to remember the payload now. */
ddsi_serdata_unref(msg->refd_payload);
msg->refd_payload = NULL;
if (msg->refd_payload_encoded)
{
ddsrt_free(msg->refd_payload_encoded);
msg->refd_payload_encoded = NULL;
}
}
}
#endif /* DDSI_INCLUDE_SECURITY */
void *nn_xmsg_submsg_from_marker (struct nn_xmsg *msg, struct nn_xmsg_marker marker)
{
return msg->data->payload + marker.offset;
@ -560,22 +681,66 @@ void nn_xmsg_add_entityid (struct nn_xmsg * m)
nn_xmsg_submsg_setnext (m, sm);
}
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len)
void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len, struct writer *wr)
{
if (serdata->kind != SDK_EMPTY)
{
size_t len4 = align4u (len);
assert (m->refd_payload == NULL);
m->refd_payload = ddsi_serdata_to_ser_ref (serdata, off, len4, &m->refd_payload_iov);
#ifdef DDSI_INCLUDE_SECURITY
assert (m->refd_payload_encoded == NULL);
/* When encoding is necessary, m->refd_payload_encoded will be allocated
* and m->refd_payload_iov contents will change to point to that buffer.
* If no encoding is necessary, nothing changes. */
if (!encode_payload(wr, &(m->refd_payload_iov), &(m->refd_payload_encoded)))
{
DDS_CWARNING (&wr->e.gv->logconfig, "nn_xmsg_serdata: failed to encrypt data for "PGUIDFMT"", PGUID (wr->e.guid));
ddsi_serdata_to_ser_unref (m->refd_payload, &m->refd_payload_iov);
assert (m->refd_payload_encoded == NULL);
m->refd_payload_iov.iov_base = NULL;
m->refd_payload_iov.iov_len = 0;
m->refd_payload = NULL;
}
#else
DDSRT_UNUSED_ARG(wr);
#endif
}
}
void nn_xmsg_setdst1 (struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *loc)
void nn_xmsg_setdst1 (struct q_globals *gv, struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *loc)
{
assert (m->dstmode == NN_XMSG_DST_UNSET);
m->dstmode = NN_XMSG_DST_ONE;
m->dstaddr.one.loc = *loc;
m->data->dst.guid_prefix = nn_hton_guid_prefix (*gp);
#ifdef DDSI_INCLUDE_SECURITY
if (m->sec_info.use_rtps_encoding && !m->sec_info.dst_pp_handle)
{
struct proxy_participant *proxypp;
ddsi_guid_t guid;
guid.prefix = *gp;
guid.entityid.u = NN_ENTITYID_PARTICIPANT;
proxypp = ephash_lookup_proxy_participant_guid(gv->guid_hash, &guid);
if (proxypp)
m->sec_info.dst_pp_handle = q_omg_security_get_remote_participant_handle(proxypp);
}
#else
DDSRT_UNUSED_ARG(gv);
#endif
}
bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp)
{
if (m->dstmode == NN_XMSG_DST_ONE)
{
*gp = nn_hton_guid_prefix(m->data->dst.guid_prefix);
return true;
}
return false;
}
dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd)
@ -583,7 +748,7 @@ dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *pr
nn_locator_t loc;
if (addrset_any_uc (prd->c.as, &loc) || addrset_any_mc (prd->c.as, &loc))
{
nn_xmsg_setdst1 (m, &prd->e.guid.prefix, &loc);
nn_xmsg_setdst1 (prd->e.gv, m, &prd->e.guid.prefix, &loc);
return 0;
}
else
@ -598,7 +763,7 @@ dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pw
nn_locator_t loc;
if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc))
{
nn_xmsg_setdst1 (m, &pwr->e.guid.prefix, &loc);
nn_xmsg_setdst1 (pwr->e.gv, m, &pwr->e.guid.prefix, &loc);
return 0;
}
DDS_CWARNING (&pwr->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT, PGUID (pwr->e.guid));
@ -953,6 +1118,9 @@ static void nn_xpack_reinit (struct nn_xpack *xp)
xp->msg_len.length = 0;
xp->included_msgs.latest = NULL;
xp->maxdelay = T_NEVER;
#ifdef DDSI_INCLUDE_SECURITY
xp->sec_info.use_rtps_encoding = 0;
#endif
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
xp->encoderId = 0;
#endif
@ -1012,6 +1180,34 @@ void nn_xpack_free (struct nn_xpack *xp)
ddsrt_free (xp);
}
static ssize_t nn_xpack_send_rtps(struct nn_xpack * xp, const nn_locator_t *loc)
{
ssize_t ret = -1;
#ifdef DDSI_INCLUDE_SECURITY
/* Only encode when needed. */
if (xp->sec_info.use_rtps_encoding)
{
ret = secure_conn_write(
xp->conn,
loc,
xp->niov,
xp->iov,
xp->call_flags,
&(xp->msg_len),
(xp->dstmode == NN_XMSG_DST_ONE),
&(xp->sec_info),
ddsi_conn_write);
}
else
#endif /* DDSI_INCLUDE_SECURITY */
{
ret = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags);
}
return ret;
}
static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg)
{
struct nn_xpack *xp = varg;
@ -1038,14 +1234,17 @@ static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg)
if (!gv->mute)
{
nbytes = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags);
nbytes = nn_xpack_send_rtps(xp, loc);
#ifndef NDEBUG
{
size_t i, len;
for (i = 0, len = 0; i < xp->niov; i++) {
len += xp->iov[i].iov_len;
}
assert (nbytes == -1 || (size_t) nbytes == len);
/* Possible number of bytes written can be larger
* due to security. */
assert (nbytes == -1 || (size_t) nbytes >= len);
}
#endif
}
@ -1338,6 +1537,12 @@ static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *
return 0;
#endif
#ifdef DDSI_INCLUDE_SECURITY
/* Don't mix up encoded and plain rtps messages */
if (xp->sec_info.use_rtps_encoding != m->sec_info.use_rtps_encoding)
return 0;
#endif
return addressing_info_eq_onesidederr (xp, m);
}
@ -1431,6 +1636,9 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag
niov++;
}
#ifdef DDSI_INCLUDE_SECURITY
xp->sec_info = m->sec_info;
#endif
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
xp->encoderId = m->encoderid;
#endif