Refactored xevents callback deletion

Some changes in the xevents code for deleting callback events,
required for the lifespan QoS implementation. With these changes,
there is no longer a possibility of the callback still executing
or being invoked after delete_xevent_callback has returned.

Signed-off-by: Dennis Potman <dennis.potman@adlinktech.com>
Author: Dennis Potman, 2019-12-17 08:56:42 +01:00 (committed by eboasson)
parent bdb7f17053
commit 1ec9c3a194
5 changed files with 78 additions and 65 deletions
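
The heart of the change is visible in the xevent hunks below: the event thread sets an "executing" flag before invoking a callback with the queue lock released, and delete_xevent_callback dequeues the event and then sleeps on the queue's condition variable until that flag clears. The standalone sketch below distills that pattern for readers unfamiliar with it. It is illustrative only, not code from this commit: plain pthreads stand in for the ddsrt mutex/condvar wrappers, and the names xevent_model, run_callback, delete_callback_event and hello are hypothetical.

    /* Minimal sketch of the delete-with-wait pattern, assuming POSIX threads. */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    struct xevent_model {
      pthread_mutex_t lock;
      pthread_cond_t cond;
      bool executing;            /* true while cb runs without the lock held */
      void (*cb) (void *arg);
      void *arg;
    };

    /* Event thread: set the flag, drop the lock around the callback, then
       clear the flag and wake any thread waiting in delete_callback_event. */
    static void run_callback (struct xevent_model *ev)
    {
      pthread_mutex_lock (&ev->lock);
      ev->executing = true;
      pthread_mutex_unlock (&ev->lock);
      ev->cb (ev->arg);
      pthread_mutex_lock (&ev->lock);
      ev->executing = false;
      pthread_cond_broadcast (&ev->cond);
      pthread_mutex_unlock (&ev->lock);
    }

    /* Deleter: once this returns, no invocation is in progress; in the real
       code the event is also removed from the timer heap, so the callback
       can never start again and its resources can be freed safely. */
    static void delete_callback_event (struct xevent_model *ev)
    {
      pthread_mutex_lock (&ev->lock);
      while (ev->executing)
        pthread_cond_wait (&ev->cond, &ev->lock);
      pthread_mutex_unlock (&ev->lock);
    }

    static void hello (void *arg) { (void) arg; printf ("callback ran\n"); }

    static void *event_thread (void *varg) { run_callback (varg); return NULL; }

    int main (void)
    {
      struct xevent_model ev = {
        PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, false, hello, NULL
      };
      pthread_t tid;
      pthread_create (&tid, NULL, event_thread, &ev);
      delete_callback_event (&ev);  /* hello is either finished or never starts */
      pthread_join (&tid, NULL);
      return 0;
    }

This waiting scheme is also why the commit replaces ddsrt_cond_signal with ddsrt_cond_broadcast throughout: the queue's single condition variable now has several kinds of waiters (the event thread plus any threads blocked in delete_xevent_callback), so waking exactly one waiter could wake the wrong one.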

@@ -46,6 +46,9 @@ typedef struct {
   int64_t v;
 } nn_etime_t;
 
+#define NN_MTIME_NEVER ((nn_mtime_t) { T_NEVER })
+#define NN_WCTIME_NEVER ((nn_wctime_t) { T_NEVER })
+#define NN_ETIME_NEVER ((nn_etime_t) { T_NEVER })
 #define NN_WCTIME_INVALID ((nn_wctime_t) { INT64_MIN })
 
 int valid_ddsi_timestamp (ddsi_time_t t);

@@ -12,6 +12,9 @@
 #ifndef NN_XEVENT_H
 #define NN_XEVENT_H
 
+#include "dds/ddsrt/retcode.h"
+#include "dds/ddsi/ddsi_guid.h"
+
 #if defined (__cplusplus)
 extern "C" {
 #endif
@@ -30,6 +33,7 @@ struct xevent;
 struct xeventq;
 struct proxy_writer;
 struct proxy_reader;
+struct nn_xmsg;
 
 struct xeventq *xeventq_new
 (
@@ -55,6 +59,7 @@ DDS_EXPORT int qxev_msg_rexmit_wrlock_held (struct xeventq *evq, struct nn_xmsg
 
 /* All of the following lock EVQ for the duration of the operation */
 DDS_EXPORT void delete_xevent (struct xevent *ev);
+DDS_EXPORT void delete_xevent_callback (struct xevent *ev);
 DDS_EXPORT int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched);
 DDS_EXPORT struct xevent *qxev_heartbeat (struct xeventq *evq, nn_mtime_t tsched, const ddsi_guid_t *wr_guid);

@@ -1245,11 +1245,10 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
     GVLOGDISC (" known%s", vendor_is_cloud (vendorid) ? "-DS" : "");
     if (vendor_is_cloud (vendorid) && pp->implicitly_created && memcmp(&pp->privileged_pp_guid.prefix, src_guid_prefix, sizeof(pp->privileged_pp_guid.prefix)) != 0)
     {
-      nn_etime_t never = { T_NEVER };
       GVLOGDISC (" "PGUIDFMT" attach-to-DS "PGUIDFMT, PGUID(pp->e.guid), PGUIDPREFIX(*src_guid_prefix), pp->privileged_pp_guid.entityid.u);
       ddsrt_mutex_lock (&pp->e.lock);
       pp->privileged_pp_guid.prefix = *src_guid_prefix;
-      lease_set_expiry(pp->lease, never);
+      lease_set_expiry(pp->lease, NN_ETIME_NEVER);
       ddsrt_mutex_unlock (&pp->e.lock);
     }
     GVLOGDISC ("\n");

@@ -761,7 +761,7 @@ static void wait_for_receive_threads (struct q_globals *gv)
   }
   if (trigev)
   {
-    delete_xevent (trigev);
+    delete_xevent_callback (trigev);
   }
 }

@@ -94,6 +94,7 @@ struct xevent
     struct {
       void (*cb) (struct xevent *ev, void *arg, nn_mtime_t tnow);
       void *arg;
+      bool executing;
     } callback;
   } u;
 };
@@ -232,7 +233,7 @@ static void add_to_non_timed_xmit_list (struct xeventq *evq, struct xevent_nt *e
   if (ev->kind == XEVK_MSG_REXMIT)
     remember_msg (evq, ev);
-  ddsrt_cond_signal (&evq->cond);
+  ddsrt_cond_broadcast (&evq->cond);
 }
 
 static struct xevent_nt *getnext_from_non_timed_xmit_list (struct xeventq *evq)
@@ -316,6 +317,7 @@ void delete_xevent (struct xevent *ev)
 {
   struct xeventq *evq = ev->evq;
   ddsrt_mutex_lock (&evq->lock);
+  assert (ev->kind != XEVK_CALLBACK || ev->u.callback.executing);
   /* Can delete it only once, no matter how we implement it internally */
   assert (ev->tsched.v != TSCHED_DELETE);
   assert (TSCHED_DELETE < ev->tsched.v);
@@ -331,27 +333,44 @@ void delete_xevent (struct xevent *ev)
   }
   /* TSCHED_DELETE is absolute minimum time, so chances are we need to
      wake up the thread. The superfluous signal is harmless. */
-  ddsrt_cond_signal (&evq->cond);
+  ddsrt_cond_broadcast (&evq->cond);
   ddsrt_mutex_unlock (&evq->lock);
 }
 
+void delete_xevent_callback (struct xevent *ev)
+{
+  struct xeventq *evq = ev->evq;
+  assert (ev->kind == XEVK_CALLBACK);
+  ddsrt_mutex_lock (&evq->lock);
+  if (ev->tsched.v != T_NEVER)
+  {
+    assert (ev->tsched.v != TSCHED_DELETE);
+    ddsrt_fibheap_delete (&evq_xevents_fhdef, &evq->xevents, ev);
+    ev->tsched.v = TSCHED_DELETE;
+  }
+  while (ev->u.callback.executing)
+    ddsrt_cond_wait (&evq->cond, &evq->lock);
+  ddsrt_mutex_unlock (&evq->lock);
+  free_xevent (evq, ev);
+}
+
 int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched)
 {
   struct xeventq *evq = ev->evq;
   int is_resched;
+  if (tsched.v == T_NEVER)
+    return 0;
   ddsrt_mutex_lock (&evq->lock);
-  assert (tsched.v != TSCHED_DELETE);
   /* If you want to delete it, you to say so by calling the right
      function. Don't want to reschedule an event marked for deletion,
      but with TSCHED_DELETE = MIN_INT64, tsched >= ev->tsched is
      guaranteed to be false. */
-  assert (tsched.v > TSCHED_DELETE);
+  assert (tsched.v != TSCHED_DELETE);
   if (tsched.v >= ev->tsched.v)
     is_resched = 0;
   else
   {
     nn_mtime_t tbefore = earliest_in_xeventq (evq);
-    assert (tsched.v != T_NEVER);
     if (ev->tsched.v != T_NEVER)
     {
       ev->tsched = tsched;
@@ -364,7 +383,7 @@ int resched_xevent_if_earlier (struct xevent *ev, nn_mtime_t tsched)
     }
     is_resched = 1;
     if (tsched.v < tbefore.v)
-      ddsrt_cond_signal (&evq->cond);
+      ddsrt_cond_broadcast (&evq->cond);
   }
   ddsrt_mutex_unlock (&evq->lock);
   return is_resched;
@@ -406,13 +425,7 @@ static nn_mtime_t earliest_in_xeventq (struct xeventq *evq)
 {
   struct xevent *min;
   ASSERT_MUTEX_HELD (&evq->lock);
-  if ((min = ddsrt_fibheap_min (&evq_xevents_fhdef, &evq->xevents)) != NULL)
-    return min->tsched;
-  else
-  {
-    nn_mtime_t r = { T_NEVER };
-    return r;
-  }
+  return ((min = ddsrt_fibheap_min (&evq_xevents_fhdef, &evq->xevents)) != NULL) ? min->tsched : NN_MTIME_NEVER;
 }
 
 static void qxev_insert (struct xevent *ev)
@@ -426,7 +439,7 @@ static void qxev_insert (struct xevent *ev)
     nn_mtime_t tbefore = earliest_in_xeventq (evq);
     ddsrt_fibheap_insert (&evq_xevents_fhdef, &evq->xevents, ev);
     if (ev->tsched.v < tbefore.v)
-      ddsrt_cond_signal (&evq->cond);
+      ddsrt_cond_broadcast (&evq->cond);
   }
 }
@@ -502,7 +515,7 @@ void xeventq_stop (struct xeventq *evq)
   assert (evq->ts != NULL);
   ddsrt_mutex_lock (&evq->lock);
   evq->terminate = 1;
-  ddsrt_cond_signal (&evq->cond);
+  ddsrt_cond_broadcast (&evq->cond);
   ddsrt_mutex_unlock (&evq->lock);
   join_thread (evq->ts);
   evq->ts = NULL;
@@ -513,22 +526,7 @@ void xeventq_free (struct xeventq *evq)
   struct xevent *ev;
   assert (evq->ts == NULL);
   while ((ev = ddsrt_fibheap_extract_min (&evq_xevents_fhdef, &evq->xevents)) != NULL)
-  {
-    if (ev->tsched.v == TSCHED_DELETE || ev->kind != XEVK_CALLBACK)
-      free_xevent (evq, ev);
-    else
-    {
-      ev->tsched.v = T_NEVER;
-      ev->u.callback.cb (ev, ev->u.callback.arg, ev->tsched);
-      if (ev->tsched.v != TSCHED_DELETE)
-      {
-        union { void *v; void (*f) (struct xevent *ev, void *arg, nn_mtime_t tnow); } fp;
-        fp.f = ev->u.callback.cb;
-        DDS_CWARNING (&evq->gv->logconfig, "xeventq_free: callback %p did not schedule deletion as required, deleting event anyway\n", fp.v);
-        delete_xevent (ev);
-      }
-    }
-  }
+    free_xevent (evq, ev);
 
   {
     struct nn_xpack *xp = nn_xpack_new (evq->tev_conn, evq->auxiliary_bandwidth_limit, false);
@@ -1137,27 +1135,46 @@ static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct
 static void handle_individual_xevent (struct thread_state1 * const ts1, struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow)
 {
-  switch (xev->kind)
+  struct xeventq *xevq = xev->evq;
+
+  /* We relinquish the lock while processing the event, but require it
+     held for administrative work. */
+  ASSERT_MUTEX_HELD (&xevq->lock);
+  if (xev->kind == XEVK_CALLBACK)
   {
-    case XEVK_HEARTBEAT:
-      handle_xevk_heartbeat (xp, xev, tnow);
-      break;
-    case XEVK_ACKNACK:
-      handle_xevk_acknack (xp, xev, tnow);
-      break;
-    case XEVK_SPDP:
-      handle_xevk_spdp (xp, xev, tnow);
-      break;
-    case XEVK_PMD_UPDATE:
-      handle_xevk_pmd_update (ts1, xp, xev, tnow);
-      break;
-    case XEVK_DELETE_WRITER:
-      handle_xevk_delete_writer (xp, xev, tnow);
-      break;
-    case XEVK_CALLBACK:
-      xev->u.callback.cb (xev, xev->u.callback.arg, tnow);
-      break;
+    xev->u.callback.executing = true;
+    ddsrt_mutex_unlock (&xevq->lock);
+    xev->u.callback.cb (xev, xev->u.callback.arg, tnow);
+    ddsrt_mutex_lock (&xevq->lock);
+    xev->u.callback.executing = false;
+    ddsrt_cond_broadcast (&xevq->cond);
   }
+  else
+  {
+    ddsrt_mutex_unlock (&xevq->lock);
+    switch (xev->kind)
+    {
+      case XEVK_HEARTBEAT:
+        handle_xevk_heartbeat (xp, xev, tnow);
+        break;
+      case XEVK_ACKNACK:
+        handle_xevk_acknack (xp, xev, tnow);
+        break;
+      case XEVK_SPDP:
+        handle_xevk_spdp (xp, xev, tnow);
+        break;
+      case XEVK_PMD_UPDATE:
+        handle_xevk_pmd_update (ts1, xp, xev, tnow);
+        break;
+      case XEVK_DELETE_WRITER:
+        handle_xevk_delete_writer (xp, xev, tnow);
+        break;
+      case XEVK_CALLBACK:
+        assert (0);
+        break;
+    }
+    ddsrt_mutex_lock (&xevq->lock);
+  }
+  ASSERT_MUTEX_HELD (&xevq->lock);
 }
 
 static void handle_individual_xevent_nt (struct xevent_nt *xev, struct nn_xpack *xp)
@@ -1181,20 +1198,8 @@ static void handle_timed_xevent (struct thread_state1 * const ts1, struct xevent
 {
   /* This function handles the individual xevent irrespective of
      whether it is a "timed" or "non-timed" xevent */
-  struct xeventq *xevq = xev->evq;
-
-  /* We relinquish the lock while processing the event, but require it
-     held for administrative work. */
-  ASSERT_MUTEX_HELD (&xevq->lock);
-
-  assert (xev->evq == xevq);
   assert (xev->tsched.v != TSCHED_DELETE);
-
-  ddsrt_mutex_unlock (&xevq->lock);
   handle_individual_xevent (ts1, xev, xp, tnow /* monotonic */);
-  ddsrt_mutex_lock (&xevq->lock);
-
-  ASSERT_MUTEX_HELD (&xevq->lock);
 }
 
 static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp)
@@ -1517,6 +1522,7 @@ struct xevent *qxev_callback (struct xeventq *evq, nn_mtime_t tsched, void (*cb)
   ev = qxev_common (evq, tsched, XEVK_CALLBACK);
   ev->u.callback.cb = cb;
   ev->u.callback.arg = arg;
+  ev->u.callback.executing = false;
   qxev_insert (ev);
   ddsrt_mutex_unlock (&evq->lock);
   return ev;