let final messages on shutdown go onto the network

The discovery data is sent asynchronously (via the "tev" thread, i.e.,
via the "xeventq"), but any messages generated but not yet sent upon
destruction of the event queue were discarded rather than sent out.

Deleting the last participant triggers the shutdown of Cyclone, and the
participant discovery message informing the world of its disappearance
always got discarded.  Consequently, all other nodes would not become
aware of the disappearnce until the lease expired.

This commit changes the behaviour to send out those last few messages
and absent packet loss, all peers are now properly informed.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-10 18:15:45 +08:00 committed by eboasson
parent 3067a69c92
commit ed9406f642

View file

@ -158,6 +158,7 @@ static uint32_t xevent_thread (struct xeventq *xevq);
static nn_mtime_t earliest_in_xeventq (struct xeventq *evq); static nn_mtime_t earliest_in_xeventq (struct xeventq *evq);
static int msg_xevents_cmp (const void *a, const void *b); static int msg_xevents_cmp (const void *a, const void *b);
static int compare_xevent_tsched (const void *va, const void *vb); static int compare_xevent_tsched (const void *va, const void *vb);
static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp);
static const ddsrt_avl_treedef_t msg_xevents_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct xevent_nt, u.msg_rexmit.msg_avlnode), offsetof (struct xevent_nt, u.msg_rexmit.msg), msg_xevents_cmp, 0); static const ddsrt_avl_treedef_t msg_xevents_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct xevent_nt, u.msg_rexmit.msg_avlnode), offsetof (struct xevent_nt, u.msg_rexmit.msg), msg_xevents_cmp, 0);
@ -173,7 +174,7 @@ static int compare_xevent_tsched (const void *va, const void *vb)
static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev) static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev)
{ {
#if 0 #if 0
DDS_TRACE(("ZZZ(%p,%"PA_PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); DDS_TRACE("ZZZ(%p,%"PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes);
#endif #endif
assert (ev->kind == XEVK_MSG_REXMIT); assert (ev->kind == XEVK_MSG_REXMIT);
assert (ev->u.msg_rexmit.queued_rexmit_bytes <= evq->queued_rexmit_bytes); assert (ev->u.msg_rexmit.queued_rexmit_bytes <= evq->queued_rexmit_bytes);
@ -315,26 +316,6 @@ static void free_xevent (struct xeventq *evq, struct xevent *ev)
ddsrt_free (ev); ddsrt_free (ev);
} }
static void free_xevent_nt (struct xeventq *evq, struct xevent_nt *ev)
{
assert (!nontimed_xevent_in_queue (evq, ev));
switch (ev->kind)
{
case XEVK_MSG:
nn_xmsg_free (ev->u.msg.msg);
break;
case XEVK_MSG_REXMIT:
assert (ddsrt_avl_lookup (&msg_xevents_treedef, &evq->msg_xevents, ev->u.msg_rexmit.msg) == NULL);
update_rexmit_counts (evq, ev);
nn_xmsg_free (ev->u.msg_rexmit.msg);
break;
case XEVK_ENTITYID:
nn_xmsg_free (ev->u.entityid.msg);
break;
}
ddsrt_free (ev);
}
void delete_xevent (struct xevent *ev) void delete_xevent (struct xevent *ev)
{ {
struct xeventq *evq = ev->evq; struct xeventq *evq = ev->evq;
@ -551,8 +532,22 @@ void xeventq_free (struct xeventq *evq)
} }
} }
} }
{
struct nn_xpack *xp = nn_xpack_new (evq->tev_conn, evq->auxiliary_bandwidth_limit, false);
thread_state_awake (lookup_thread_state ());
ddsrt_mutex_lock (&evq->lock);
while (!non_timed_xmit_list_is_empty (evq)) while (!non_timed_xmit_list_is_empty (evq))
free_xevent_nt (evq, getnext_from_non_timed_xmit_list (evq)); {
thread_state_awake_to_awake_no_nest (lookup_thread_state ());
handle_nontimed_xevent (getnext_from_non_timed_xmit_list (evq), xp);
}
ddsrt_mutex_unlock (&evq->lock);
thread_state_asleep (lookup_thread_state ());
nn_xpack_send (xp, false);
nn_xpack_free (xp);
}
assert (ddsrt_avl_is_empty (&evq->msg_xevents)); assert (ddsrt_avl_is_empty (&evq->msg_xevents));
ddsrt_cond_destroy (&evq->cond); ddsrt_cond_destroy (&evq->cond);
ddsrt_mutex_destroy (&evq->lock); ddsrt_mutex_destroy (&evq->lock);