Atomic update next heartbeat time for p2p writers

Sending a heartbeat to all matched readers for the P2P builtin
participant volatile secure writer unlocks the writer before pushing
each individual message out, and so determining the time of the next
heartbeat event before writing and updating it afterwards means the
state may have changed.  While this is appears benign, it is better to
do the update atomically.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-05-06 21:37:23 +02:00 committed by eboasson
parent 58e02d0b12
commit a0d7b71e52

View file

@ -620,6 +620,31 @@ static void handle_xevk_entityid (struct nn_xpack *xp, struct xevent_nt *ev)
} }
#ifdef DDSI_INCLUDE_SECURITY #ifdef DDSI_INCLUDE_SECURITY
static int send_heartbeat_to_all_readers_check_and_sched (struct xevent *ev, struct writer *wr, const struct whc_state *whcst, ddsrt_mtime_t tnow, ddsrt_mtime_t *t_next)
{
int send;
if (!writer_must_have_hb_scheduled (wr, whcst))
{
wr->hbcontrol.tsched = DDSRT_MTIME_NEVER;
send = -1;
}
else if (!writer_hbcontrol_must_send (wr, whcst, tnow))
{
wr->hbcontrol.tsched = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, whcst, tnow));
send = -1;
}
else
{
const int hbansreq = writer_hbcontrol_ack_required (wr, whcst, tnow);
wr->hbcontrol.tsched = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, whcst, tnow));
send = hbansreq;
}
resched_xevent_if_earlier (ev, wr->hbcontrol.tsched);
*t_next = wr->hbcontrol.tsched;
return send;
}
static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *ev, struct writer *wr, ddsrt_mtime_t tnow) static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *ev, struct writer *wr, ddsrt_mtime_t tnow)
{ {
struct whc_state whcst; struct whc_state whcst;
@ -629,17 +654,11 @@ static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *e
ddsrt_mutex_lock (&wr->e.lock); ddsrt_mutex_lock (&wr->e.lock);
whc_get_state(wr->whc, &whcst); whc_get_state(wr->whc, &whcst);
const int hbansreq = send_heartbeat_to_all_readers_check_and_sched (ev, wr, &whcst, tnow, &t_next);
if (!writer_must_have_hb_scheduled (wr, &whcst)) if (hbansreq >= 0)
t_next = DDSRT_MTIME_NEVER;
else if (!writer_hbcontrol_must_send (wr, &whcst, tnow))
t_next = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, &whcst, tnow));
else
{ {
struct wr_prd_match *m; struct wr_prd_match *m;
struct ddsi_guid last_guid = { .prefix = {.u = {0,0,0}}, .entityid = {0} }; struct ddsi_guid last_guid = { .prefix = {.u = {0,0,0}}, .entityid = {0} };
const int hbansreq = writer_hbcontrol_ack_required (wr, &whcst, tnow);
t_next = ddsrt_mtime_add_duration (tnow, writer_hbcontrol_intv (wr, &whcst, tnow));
while ((m = ddsrt_avl_lookup_succ (&wr_readers_treedef, &wr->readers, &last_guid)) != NULL) while ((m = ddsrt_avl_lookup_succ (&wr_readers_treedef, &wr->readers, &last_guid)) != NULL)
{ {
@ -669,16 +688,11 @@ static void send_heartbeat_to_all_readers (struct nn_xpack *xp, struct xevent *e
count++; count++;
} }
} }
} }
} }
resched_xevent_if_earlier (ev, t_next);
wr->hbcontrol.tsched = t_next;
if (count == 0) if (count == 0)
{ {
(void)resched_xevent_if_earlier (ev, t_next);
ETRACE (wr, "heartbeat(wr "PGUIDFMT") suppressed, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n", ETRACE (wr, "heartbeat(wr "PGUIDFMT") suppressed, resched in %g s (min-ack %"PRId64"%s, avail-seq %"PRId64", xmit %"PRId64")\n",
PGUID (wr->e.guid), PGUID (wr->e.guid),
(t_next.v == DDS_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9, (t_next.v == DDS_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9,