Update leases with atomic ops instead of hashing leases to lease_locks

Lease_renew is the key one, and that one only ever shifts the lease
expiry to the future, providing the lease hasn't expired already.  All
other operations work within leaseheap_lock.

Other updates to lease end time are in set_expiry (which is used in some
special cases).  So ... the number of ways this can go wrong is rather
limited.
This commit is contained in:
Erik Boasson 2019-06-12 13:46:06 +02:00 committed by eboasson
parent 559c325307
commit 9c2f3bdf2b
2 changed files with 31 additions and 67 deletions

View file

@ -64,9 +64,6 @@ enum recvips_mode {
RECVIPS_MODE_SOME /* explicit list of interfaces; only one requiring recvips */ RECVIPS_MODE_SOME /* explicit list of interfaces; only one requiring recvips */
}; };
#define N_LEASE_LOCKS_LG2 4
#define N_LEASE_LOCKS ((int) (1 << N_LEASE_LOCKS_LG2))
enum recv_thread_mode { enum recv_thread_mode {
RTM_SINGLE, RTM_SINGLE,
RTM_MANY RTM_MANY
@ -109,7 +106,6 @@ struct q_globals {
/* Lease junk */ /* Lease junk */
ddsrt_mutex_t leaseheap_lock; ddsrt_mutex_t leaseheap_lock;
ddsrt_mutex_t lease_locks[N_LEASE_LOCKS];
ddsrt_fibheap_t leaseheap; ddsrt_fibheap_t leaseheap;
/* Transport factory */ /* Transport factory */

View file

@ -45,9 +45,9 @@
struct lease { struct lease {
ddsrt_fibheap_node_t heapnode; ddsrt_fibheap_node_t heapnode;
nn_etime_t tsched; /* access guarded by leaseheap_lock */ nn_etime_t tsched; /* access guarded by leaseheap_lock */
nn_etime_t tend; /* access guarded by lock_lease/unlock_lease */ ddsrt_atomic_uint64_t tend; /* really an nn_etime_t */
dds_duration_t tdur; /* constant (renew depends on it) */ dds_duration_t tdur; /* constant (renew depends on it) */
struct entity_common *entity; /* constant */ struct entity_common *entity; /* constant */
}; };
@ -69,40 +69,16 @@ static int compare_lease_tsched (const void *va, const void *vb)
void lease_management_init (void) void lease_management_init (void)
{ {
int i;
ddsrt_mutex_init (&gv.leaseheap_lock); ddsrt_mutex_init (&gv.leaseheap_lock);
for (i = 0; i < N_LEASE_LOCKS; i++)
ddsrt_mutex_init (&gv.lease_locks[i]);
ddsrt_fibheap_init (&lease_fhdef, &gv.leaseheap); ddsrt_fibheap_init (&lease_fhdef, &gv.leaseheap);
} }
void lease_management_term (void) void lease_management_term (void)
{ {
int i;
assert (ddsrt_fibheap_min (&lease_fhdef, &gv.leaseheap) == NULL); assert (ddsrt_fibheap_min (&lease_fhdef, &gv.leaseheap) == NULL);
for (i = 0; i < N_LEASE_LOCKS; i++)
ddsrt_mutex_destroy (&gv.lease_locks[i]);
ddsrt_mutex_destroy (&gv.leaseheap_lock); ddsrt_mutex_destroy (&gv.leaseheap_lock);
} }
static ddsrt_mutex_t *lock_lease_addr (struct lease const * const l)
{
uint32_t u = (uint16_t) ((uintptr_t) l >> 3);
uint32_t v = u * 0xb4817365;
uint32_t idx = v >> (32 - N_LEASE_LOCKS_LG2);
return &gv.lease_locks[idx];
}
static void lock_lease (const struct lease *l)
{
ddsrt_mutex_lock (lock_lease_addr (l));
}
static void unlock_lease (const struct lease *l)
{
ddsrt_mutex_unlock (lock_lease_addr (l));
}
struct lease *lease_new (nn_etime_t texpire, dds_duration_t tdur, struct entity_common *e) struct lease *lease_new (nn_etime_t texpire, dds_duration_t tdur, struct entity_common *e)
{ {
struct lease *l; struct lease *l;
@ -110,7 +86,7 @@ struct lease *lease_new (nn_etime_t texpire, dds_duration_t tdur, struct entity_
return NULL; return NULL;
DDS_TRACE("lease_new(tdur %"PRId64" guid "PGUIDFMT") @ %p\n", tdur, PGUID (e->guid), (void *) l); DDS_TRACE("lease_new(tdur %"PRId64" guid "PGUIDFMT") @ %p\n", tdur, PGUID (e->guid), (void *) l);
l->tdur = tdur; l->tdur = tdur;
l->tend = texpire; ddsrt_atomic_st64 (&l->tend, (uint64_t) texpire.v);
l->tsched.v = TSCHED_NOT_ON_HEAP; l->tsched.v = TSCHED_NOT_ON_HEAP;
l->entity = e; l->entity = e;
return l; return l;
@ -120,14 +96,13 @@ void lease_register (struct lease *l)
{ {
DDS_TRACE("lease_register(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid)); DDS_TRACE("lease_register(l %p guid "PGUIDFMT")\n", (void *) l, PGUID (l->entity->guid));
ddsrt_mutex_lock (&gv.leaseheap_lock); ddsrt_mutex_lock (&gv.leaseheap_lock);
lock_lease (l);
assert (l->tsched.v == TSCHED_NOT_ON_HEAP); assert (l->tsched.v == TSCHED_NOT_ON_HEAP);
if (l->tend.v != T_NEVER) int64_t tend = (int64_t) ddsrt_atomic_ld64 (&l->tend);
if (tend != T_NEVER)
{ {
l->tsched = l->tend; l->tsched.v = tend;
ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l); ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l);
} }
unlock_lease (l);
ddsrt_mutex_unlock (&gv.leaseheap_lock); ddsrt_mutex_unlock (&gv.leaseheap_lock);
/* check_and_handle_lease_expiration runs on GC thread and the only way to be sure that it wakes up in time is by forcing re-evaluation (strictly speaking only needed if this is the first lease to expire, but this operation is quite rare anyway) */ /* check_and_handle_lease_expiration runs on GC thread and the only way to be sure that it wakes up in time is by forcing re-evaluation (strictly speaking only needed if this is the first lease to expire, but this operation is quite rare anyway) */
@ -150,19 +125,16 @@ void lease_free (struct lease *l)
void lease_renew (struct lease *l, nn_etime_t tnowE) void lease_renew (struct lease *l, nn_etime_t tnowE)
{ {
nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur); nn_etime_t tend_new = add_duration_to_etime (tnowE, l->tdur);
int did_update;
lock_lease (l);
/* do not touch tend if moving forward or if already expired */
if (tend_new.v <= l->tend.v || tnowE.v >= l->tend.v)
did_update = 0;
else
{
l->tend = tend_new;
did_update = 1;
}
unlock_lease (l);
if (did_update && (dds_get_log_mask() & DDS_LC_TRACE)) /* do not touch tend if moving forward or if already expired */
int64_t tend;
do {
tend = (int64_t) ddsrt_atomic_ld64 (&l->tend);
if (tend_new.v <= tend || tnowE.v >= tend)
return;
} while (!ddsrt_atomic_cas64 (&l->tend, (uint64_t) tend, (uint64_t) tend_new.v));
if ((dds_get_log_mask() & DDS_LC_TRACE))
{ {
int32_t tsec, tusec; int32_t tsec, tusec;
DDS_TRACE(" L("); DDS_TRACE(" L(");
@ -180,24 +152,24 @@ void lease_set_expiry (struct lease *l, nn_etime_t when)
bool trigger = false; bool trigger = false;
assert (when.v >= 0); assert (when.v >= 0);
ddsrt_mutex_lock (&gv.leaseheap_lock); ddsrt_mutex_lock (&gv.leaseheap_lock);
lock_lease (l); /* only possible concurrent action is to move tend into the future (renew_lease),
l->tend = when; all other operations occur with leaseheap_lock held */
if (l->tend.v < l->tsched.v) ddsrt_atomic_st64 (&l->tend, (uint64_t) when.v);
if (when.v < l->tsched.v)
{ {
/* moved forward and currently scheduled (by virtue of /* moved forward and currently scheduled (by virtue of
TSCHED_NOT_ON_HEAP == INT64_MIN) */ TSCHED_NOT_ON_HEAP == INT64_MIN) */
l->tsched = l->tend; l->tsched = when;
ddsrt_fibheap_decrease_key (&lease_fhdef, &gv.leaseheap, l); ddsrt_fibheap_decrease_key (&lease_fhdef, &gv.leaseheap, l);
trigger = true; trigger = true;
} }
else if (l->tsched.v == TSCHED_NOT_ON_HEAP && l->tend.v < T_NEVER) else if (l->tsched.v == TSCHED_NOT_ON_HEAP && when.v < T_NEVER)
{ {
/* not currently scheduled, with a finite new expiry time */ /* not currently scheduled, with a finite new expiry time */
l->tsched = l->tend; l->tsched = when;
ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l); ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l);
trigger = true; trigger = true;
} }
unlock_lease (l);
ddsrt_mutex_unlock (&gv.leaseheap_lock); ddsrt_mutex_unlock (&gv.leaseheap_lock);
/* see lease_register() */ /* see lease_register() */
@ -217,23 +189,22 @@ int64_t check_and_handle_lease_expiration (nn_etime_t tnowE)
assert (l->tsched.v != TSCHED_NOT_ON_HEAP); assert (l->tsched.v != TSCHED_NOT_ON_HEAP);
ddsrt_fibheap_extract_min (&lease_fhdef, &gv.leaseheap); ddsrt_fibheap_extract_min (&lease_fhdef, &gv.leaseheap);
/* only possible concurrent action is to move tend into the future (renew_lease),
lock_lease (l); all other operations occur with leaseheap_lock held */
if (tnowE.v < l->tend.v) int64_t tend = (int64_t) ddsrt_atomic_ld64 (&l->tend);
if (tnowE.v < tend)
{ {
if (l->tend.v == T_NEVER) { if (tend == T_NEVER) {
/* don't reinsert if it won't expire */ /* don't reinsert if it won't expire */
l->tsched.v = TSCHED_NOT_ON_HEAP; l->tsched.v = TSCHED_NOT_ON_HEAP;
unlock_lease (l);
} else { } else {
l->tsched = l->tend; l->tsched.v = tend;
unlock_lease (l);
ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l); ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l);
} }
continue; continue;
} }
DDS_LOG(DDS_LC_DISCOVERY, "lease expired: l %p guid "PGUIDFMT" tend %"PRId64" < now %"PRId64"\n", (void *) l, PGUID (g), l->tend.v, tnowE.v); DDS_LOG(DDS_LC_DISCOVERY, "lease expired: l %p guid "PGUIDFMT" tend %"PRId64" < now %"PRId64"\n", (void *) l, PGUID (g), tend, tnowE.v);
/* If the proxy participant is relying on another participant for /* If the proxy participant is relying on another participant for
writing its discovery data (on the privileged participant, writing its discovery data (on the privileged participant,
@ -268,15 +239,12 @@ int64_t check_and_handle_lease_expiration (nn_etime_t tnowE)
{ {
DDS_LOG(DDS_LC_DISCOVERY, "but postponing because privileged pp "PGUIDFMT" is still live\n", DDS_LOG(DDS_LC_DISCOVERY, "but postponing because privileged pp "PGUIDFMT" is still live\n",
PGUID (proxypp->privileged_pp_guid)); PGUID (proxypp->privileged_pp_guid));
l->tsched = l->tend = add_duration_to_etime (tnowE, 200 * T_MILLISECOND); l->tsched = add_duration_to_etime (tnowE, 200 * T_MILLISECOND);
unlock_lease (l);
ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l); ddsrt_fibheap_insert (&lease_fhdef, &gv.leaseheap, l);
continue; continue;
} }
} }
unlock_lease (l);
l->tsched.v = TSCHED_NOT_ON_HEAP; l->tsched.v = TSCHED_NOT_ON_HEAP;
ddsrt_mutex_unlock (&gv.leaseheap_lock); ddsrt_mutex_unlock (&gv.leaseheap_lock);