From 9743bda57b57e9efb2f3d7660fb3fa36a2f390ab Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 15 Mar 2019 14:57:04 +0100 Subject: [PATCH 1/3] Count concurrent calls blocking on a full WHC The writer tracks whether it is throttled because of a full WHC, but does so by treating it as a simple flag. This is fine if there is at most one thread blocked on any single writer at any time, but if there are multiple threads using the same writer it would be possible for one thread to be woken up, clear the flag, and so affect the wakeup of other threads. Turning it from a flag to a counter avoids that problem. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_transmit.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 4d81d84..c4b7911 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -960,7 +960,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) } DDS_LOG(DDS_LC_THROTTLE, "writer %x:%x:%x:%x waiting for whc to shrink below low-water mark (whc %"PRIuSIZE" low=%u high=%u)\n", PGUID (wr->e.guid), whcst.unacked_bytes, wr->whc_low, wr->whc_high); - wr->throttling = 1; + wr->throttling++; wr->throttle_count++; /* Force any outstanding packet out: there will be a heartbeat @@ -1000,7 +1000,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) } } - wr->throttling = 0; + wr->throttling--; if (wr->state != WRST_OPERATIONAL) { /* gc_delete_writer may be waiting */ From 2e9685221a5ada01f2c74951779b55b6428ac748 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 15 Mar 2019 15:04:37 +0100 Subject: [PATCH 2/3] Recheck WHC for unacked data just before blocking A writer blocking on a full WHC will still send out whatever it has buffered but not sent yet. 
For this, the writer lock must be released, but that means an ACK can sneak in between sending out the packet and relocking the writer (not likely if there's a real network in between, but over a loopback interface it is definitely possible). Therefore, the amount of unacknowledged data that controls the blocking and triggering of it must be refreshed before deciding to block; otherwise the writer may hang indefinitely. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_transmit.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index c4b7911..1cbad3c 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -950,7 +950,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) nn_mtime_t tnow = now_mt (); const nn_mtime_t abstimeout = add_duration_to_mtime (tnow, nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time)); struct whc_state whcst; - whc_get_state(wr->whc, &whcst); + whc_get_state (wr->whc, &whcst); { ASSERT_MUTEX_HELD (&wr->e.lock); @@ -976,6 +976,7 @@ static os_result throttle_writer (struct nn_xpack *xp, struct writer *wr) } nn_xpack_send (xp, true); os_mutexLock (&wr->e.lock); + whc_get_state (wr->whc, &whcst); } while (gv.rtps_keepgoing && !writer_may_continue (wr, &whcst)) From aa6a6442c22e4e9a022ff87594fedaa80f52a30c Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sat, 16 Mar 2019 20:47:59 +0100 Subject: [PATCH 3/3] Fix conversion of {sec,nsec} to msec in timedwait on Windows Internally time stamps and durations are all in nanoseconds, but the platform abstraction uses {sec,nsec} (essentially a struct timespec) and Windows uses milliseconds. The conversion to milliseconds with upwards rounding was broken, adding ~1s to each timeout. 
In most of the handful of uses the effect is minor in practice, but it does matter a lot in the scheduling of Heartbeat and AckNack messages, e.g., by causing a simple throughput test to exhibit periodic drops in throughput. Signed-off-by: Erik Boasson --- src/os/src/windows/os_platform_sync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/os/src/windows/os_platform_sync.c b/src/os/src/windows/os_platform_sync.c index 5084b13..1c36511 100644 --- a/src/os/src/windows/os_platform_sync.c +++ b/src/os/src/windows/os_platform_sync.c @@ -97,7 +97,7 @@ os_result os_condTimedWait(os_cond *cond, os_mutex *mutex, const os_time *time) assert(cond != NULL); assert(mutex != NULL); - timems = time->tv_sec * 1000 + (time->tv_nsec + 999999999) / 1000000; + timems = time->tv_sec * 1000 + (time->tv_nsec + 999999) / 1000000; if (SleepConditionVariableSRW(&cond->cond, &mutex->lock, timems, 0)) { return os_resultSuccess; } else if (GetLastError() != ERROR_TIMEOUT) {