diff --git a/docs/dev/volatile_msg_secure.md b/docs/dev/volatile_msg_secure.md new file mode 100644 index 0000000..d784387 --- /dev/null +++ b/docs/dev/volatile_msg_secure.md @@ -0,0 +1,131 @@ +# ParticipantVolatileMessageSecure Handling + +## Short Introduction + +Some knowledge of DDSI builtin (security) endpoints is assumed. + +```cpp +#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER 0xff0202c3 +#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER 0xff0202c4 +``` +These builtin endpoints account for the largest security-related code change in ddsi. + +Chapters 7.4.4.3 and 7.4.4.4 of the DDS Security specification indicate the main reason why these builtin endpoints are different from all the others and somewhat more complex. + +> 7.4.4.3 Contents of the ParticipantVolatileMessageSecure +> The ParticipantVolatileMessageSecure is intended as a holder of secure information that +> is sent point-to-point from a DomainParticipant to another. +> +> [...] +> +> 7.4.4.4 Destination of the ParticipantVolatileMessageSecure +> +> If the destination_participant_guid member is not set to GUID_UNKNOWN, the message written is +> intended only for the BuiltinParticipantVolatileMessageSecureReader belonging to the +> DomainParticipant with a matching Participant Key. +> +> This is equivalent to saying that the BuiltinParticipantVolatileMessageSecureReader has an implied +> content filter with the logical expression: +> +> “destination_participant_guid == GUID_UNKNOWN +> || destination_participant_guid==BuiltinParticipantVolatileMessageSecureReader.participant.guid” +> +> Implementations of the specification can use this content filter or some other mechanism as long as the +> resulting behavior is equivalent to having this filter. +> +> [...] + +The "point-to-point" and "content filter" remarks make everything more elaborate. 
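The implied content filter quoted above can be sketched in a few lines. This is a minimal, self-contained illustration only; `guid_t`, `GUID_UNKNOWN` and the helper functions here are hypothetical stand-ins, not the actual ddsi types:

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for the DDSI GUID type (16 opaque bytes). */
typedef struct { unsigned char v[16]; } guid_t;

static const guid_t GUID_UNKNOWN = { { 0 } };

static bool guid_equal (const guid_t *a, const guid_t *b)
{
  return memcmp (a->v, b->v, sizeof (a->v)) == 0;
}

/* The implied content filter from DDS Security 7.4.4.4: a sample passes
   iff its destination is unset (broadcast) or it is addressed to this
   reader's participant. */
static bool volatile_secure_accept (const guid_t *dst, const guid_t *own_participant)
{
  return guid_equal (dst, &GUID_UNKNOWN) || guid_equal (dst, own_participant);
}
```

The rest of this document is about making the ddsi machinery behave *as if* this filter were applied at the reader, while actually evaluating it at the writer side.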
+ + +## Complexity + +It would be nice to be able to use the ```dds_set_topic_filter()``` functionality for these endpoints. However, that only works on the reader history cache (rhc), which is only available for ddsc entities and not for ddsi builtin entities. And it is the builtin entities that are being used here. + +```dds_set_topic_filter()``` basically simulates that the sample was inserted into the rhc (without actually inserting it), which causes the rest of ddsi (heartbeats, acknacks, gaps, etc.) to work as normal while the sample just isn't provided to the reader. + +Unfortunately, the builtin volatile endpoints cannot use that same simple approach (handle the sample normally but ignore it right at the end). The problem is that the sample is encoded, and a reader can only decode samples that are intended for it. This means it is best for the reader to only receive 'owned' samples that it can actually decode. + +This has all kinds of effects regarding heartbeats, acknacks, gaps, etc. Basically, every writer/reader combination has to keep information about gaps and sequence numbers between them, while normally such information is kept combined per proxy. + + +## Implementation Overview + +This is only an overview; some details have been omitted. + + +### Writing + +The function ```write_crypto_exchange_message()``` takes care of generating the right sample information and passing it on to ```write_sample_p2p_wrlock_held()```. + +A proxy reader can now have a filter callback function (```proxy_reader::filter```). This indicates (on the writer side) whether a sample will be accepted by the actual reader or not. This could be made more generic for a proper 'writer side' content filter implementation. For now, however, it is only used by ParticipantVolatileMessageSecure and the filter is hardcoded to ```volatile_secure_data_filter()```. 
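The consequence of evaluating the filter on the writer side is that filtered-out sequence numbers must still be accounted for as gaps, otherwise the reader's acknack logic would keep requesting them. A simplified model of that bookkeeping (hypothetical names; not the actual `write_sample_p2p_wrlock_held()` code):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Hypothetical writer-side filter: does sample `seq` pass for reader `rd`? */
typedef bool (*filter_fn) (seqno_t seq, int rd);

/* Walk the writer history cache range [first, last] for one reader and
   count how many samples would be transmitted versus announced as gaps.
   Filtered-out sequence numbers may not simply be skipped: the reader
   must see a GAP for them, not silence. */
static void plan_transmit (seqno_t first, seqno_t last, int rd, filter_fn accept,
                           int *n_sent, int *n_gapped)
{
  *n_sent = *n_gapped = 0;
  for (seqno_t s = first; s <= last; s++)
  {
    if (accept (s, rd))
      (*n_sent)++;
    else
      (*n_gapped)++;  /* becomes gap information sent alongside samples */
  }
}

/* Toy filter for illustration: only even sequence numbers pass. */
static bool even_only (seqno_t seq, int rd) { (void) rd; return (seq % 2) == 0; }
```

Every writer/proxy-reader pair runs this kind of accounting independently, which is exactly why the per-match sequence number administration described below is needed.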
+ +So, if ```write_sample_p2p_wrlock_held()``` is called with a proxy reader that has a filter, it will get 'sent/acked sequences' information between the writer and proxy reader. This is used to determine whether gap information has to be sent alongside the sample. + +Then, ```write_sample_p2p_wrlock_held()``` will enqueue the sample. + +Just before the submessage is added to the rtps message and sent, it is encoded (todo). + + +### Reading + +First things first, the submessage is decoded when the rtps message is received (todo). + +It is received on a builtin reader, so the builtin queue is used and ```builtins_dqueue_handler()``` is called. That will forward the sample to the token exchange functionality, ignoring every sample that isn't intended for the related participant (todo). + + +### Gaps on reader side + +The reader remembers the last_seq it knows from every connected proxy writer (```pwr_rd_match::last_seq```). +This is updated when handling heartbeats, gaps and regular messages, and used to check if there are gaps. +Normally, the ```last_seq``` of a specific writer is used here. But when the reader knows that the writer uses a 'writer side content filter' (```proxy_writer::uses_filter```), it will use the ```last_seq``` that is related to the actual reader/writer match. +It is used to generate the AckNack (which contains gap information) response to the writer. + + +### Gaps on writer side + +The writer remembers the sequence number of the last sample it sent to a specific reader in ```wr_prd_match::last_seq```. +This is used to determine whether a reader has received all relevant samples (through handling of acknacks). +It is also used to determine the gap information that is added to samples sent to a specific reader when necessary. + + +### Heartbeats + +A writer is triggered to send heartbeats once in a while. Normally, that heartbeat is broadcast. For the volatile secure writer, however, it has to be sent to each reader individually. 
The heartbeat submessage that is sent to each reader individually is encoded with a reader-specific key. This key is generated from the shared secret which was determined during the authentication phase. + +When a writer should send heartbeats, ```handle_xevk_heartbeat()``` is called. For the volatile secure writer, control is immediately handed over to ```send_heartbeat_to_all_readers()```. This will add heartbeat submessages to an rtps message for every reader it deems necessary. + + +### Reorder + +Normally, received samples are placed in the reorder administration of the proxy_writer. In this case, however, the writer applies a content filter that is specific to each destination reader. The common reorder administration in the proxy_writer can therefore not be used; the reader-specific reorder administration must be used to handle the gaps, which are reader specific. + 
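The reader-side bookkeeping described above can be sketched as follows. This is a hypothetical model of one reader/writer match, loosely following the role of ```pwr_rd_match::last_seq```; the struct and function names are illustrative, not the ddsi ones:

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Per reader/writer match state: with a writer-side filter in play, the
   proxy writer's global high-water mark cannot be trusted for this
   reader, so each match tracks its own. */
struct match_state {
  seqno_t last_seq;      /* highest seq this reader knows from the writer */
  seqno_t next_deliver;  /* next seq the reader-specific reorder admin expects */
};

/* Heartbeats (like gaps and regular samples) advance the match-specific
   high-water mark. */
static void note_heartbeat (struct match_state *m, seqno_t last_avail)
{
  if (last_avail > m->last_seq)
    m->last_seq = last_avail;
}

/* Number of sequence numbers the AckNack would request: everything the
   writer advertised to this reader that has not yet been delivered. */
static seqno_t nack_count (const struct match_state *m)
{
  return (m->last_seq >= m->next_deliver) ? m->last_seq - m->next_deliver + 1 : 0;
}
```

When the writer answers such an AckNack, samples that do not pass the filter for this reader are answered with a GAP instead of a retransmit, which drives `next_deliver` forward without data ever being delivered.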
+
+
+## Notes
+ +### Putting the security participant volatile endpoint implementation into context + +The following elements are added to the data structures: + +* struct wr_prd_match::last_seq : Highest sequence number sent to this reader; used when a filter is applied +* struct pwr_rd_match::last_seq : Reader-specific last received sequence number from the writer. +* struct proxy_writer::uses_filter : Indicates that a content filter is active +* struct proxy_reader::filter : The filter to apply for this specific reader + +Functions added: + +* writer_hbcontrol_p2p : Creates a heartbeat destined for a specific reader. The volatile secure writer uses a submessage encoding with a distinct key for each reader. Therefore a reader-specific heartbeat is needed. +* nn_defrag_prune : When a volatile secure reader is deleted, the defragmentation administration may still contain messages destined for it. This function removes those messages from the defragmentation administration. +* volatile_secure_data_filter : The filter applied to the secure volatile messages, which filters on the destination participant guid. +* write_sample_p2p_wrlock_held : Writes a message to a particular reader. + +The use of the content filter for the volatile secure writer implies that it must be determined, for each destination reader, which messages from the writer history cache are valid and have to be sent. +For messages that do not match this filter a GAP message should be sent to the reader. Each time a message is sent to a specific reader, a gap message is added when necessary. +For the volatile secure writer the sequence number of the last message sent to a particular reader is maintained in ```wr_prd_match::last_seq```. It is used to determine whether a +HEARTBEAT has to be sent to this particular reader or whether the reader has acknowledged all messages. At the reader side the sequence number of the last received message is +maintained in ```pwr_rd_match::last_seq```. 
It is used to determine the contents of the ACKNACK message as response to a received HEARTBEAT. +When an ACKNACK (handle_AckNack) is received it is determined which samples should be resent related to the applied filter and for which sequence numbers a GAP message should be sent. diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_security_msg.h b/src/core/ddsi/include/dds/ddsi/ddsi_security_msg.h index a92eb8c..2db227c 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_security_msg.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_security_msg.h @@ -21,6 +21,11 @@ extern "C" { #endif +struct participant; +struct writer; +struct proxy_reader; +struct ddsi_serdata; + #define GMCLASSID_SECURITY_AUTH_REQUEST "dds.sec.auth_request" #define GMCLASSID_SECURITY_AUTH_HANDSHAKE "dds.sec.auth" @@ -32,7 +37,7 @@ typedef struct nn_message_identity { typedef struct nn_participant_generic_message { nn_message_identity_t message_identity; nn_message_identity_t related_message_identity; - ddsi_guid_t destinaton_participant_guid; + ddsi_guid_t destination_participant_guid; ddsi_guid_t destination_endpoint_guid; ddsi_guid_t source_endpoint_guid; const char *message_class_id; @@ -84,6 +89,21 @@ nn_participant_generic_message_serialize( DDS_EXPORT extern const enum pserop pserop_participant_generic_message[]; +DDS_EXPORT int +write_crypto_exchange_message( + const struct participant *pp, + const ddsi_guid_t *dst_pguid, + const ddsi_guid_t *src_eguid, + const ddsi_guid_t *dst_eguid, + const char *classid, + const nn_dataholderseq_t *tokens); + +DDS_EXPORT int +volatile_secure_data_filter( + struct writer *wr, + struct proxy_reader *prd, + struct ddsi_serdata *serdata); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index d782994..46117c7 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -23,6 +23,7 @@ #include "dds/ddsi/q_hbcontrol.h" #include 
"dds/ddsi/q_feature_check.h" #include "dds/ddsi/q_inverse_uint32_set.h" +#include "dds/ddsi/ddsi_serdata_default.h" #include "dds/ddsi/ddsi_tran.h" @@ -92,6 +93,7 @@ struct wr_prd_match { seqno_t min_seq; /* smallest ack'd seq nr in subtree */ seqno_t max_seq; /* sort-of highest ack'd seq nr in subtree (see augment function) */ seqno_t seq; /* highest acknowledged seq nr */ + seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ int32_t num_reliable_readers_where_seq_equals_max; ddsi_guid_t arbitrary_unacked_reader; nn_count_t next_acknack; /* next acceptable acknack sequence number */ @@ -119,8 +121,10 @@ struct pwr_rd_match { nn_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ nn_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ seqno_t seq_last_nack; /* last seq for which we requested a retransmit */ + seqno_t last_seq; /* last known sequence number from this writer */ struct xevent *acknack_xevent; /* entry in xevent queue for sending acknacks */ enum pwr_rd_match_syncstate in_sync; /* whether in sync with the proxy writer */ + unsigned filtered:1; union { struct { seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */ @@ -364,6 +368,7 @@ struct proxy_writer { unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */ unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */ unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */ + unsigned filtered: 1; /* iff 1, builtin proxy writer uses content filter, which affects heartbeats and gaps. 
*/ #ifdef DDSI_INCLUDE_SSM unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */ #endif @@ -376,6 +381,9 @@ struct proxy_writer { void *ddsi2direct_cbarg; }; + +typedef int (*filter_fn_t)(struct writer *wr, struct proxy_reader *prd, struct ddsi_serdata *serdata); + struct proxy_reader { struct entity_common e; struct proxy_endpoint_common c; @@ -385,6 +393,7 @@ struct proxy_reader { unsigned favours_ssm: 1; /* iff 1, this proxy reader favours SSM when available */ #endif ddsrt_avl_tree_t writers; /* matching LOCAL writers */ + filter_fn_t filter; }; extern const ddsrt_avl_treedef_t wr_readers_treedef; diff --git a/src/core/ddsi/include/dds/ddsi/q_globals.h b/src/core/ddsi/include/dds/ddsi/q_globals.h index c5cff1a..079e7df 100644 --- a/src/core/ddsi/include/dds/ddsi/q_globals.h +++ b/src/core/ddsi/include/dds/ddsi/q_globals.h @@ -238,6 +238,8 @@ struct q_globals { dds_qos_t builtin_endpoint_xqos_rd; dds_qos_t builtin_endpoint_xqos_wr; #ifdef DDSI_INCLUDE_SECURITY + dds_qos_t builtin_volatile_xqos_rd; + dds_qos_t builtin_volatile_xqos_wr; dds_qos_t builtin_stateless_xqos_rd; dds_qos_t builtin_stateless_xqos_wr; #endif diff --git a/src/core/ddsi/include/dds/ddsi/q_hbcontrol.h b/src/core/ddsi/include/dds/ddsi/q_hbcontrol.h index ca6a56f..b79dbf8 100644 --- a/src/core/ddsi/include/dds/ddsi/q_hbcontrol.h +++ b/src/core/ddsi/include/dds/ddsi/q_hbcontrol.h @@ -36,6 +36,11 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_ int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow); struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync); +#ifdef DDSI_INCLUDE_SECURITY +struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *whcst, int hbansreq, struct proxy_reader *prd); +#endif + + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsi/include/dds/ddsi/q_misc.h 
b/src/core/ddsi/include/dds/ddsi/q_misc.h index 569f342..13ffade 100644 --- a/src/core/ddsi/include/dds/ddsi/q_misc.h +++ b/src/core/ddsi/include/dds/ddsi/q_misc.h @@ -35,6 +35,8 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr); int WildcardOverlap(char * p1, char * p2); #endif +DDS_EXPORT int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b); +DDS_EXPORT int guid_eq (const struct ddsi_guid *a, const struct ddsi_guid *b); DDS_EXPORT int ddsi2_patmatch (const char *pat, const char *str); uint32_t crc32_calc (const void *buf, size_t length); diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index 3e58cb6..2fa13bf 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -98,7 +98,8 @@ typedef struct { #define NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DETECTOR (1u<<21) #define NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER (1u<<22) #define NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_DETECTOR (1u<<23) -/* TODO: ENDPOINT_PARTICIPANT_VOLATILE */ +#define NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER (1u<<24) +#define NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_DETECTOR (1u<<25) #define NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER (1u << 26) #define NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR (1u << 27) diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index 862d1ee..b165afe 100644 --- a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -219,6 +219,7 @@ void nn_defrag_free (struct nn_defrag *defrag); struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo); void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1); int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, 
uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz); +void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min); struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode); void nn_reorder_free (struct nn_reorder *r); @@ -229,6 +230,7 @@ nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reord int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq); unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail); seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder); +void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq); struct nn_dqueue *nn_dqueue_new (const char *name, const struct q_globals *gv, uint32_t max_samples, nn_dqueue_handler_t handler, void *arg); void nn_dqueue_free (struct nn_dqueue *q); diff --git a/src/core/ddsi/include/dds/ddsi/q_receive.h b/src/core/ddsi/include/dds/ddsi/q_receive.h index e165bea..0c8650d 100644 --- a/src/core/ddsi/include/dds/ddsi/q_receive.h +++ b/src/core/ddsi/include/dds/ddsi/q_receive.h @@ -21,11 +21,25 @@ struct nn_rsample_info; struct nn_rdata; struct ddsi_tran_listener; struct recv_thread_arg; +struct writer; +struct proxy_reader; + +struct nn_gap_info { + int64_t gapstart; + int64_t gapend; + unsigned gapnumbits; + unsigned gapbits[256 / 32]; +}; + +void nn_gap_info_init(struct nn_gap_info *gi); +void nn_gap_info_update(struct q_globals *gv, struct nn_gap_info *gi, int64_t seqnr); +struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *prd, struct nn_gap_info *gi); void trigger_recv_threads (const struct q_globals *gv); uint32_t recv_thread (void *vrecv_thread_arg); uint32_t listen_thread (struct ddsi_tran_listener * listener); int user_dqueue_handler (const struct 
nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const ddsi_guid_t *rdguid, void *qarg); +int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/dds/ddsi/q_rtps.h b/src/core/ddsi/include/dds/ddsi/q_rtps.h index 71382f4..448639c 100644 --- a/src/core/ddsi/include/dds/ddsi/q_rtps.h +++ b/src/core/ddsi/include/dds/ddsi/q_rtps.h @@ -53,7 +53,8 @@ typedef int64_t seqno_t; #define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER 0x201c4 #define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER 0xff0200c2 #define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER 0xff0200c7 -/* TODO: ENDPOINT_PARTICIPANT_VOLATILE */ +#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER 0xff0202c3 +#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER 0xff0202c4 #define NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER 0xff0101c2 #define NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER 0xff0101c7 diff --git a/src/core/ddsi/include/dds/ddsi/q_transmit.h b/src/core/ddsi/include/dds/ddsi/q_transmit.h index c7899e6..9120f7e 100644 --- a/src/core/ddsi/include/dds/ddsi/q_transmit.h +++ b/src/core/ddsi/include/dds/ddsi/q_transmit.h @@ -43,6 +43,7 @@ int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *x dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew); int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew); void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync); +int 
write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd); #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/ddsi_security_msg.c b/src/core/ddsi/src/ddsi_security_msg.c index afed603..8a93845 100644 --- a/src/core/ddsi/src/ddsi_security_msg.c +++ b/src/core/ddsi/src/ddsi_security_msg.c @@ -11,9 +11,15 @@ */ #include #include +#include "dds/ddsrt/md5.h" #include "dds/ddsrt/heap.h" #include "dds/ddsrt/string.h" +#include "dds/ddsi/q_plist.h" #include "dds/ddsi/q_bswap.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_transmit.h" +#include "dds/ddsi/q_misc.h" +#include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_security_msg.h" #include "dds/ddsi/ddsi_plist_generic.h" @@ -22,7 +28,7 @@ const enum pserop pserop_participant_generic_message[] = /* nn_participant_generic_message */ XG, Xl, /* nn_message_identity_t message_identity */ XG, Xl, /* nn_message_identity_t related_message_identity */ - XG, /* ddsi_guid_t destinaton_participant_guid */ + XG, /* ddsi_guid_t destination_participant_guid */ XG, /* ddsi_guid_t destination_endpoint_guid */ XG, /* ddsi_guid_t source_endpoint_guid */ XS, /* char* message_class_id */ @@ -106,7 +112,7 @@ nn_participant_generic_message_init( } if (dstpguid) - msg->destinaton_participant_guid = *dstpguid; + msg->destination_participant_guid = *dstpguid; if (dsteguid) msg->destination_endpoint_guid = *dsteguid; @@ -147,3 +153,104 @@ nn_participant_generic_message_deseralize( assert(sizeof(nn_participant_generic_message_t) == plist_memsize_generic(pserop_participant_generic_message)); return plist_deser_generic (msg, data, len, bswap, pserop_participant_generic_message); } + +int +write_crypto_exchange_message( + const struct participant *pp, + const ddsi_guid_t *dst_pguid, + const ddsi_guid_t *src_eguid, + const ddsi_guid_t *dst_eguid, + const char *classid, + const nn_dataholderseq_t *tokens) +{ + struct 
q_globals * const gv = pp->e.gv; + struct nn_participant_generic_message pmg; + struct ddsi_tkmap_instance *tk; + struct ddsi_serdata *serdata; + struct proxy_reader *prd; + ddsi_guid_t prd_guid; + unsigned char *data; + size_t len; + struct writer *wr; + seqno_t seq; + int r; + + if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)) == NULL) + { + GVLOG (DDS_LC_DISCOVERY, "write_crypto_exchange_message("PGUIDFMT") - builtin volatile secure writer not found\n", PGUID (pp->e.guid)); + return -1; + } + + prd_guid.prefix = dst_pguid->prefix; + prd_guid.entityid.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER; + if ((prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, &prd_guid)) == NULL) + { + return -1; + } + + GVLOG (DDS_LC_DISCOVERY, "send crypto tokens("PGUIDFMT" --> "PGUIDFMT")\n", PGUID (wr->e.guid), PGUID (prd_guid)); + + ddsrt_mutex_lock (&wr->e.lock); + seq = ++wr->seq; + + /* Get serialized message. */ + nn_participant_generic_message_init(&pmg, &wr->e.guid, seq, dst_pguid, dst_eguid, src_eguid, classid, tokens, NULL); + nn_participant_generic_message_serialize(&pmg, &data, &len); + + /* Get the key value. */ + ddsrt_md5_state_t md5st; + ddsrt_md5_byte_t digest[16]; + ddsrt_md5_init (&md5st); + ddsrt_md5_append (&md5st, (const ddsrt_md5_byte_t *)data, sizeof (nn_message_identity_t)); + ddsrt_md5_finish (&md5st, digest); + + /* Write the sample. 
*/ + struct ddsi_rawcdr_sample raw = { + .blob = data, + .size = len, + .key = digest, + .keysize = 16 + }; + serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw); + tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata); + r = write_sample_p2p_wrlock_held(wr, seq, NULL, serdata, tk, prd); + ddsi_tkmap_instance_unref (gv->m_tkmap, tk); + ddsi_serdata_unref (serdata); + + nn_participant_generic_message_deinit(&pmg); + + ddsrt_mutex_unlock (&wr->e.lock); + + return r; +} + +int volatile_secure_data_filter(struct writer *wr, struct proxy_reader *prd, struct ddsi_serdata *serdata) +{ + static const size_t guid_offset = offsetof(nn_participant_generic_message_t, destination_participant_guid); + ddsrt_iovec_t guid_ref = { .iov_len=0, .iov_base=NULL }; + ddsi_guid_t *msg_guid; + ddsi_guid_t pp_guid; + int pass; + + DDSRT_UNUSED_ARG(wr); + + assert(wr); + assert(prd); + assert(serdata); + + (void)ddsi_serdata_to_ser_ref(serdata, guid_offset, sizeof(ddsi_guid_t), &guid_ref); + assert(guid_ref.iov_len == sizeof(ddsi_guid_t)); + assert(guid_ref.iov_base); + msg_guid = (ddsi_guid_t*)guid_ref.iov_base; + + pass = is_null_guid(msg_guid); + if (!pass) + { + pp_guid = nn_hton_guid(prd->c.proxypp->e.guid); + pass = guid_eq(msg_guid, &pp_guid); + } + + ddsi_serdata_to_ser_unref(serdata, &guid_ref); + + return pass; +} diff --git a/src/core/ddsi/src/ddsi_security_omg.c b/src/core/ddsi/src/ddsi_security_omg.c index b471fb7..ccaccf3 100644 --- a/src/core/ddsi/src/ddsi_security_omg.c +++ b/src/core/ddsi/src/ddsi_security_omg.c @@ -206,6 +206,9 @@ is_proxy_participant_deletion_allowed( assert(gv); assert(guid); + /* TODO: Check if the proxy writer guid prefix matches that of the proxy + * participant. Deletion is not allowed when they're not equal. */ + /* Always allow deletion from a secure proxy writer. 
*/ if (pwr_entityid.u == NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER) return true; diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index 11022af..2e3b1d1 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -1821,6 +1821,9 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER: /* TODO: Handshake */ break; + case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER: + /* TODO: Key exchange */ + break; #endif default: GVLOGDISC ("data(builtin, vendor %u.%u): "PGUIDFMT" #%"PRId64": not handled\n", diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 89de419..0fbacb6 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -31,6 +31,7 @@ #include "dds/ddsi/q_qosmatch.h" #include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_addrset.h" #include "dds/ddsi/q_xevent.h" /* qxev_spdp, &c. */ #include "dds/ddsi/q_ddsi_discovery.h" /* spdp_write, &c. 
*/ @@ -50,6 +51,10 @@ #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_security_omg.h" +#ifdef DDSI_INCLUDE_SECURITY +#include "dds/ddsi/ddsi_security_msg.h" +#endif + struct deleted_participant { ddsrt_avl_node_t avlnode; ddsi_guid_t guid; @@ -490,6 +495,10 @@ static void add_security_builtin_endpoints(struct participant *pp, ddsi_guid_t * new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_stateless_xqos_wr, whc_new(gv, 0, 1, 1), NULL, NULL); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER; + subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER); + new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_volatile_xqos_wr, whc_new(gv, 0, 0, 0), NULL, NULL); + pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER; + subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER); new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_ANNOUNCER; @@ -522,6 +531,10 @@ static void add_security_builtin_endpoints(struct participant *pp, ddsi_guid_t * new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR; + subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER); + new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_volatile_xqos_rd, NULL, NULL, NULL); + pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_DETECTOR; + subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER); new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_stateless_xqos_rd, NULL, NULL, NULL); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_DETECTOR; @@ -911,6 +924,8 @@ static void unref_participant (struct 
participant *pp, const struct ddsi_guid *g NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER, NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER, NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER, + NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, + NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, /* PrismTech ones: */ NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER, NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_READER, @@ -1113,6 +1128,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER: bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER; break; + case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER: + bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER; + break; case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER: bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_ANNOUNCER; break; @@ -1729,9 +1747,12 @@ static void proxy_writer_drop_connection (const struct ddsi_guid *pwr_guid, stru local_reader_ary_remove (&pwr->rdary, rd); } ddsrt_mutex_unlock (&pwr->e.lock); - if (m != NULL) + if (m) { update_reader_init_acknack_count (&rd->e.gv->logconfig, rd->e.gv->guid_hash, &rd->e.guid, m->count); + if (m->filtered) + nn_defrag_prune(pwr->defrag, &m->rd_guid.prefix, m->last_seq); + } free_pwr_rd_match (m); } @@ -1796,6 +1817,7 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd) m->seq = MAX_SEQ_NUMBER; else m->seq = wr->seq; + m->last_seq = m->seq; if (ddsrt_avl_lookup_ipath (&wr_readers_treedef, &wr->readers, &prd->e.guid, &path)) { ELOGDISC (wr, " writer_add_connection(wr "PGUIDFMT" prd "PGUIDFMT") - already connected\n", @@ -2044,6 +2066,8 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader m->t_heartbeat_accepted.v = 0; m->t_last_nack.v = 0; m->seq_last_nack = 0; + m->last_seq = 0; + m->filtered = 
0; /* These can change as a consequence of handling data and/or discovery activities. The safe way of dealing with them is to @@ -2104,9 +2128,16 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader difference in practice.) */ if (rd->reliable) { + uint32_t secondary_reorder_maxsamples = pwr->e.gv->config.secondary_reorder_maxsamples; + + if (rd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER) + { + secondary_reorder_maxsamples = pwr->e.gv->config.primary_reorder_maxsamples; + m->filtered = 1; + } m->acknack_xevent = qxev_acknack (pwr->evq, add_duration_to_mtime (tnow, pwr->e.gv->config.preemptive_ack_delay), &pwr->e.guid, &rd->e.guid); m->u.not_in_sync.reorder = - nn_reorder_new (&pwr->e.gv->logconfig, NN_REORDER_MODE_NORMAL, pwr->e.gv->config.secondary_reorder_maxsamples, pwr->e.gv->config.late_ack_mode); + nn_reorder_new (&pwr->e.gv->logconfig, NN_REORDER_MODE_NORMAL, secondary_reorder_maxsamples, pwr->e.gv->config.late_ack_mode); pwr->n_reliable_readers++; } else @@ -2142,6 +2173,9 @@ already_matched: return; } + + + static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer *wr) { struct prd_wr_match *m = ddsrt_malloc (sizeof (*m)); @@ -2239,6 +2273,12 @@ static ddsi_entityid_t builtin_entityid_match (ddsi_entityid_t x) case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER: res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER; break; + case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER: + res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER; + break; + case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER: + res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER; + break; case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER: res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER; break; @@ -2939,11 +2979,12 @@ static void new_writer_guid_common_init (struct writer *wr, const 
struct ddsi_se assert (wr->xqos->present & QP_RELIABILITY); wr->reliable = (wr->xqos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT); assert (wr->xqos->present & QP_DURABILITY); - if (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE)) + if (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) && + (wr->e.guid.entityid.u != NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)) { assert (wr->xqos->history.kind == DDS_HISTORY_KEEP_LAST); - assert (wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL || - wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER); + assert ((wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL) || + (wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER)); } wr->handle_as_transient_local = (wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL); wr->include_keyhash = @@ -3053,7 +3094,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->whc_low = wr->e.gv->config.whc_lowwater_mark; wr->whc_high = wr->e.gv->config.whc_init_highwater_mark.value; } - assert (!is_builtin_entityid(wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX)); + assert (!(wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) + || + (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX)); /* Connection admin */ ddsrt_avl_init (&wr_readers_treedef, &wr->readers); @@ -3505,7 +3548,15 @@ static dds_return_t new_reader_guid assert (rd->xqos->present & QP_RELIABILITY); rd->reliable = (rd->xqos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT); assert (rd->xqos->present & QP_DURABILITY); - rd->handle_as_transient_local = (rd->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL); + /* The builtin volatile secure writer applies a filter which is used to send the secure + * crypto token only to the destination reader for which the crypto tokens 
are applicable. + * Thus the builtin volatile secure reader will receive gaps in the sequence numbers of + * the messages received. Therefore the out-of-order list of the proxy writer cannot be + * used for this reader; instead a reader-specific out-of-order list must be used, the + * same list that is used for handling transient-local data. + */ + rd->handle_as_transient_local = (rd->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL) || + (rd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER); rd->topic = ddsi_sertopic_ref (topic); rd->ddsi2direct_cb = 0; rd->ddsi2direct_cbarg = 0; @@ -3862,6 +3913,20 @@ static void add_proxy_builtin_endpoints( &gv->builtin_endpoint_xqos_wr, &gv->builtin_endpoint_xqos_rd); + /* Security 'volatile' proxy endpoints. */ + static const struct bestab bestab_volatile[] = { + LTE (PARTICIPANT_VOLATILE_SECURE_ANNOUNCER, P2P, PARTICIPANT_VOLATILE_SECURE_WRITER), + LTE (PARTICIPANT_VOLATILE_SECURE_DETECTOR, P2P, PARTICIPANT_VOLATILE_SECURE_READER) + }; + create_proxy_builtin_endpoints(gv, + bestab_volatile, + (int)(sizeof (bestab_volatile) / sizeof (*bestab_volatile)), + ppguid, + proxypp, + timestamp, + &gv->builtin_volatile_xqos_wr, + &gv->builtin_volatile_xqos_rd); + /* Security 'stateless' proxy endpoints. 
*/ static const struct bestab bestab_stateless[] = { LTE (PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER, P2P, PARTICIPANT_STATELESS_MESSAGE_WRITER), @@ -4351,6 +4416,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons pwr->last_fragnum = ~0u; pwr->nackfragcount = 0; pwr->last_fragnum_reset = 0; + pwr->filtered = 0; ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1); if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) { /* The DDSI built-in proxy writers always deliver @@ -4396,6 +4462,19 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons } reorder_mode = get_proxy_writer_reorder_mode(pwr->e.guid.entityid, isreliable); pwr->reorder = nn_reorder_new (&gv->logconfig, reorder_mode, gv->config.primary_reorder_maxsamples, gv->config.late_ack_mode); + + if (pwr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER) + { + /* The builtin_volatile_secure proxy writer uses a content filter. Set the next expected + * sequence number of its reorder administration to the maximum sequence number, so that + * this reorder administration is effectively unused; because the corresponding reader is + * always considered out of sync, the reorder administration of that reader will be used + * instead. 
+ */ + nn_reorder_set_next_seq(pwr->reorder, MAX_SEQ_NUMBER); + pwr->filtered = 1; + } + pwr->dqueue = dqueue; pwr->evq = evq; pwr->ddsi2direct_cb = 0; @@ -4595,6 +4674,15 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons ddsrt_avl_init (&prd_writers_treedef, &prd->writers); +#ifdef DDSI_INCLUDE_SECURITY + if (prd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER) + prd->filter = volatile_secure_data_filter; + else + prd->filter = NULL; +#else + prd->filter = NULL; +#endif + /* locking the entity prevents matching while the built-in topic hasn't been published yet */ ddsrt_mutex_lock (&prd->e.lock); ephash_insert_proxy_reader_guid (gv->guid_hash, prd); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 2d880ac..5429094 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -140,6 +140,29 @@ static void make_builtin_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template) q->durability.kind = DDS_DURABILITY_TRANSIENT_LOCAL; } +#ifdef DDSI_INCLUDE_SECURITY +static void make_builtin_volatile_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template) +{ + nn_xqos_copy (q, template); + q->reliability.kind = DDS_RELIABILITY_RELIABLE; + q->reliability.max_blocking_time = 100 * T_MILLISECOND; + q->durability.kind = DDS_DURABILITY_VOLATILE; + q->history.kind = DDS_HISTORY_KEEP_ALL; +} + +static void add_property_to_xqos(dds_qos_t *q, const char *name, const char *value) +{ + assert(!(q->present & QP_PROPERTY_LIST)); + q->present |= QP_PROPERTY_LIST; + q->property.value.n = 1; + q->property.value.props = ddsrt_malloc(sizeof(dds_property_t)); + q->property.binary_value.n = 0; + q->property.binary_value.props = NULL; + q->property.value.props[0].name = ddsrt_strdup(name); + q->property.value.props[0].value = ddsrt_strdup(value); +} +#endif + static int set_recvips (struct q_globals *gv) { gv->recvips = NULL; @@ -1014,10 +1037,17 @@ int rtps_init (struct q_globals *gv) 
make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_rd, &gv->default_xqos_rd); make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_wr, &gv->default_xqos_wr); #ifdef DDSI_INCLUDE_SECURITY + make_builtin_volatile_endpoint_xqos(&gv->builtin_volatile_xqos_rd, &gv->default_xqos_rd); + make_builtin_volatile_endpoint_xqos(&gv->builtin_volatile_xqos_wr, &gv->default_xqos_wr); nn_xqos_copy (&gv->builtin_stateless_xqos_rd, &gv->default_xqos_rd); nn_xqos_copy (&gv->builtin_stateless_xqos_wr, &gv->default_xqos_wr); gv->builtin_stateless_xqos_wr.reliability.kind = DDS_RELIABILITY_BEST_EFFORT; gv->builtin_stateless_xqos_wr.durability.kind = DDS_DURABILITY_VOLATILE; + + /* Setting these properties allows the CryptoKeyFactory to recognize + * the entities (see DDS Security spec chapter 8.8.8.1). */ + add_property_to_xqos(&gv->builtin_volatile_xqos_rd, "dds.sec.builtin_endpoint_name", "BuiltinParticipantVolatileMessageSecureReader"); + add_property_to_xqos(&gv->builtin_volatile_xqos_wr, "dds.sec.builtin_endpoint_name", "BuiltinParticipantVolatileMessageSecureWriter"); #endif make_special_topics (gv); @@ -1338,6 +1368,8 @@ err_unicast_sockets: #ifdef DDSI_INCLUDE_SECURITY nn_xqos_fini (&gv->builtin_stateless_xqos_wr); nn_xqos_fini (&gv->builtin_stateless_xqos_rd); + nn_xqos_fini (&gv->builtin_volatile_xqos_wr); + nn_xqos_fini (&gv->builtin_volatile_xqos_rd); #endif nn_xqos_fini (&gv->builtin_endpoint_xqos_wr); nn_xqos_fini (&gv->builtin_endpoint_xqos_rd); @@ -1670,6 +1702,8 @@ void rtps_fini (struct q_globals *gv) #ifdef DDSI_INCLUDE_SECURITY nn_xqos_fini (&gv->builtin_stateless_xqos_wr); nn_xqos_fini (&gv->builtin_stateless_xqos_rd); + nn_xqos_fini (&gv->builtin_volatile_xqos_wr); + nn_xqos_fini (&gv->builtin_volatile_xqos_rd); #endif nn_xqos_fini (&gv->builtin_endpoint_xqos_wr); nn_xqos_fini (&gv->builtin_endpoint_xqos_rd); diff --git a/src/core/ddsi/src/q_misc.c b/src/core/ddsi/src/q_misc.c index 8569c01..1e4e7b5 100644 --- a/src/core/ddsi/src/q_misc.c +++ 
b/src/core/ddsi/src/q_misc.c @@ -42,6 +42,16 @@ int WildcardOverlap(char * p1, char * p2) } #endif +int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b) +{ + return a->u[0] == b->u[0] && a->u[1] == b->u[1] && a->u[2] == b->u[2]; +} + +int guid_eq (const struct ddsi_guid *a, const struct ddsi_guid *b) +{ + return guid_prefix_eq(&a->prefix, &b->prefix) && (a->entityid.u == b->entityid.u); +} + int ddsi2_patmatch (const char *pat, const char *str) { while (*pat) diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index d553510..f7de87c 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -1545,6 +1545,28 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu return (int) map->numbits; } +/* There is only one defrag per proxy writer. However, for the Volatile Secure writer a filter + * is applied that selects on the destination participant. Note that there will be one + * builtin Volatile Secure reader for each local participant. When this local participant + * is deleted, the defrag buffer may still contain fragments for the associated reader. + * The function nn_defrag_prune is used to remove these fragments; it should only be called when + * the Volatile Secure reader is deleted. 
+ */ +void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min) +{ + struct nn_rsample *s = ddsrt_avl_lookup_succ_eq (&defrag_sampletree_treedef, &defrag->sampletree, &min); + while (s) + { + struct nn_rsample *s1 = ddsrt_avl_find_succ (&defrag_sampletree_treedef, &defrag->sampletree, s); + if (guid_prefix_eq(&s->u.defrag.sampleinfo->rst->dst_guid_prefix, dst)) + { + defrag_rsample_drop (defrag, s); + } + s = s1; + } + defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree); +} + /* REORDER ------------------------------------------------------------- The reorder index tracks out-of-order messages as non-overlapping, @@ -2366,6 +2388,11 @@ seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder) return reorder->next_seq; } +void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq) +{ + reorder->next_seq = seq; +} + /* DQUEUE -------------------------------------------------------------- */ struct nn_dqueue { diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 22a7543..667fdbe 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -79,6 +79,11 @@ static void deliver_user_data_synchronously (struct nn_rsample_chain *sc, const static void maybe_set_reader_in_sync (struct proxy_writer *pwr, struct pwr_rd_match *wn, seqno_t last_deliv_seq) { + if (wn->filtered) + { + wn->in_sync = PRMSS_OUT_OF_SYNC; + return; + } switch (wn->in_sync) { case PRMSS_SYNC: @@ -505,7 +510,7 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms return 1; } -static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits) +int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits) { struct nn_xmsg_marker sm_marker; Gap_t *gap; @@ -603,6 +608,82 @@ static 
int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou return 1; } +void nn_gap_info_init(struct nn_gap_info *gi) +{ + gi->gapstart = -1; + gi->gapend = -1; + gi->gapnumbits = 0; + memset(gi->gapbits, 0, sizeof(gi->gapbits)); +} + +void nn_gap_info_update(struct q_globals *gv, struct nn_gap_info *gi, int64_t seqnr) +{ + if (gi->gapstart == -1) + { + GVTRACE (" M%"PRId64, seqnr); + gi->gapstart = seqnr; + gi->gapend = gi->gapstart + 1; + } + else if (seqnr == gi->gapend) + { + GVTRACE (" M%"PRId64, seqnr); + gi->gapend = seqnr + 1; + } + else if (seqnr - gi->gapend < 256) + { + unsigned idx = (unsigned) (seqnr - gi->gapend); + GVTRACE (" M%"PRId64, seqnr); + gi->gapnumbits = idx + 1; + nn_bitset_set (gi->gapnumbits, gi->gapbits, idx); + } +} + +struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *prd, struct nn_gap_info *gi) +{ + struct nn_xmsg *m; + + if (gi->gapstart <= 0) + return NULL; + + if (gi->gapnumbits == 0) + { + /* Avoid sending an invalid bitset */ + gi->gapnumbits = 1; + nn_bitset_set (gi->gapnumbits, gi->gapbits, 0); + gi->gapend--; + } + + m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); + +#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS + nn_xmsg_setencoderid (m, wr->partition_id); +#endif + + if (nn_xmsg_setdstPRD (m, prd) < 0) + { + nn_xmsg_free (m); + m = NULL; + } + else + { + add_Gap (m, wr, prd, gi->gapstart, gi->gapend, gi->gapnumbits, gi->gapbits); + if (nn_xmsg_size(m) == 0) + { + nn_xmsg_free (m); + m = NULL; + } + else + { + unsigned i; + ETRACE (wr, " FXGAP%"PRId64"..%"PRId64"/%d:", gi->gapstart, gi->gapend, gi->gapnumbits); + for (i = 0; i < gi->gapnumbits; i++) + ETRACE (wr, "%c", nn_bitset_isset (gi->gapnumbits, gi->gapbits, i) ? 
'1' : '0'); + } + } + + return m; +} + static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid) { struct proxy_reader *prd; @@ -611,10 +692,9 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac ddsi_guid_t src, dst; seqno_t seqbase; seqno_t seq_xmit; + seqno_t max_seq_available; nn_count_t *countp; - seqno_t gapstart = -1, gapend = -1; - unsigned gapnumbits = 0; - uint32_t gapbits[256 / 32]; + struct nn_gap_info gi; int accelerate_rexmit = 0; int is_pure_ack; int is_pure_nonhist_ack; @@ -626,7 +706,6 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac struct whc_node *deferred_free_list = NULL; struct whc_state whcst; int hb_sent_in_response = 0; - memset (gapbits, 0, sizeof (gapbits)); countp = (nn_count_t *) ((char *) msg + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (msg->readerSNState.numbits)); src.prefix = rst->src_guid_prefix; src.entityid = msg->readerId; @@ -836,6 +915,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac a future request'll fix it. */ enqueued = 1; seq_xmit = writer_read_seq_xmit (wr); + nn_gap_info_init(&gi); const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq; const seqno_t min_seq_to_rexmit = gap_for_already_acked ? 
rn->seq + 1 : 0; for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++) @@ -853,7 +933,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac if (!wr->retransmitting && sample.unacked) writer_set_retransmitting (wr); - if (rst->gv->config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync) + if (rst->gv->config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync && !prd->filter) { /* send retransmit to all receivers, but skip if recently done */ nn_mtime_t tstamp = now_mt (); @@ -875,77 +955,54 @@ } else { - /* no merging, send directed retransmit */ - RSTTRACE (" RX%"PRId64"", seqbase + i); - enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0); - if (enqueued) + /* Is this a volatile reader with a filter? + * If so, ask the filter whether the sample should be retransmitted or recorded as a gap. 
*/ + if (prd->filter && !prd->filter (wr, prd, sample.serdata)) + nn_gap_info_update (rst->gv, &gi, seqbase + i); + else { - max_seq_in_reply = seqbase + i; - msgs_sent++; - sample.rexmit_count++; + /* no merging, send directed retransmit */ + RSTTRACE (" RX%"PRId64"", seqbase + i); + enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0); + if (enqueued) + { + max_seq_in_reply = seqbase + i; + msgs_sent++; + sample.rexmit_count++; + } } } - whc_return_sample(wr->whc, &sample, true); } - else if (gapstart == -1) + else { - RSTTRACE (" M%"PRId64, seqbase + i); - gapstart = seqbase + i; - gapend = gapstart + 1; - msgs_lost++; - } - else if (seqbase + i == gapend) - { - RSTTRACE (" M%"PRId64, seqbase + i); - gapend = seqbase + i + 1; - msgs_lost++; - } - else if (seqbase + i - gapend < 256) - { - uint32_t idx = (uint32_t) (seqbase + i - gapend); - RSTTRACE (" M%"PRId64, seqbase + i); - gapnumbits = idx + 1; - nn_bitset_set (gapnumbits, gapbits, idx); + nn_gap_info_update (rst->gv, &gi, seqbase + i); msgs_lost++; } } } + if (!enqueued) RSTTRACE (" rexmit-limit-hit"); /* Generate a Gap message if some of the sequence is missing */ - if (gapstart > 0) + if (gi.gapstart > 0) { - struct nn_xmsg *m; - if (gapend == seqbase + msg->readerSNState.numbits) + struct nn_xmsg *gap; + + if (gi.gapend == seqbase + msg->readerSNState.numbits) + gi.gapend = grow_gap_to_next_seq (wr, gi.gapend); + + if (gi.gapend-1 + gi.gapnumbits > max_seq_in_reply) + max_seq_in_reply = gi.gapend-1 + gi.gapnumbits; + + gap = nn_gap_info_create_gap (wr, prd, &gi); + if (gap) { - /* We automatically grow a gap as far as we can -- can't - retransmit those messages anyway, so no need for round-trip - to the remote reader. 
*/ - gapend = grow_gap_to_next_seq (wr, gapend); + qxev_msg (wr->evq, gap); + msgs_sent++; } - /* The non-bitmap part of a gap message says everything <= - gapend-1 is no more (so the maximum sequence number it informs - the peer of is gapend-1); each bit adds one sequence number to - that. */ - if (gapend-1 + gapnumbits > max_seq_in_reply) - max_seq_in_reply = gapend-1 + gapnumbits; - RSTTRACE (" XGAP%"PRId64"..%"PRId64"/%u:", gapstart, gapend, gapnumbits); - for (uint32_t i = 0; i < gapnumbits; i++) - RSTTRACE ("%c", nn_bitset_isset (gapnumbits, gapbits, i) ? '1' : '0'); - m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); -#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS - nn_xmsg_setencoderid (m, wr->partition_id); -#endif - if (nn_xmsg_setdstPRD (m, prd) < 0) - nn_xmsg_free (m); - else - { - add_Gap (m, wr, prd, gapstart, gapend, gapnumbits, gapbits); - qxev_msg (wr->evq, m); - } - msgs_sent++; } + wr->rexmit_count += msgs_sent; wr->rexmit_lost_count += msgs_lost; /* If rexmits and/or a gap message were sent, and if the last @@ -953,7 +1010,8 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac less than the last sequence number transmitted by the writer, tell the peer to acknowledge quickly. Not sure if that helps, but it might ... [NB writer->seq is the last msg sent so far] */ - if (msgs_sent && max_seq_in_reply < seq_xmit) + max_seq_available = (prd->filter ? 
rn->last_seq : seq_xmit); + if (msgs_sent && max_seq_in_reply < max_seq_available) { RSTTRACE (" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq); force_heartbeat_to_peer (wr, &whcst, prd, 1); @@ -1039,7 +1097,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand struct receiver_state * const rst = arg->rst; Heartbeat_t const * const msg = arg->msg; struct proxy_writer * const pwr = arg->pwr; - seqno_t refseq; + seqno_t refseq, last_seq; ASSERT_MUTEX_HELD (&pwr->e.lock); @@ -1053,7 +1111,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand /* Reference sequence number for determining whether or not to Ack/Nack unfortunately depends on whether the reader is in sync. */ - if (wn->in_sync != PRMSS_OUT_OF_SYNC) + if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered) refseq = nn_reorder_next_seq (pwr->reorder) - 1; else refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; @@ -1070,7 +1128,12 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand { nn_mtime_t tsched; tsched.v = T_NEVER; - if (pwr->last_seq > refseq) + + if (wn->filtered) + last_seq = wn->last_seq; + else + last_seq = pwr->last_seq; + if (last_seq > refseq) { RSTTRACE ("/NAK"); if (arg->tnow_mt.v >= wn->t_last_nack.v + rst->gv->config.nack_delay || refseq >= wn->seq_last_nack) @@ -1195,44 +1258,73 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct int refc_adjust = 0; nn_reorder_result_t res; gap = nn_rdata_newgap (rmsg); - if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, firstseq, &refc_adjust)) > 0) + int filtered = 0; + + if (pwr->filtered && !is_null_guid(&dst)) { - if (pwr->deliver_synchronously) - deliver_user_data_synchronously (&sc, NULL); - else - nn_dqueue_enqueue (pwr->dqueue, &sc, res); - } - for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn; wn = ddsrt_avl_find_succ 
(&pwr_readers_treedef, &pwr->readers, wn)) - if (wn->in_sync != PRMSS_SYNC) + for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn)) { - seqno_t last_deliv_seq = 0; - switch (wn->in_sync) + if (guid_eq(&wn->rd_guid, &dst)) { - case PRMSS_SYNC: - assert(0); - break; - case PRMSS_TLCATCHUP: - last_deliv_seq = nn_reorder_next_seq (pwr->reorder) - 1; - break; - case PRMSS_OUT_OF_SYNC: { + if (wn->filtered) + { struct nn_reorder *ro = wn->u.not_in_sync.reorder; if ((res = nn_reorder_gap (&sc, ro, gap, 1, firstseq, &refc_adjust)) > 0) + nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res); + if (fromSN (msg->lastSN) > wn->last_seq) { - if (pwr->deliver_synchronously) - deliver_user_data_synchronously (&sc, &wn->rd_guid); - else - nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res); + wn->last_seq = fromSN (msg->lastSN); } - last_deliv_seq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; + filtered = 1; } + break; } - if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER) - { - wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN); - RSTTRACE (" end-of-tl-seq(rd "PGUIDFMT" #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq); - } - maybe_set_reader_in_sync (pwr, wn, last_deliv_seq); } + } + + if (!filtered) + { + if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, firstseq, &refc_adjust)) > 0) + { + if (pwr->deliver_synchronously) + deliver_user_data_synchronously (&sc, NULL); + else + nn_dqueue_enqueue (pwr->dqueue, &sc, res); + } + for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn)) + { + if (wn->in_sync != PRMSS_SYNC) + { + seqno_t last_deliv_seq = 0; + switch (wn->in_sync) + { + case PRMSS_SYNC: + assert(0); + break; + case PRMSS_TLCATCHUP: + last_deliv_seq = nn_reorder_next_seq (pwr->reorder) - 1; + break; + case PRMSS_OUT_OF_SYNC: { + struct nn_reorder *ro = 
wn->u.not_in_sync.reorder; + if ((res = nn_reorder_gap (&sc, ro, gap, 1, firstseq, &refc_adjust)) > 0) + { + if (pwr->deliver_synchronously) + deliver_user_data_synchronously (&sc, &wn->rd_guid); + else + nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res); + } + last_deliv_seq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; + } + } + if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER) + { + wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN); + RSTTRACE (" end-of-tl-seq(rd "PGUIDFMT" #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq); + } + maybe_set_reader_in_sync (pwr, wn, last_deliv_seq); + } + } + } nn_fragchain_adjust_refcount (gap, refc_adjust); } @@ -1552,7 +1644,8 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se /* Clean up the defrag admin: no fragments of a missing sample will be arriving in the future */ - nn_defrag_notegap (pwr->defrag, a, b); + if (!(wn && wn->filtered)) + nn_defrag_notegap (pwr->defrag, a, b); /* Primary reorder: the gap message may cause some samples to become deliverable. 
*/ @@ -1733,6 +1826,14 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm pwr->last_fragnum = ~0u; pwr->last_fragnum_reset = 0; } + + if (wn && wn->filtered) + { + if (listbase + last_included_rel > wn->last_seq) + { + wn->last_seq = listbase + last_included_rel; + } + } RSTTRACE (")"); ddsrt_mutex_unlock (&pwr->e.lock); return 1; @@ -2193,105 +2294,135 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct int refc_adjust = 0; struct nn_rsample_chain sc; struct nn_rdata *fragchain = nn_rsample_fragchain (rsample); - nn_reorder_result_t rres; + nn_reorder_result_t rres, rres2; + struct pwr_rd_match *wn; + int filtered = 0; - rres = nn_reorder_rsample (&sc, pwr->reorder, rsample, &refc_adjust, 0); // nn_dqueue_is_full (pwr->dqueue)); - - if (rres == NN_REORDER_ACCEPT && pwr->n_reliable_readers == 0) + if (pwr->filtered && !is_null_guid(&dst)) { - /* If no reliable readers but the reorder buffer accepted the - sample, it must be a reliable proxy writer with only - unreliable readers. "Inserting" a Gap [1, sampleinfo->seq) - will force delivery of this sample, and not cause the gap to - be added to the reorder admin. */ - int gap_refc_adjust = 0; - rres = nn_reorder_gap (&sc, pwr->reorder, rdata, 1, sampleinfo->seq, &gap_refc_adjust); - assert (rres > 0); - assert (gap_refc_adjust == 0); - } - - if (rres > 0) - { - /* Enqueue or deliver with pwr->e.lock held: to ensure no other - receive thread's data gets interleaved -- arguably delivery - needn't be exactly in-order, which would allow us to do this - without pwr->e.lock held. 
*/ - if (pwr->deliver_synchronously) + for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn != NULL; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn)) { - /* FIXME: just in case the synchronous delivery runs into a delay caused - by the current mishandling of resource limits */ - if (*deferred_wakeup) - dd_dqueue_enqueue_trigger (*deferred_wakeup); - deliver_user_data_synchronously (&sc, NULL); - } - else - { - if (nn_dqueue_enqueue_deferred_wakeup (pwr->dqueue, &sc, rres)) + if (guid_eq(&wn->rd_guid, &dst)) { - if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue) - dd_dqueue_enqueue_trigger (*deferred_wakeup); - *deferred_wakeup = pwr->dqueue; + if (wn->filtered) + { + rres2 = nn_reorder_rsample (&sc, wn->u.not_in_sync.reorder, rsample, &refc_adjust, nn_dqueue_is_full (pwr->dqueue)); + if (sampleinfo->seq > wn->last_seq) + { + wn->last_seq = sampleinfo->seq; + } + if (rres2 > 0) + { + if (!pwr->deliver_synchronously) + nn_dqueue_enqueue (pwr->dqueue, &sc, rres2); + else + deliver_user_data_synchronously (&sc, &wn->rd_guid); + } + filtered = 1; + } + break; } } } - if (pwr->n_readers_out_of_sync > 0) + if (!filtered) { - /* Those readers catching up with TL but in sync with the proxy - writer may have become in sync with the proxy writer and the - writer; those catching up with TL all by themselves go through - the "TOO_OLD" path below. 
*/ - ddsrt_avl_iter_t it; - struct pwr_rd_match *wn; - struct nn_rsample *rsample_dup = NULL; - int reuse_rsample_dup = 0; - for (wn = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); wn != NULL; wn = ddsrt_avl_iter_next (&it)) + rres = nn_reorder_rsample (&sc, pwr->reorder, rsample, &refc_adjust, 0); // nn_dqueue_is_full (pwr->dqueue)); + + if (rres == NN_REORDER_ACCEPT && pwr->n_reliable_readers == 0) { - nn_reorder_result_t rres2; - if (wn->in_sync == PRMSS_SYNC) - continue; - /* only need to get a copy of the first sample, because that's the one - that triggered delivery */ - if (!reuse_rsample_dup) - rsample_dup = nn_reorder_rsample_dup_first (rmsg, rsample); - rres2 = nn_reorder_rsample (&sc, wn->u.not_in_sync.reorder, rsample_dup, &refc_adjust, nn_dqueue_is_full (pwr->dqueue)); - switch (rres2) + /* If no reliable readers but the reorder buffer accepted the + sample, it must be a reliable proxy writer with only + unreliable readers. "Inserting" a Gap [1, sampleinfo->seq) + will force delivery of this sample, and not cause the gap to + be added to the reorder admin. */ + int gap_refc_adjust = 0; + rres = nn_reorder_gap (&sc, pwr->reorder, rdata, 1, sampleinfo->seq, &gap_refc_adjust); + assert (rres > 0); + assert (gap_refc_adjust == 0); + } + + if (rres > 0) + { + /* Enqueue or deliver with pwr->e.lock held: to ensure no other + receive thread's data gets interleaved -- arguably delivery + needn't be exactly in-order, which would allow us to do this + without pwr->e.lock held. 
*/ + if (pwr->deliver_synchronously) { - case NN_REORDER_TOO_OLD: - case NN_REORDER_REJECT: - reuse_rsample_dup = 1; - break; - case NN_REORDER_ACCEPT: - reuse_rsample_dup = 0; - break; - default: - assert (rres2 > 0); - /* note: can't deliver to a reader, only to a group */ - maybe_set_reader_in_sync (pwr, wn, sampleinfo->seq); - reuse_rsample_dup = 0; - /* No need to deliver old data to out-of-sync readers - synchronously -- ordering guarantees don't change - as fresh data will be delivered anyway and hence - the old data will never be guaranteed to arrive - in-order, and those few microseconds can't hurt in - catching up on transient-local data. See also - NN_REORDER_DELIVER case in outer switch. */ - if (pwr->deliver_synchronously) - { - /* FIXME: just in case the synchronous delivery runs into a delay caused - by the current mishandling of resource limits */ - deliver_user_data_synchronously (&sc, &wn->rd_guid); - } - else - { - if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue) + /* FIXME: just in case the synchronous delivery runs into a delay caused + by the current mishandling of resource limits */ + if (*deferred_wakeup) + dd_dqueue_enqueue_trigger (*deferred_wakeup); + deliver_user_data_synchronously (&sc, NULL); + } + else + { + if (nn_dqueue_enqueue_deferred_wakeup (pwr->dqueue, &sc, rres)) + { + if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue) + dd_dqueue_enqueue_trigger (*deferred_wakeup); + *deferred_wakeup = pwr->dqueue; + } + } + } + + if (pwr->n_readers_out_of_sync > 0) + { + /* Those readers catching up with TL but in sync with the proxy + writer may have become in sync with the proxy writer and the + writer; those catching up with TL all by themselves go through + the "TOO_OLD" path below. 
 */
+          ddsrt_avl_iter_t it;
+          struct nn_rsample *rsample_dup = NULL;
+          int reuse_rsample_dup = 0;
+          for (wn = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); wn != NULL; wn = ddsrt_avl_iter_next (&it))
+          {
+            if (wn->in_sync == PRMSS_SYNC)
+              continue;
+            /* only need to get a copy of the first sample, because that's the one
+               that triggered delivery */
+            if (!reuse_rsample_dup)
+              rsample_dup = nn_reorder_rsample_dup_first (rmsg, rsample);
+            rres2 = nn_reorder_rsample (&sc, wn->u.not_in_sync.reorder, rsample_dup, &refc_adjust, nn_dqueue_is_full (pwr->dqueue));
+            switch (rres2)
+            {
+              case NN_REORDER_TOO_OLD:
+              case NN_REORDER_REJECT:
+                reuse_rsample_dup = 1;
+                break;
+              case NN_REORDER_ACCEPT:
+                reuse_rsample_dup = 0;
+                break;
+              default:
+                assert (rres2 > 0);
+                /* note: can't deliver to a reader, only to a group */
+                maybe_set_reader_in_sync (pwr, wn, sampleinfo->seq);
+                reuse_rsample_dup = 0;
+                /* No need to deliver old data to out-of-sync readers
+                   synchronously -- ordering guarantees don't change
+                   as fresh data will be delivered anyway and hence
+                   the old data will never be guaranteed to arrive
+                   in-order, and those few microseconds can't hurt in
+                   catching up on transient-local data.  See also
+                   NN_REORDER_DELIVER case in outer switch. */
+                if (pwr->deliver_synchronously) {
-                  dd_dqueue_enqueue_trigger (*deferred_wakeup);
-                  *deferred_wakeup = NULL;
+                  /* FIXME: just in case the synchronous delivery runs into a delay caused
+                     by the current mishandling of resource limits */
+                  deliver_user_data_synchronously (&sc, &wn->rd_guid);
                 }
-                nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, rres2);
-              }
-              break;
+                else
+                {
+                  if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue)
+                  {
+                    dd_dqueue_enqueue_trigger (*deferred_wakeup);
+                    *deferred_wakeup = NULL;
+                  }
+                  nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, rres2);
+                }
+                break;
+            }
           }
         }
       }
diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c
index cec9693..6b7a891 100644
--- a/src/core/ddsi/src/q_transmit.c
+++ b/src/core/ddsi/src/q_transmit.c
@@ -10,6 +10,7 @@
  * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
  */
 #include
+#include
 #include

 #include "dds/ddsrt/heap.h"
@@ -31,6 +32,7 @@
 #include "dds/ddsi/q_entity.h"
 #include "dds/ddsi/q_unused.h"
 #include "dds/ddsi/q_hbcontrol.h"
+#include "dds/ddsi/q_receive.h"
 #include "dds/ddsi/ddsi_tkmap.h"
 #include "dds/ddsi/ddsi_serdata.h"
 #include "dds/ddsi/ddsi_sertopic.h"
@@ -104,8 +106,8 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *

 void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow)
 {
-  struct q_globals const * const gv = wr->e.gv;
   struct hbcontrol * const hbc = &wr->hbcontrol;
+  struct q_globals const * const gv = wr->e.gv;
   nn_mtime_t tnext;

   /* Reset number of heartbeats since last write: that means the
@@ -315,12 +317,53 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
             (hbc->tsched.v == T_NEVER) ? INFINITY : (double) (hbc->tsched.v - tnow.v) / 1e9,
             ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq,
             ddsrt_avl_is_empty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!",
-            whcst->max_seq, writer_read_seq_xmit (wr));
+            whcst->max_seq, writer_read_seq_xmit(wr));
   }

   return msg;
 }

+#ifdef DDSI_INCLUDE_SECURITY
+struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *whcst, int hbansreq, struct proxy_reader *prd)
+{
+  struct q_globals const * const gv = wr->e.gv;
+  struct nn_xmsg *msg;
+
+  ASSERT_MUTEX_HELD (&wr->e.lock);
+  assert (wr->reliable);
+
+  if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
+    return NULL;
+
+  ETRACE (wr, "writer_hbcontrol_p2p: wr "PGUIDFMT" unicasting to prd "PGUIDFMT" ", PGUID (wr->e.guid), PGUID (prd->e.guid));
+  ETRACE (wr, "(rel-prd %d seq-eq-max %d seq %"PRId64" maxseq %"PRId64")\n",
+          wr->num_reliable_readers,
+          ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
+          wr->seq,
+          ddsrt_avl_is_empty (&wr->readers) ? (int64_t) -1 : root_rdmatch (wr)->max_seq);
+
+  /* set the destination explicitly to the unicast destination and the fourth
+     param of add_Heartbeat needs to be the guid of the reader */
+  if (nn_xmsg_setdstPRD (msg, prd) < 0)
+  {
+    nn_xmsg_free (msg);
+    return NULL;
+  }
+#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
+  nn_xmsg_setencoderid (msg, wr->partition_id);
+#endif
+  add_Heartbeat (msg, wr, whcst, hbansreq, prd->e.guid.entityid, 1);
+
+  if (nn_xmsg_size(msg) == 0)
+  {
+    nn_xmsg_free (msg);
+    msg = NULL;
+  }
+
+  return msg;
+}
+#endif
+
 void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync)
 {
   struct q_globals const * const gv = wr->e.gv;
@@ -1051,6 +1094,65 @@ static int maybe_grow_whc (struct writer *wr)
   return 0;
 }

+int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd)
+{
+  struct q_globals * const gv = wr->e.gv;
+  int r;
+  nn_mtime_t tnow;
+  int rexmit = 1;
+  struct wr_prd_match *wprd = NULL;
+  seqno_t gseq;
+  struct nn_xmsg *gap = NULL;
+
+  tnow = now_mt ();
+  serdata->twrite = tnow;
+  serdata->timestamp = now();
+
+  if (prd->filter)
+  {
+    if ((wprd = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL)
+    {
+      rexmit = prd->filter(wr, prd, serdata);
+      /* determine if gap has to added */
+      if (rexmit)
+      {
+        struct nn_gap_info gi;
+
+        GVLOG (DDS_LC_DISCOVERY, "send filtered "PGUIDFMT" last_seq=%"PRIu64" seq=%"PRIu64"\n", PGUID (wr->e.guid), wprd->seq, seq);
+
+        nn_gap_info_init(&gi);
+        for (gseq = wprd->seq + 1; gseq < seq; gseq++)
+        {
+          struct whc_borrowed_sample sample;
+          if (whc_borrow_sample (wr->whc, seq, &sample))
+          {
+            if (prd->filter(wr, prd, sample.serdata) == 0)
+            {
+              nn_gap_info_update(wr->e.gv, &gi, gseq);
+            }
+            whc_return_sample (wr->whc, &sample, false);
+          }
+        }
+        gap = nn_gap_info_create_gap(wr, prd, &gi);
+      }
+      wprd->last_seq = seq;
+    }
+  }
+
+  if ((r = insert_sample_in_whc (wr, seq, plist, serdata, tk)) >= 0)
+  {
+    enqueue_sample_wrlock_held (wr, seq, plist, serdata, prd, 1);
+
+    if (gap)
+      qxev_msg (wr->evq, gap);
+
+    if (wr->heartbeat_xevent)
+      writer_hbcontrol_note_asyncwrite(wr, tnow);
+  }
+
+  return r;
+}
+
 static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, int end_of_txn, int gc_allowed)
 {
   struct q_globals const * const gv = wr->e.gv;
diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c
index 548508b..77a580f 100644
--- a/src/core/ddsi/src/q_xevent.c
+++ b/src/core/ddsi/src/q_xevent.c
@@ -579,6 +579,89 @@ static void handle_xevk_entityid (struct nn_xpack *xp, struct xevent_nt *ev)
   nn_xpack_addmsg (xp, ev->u.entityid.msg, 0);
 }

+#ifdef DDSI_INCLUDE_SECURITY
+static void send_heartbeat_to_all_readers(struct nn_xpack *xp, struct xevent *ev, struct writer *wr, nn_mtime_t tnow)
+{
+  struct whc_state whcst;
+  nn_mtime_t t_next;
+  int hbansreq = 0;
+  unsigned count = 0;
+
+  ddsrt_mutex_lock (&wr->e.lock);
+
+  whc_get_state(wr->whc, &whcst);
+
+  if (!writer_must_have_hb_scheduled (wr, &whcst))
+  {
+    hbansreq = 1; /* just for trace */
+    t_next.v = T_NEVER;
+  }
+  else if (!writer_hbcontrol_must_send (wr, &whcst, tnow))
+  {
+    hbansreq = 1; /* just for trace */
+    t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
+  }
+  else
+  {
+    struct wr_prd_match *m;
+    struct ddsi_guid last_guid = { .prefix = {.u = {0,0,0}}, .entityid = {0} };
+
+    hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow);
+    t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
+
+    while ((m = ddsrt_avl_lookup_succ (&wr_readers_treedef, &wr->readers, &last_guid)) != NULL)
+    {
+      last_guid = m->prd_guid;
+      if (m->seq < m->last_seq)
+      {
+        struct proxy_reader *prd;
+
+        prd = ephash_lookup_proxy_reader_guid(wr->e.gv->guid_hash, &m->prd_guid);
+        if (prd)
+        {
+          ETRACE (wr, " heartbeat(wr "PGUIDFMT" rd "PGUIDFMT" %s) send, resched in %g s (min-ack %"PRId64", avail-seq %"PRId64")\n",
+                  PGUID (wr->e.guid),
+                  PGUID (m->prd_guid),
+                  hbansreq ? "" : " final",
+                  (double)(t_next.v - tnow.v) / 1e9,
+                  m->seq,
+                  m->last_seq);
+
+          struct nn_xmsg *msg = writer_hbcontrol_p2p(wr, &whcst, hbansreq, prd);
+          if (msg != NULL)
+          {
+            ddsrt_mutex_unlock (&wr->e.lock);
+            nn_xpack_addmsg (xp, msg, 0);
+            ddsrt_mutex_lock (&wr->e.lock);
+          }
+          count++;
+        }
+      }
+
+    }
+  }
+
+  resched_xevent_if_earlier (ev, t_next);
+  wr->hbcontrol.tsched = t_next;
+
+  if (count == 0)
+  {
+    (void)resched_xevent_if_earlier (ev, t_next);
+    ETRACE (wr, "heartbeat(wr "PGUIDFMT") suppressed, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n",
+            PGUID (wr->e.guid),
+            (t_next.v == T_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9,
+            ddsrt_avl_is_empty (&wr->readers) ? (int64_t) -1 : ((struct wr_prd_match *) ddsrt_avl_root (&wr_readers_treedef, &wr->readers))->min_seq,
+            ddsrt_avl_is_empty (&wr->readers) || ((struct wr_prd_match *) ddsrt_avl_root (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!",
+            whcst.max_seq,
+            writer_read_seq_xmit(wr));
+  }
+
+  ddsrt_mutex_unlock (&wr->e.lock);
+}
+
+
+#endif
+
 static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow /* monotonic */)
 {
   struct q_globals const * const gv = ev->evq->gv;
@@ -594,6 +677,14 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
     return;
   }

+#ifdef DDSI_INCLUDE_SECURITY
+  if (wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)
+  {
+    send_heartbeat_to_all_readers(xp, ev, wr, tnow);
+    return;
+  }
+#endif
+
   ddsrt_mutex_lock (&wr->e.lock);
   assert (wr->reliable);
   whc_get_state(wr->whc, &whcst);
@@ -701,7 +792,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
   AckNack_t *an;
   struct nn_xmsg_marker sm_marker;
   uint32_t i, numbits;
-  seqno_t base;
+  seqno_t base, last_seq;

   DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0);
   struct {
@@ -716,7 +807,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p

   /* if in sync, look at proxy writer status, else look at
      proxy-writer--reader match status */
-  if (rwn->in_sync != PRMSS_OUT_OF_SYNC)
+  if (rwn->in_sync != PRMSS_OUT_OF_SYNC && !rwn->filtered)
   {
     reorder = pwr->reorder;
     if (!pwr->e.gv->config.late_ack_mode)
@@ -734,6 +825,11 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
     bitmap_base = nn_reorder_next_seq (reorder);
   }

+  if (rwn->filtered)
+    last_seq = rwn->last_seq;
+  else
+    last_seq = pwr->last_seq;
+
   an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX);
   nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK);
   an->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
@@ -741,7 +837,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p

   /* Make bitmap; note that we've made sure to have room for the
      maximum bitmap size. */
-  numbits = nn_reorder_nackmap (reorder, bitmap_base, pwr->last_seq, &an->readerSNState, an->bits, max_numbits, notail);
+  numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &an->readerSNState, an->bits, max_numbits, notail);
   base = fromSN (an->readerSNState.bitmap_base);

   /* Scan through bitmap, cutting it off at the first missing sample
@@ -754,7 +850,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
     nackfrag_seq = base + i;
     if (!nn_bitset_isset (numbits, an->bits, i))
       continue;
-    if (nackfrag_seq == pwr->last_seq)
+    if (nackfrag_seq == last_seq)
       fragnum = pwr->last_fragnum;
     else
       fragnum = UINT32_MAX;
@@ -778,7 +874,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
   *nack_seq = (numbits > 0) ? base + numbits : 0;
   if (!pwr->have_seen_heartbeat) {
     /* We must have seen a heartbeat for us to consider setting FINAL */
-  } else if (*nack_seq && base + numbits <= pwr->last_seq) {
+  } else if (*nack_seq && base + numbits <= last_seq) {
     /* If it's a NACK and it doesn't cover samples all the way up to
        the highest known sequence number, there's some reason to expect
        we may to do another round.  For which we need a Heartbeat.
diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c
index 4c73e77..07ca8d0 100644
--- a/src/core/ddsi/src/q_xmsg.c
+++ b/src/core/ddsi/src/q_xmsg.c
@@ -1546,11 +1546,6 @@ static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *
   return addressing_info_eq_onesidederr (xp, m);
 }

-static int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b)
-{
-  return a->u[0] == b->u[0] && a->u[1] == b->u[1] && a->u[2] == b->u[2];
-}
-
 int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flags)
 {
   /* Returns > 0 if pack got sent out before adding m */
diff --git a/src/core/ddsi/tests/security_msg.c b/src/core/ddsi/tests/security_msg.c
index ed389ab..87bd734 100644
--- a/src/core/ddsi/tests/security_msg.c
+++ b/src/core/ddsi/tests/security_msg.c
@@ -18,13 +18,13 @@
 static nn_participant_generic_message_t test_msg_in =
 {
-  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
-  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
-  .destinaton_participant_guid = { {.u={2,3,4}},{5} },
-  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
-  .source_endpoint_guid = { {.u={4,5,6}},{7} },
-  .message_class_id = "testing message",
-  .message_data = {
+  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
+  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
+  .destination_participant_guid = { {.u={2,3,4}},{5} },
+  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
+  .source_endpoint_guid = { {.u={4,5,6}},{7} },
+  .message_class_id = "testing message",
+  .message_data = {
     .n = 4,
     .tags = (nn_dataholder_t[])
     {
       {
@@ -146,13 +146,13 @@
 /* Same as test_msg_in, excluding the non-propagated properties. */
 static nn_participant_generic_message_t test_msg_out =
 {
-  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
-  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
-  .destinaton_participant_guid = { {.u={2,3,4}},{5} },
-  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
-  .source_endpoint_guid = { {.u={4,5,6}},{7} },
-  .message_class_id = "testing message",
-  .message_data = {
+  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
+  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
+  .destination_participant_guid = { {.u={2,3,4}},{5} },
+  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
+  .source_endpoint_guid = { {.u={4,5,6}},{7} },
+  .message_class_id = "testing message",
+  .message_data = {
     .n = 4,
     .tags = (nn_dataholder_t[])
     {
       {
@@ -283,7 +283,7 @@ CU_Test (ddsi_security_msg, serializer)
                                           &msg_in,
                                           &test_msg_in.message_identity.source_guid,
                                           test_msg_in.message_identity.sequence_number,
-                                          &test_msg_in.destinaton_participant_guid,
+                                          &test_msg_in.destination_participant_guid,
                                           &test_msg_in.destination_endpoint_guid,
                                           &test_msg_in.source_endpoint_guid,
                                           test_msg_in.message_class_id,