diff --git a/src/core/ddsi/include/ddsi/q_lease.h b/src/core/ddsi/include/ddsi/q_lease.h index bbd519f..1bb463c 100644 --- a/src/core/ddsi/include/ddsi/q_lease.h +++ b/src/core/ddsi/include/ddsi/q_lease.h @@ -31,7 +31,7 @@ void lease_register (struct lease *l); void lease_free (struct lease *l); void lease_renew (struct lease *l, nn_etime_t tnow); void lease_set_expiry (struct lease *l, nn_etime_t when); -void check_and_handle_lease_expiration (struct thread_state1 *self, nn_etime_t tnow); +int64_t check_and_handle_lease_expiration (struct thread_state1 *self, nn_etime_t tnow); void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, unsigned len); diff --git a/src/core/ddsi/src/q_gc.c b/src/core/ddsi/src/q_gc.c index f29bc98..1e52830 100644 --- a/src/core/ddsi/src/q_gc.c +++ b/src/core/ddsi/src/q_gc.c @@ -80,22 +80,33 @@ static uint32_t gcreq_queue_thread (struct gcreq_queue *q) { struct thread_state1 *self = lookup_thread_state (); nn_mtime_t next_thread_cputime = { 0 }; - struct os_time to = { 0, 100 * T_MILLISECOND }; struct os_time shortsleep = { 0, 1 * T_MILLISECOND }; + int64_t delay = T_MILLISECOND; /* force evaluation after startup */ struct gcreq *gcreq = NULL; int trace_shortsleep = 1; os_mutexLock (&q->lock); while (!(q->terminate && q->count == 0)) { LOG_THREAD_CPUTIME (next_thread_cputime); + /* If we are waiting for a gcreq to become ready, don't bother looking at the queue; if we aren't, wait for a request to come in. We can't really wait until something came in because we're also checking lease expirations. */ if (gcreq == NULL) { + assert (trace_shortsleep); if (q->first == NULL) - os_condTimedWait (&q->cond, &q->lock, &to); + { + if (delay == T_NEVER) + os_condWait (&q->cond, &q->lock); + else + { + /* FIXME: fix os_time and use absolute timeouts */ + struct os_time to = { delay / T_SECOND, delay % T_SECOND }; + os_condTimedWait (&q->cond, &q->lock, &to); + } + } if (q->first) { gcreq = q->first; @@ -112,7 +123,7 @@ static uint32_t gcreq_queue_thread (struct gcreq_queue *q) burden on the system than having a separate thread or adding it to the workload of the data handling threads. */ thread_state_awake (self); - check_and_handle_lease_expiration (self, now_et ()); + delay = check_and_handle_lease_expiration (self, now_et ()); thread_state_asleep (self); if (gcreq) diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c index bc24406..1dd7e5d 100644 --- a/src/core/ddsi/src/q_lease.c +++ b/src/core/ddsi/src/q_lease.c @@ -36,6 +36,7 @@ #include "ddsi/q_bswap.h" #include "ddsi/q_transmit.h" #include "ddsi/q_lease.h" +#include "ddsi/q_gc.h" #include "ddsi/sysdeps.h" @@ -55,6 +56,11 @@ static int compare_lease_tsched (const void *va, const void *vb); static const ut_fibheapDef_t lease_fhdef = UT_FIBHEAPDEF_INITIALIZER(offsetof (struct lease, heapnode), compare_lease_tsched); +static void force_lease_check (void) +{ + gcreq_enqueue(gcreq_new(gv.gcreq_queue, gcreq_free)); +} + static int compare_lease_tsched (const void *va, const void *vb) { const struct lease *a = va; @@ -124,6 +130,9 @@ void lease_register (struct lease *l) } unlock_lease (l); os_mutexUnlock (&gv.leaseheap_lock); + + /* check_and_handle_lease_expiration runs on GC thread and the only way to be sure that it wakes up in time is by forcing re-evaluation (strictly speaking only needed if this is the first lease to expire, but this operation is quite rare anyway) */ + force_lease_check(); } void lease_free (struct lease *l) @@ -134,6 +143,9 @@ void lease_free (struct lease *l) ut_fibheapDelete (&lease_fhdef, &gv.leaseheap, l); os_mutexUnlock (&gv.leaseheap_lock); os_free (l); + + /* see lease_register() */ + force_lease_check(); } void lease_renew (struct lease *l, nn_etime_t tnowE) @@ -166,6 +178,7 @@ void lease_renew (struct lease *l, nn_etime_t tnowE) void lease_set_expiry (struct lease *l, nn_etime_t when) { + bool trigger = false; assert (when.v >= 0); os_mutexLock (&gv.leaseheap_lock); lock_lease (l); @@ -176,21 +189,27 @@ void lease_set_expiry (struct lease *l, nn_etime_t when) TSCHED_NOT_ON_HEAP == INT64_MIN) */ l->tsched = l->tend; ut_fibheapDecreaseKey (&lease_fhdef, &gv.leaseheap, l); + trigger = true; } else if (l->tsched.v == TSCHED_NOT_ON_HEAP && l->tend.v < T_NEVER) { /* not currently scheduled, with a finite new expiry time */ l->tsched = l->tend; ut_fibheapInsert (&lease_fhdef, &gv.leaseheap, l); + trigger = true; } unlock_lease (l); os_mutexUnlock (&gv.leaseheap_lock); + + /* see lease_register() */ + if (trigger) + force_lease_check(); } -void check_and_handle_lease_expiration (UNUSED_ARG (struct thread_state1 *self), nn_etime_t tnowE) +int64_t check_and_handle_lease_expiration (UNUSED_ARG (struct thread_state1 *self), nn_etime_t tnowE) { struct lease *l; - const nn_wctime_t tnow = now(); + int64_t delay; os_mutexLock (&gv.leaseheap_lock); while ((l = ut_fibheapMin (&lease_fhdef, &gv.leaseheap)) != NULL && l->tsched.v <= tnowE.v) { @@ -268,25 +287,28 @@ void check_and_handle_lease_expiration (UNUSED_ARG (struct thread_state1 *self), delete_participant (&g); break; case EK_PROXY_PARTICIPANT: - delete_proxy_participant_by_guid (&g, tnow, 1); + delete_proxy_participant_by_guid (&g, now(), 1); break; case EK_WRITER: delete_writer_nolinger (&g); break; case EK_PROXY_WRITER: - delete_proxy_writer (&g, tnow, 1); + delete_proxy_writer (&g, now(), 1); break; case EK_READER: (void)delete_reader (&g); break; case EK_PROXY_READER: - delete_proxy_reader (&g, tnow, 1); + delete_proxy_reader (&g, now(), 1); break; } os_mutexLock (&gv.leaseheap_lock); } + + delay = (l == NULL) ? T_NEVER : (l->tsched.v - tnowE.v); os_mutexUnlock (&gv.leaseheap_lock); + return delay; } /******/