From a0d7b71e52e981a0146e577cfd40f10b466dd5df Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 6 May 2020 21:37:23 +0200 Subject: [PATCH] Atomic update next heartbeat time for p2p writers Sending a heartbeat to all matched readers for the P2P builtin participant volatile secure writer unlocks the writer before pushing each individual message out, and so determining the time of the next heartbeat event before writing and updating it afterwards means the state may have changed. While this is appears benign, it is better to do the update atomically. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_xevent.c | 40 ++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index b535bbd..443a9fc 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -620,6 +620,31 @@ static void handle_xevk_entityid (struct nn_xpack *xp, struct xevent_nt *ev) } #ifdef DDSI_INCLUDE_SECURITY +static int send_heartbeat_to_all_readers_check_and_sched (struct xevent *ev, struct writer *wr, const struct whc_state *whcst, ddsrt_mtime_t tnow, ddsrt_mtime_t *t_next) +{ + int send; + if (!writer_must_have_hb_scheduled (wr, whcst)) + { + wr->hbcontrol.tsched = DDSRT_MTIME_NEVER; + send = -1; + } + else if (!writer_hbcontrol_must_send (wr, whcst, tnow)) + { + wr->hbcontrol.tsched = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, whcst, tnow)); + send = -1; + } + else + { + const int hbansreq = writer_hbcontrol_ack_required (wr, whcst, tnow); + wr->hbcontrol.tsched = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, whcst, tnow)); + send = hbansreq; + } + + resched_xevent_if_earlier (ev, wr->hbcontrol.tsched); + *t_next = wr->hbcontrol.tsched; + return send; +} + static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *ev, struct writer *wr, ddsrt_mtime_t tnow) { struct whc_state whcst; @@ -629,17 +654,11 @@ static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *e ddsrt_mutex_lock (&wr->e.lock); whc_get_state(wr->whc, &whcst); - - if (!writer_must_have_hb_scheduled (wr, &whcst)) - t_next = DDSRT_MTIME_NEVER; - else if (!writer_hbcontrol_must_send (wr, &whcst, tnow)) - t_next = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, &whcst, tnow)); - else + const int hbansreq = send_heartbeat_to_all_readers_check_and_sched (ev, wr, &whcst, tnow, &t_next); + if (hbansreq >= 0) { struct wr_prd_match *m; struct ddsi_guid last_guid = { .prefix = {.u = {0,0,0}}, .entityid = {0} }; - const int hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow); - t_next = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, &whcst, tnow)); while ((m = ddsrt_avl_lookup_succ (&wr_readers_treedef, &wr->readers, &last_guid)) != NULL) { @@ -669,16 +688,11 @@ static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *e count++; } } - } } - resched_xevent_if_earlier (ev, t_next); - wr->hbcontrol.tsched = t_next; - if (count == 0) { - (void)resched_xevent_if_earlier (ev, t_next); ETRACE (wr, "heartbeat(wr "PGUIDFMT") suppressed, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n", PGUID (wr->e.guid), (t_next.v == DDS_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9,