diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h index 298370f..316954d 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_security_omg.h @@ -12,16 +12,40 @@ #ifndef DDSI_OMG_SECURITY_H #define DDSI_OMG_SECURITY_H -#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_plist.h" +#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_radmin.h" +#include "dds/ddsi/q_xmsg.h" #if defined (__cplusplus) extern "C" { #endif +typedef enum { + NN_RTPS_MSG_STATE_ERROR, + NN_RTPS_MSG_STATE_PLAIN, + NN_RTPS_MSG_STATE_ENCODED +} nn_rtps_msg_state_t; + #ifdef DDSI_INCLUDE_SECURITY +typedef struct nn_msg_sec_info { + int64_t src_pp_handle; + int64_t dst_pp_handle; + bool use_rtps_encoding; +} nn_msg_sec_info_t; + + +/** + * @brief Check if any participant has security enabled. + * + * @returns bool + * @retval true Some participant is secure + * @retval false No participant is not secure + */ +bool q_omg_security_enabled(void); + /** * @brief Check if security is enabled for the participant. * @@ -33,6 +57,26 @@ extern "C" { */ bool q_omg_participant_is_secure(const struct participant *pp); +/** + * @brief Get the security handle of the given local participant. + * + * @param[in] pp Participant to check if it is secure. + * + * @returns int64_t + * @retval Local participant security handle + */ +int64_t q_omg_security_get_local_participant_handle(struct participant *pp); + +/** + * @brief Get the security handle of the given remote participant. + * + * @param[in] proxypp Participant to check if it is secure. + * + * @returns int64_t + * @retval Remote participant security handle + */ +int64_t q_omg_security_get_remote_participant_handle(struct proxy_participant *proxypp); + /** * @brief Get security info flags of the given writer. * @@ -107,12 +151,355 @@ unsigned determine_publication_writer(const struct writer *wr); * @retval true The proxy participant may be deleted. * @retval false The proxy participant may not be deleted by this writer. */ -bool allow_proxy_participant_deletion(struct q_globals * const gv, const struct ddsi_guid *guid, const ddsi_entityid_t pwr_entityid); +bool +is_proxy_participant_deletion_allowed( + struct q_globals * const gv, + const struct ddsi_guid *guid, + const ddsi_entityid_t pwr_entityid); + +/** + * @brief Determine if the messages, related to the given remote + * entity, are RTPS protected or not. + * + * @param[in] proxy_pp Related proxy participant. + * @param[in] entityid ID of the entity to check. + * + * @returns bool + * @retval true The entity messages are RTPS protected. + * @retval false The entity messages are not RTPS protected. + */ +bool +q_omg_security_is_remote_rtps_protected( + struct proxy_participant *proxy_pp, + ddsi_entityid_t entityid); + +/** + * @brief Determine if the messages, related to the given local + * entity, are RTPS protected or not. + * + * @param[in] pp Related participant. + * @param[in] entityid ID of the entity to check. + * + * @returns bool + * @retval true The entity messages are RTPS protected. + * @retval false The entity messages are not RTPS protected. + */ +bool +q_omg_security_is_local_rtps_protected( + struct participant *pp, + ddsi_entityid_t entityid); + +/** + * @brief Set security information, depending on plist, into the given + * proxy participant. + * + * @param[in] proxypp Proxy participant to set security info on. + * @param[in] plist Paramater list, possibly contains security info. + */ +void +set_proxy_participant_security_info( + struct proxy_participant *proxypp, + const nn_plist_t *plist); + +/** + * @brief Set security information, depending on plist and proxy participant, + * into the given proxy reader. + * + * @param[in] prd Proxy reader to set security info on. + * @param[in] plist Paramater list, possibly contains security info. + */ +void +set_proxy_reader_security_info( + struct proxy_reader *prd, + const nn_plist_t *plist); + +/** + * @brief Set security information, depending on plist and proxy participant, + * into the given proxy writer. + * + * @param[in] pwr Proxy writer to set security info on. + * @param[in] plist Paramater list, possibly contains security info. + */ +void +set_proxy_writer_security_info( + struct proxy_writer *pwr, + const nn_plist_t *plist); + +/** + * @brief Encode RTPS message. + * + * @param[in] src_handle Security handle of data source. + * @param[in] src_guid GUID of the entity data source. + * @param[in] src_buf Original RTPS message. + * @param[in] src_len Original RTPS message size. + * @param[out] dst_buf Encoded RTPS message. + * @param[out] dst_len Encoded RTPS message size. + * @param[in] dst_handle Security handle of data destination. + * + * @returns bool + * @retval true Encoding succeeded. + * @retval false Encoding failed. + */ +bool +q_omg_security_encode_rtps_message( + int64_t src_handle, + ddsi_guid_t *src_guid, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len, + int64_t dst_handle); + +/** + * @brief Encode payload when necessary. + * + * When encoding is necessary, *buf will be allocated and the vec contents + * will change to point to that buffer. + * It is expected that the vec contents is always aliased. + * + * If no encoding is necessary, nothing changes. + * + * encoding( not needed) -> return( true), vec(untouched), buf(NULL) + * encoding(needed&success) -> return( true), vec( buf(new)) + * encoding(needed&failure) -> return(false), vec(untouched), buf(NULL) + * + * @param[in] wr Writer that writes the payload. + * @param[in,out] vec An iovec that contains the payload. + * @param[out] buf Buffer to contain the encoded payload. + * + * @returns bool + * @retval true Encoding succeeded or not necessary. Either way, vec + * contains the payload that should be send. + * @retval false Encoding was necessary, but failed. + */ +bool +encode_payload( + struct writer *wr, + ddsrt_iovec_t *vec, + unsigned char **buf); + +/** + * @brief Decode the payload of a Data submessage. + * + * When decoding is necessary, the payloadp memory will be replaced + * by the decoded payload. This means that the original submessage + * now contains payload that can be deserialized. + * + * If no decoding is necessary, nothing changes. + * + * @param[in] gv Global information. + * @param[in] sampleinfo Sample information. + * @param[in,out] payloadp Pointer to payload memory. + * @param[in] payloadsz Size of payload. + * @param[in,out] submsg_len Size of submessage. + * + * @returns bool + * @retval true Decoding succeeded or not necessary. Either way, payloadp + * contains the data that should be deserialized. + * @retval false Decoding was necessary, but failed. + */ +bool +decode_Data( + const struct q_globals *gv, + struct nn_rsample_info *sampleinfo, + unsigned char *payloadp, + uint32_t payloadsz, + size_t *submsg_len); + +/** + * @brief Decode the payload of a DataFrag submessage. + * + * When decoding is necessary, the payloadp memory will be replaced + * by the decoded payload. This means that the original submessage + * now contains payload that can be deserialized. + * + * If no decoding is necessary, nothing changes. + * + * @param[in] gv Global information. + * @param[in] sampleinfo Sample information. + * @param[in,out] payloadp Pointer to payload memory. + * @param[in] payloadsz Size of payload. + * @param[in,out] submsg_len Size of submessage. + * + * @returns bool + * @retval true Decoding succeeded or not necessary. Either way, payloadp + * contains the data that should be deserialized. + * @retval false Decoding was necessary, but failed. + */ +bool +decode_DataFrag( + const struct q_globals *gv, + struct nn_rsample_info *sampleinfo, + unsigned char *payloadp, + uint32_t payloadsz, + size_t *submsg_len); + +/** + * @brief Encode datareader submessage when necessary. + * + * When encoding is necessary, the original submessage will be replaced + * by a new encoded submessage. + * If the encoding fails, the original submessage will be removed. + * + * If no encoding is necessary, nothing changes. + * + * @param[in,out] msg Complete message. + * @param[in,out] sm_marker Submessage location within message. + * @param[in] pwr Writer for which the message is intended. + * @param[in] rd_guid Origin reader guid. + */ +void +encode_datareader_submsg( + struct nn_xmsg *msg, + struct nn_xmsg_marker sm_marker, + struct proxy_writer *pwr, + const struct ddsi_guid *rd_guid); + +/** + * @brief Encode datawriter submessage when necessary. + * + * When encoding is necessary, the original submessage will be replaced + * by a new encoded submessage. + * If the encoding fails, the original submessage will be removed. + * + * If no encoding is necessary, nothing changes. + * + * @param[in,out] msg Complete message. + * @param[in,out] sm_marker Submessage location within message. + * @param[in] wr Origin writer guid. + */ +void +encode_datawriter_submsg( + struct nn_xmsg *msg, + struct nn_xmsg_marker sm_marker, + struct writer *wr); + +/** + * @brief Check if given submessage is properly decoded. + * + * When decoding is necessary, it should be checked if a plain submessage was + * actually decoded. Otherwise data can be injected just by inserting a plain + * submessage directly. + * + * @param[in] e Entity information. + * @param[in] c Proxy endpoint information. + * @param[in] proxypp Related proxy participant. + * @param[in] rst Receiver information. + * @param[in] prev_smid Previously handled submessage ID. + * + * @returns bool + * @retval true Decoding succeeded or was not necessary. + * @retval false Decoding was necessary, but not detected. + */ +bool +validate_msg_decoding( + const struct entity_common *e, + const struct proxy_endpoint_common *c, + struct proxy_participant *proxypp, + struct receiver_state *rst, + SubmessageKind_t prev_smid); + +/** + * @brief Decode not only SecPrefix, but also the SecBody and SecPostfix + * sub-messages. + * + * When encrypted, the original SecBody will be replaced by the decrypted + * submessage. Then the normal sequence can continue as if there was no + * encrypted data. + * + * @param[in] rst Receiver information. + * @param[in,out] submsg Pointer to SecPrefix/(SecBody|Submsg)/SecPostfix. + * @param[in] submsg_size Size of SecPrefix submessage. + * @param[in] msg_end End of the complete message. + * @param[in] src_prefix Prefix of the source entity. + * @param[in] dst_prefix Prefix of the destination entity. + * @param[in] byteswap Do the bytes need swapping? + * + * @returns int + * @retval >= 0 Decoding succeeded. + * @retval < 0 Decoding failed. + */ +int +decode_SecPrefix( + struct receiver_state *rst, + unsigned char *submsg, + size_t submsg_size, + unsigned char * const msg_end, + const ddsi_guid_prefix_t * const src_prefix, + const ddsi_guid_prefix_t * const dst_prefix, + int byteswap); + +/** + * @brief Decode the RTPS message. + * + * When encrypted, the original buffers and information will be replaced + * by the decrypted RTPS message. Then the normal sequence can continue + * as if there was no encrypted data. + * + * @param[in] ts1 Thread information. + * @param[in] gv Global information. + * @param[in,out] rmsg Message information. + * @param[in,out] hdr Message header. + * @param[in,out] buff Message buffer. + * @param[in,out] sz Message size. + * @param[in] rbpool Buffers pool. + * @param[in] isstream Is message a stream variant? + * + * @returns nn_rtps_msg_state_t + * @retval NN_RTPS_MSG_STATE_PLAIN No decoding was necessary. + * @retval NN_RTPS_MSG_STATE_ENCODED Decoding succeeded. + * @retval NN_RTPS_MSG_STATE_ERROR Decoding failed. + */ +nn_rtps_msg_state_t +decode_rtps_message( + struct thread_state1 * const ts1, + struct q_globals *gv, + struct nn_rmsg **rmsg, + Header_t **hdr, + unsigned char **buff, + ssize_t *sz, + struct nn_rbufpool *rbpool, + bool isstream); + +/** + * @brief Send the RTPS message securely. + * + * @param[in] conn Connection to use. + * @param[in] dst Possible destination information. + * @param[in] niov Number of io vectors. + * @param[in] iov Array of io vectors. + * @param[in] flags Connection write flags. + * @param[in,out] msg_len Submessage containing length. + * @param[in] dst_one Is there only one specific destination? + * @param[in] sec_info Security information for handles. + * @param[in] conn_write_cb Function to call to do the actual writing. + * + * @returns ssize_t + * @retval negative/zero Something went wrong. + * @retval positive Secure writing succeeded. + */ +ssize_t +secure_conn_write( + ddsi_tran_conn_t conn, + const nn_locator_t *dst, + size_t niov, + const ddsrt_iovec_t *iov, + uint32_t flags, + MsgLen_t *msg_len, + bool dst_one, + nn_msg_sec_info_t *sec_info, + ddsi_tran_write_fn_t conn_write_cb); #else /* DDSI_INCLUDE_SECURITY */ #include "dds/ddsi/q_unused.h" +inline bool +q_omg_security_enabled(void) +{ + return false; +} + inline bool q_omg_participant_is_secure( UNUSED_ARG(const struct participant *pp)) @@ -135,7 +522,7 @@ determine_publication_writer( } inline bool -allow_proxy_participant_deletion( +is_proxy_participant_deletion_allowed( UNUSED_ARG(struct q_globals * const gv), UNUSED_ARG(const struct ddsi_guid *guid), UNUSED_ARG(const ddsi_entityid_t pwr_entityid)) @@ -143,6 +530,106 @@ allow_proxy_participant_deletion( return true; } +inline void +set_proxy_participant_security_info( + UNUSED_ARG(struct proxy_participant *prd), + UNUSED_ARG(const nn_plist_t *plist)) +{ +} + +inline void +set_proxy_reader_security_info( + UNUSED_ARG(struct proxy_reader *prd), + UNUSED_ARG(const nn_plist_t *plist)) +{ +} + +inline void +set_proxy_writer_security_info( + UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const nn_plist_t *plist)) +{ +} + + +inline bool +decode_Data( + UNUSED_ARG(const struct q_globals *gv), + UNUSED_ARG(struct nn_rsample_info *sampleinfo), + UNUSED_ARG(unsigned char *payloadp), + UNUSED_ARG(uint32_t payloadsz), + UNUSED_ARG(size_t *submsg_len)) +{ + return true; +} + +inline bool +decode_DataFrag( + UNUSED_ARG(const struct q_globals *gv), + UNUSED_ARG(struct nn_rsample_info *sampleinfo), + UNUSED_ARG(unsigned char *payloadp), + UNUSED_ARG(uint32_t payloadsz), + UNUSED_ARG(size_t *submsg_len)) +{ + return true; +} + +inline void +encode_datareader_submsg( + UNUSED_ARG(struct nn_xmsg *msg), + UNUSED_ARG(struct nn_xmsg_marker sm_marker), + UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct ddsi_guid *rd_guid)) +{ +} + +inline void +encode_datawriter_submsg( + UNUSED_ARG(struct nn_xmsg *msg), + UNUSED_ARG(struct nn_xmsg_marker sm_marker), + UNUSED_ARG(struct writer *wr)) +{ +} + +inline bool +validate_msg_decoding( + UNUSED_ARG(const struct entity_common *e), + UNUSED_ARG(const struct proxy_endpoint_common *c), + UNUSED_ARG(struct proxy_participant *proxypp), + UNUSED_ARG(struct receiver_state *rst), + UNUSED_ARG(SubmessageKind_t prev_smid)) +{ + return true; +} + +inline int +decode_SecPrefix( + UNUSED_ARG(struct receiver_state *rst), + UNUSED_ARG(unsigned char *submsg), + UNUSED_ARG(size_t submsg_size), + UNUSED_ARG(unsigned char * const msg_end), + UNUSED_ARG(const ddsi_guid_prefix_t * const src_prefix), + UNUSED_ARG(const ddsi_guid_prefix_t * const dst_prefix), + UNUSED_ARG(int byteswap)) +{ + /* Just let the parsing ignore the security sub-messages. */ + return true; +} + +inline nn_rtps_msg_state_t +decode_rtps_message( + UNUSED_ARG(struct thread_state1 * const ts1), + UNUSED_ARG(struct q_globals *gv), + UNUSED_ARG(struct nn_rmsg **rmsg), + UNUSED_ARG(Header_t **hdr), + UNUSED_ARG(unsigned char **buff), + UNUSED_ARG(ssize_t *sz), + UNUSED_ARG(struct nn_rbufpool *rbpool), + UNUSED_ARG(bool isstream)) +{ + return NN_RTPS_MSG_STATE_PLAIN; +} + #endif /* DDSI_INCLUDE_SECURITY */ #if defined (__cplusplus) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index a694140..d782994 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -16,6 +16,7 @@ #include "dds/ddsrt/avl.h" #include "dds/ddsrt/sync.h" #include "dds/ddsi/q_rtps.h" +#include "dds/ddsi/q_plist.h" #include "dds/ddsi/q_protocol.h" #include "dds/ddsi/q_lat_estim.h" #include "dds/ddsi/q_ephash.h" @@ -314,6 +315,9 @@ struct proxy_participant unsigned proxypp_have_spdp: 1; unsigned proxypp_have_cm: 1; unsigned owns_lease: 1; +#ifdef DDSI_INCLUDE_SECURITY + nn_security_info_t security_info; +#endif }; /* Representing proxy subscriber & publishers as "groups": until DDSI2 @@ -341,6 +345,9 @@ struct proxy_endpoint_common ddsi_guid_t group_guid; /* 0:0:0:0 if not available */ nn_vendorid_t vendor; /* cached from proxypp->vendor */ seqno_t seq; /* sequence number of most recent SEDP message */ +#ifdef DDSI_INCLUDE_SECURITY + nn_security_info_t security_info; +#endif }; struct proxy_writer { diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index 0fbbc64..3e58cb6 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -162,6 +162,12 @@ typedef enum SubmessageKind { SMID_HEARTBEAT_FRAG = 0x13, SMID_DATA = 0x15, SMID_DATA_FRAG = 0x16, + /* security-specific sub messages */ + SMID_SEC_BODY = 0x30, + SMID_SEC_PREFIX = 0x31, + SMID_SEC_POSTFIX = 0x32, + SMID_SRTPS_PREFIX = 0x33, + SMID_SRTPS_POSTFIX = 0x34, /* vendor-specific sub messages (0x80 .. 0xff) */ SMID_PT_INFO_CONTAINER = 0x80, SMID_PT_MSG_LEN = 0x81, diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index 61814bf..862d1ee 100644 --- a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -102,10 +102,11 @@ struct nn_rmsg { #define NN_RMSG_PAYLOADOFF(m, o) (NN_RMSG_PAYLOAD (m) + (o)) struct receiver_state { - ddsi_guid_prefix_t src_guid_prefix; /* 12 */ - ddsi_guid_prefix_t dst_guid_prefix; /* 12 */ + ddsi_guid_prefix_t src_guid_prefix; /* 12 */ + ddsi_guid_prefix_t dst_guid_prefix; /* 12 */ struct addrset *reply_locators; /* 4/8 */ - int forme; /* 4 */ + uint32_t forme:1; /* 4 */ + uint32_t rtps_encoded:1; /* - */ nn_vendorid_t vendor; /* 2 */ nn_protocol_version_t protocol_version; /* 2 => 44/48 */ ddsi_tran_conn_t conn; /* Connection for request */ diff --git a/src/core/ddsi/include/dds/ddsi/q_xevent.h b/src/core/ddsi/include/dds/ddsi/q_xevent.h index ab25b3e..c6b9794 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xevent.h +++ b/src/core/ddsi/include/dds/ddsi/q_xevent.h @@ -46,8 +46,8 @@ DDS_EXPORT dds_return_t xeventq_start (struct xeventq *evq, const char *name); / DDS_EXPORT void xeventq_stop (struct xeventq *evq); DDS_EXPORT void qxev_msg (struct xeventq *evq, struct nn_xmsg *msg); -DDS_EXPORT void qxev_pwr_entityid (struct proxy_writer * pwr, ddsi_guid_prefix_t * id); -DDS_EXPORT void qxev_prd_entityid (struct proxy_reader * prd, ddsi_guid_prefix_t * id); +DDS_EXPORT void qxev_pwr_entityid (struct proxy_writer * pwr, const ddsi_guid_t *guid); +DDS_EXPORT void qxev_prd_entityid (struct proxy_reader * prd, const ddsi_guid_t *guid); /* Returns 1 if queued, 0 otherwise (no point in returning the event, you can't do anything with it anyway) */ diff --git a/src/core/ddsi/include/dds/ddsi/q_xmsg.h b/src/core/ddsi/include/dds/ddsi/q_xmsg.h index 78463a7..24f32b0 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xmsg.h +++ b/src/core/ddsi/include/dds/ddsi/q_xmsg.h @@ -26,6 +26,8 @@ struct ddsi_serdata; struct addrset; struct proxy_reader; struct proxy_writer; +struct writer; +struct participant; struct nn_prismtech_participant_version_info; struct nn_xmsgpool; @@ -54,10 +56,11 @@ void nn_xmsgpool_free (struct nn_xmsgpool *pool); /* To allocate a new xmsg from the pool; if expected_size is NOT exceeded, no reallocs will be performed, else the address of the xmsg may change because of reallocing when appending to it. */ -struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_prefix_t *src_guid_prefix, size_t expected_size, enum nn_xmsg_kind kind); +struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_t *src_guid, struct participant *pp, size_t expected_size, enum nn_xmsg_kind kind); /* For sending to a particular destination (participant) */ -void nn_xmsg_setdst1 (struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *addr); +void nn_xmsg_setdst1 (struct q_globals *gv, struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *addr); +bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp); /* For sending to a particular proxy reader; this is a convenience routine that extracts a suitable address from the proxy reader's @@ -114,7 +117,13 @@ void nn_xmsg_guid_seq_fragid (const struct nn_xmsg *m, ddsi_guid_t *wrguid, seqn void *nn_xmsg_submsg_from_marker (struct nn_xmsg *msg, struct nn_xmsg_marker marker); void *nn_xmsg_append (struct nn_xmsg *m, struct nn_xmsg_marker *marker, size_t sz); void nn_xmsg_shrink (struct nn_xmsg *m, struct nn_xmsg_marker marker, size_t sz); -void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len); +void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len, struct writer *wr); +#ifdef DDSI_INCLUDE_SECURITY +size_t nn_xmsg_submsg_size (struct nn_xmsg *msg, struct nn_xmsg_marker marker); +void nn_xmsg_submsg_remove (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker); +void nn_xmsg_submsg_replace (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, unsigned char *new_submsg, size_t new_len); +void nn_xmsg_submsg_append_refd_payload (struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker); +#endif void nn_xmsg_submsg_setnext (struct nn_xmsg *msg, struct nn_xmsg_marker marker); void nn_xmsg_submsg_init (struct nn_xmsg *msg, struct nn_xmsg_marker marker, SubmessageKind_t smkind); void nn_xmsg_add_timestamp (struct nn_xmsg *m, nn_wctime_t t); diff --git a/src/core/ddsi/src/ddsi_security_omg.c b/src/core/ddsi/src/ddsi_security_omg.c index 8e04eb3..b471fb7 100644 --- a/src/core/ddsi/src/ddsi_security_omg.c +++ b/src/core/ddsi/src/ddsi_security_omg.c @@ -14,9 +14,73 @@ #include #include "dds/ddsrt/misc.h" +#include "dds/ddsrt/heap.h" +#include "dds/ddsrt/bswap.h" +#include "dds/ddsrt/string.h" +#include "dds/ddsrt/process.h" +#include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_unused.h" +#include "dds/ddsi/q_radmin.h" #include "dds/ddsi/ddsi_security_omg.h" +#include "dds/ddsi/ddsi_sertopic.h" + + + +static bool +q_omg_writer_is_payload_protected( + const struct writer *wr); + + + +static bool endpoint_is_DCPSParticipantSecure(const ddsi_guid_t *guid) +{ + return ((guid->entityid.u == NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) || + (guid->entityid.u == NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER) ); +} + +static bool endpoint_is_DCPSPublicationsSecure(const ddsi_guid_t *guid) +{ + return ((guid->entityid.u == NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER) || + (guid->entityid.u == NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER) ); +} + +static bool endpoint_is_DCPSSubscriptionsSecure(const ddsi_guid_t *guid) +{ + return ((guid->entityid.u == NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER) || + (guid->entityid.u == NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER) ); +} + +static bool endpoint_is_DCPSParticipantStatelessMessage(const ddsi_guid_t *guid) +{ + return ((guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER) || + (guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER) ); +} + +static bool endpoint_is_DCPSParticipantMessageSecure(const ddsi_guid_t *guid) +{ + return ((guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER) || + (guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER) ); +} + +static bool endpoint_is_DCPSParticipantVolatileMessageSecure(const ddsi_guid_t *guid) +{ +#if 1 + /* TODO: volatile endpoint. */ + DDSRT_UNUSED_ARG(guid); + return false; +#else + return ((guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) || + (guid->entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER) ); +#endif +} + + +bool +q_omg_security_enabled(void) +{ + return false; +} bool q_omg_participant_is_secure( @@ -54,9 +118,18 @@ q_omg_get_writer_security_info( assert(info); /* TODO: Register local writer. */ DDSRT_UNUSED_ARG(wr); + info->plugin_security_attributes = 0; - info->security_attributes = 0; - return false; + if (q_omg_writer_is_payload_protected(wr)) + { + info->security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID| + NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED; + } + else + { + info->security_attributes = 0; + } + return true; } bool @@ -75,11 +148,29 @@ q_omg_get_reader_security_info( static bool q_omg_proxyparticipant_is_authenticated( + const struct proxy_participant *proxy_pp) +{ + /* TODO: Handshake */ + DDSRT_UNUSED_ARG(proxy_pp); + return false; +} + +int64_t +q_omg_security_get_local_participant_handle( + struct participant *pp) +{ + /* TODO: Local registration */ + DDSRT_UNUSED_ARG(pp); + return 0; +} + +int64_t +q_omg_security_get_remote_participant_handle( struct proxy_participant *proxypp) { /* TODO: Handshake */ DDSRT_UNUSED_ARG(proxypp); - return false; + return 0; } unsigned @@ -105,7 +196,7 @@ determine_publication_writer( } bool -allow_proxy_participant_deletion( +is_proxy_participant_deletion_allowed( struct q_globals * const gv, const struct ddsi_guid *guid, const ddsi_entityid_t pwr_entityid) @@ -130,19 +221,1040 @@ allow_proxy_participant_deletion( return (!q_omg_proxyparticipant_is_authenticated(proxypp)); } +bool +q_omg_security_is_remote_rtps_protected( + struct proxy_participant *proxy_pp, + ddsi_entityid_t entityid) +{ + /* TODO: Handshake */ + DDSRT_UNUSED_ARG(proxy_pp); + DDSRT_UNUSED_ARG(entityid); + return false; +} + +bool +q_omg_security_is_local_rtps_protected( + struct participant *pp, + ddsi_entityid_t entityid) +{ + /* TODO: Handshake */ + DDSRT_UNUSED_ARG(pp); + DDSRT_UNUSED_ARG(entityid); + return false; +} + +void +set_proxy_participant_security_info( + struct proxy_participant *proxypp, + const nn_plist_t *plist) +{ + assert(proxypp); + assert(plist); + if (plist->present & PP_PARTICIPANT_SECURITY_INFO) { + proxypp->security_info.security_attributes = plist->participant_security_info.security_attributes; + proxypp->security_info.plugin_security_attributes = plist->participant_security_info.plugin_security_attributes; + } else { + proxypp->security_info.security_attributes = 0; + proxypp->security_info.plugin_security_attributes = 0; + } +} + +static void +q_omg_get_proxy_endpoint_security_info( + const struct entity_common *entity, + nn_security_info_t *proxypp_sec_info, + const nn_plist_t *plist, + nn_security_info_t *info) +{ + bool proxypp_info_available; + + proxypp_info_available = (proxypp_sec_info->security_attributes != 0) || + (proxypp_sec_info->plugin_security_attributes != 0); + + /* + * If Security info is present, use that. + * Otherwise, use the specified values for the secure builtin endpoints. + * (Table 20 – EndpointSecurityAttributes for all "Builtin Security Endpoints") + * Otherwise, reset. + */ + if (plist->present & PP_ENDPOINT_SECURITY_INFO) + { + info->security_attributes = plist->endpoint_security_info.security_attributes; + info->plugin_security_attributes = plist->endpoint_security_info.plugin_security_attributes; + } + else if (endpoint_is_DCPSParticipantSecure(&(entity->guid)) || + endpoint_is_DCPSPublicationsSecure(&(entity->guid)) || + endpoint_is_DCPSSubscriptionsSecure(&(entity->guid)) ) + { + info->plugin_security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + info->security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + if (proxypp_info_available) + { + if (proxypp_sec_info->security_attributes & NN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_PROTECTED) + { + info->security_attributes |= NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED; + } + if (proxypp_sec_info->plugin_security_attributes & NN_PLUGIN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_ENCRYPTED) + { + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + } + if (proxypp_sec_info->plugin_security_attributes & NN_PLUGIN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_AUTHENTICATED) + { + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; + } + } + else + { + /* No participant info: assume hardcoded OpenSplice V6.10.0 values. */ + info->security_attributes |= NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED; + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + } + } + else if (endpoint_is_DCPSParticipantMessageSecure(&(entity->guid))) + { + info->plugin_security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + info->security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + if (proxypp_info_available) + { + if (proxypp_sec_info->security_attributes & NN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_PROTECTED) + { + info->security_attributes |= NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED; + } + if (proxypp_sec_info->plugin_security_attributes & NN_PLUGIN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_ENCRYPTED) + { + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + } + if (proxypp_sec_info->plugin_security_attributes & NN_PLUGIN_PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_AUTHENTICATED) + { + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ORIGIN_AUTHENTICATED; + } + } + else + { + /* No participant info: assume hardcoded OpenSplice V6.10.0 values. */ + info->security_attributes |= NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED; + info->plugin_security_attributes |= NN_PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; + } + } + else if (endpoint_is_DCPSParticipantStatelessMessage(&(entity->guid))) + { + info->security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID; + info->plugin_security_attributes = 0; + } + else if (endpoint_is_DCPSParticipantVolatileMessageSecure(&(entity->guid))) + { + info->security_attributes = NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID | + NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED; + info->plugin_security_attributes = 0; + } + else + { + info->security_attributes = 0; + info->plugin_security_attributes = 0; + } +} + +void +set_proxy_reader_security_info( + struct proxy_reader *prd, + const nn_plist_t *plist) +{ + assert(prd); + q_omg_get_proxy_endpoint_security_info(&(prd->e), + &(prd->c.proxypp->security_info), + plist, + &(prd->c.security_info)); +} + +void +set_proxy_writer_security_info( + struct proxy_writer *pwr, + const nn_plist_t *plist) +{ + assert(pwr); + q_omg_get_proxy_endpoint_security_info(&(pwr->e), + &(pwr->c.proxypp->security_info), + plist, + &(pwr->c.security_info)); +} + + +static bool +q_omg_security_encode_datareader_submessage( + struct reader *rd, + const ddsi_guid_prefix_t *dst_prefix, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually encode (need key-exchange). */ + DDSRT_UNUSED_ARG(rd); + DDSRT_UNUSED_ARG(dst_prefix); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +static bool +q_omg_security_encode_datawriter_submessage( + struct writer *wr, + const ddsi_guid_prefix_t *dst_prefix, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually encode (need key-exchange). */ + DDSRT_UNUSED_ARG(wr); + DDSRT_UNUSED_ARG(dst_prefix); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +static bool +q_omg_security_decode_submessage( + const ddsi_guid_prefix_t* const src_prefix, + const ddsi_guid_prefix_t* const dst_prefix, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually decode (need key-exchange). */ + DDSRT_UNUSED_ARG(src_prefix); + DDSRT_UNUSED_ARG(dst_prefix); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +static bool +q_omg_security_encode_serialized_payload( + const struct writer *wr, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually encode (need key-exchange). */ + DDSRT_UNUSED_ARG(wr); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +static bool +q_omg_security_decode_serialized_payload( + struct proxy_writer *pwr, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually decode (need key-exchange). */ + DDSRT_UNUSED_ARG(pwr); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +bool +q_omg_security_encode_rtps_message( + int64_t src_handle, + ddsi_guid_t *src_guid, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len, + int64_t dst_handle) +{ + /* TODO: Use proper keys to actually encode (need key-exchange). */ + DDSRT_UNUSED_ARG(src_handle); + DDSRT_UNUSED_ARG(src_guid); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + DDSRT_UNUSED_ARG(dst_handle); + return false; +} + +static bool +q_omg_security_decode_rtps_message( + struct proxy_participant *proxypp, + const unsigned char *src_buf, + const unsigned int src_len, + unsigned char **dst_buf, + unsigned int *dst_len) +{ + /* TODO: Use proper keys to actually decode (need key-exchange). */ + DDSRT_UNUSED_ARG(proxypp); + DDSRT_UNUSED_ARG(src_buf); + DDSRT_UNUSED_ARG(src_len); + DDSRT_UNUSED_ARG(dst_buf); + DDSRT_UNUSED_ARG(dst_len); + return false; +} + +static bool +q_omg_writer_is_payload_protected( + const struct writer *wr) +{ + /* TODO: Local registration. */ + DDSRT_UNUSED_ARG(wr); + return false; +} + +static bool +q_omg_writer_is_submessage_protected( + struct writer *wr) +{ + /* TODO: Local registration. */ + DDSRT_UNUSED_ARG(wr); + return false; +} + +static bool +q_omg_reader_is_submessage_protected( + struct reader *rd) +{ + /* TODO: Local registration. */ + DDSRT_UNUSED_ARG(rd); + return false; +} + +bool +encode_payload( + struct writer *wr, + ddsrt_iovec_t *vec, + unsigned char **buf) +{ + bool ok = true; + *buf = NULL; + if (q_omg_writer_is_payload_protected(wr)) + { + /* Encrypt the data. */ + unsigned char *enc_buf; + unsigned int enc_len; + ok = q_omg_security_encode_serialized_payload( + wr, + vec->iov_base, + (unsigned int)vec->iov_len, + &enc_buf, + &enc_len); + if (ok) + { + /* Replace the iov buffer, which should always be aliased. */ + vec->iov_base = (char *)enc_buf; + vec->iov_len = enc_len; + /* Remember the pointer to be able to free the memory. */ + *buf = enc_buf; + } + } + return ok; +} + + +static bool +decode_payload( + const struct q_globals *gv, + struct nn_rsample_info *sampleinfo, + unsigned char *payloadp, + uint32_t *payloadsz, + size_t *submsg_len) +{ + bool ok = true; + + assert(payloadp); + assert(payloadsz); + assert(*payloadsz); + assert(submsg_len); + assert(sampleinfo); + + if (sampleinfo->pwr == NULL) + { + /* No specified proxy writer means no encoding. */ + return true; + } + + /* Only decode when the attributes tell us so. */ + if ((sampleinfo->pwr->c.security_info.security_attributes & NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED) + == NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED) + { + unsigned char *dst_buf = NULL; + unsigned int dst_len = 0; + + /* Decrypt the payload. */ + if (q_omg_security_decode_serialized_payload(sampleinfo->pwr, payloadp, *payloadsz, &dst_buf, &dst_len)) + { + /* Expect result to always fit into the original buffer. */ + assert(*payloadsz >= dst_len); + + /* Reduce submessage and payload lengths. */ + *submsg_len -= (*payloadsz - dst_len); + *payloadsz = dst_len; + + /* Replace the encrypted payload with the decrypted. */ + memcpy(payloadp, dst_buf, dst_len); + ddsrt_free(dst_buf); + } + else + { + GVWARNING("decode_payload: failed to decrypt data from "PGUIDFMT"", PGUID (sampleinfo->pwr->e.guid)); + ok = false; + } + } + + return ok; +} + +bool +decode_Data( + const struct q_globals *gv, + struct nn_rsample_info *sampleinfo, + unsigned char *payloadp, + uint32_t payloadsz, + size_t *submsg_len) +{ + int ok = true; + /* Only decode when there's actual data. */ + if (payloadp && (payloadsz > 0)) + { + ok = decode_payload(gv, sampleinfo, payloadp, &payloadsz, submsg_len); + if (ok) + { + /* It's possible that the payload size (and thus the sample size) has been reduced. */ + sampleinfo->size = payloadsz; + } + } + return ok; +} + +bool +decode_DataFrag( + const struct q_globals *gv, + struct nn_rsample_info *sampleinfo, + unsigned char *payloadp, + uint32_t payloadsz, + size_t *submsg_len) +{ + int ok = true; + /* Only decode when there's actual data. */ + if (payloadp && (payloadsz > 0)) + { + ok = decode_payload(gv, sampleinfo, payloadp, &payloadsz, submsg_len); + /* Do not touch the sampleinfo->size in contradiction to decode_Data() (it has been calculated differently). */ + } + return ok; +} + + +void +encode_datareader_submsg( + struct nn_xmsg *msg, + struct nn_xmsg_marker sm_marker, + struct proxy_writer *pwr, + const struct ddsi_guid *rd_guid) +{ + /* Only encode when needed. */ + if (q_omg_security_enabled()) + { + struct reader *rd = ephash_lookup_reader_guid(pwr->e.gv->guid_hash, rd_guid); + if (rd) + { + if (q_omg_reader_is_submessage_protected(rd)) + { + unsigned char *src_buf; + unsigned int src_len; + unsigned char *dst_buf; + unsigned int dst_len; + + /* Make one blob of the current sub-message by appending the serialized payload. */ + nn_xmsg_submsg_append_refd_payload(msg, sm_marker); + + /* Get the sub-message buffer. */ + src_buf = (unsigned char*)nn_xmsg_submsg_from_marker(msg, sm_marker); + src_len = (unsigned int)nn_xmsg_submsg_size(msg, sm_marker); + + /* Do the actual encryption. */ + if (q_omg_security_encode_datareader_submessage(rd, &(pwr->e.guid.prefix), src_buf, src_len, &dst_buf, &dst_len)) + { + /* Replace the old sub-message with the new encoded one(s). */ + nn_xmsg_submsg_replace(msg, sm_marker, dst_buf, dst_len); + ddsrt_free(dst_buf); + } + else + { + /* The sub-message should have been encoded, which failed. + * Remove it to prevent it from being send. */ + nn_xmsg_submsg_remove(msg, sm_marker); + } + } + } + } +} + + +void +encode_datawriter_submsg( + struct nn_xmsg *msg, + struct nn_xmsg_marker sm_marker, + struct writer *wr) +{ + /* Only encode when needed. */ + if (q_omg_security_enabled()) + { + if (q_omg_writer_is_submessage_protected(wr)) + { + unsigned char *src_buf; + unsigned int src_len; + unsigned char *dst_buf; + unsigned int dst_len; + ddsi_guid_prefix_t dst_guid_prefix; + ddsi_guid_prefix_t *dst = NULL; + + /* Make one blob of the current sub-message by appending the serialized payload. */ + nn_xmsg_submsg_append_refd_payload(msg, sm_marker); + + /* Get the sub-message buffer. */ + src_buf = (unsigned char*)nn_xmsg_submsg_from_marker(msg, sm_marker); + src_len = (unsigned int)nn_xmsg_submsg_size(msg, sm_marker); + + if (nn_xmsg_getdst1prefix(msg, &dst_guid_prefix)) + { + dst = &dst_guid_prefix; + } + + /* Do the actual encryption. */ + if (q_omg_security_encode_datawriter_submessage(wr, dst, src_buf, src_len, &dst_buf, &dst_len)) + { + /* Replace the old sub-message with the new encoded one(s). */ + nn_xmsg_submsg_replace(msg, sm_marker, dst_buf, dst_len); + ddsrt_free(dst_buf); + } + else + { + /* The sub-message should have been encoded, which failed. + * Remove it to prevent it from being send. */ + nn_xmsg_submsg_remove(msg, sm_marker); + } + } + } +} + + + +bool +validate_msg_decoding( + const struct entity_common *e, + const struct proxy_endpoint_common *c, + struct proxy_participant *proxypp, + struct receiver_state *rst, + SubmessageKind_t prev_smid) +{ + assert(e); + assert(c); + assert(proxypp); + assert(rst); + + /* If this endpoint is expected to have submessages protected, it means that the + * previous submessage id (prev_smid) has to be SMID_SEC_PREFIX. That caused the + * protected submessage to be copied into the current RTPS message as a clear + * submessage, which we are currently handling. + * However, we have to check if the prev_smid is actually SMID_SEC_PREFIX, otherwise + * a rascal can inject data as just a clear submessage. */ + if ((c->security_info.security_attributes & NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED) + == NN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED) + { + if (prev_smid != SMID_SEC_PREFIX) + { + return false; + } + } + + /* At this point, we should also check if the complete RTPS message was encoded when + * that is expected. */ + if (q_omg_security_is_remote_rtps_protected(proxypp, e->guid.entityid) && !rst->rtps_encoded) + { + return 0; + } + + return true; +} + +static int +validate_submsg(struct q_globals *gv, unsigned char smid, unsigned char *submsg, unsigned char * const end, int byteswap) +{ + int result = -1; + if ((submsg + RTPS_SUBMESSAGE_HEADER_SIZE) <= end) + { + SubmessageHeader_t *hdr = (SubmessageHeader_t*)submsg; + if ((smid == 0 /* don't care */) || (hdr->submessageId == smid)) + { + unsigned short size = hdr->octetsToNextHeader; + if (byteswap) + { + size = ddsrt_bswap2u(size); + } + result = (int)size + (int)RTPS_SUBMESSAGE_HEADER_SIZE; + if ((submsg + result) > end) + { + result = -1; + } + } + else + { + GVWARNING("Unexpected submsg 0x%02x (0x%02x expected)", hdr->submessageId, smid); + } + } + else + { + GVWARNING("Submsg 0x%02x does not fit message", smid); + } + return result; +} + + +static int +padding_submsg(struct q_globals *gv, unsigned char *start, unsigned char *end, int byteswap) +{ + SubmessageHeader_t *padding = (SubmessageHeader_t*)start; + size_t size = (size_t)(end - start); + int result = -1; + + assert(start <= end); + + if (size > sizeof(SubmessageHeader_t)) + { + result = (int)size; + padding->submessageId = SMID_PAD; + padding->flags = (byteswap ? !(DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) : (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN)); + padding->octetsToNextHeader = (unsigned short)(size - sizeof(SubmessageHeader_t)); + if (byteswap) + { + padding->octetsToNextHeader = ddsrt_bswap2u(padding->octetsToNextHeader); + } + } + else + { + GVWARNING("Padding submessage doesn't fit"); + } + return result; +} + +int +decode_SecPrefix( + struct receiver_state *rst, + unsigned char *submsg, + size_t submsg_size, + unsigned char * const msg_end, + const ddsi_guid_prefix_t * const src_prefix, + const ddsi_guid_prefix_t * const dst_prefix, + int byteswap) +{ + int result = -1; + int totalsize = (int)submsg_size; + unsigned char *body_submsg; + unsigned char *prefix_submsg; + unsigned char *postfix_submsg; + SubmessageHeader_t *hdr = (SubmessageHeader_t*)submsg; + uint8_t flags = hdr->flags; + + if (byteswap) + { + if ((DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN)) + hdr->flags |= 0x01; + else + hdr->flags &= 0xFE; + } + + /* First sub-message is the SEC_PREFIX. */ + prefix_submsg = submsg; + + /* Next sub-message is SEC_BODY when encrypted or the original submessage when only signed. */ + body_submsg = submsg + submsg_size; + result = validate_submsg(rst->gv, 0 /* don't care smid */, body_submsg, msg_end, byteswap); + if (result > 0) + { + totalsize += result; + + /* Third sub-message should be the SEC_POSTFIX. */ + postfix_submsg = submsg + totalsize; + result = validate_submsg(rst->gv, SMID_SEC_POSTFIX, postfix_submsg, msg_end, byteswap); + if (result > 0) + { + bool decoded; + unsigned char *dst_buf; + unsigned int dst_len; + + totalsize += result; + + /* Decode all three submessages. */ + decoded = q_omg_security_decode_submessage(src_prefix, dst_prefix, submsg, (unsigned int)totalsize, &dst_buf, &dst_len); + if (decoded && dst_buf) + { + /* + * The 'normal' submessage sequence handling will continue after the + * given security SEC_PREFIX. + */ + if (*body_submsg == SMID_SEC_BODY) + { + /* + * Copy the decoded buffer into the original message, replacing (part + * of) SEC_BODY. + * + * By replacing the SEC_BODY with the decoded submessage, everything + * can continue as if there was never an encoded submessage. + */ + assert((int)dst_len <= ((int)totalsize - (int)submsg_size)); + memcpy(body_submsg, dst_buf, dst_len); + + /* Remainder of SEC_BODY & SEC_POSTFIX should be padded to keep the submsg sequence going. */ + result = padding_submsg(rst->gv, body_submsg + dst_len, prefix_submsg + totalsize, byteswap); + } + else + { + /* + * When only signed, then the submessage is already available and + * SMID_SEC_POSTFIX will be ignored. + * So, we don't really have to do anything. + */ + } + ddsrt_free(dst_buf); + } + else + { + /* + * Decoding or signing failed. + * + * Replace the security submessages with padding. This also removes a plain + * submessage when a signature check failed. + */ + result = padding_submsg(rst->gv, body_submsg, prefix_submsg + totalsize, byteswap); + } + } + } + /* Restore flags. */ + hdr->flags = flags; + return result; +} + +static nn_rtps_msg_state_t +check_rtps_message_is_secure( + struct q_globals *gv, + Header_t *hdr, + unsigned char *buff, + bool isstream, + struct proxy_participant **proxypp) +{ + nn_rtps_msg_state_t ret = NN_RTPS_MSG_STATE_ERROR; + + SubmessageHeader_t *submsg; + uint32_t offset = RTPS_MESSAGE_HEADER_SIZE + (isstream ? sizeof(MsgLen_t) : 0); + + submsg = (SubmessageHeader_t *)(buff + offset); + if (submsg->submessageId == SMID_SRTPS_PREFIX) + { + ddsi_guid_t guid; + + guid.prefix = hdr->guid_prefix; + guid.entityid.u = NN_ENTITYID_PARTICIPANT; + + GVTRACE(" from "PGUIDFMT, PGUID(guid)); + + *proxypp = ephash_lookup_proxy_participant_guid(gv->guid_hash, &guid); + if (*proxypp) + { + if (q_omg_proxyparticipant_is_authenticated(*proxypp)) + { + ret = NN_RTPS_MSG_STATE_ENCODED; + } + else + { + GVTRACE ("received encoded rtps message from unauthenticated participant"); + } + } + else + { + GVTRACE ("received encoded rtps message from unknown participant"); + } + GVTRACE("\n"); + } + else + { + ret = NN_RTPS_MSG_STATE_PLAIN; + } + + return ret; +} + +nn_rtps_msg_state_t +decode_rtps_message( + struct thread_state1 * const ts1, + struct q_globals *gv, + struct nn_rmsg **rmsg, + Header_t **hdr, + unsigned char **buff, + ssize_t *sz, + struct nn_rbufpool *rbpool, + bool isstream) +{ + nn_rtps_msg_state_t ret = NN_RTPS_MSG_STATE_ERROR; + struct proxy_participant *proxypp = NULL; + unsigned char *dstbuf; + unsigned char *srcbuf; + uint32_t srclen, dstlen; + bool decoded; + + /* Currently the decode_rtps_message returns a new allocated buffer. + * This could be optimized by providing a pre-allocated nn_rmsg buffer to + * copy the decoded rtps message in. + */ + thread_state_awake_fixed_domain (ts1); + ret = check_rtps_message_is_secure(gv, *hdr, *buff, isstream, &proxypp); + if (ret == NN_RTPS_MSG_STATE_ENCODED) + { + if (isstream) + { + /* Remove MsgLen Submessage which was only needed for a stream to determine the end of the message */ + srcbuf = *buff + sizeof(MsgLen_t); + srclen = (uint32_t)((size_t)(*sz) - sizeof(MsgLen_t)); + memmove(srcbuf, *buff, RTPS_MESSAGE_HEADER_SIZE); + } + else + { + srcbuf = *buff; + srclen = (uint32_t)*sz; + } + + decoded = q_omg_security_decode_rtps_message(proxypp, srcbuf, srclen, &dstbuf, &dstlen); + if (decoded) + { + nn_rmsg_commit (*rmsg); + *rmsg = nn_rmsg_new (rbpool); + + *buff = (unsigned char *) NN_RMSG_PAYLOAD (*rmsg); + + memcpy(*buff, dstbuf, dstlen); + nn_rmsg_setsize (*rmsg, dstlen); + + ddsrt_free(dstbuf); + + *hdr = (Header_t*) *buff; + (*hdr)->guid_prefix = nn_ntoh_guid_prefix ((*hdr)->guid_prefix); + *sz = (ssize_t)dstlen; + } else { + ret = NN_RTPS_MSG_STATE_ERROR; + } + } + thread_state_asleep (ts1); + return ret; +} + +ssize_t +secure_conn_write( + ddsi_tran_conn_t conn, + const nn_locator_t *dst, + size_t niov, + const ddsrt_iovec_t *iov, + uint32_t flags, + MsgLen_t *msg_len, + bool dst_one, + nn_msg_sec_info_t *sec_info, + ddsi_tran_write_fn_t conn_write_cb) +{ + ssize_t ret = -1; + + unsigned i; + Header_t *hdr; + ddsi_guid_t guid; + unsigned char stbuf[2048]; + unsigned char *srcbuf; + unsigned char *dstbuf = NULL; + uint32_t srclen, dstlen; + int64_t dst_handle = 0; + + assert(iov); + assert(conn); + assert(msg_len); + assert(sec_info); + assert(niov > 0); + assert(conn_write_cb); + + if (dst_one) + { + dst_handle = sec_info->dst_pp_handle; + if (dst_handle == 0) { + return -1; + } + } + + hdr = (Header_t *)iov[0].iov_base; + guid.prefix = nn_ntoh_guid_prefix(hdr->guid_prefix); + guid.entityid.u = NN_ENTITYID_PARTICIPANT; + + /* first determine the size of the message, then select the + * on-stack buffer or allocate one on the heap ... + */ + srclen = 0; + for (i = 0; i < (unsigned)niov; i++) + { + /* Do not copy MsgLen submessage in case of a stream connection */ + if ((i != 1) || !conn->m_stream) + srclen += (uint32_t) iov[i].iov_len; + } + if (srclen <= sizeof (stbuf)) + { + srcbuf = stbuf; + } + else + { + srcbuf = ddsrt_malloc (srclen); + } + + /* ... then copy data into buffer */ + srclen = 0; + for (i = 0; i < (unsigned)niov; i++) + { + if ((i != 1) || !conn->m_stream) + { + memcpy(srcbuf + srclen, iov[i].iov_base, iov[i].iov_len); + srclen += (uint32_t) iov[i].iov_len; + } + } + + if (q_omg_security_encode_rtps_message(sec_info->src_pp_handle, &guid, srcbuf, srclen, &dstbuf, &dstlen, dst_handle)) + { + ddsrt_iovec_t tmp_iov[3]; + size_t tmp_niov; + + if (conn->m_stream) + { + /* Add MsgLen submessage after Header */ + msg_len->length = dstlen + (uint32_t)sizeof(*msg_len); + + tmp_iov[0].iov_base = dstbuf; + tmp_iov[0].iov_len = RTPS_MESSAGE_HEADER_SIZE; + tmp_iov[1].iov_base = (void*) msg_len; + tmp_iov[1].iov_len = sizeof (*msg_len); + tmp_iov[2].iov_base = dstbuf + RTPS_MESSAGE_HEADER_SIZE; + tmp_iov[2].iov_len = dstlen - RTPS_MESSAGE_HEADER_SIZE; + tmp_niov = 3; + } + else + { + msg_len->length = dstlen; + + tmp_iov[0].iov_base = dstbuf; + tmp_iov[0].iov_len = dstlen; + tmp_niov = 1; + } + ret = conn_write_cb (conn, dst, tmp_niov, tmp_iov, flags); + } + + if (srcbuf != stbuf) + { + ddsrt_free (srcbuf); + } + + ddsrt_free(dstbuf); + + return ret; +} #else /* DDSI_INCLUDE_SECURITY */ #include "dds/ddsi/ddsi_security_omg.h" -extern inline bool q_omg_participant_is_secure(UNUSED_ARG(const struct participant *pp)); +extern inline bool q_omg_security_enabled(void); -extern inline unsigned determine_subscription_writer(UNUSED_ARG(const struct reader *rd)); -extern inline unsigned determine_publication_writer(UNUSED_ARG(const struct writer *wr)); +extern inline bool q_omg_participant_is_secure( + UNUSED_ARG(const struct participant *pp)); -extern inline bool allow_proxy_participant_deletion( +extern inline unsigned determine_subscription_writer( + UNUSED_ARG(const struct reader *rd)); + +extern inline unsigned determine_publication_writer( + UNUSED_ARG(const struct writer *wr)); + +extern inline bool is_proxy_participant_deletion_allowed( UNUSED_ARG(struct q_globals * const gv), UNUSED_ARG(const struct ddsi_guid *guid), UNUSED_ARG(const ddsi_entityid_t pwr_entityid)); +extern inline void set_proxy_participant_security_info( + UNUSED_ARG(struct proxy_participant *prd), + UNUSED_ARG(const nn_plist_t *plist)); + +extern inline void set_proxy_reader_security_info( + UNUSED_ARG(struct proxy_reader *prd), + UNUSED_ARG(const nn_plist_t *plist)); + +extern inline void set_proxy_writer_security_info( + UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const nn_plist_t *plist)); + +extern inline bool decode_Data( + UNUSED_ARG(const struct q_globals *gv), + UNUSED_ARG(struct nn_rsample_info *sampleinfo), + UNUSED_ARG(unsigned char *payloadp), + UNUSED_ARG(uint32_t payloadsz), + UNUSED_ARG(size_t *submsg_len)); + +extern inline bool decode_DataFrag( + UNUSED_ARG(const struct q_globals *gv), + UNUSED_ARG(struct nn_rsample_info *sampleinfo), + UNUSED_ARG(unsigned char *payloadp), + UNUSED_ARG(uint32_t payloadsz), + UNUSED_ARG(size_t *submsg_len)); + +extern inline void encode_datareader_submsg( + UNUSED_ARG(struct nn_xmsg *msg), + UNUSED_ARG(struct nn_xmsg_marker sm_marker), + UNUSED_ARG(struct proxy_writer *pwr), + UNUSED_ARG(const struct ddsi_guid *rd_guid)); + +extern inline void encode_datawriter_submsg( + UNUSED_ARG(struct nn_xmsg *msg), + UNUSED_ARG(struct nn_xmsg_marker sm_marker), + UNUSED_ARG(struct writer *wr)); + +extern inline bool validate_msg_decoding( + UNUSED_ARG(const struct entity_common *e), + UNUSED_ARG(const struct proxy_endpoint_common *c), + UNUSED_ARG(struct proxy_participant *proxypp), + UNUSED_ARG(struct receiver_state *rst), + UNUSED_ARG(SubmessageKind_t prev_smid)); + +extern inline int decode_SecPrefix( + UNUSED_ARG(struct receiver_state *rst), + UNUSED_ARG(unsigned char *submsg), + UNUSED_ARG(size_t submsg_size), + UNUSED_ARG(unsigned char * const msg_end), + UNUSED_ARG(const ddsi_guid_prefix_t * const src_prefix), + UNUSED_ARG(const ddsi_guid_prefix_t * const dst_prefix), + UNUSED_ARG(int byteswap)); + +extern inline nn_rtps_msg_state_t decode_rtps_message( + UNUSED_ARG(struct thread_state1 * const ts1), + UNUSED_ARG(struct q_globals *gv), + UNUSED_ARG(struct nn_rmsg **rmsg), + UNUSED_ARG(Header_t **hdr), + UNUSED_ARG(unsigned char **buff), + UNUSED_ARG(ssize_t *sz), + UNUSED_ARG(struct nn_rbufpool *rbpool), + UNUSED_ARG(bool isstream)); + #endif /* DDSI_INCLUDE_SECURITY */ diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index d6f2137..11022af 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -213,7 +213,7 @@ int spdp_write (struct participant *pp) terribly important, the msg will grow as needed, address space is essentially meaningless because we only use the message to construct the payload. */ - mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid.prefix, 0, NN_XMSG_KIND_DATA); + mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid, NULL, 0, NN_XMSG_KIND_DATA); nn_plist_init_empty (&ps); ps.present |= PP_PARTICIPANT_GUID | PP_BUILTIN_ENDPOINT_SET | @@ -343,7 +343,7 @@ static int spdp_dispose_unregister_with_wr (struct participant *pp, unsigned ent return 0; } - mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid.prefix, 0, NN_XMSG_KIND_DATA); + mpayload = nn_xmsg_new (pp->e.gv->xmsgpool, &pp->e.guid, NULL, 0, NN_XMSG_KIND_DATA); nn_plist_init_empty (&ps); ps.present |= PP_PARTICIPANT_GUID; ps.participant_guid = pp->e.guid; @@ -437,7 +437,7 @@ static int handle_SPDP_dead (const struct receiver_state *rst, ddsi_entityid_t p guid = datap->participant_guid; GVLOGDISC (" %"PRIx32":%"PRIx32":%"PRIx32":%"PRIx32, PGUID (guid)); assert (guid.entityid.u == NN_ENTITYID_PARTICIPANT); - if (allow_proxy_participant_deletion(gv, &guid, pwr_entityid)) + if (is_proxy_participant_deletion_allowed(gv, &guid, pwr_entityid)) { if (delete_proxy_participant_by_guid (gv, &guid, timestamp, 0) < 0) { @@ -999,7 +999,7 @@ static int sedp_write_endpoint the QoS and other settings. So the header fields aren't really important, except that they need to be set to reasonable things or it'll crash */ - mpayload = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA); + mpayload = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA); nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0); if (xqos) nn_xqos_addtomsg (mpayload, xqos, qosdiff); nn_xmsg_addpar_sentinel (mpayload); @@ -1467,7 +1467,7 @@ int sedp_write_topic (struct participant *pp, const struct nn_plist *datap) sedp_wr = get_sedp_writer (pp, NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER); - mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA); + mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA); delta = nn_xqos_delta (&datap->qos, &sedp_wr->e.gv->default_xqos_tp, ~(uint64_t)0); if (sedp_wr->e.gv->config.explicitly_publish_qos_set_to_default) delta |= ~QP_UNRECOGNIZED_INCOMPATIBLE_MASK; @@ -1505,7 +1505,7 @@ int sedp_write_cm_participant (struct participant *pp, int alive) the QoS and other settings. So the header fields aren't really important, except that they need to be set to reasonable things or it'll crash */ - mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid.prefix, 0, NN_XMSG_KIND_DATA); + mpayload = nn_xmsg_new (sedp_wr->e.gv->xmsgpool, &sedp_wr->e.guid, NULL, 0, NN_XMSG_KIND_DATA); nn_plist_init_empty (&ps); ps.present = PP_PARTICIPANT_GUID; ps.participant_guid = pp->e.guid; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index e4c020d..89de419 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2119,7 +2119,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader ddsrt_avl_insert_ipath (&pwr_readers_treedef, &pwr->readers, m, &path); local_reader_ary_insert(&pwr->rdary, rd); ddsrt_mutex_unlock (&pwr->e.lock); - qxev_pwr_entityid (pwr, &rd->e.guid.prefix); + qxev_pwr_entityid (pwr, &rd->e.guid); ELOGDISC (pwr, "\n"); @@ -2163,7 +2163,7 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer PGUID (wr->e.guid), PGUID (prd->e.guid)); ddsrt_avl_insert_ipath (&prd_writers_treedef, &prd->writers, m, &path); ddsrt_mutex_unlock (&prd->e.lock); - qxev_prd_entityid (prd, &wr->e.guid.prefix); + qxev_prd_entityid (prd, &wr->e.guid); } } @@ -3974,6 +3974,8 @@ void new_proxy_participant nn_xqos_mergein_missing (&proxypp->plist->qos, &gv->default_plist_pp.qos, ~(uint64_t)0); ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups); + set_proxy_participant_security_info(proxypp, plist); + if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS) proxypp->kernel_sequence_numbers = 1; else @@ -4289,6 +4291,11 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en else memset (&c->group_guid, 0, sizeof (c->group_guid)); +#ifdef DDSI_INCLUDE_SECURITY + c->security_info.security_attributes = 0; + c->security_info.plugin_security_attributes = 0; +#endif + ref_proxy_participant (proxypp, c); } @@ -4394,6 +4401,8 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons pwr->ddsi2direct_cb = 0; pwr->ddsi2direct_cbarg = 0; + set_proxy_writer_security_info(pwr, plist); + local_reader_ary_init (&pwr->rdary); /* locking the entity prevents matching while the built-in topic hasn't been published yet */ @@ -4437,7 +4446,7 @@ void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid); if (rd) { - qxev_pwr_entityid (pwr, &rd->e.guid.prefix); + qxev_pwr_entityid (pwr, &rd->e.guid); } m = ddsrt_avl_iter_next (&iter); } @@ -4494,7 +4503,7 @@ void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset ddsrt_mutex_lock (&wr->e.lock); rebuild_writer_addrset (wr); ddsrt_mutex_unlock (&wr->e.lock); - qxev_prd_entityid (prd, &wr->e.guid.prefix); + qxev_prd_entityid (prd, &wr->e.guid); } wrguid = guid_next; ddsrt_mutex_lock (&prd->e.lock); @@ -4582,6 +4591,8 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons #endif prd->is_fict_trans_reader = 0; + set_proxy_reader_security_info(prd, plist); + ddsrt_avl_init (&prd_writers_treedef, &prd->writers); /* locking the entity prevents matching while the built-in topic hasn't been published yet */ diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index eec9837..22a7543 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -53,6 +53,7 @@ #include "dds/ddsi/ddsi_mcgroup.h" #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata_default.h" /* FIXME: get rid of this */ +#include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/sysdeps.h" #include "dds__whc.h" @@ -278,7 +279,34 @@ static void set_sampleinfo_proxy_writer (struct nn_rsample_info *sampleinfo, dds sampleinfo->pwr = pwr; } -static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp) +static int set_sampleinfo_bswap (struct nn_rsample_info *sampleinfo, struct CDRHeader *hdr) +{ + if (hdr) + { + switch (hdr->identifier) + { + case CDR_BE: + case PL_CDR_BE: + { + sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) ? 1 : 0; + break; + } + case CDR_LE: + case PL_CDR_LE: + { + sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN) ? 0 : 1; + break; + } + default: + { + return 0; + } + } + } + return 1; +} + +static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, Data_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp, uint32_t *payloadsz) { /* on success: sampleinfo->{seq,rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */ ddsi_guid_t pwr_guid; @@ -354,6 +382,7 @@ static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, D { /*TRACE (("no payload\n"));*/ *payloadp = NULL; + *payloadsz = 0; sampleinfo->size = 0; } else if ((size_t) ((char *) ptr + 4 - (char *) msg) > size) @@ -363,35 +392,16 @@ static int valid_Data (const struct receiver_state *rst, struct nn_rmsg *rmsg, D } else { - struct CDRHeader *hdr; sampleinfo->size = (uint32_t) ((char *) msg + size - (char *) ptr); + *payloadsz = sampleinfo->size; *payloadp = ptr; - hdr = (struct CDRHeader *) ptr; - switch (hdr->identifier) - { - case CDR_BE: - case PL_CDR_BE: - { - sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 1 : 0); - break; - } - case CDR_LE: - case PL_CDR_LE: - { - sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 0 : 1); - break; - } - default: - return 0; - } } return 1; } -static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp) +static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rmsg, DataFrag_t *msg, size_t size, int byteswap, struct nn_rsample_info *sampleinfo, unsigned char **payloadp, uint32_t *payloadsz) { /* on success: sampleinfo->{rst,statusinfo,pt_wr_info_zoff,bswap,complex_qos} all set */ - uint32_t payloadsz; ddsi_guid_t pwr_guid; unsigned char *ptr; @@ -473,18 +483,17 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms } *payloadp = ptr; - payloadsz = (uint32_t) ((char *) msg + size - (char *) ptr); - if ((uint32_t) msg->fragmentsInSubmessage * msg->fragmentSize <= payloadsz) + *payloadsz = (uint32_t) ((char *) msg + size - (char *) ptr); + if ((uint32_t) msg->fragmentsInSubmessage * msg->fragmentSize <= (*payloadsz)) ; /* all spec'd fragments fit in payload */ - else if ((uint32_t) (msg->fragmentsInSubmessage - 1) * msg->fragmentSize >= payloadsz) + else if ((uint32_t) (msg->fragmentsInSubmessage - 1) * msg->fragmentSize >= (*payloadsz)) return 0; /* I can live with a short final fragment, but _only_ the final one */ - else if ((uint32_t) (msg->fragmentStartingNum - 1) * msg->fragmentSize + payloadsz >= msg->sampleSize) + else if ((uint32_t) (msg->fragmentStartingNum - 1) * msg->fragmentSize + (*payloadsz) >= msg->sampleSize) ; /* final fragment is long enough to cover rest of sample */ else return 0; if (msg->fragmentStartingNum == 1) { - struct CDRHeader *hdr = (struct CDRHeader *) ptr; if ((size_t) ((char *) ptr + 4 - (char *) msg) > size) { /* no space for the header -- technically, allowing small @@ -492,23 +501,6 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms prefer this */ return 0; } - switch (hdr->identifier) - { - case CDR_BE: - case PL_CDR_BE: - { - sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 1 : 0); - break; - } - case CDR_LE: - case PL_CDR_LE: - { - sampleinfo->bswap = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? 0 : 1); - break; - } - default: - return 0; - } } return 1; } @@ -527,6 +519,7 @@ static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader gap->gapList.numbits = numbits; memcpy (gap->bits, bits, NN_SEQUENCE_NUMBER_SET_BITS_SIZE (numbits)); nn_xmsg_submsg_setnext (msg, sm_marker); + encode_datawriter_submsg(msg, sm_marker, wr); return 0; } @@ -537,7 +530,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state * ASSERT_MUTEX_HELD (&wr->e.lock); assert (wr->reliable); - m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL); + m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); if (nn_xmsg_setdstPRD (m, prd) < 0) { /* If we don't have an address, give up immediately */ @@ -610,7 +603,7 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou return 1; } -static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp) +static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid) { struct proxy_reader *prd; struct wr_prd_match *rn; @@ -666,6 +659,12 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac return 1; } + if (!validate_msg_decoding(&(prd->e), &(prd->c), prd->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst)); + return 1; + } + /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow); @@ -934,7 +933,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac RSTTRACE (" XGAP%"PRId64"..%"PRId64"/%u:", gapstart, gapend, gapnumbits); for (uint32_t i = 0; i < gapnumbits; i++) RSTTRACE ("%c", nn_bitset_isset (gapnumbits, gapbits, i) ? '1' : '0'); - m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL); + m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (m, wr->partition_id); #endif @@ -1094,7 +1093,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand } } -static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, nn_wctime_t timestamp) +static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Heartbeat_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid) { /* We now cheat: and process the heartbeat for _all_ readers, always, regardless of the destination address in the Heartbeat @@ -1132,6 +1131,12 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct return 1; } + if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst)); + return 1; + } + /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow); @@ -1244,7 +1249,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct return 1; } -static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime_t tnow), const HeartbeatFrag_t *msg) +static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime_t tnow), const HeartbeatFrag_t *msg, SubmessageKind_t prev_smid) { const seqno_t seq = fromSN (msg->writerSN); const nn_fragment_number_t fragnum = msg->lastFragmentNum - 1; /* we do 0-based */ @@ -1269,6 +1274,12 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime return 1; } + if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst)); + return 1; + } + /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow); @@ -1355,7 +1366,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime return 1; } -static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const NackFrag_t *msg) +static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid) { struct proxy_reader *prd; struct wr_prd_match *rn; @@ -1396,6 +1407,12 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N return 1; } + if (!validate_msg_decoding(&(prd->e), &(prd->c), prd->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst)); + return 1; + } + /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ lease_renew (ddsrt_atomic_ldvoidp (&prd->c.proxypp->lease), tnow); @@ -1446,7 +1463,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N static uint32_t zero = 0; struct nn_xmsg *m; RSTTRACE (" msg not available: scheduling Gap\n"); - m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid.prefix, 0, NN_XMSG_KIND_CONTROL); + m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (m, wr->partition_id); #endif @@ -1590,7 +1607,7 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se return gap_was_valuable; } -static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Gap_t *msg) +static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Gap_t *msg, SubmessageKind_t prev_smid) { /* Option 1: Process the Gap for the proxy writer and all out-of-sync readers: what do I care which reader is being @@ -1643,6 +1660,12 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm return 1; } + if (!validate_msg_decoding(&(pwr->e), &(pwr->c), pwr->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT" clear submsg from protected src)", PGUID (src), PGUID (dst)); + return 1; + } + /* liveliness is still only implemented partially (with all set to AUTOMATIC, BY_PARTICIPANT, &c.), so we simply renew the proxy participant's lease. */ lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->lease), tnow); @@ -2346,7 +2369,7 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con } } -static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup) +static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup, SubmessageKind_t prev_smid) { RSTTRACE ("DATA("PGUIDFMT" -> "PGUIDFMT" #%"PRId64, PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, @@ -2358,6 +2381,15 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r return 1; } + if (sampleinfo->pwr) + { + if (!validate_msg_decoding(&(sampleinfo->pwr->e), &(sampleinfo->pwr->c), sampleinfo->pwr->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" clear submsg from protected src "PGUIDFMT")", PGUID (sampleinfo->pwr->e.guid)); + return 1; + } + } + if (sampleinfo->size > rst->gv->config.max_sample_size) drop_oversize (rst, rmsg, &msg->x, sampleinfo); else @@ -2391,7 +2423,7 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r return 1; } -static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup) +static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup, SubmessageKind_t prev_smid) { RSTTRACE ("DATAFRAG("PGUIDFMT" -> "PGUIDFMT" #%"PRId64"/[%u..%u]", PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, @@ -2404,6 +2436,15 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct return 1; } + if (sampleinfo->pwr) + { + if (!validate_msg_decoding(&(sampleinfo->pwr->e), &(sampleinfo->pwr->c), sampleinfo->pwr->c.proxypp, rst, prev_smid)) + { + RSTTRACE (" clear submsg from protected src "PGUIDFMT")", PGUID (sampleinfo->pwr->e.guid)); + return 1; + } + } + if (sampleinfo->size > rst->gv->config.max_sample_size) drop_oversize (rst, rmsg, &msg->x, sampleinfo); else @@ -2606,7 +2647,8 @@ static int handle_submsg_sequence unsigned char * const msg /* NOT const - we may byteswap it */, const size_t len, unsigned char * submsg /* aliases somewhere in msg */, - struct nn_rmsg * const rmsg + struct nn_rmsg * const rmsg, + bool rtps_encoded /* indicate if the message was rtps encoded */ ) { const char *state; @@ -2618,6 +2660,7 @@ static int handle_submsg_sequence size_t submsg_size = 0; unsigned char * end = msg + len; struct nn_dqueue *deferred_wakeup = NULL; + SubmessageKind_t prev_smid = SMID_PAD; /* Receiver state is dynamically allocated with lifetime bound to the message. Updates cause a new copy to be created if the @@ -2639,6 +2682,7 @@ static int handle_submsg_sequence false at any time. That's ok: it's real purpose is to filter out discovery data accidentally sent by Cloud */ rst->forme = 1; + rst->rtps_encoded = rtps_encoded; rst->vendor = hdr->vendorid; rst->protocol_version = hdr->version; rst->srcloc = *srcloc; @@ -2700,14 +2744,14 @@ static int handle_submsg_sequence state = "parse:acknack"; if (!valid_AckNack (rst, &sm->acknack, submsg_size, byteswap)) goto malformed; - handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID); + handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID, prev_smid); ts_for_latmeas = 0; break; case SMID_HEARTBEAT: state = "parse:heartbeat"; if (!valid_Heartbeat (&sm->heartbeat, submsg_size, byteswap)) goto malformed; - handle_Heartbeat (rst, tnowE, rmsg, &sm->heartbeat, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID); + handle_Heartbeat (rst, tnowE, rmsg, &sm->heartbeat, ts_for_latmeas ? timestamp : NN_WCTIME_INVALID, prev_smid); ts_for_latmeas = 0; break; case SMID_GAP: @@ -2719,7 +2763,7 @@ static int handle_submsg_sequence rst after inserting the gap in the admin. */ if (!valid_Gap (&sm->gap, submsg_size, byteswap)) goto malformed; - handle_Gap (rst, tnowE, rmsg, &sm->gap); + handle_Gap (rst, tnowE, rmsg, &sm->gap, prev_smid); ts_for_latmeas = 0; break; case SMID_INFO_TS: @@ -2763,27 +2807,37 @@ static int handle_submsg_sequence state = "parse:nackfrag"; if (!valid_NackFrag (&sm->nackfrag, submsg_size, byteswap)) goto malformed; - handle_NackFrag (rst, tnowE, &sm->nackfrag); + handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid); ts_for_latmeas = 0; break; case SMID_HEARTBEAT_FRAG: state = "parse:heartbeatfrag"; if (!valid_HeartbeatFrag (&sm->heartbeatfrag, submsg_size, byteswap)) goto malformed; - handle_HeartbeatFrag (rst, tnowE, &sm->heartbeatfrag); + handle_HeartbeatFrag (rst, tnowE, &sm->heartbeatfrag, prev_smid); ts_for_latmeas = 0; break; case SMID_DATA_FRAG: state = "parse:datafrag"; { struct nn_rsample_info sampleinfo; + uint32_t datasz = 0; unsigned char *datap; + size_t submsg_len = submsg_size; /* valid_DataFrag does not validate the payload */ - if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap)) + if (!valid_DataFrag (rst, rmsg, &sm->datafrag, submsg_size, byteswap, &sampleinfo, &datap, &datasz)) goto malformed; + /* This only decodes the payload when needed (possibly reducing the submsg size). */ + if (!decode_DataFrag (rst->gv, &sampleinfo, datap, datasz, &submsg_len)) + goto malformed; + /* Set the sample bswap according to the payload info (only first fragment has proper header). */ + if (sm->datafrag.fragmentStartingNum == 1) { + if (!set_sampleinfo_bswap(&sampleinfo, (struct CDRHeader *)datap)) + goto malformed; + } sampleinfo.timestamp = timestamp; sampleinfo.reception_timestamp = tnowWC; - handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap, &deferred_wakeup); + handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_len, &sampleinfo, datap, &deferred_wakeup, prev_smid); rst_live = 1; ts_for_latmeas = 0; } @@ -2793,12 +2847,20 @@ static int handle_submsg_sequence { struct nn_rsample_info sampleinfo; unsigned char *datap; + uint32_t datasz = 0; + size_t submsg_len = submsg_size; /* valid_Data does not validate the payload */ - if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap)) + if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap, &datasz)) + goto malformed; + /* This only decodes the payload when needed (possibly reducing the submsg size). */ + if (!decode_Data (rst->gv, &sampleinfo, datap, datasz, &submsg_len)) + goto malformed; + /* Set the sample bswap according to the payload info. */ + if (!set_sampleinfo_bswap(&sampleinfo, (struct CDRHeader *)datap)) goto malformed; sampleinfo.timestamp = timestamp; sampleinfo.reception_timestamp = tnowWC; - handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap, &deferred_wakeup); + handle_Data (rst, tnowE, rmsg, &sm->data, submsg_len, &sampleinfo, datap, &deferred_wakeup, prev_smid); rst_live = 1; ts_for_latmeas = 0; } @@ -2819,6 +2881,38 @@ static int handle_submsg_sequence GVTRACE ("ENTITY_ID"); break; } + case SMID_SEC_PREFIX: + state = "parse:sec_prefix"; + { + GVTRACE ("SEC_PREFIX"); + if (decode_SecPrefix(rst, submsg, submsg_size, end, &rst->src_guid_prefix, &rst->dst_guid_prefix, byteswap) < 0) + goto malformed; + } + break; + case SMID_SEC_BODY: + { + /* Ignore: because it should have been handled by SMID_SEC_PREFIX. */ + GVTRACE ("SEC_BODY"); + } + break; + case SMID_SEC_POSTFIX: + { + /* Ignore: because it should have been handled by SMID_SEC_PREFIX. */ + GVTRACE ("SEC_POSTFIX"); + } + break; + case SMID_SRTPS_PREFIX: + { + /* Ignore: it should already have been handled. */ + GVTRACE ("SRTPS_PREFIX"); + } + break; + case SMID_SRTPS_POSTFIX: + { + /* Ignore: it should already have been handled. */ + GVTRACE ("SRTPS_POSTFIX"); + } + break; default: state = "parse:undefined"; GVTRACE ("UNDEFINED(%x)", sm->smhdr.submessageId); @@ -2849,6 +2943,7 @@ static int handle_submsg_sequence ts_for_latmeas = 0; break; } + prev_smid = state_smkind; submsg += submsg_size; GVTRACE ("\n"); } @@ -2985,8 +3080,36 @@ static bool do_packet (struct thread_state1 * const ts1, struct q_globals *gv, d GVTRACE ("HDR(%"PRIx32":%"PRIx32":%"PRIx32" vendor %d.%d) len %lu from %s\n", PGUIDPREFIX (hdr->guid_prefix), hdr->vendorid.id[0], hdr->vendorid.id[1], (unsigned long) sz, addrstr); } + nn_rtps_msg_state_t res = decode_rtps_message(ts1, + gv, + &rmsg, + &hdr, + &buff, + &sz, + rbpool, + conn->m_stream); - handle_submsg_sequence (ts1, gv, conn, &srcloc, now (), now_et (), &hdr->guid_prefix, guidprefix, buff, (size_t) sz, buff + RTPS_MESSAGE_HEADER_SIZE, rmsg); + if (res != NN_RTPS_MSG_STATE_ERROR) + { + handle_submsg_sequence (ts1, + gv, + conn, + &srcloc, + now (), + now_et (), + &hdr->guid_prefix, + guidprefix, + buff, + (size_t) sz, + buff + RTPS_MESSAGE_HEADER_SIZE, + rmsg, + res == NN_RTPS_MSG_STATE_ENCODED); + } + else + { + /* drop message */ + sz = 1; + } } } nn_rmsg_commit (rmsg); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index b0061fe..cec9693 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -34,6 +34,7 @@ #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/sysdeps.h" #include "dds__whc.h" @@ -140,7 +141,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru assert (wr->reliable); assert (hbansreq >= 0); - if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL) + if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL) /* out of memory at worst slows down traffic */ return NULL; @@ -218,6 +219,13 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru add_Heartbeat (msg, wr, whcst, hbansreq, prd_guid->entityid, issync); } + /* It is possible that the encoding removed the submessage(s). */ + if (nn_xmsg_size(msg) == 0) + { + nn_xmsg_free (msg); + msg = NULL; + } + writer_hbcontrol_note_hb (wr, tnow, hbansreq); return msg; } @@ -379,6 +387,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta hb->count = ++wr->hbcount; nn_xmsg_submsg_setnext (msg, sm_marker); + encode_datawriter_submsg(msg, sm_marker, wr); } static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg) @@ -411,7 +420,7 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s ASSERT_MUTEX_HELD (&wr->e.lock); /* INFO_TS: 12 bytes, Data_t: 24 bytes, expected inline QoS: 32 => should be single chunk */ - if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL) + if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL) return DDS_RETCODE_OUT_OF_RESOURCES; #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS @@ -448,9 +457,9 @@ static dds_return_t create_fragment_message_simple (struct writer *wr, seqno_t s #if TEST_KEYHASH if (serdata->kind != SDK_KEY || !wr->include_keyhash) - nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata)); + nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata), wr); #else - nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata)); + nn_xmsg_serdata (*pmsg, serdata, 0, ddsi_serdata_size (serdata), wr); #endif nn_xmsg_submsg_setnext (*pmsg, sm_marker); return 0; @@ -497,7 +506,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru fragging = (gv->config.fragment_size < size); /* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */ - if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL) + if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL) return DDS_RETCODE_OUT_OF_RESOURCES; #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS @@ -618,7 +627,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru } } - nn_xmsg_serdata (*pmsg, serdata, fragstart, fraglen); + nn_xmsg_serdata (*pmsg, serdata, fragstart, fraglen, wr); nn_xmsg_submsg_setnext (*pmsg, sm_marker); #if 0 GVTRACE ("queue data%s "PGUIDFMT" #%lld/%u[%u..%u)\n", @@ -626,6 +635,15 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru seq, fragnum+1, fragstart, fragstart + fraglen); #endif + encode_datawriter_submsg(*pmsg, sm_marker, wr); + + /* It is possible that the encoding removed the submessage. + * If there is no content, free the message. */ + if (nn_xmsg_size(*pmsg) == 0) { + nn_xmsg_free (*pmsg); + *pmsg = NULL; + } + return ret; } @@ -635,7 +653,7 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn struct nn_xmsg_marker sm_marker; HeartbeatFrag_t *hbf; ASSERT_MUTEX_HELD (&wr->e.lock); - if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid.prefix, sizeof (HeartbeatFrag_t), NN_XMSG_KIND_CONTROL)) == NULL) + if ((*pmsg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (HeartbeatFrag_t), NN_XMSG_KIND_CONTROL)) == NULL) return; /* ignore out-of-memory: HeartbeatFrag is only advisory anyway */ #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (*pmsg, wr->partition_id); @@ -664,6 +682,15 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn hbf->count = ++wr->hbfragcount; nn_xmsg_submsg_setnext (*pmsg, sm_marker); + encode_datawriter_submsg(*pmsg, sm_marker, wr); + + /* It is possible that the encoding removed the submessage. + * If there is no content, free the message. */ + if (nn_xmsg_size(*pmsg) == 0) + { + nn_xmsg_free(*pmsg); + *pmsg = NULL; + } } #if 0 diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 775b2f4..548508b 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -39,6 +39,7 @@ #include "dds/ddsi/q_xmsg.h" #include "dds/ddsi/ddsi_serdata.h" #include "dds/ddsi/ddsi_serdata_default.h" +#include "dds/ddsi/ddsi_security_omg.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds__whc.h" @@ -182,7 +183,7 @@ static void trace_msg (struct xeventq *evq, const char *func, const struct nn_xm { if (dds_get_log_mask() & DDS_LC_TRACE) { - nn_guid_t wrguid; + ddsi_guid_t wrguid; seqno_t wrseq; nn_fragment_number_t wrfragid; nn_xmsg_guid_seq_fragid (m, &wrguid, &wrseq, &wrfragid); @@ -805,6 +806,9 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p base, an->readerSNState.numbits); for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); + + /* Encode the sub-message when needed. */ + encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); } if (nackfrag_numbits > 0) @@ -835,12 +839,15 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); } + + /* Encode the sub-message when needed. */ + encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); } ETRACE (pwr, "\n"); } -static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, nn_mtime_t tnow) +static void handle_xevk_acknack (struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow) { /* FIXME: ought to keep track of which NACKs are being generated in response to a Heartbeat. There is no point in having multiple @@ -872,9 +879,18 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc)) { seqno_t nack_seq; - if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid.prefix, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) + + struct participant *pp = NULL; + if (q_omg_security_enabled()) + { + struct reader *rd = ephash_lookup_reader_guid(pwr->e.gv->guid_hash, &ev->u.acknack.rd_guid); + if (rd) + pp = rd->c.pp; + } + + if ((msg = nn_xmsg_new (gv->xmsgpool, &ev->u.acknack.rd_guid, pp, ACKNACK_SIZE_MAX, NN_XMSG_KIND_CONTROL)) == NULL) goto outofmem; - nn_xmsg_setdst1 (msg, &ev->u.acknack.pwr_guid.prefix, &loc); + nn_xmsg_setdst1 (gv, msg, &ev->u.acknack.pwr_guid.prefix, &loc); if (gv->config.meas_hb_to_ack_latency && rwn->hb_timestamp.v) { /* If HB->ACK latency measurement is enabled, and we have a @@ -886,7 +902,13 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent rwn->hb_timestamp.v = 0; } add_AckNack (msg, pwr, rwn, &nack_seq); - if (nack_seq) + if (nn_xmsg_size(msg) == 0) + { + /* No AckNack added. */ + nn_xmsg_free(msg); + msg = NULL; + } + else if (nack_seq) { rwn->t_last_nack = tnow; rwn->seq_last_nack = nack_seq; @@ -948,7 +970,7 @@ static bool resend_spdp_sample_by_guid_key (struct writer *wr, const ddsi_guid_t nn_plist_init_empty (&ps); ps.present |= PP_PARTICIPANT_GUID; ps.participant_guid = *guid; - struct nn_xmsg *mpayload = nn_xmsg_new (gv->xmsgpool, &guid->prefix, 0, NN_XMSG_KIND_DATA); + struct nn_xmsg *mpayload = nn_xmsg_new (gv->xmsgpool, guid, wr->c.pp, 0, NN_XMSG_KIND_DATA); nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0); nn_xmsg_addpar_sentinel (mpayload); nn_plist_fini (&ps); @@ -1387,7 +1409,7 @@ void qxev_msg (struct xeventq *evq, struct nn_xmsg *msg) ddsrt_mutex_unlock (&evq->lock); } -void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id) +void qxev_prd_entityid (struct proxy_reader *prd, const ddsi_guid_t *guid) { struct q_globals * const gv = prd->e.gv; struct nn_xmsg *msg; @@ -1397,10 +1419,10 @@ void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id) if (! gv->xevents->tev_conn->m_connless) { - msg = nn_xmsg_new (gv->xmsgpool, id, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); + msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); if (nn_xmsg_setdstPRD (msg, prd) == 0) { - GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (*id)); + GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); nn_xmsg_add_entityid (msg); ddsrt_mutex_lock (&gv->xevents->lock); ev = qxev_common_nt (gv->xevents, XEVK_ENTITYID); @@ -1415,7 +1437,7 @@ void qxev_prd_entityid (struct proxy_reader *prd, ddsi_guid_prefix_t *id) } } -void qxev_pwr_entityid (struct proxy_writer *pwr, ddsi_guid_prefix_t *id) +void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid) { struct q_globals * const gv = pwr->e.gv; struct nn_xmsg *msg; @@ -1425,10 +1447,10 @@ void qxev_pwr_entityid (struct proxy_writer *pwr, ddsi_guid_prefix_t *id) if (! pwr->evq->tev_conn->m_connless) { - msg = nn_xmsg_new (gv->xmsgpool, id, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); + msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); if (nn_xmsg_setdstPWR (msg, pwr) == 0) { - GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (*id)); + GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); nn_xmsg_add_entityid (msg); ddsrt_mutex_lock (&pwr->evq->lock); ev = qxev_common_nt (pwr->evq, XEVK_ENTITYID); diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index 5e704a3..4c73e77 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -41,6 +41,7 @@ #include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_freelist.h" #include "dds/ddsi/ddsi_serdata_default.h" +#include "dds/ddsi/ddsi_security_omg.h" #define NN_XMSG_MAX_ALIGN 8 #define NN_XMSG_CHUNK_SIZE 128 @@ -72,6 +73,11 @@ struct nn_xmsg { int have_params; struct ddsi_serdata *refd_payload; ddsrt_iovec_t refd_payload_iov; +#ifdef DDSI_INCLUDE_SECURITY + /* Used as pointer to contain encoded payload to which iov can alias. */ + unsigned char *refd_payload_encoded; + nn_msg_sec_info_t sec_info; +#endif int64_t maxdelay; #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS uint32_t encoderid; @@ -223,6 +229,9 @@ struct nn_xpack #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS uint32_t encoderId; #endif /* DDSI_INCLUDE_NETWORK_PARTITIONS */ +#ifdef DDSI_INCLUDE_SECURITY + nn_msg_sec_info_t sec_info; +#endif }; static size_t align4u (size_t x) @@ -283,6 +292,12 @@ static void nn_xmsg_reinit (struct nn_xmsg *m, enum nn_xmsg_kind kind) m->dstmode = NN_XMSG_DST_UNSET; m->kind = kind; m->maxdelay = 0; +#ifdef DDSI_INCLUDE_SECURITY + m->refd_payload_encoded = NULL; + m->sec_info.use_rtps_encoding = 0; + m->sec_info.src_pp_handle = 0; + m->sec_info.dst_pp_handle = 0; +#endif #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS m->encoderid = 0; #endif @@ -322,14 +337,29 @@ static struct nn_xmsg *nn_xmsg_allocnew (struct nn_xmsgpool *pool, size_t expect return m; } -struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_prefix_t *src_guid_prefix, size_t expected_size, enum nn_xmsg_kind kind) +struct nn_xmsg *nn_xmsg_new (struct nn_xmsgpool *pool, const ddsi_guid_t *src_guid, struct participant *pp, size_t expected_size, enum nn_xmsg_kind kind) { struct nn_xmsg *m; if ((m = nn_freelist_pop (&pool->freelist)) != NULL) nn_xmsg_reinit (m, kind); else if ((m = nn_xmsg_allocnew (pool, expected_size, kind)) == NULL) return NULL; - m->data->src.guid_prefix = nn_hton_guid_prefix (*src_guid_prefix); + m->data->src.guid_prefix = nn_hton_guid_prefix (src_guid->prefix); + +#ifdef DDSI_INCLUDE_SECURITY + m->sec_info.use_rtps_encoding = 0; + if (pp && q_omg_participant_is_secure(pp)) + { + if (q_omg_security_is_local_rtps_protected(pp, src_guid->entityid)) + { + m->sec_info.use_rtps_encoding = 1; + m->sec_info.src_pp_handle = q_omg_security_get_local_participant_handle(pp); + } + } +#else + DDSRT_UNUSED_ARG(pp); +#endif + return m; } @@ -344,6 +374,9 @@ void nn_xmsg_free (struct nn_xmsg *m) struct nn_xmsgpool *pool = m->pool; if (m->refd_payload) ddsi_serdata_to_ser_unref (m->refd_payload, &m->refd_payload_iov); +#ifdef DDSI_INCLUDE_SECURITY + ddsrt_free(m->refd_payload_encoded); +#endif if (m->dstmode == NN_XMSG_DST_ALL) { unref_addrset (m->dstaddr.all.as); @@ -385,6 +418,13 @@ static int submsg_is_compatible (const struct nn_xmsg *msg, SubmessageKind_t smk case SMID_DATA: case SMID_DATA_FRAG: /* but data is strictly verboten */ return 0; + case SMID_SEC_BODY: + case SMID_SEC_PREFIX: + case SMID_SEC_POSTFIX: + case SMID_SRTPS_PREFIX: + case SMID_SRTPS_POSTFIX: + /* and the security sm are basically data. */ + return 0; } assert (0); break; @@ -406,6 +446,13 @@ static int submsg_is_compatible (const struct nn_xmsg *msg, SubmessageKind_t smk won't work for initial transmits, but those currently don't allow a readerId */ return msg->kindspecific.data.readerId_off == 0; + case SMID_SEC_BODY: + case SMID_SEC_PREFIX: + case SMID_SEC_POSTFIX: + case SMID_SRTPS_PREFIX: + case SMID_SRTPS_POSTFIX: + /* Just do the same as 'normal' data sm. */ + return msg->kindspecific.data.readerId_off == 0; case SMID_ACKNACK: case SMID_HEARTBEAT: case SMID_GAP: @@ -494,6 +541,80 @@ void nn_xmsg_submsg_setnext (struct nn_xmsg *msg, struct nn_xmsg_marker marker) ((unsigned)(msg->data->payload + msg->sz + plsize - (char *) hdr) - RTPS_SUBMESSAGE_HEADER_SIZE); } +#ifdef DDSI_INCLUDE_SECURITY + +size_t nn_xmsg_submsg_size (struct nn_xmsg *msg, struct nn_xmsg_marker marker) +{ + SubmessageHeader_t *hdr = (SubmessageHeader_t*)nn_xmsg_submsg_from_marker(msg, marker); + return align4u(hdr->octetsToNextHeader + sizeof(SubmessageHeader_t)); +} + +void nn_xmsg_submsg_remove(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker) +{ + /* Just reset the message size to the start of the current sub-message. */ + msg->sz = sm_marker.offset; +} + +void nn_xmsg_submsg_replace(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, unsigned char *new_submsg, size_t new_len) +{ + /* Size of current sub-message. */ + size_t old_len = msg->sz - sm_marker.offset; + + /* Adjust the message size to the new sub-message. */ + if (old_len < new_len) + { + nn_xmsg_append(msg, NULL, new_len - old_len); + } + else if (old_len > new_len) + { + nn_xmsg_shrink(msg, sm_marker, new_len); + } + + /* Just a sanity check: assert(msg_end == submsg_end) */ + assert((msg->data->payload + msg->sz) == (msg->data->payload + sm_marker.offset + new_len)); + + /* Replace the sub-message. */ + memcpy(msg->data->payload + sm_marker.offset, new_submsg, new_len); +} + +void nn_xmsg_submsg_append_refd_payload(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker) +{ + DDSRT_UNUSED_ARG(sm_marker); + /* + * Normally, the refd payload pointer is moved around until it is added to + * the iov of the socket. This reduces the amount of allocations and copies. + * + * However, in a few cases (like security), the sub-message should be one + * complete blob. + * Appending the payload will just do that. + */ + if (msg->refd_payload) + { + void *dst; + + /* Get payload information. */ + char *payload_ptr = msg->refd_payload_iov.iov_base; + size_t payload_len = msg->refd_payload_iov.iov_len; + + /* Make space for the payload (dst points to the start of the appended space). */ + dst = nn_xmsg_append(msg, NULL, payload_len); + + /* Copy the payload into the submessage. */ + memcpy(dst, payload_ptr, payload_len); + + /* No need to remember the payload now. */ + ddsi_serdata_unref(msg->refd_payload); + msg->refd_payload = NULL; + if (msg->refd_payload_encoded) + { + ddsrt_free(msg->refd_payload_encoded); + msg->refd_payload_encoded = NULL; + } + } +} + +#endif /* DDSI_INCLUDE_SECURITY */ + void *nn_xmsg_submsg_from_marker (struct nn_xmsg *msg, struct nn_xmsg_marker marker) { return msg->data->payload + marker.offset; @@ -560,22 +681,66 @@ void nn_xmsg_add_entityid (struct nn_xmsg * m) nn_xmsg_submsg_setnext (m, sm); } -void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len) +void nn_xmsg_serdata (struct nn_xmsg *m, struct ddsi_serdata *serdata, size_t off, size_t len, struct writer *wr) { if (serdata->kind != SDK_EMPTY) { size_t len4 = align4u (len); assert (m->refd_payload == NULL); m->refd_payload = ddsi_serdata_to_ser_ref (serdata, off, len4, &m->refd_payload_iov); + +#ifdef DDSI_INCLUDE_SECURITY + assert (m->refd_payload_encoded == NULL); + /* When encoding is necessary, m->refd_payload_encoded will be allocated + * and m->refd_payload_iov contents will change to point to that buffer. + * If no encoding is necessary, nothing changes. */ + if (!encode_payload(wr, &(m->refd_payload_iov), &(m->refd_payload_encoded))) + { + DDS_CWARNING (&wr->e.gv->logconfig, "nn_xmsg_serdata: failed to encrypt data for "PGUIDFMT"", PGUID (wr->e.guid)); + ddsi_serdata_to_ser_unref (m->refd_payload, &m->refd_payload_iov); + assert (m->refd_payload_encoded == NULL); + m->refd_payload_iov.iov_base = NULL; + m->refd_payload_iov.iov_len = 0; + m->refd_payload = NULL; + } +#else + DDSRT_UNUSED_ARG(wr); +#endif } } -void nn_xmsg_setdst1 (struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *loc) +void nn_xmsg_setdst1 (struct q_globals *gv, struct nn_xmsg *m, const ddsi_guid_prefix_t *gp, const nn_locator_t *loc) { assert (m->dstmode == NN_XMSG_DST_UNSET); m->dstmode = NN_XMSG_DST_ONE; m->dstaddr.one.loc = *loc; m->data->dst.guid_prefix = nn_hton_guid_prefix (*gp); +#ifdef DDSI_INCLUDE_SECURITY + if (m->sec_info.use_rtps_encoding && !m->sec_info.dst_pp_handle) + { + struct proxy_participant *proxypp; + ddsi_guid_t guid; + + guid.prefix = *gp; + guid.entityid.u = NN_ENTITYID_PARTICIPANT; + + proxypp = ephash_lookup_proxy_participant_guid(gv->guid_hash, &guid); + if (proxypp) + m->sec_info.dst_pp_handle = q_omg_security_get_remote_participant_handle(proxypp); + } +#else + DDSRT_UNUSED_ARG(gv); +#endif +} + +bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp) +{ + if (m->dstmode == NN_XMSG_DST_ONE) + { + *gp = nn_hton_guid_prefix(m->data->dst.guid_prefix); + return true; + } + return false; } dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd) @@ -583,7 +748,7 @@ dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *pr nn_locator_t loc; if (addrset_any_uc (prd->c.as, &loc) || addrset_any_mc (prd->c.as, &loc)) { - nn_xmsg_setdst1 (m, &prd->e.guid.prefix, &loc); + nn_xmsg_setdst1 (prd->e.gv, m, &prd->e.guid.prefix, &loc); return 0; } else @@ -598,7 +763,7 @@ dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pw nn_locator_t loc; if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc)) { - nn_xmsg_setdst1 (m, &pwr->e.guid.prefix, &loc); + nn_xmsg_setdst1 (pwr->e.gv, m, &pwr->e.guid.prefix, &loc); return 0; } DDS_CWARNING (&pwr->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT, PGUID (pwr->e.guid)); @@ -953,6 +1118,9 @@ static void nn_xpack_reinit (struct nn_xpack *xp) xp->msg_len.length = 0; xp->included_msgs.latest = NULL; xp->maxdelay = T_NEVER; +#ifdef DDSI_INCLUDE_SECURITY + xp->sec_info.use_rtps_encoding = 0; +#endif #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS xp->encoderId = 0; #endif @@ -1012,6 +1180,34 @@ void nn_xpack_free (struct nn_xpack *xp) ddsrt_free (xp); } +static ssize_t nn_xpack_send_rtps(struct nn_xpack * xp, const nn_locator_t *loc) +{ + ssize_t ret = -1; + +#ifdef DDSI_INCLUDE_SECURITY + /* Only encode when needed. */ + if (xp->sec_info.use_rtps_encoding) + { + ret = secure_conn_write( + xp->conn, + loc, + xp->niov, + xp->iov, + xp->call_flags, + &(xp->msg_len), + (xp->dstmode == NN_XMSG_DST_ONE), + &(xp->sec_info), + ddsi_conn_write); + } + else +#endif /* DDSI_INCLUDE_SECURITY */ + { + ret = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags); + } + + return ret; +} + static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg) { struct nn_xpack *xp = varg; @@ -1038,14 +1234,17 @@ static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg) if (!gv->mute) { - nbytes = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags); + nbytes = nn_xpack_send_rtps(xp, loc); + #ifndef NDEBUG { size_t i, len; for (i = 0, len = 0; i < xp->niov; i++) { len += xp->iov[i].iov_len; } - assert (nbytes == -1 || (size_t) nbytes == len); + /* Possible number of bytes written can be larger + * due to security. */ + assert (nbytes == -1 || (size_t) nbytes >= len); } #endif } @@ -1338,6 +1537,12 @@ static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg * return 0; #endif +#ifdef DDSI_INCLUDE_SECURITY + /* Don't mix up encoded and plain rtps messages */ + if (xp->sec_info.use_rtps_encoding != m->sec_info.use_rtps_encoding) + return 0; +#endif + return addressing_info_eq_onesidederr (xp, m); } @@ -1431,6 +1636,9 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag niov++; } +#ifdef DDSI_INCLUDE_SECURITY + xp->sec_info = m->sec_info; +#endif #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS xp->encoderId = m->encoderid; #endif