Added volatile security endpoints. (#310)

* Added volatile security endpoints.

Signed-off-by: Marcel Jordense <marcel.jordense@adlinktech.com>

* Fix review issues with volatile security endpoints

Signed-off-by: Marcel Jordense <marcel.jordense@adlinktech.com>

* Update sending point-2-point heartbeats

Signed-off-by: Marcel Jordense <marcel.jordense@adlinktech.com>

* Fix infinite loop when sending p2p heartbeats

Signed-off-by: Marcel Jordense <marcel.jordense@adlinktech.com>
This commit is contained in:
MarcelJordense 2019-12-12 15:54:19 +01:00 committed by eboasson
parent 66c0d87886
commit f2f0205f25
23 changed files with 1002 additions and 218 deletions


@ -0,0 +1,131 @@
# ParticipantVolatileMessageSecure Handling
## Short Introduction
Some knowledge of the DDSI builtin (security) endpoints is assumed.
```cpp
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER 0xff0202c3
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER 0xff0202c4
```
These builtin endpoints account for the largest security-related code change in ddsi.
Sections 7.4.4.3 and 7.4.4.4 of the DDS Security specification indicate the main reason why these builtin endpoints differ from all the others and are somewhat more complex.
> 7.4.4.3 Contents of the ParticipantVolatileMessageSecure
> The ParticipantVolatileMessageSecure is intended as a holder of secure information that
> is sent point-to-point from a DomainParticipant to another.
>
> [...]
>
> 7.4.4.4 Destination of the ParticipantVolatileMessageSecure
>
> If the destination_participant_guid member is not set to GUID_UNKNOWN, the message written is
> intended only for the BuiltinParticipantVolatileMessageSecureReader belonging to the
> DomainParticipant with a matching Participant Key.
>
> This is equivalent to saying that the BuiltinParticipantVolatileMessageSecureReader has an implied
> content filter with the logical expression:
>
> “destination_participant_guid == GUID_UNKNOWN
> || destination_participant_guid==BuiltinParticipantVolatileMessageSecureReader.participant.guid”
>
> Implementations of the specification can use this content filter or some other mechanism as long as the
> resulting behavior is equivalent to having this filter.
>
> [...]
The "point-to-point" and "content filter" remarks make everything more elaborate.
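The implied content filter quoted above can be sketched as a simple predicate. This is a minimal illustration with a stand-in GUID type, not the actual ddsi implementation (which works on the serialized sample):

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Stand-in for ddsi_guid_t; the real filter operates on the serialized sample. */
typedef struct { unsigned char v[16]; } guid_t;

static const guid_t GUID_UNKNOWN; /* all zeroes */

static bool guid_equal(const guid_t *a, const guid_t *b)
{
  return memcmp(a->v, b->v, sizeof (a->v)) == 0;
}

/* The implied content filter of 7.4.4.4: accept the sample when no specific
   destination is set, or when it addresses this reader's participant. */
static bool volatile_filter(const guid_t *dst, const guid_t *reader_pp_guid)
{
  return guid_equal(dst, &GUID_UNKNOWN) || guid_equal(dst, reader_pp_guid);
}
```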
## Complexity
It would be nice to be able to use the ```dds_set_topic_filter()``` functionality for these endpoints. However, that only works on the reader history cache (rhc), which is available for ddsc entities but not for ddsi builtin entities, and it is the builtin entities that are used here.
```dds_set_topic_filter()``` basically pretends that the sample was inserted into the rhc (without actually inserting it), which lets the rest of ddsi (heartbeats, acknacks, gaps, etc.) work as normal while the sample simply isn't provided to the reader.
Unfortunately, the builtin volatile endpoints cannot use that same simple sequence (just handle the sample but ignore it right at the end). The problem is that the sample is encoded, and a reader can only decode samples that are intended for it. This means it is best for the reader to only receive 'owned' samples that it can actually decode.
This has all kinds of effects on the heartbeat, acknack, and gap handling. Basically, every writer/reader combination has to keep gap and sequence number information between them, whereas normally such information about proxies is combined.
## Implementation Overview
This only depicts an overview. Some details will have been omitted.
### Writing
The function ```write_crypto_exchange_message()``` takes care of generating the right sample information and passes it on to ```write_sample_p2p_wrlock_held()```.
A proxy reader can now have a filter callback function (```proxy_reader::filter```). On the writer side, this indicates whether a sample will be accepted by the actual reader. This could be made more generic for a proper 'writer side' content filter implementation; for now, however, it is only used by ParticipantVolatileMessageSecure and the filter is hardcoded to ```volatile_secure_data_filter()```.
So, when ```write_sample_p2p_wrlock_held()``` is called with a proxy reader that has a filter, it obtains the 'sent/acked sequences' information between the writer and proxy reader. This is used to determine whether gap information has to be sent alongside the sample.
Then, ```write_sample_p2p_wrlock_held()``` will enqueue the sample.
Just before the submessage is added to the rtps message and sent, it is encoded (todo).
### Reading
First things first, the submessage is decoded when the rtps message is received (todo).
It is received on a builtin reader, so the builtin queue is used and ```builtins_dqueue_handler()``` is called. That forwards the sample to the token exchange functionality, ignoring every sample that is not related to this participant (todo).
### Gaps on reader side
The reader remembers the last_seq it knows from every connected proxy writer (```pwr_rd_match::last_seq```).
This is updated when handling heartbeats, gaps and regular messages and used to check if there are gaps.
Normally, the ```last_seq``` of a specific writer is used here. But when the reader knows that the writer uses a 'writer side content filter' (```proxy_writer::filtered```), it uses the ```last_seq``` that is related to the actual reader/writer match.
It is used to generate the AckNack (which contains gap information) response to the writer.
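In sketch form, the selection of the relevant ```last_seq``` when building the AckNack boils down to the following (simplified stand-ins for the ddsi structs, not the actual code):

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Simplified mirrors of proxy_writer and pwr_rd_match. */
struct proxy_writer_s { unsigned filtered; seqno_t last_seq; };
struct pwr_rd_match_s { seqno_t last_seq; };

/* Highest sequence number this reader should account for when building an
   AckNack: the per-match value when the writer filters per reader, the
   writer-wide value otherwise. */
static seqno_t acknack_max_seq(const struct proxy_writer_s *pwr,
                               const struct pwr_rd_match_s *m)
{
  return pwr->filtered ? m->last_seq : pwr->last_seq;
}
```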
### Gaps on writer side
The writer remembers the last sequence number it sent to a specific reader in ```wr_prd_match::last_seq```.
This is used to determine if a reader has received all relevant samples (through handling of acknack).
It is also used to determine the gap information that is added to samples to a specific reader when necessary.
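A minimal sketch of that decision, with a hypothetical helper and a toy callback standing in for ```volatile_secure_data_filter()``` applied to the cached sample:

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Count, over the not-yet-acknowledged range for one reader, how many cached
   samples fail the reader-specific filter and therefore become GAPs rather
   than retransmits. */
static int count_reader_gaps(seqno_t acked, seqno_t last_sent,
                             int (*filter)(seqno_t))
{
  int gaps = 0;
  for (seqno_t s = acked + 1; s <= last_sent; s++)
    if (!filter(s))
      gaps++;
  return gaps;
}

/* Toy filter: pretend only even sequence numbers address this reader. */
static int even_only(seqno_t s) { return (s % 2) == 0; }
```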
### Heartbeats
A writer is triggered to send heartbeats once in a while. Normally, such a heartbeat is broadcast. The volatile secure writer, however, has to send it to each reader individually, and the heartbeat submessage sent to each reader is encoded with a reader-specific key. This key is generated from the shared secret that was established during the authentication phase.
When a writer should send heartbeats, ```handle_xevk_heartbeat()``` is called. For the volatile secure writer, control is immediately handed to ```send_heartbeat_to_all_readers()```, which adds heartbeat submessages to an rtps message for every reader it deems necessary.
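The per-reader decision inside a ```send_heartbeat_to_all_readers()```-style loop can be sketched as follows (simplified, hypothetical fields; the real code consults the writer history cache state):

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* Per-matched-reader bookkeeping: what was sent to it, what it acknowledged. */
struct reader_state { seqno_t last_sent; seqno_t acked; };

/* A point-to-point heartbeat is only needed while this reader has not yet
   acknowledged everything that was sent to it. */
static int needs_heartbeat(const struct reader_state *r)
{
  return r->acked < r->last_sent;
}

/* Count how many matched readers still need a heartbeat submessage. */
static int count_heartbeats(const struct reader_state *rs, int n)
{
  int cnt = 0;
  for (int i = 0; i < n; i++)
    if (needs_heartbeat(&rs[i]))
      cnt++;
  return cnt;
}
```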
### Reorder
Normally, received samples are placed in the reorder administration of the proxy_writer. In this case, however, the writer applies a content filter that is specific to each destination reader. The common reorder administration in the proxy_writer can then not be used; the reader-specific reorder administration must be used instead to handle the gaps, which are reader specific.
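Sketched with stand-in types (not the actual ddsi structures), the selection of the reorder administration boils down to:

```c
#include <assert.h>

/* Stand-in for nn_reorder. */
struct reorder_admin { int id; };

struct proxy_writer_s { unsigned filtered; struct reorder_admin *common; };
struct pwr_rd_match_s { struct reorder_admin *own; };

/* Route an incoming sample to the reader-specific reorder administration when
   the proxy writer applies a per-reader content filter, else to the shared
   one. */
static struct reorder_admin *reorder_for_sample(const struct proxy_writer_s *pwr,
                                                const struct pwr_rd_match_s *m)
{
  return pwr->filtered ? m->own : pwr->common;
}
```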
## Notes
### Putting the security participant volatile endpoint implementation into context
The following elements are added to the data structures:
* struct wr_prd_match::last_seq : highest sequence number sent to this reader; used when the filter is applied
* struct pwr_rd_match::last_seq : reader-specific last received sequence number from the writer
* struct proxy_writer::filtered : indicates that a content filter is active
* struct proxy_reader::filter : the filter to apply for this specific reader
Functions added:
* writer_hbcontrol_p2p : Creates a heartbeat destined for a specific reader. The volatile secure writer uses a submessage encoding with a distinct key for each reader, so a reader-specific heartbeat is needed.
* nn_defrag_prune : When a volatile secure reader is deleted, the defragmentation administration may still contain messages destined for that reader. This function removes those messages from the defragmentation administration.
* volatile_secure_data_filter : The filter applied to the secure volatile messages, which filters on the destination participant guid.
* write_sample_p2p_wrlock_held : Writes a message to a particular reader.
The use of the content filter for the volatile secure writer implies that it must be determined, for each destination reader, which messages from the writer history cache are valid and have to be sent.
For messages that do not match this filter, a GAP message should be sent to the reader. Each time a message is sent to a specific reader, a possible gap message is added.
For the volatile secure writer, the sequence number of the last message sent to a particular reader is maintained in ```wr_prd_match::last_seq```. It is used to determine whether a HEARTBEAT has to be sent to this particular reader or whether the reader has acknowledged all messages. At the reader side, the sequence number of the last received message is maintained in ```pwr_rd_match::last_seq```. It is used to determine the contents of the ACKNACK message sent in response to a received HEARTBEAT.
When an ACKNACK is received (```handle_AckNack```), it is determined, given the applied filter, which samples should be resent and for which sequence numbers a GAP message should be sent.
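A sketch of that split (hypothetical helper; the real ```handle_AckNack``` works on the NACK bitmap and the writer history cache):

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* For each nack'd sequence number decide: retransmit when the cached sample
   passes the reader's filter, otherwise announce it in a GAP. */
static void split_nacked(seqno_t base, int n, int (*filter)(seqno_t),
                         int *resend, int *gap)
{
  *resend = *gap = 0;
  for (int i = 0; i < n; i++)
    if (filter(base + i))
      (*resend)++;
    else
      (*gap)++;
}

/* Toy filter: pretend only odd sequence numbers address this reader. */
static int odd_only(seqno_t s) { return (s % 2) != 0; }
```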


@ -21,6 +21,11 @@
extern "C" {
#endif
struct participant;
struct writer;
struct proxy_reader;
struct ddsi_serdata;
#define GMCLASSID_SECURITY_AUTH_REQUEST "dds.sec.auth_request"
#define GMCLASSID_SECURITY_AUTH_HANDSHAKE "dds.sec.auth"
@ -32,7 +37,7 @@ typedef struct nn_message_identity {
typedef struct nn_participant_generic_message {
nn_message_identity_t message_identity;
nn_message_identity_t related_message_identity;
ddsi_guid_t destinaton_participant_guid; ddsi_guid_t destination_participant_guid;
ddsi_guid_t destination_endpoint_guid;
ddsi_guid_t source_endpoint_guid;
const char *message_class_id;
@ -84,6 +89,21 @@ nn_participant_generic_message_serialize(
DDS_EXPORT extern const enum pserop pserop_participant_generic_message[];
DDS_EXPORT int
write_crypto_exchange_message(
const struct participant *pp,
const ddsi_guid_t *dst_pguid,
const ddsi_guid_t *src_eguid,
const ddsi_guid_t *dst_eguid,
const char *classid,
const nn_dataholderseq_t *tokens);
DDS_EXPORT int
volatile_secure_data_filter(
struct writer *wr,
struct proxy_reader *prd,
struct ddsi_serdata *serdata);
#if defined (__cplusplus)
}
#endif


@ -23,6 +23,7 @@
#include "dds/ddsi/q_hbcontrol.h"
#include "dds/ddsi/q_feature_check.h"
#include "dds/ddsi/q_inverse_uint32_set.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#include "dds/ddsi/ddsi_tran.h"
@ -92,6 +93,7 @@ struct wr_prd_match {
seqno_t min_seq; /* smallest ack'd seq nr in subtree */
seqno_t max_seq; /* sort-of highest ack'd seq nr in subtree (see augment function) */
seqno_t seq; /* highest acknowledged seq nr */
seqno_t last_seq; /* highest seq send to this reader used when filter is applied */
int32_t num_reliable_readers_where_seq_equals_max;
ddsi_guid_t arbitrary_unacked_reader;
nn_count_t next_acknack; /* next acceptable acknack sequence number */
@ -119,8 +121,10 @@ struct pwr_rd_match {
nn_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */
nn_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */
seqno_t seq_last_nack; /* last seq for which we requested a retransmit */
seqno_t last_seq; /* last known sequence number from this writer */
struct xevent *acknack_xevent; /* entry in xevent queue for sending acknacks */
enum pwr_rd_match_syncstate in_sync; /* whether in sync with the proxy writer */
unsigned filtered:1;
union {
struct {
seqno_t end_of_tl_seq; /* when seq >= end_of_tl_seq, it's in sync, =0 when not tl */
@ -364,6 +368,7 @@ struct proxy_writer {
unsigned deliver_synchronously: 1; /* iff 1, delivery happens straight from receive thread for non-historical data; else through delivery queue "dqueue" */
unsigned have_seen_heartbeat: 1; /* iff 1, we have received at least on heartbeat from this proxy writer */
unsigned local_matching_inprogress: 1; /* iff 1, we are still busy matching local readers; this is so we don't deliver incoming data to some but not all readers initially */
unsigned filtered: 1; /* iff 1, builtin proxy writer uses content filter, which affects heartbeats and gaps. */
#ifdef DDSI_INCLUDE_SSM
unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */
#endif
@ -376,6 +381,9 @@ struct proxy_writer {
void *ddsi2direct_cbarg;
};
typedef int (*filter_fn_t)(struct writer *wr, struct proxy_reader *prd, struct ddsi_serdata *serdata);
struct proxy_reader {
struct entity_common e;
struct proxy_endpoint_common c;
@ -385,6 +393,7 @@ struct proxy_reader {
unsigned favours_ssm: 1; /* iff 1, this proxy reader favours SSM when available */
#endif
ddsrt_avl_tree_t writers; /* matching LOCAL writers */
filter_fn_t filter;
};
extern const ddsrt_avl_treedef_t wr_readers_treedef;


@ -238,6 +238,8 @@ struct q_globals {
dds_qos_t builtin_endpoint_xqos_rd;
dds_qos_t builtin_endpoint_xqos_wr;
#ifdef DDSI_INCLUDE_SECURITY
dds_qos_t builtin_volatile_xqos_rd;
dds_qos_t builtin_volatile_xqos_wr;
dds_qos_t builtin_stateless_xqos_rd;
dds_qos_t builtin_stateless_xqos_wr;
#endif


@ -36,6 +36,11 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
int writer_hbcontrol_must_send (const struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow);
struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const struct whc_state *whcst, nn_mtime_t tnow, int hbansreq, int issync);
#ifdef DDSI_INCLUDE_SECURITY
struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *whcst, int hbansreq, struct proxy_reader *prd);
#endif
#if defined (__cplusplus)
}
#endif


@ -35,6 +35,8 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr);
int WildcardOverlap(char * p1, char * p2);
#endif
DDS_EXPORT int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b);
DDS_EXPORT int guid_eq (const struct ddsi_guid *a, const struct ddsi_guid *b);
DDS_EXPORT int ddsi2_patmatch (const char *pat, const char *str);
uint32_t crc32_calc (const void *buf, size_t length);


@ -98,7 +98,8 @@ typedef struct {
#define NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_DETECTOR (1u<<21)
#define NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER (1u<<22)
#define NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_DETECTOR (1u<<23)
/* TODO: ENDPOINT_PARTICIPANT_VOLATILE */ #define NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER (1u<<24)
#define NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_DETECTOR (1u<<25)
#define NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_ANNOUNCER (1u << 26)
#define NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR (1u << 27)


@ -219,6 +219,7 @@ void nn_defrag_free (struct nn_defrag *defrag);
struct nn_rsample *nn_defrag_rsample (struct nn_defrag *defrag, struct nn_rdata *rdata, const struct nn_rsample_info *sampleinfo);
void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1);
int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnum, struct nn_fragment_number_set_header *map, uint32_t *mapbits, uint32_t maxsz);
void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min);
struct nn_reorder *nn_reorder_new (const struct ddsrt_log_cfg *logcfg, enum nn_reorder_mode mode, uint32_t max_samples, bool late_ack_mode);
void nn_reorder_free (struct nn_reorder *r);
@ -229,6 +230,7 @@ nn_reorder_result_t nn_reorder_gap (struct nn_rsample_chain *sc, struct nn_reord
int nn_reorder_wantsample (struct nn_reorder *reorder, seqno_t seq);
unsigned nn_reorder_nackmap (struct nn_reorder *reorder, seqno_t base, seqno_t maxseq, struct nn_sequence_number_set_header *map, uint32_t *mapbits, uint32_t maxsz, int notail);
seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder);
void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq);
struct nn_dqueue *nn_dqueue_new (const char *name, const struct q_globals *gv, uint32_t max_samples, nn_dqueue_handler_t handler, void *arg);
void nn_dqueue_free (struct nn_dqueue *q);


@ -21,11 +21,25 @@ struct nn_rsample_info;
struct nn_rdata;
struct ddsi_tran_listener;
struct recv_thread_arg;
struct writer;
struct proxy_reader;
struct nn_gap_info {
int64_t gapstart;
int64_t gapend;
unsigned gapnumbits;
unsigned gapbits[256 / 32];
};
void nn_gap_info_init(struct nn_gap_info *gi);
void nn_gap_info_update(struct q_globals *gv, struct nn_gap_info *gi, int64_t seqnr);
struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *prd, struct nn_gap_info *gi);
void trigger_recv_threads (const struct q_globals *gv);
uint32_t recv_thread (void *vrecv_thread_arg);
uint32_t listen_thread (struct ddsi_tran_listener * listener);
int user_dqueue_handler (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const ddsi_guid_t *rdguid, void *qarg);
int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits);
#if defined (__cplusplus)
}


@ -53,7 +53,8 @@ typedef int64_t seqno_t;
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER 0x201c4
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER 0xff0200c2
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER 0xff0200c7
/* TODO: ENDPOINT_PARTICIPANT_VOLATILE */ #define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER 0xff0202c3
#define NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER 0xff0202c4
#define NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER 0xff0101c2
#define NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER 0xff0101c7


@ -43,6 +43,7 @@ int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *x
dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);
int enqueue_sample_wrlock_held (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, struct proxy_reader *prd, int isnew);
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync);
int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd);
#if defined (__cplusplus)
}


@ -11,9 +11,15 @@
*/
#include <stddef.h>
#include <string.h>
#include "dds/ddsrt/md5.h"
#include "dds/ddsrt/heap.h"
#include "dds/ddsrt/string.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_misc.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_security_msg.h"
#include "dds/ddsi/ddsi_plist_generic.h"
@ -22,7 +28,7 @@ const enum pserop pserop_participant_generic_message[] =
/* nn_participant_generic_message */
XG, Xl, /* nn_message_identity_t message_identity */
XG, Xl, /* nn_message_identity_t related_message_identity */
XG, /* ddsi_guid_t destinaton_participant_guid */ XG, /* ddsi_guid_t destination_participant_guid */
XG, /* ddsi_guid_t destination_endpoint_guid */
XG, /* ddsi_guid_t source_endpoint_guid */
XS, /* char* message_class_id */
@ -106,7 +112,7 @@ nn_participant_generic_message_init(
}
if (dstpguid)
msg->destinaton_participant_guid = *dstpguid; msg->destination_participant_guid = *dstpguid;
if (dsteguid)
msg->destination_endpoint_guid = *dsteguid;
@ -147,3 +153,104 @@ nn_participant_generic_message_deseralize(
assert(sizeof(nn_participant_generic_message_t) == plist_memsize_generic(pserop_participant_generic_message));
return plist_deser_generic (msg, data, len, bswap, pserop_participant_generic_message);
}
int
write_crypto_exchange_message(
const struct participant *pp,
const ddsi_guid_t *dst_pguid,
const ddsi_guid_t *src_eguid,
const ddsi_guid_t *dst_eguid,
const char *classid,
const nn_dataholderseq_t *tokens)
{
struct q_globals * const gv = pp->e.gv;
struct nn_participant_generic_message pmg;
struct ddsi_tkmap_instance *tk;
struct ddsi_serdata *serdata;
struct proxy_reader *prd;
ddsi_guid_t prd_guid;
unsigned char *data;
size_t len;
struct writer *wr;
seqno_t seq;
int r;
if ((wr = get_builtin_writer (pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)) == NULL)
{
GVLOG (DDS_LC_DISCOVERY, "write_crypto_exchange_message("PGUIDFMT") - builtin volatile secure writer not found\n", PGUID (pp->e.guid));
return -1;
}
prd_guid.prefix = dst_pguid->prefix;
prd_guid.entityid.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
if ((prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, &prd_guid)) == NULL)
{
return -1;
}
GVLOG (DDS_LC_DISCOVERY, "send crypto tokens("PGUIDFMT" --> "PGUIDFMT")\n", PGUID (wr->e.guid), PGUID (prd_guid));
ddsrt_mutex_lock (&wr->e.lock);
seq = ++wr->seq;
/* Get serialized message. */
nn_participant_generic_message_init(&pmg, &wr->e.guid, seq, dst_pguid, dst_eguid, src_eguid, classid, tokens, NULL);
nn_participant_generic_message_serialize(&pmg, &data, &len);
/* Get the key value. */
ddsrt_md5_state_t md5st;
ddsrt_md5_byte_t digest[16];
ddsrt_md5_init (&md5st);
ddsrt_md5_append (&md5st, (const ddsrt_md5_byte_t *)data, sizeof (nn_message_identity_t));
ddsrt_md5_finish (&md5st, digest);
/* Write the sample. */
struct ddsi_rawcdr_sample raw = {
.blob = data,
.size = len,
.key = digest,
.keysize = 16
};
serdata = ddsi_serdata_from_sample (gv->rawcdr_topic, SDK_DATA, &raw);
tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, serdata);
r = write_sample_p2p_wrlock_held(wr, seq, NULL, serdata, tk, prd);
ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
ddsi_serdata_unref (serdata);
nn_participant_generic_message_deinit(&pmg);
ddsrt_mutex_unlock (&wr->e.lock);
return r;
}
int volatile_secure_data_filter(struct writer *wr, struct proxy_reader *prd, struct ddsi_serdata *serdata)
{
static const size_t guid_offset = offsetof(nn_participant_generic_message_t, destination_participant_guid);
ddsrt_iovec_t guid_ref = { .iov_len=0, .iov_base=NULL };
ddsi_guid_t *msg_guid;
ddsi_guid_t pp_guid;
int pass;
DDSRT_UNUSED_ARG(wr);
assert(wr);
assert(prd);
assert(serdata);
(void)ddsi_serdata_to_ser_ref(serdata, guid_offset, sizeof(ddsi_guid_t), &guid_ref);
assert(guid_ref.iov_len == sizeof(ddsi_guid_t));
assert(guid_ref.iov_base);
msg_guid = (ddsi_guid_t*)guid_ref.iov_base;
pass = is_null_guid(msg_guid);
if (!pass)
{
pp_guid = nn_hton_guid(prd->c.proxypp->e.guid);
pass = guid_eq(msg_guid, &pp_guid);
}
ddsi_serdata_to_ser_unref(serdata, &guid_ref);
return pass;
}


@ -206,6 +206,9 @@ is_proxy_participant_deletion_allowed(
assert(gv);
assert(guid);
/* TODO: Check if the proxy writer guid prefix matches that of the proxy
* participant. Deletion is not allowed when they're not equal. */
/* Always allow deletion from a secure proxy writer. */
if (pwr_entityid.u == NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER)
return true;


@ -1821,6 +1821,9 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER:
/* TODO: Handshake */
break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER:
/* TODO: Key exchange */
break;
#endif
default:
GVLOGDISC ("data(builtin, vendor %u.%u): "PGUIDFMT" #%"PRId64": not handled\n",


@ -31,6 +31,7 @@
#include "dds/ddsi/q_qosmatch.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_addrset.h"
#include "dds/ddsi/q_xevent.h" /* qxev_spdp, &c. */
#include "dds/ddsi/q_ddsi_discovery.h" /* spdp_write, &c. */
@ -50,6 +51,10 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_security_omg.h"
#ifdef DDSI_INCLUDE_SECURITY
#include "dds/ddsi/ddsi_security_msg.h"
#endif
struct deleted_participant {
ddsrt_avl_node_t avlnode;
ddsi_guid_t guid;
@ -490,6 +495,10 @@ static void add_security_builtin_endpoints(struct participant *pp, ddsi_guid_t *
new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_stateless_xqos_wr, whc_new(gv, 0, 1, 1), NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER;
subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER);
new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_volatile_xqos_wr, whc_new(gv, 0, 0, 0), NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER;
subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER); subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER);
new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL); new_writer_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_wr, whc_new(gv, 1, 1, 1), NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_ANNOUNCER; pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_ANNOUNCER;
@ -522,6 +531,10 @@ static void add_security_builtin_endpoints(struct participant *pp, ddsi_guid_t *
new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_rd, NULL, NULL, NULL); new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR; pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_SECURE_DETECTOR;
subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER);
new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_volatile_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_DETECTOR;
subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER); subguid->entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER);
new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_stateless_xqos_rd, NULL, NULL, NULL); new_reader_guid (NULL, subguid, group_guid, pp, NULL, &gv->builtin_stateless_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_DETECTOR; pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_DETECTOR;
@@ -911,6 +924,8 @@ static void unref_participant (struct participant *pp, const struct ddsi_guid *g
     NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER,
     NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER,
     NN_ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER,
+    NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER,
+    NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER,
     /* PrismTech ones: */
     NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER,
     NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_READER,
@@ -1113,6 +1128,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
     case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER:
       bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER;
       break;
+    case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER:
+      bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_SECURE_ANNOUNCER;
+      break;
     case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER:
       bes_mask = NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_SECURE_ANNOUNCER;
       break;
@@ -1729,9 +1747,12 @@ static void proxy_writer_drop_connection (const struct ddsi_guid *pwr_guid, stru
       local_reader_ary_remove (&pwr->rdary, rd);
   }
   ddsrt_mutex_unlock (&pwr->e.lock);
-  if (m != NULL)
+  if (m)
   {
     update_reader_init_acknack_count (&rd->e.gv->logconfig, rd->e.gv->guid_hash, &rd->e.guid, m->count);
+    if (m->filtered)
+      nn_defrag_prune(pwr->defrag, &m->rd_guid.prefix, m->last_seq);
   }
   free_pwr_rd_match (m);
 }
@@ -1796,6 +1817,7 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd)
     m->seq = MAX_SEQ_NUMBER;
   else
     m->seq = wr->seq;
+  m->last_seq = m->seq;
   if (ddsrt_avl_lookup_ipath (&wr_readers_treedef, &wr->readers, &prd->e.guid, &path))
   {
     ELOGDISC (wr, " writer_add_connection(wr "PGUIDFMT" prd "PGUIDFMT") - already connected\n",
@@ -2044,6 +2066,8 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
   m->t_heartbeat_accepted.v = 0;
   m->t_last_nack.v = 0;
   m->seq_last_nack = 0;
+  m->last_seq = 0;
+  m->filtered = 0;
   /* These can change as a consequence of handling data and/or
      discovery activities. The safe way of dealing with them is to
@@ -2104,9 +2128,16 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
      difference in practice.) */
   if (rd->reliable)
   {
+    uint32_t secondary_reorder_maxsamples = pwr->e.gv->config.secondary_reorder_maxsamples;
+    if (rd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER)
+    {
+      secondary_reorder_maxsamples = pwr->e.gv->config.primary_reorder_maxsamples;
+      m->filtered = 1;
+    }
     m->acknack_xevent = qxev_acknack (pwr->evq, add_duration_to_mtime (tnow, pwr->e.gv->config.preemptive_ack_delay), &pwr->e.guid, &rd->e.guid);
     m->u.not_in_sync.reorder =
-      nn_reorder_new (&pwr->e.gv->logconfig, NN_REORDER_MODE_NORMAL, pwr->e.gv->config.secondary_reorder_maxsamples, pwr->e.gv->config.late_ack_mode);
+      nn_reorder_new (&pwr->e.gv->logconfig, NN_REORDER_MODE_NORMAL, secondary_reorder_maxsamples, pwr->e.gv->config.late_ack_mode);
     pwr->n_reliable_readers++;
   }
   else
@@ -2142,6 +2173,9 @@ already_matched:
   return;
 }
 static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer *wr)
 {
   struct prd_wr_match *m = ddsrt_malloc (sizeof (*m));
@@ -2239,6 +2273,12 @@ static ddsi_entityid_t builtin_entityid_match (ddsi_entityid_t x)
     case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER:
       res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER;
       break;
+    case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER:
+      res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER;
+      break;
+    case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER:
+      res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
+      break;
     case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER:
       res.u = NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER;
       break;
@@ -2939,11 +2979,12 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
   assert (wr->xqos->present & QP_RELIABILITY);
   wr->reliable = (wr->xqos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT);
   assert (wr->xqos->present & QP_DURABILITY);
-  if (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE))
+  if (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) &&
+      (wr->e.guid.entityid.u != NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER))
   {
     assert (wr->xqos->history.kind == DDS_HISTORY_KEEP_LAST);
-    assert (wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL ||
-            wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER);
+    assert ((wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL) ||
+            (wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER));
   }
   wr->handle_as_transient_local = (wr->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL);
   wr->include_keyhash =
@@ -3053,7 +3094,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
     wr->whc_low = wr->e.gv->config.whc_lowwater_mark;
     wr->whc_high = wr->e.gv->config.whc_init_highwater_mark.value;
   }
-  assert (!is_builtin_entityid(wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX));
+  assert (!(wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)
+          ||
+          (wr->whc_low == wr->whc_high && wr->whc_low == INT32_MAX));
   /* Connection admin */
   ddsrt_avl_init (&wr_readers_treedef, &wr->readers);
@@ -3505,7 +3548,15 @@ static dds_return_t new_reader_guid
   assert (rd->xqos->present & QP_RELIABILITY);
   rd->reliable = (rd->xqos->reliability.kind != DDS_RELIABILITY_BEST_EFFORT);
   assert (rd->xqos->present & QP_DURABILITY);
-  rd->handle_as_transient_local = (rd->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL);
+  /* The builtin volatile secure writer applies a filter which is used to send the secure
+   * crypto tokens only to the destination reader for which they are applicable. The
+   * builtin volatile secure reader will therefore observe gaps in the sequence numbers
+   * of the messages it receives. Because of that, the out-of-order list of the proxy
+   * writer cannot be used for this reader; a reader-specific out-of-order list must be
+   * used instead, which is the one used for handling transient-local data.
+   */
+  rd->handle_as_transient_local = (rd->xqos->durability.kind == DDS_DURABILITY_TRANSIENT_LOCAL) ||
+                                  (rd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER);
   rd->topic = ddsi_sertopic_ref (topic);
   rd->ddsi2direct_cb = 0;
   rd->ddsi2direct_cbarg = 0;
@@ -3862,6 +3913,20 @@ static void add_proxy_builtin_endpoints(
                                    &gv->builtin_endpoint_xqos_wr,
                                    &gv->builtin_endpoint_xqos_rd);
+    /* Security 'volatile' proxy endpoints. */
+    static const struct bestab bestab_volatile[] = {
+      LTE (PARTICIPANT_VOLATILE_SECURE_ANNOUNCER, P2P, PARTICIPANT_VOLATILE_SECURE_WRITER),
+      LTE (PARTICIPANT_VOLATILE_SECURE_DETECTOR, P2P, PARTICIPANT_VOLATILE_SECURE_READER)
+    };
+    create_proxy_builtin_endpoints(gv,
+                                   bestab_volatile,
+                                   (int)(sizeof (bestab_volatile) / sizeof (*bestab_volatile)),
+                                   ppguid,
+                                   proxypp,
+                                   timestamp,
+                                   &gv->builtin_volatile_xqos_wr,
+                                   &gv->builtin_volatile_xqos_rd);
     /* Security 'stateless' proxy endpoints. */
     static const struct bestab bestab_stateless[] = {
       LTE (PARTICIPANT_STATELESS_MESSAGE_ANNOUNCER, P2P, PARTICIPANT_STATELESS_MESSAGE_WRITER),
@@ -4351,6 +4416,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
   pwr->last_fragnum = ~0u;
   pwr->nackfragcount = 0;
   pwr->last_fragnum_reset = 0;
+  pwr->filtered = 0;
   ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1);
   if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) {
     /* The DDSI built-in proxy writers always deliver
@@ -4396,6 +4462,19 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
   }
   reorder_mode = get_proxy_writer_reorder_mode(pwr->e.guid.entityid, isreliable);
   pwr->reorder = nn_reorder_new (&gv->logconfig, reorder_mode, gv->config.primary_reorder_maxsamples, gv->config.late_ack_mode);
+  if (pwr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)
+  {
+    /* The builtin volatile secure proxy writer uses a content filter, so set the next
+     * expected sequence number of its reorder administration to the maximum sequence
+     * number. This effectively disables the proxy writer's reorder administration;
+     * because the corresponding reader is always considered out of sync, the reorder
+     * administration of that reader is used instead.
+     */
+    nn_reorder_set_next_seq(pwr->reorder, MAX_SEQ_NUMBER);
+    pwr->filtered = 1;
+  }
   pwr->dqueue = dqueue;
   pwr->evq = evq;
   pwr->ddsi2direct_cb = 0;
@@ -4595,6 +4674,15 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
   ddsrt_avl_init (&prd_writers_treedef, &prd->writers);
+#ifdef DDSI_INCLUDE_SECURITY
+  if (prd->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER)
+    prd->filter = volatile_secure_data_filter;
+  else
+    prd->filter = NULL;
+#else
+  prd->filter = NULL;
+#endif
   /* locking the entity prevents matching while the built-in topic hasn't been published yet */
   ddsrt_mutex_lock (&prd->e.lock);
   ephash_insert_proxy_reader_guid (gv->guid_hash, prd);

View file

@@ -140,6 +140,29 @@ static void make_builtin_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template)
   q->durability.kind = DDS_DURABILITY_TRANSIENT_LOCAL;
 }
+#ifdef DDSI_INCLUDE_SECURITY
+static void make_builtin_volatile_endpoint_xqos (dds_qos_t *q, const dds_qos_t *template)
+{
+  nn_xqos_copy (q, template);
+  q->reliability.kind = DDS_RELIABILITY_RELIABLE;
+  q->reliability.max_blocking_time = 100 * T_MILLISECOND;
+  q->durability.kind = DDS_DURABILITY_VOLATILE;
+  q->history.kind = DDS_HISTORY_KEEP_ALL;
+}
+
+static void add_property_to_xqos(dds_qos_t *q, const char *name, const char *value)
+{
+  assert(!(q->present & QP_PROPERTY_LIST));
+  q->present |= QP_PROPERTY_LIST;
+  q->property.value.n = 1;
+  q->property.value.props = ddsrt_malloc(sizeof(dds_property_t));
+  q->property.binary_value.n = 0;
+  q->property.binary_value.props = NULL;
+  q->property.value.props[0].name = ddsrt_strdup(name);
+  q->property.value.props[0].value = ddsrt_strdup(value);
+}
+#endif
 static int set_recvips (struct q_globals *gv)
 {
   gv->recvips = NULL;
@@ -1014,10 +1037,17 @@ int rtps_init (struct q_globals *gv)
   make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_rd, &gv->default_xqos_rd);
   make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_wr, &gv->default_xqos_wr);
 #ifdef DDSI_INCLUDE_SECURITY
+  make_builtin_volatile_endpoint_xqos(&gv->builtin_volatile_xqos_rd, &gv->default_xqos_rd);
+  make_builtin_volatile_endpoint_xqos(&gv->builtin_volatile_xqos_wr, &gv->default_xqos_wr);
   nn_xqos_copy (&gv->builtin_stateless_xqos_rd, &gv->default_xqos_rd);
   nn_xqos_copy (&gv->builtin_stateless_xqos_wr, &gv->default_xqos_wr);
   gv->builtin_stateless_xqos_wr.reliability.kind = DDS_RELIABILITY_BEST_EFFORT;
   gv->builtin_stateless_xqos_wr.durability.kind = DDS_DURABILITY_VOLATILE;
+
+  /* Setting these properties allows the CryptoKeyFactory to recognize
+   * the entities (see DDS Security spec chapter 8.8.8.1). */
+  add_property_to_xqos(&gv->builtin_volatile_xqos_rd, "dds.sec.builtin_endpoint_name", "BuiltinParticipantVolatileMessageSecureReader");
+  add_property_to_xqos(&gv->builtin_volatile_xqos_wr, "dds.sec.builtin_endpoint_name", "BuiltinParticipantVolatileMessageSecureWriter");
 #endif
   make_special_topics (gv);
@@ -1338,6 +1368,8 @@ err_unicast_sockets:
 #ifdef DDSI_INCLUDE_SECURITY
   nn_xqos_fini (&gv->builtin_stateless_xqos_wr);
   nn_xqos_fini (&gv->builtin_stateless_xqos_rd);
+  nn_xqos_fini (&gv->builtin_volatile_xqos_wr);
+  nn_xqos_fini (&gv->builtin_volatile_xqos_rd);
 #endif
   nn_xqos_fini (&gv->builtin_endpoint_xqos_wr);
   nn_xqos_fini (&gv->builtin_endpoint_xqos_rd);
@@ -1670,6 +1702,8 @@ void rtps_fini (struct q_globals *gv)
 #ifdef DDSI_INCLUDE_SECURITY
   nn_xqos_fini (&gv->builtin_stateless_xqos_wr);
   nn_xqos_fini (&gv->builtin_stateless_xqos_rd);
+  nn_xqos_fini (&gv->builtin_volatile_xqos_wr);
+  nn_xqos_fini (&gv->builtin_volatile_xqos_rd);
 #endif
   nn_xqos_fini (&gv->builtin_endpoint_xqos_wr);
   nn_xqos_fini (&gv->builtin_endpoint_xqos_rd);

View file

@@ -42,6 +42,16 @@ int WildcardOverlap(char * p1, char * p2)
 }
 #endif
+int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b)
+{
+  return a->u[0] == b->u[0] && a->u[1] == b->u[1] && a->u[2] == b->u[2];
+}
+
+int guid_eq (const struct ddsi_guid *a, const struct ddsi_guid *b)
+{
+  return guid_prefix_eq(&a->prefix, &b->prefix) && (a->entityid.u == b->entityid.u);
+}
+
 int ddsi2_patmatch (const char *pat, const char *str)
 {
   while (*pat)

View file

@@ -1545,6 +1545,28 @@ int nn_defrag_nackmap (struct nn_defrag *defrag, seqno_t seq, uint32_t maxfragnu
   return (int) map->numbits;
 }
+/* There is only one defrag per proxy writer. However, for the volatile secure writer a
+ * filter is applied on the destination participant, and there is one builtin volatile
+ * secure reader for each local participant. When such a local participant is deleted,
+ * the defrag buffer may still contain fragments for the associated reader.
+ * nn_defrag_prune removes these fragments and should only be used when the volatile
+ * secure reader is deleted.
+ */
+void nn_defrag_prune (struct nn_defrag *defrag, ddsi_guid_prefix_t *dst, seqno_t min)
+{
+  struct nn_rsample *s = ddsrt_avl_lookup_succ_eq (&defrag_sampletree_treedef, &defrag->sampletree, &min);
+  while (s)
+  {
+    struct nn_rsample *s1 = ddsrt_avl_find_succ (&defrag_sampletree_treedef, &defrag->sampletree, s);
+    if (guid_prefix_eq(&s->u.defrag.sampleinfo->rst->dst_guid_prefix, dst))
+    {
+      defrag_rsample_drop (defrag, s);
+    }
+    s = s1;
+  }
+  defrag->max_sample = ddsrt_avl_find_max (&defrag_sampletree_treedef, &defrag->sampletree);
+}
 /* REORDER -------------------------------------------------------------
    The reorder index tracks out-of-order messages as non-overlapping,
@@ -2366,6 +2388,11 @@ seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder)
   return reorder->next_seq;
 }
+void nn_reorder_set_next_seq (struct nn_reorder *reorder, seqno_t seq)
+{
+  reorder->next_seq = seq;
+}
 /* DQUEUE -------------------------------------------------------------- */
 struct nn_dqueue {
View file

@@ -79,6 +79,11 @@ static void deliver_user_data_synchronously (struct nn_rsample_chain *sc, const
 static void maybe_set_reader_in_sync (struct proxy_writer *pwr, struct pwr_rd_match *wn, seqno_t last_deliv_seq)
 {
+  if (wn->filtered)
+  {
+    wn->in_sync = PRMSS_OUT_OF_SYNC;
+    return;
+  }
   switch (wn->in_sync)
   {
     case PRMSS_SYNC:
@@ -505,7 +510,7 @@ static int valid_DataFrag (const struct receiver_state *rst, struct nn_rmsg *rms
   return 1;
 }
-static int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits)
+int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, seqno_t start, seqno_t base, uint32_t numbits, const uint32_t *bits)
 {
   struct nn_xmsg_marker sm_marker;
   Gap_t *gap;
@@ -603,6 +608,82 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou
   return 1;
 }
+void nn_gap_info_init(struct nn_gap_info *gi)
+{
+  gi->gapstart = -1;
+  gi->gapend = -1;
+  gi->gapnumbits = 0;
+  memset(gi->gapbits, 0, sizeof(gi->gapbits));
+}
+
+void nn_gap_info_update(struct q_globals *gv, struct nn_gap_info *gi, int64_t seqnr)
+{
+  if (gi->gapstart == -1)
+  {
+    GVTRACE (" M%"PRId64, seqnr);
+    gi->gapstart = seqnr;
+    gi->gapend = gi->gapstart + 1;
+  }
+  else if (seqnr == gi->gapend)
+  {
+    GVTRACE (" M%"PRId64, seqnr);
+    gi->gapend = seqnr + 1;
+  }
+  else if (seqnr - gi->gapend < 256)
+  {
+    unsigned idx = (unsigned) (seqnr - gi->gapend);
+    GVTRACE (" M%"PRId64, seqnr);
+    gi->gapnumbits = idx + 1;
+    nn_bitset_set (gi->gapnumbits, gi->gapbits, idx);
+  }
+}
+
+struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *prd, struct nn_gap_info *gi)
+{
+  struct nn_xmsg *m;
+
+  if (gi->gapstart <= 0)
+    return NULL;
+
+  if (gi->gapnumbits == 0)
+  {
+    /* Avoid sending an invalid bitset */
+    gi->gapnumbits = 1;
+    nn_bitset_set (gi->gapnumbits, gi->gapbits, 0);
+    gi->gapend--;
+  }
+
+  m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
+#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
+  nn_xmsg_setencoderid (m, wr->partition_id);
+#endif
+  if (nn_xmsg_setdstPRD (m, prd) < 0)
+  {
+    nn_xmsg_free (m);
+    m = NULL;
+  }
+  else
+  {
+    add_Gap (m, wr, prd, gi->gapstart, gi->gapend, gi->gapnumbits, gi->gapbits);
+    if (nn_xmsg_size(m) == 0)
+    {
+      nn_xmsg_free (m);
+      m = NULL;
+    }
+    else
+    {
+      unsigned i;
+      ETRACE (wr, " FXGAP%"PRId64"..%"PRId64"/%d:", gi->gapstart, gi->gapend, gi->gapnumbits);
+      for (i = 0; i < gi->gapnumbits; i++)
+        ETRACE (wr, "%c", nn_bitset_isset (gi->gapnumbits, gi->gapbits, i) ? '1' : '0');
+    }
+  }
+
+  return m;
+}
 static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const AckNack_t *msg, nn_wctime_t timestamp, SubmessageKind_t prev_smid)
 {
   struct proxy_reader *prd;
@@ -611,10 +692,9 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
   ddsi_guid_t src, dst;
   seqno_t seqbase;
   seqno_t seq_xmit;
+  seqno_t max_seq_available;
   nn_count_t *countp;
-  seqno_t gapstart = -1, gapend = -1;
-  unsigned gapnumbits = 0;
-  uint32_t gapbits[256 / 32];
+  struct nn_gap_info gi;
   int accelerate_rexmit = 0;
   int is_pure_ack;
   int is_pure_nonhist_ack;
@@ -626,7 +706,6 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
   struct whc_node *deferred_free_list = NULL;
   struct whc_state whcst;
   int hb_sent_in_response = 0;
-  memset (gapbits, 0, sizeof (gapbits));
   countp = (nn_count_t *) ((char *) msg + offsetof (AckNack_t, bits) + NN_SEQUENCE_NUMBER_SET_BITS_SIZE (msg->readerSNState.numbits));
   src.prefix = rst->src_guid_prefix;
   src.entityid = msg->readerId;
@@ -836,6 +915,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
      a future request'll fix it. */
   enqueued = 1;
   seq_xmit = writer_read_seq_xmit (wr);
+  nn_gap_info_init(&gi);
   const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq;
   const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0;
   for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++)
@@ -853,7 +933,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
       if (!wr->retransmitting && sample.unacked)
         writer_set_retransmitting (wr);
-      if (rst->gv->config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync)
+      if (rst->gv->config.retransmit_merging != REXMIT_MERGE_NEVER && rn->assumed_in_sync && !prd->filter)
       {
         /* send retransmit to all receivers, but skip if recently done */
         nn_mtime_t tstamp = now_mt ();
@@ -875,77 +955,54 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
       }
       else
       {
-        /* no merging, send directed retransmit */
-        RSTTRACE (" RX%"PRId64"", seqbase + i);
-        enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0);
-        if (enqueued)
+        /* Is this a volatile reader with a filter?
+         * If so, call the filter to see if we should re-arrange the sequence gap when needed. */
+        if (prd->filter && !prd->filter (wr, prd, sample.serdata))
+          nn_gap_info_update (rst->gv, &gi, seqbase + i);
+        else
         {
-          max_seq_in_reply = seqbase + i;
-          msgs_sent++;
-          sample.rexmit_count++;
+          /* no merging, send directed retransmit */
+          RSTTRACE (" RX%"PRId64"", seqbase + i);
+          enqueued = (enqueue_sample_wrlock_held (wr, seq, sample.plist, sample.serdata, prd, 0) >= 0);
+          if (enqueued)
+          {
+            max_seq_in_reply = seqbase + i;
+            msgs_sent++;
+            sample.rexmit_count++;
+          }
         }
       }
       whc_return_sample(wr->whc, &sample, true);
     }
-    else if (gapstart == -1)
-    {
-      RSTTRACE (" M%"PRId64, seqbase + i);
-      gapstart = seqbase + i;
-      gapend = gapstart + 1;
-      msgs_lost++;
-    }
-    else if (seqbase + i == gapend)
-    {
-      RSTTRACE (" M%"PRId64, seqbase + i);
-      gapend = seqbase + i + 1;
-      msgs_lost++;
-    }
-    else if (seqbase + i - gapend < 256)
+    else
     {
-      uint32_t idx = (uint32_t) (seqbase + i - gapend);
-      RSTTRACE (" M%"PRId64, seqbase + i);
-      gapnumbits = idx + 1;
-      nn_bitset_set (gapnumbits, gapbits, idx);
+      nn_gap_info_update (rst->gv, &gi, seqbase + i);
       msgs_lost++;
     }
   }
   if (!enqueued)
     RSTTRACE (" rexmit-limit-hit");
   /* Generate a Gap message if some of the sequence is missing */
-  if (gapstart > 0)
+  if (gi.gapstart > 0)
   {
-    struct nn_xmsg *m;
-    if (gapend == seqbase + msg->readerSNState.numbits)
-    {
-      /* We automatically grow a gap as far as we can -- can't
-         retransmit those messages anyway, so no need for round-trip
-         to the remote reader. */
-      gapend = grow_gap_to_next_seq (wr, gapend);
-    }
-    /* The non-bitmap part of a gap message says everything <=
-       gapend-1 is no more (so the maximum sequence number it informs
-       the peer of is gapend-1); each bit adds one sequence number to
-       that. */
-    if (gapend-1 + gapnumbits > max_seq_in_reply)
-      max_seq_in_reply = gapend-1 + gapnumbits;
-    RSTTRACE (" XGAP%"PRId64"..%"PRId64"/%u:", gapstart, gapend, gapnumbits);
-    for (uint32_t i = 0; i < gapnumbits; i++)
-      RSTTRACE ("%c", nn_bitset_isset (gapnumbits, gapbits, i) ? '1' : '0');
-    m = nn_xmsg_new (rst->gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
-#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
-    nn_xmsg_setencoderid (m, wr->partition_id);
-#endif
-    if (nn_xmsg_setdstPRD (m, prd) < 0)
-      nn_xmsg_free (m);
-    else
-    {
-      add_Gap (m, wr, prd, gapstart, gapend, gapnumbits, gapbits);
-      qxev_msg (wr->evq, m);
-    }
-    msgs_sent++;
+    struct nn_xmsg *gap;
+    if (gi.gapend == seqbase + msg->readerSNState.numbits)
+      gi.gapend = grow_gap_to_next_seq (wr, gi.gapend);
+    if (gi.gapend-1 + gi.gapnumbits > max_seq_in_reply)
+      max_seq_in_reply = gi.gapend-1 + gi.gapnumbits;
+    gap = nn_gap_info_create_gap (wr, prd, &gi);
+    if (gap)
+    {
+      qxev_msg (wr->evq, gap);
+      msgs_sent++;
+    }
   }
   wr->rexmit_count += msgs_sent;
   wr->rexmit_lost_count += msgs_lost;
   /* If rexmits and/or a gap message were sent, and if the last
@@ -953,7 +1010,8 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
      less than the last sequence number transmitted by the writer,
      tell the peer to acknowledge quickly. Not sure if that helps, but
      it might ... [NB writer->seq is the last msg sent so far] */
-  if (msgs_sent && max_seq_in_reply < seq_xmit)
+  max_seq_available = (prd->filter ? rn->last_seq : seq_xmit);
+  if (msgs_sent && max_seq_in_reply < max_seq_available)
   {
     RSTTRACE (" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq);
     force_heartbeat_to_peer (wr, &whcst, prd, 1);
@@ -1039,7 +1097,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
   struct receiver_state * const rst = arg->rst;
   Heartbeat_t const * const msg = arg->msg;
   struct proxy_writer * const pwr = arg->pwr;
-  seqno_t refseq;
+  seqno_t refseq, last_seq;
   ASSERT_MUTEX_HELD (&pwr->e.lock);
@ -1053,7 +1111,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
/* Reference sequence number for determining whether or not to /* Reference sequence number for determining whether or not to
Ack/Nack unfortunately depends on whether the reader is in Ack/Nack unfortunately depends on whether the reader is in
sync. */ sync. */
if (wn->in_sync != PRMSS_OUT_OF_SYNC) if (wn->in_sync != PRMSS_OUT_OF_SYNC && !wn->filtered)
refseq = nn_reorder_next_seq (pwr->reorder) - 1; refseq = nn_reorder_next_seq (pwr->reorder) - 1;
else else
refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1; refseq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1;
@@ -1070,7 +1128,12 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
  {
    nn_mtime_t tsched;
    tsched.v = T_NEVER;
    if (wn->filtered)
      last_seq = wn->last_seq;
    else
      last_seq = pwr->last_seq;
    if (last_seq > refseq)
    {
      RSTTRACE ("/NAK");
      if (arg->tnow_mt.v >= wn->t_last_nack.v + rst->gv->config.nack_delay || refseq >= wn->seq_last_nack)
@@ -1195,44 +1258,73 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
    int refc_adjust = 0;
    nn_reorder_result_t res;
    gap = nn_rdata_newgap (rmsg);
    int filtered = 0;

    if (pwr->filtered && !is_null_guid(&dst))
    {
      for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn))
      {
        if (guid_eq(&wn->rd_guid, &dst))
        {
          if (wn->filtered)
          {
            struct nn_reorder *ro = wn->u.not_in_sync.reorder;
            if ((res = nn_reorder_gap (&sc, ro, gap, 1, firstseq, &refc_adjust)) > 0)
              nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res);
            if (fromSN (msg->lastSN) > wn->last_seq)
            {
              wn->last_seq = fromSN (msg->lastSN);
            }
            filtered = 1;
          }
          break;
        }
      }
    }
if (!filtered)
{
if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, firstseq, &refc_adjust)) > 0)
{
if (pwr->deliver_synchronously)
deliver_user_data_synchronously (&sc, NULL);
else
nn_dqueue_enqueue (pwr->dqueue, &sc, res);
}
for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn))
{
if (wn->in_sync != PRMSS_SYNC)
{
seqno_t last_deliv_seq = 0;
switch (wn->in_sync)
{
case PRMSS_SYNC:
assert(0);
break;
case PRMSS_TLCATCHUP:
last_deliv_seq = nn_reorder_next_seq (pwr->reorder) - 1;
break;
case PRMSS_OUT_OF_SYNC: {
struct nn_reorder *ro = wn->u.not_in_sync.reorder;
if ((res = nn_reorder_gap (&sc, ro, gap, 1, firstseq, &refc_adjust)) > 0)
{
if (pwr->deliver_synchronously)
deliver_user_data_synchronously (&sc, &wn->rd_guid);
else
nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, res);
}
last_deliv_seq = nn_reorder_next_seq (wn->u.not_in_sync.reorder) - 1;
}
}
if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER)
{
wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN);
RSTTRACE (" end-of-tl-seq(rd "PGUIDFMT" #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq);
}
maybe_set_reader_in_sync (pwr, wn, last_deliv_seq);
}
}
}
    nn_fragchain_adjust_refcount (gap, refc_adjust);
  }
@@ -1552,7 +1644,8 @@ static int handle_one_gap (struct proxy_writer *pwr, struct pwr_rd_match *wn, se
  /* Clean up the defrag admin: no fragments of a missing sample will
     be arriving in the future */
  if (!(wn && wn->filtered))
    nn_defrag_notegap (pwr->defrag, a, b);

  /* Primary reorder: the gap message may cause some samples to become
     deliverable. */
@@ -1733,6 +1826,14 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
    pwr->last_fragnum = ~0u;
    pwr->last_fragnum_reset = 0;
  }
if (wn && wn->filtered)
{
if (listbase + last_included_rel > wn->last_seq)
{
wn->last_seq = listbase + last_included_rel;
}
}
  RSTTRACE (")");
  ddsrt_mutex_unlock (&pwr->e.lock);
  return 1;
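The handle_Gap hunk above advances a filtered reader's per-reader high-water mark to the last sequence number the Gap covers. A minimal sketch of that update, with a hypothetical helper name (`update_last_seq`) for the inline code:

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t seqno_t;

/* For a filtered reader match, the high-water mark advances to the last
 * sequence number covered by the Gap submessage (list base plus the last
 * included relative offset), but never moves backwards. */
static seqno_t update_last_seq (seqno_t last_seq, seqno_t listbase, seqno_t last_included_rel)
{
  if (listbase + last_included_rel > last_seq)
    last_seq = listbase + last_included_rel;
  return last_seq;
}
```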
@@ -2193,105 +2294,135 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
    int refc_adjust = 0;
    struct nn_rsample_chain sc;
    struct nn_rdata *fragchain = nn_rsample_fragchain (rsample);
    nn_reorder_result_t rres, rres2;
    struct pwr_rd_match *wn;
    int filtered = 0;

    if (pwr->filtered && !is_null_guid(&dst))
    {
      for (wn = ddsrt_avl_find_min (&pwr_readers_treedef, &pwr->readers); wn != NULL; wn = ddsrt_avl_find_succ (&pwr_readers_treedef, &pwr->readers, wn))
      {
        if (guid_eq(&wn->rd_guid, &dst))
        {
          if (wn->filtered)
          {
            rres2 = nn_reorder_rsample (&sc, wn->u.not_in_sync.reorder, rsample, &refc_adjust, nn_dqueue_is_full (pwr->dqueue));
            if (sampleinfo->seq > wn->last_seq)
            {
              wn->last_seq = sampleinfo->seq;
            }
            if (rres2 > 0)
            {
              if (!pwr->deliver_synchronously)
                nn_dqueue_enqueue (pwr->dqueue, &sc, rres2);
              else
                deliver_user_data_synchronously (&sc, &wn->rd_guid);
            }
            filtered = 1;
          }
          break;
        }
      }
    }

    if (!filtered)
    {
      rres = nn_reorder_rsample (&sc, pwr->reorder, rsample, &refc_adjust, 0); // nn_dqueue_is_full (pwr->dqueue));

      if (rres == NN_REORDER_ACCEPT && pwr->n_reliable_readers == 0)
      {
        /* If no reliable readers but the reorder buffer accepted the
           sample, it must be a reliable proxy writer with only
           unreliable readers. "Inserting" a Gap [1, sampleinfo->seq)
           will force delivery of this sample, and not cause the gap to
           be added to the reorder admin. */
        int gap_refc_adjust = 0;
        rres = nn_reorder_gap (&sc, pwr->reorder, rdata, 1, sampleinfo->seq, &gap_refc_adjust);
        assert (rres > 0);
        assert (gap_refc_adjust == 0);
      }

      if (rres > 0)
      {
        /* Enqueue or deliver with pwr->e.lock held: to ensure no other
           receive thread's data gets interleaved -- arguably delivery
           needn't be exactly in-order, which would allow us to do this
           without pwr->e.lock held. */
        if (pwr->deliver_synchronously)
        {
          /* FIXME: just in case the synchronous delivery runs into a delay caused
             by the current mishandling of resource limits */
          if (*deferred_wakeup)
            dd_dqueue_enqueue_trigger (*deferred_wakeup);
          deliver_user_data_synchronously (&sc, NULL);
        }
        else
        {
          if (nn_dqueue_enqueue_deferred_wakeup (pwr->dqueue, &sc, rres))
          {
            if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue)
              dd_dqueue_enqueue_trigger (*deferred_wakeup);
            *deferred_wakeup = pwr->dqueue;
          }
        }
      }

      if (pwr->n_readers_out_of_sync > 0)
      {
        /* Those readers catching up with TL but in sync with the proxy
           writer may have become in sync with the proxy writer and the
           writer; those catching up with TL all by themselves go through
           the "TOO_OLD" path below. */
        ddsrt_avl_iter_t it;
        struct nn_rsample *rsample_dup = NULL;
        int reuse_rsample_dup = 0;
        for (wn = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); wn != NULL; wn = ddsrt_avl_iter_next (&it))
        {
          if (wn->in_sync == PRMSS_SYNC)
            continue;
          /* only need to get a copy of the first sample, because that's the one
             that triggered delivery */
          if (!reuse_rsample_dup)
            rsample_dup = nn_reorder_rsample_dup_first (rmsg, rsample);
          rres2 = nn_reorder_rsample (&sc, wn->u.not_in_sync.reorder, rsample_dup, &refc_adjust, nn_dqueue_is_full (pwr->dqueue));
          switch (rres2)
          {
            case NN_REORDER_TOO_OLD:
            case NN_REORDER_REJECT:
              reuse_rsample_dup = 1;
              break;
            case NN_REORDER_ACCEPT:
              reuse_rsample_dup = 0;
              break;
            default:
              assert (rres2 > 0);
              /* note: can't deliver to a reader, only to a group */
              maybe_set_reader_in_sync (pwr, wn, sampleinfo->seq);
              reuse_rsample_dup = 0;
              /* No need to deliver old data to out-of-sync readers
                 synchronously -- ordering guarantees don't change
                 as fresh data will be delivered anyway and hence
                 the old data will never be guaranteed to arrive
                 in-order, and those few microseconds can't hurt in
                 catching up on transient-local data. See also
                 NN_REORDER_DELIVER case in outer switch. */
              if (pwr->deliver_synchronously)
              {
                /* FIXME: just in case the synchronous delivery runs into a delay caused
                   by the current mishandling of resource limits */
                deliver_user_data_synchronously (&sc, &wn->rd_guid);
              }
              else
              {
                if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue)
                {
                  dd_dqueue_enqueue_trigger (*deferred_wakeup);
                  *deferred_wakeup = NULL;
                }
                nn_dqueue_enqueue1 (pwr->dqueue, &wn->rd_guid, &sc, rres2);
              }
              break;
          }
        }
      }
    }
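The handle_regular change above routes a point-to-point (filtered) sample only to the reader match whose GUID equals the destination, and falls back to the normal multi-reader path otherwise. A simplified model of that routing, using a plain array of hypothetical `rd_match` entries instead of the AVL tree:

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for struct pwr_rd_match: just the fields the routing needs. */
typedef struct { uint32_t guid; int filtered; int64_t last_seq; } rd_match;

/* Returns the index of the filtered reader that consumed the sample, or -1
 * when the sample should go through the normal (unfiltered) path. */
static int route_filtered_sample (rd_match *rds, int nrds, uint32_t dst, int64_t seq)
{
  for (int i = 0; i < nrds; i++)
  {
    if (rds[i].guid == dst)
    {
      if (rds[i].filtered)
      {
        if (seq > rds[i].last_seq)
          rds[i].last_seq = seq;   /* per-reader high-water mark */
        return i;
      }
      break; /* destination found but not filtered: normal path */
    }
  }
  return -1;
}
```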

@@ -10,6 +10,7 @@
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 */
#include <assert.h>
#include <string.h>
#include <math.h>

#include "dds/ddsrt/heap.h"
@@ -31,6 +32,7 @@
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_hbcontrol.h"
#include "dds/ddsi/q_receive.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_sertopic.h"
@@ -104,8 +106,8 @@ int64_t writer_hbcontrol_intv (const struct writer *wr, const struct whc_state *
void writer_hbcontrol_note_asyncwrite (struct writer *wr, nn_mtime_t tnow)
{
  struct q_globals const * const gv = wr->e.gv;
  struct hbcontrol * const hbc = &wr->hbcontrol;
  nn_mtime_t tnext;

  /* Reset number of heartbeats since last write: that means the
@@ -315,12 +317,53 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
      (hbc->tsched.v == T_NEVER) ? INFINITY : (double) (hbc->tsched.v - tnow.v) / 1e9,
      ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq,
      ddsrt_avl_is_empty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!",
      whcst->max_seq, writer_read_seq_xmit(wr));
  }
  return msg;
}
#ifdef DDSI_INCLUDE_SECURITY
struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *whcst, int hbansreq, struct proxy_reader *prd)
{
struct q_globals const * const gv = wr->e.gv;
struct nn_xmsg *msg;
ASSERT_MUTEX_HELD (&wr->e.lock);
assert (wr->reliable);
if ((msg = nn_xmsg_new (gv->xmsgpool, &wr->e.guid, wr->c.pp, sizeof (InfoTS_t) + sizeof (Heartbeat_t), NN_XMSG_KIND_CONTROL)) == NULL)
return NULL;
ETRACE (wr, "writer_hbcontrol_p2p: wr "PGUIDFMT" unicasting to prd "PGUIDFMT" ", PGUID (wr->e.guid), PGUID (prd->e.guid));
ETRACE (wr, "(rel-prd %d seq-eq-max %d seq %"PRId64" maxseq %"PRId64")\n",
wr->num_reliable_readers,
ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->num_reliable_readers_where_seq_equals_max,
wr->seq,
ddsrt_avl_is_empty (&wr->readers) ? (int64_t) -1 : root_rdmatch (wr)->max_seq);
/* set the destination explicitly to the unicast destination and the fourth
param of add_Heartbeat needs to be the guid of the reader */
if (nn_xmsg_setdstPRD (msg, prd) < 0)
{
nn_xmsg_free (msg);
return NULL;
}
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
add_Heartbeat (msg, wr, whcst, hbansreq, prd->e.guid.entityid, 1);
if (nn_xmsg_size(msg) == 0)
{
nn_xmsg_free (msg);
msg = NULL;
}
return msg;
}
#endif
void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_state *whcst, int hbansreq, ddsi_entityid_t dst, int issync)
{
  struct q_globals const * const gv = wr->e.gv;
@@ -1051,6 +1094,65 @@ static int maybe_grow_whc (struct writer *wr)
  return 0;
}
int write_sample_p2p_wrlock_held(struct writer *wr, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, struct proxy_reader *prd)
{
struct q_globals * const gv = wr->e.gv;
int r;
nn_mtime_t tnow;
int rexmit = 1;
struct wr_prd_match *wprd = NULL;
seqno_t gseq;
struct nn_xmsg *gap = NULL;
tnow = now_mt ();
serdata->twrite = tnow;
serdata->timestamp = now();
if (prd->filter)
{
if ((wprd = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, &prd->e.guid)) != NULL)
{
rexmit = prd->filter(wr, prd, serdata);
/* determine if a gap has to be added */
if (rexmit)
{
struct nn_gap_info gi;
GVLOG (DDS_LC_DISCOVERY, "send filtered "PGUIDFMT" last_seq=%"PRIu64" seq=%"PRIu64"\n", PGUID (wr->e.guid), wprd->seq, seq);
nn_gap_info_init(&gi);
for (gseq = wprd->seq + 1; gseq < seq; gseq++)
{
struct whc_borrowed_sample sample;
if (whc_borrow_sample (wr->whc, gseq, &sample))
{
if (prd->filter(wr, prd, sample.serdata) == 0)
{
nn_gap_info_update(wr->e.gv, &gi, gseq);
}
whc_return_sample (wr->whc, &sample, false);
}
}
gap = nn_gap_info_create_gap(wr, prd, &gi);
}
wprd->last_seq = seq;
}
}
if ((r = insert_sample_in_whc (wr, seq, plist, serdata, tk)) >= 0)
{
enqueue_sample_wrlock_held (wr, seq, plist, serdata, prd, 1);
if (gap)
qxev_msg (wr->evq, gap);
if (wr->heartbeat_xevent)
writer_hbcontrol_note_asyncwrite(wr, tnow);
}
return r;
}
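write_sample_p2p_wrlock_held above walks the sequence numbers between the reader's last delivered sample and the new one, collecting those the reader-specific filter rejects into a Gap submessage so they are never retransmitted. A minimal sketch of that accumulation, with hypothetical names (`count_gapped`, `accept_even`) replacing the WHC and gap-info machinery:

```c
#include <assert.h>
#include <stdint.h>

typedef int (*sample_filter_t) (int64_t seq);

/* Count the in-between sequence numbers the filter rejects; in the real
 * code these feed nn_gap_info_update and end up in a Gap submessage. */
static int count_gapped (int64_t last_seq, int64_t seq, sample_filter_t accept)
{
  int gapped = 0;
  for (int64_t gseq = last_seq + 1; gseq < seq; gseq++)
    if (!accept (gseq))
      gapped++;
  return gapped;
}

/* Toy filter for the sketch: only even sequence numbers pass. */
static int accept_even (int64_t seq) { return (seq % 2) == 0; }
```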
static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, int end_of_txn, int gc_allowed)
{
  struct q_globals const * const gv = wr->e.gv;

@@ -579,6 +579,89 @@ static void handle_xevk_entityid (struct nn_xpack *xp, struct xevent_nt *ev)
  nn_xpack_addmsg (xp, ev->u.entityid.msg, 0);
}
#ifdef DDSI_INCLUDE_SECURITY
static void send_heartbeat_to_all_readers(struct nn_xpack *xp, struct xevent *ev, struct writer *wr, nn_mtime_t tnow)
{
struct whc_state whcst;
nn_mtime_t t_next;
int hbansreq = 0;
unsigned count = 0;
ddsrt_mutex_lock (&wr->e.lock);
whc_get_state(wr->whc, &whcst);
if (!writer_must_have_hb_scheduled (wr, &whcst))
{
hbansreq = 1; /* just for trace */
t_next.v = T_NEVER;
}
else if (!writer_hbcontrol_must_send (wr, &whcst, tnow))
{
hbansreq = 1; /* just for trace */
t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
}
else
{
struct wr_prd_match *m;
struct ddsi_guid last_guid = { .prefix = {.u = {0,0,0}}, .entityid = {0} };
hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow);
t_next.v = tnow.v + writer_hbcontrol_intv (wr, &whcst, tnow);
while ((m = ddsrt_avl_lookup_succ (&wr_readers_treedef, &wr->readers, &last_guid)) != NULL)
{
last_guid = m->prd_guid;
if (m->seq < m->last_seq)
{
struct proxy_reader *prd;
prd = ephash_lookup_proxy_reader_guid(wr->e.gv->guid_hash, &m->prd_guid);
if (prd)
{
ETRACE (wr, " heartbeat(wr "PGUIDFMT" rd "PGUIDFMT" %s) send, resched in %g s (min-ack %"PRId64", avail-seq %"PRId64")\n",
PGUID (wr->e.guid),
PGUID (m->prd_guid),
hbansreq ? "" : " final",
(double)(t_next.v - tnow.v) / 1e9,
m->seq,
m->last_seq);
struct nn_xmsg *msg = writer_hbcontrol_p2p(wr, &whcst, hbansreq, prd);
if (msg != NULL)
{
ddsrt_mutex_unlock (&wr->e.lock);
nn_xpack_addmsg (xp, msg, 0);
ddsrt_mutex_lock (&wr->e.lock);
}
count++;
}
}
}
}
resched_xevent_if_earlier (ev, t_next);
wr->hbcontrol.tsched = t_next;
if (count == 0)
{
(void)resched_xevent_if_earlier (ev, t_next);
ETRACE (wr, "heartbeat(wr "PGUIDFMT") suppressed, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n",
PGUID (wr->e.guid),
(t_next.v == T_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9,
ddsrt_avl_is_empty (&wr->readers) ? (int64_t) -1 : ((struct wr_prd_match *) ddsrt_avl_root (&wr_readers_treedef, &wr->readers))->min_seq,
ddsrt_avl_is_empty (&wr->readers) || ((struct wr_prd_match *) ddsrt_avl_root (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!",
whcst.max_seq,
writer_read_seq_xmit(wr));
}
ddsrt_mutex_unlock (&wr->e.lock);
}
#endif
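send_heartbeat_to_all_readers above cannot hold an AVL iterator across the point where the writer lock is dropped to pack a message, so it remembers the last visited reader GUID and re-looks-up its successor each round. A sketch of that traversal pattern, with a sorted array standing in for the tree and hypothetical names (`lookup_succ`, `visit_all`):

```c
#include <assert.h>
#include <stdint.h>
#include <stddef.h>

/* Analogue of ddsrt_avl_lookup_succ on a sorted ascending key array:
 * return the first key strictly greater than `last`, or NULL. */
static const uint32_t *lookup_succ (const uint32_t *keys, size_t n, uint32_t last)
{
  for (size_t i = 0; i < n; i++)
    if (keys[i] > last)
      return &keys[i];
  return NULL;
}

/* Visit every entry by successor lookup; saving the key (not a pointer)
 * is what makes it safe to release and re-take the lock per round. */
static unsigned visit_all (const uint32_t *keys, size_t n)
{
  unsigned count = 0;
  uint32_t last = 0;   /* analogous to the all-zero last_guid */
  const uint32_t *m;
  while ((m = lookup_succ (keys, n, last)) != NULL)
  {
    last = *m;
    count++;
  }
  return count;
}
```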
static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow /* monotonic */)
{
  struct q_globals const * const gv = ev->evq->gv;
@@ -594,6 +677,14 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
    return;
  }
#ifdef DDSI_INCLUDE_SECURITY
if (wr->e.guid.entityid.u == NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)
{
send_heartbeat_to_all_readers(xp, ev, wr, tnow);
return;
}
#endif
  ddsrt_mutex_lock (&wr->e.lock);
  assert (wr->reliable);
  whc_get_state(wr->whc, &whcst);
@@ -701,7 +792,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
  AckNack_t *an;
  struct nn_xmsg_marker sm_marker;
  uint32_t i, numbits;
  seqno_t base, last_seq;

  DDSRT_STATIC_ASSERT ((NN_FRAGMENT_NUMBER_SET_MAX_BITS % 32) == 0);
  struct {
@@ -716,7 +807,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
  /* if in sync, look at proxy writer status, else look at
     proxy-writer--reader match status */
  if (rwn->in_sync != PRMSS_OUT_OF_SYNC && !rwn->filtered)
  {
    reorder = pwr->reorder;
    if (!pwr->e.gv->config.late_ack_mode)
@@ -734,6 +825,11 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
    bitmap_base = nn_reorder_next_seq (reorder);
  }
if (rwn->filtered)
last_seq = rwn->last_seq;
else
last_seq = pwr->last_seq;
  an = nn_xmsg_append (msg, &sm_marker, ACKNACK_SIZE_MAX);
  nn_xmsg_submsg_init (msg, sm_marker, SMID_ACKNACK);
  an->readerId = nn_hton_entityid (rwn->rd_guid.entityid);
@@ -741,7 +837,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
  /* Make bitmap; note that we've made sure to have room for the
     maximum bitmap size. */
  numbits = nn_reorder_nackmap (reorder, bitmap_base, last_seq, &an->readerSNState, an->bits, max_numbits, notail);
  base = fromSN (an->readerSNState.bitmap_base);

  /* Scan through bitmap, cutting it off at the first missing sample
@@ -754,7 +850,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
    nackfrag_seq = base + i;
    if (!nn_bitset_isset (numbits, an->bits, i))
      continue;
    if (nackfrag_seq == last_seq)
      fragnum = pwr->last_fragnum;
    else
      fragnum = UINT32_MAX;
@@ -778,7 +874,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
  *nack_seq = (numbits > 0) ? base + numbits : 0;
  if (!pwr->have_seen_heartbeat) {
    /* We must have seen a heartbeat for us to consider setting FINAL */
  } else if (*nack_seq && base + numbits <= last_seq) {
    /* If it's a NACK and it doesn't cover samples all the way up to
       the highest known sequence number, there's some reason to expect
       we may have to do another round. For which we need a Heartbeat.
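The add_AckNack hunks above substitute the per-reader `last_seq` for `pwr->last_seq` when deciding whether the FINAL flag may be set. A distilled sketch of that decision, with a hypothetical name (`expect_another_round`) for the inline condition:

```c
#include <assert.h>
#include <stdint.h>

/* FINAL may only be considered once a heartbeat has been seen; it must be
 * left clear while the NACK does not yet cover everything up to the
 * (possibly per-reader, filter-aware) last known sequence number, so that
 * the writer answers with another heartbeat. */
static int expect_another_round (int have_seen_heartbeat, int64_t nack_seq,
                                 int64_t base, uint32_t numbits, int64_t last_seq)
{
  if (!have_seen_heartbeat)
    return 1;
  return nack_seq != 0 && base + (int64_t) numbits <= last_seq;
}
```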

@@ -1546,11 +1546,6 @@ static int nn_xpack_mayaddmsg (const struct nn_xpack *xp, const struct nn_xmsg *
   return addressing_info_eq_onesidederr (xp, m);
 }
 
-static int guid_prefix_eq (const ddsi_guid_prefix_t *a, const ddsi_guid_prefix_t *b)
-{
-  return a->u[0] == b->u[0] && a->u[1] == b->u[1] && a->u[2] == b->u[2];
-}
-
 int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flags)
 {
   /* Returns > 0 if pack got sent out before adding m */

@@ -18,13 +18,13 @@
static nn_participant_generic_message_t test_msg_in =
{
  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
  .destination_participant_guid = { {.u={2,3,4}},{5} },
  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
  .source_endpoint_guid = { {.u={4,5,6}},{7} },
  .message_class_id = "testing message",
  .message_data = {
    .n = 4,
    .tags = (nn_dataholder_t[]) {
      {
@@ -146,13 +146,13 @@ static nn_participant_generic_message_t test_msg_in
/* Same as test_msg_in, excluding the non-propagated properties. */
static nn_participant_generic_message_t test_msg_out =
{
  .message_identity = { {{.u={1,2,3}},{4}}, 5 },
  .related_message_identity = { {{.u={5,4,3}},{2}}, 1 },
  .destination_participant_guid = { {.u={2,3,4}},{5} },
  .destination_endpoint_guid = { {.u={3,4,5}},{6} },
  .source_endpoint_guid = { {.u={4,5,6}},{7} },
  .message_class_id = "testing message",
  .message_data = {
    .n = 4,
    .tags = (nn_dataholder_t[]) {
      {
@@ -283,7 +283,7 @@ CU_Test (ddsi_security_msg, serializer)
    &msg_in,
    &test_msg_in.message_identity.source_guid,
    test_msg_in.message_identity.sequence_number,
    &test_msg_in.destination_participant_guid,
    &test_msg_in.destination_endpoint_guid,
    &test_msg_in.source_endpoint_guid,
    test_msg_in.message_class_id,