diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index dc959c0..9eca691 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -158,6 +158,7 @@ static uint32_t xevent_thread (struct xeventq *xevq); static nn_mtime_t earliest_in_xeventq (struct xeventq *evq); static int msg_xevents_cmp (const void *a, const void *b); static int compare_xevent_tsched (const void *va, const void *vb); +static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp); static const ddsrt_avl_treedef_t msg_xevents_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct xevent_nt, u.msg_rexmit.msg_avlnode), offsetof (struct xevent_nt, u.msg_rexmit.msg), msg_xevents_cmp, 0); @@ -173,7 +174,7 @@ static int compare_xevent_tsched (const void *va, const void *vb) static void update_rexmit_counts (struct xeventq *evq, struct xevent_nt *ev) { #if 0 - DDS_TRACE(("ZZZ(%p,%"PA_PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); + DDS_TRACE("ZZZ(%p,%"PRIuSIZE")", (void *) ev, ev->u.msg_rexmit.queued_rexmit_bytes); #endif assert (ev->kind == XEVK_MSG_REXMIT); assert (ev->u.msg_rexmit.queued_rexmit_bytes <= evq->queued_rexmit_bytes); @@ -315,26 +316,6 @@ static void free_xevent (struct xeventq *evq, struct xevent *ev) ddsrt_free (ev); } -static void free_xevent_nt (struct xeventq *evq, struct xevent_nt *ev) -{ - assert (!nontimed_xevent_in_queue (evq, ev)); - switch (ev->kind) - { - case XEVK_MSG: - nn_xmsg_free (ev->u.msg.msg); - break; - case XEVK_MSG_REXMIT: - assert (ddsrt_avl_lookup (&msg_xevents_treedef, &evq->msg_xevents, ev->u.msg_rexmit.msg) == NULL); - update_rexmit_counts (evq, ev); - nn_xmsg_free (ev->u.msg_rexmit.msg); - break; - case XEVK_ENTITYID: - nn_xmsg_free (ev->u.entityid.msg); - break; - } - ddsrt_free (ev); -} - void delete_xevent (struct xevent *ev) { struct xeventq *evq = ev->evq; @@ -551,8 +532,22 @@ void xeventq_free (struct xeventq *evq) } } } - while (!non_timed_xmit_list_is_empty(evq)) - free_xevent_nt (evq, getnext_from_non_timed_xmit_list (evq)); + + { + struct nn_xpack *xp = nn_xpack_new (evq->tev_conn, evq->auxiliary_bandwidth_limit, false); + thread_state_awake (lookup_thread_state ()); + ddsrt_mutex_lock (&evq->lock); + while (!non_timed_xmit_list_is_empty (evq)) + { + thread_state_awake_to_awake_no_nest (lookup_thread_state ()); + handle_nontimed_xevent (getnext_from_non_timed_xmit_list (evq), xp); + } + ddsrt_mutex_unlock (&evq->lock); + thread_state_asleep (lookup_thread_state ()); + nn_xpack_send (xp, false); + nn_xpack_free (xp); + } + assert (ddsrt_avl_is_empty (&evq->msg_xevents)); ddsrt_cond_destroy (&evq->cond); ddsrt_mutex_destroy (&evq->lock);