From ed9406f64254bf28a664ba4ded08ff7ee56ded28 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 10 May 2019 18:15:45 +0800 Subject: [PATCH] let final messages on shutdown go onto the network The discovery data is sent asynchronously (via the "tev" thread, i.e., via the "xeventq"), but any messages generated but not yet sent upon destruction of the event queue were discarded rather than sent out. Deleting the last participant triggers the shutdown of Cyclone, and the participant discovery message informing the world of its disappearance always got discarded. Consequently, all other nodes would not become aware of the disappearnce until the lease expired. This commit changes the behaviour to send out those last few messages and absent packet loss, all peers are now properly informed. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_xevent.c | 41 ++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index dc959c0..9eca691 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -158,6 +158,7 @@ static uint32_t xevent_thread (struct xeventq *xevq); static nn_mtime_t earliest_in_xeventq (struct xeventq *evq); static int msg_xevents_cmp (const void *a, const void *b); static int compare_xevent_tsched (const void *va, const void *vb); +static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp); static const ddsrt_avl_treedef_t msg_xevents_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct xevent_nt, u.msg_rexmit.msg_avlnode), offsetof (struct xevent_nt, u.msg_rexmit.msg), msg_xevents_cmp, 0); @@ -173,7 +174,7 @@ static int compare_xevent_tsched (const void *va, const void *vb) static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev) { #if 0 - DDS_TRACE(("ZZZ(%p,%"PA_PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); + DDS_TRACE("ZZZ(%p,%"PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); #endif assert (ev->kind == XEVK_MSG_REXMIT); assert (ev->u.msg_rexmit.queued_rexmit_bytes <= evq->queued_rexmit_bytes); @@ -315,26 +316,6 @@ static void free_xevent (struct xeventq *evq, struct xevent *ev) ddsrt_free (ev); } -static void free_xevent_nt (struct xeventq *evq, struct xevent_nt *ev) -{ - assert (!nontimed_xevent_in_queue (evq, ev)); - switch (ev->kind) - { - case XEVK_MSG: - nn_xmsg_free (ev->u.msg.msg); - break; - case XEVK_MSG_REXMIT: - assert (ddsrt_avl_lookup (&msg_xevents_treedef, &evq->msg_xevents, ev->u.msg_rexmit.msg) == NULL); - update_rexmit_counts (evq, ev); - nn_xmsg_free (ev->u.msg_rexmit.msg); - break; - case XEVK_ENTITYID: - nn_xmsg_free (ev->u.entityid.msg); - break; - } - ddsrt_free (ev); -} - void delete_xevent (struct xevent *ev) { struct xeventq *evq = ev->evq; @@ -551,8 +532,22 @@ void xeventq_free (struct xeventq *evq) } } } - while (!non_timed_xmit_list_is_empty(evq)) - free_xevent_nt (evq, getnext_from_non_timed_xmit_list (evq)); + + { + struct nn_xpack *xp = nn_xpack_new (evq->tev_conn, evq->auxiliary_bandwidth_limit, false); + thread_state_awake (lookup_thread_state ()); + ddsrt_mutex_lock (&evq->lock); + while (!non_timed_xmit_list_is_empty (evq)) + { + thread_state_awake_to_awake_no_nest (lookup_thread_state ()); + handle_nontimed_xevent (getnext_from_non_timed_xmit_list (evq), xp); + } + ddsrt_mutex_unlock (&evq->lock); + thread_state_asleep (lookup_thread_state ()); + nn_xpack_send (xp, false); + nn_xpack_free (xp); + } + assert (ddsrt_avl_is_empty (&evq->msg_xevents)); ddsrt_cond_destroy (&evq->cond); ddsrt_mutex_destroy (&evq->lock);