diff --git a/src/core/ddsi/include/dds/ddsi/q_xmsg.h b/src/core/ddsi/include/dds/ddsi/q_xmsg.h index 2b423a2..0a282fe 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xmsg.h +++ b/src/core/ddsi/include/dds/ddsi/q_xmsg.h @@ -43,7 +43,8 @@ struct nn_xmsg_marker { enum nn_xmsg_kind { NN_XMSG_KIND_CONTROL, NN_XMSG_KIND_DATA, - NN_XMSG_KIND_DATA_REXMIT + NN_XMSG_KIND_DATA_REXMIT, + NN_XMSG_KIND_DATA_REXMIT_NOMERGE }; /* XMSGPOOL */ diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index a248eaf..ae6de3c 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -103,6 +103,7 @@ enum xeventkind_nt { XEVK_MSG, XEVK_MSG_REXMIT, + XEVK_MSG_REXMIT_NOMERGE, XEVK_ENTITYID, XEVK_NT_CALLBACK }; @@ -126,7 +127,7 @@ struct xevent_nt struct nn_xmsg *msg; size_t queued_rexmit_bytes; ddsrt_avl_node_t msg_avlnode; - } msg_rexmit; + } msg_rexmit; /* and msg_rexmit_nomerge */ struct { /* xmsg is self-contained / relies on reference counts */ struct nn_xmsg *msg; @@ -178,7 +179,7 @@ static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev) #if 0 EVQTRACE ("ZZZ(%p,%"PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); #endif - assert (ev->kind == XEVK_MSG_REXMIT); + assert (ev->kind == XEVK_MSG_REXMIT || ev->kind == XEVK_MSG_REXMIT_NOMERGE); assert (ev->u.msg_rexmit.queued_rexmit_bytes <= evq->queued_rexmit_bytes); assert (evq->queued_rexmit_msgs > 0); evq->queued_rexmit_bytes -= ev->u.msg_rexmit.queued_rexmit_bytes; @@ -205,9 +206,20 @@ static void trace_msg (UNUSED_ARG (struct xeventq *evq), UNUSED_ARG (const char static struct xevent_nt *lookup_msg (struct xeventq *evq, struct nn_xmsg *msg) { - assert (nn_xmsg_kind (msg) == NN_XMSG_KIND_DATA_REXMIT); - trace_msg (evq, "lookup-msg", msg); - return ddsrt_avl_lookup (&msg_xevents_treedef, &evq->msg_xevents, msg); + switch (nn_xmsg_kind (msg)) + { + case NN_XMSG_KIND_CONTROL: + case NN_XMSG_KIND_DATA: + assert (0); + return NULL; + case NN_XMSG_KIND_DATA_REXMIT: + trace_msg (evq, "lookup-msg", msg); + return ddsrt_avl_lookup (&msg_xevents_treedef, &evq->msg_xevents, msg); + case NN_XMSG_KIND_DATA_REXMIT_NOMERGE: + return NULL; + } + assert (0); + return NULL; } static void remember_msg (struct xeventq *evq, struct xevent_nt *ev) @@ -1324,6 +1336,7 @@ static void handle_individual_xevent_nt (struct xevent_nt *xev, struct nn_xpack handle_xevk_msg (xp, xev); break; case XEVK_MSG_REXMIT: + case XEVK_MSG_REXMIT_NOMERGE: handle_xevk_msg_rexmit (xp, xev); break; case XEVK_ENTITYID: @@ -1563,7 +1576,7 @@ int qxev_msg_rexmit_wrlock_held (struct xeventq *evq, struct nn_xmsg *msg, int f struct xevent_nt *ev; assert (evq); - assert (nn_xmsg_kind (msg) == NN_XMSG_KIND_DATA_REXMIT); + assert (nn_xmsg_kind (msg) == NN_XMSG_KIND_DATA_REXMIT || nn_xmsg_kind (msg) == NN_XMSG_KIND_DATA_REXMIT_NOMERGE); ddsrt_mutex_lock (&evq->lock); if ((ev = lookup_msg (evq, msg)) != NULL && nn_xmsg_merge_rexmit_destinations_wrlock_held (gv, ev->u.msg_rexmit.msg, msg)) { @@ -1587,7 +1600,9 @@ int qxev_msg_rexmit_wrlock_held (struct xeventq *evq, struct nn_xmsg *msg, int f } else { - ev = qxev_common_nt (evq, XEVK_MSG_REXMIT); + const enum xeventkind_nt kind = + (nn_xmsg_kind (msg) == NN_XMSG_KIND_DATA_REXMIT) ? XEVK_MSG_REXMIT : XEVK_MSG_REXMIT_NOMERGE; + ev = qxev_common_nt (evq, kind); ev->u.msg_rexmit.msg = msg; ev->u.msg_rexmit.queued_rexmit_bytes = msg_size; evq->queued_rexmit_bytes += msg_size; diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index d450100..70c70e5 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -429,6 +429,7 @@ static int submsg_is_compatible (const struct nn_xmsg *msg, SubmessageKind_t smk break; case NN_XMSG_KIND_DATA: case NN_XMSG_KIND_DATA_REXMIT: + case NN_XMSG_KIND_DATA_REXMIT_NOMERGE: switch (smkind) { case SMID_PAD: @@ -551,6 +552,12 @@ void nn_xmsg_submsg_remove(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker) { /* Just reset the message size to the start of the current sub-message. */ msg->sz = sm_marker.offset; + + /* Deleting the submessage means the readerId offset in a DATA_REXMIT message is no + longer valid. Converting the message kind to a _NOMERGE one ensures no subsequent + operation will assume its validity. */ + if (msg->kind == NN_XMSG_KIND_DATA_REXMIT) + msg->kind = NN_XMSG_KIND_DATA_REXMIT_NOMERGE; } void nn_xmsg_submsg_replace(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker, unsigned char *new_submsg, size_t new_len) @@ -573,6 +580,17 @@ void nn_xmsg_submsg_replace(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker /* Replace the sub-message. */ memcpy(msg->data->payload + sm_marker.offset, new_submsg, new_len); + + /* The replacement submessage may have undergone any transformation and so the readerId + offset in a DATA_REXMIT message is potentially no longer valid. Converting the + message kind to a _NOMERGE one ensures no subsequent operation will assume its + validity. This is used by the security implementation when encrypting and/or signing + messages and apart from the offset possibly no longer being valid (for which one + might conceivably be able to correct), there is also the issue that it may now be + meaningless junk or that rewriting it would make the receiver reject it as having + been tampered with. */ + if (msg->kind == NN_XMSG_KIND_DATA_REXMIT) + msg->kind = NN_XMSG_KIND_DATA_REXMIT_NOMERGE; } void nn_xmsg_submsg_append_refd_payload(struct nn_xmsg *msg, struct nn_xmsg_marker sm_marker) @@ -1614,6 +1632,7 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag break; case NN_XMSG_KIND_DATA: case NN_XMSG_KIND_DATA_REXMIT: + case NN_XMSG_KIND_DATA_REXMIT_NOMERGE: GVTRACE ("%s("PGUIDFMT":#%"PRId64"/%u)", (m->kind == NN_XMSG_KIND_DATA) ? "data" : "rexmit", PGUID (m->kindspecific.data.wrguid),