diff --git a/src/core/ddsi/include/dds/ddsi/q_time.h b/src/core/ddsi/include/dds/ddsi/q_time.h index 7bdd5b0..3b22bee 100644 --- a/src/core/ddsi/include/dds/ddsi/q_time.h +++ b/src/core/ddsi/include/dds/ddsi/q_time.h @@ -46,6 +46,9 @@ typedef struct { int64_t v; } nn_etime_t; +#define NN_MTIME_NEVER ((nn_mtime_t) { T_NEVER }) +#define NN_WCTIME_NEVER ((nn_wctime_t) { T_NEVER }) +#define NN_ETIME_NEVER ((nn_etime_t) { T_NEVER }) #define NN_WCTIME_INVALID ((nn_wctime_t) { INT64_MIN }) int valid_ddsi_timestamp (ddsi_time_t t); diff --git a/src/core/ddsi/include/dds/ddsi/q_xevent.h b/src/core/ddsi/include/dds/ddsi/q_xevent.h index ab25b3e..6f43bac 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xevent.h +++ b/src/core/ddsi/include/dds/ddsi/q_xevent.h @@ -12,6 +12,9 @@ #ifndef NN_XEVENT_H #define NN_XEVENT_H +#include "dds/ddsrt/retcode.h" +#include "dds/ddsi/ddsi_guid.h" + #if defined (__cplusplus) extern "C" { #endif @@ -30,6 +33,7 @@ struct xevent; struct xeventq; struct proxy_writer; struct proxy_reader; +struct nn_xmsg; struct xeventq *xeventq_new ( @@ -55,6 +59,7 @@ DDS_EXPORT int qxev_msg_rexmit_wrlock_held (struct xeventq *evq, struct nn_xmsg /* All of the following lock EVQ for the duration of the operation */ DDS_EXPORT void delete_xevent (struct xevent *ev); +DDS_EXPORT void delete_xevent_callback (struct xevent *ev); DDS_EXPORT int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched); DDS_EXPORT struct xevent *qxev_heartbeat (struct xeventq *evq, nn_mtime_t tsched, const ddsi_guid_t *wr_guid); diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index 6b3603d..e135e74 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -1245,11 +1245,10 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn GVLOGDISC (" known%s", vendor_is_cloud (vendorid) ? "-DS" : ""); if (vendor_is_cloud (vendorid) && pp->implicitly_created && memcmp(&pp->privileged_pp_guid.prefix, src_guid_prefix, sizeof(pp->privileged_pp_guid.prefix)) != 0) { - nn_etime_t never = { T_NEVER }; GVLOGDISC (" "PGUIDFMT" attach-to-DS "PGUIDFMT, PGUID(pp->e.guid), PGUIDPREFIX(*src_guid_prefix), pp->privileged_pp_guid.entityid.u); ddsrt_mutex_lock (&pp->e.lock); pp->privileged_pp_guid.prefix = *src_guid_prefix; - lease_set_expiry(pp->lease, never); + lease_set_expiry(pp->lease, NN_ETIME_NEVER); ddsrt_mutex_unlock (&pp->e.lock); } GVLOGDISC ("\n"); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index d807559..5365f4d 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -761,7 +761,7 @@ static void wait_for_receive_threads (struct q_globals *gv) } if (trigev) { - delete_xevent (trigev); + delete_xevent_callback (trigev); } } diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 0388852..69826a2 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -94,6 +94,7 @@ struct xevent struct { void (*cb) (struct xevent *ev, void *arg, nn_mtime_t tnow); void *arg; + bool executing; } callback; } u; }; @@ -232,7 +233,7 @@ static void add_to_non_timed_xmit_list (struct xeventq *evq, struct xevent_nt *e if (ev->kind == XEVK_MSG_REXMIT) remember_msg (evq, ev); - ddsrt_cond_signal (&evq->cond); + ddsrt_cond_broadcast (&evq->cond); } static struct xevent_nt *getnext_from_non_timed_xmit_list (struct xeventq *evq) @@ -316,6 +317,7 @@ void delete_xevent (struct xevent *ev) { struct xeventq *evq = ev->evq; ddsrt_mutex_lock (&evq->lock); + assert (ev->kind != XEVK_CALLBACK || ev->u.callback.executing); /* Can delete it only once, no matter how we implement it internally */ assert (ev->tsched.v != TSCHED_DELETE); assert (TSCHED_DELETE < ev->tsched.v); @@ -331,27 +333,44 @@ void delete_xevent (struct xevent *ev) } /* TSCHED_DELETE is absolute minimum time, so chances are we need to wake up the thread. The superfluous signal is harmless. */ - ddsrt_cond_signal (&evq->cond); + ddsrt_cond_broadcast (&evq->cond); ddsrt_mutex_unlock (&evq->lock); } +void delete_xevent_callback (struct xevent *ev) +{ + struct xeventq *evq = ev->evq; + assert (ev->kind == XEVK_CALLBACK); + ddsrt_mutex_lock (&evq->lock); + if (ev->tsched.v != T_NEVER) + { + assert (ev->tsched.v != TSCHED_DELETE); + ddsrt_fibheap_delete (&evq_xevents_fhdef, &evq->xevents, ev); + ev->tsched.v = TSCHED_DELETE; + } + while (ev->u.callback.executing) + ddsrt_cond_wait (&evq->cond, &evq->lock); + ddsrt_mutex_unlock (&evq->lock); + free_xevent (evq, ev); +} + int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched) { struct xeventq *evq = ev->evq; int is_resched; + if (tsched.v == T_NEVER) + return 0; ddsrt_mutex_lock (&evq->lock); - assert (tsched.v != TSCHED_DELETE); /* If you want to delete it, you to say so by calling the right function. Don't want to reschedule an event marked for deletion, but with TSCHED_DELETE = MIN_INT64, tsched >= ev->tsched is guaranteed to be false. */ - assert (tsched.v > TSCHED_DELETE); + assert (tsched.v != TSCHED_DELETE); if (tsched.v >= ev->tsched.v) is_resched = 0; else { nn_mtime_t tbefore = earliest_in_xeventq (evq); - assert (tsched.v != T_NEVER); if (ev->tsched.v != T_NEVER) { ev->tsched = tsched; @@ -364,7 +383,7 @@ int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched) } is_resched = 1; if (tsched.v < tbefore.v) - ddsrt_cond_signal (&evq->cond); + ddsrt_cond_broadcast (&evq->cond); } ddsrt_mutex_unlock (&evq->lock); return is_resched; @@ -406,13 +425,7 @@ static nn_mtime_t earliest_in_xeventq (struct xeventq *evq) { struct xevent *min; ASSERT_MUTEX_HELD (&evq->lock); - if ((min = ddsrt_fibheap_min (&evq_xevents_fhdef, &evq->xevents)) != NULL) - return min->tsched; - else - { - nn_mtime_t r = { T_NEVER }; - return r; - } + return ((min = ddsrt_fibheap_min (&evq_xevents_fhdef, &evq->xevents)) != NULL) ? min->tsched : NN_MTIME_NEVER; } static void qxev_insert (struct xevent *ev) @@ -426,7 +439,7 @@ static void qxev_insert (struct xevent *ev) nn_mtime_t tbefore = earliest_in_xeventq (evq); ddsrt_fibheap_insert (&evq_xevents_fhdef, &evq->xevents, ev); if (ev->tsched.v < tbefore.v) - ddsrt_cond_signal (&evq->cond); + ddsrt_cond_broadcast (&evq->cond); } } @@ -502,7 +515,7 @@ void xeventq_stop (struct xeventq *evq) assert (evq->ts != NULL); ddsrt_mutex_lock (&evq->lock); evq->terminate = 1; - ddsrt_cond_signal (&evq->cond); + ddsrt_cond_broadcast (&evq->cond); ddsrt_mutex_unlock (&evq->lock); join_thread (evq->ts); evq->ts = NULL; @@ -513,22 +526,7 @@ void xeventq_free (struct xeventq *evq) struct xevent *ev; assert (evq->ts == NULL); while ((ev = ddsrt_fibheap_extract_min (&evq_xevents_fhdef, &evq->xevents)) != NULL) - { - if (ev->tsched.v == TSCHED_DELETE || ev->kind != XEVK_CALLBACK) - free_xevent (evq, ev); - else - { - ev->tsched.v = T_NEVER; - ev->u.callback.cb (ev, ev->u.callback.arg, ev->tsched); - if (ev->tsched.v != TSCHED_DELETE) - { - union { void *v; void (*f) (struct xevent *ev, void *arg, nn_mtime_t tnow); } fp; - fp.f = ev->u.callback.cb; - DDS_CWARNING (&evq->gv->logconfig, "xeventq_free: callback %p did not schedule deletion as required, deleting event anyway\n", fp.v); - delete_xevent (ev); - } - } - } + free_xevent (evq, ev); { struct nn_xpack *xp = nn_xpack_new (evq->tev_conn, evq->auxiliary_bandwidth_limit, false); @@ -1137,27 +1135,46 @@ static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct static void handle_individual_xevent (struct thread_state1 * const ts1, struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow) { - switch (xev->kind) + struct xeventq *xevq = xev->evq; + /* We relinquish the lock while processing the event, but require it + held for administrative work. */ + ASSERT_MUTEX_HELD (&xevq->lock); + if (xev->kind == XEVK_CALLBACK) { - case XEVK_HEARTBEAT: - handle_xevk_heartbeat (xp, xev, tnow); - break; - case XEVK_ACKNACK: - handle_xevk_acknack (xp, xev, tnow); - break; - case XEVK_SPDP: - handle_xevk_spdp (xp, xev, tnow); - break; - case XEVK_PMD_UPDATE: - handle_xevk_pmd_update (ts1, xp, xev, tnow); - break; - case XEVK_DELETE_WRITER: - handle_xevk_delete_writer (xp, xev, tnow); - break; - case XEVK_CALLBACK: - xev->u.callback.cb (xev, xev->u.callback.arg, tnow); - break; + xev->u.callback.executing = true; + ddsrt_mutex_unlock (&xevq->lock); + xev->u.callback.cb (xev, xev->u.callback.arg, tnow); + ddsrt_mutex_lock (&xevq->lock); + xev->u.callback.executing = false; + ddsrt_cond_broadcast (&xevq->cond); } + else + { + ddsrt_mutex_unlock (&xevq->lock); + switch (xev->kind) + { + case XEVK_HEARTBEAT: + handle_xevk_heartbeat (xp, xev, tnow); + break; + case XEVK_ACKNACK: + handle_xevk_acknack (xp, xev, tnow); + break; + case XEVK_SPDP: + handle_xevk_spdp (xp, xev, tnow); + break; + case XEVK_PMD_UPDATE: + handle_xevk_pmd_update (ts1, xp, xev, tnow); + break; + case XEVK_DELETE_WRITER: + handle_xevk_delete_writer (xp, xev, tnow); + break; + case XEVK_CALLBACK: + assert (0); + break; + } + ddsrt_mutex_lock (&xevq->lock); + } + ASSERT_MUTEX_HELD (&xevq->lock); } static void handle_individual_xevent_nt (struct xevent_nt *xev, struct nn_xpack *xp) @@ -1181,20 +1198,8 @@ static void handle_timed_xevent (struct thread_state1 * const ts1, struct xevent { /* This function handles the individual xevent irrespective of whether it is a "timed" or "non-timed" xevent */ - struct xeventq *xevq = xev->evq; - - /* We relinquish the lock while processing the event, but require it - held for administrative work. */ - ASSERT_MUTEX_HELD (&xevq->lock); - - assert (xev->evq == xevq); assert (xev->tsched.v != TSCHED_DELETE); - - ddsrt_mutex_unlock (&xevq->lock); handle_individual_xevent (ts1, xev, xp, tnow /* monotonic */); - ddsrt_mutex_lock (&xevq->lock); - - ASSERT_MUTEX_HELD (&xevq->lock); } static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp) @@ -1517,6 +1522,7 @@ struct xevent *qxev_callback (struct xeventq *evq, nn_mtime_t tsched, void (*cb) ev = qxev_common (evq, tsched, XEVK_CALLBACK); ev->u.callback.cb = cb; ev->u.callback.arg = arg; + ev->u.callback.executing = false; qxev_insert (ev); ddsrt_mutex_unlock (&evq->lock); return ev;