defer triggering dqueue thread until end-of-packet

There appears to be a minor performance benefit to not waking up the
delivery thread (if used) immediately upon enqueueing the first sample,
but rather waiting (typically) until the end of the packet.  For a
latency measurement it probably makes little difference: one shouldn't
use asynchronous delivery if one needs the lowest possible latency, and
the end of the packet is normally reached rather quickly anyway.

Signed-off-by: Erik Boasson <eb@ilities.com>
Author: Erik Boasson, 2019-04-13 17:08:07 +02:00 (committed by eboasson)
parent c92820677d, commit ec0062542c
3 changed files with 52 additions and 10 deletions
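
To make the idea concrete, here is a minimal sketch of the deferred-wakeup
pattern using a plain mutex/condition-variable queue.  It is illustrative
only, not the Cyclone DDS code: queue_t, q_push_deferred, q_trigger and
handle_packet are invented stand-ins for nn_dqueue,
nn_dqueue_enqueue_deferred_wakeup, dd_dqueue_enqueue_trigger and the
receive path changed below.

#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct node { struct node *next; int value; } node_t;

typedef struct queue {
  pthread_mutex_t lock;
  pthread_cond_t cond;
  node_t *head, *tail;
} queue_t;

/* Enqueue without signalling; returns true iff the queue was empty, i.e.,
   the consumer may be asleep and a wakeup is owed. */
static bool q_push_deferred (queue_t *q, node_t *n)
{
  bool was_empty;
  n->next = NULL;
  pthread_mutex_lock (&q->lock);
  was_empty = (q->head == NULL);
  if (was_empty) q->head = n; else q->tail->next = n;
  q->tail = n;
  pthread_mutex_unlock (&q->lock);
  return was_empty;
}

/* Deliver the deferred wakeup; taking the lock first means the consumer is
   either running or blocked in pthread_cond_wait, so the broadcast cannot
   fall between its empty-check and its wait. */
static void q_trigger (queue_t *q)
{
  pthread_mutex_lock (&q->lock);
  pthread_cond_broadcast (&q->cond);
  pthread_mutex_unlock (&q->lock);
}

/* Receive path: enqueue every sample of a packet, wake the consumer once
   at the end instead of once per sample. */
static void handle_packet (queue_t *q, node_t *samples[], int n)
{
  queue_t *deferred_wakeup = NULL;
  for (int i = 0; i < n; i++)
    if (q_push_deferred (q, samples[i]))
      deferred_wakeup = q;
  if (deferred_wakeup)
    q_trigger (deferred_wakeup);
}

The effect is that a packet carrying many small samples costs one
condition-variable broadcast instead of one per sample, which is presumably
where the minor benefit comes from.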

q_radmin.h

@@ -228,6 +228,8 @@ seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder);
 
 struct nn_dqueue *nn_dqueue_new (const char *name, uint32_t max_samples, nn_dqueue_handler_t handler, void *arg);
 void nn_dqueue_free (struct nn_dqueue *q);
+bool nn_dqueue_enqueue_deferred_wakeup (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres);
+void dd_dqueue_enqueue_trigger (struct nn_dqueue *q);
 void nn_dqueue_enqueue (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres);
 void nn_dqueue_enqueue1 (struct nn_dqueue *q, const nn_guid_t *rdguid, struct nn_rsample_chain *sc, nn_reorder_result_t rres);
 void nn_dqueue_enqueue_callback (struct nn_dqueue *q, nn_dqueue_callback_t cb, void *arg);

q_radmin.c

@@ -2541,6 +2541,26 @@ static int nn_dqueue_enqueue_locked (struct nn_dqueue *q, struct nn_rsample_chai
   return must_signal;
 }
 
+bool nn_dqueue_enqueue_deferred_wakeup (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres)
+{
+  bool signal;
+  assert (rres > 0);
+  assert (sc->first);
+  assert (sc->last->next == NULL);
+  ddsrt_mutex_lock (&q->lock);
+  ddsrt_atomic_add32 (&q->nof_samples, (uint32_t) rres);
+  signal = nn_dqueue_enqueue_locked (q, sc);
+  ddsrt_mutex_unlock (&q->lock);
+  return signal;
+}
+
+void dd_dqueue_enqueue_trigger (struct nn_dqueue *q)
+{
+  ddsrt_mutex_lock (&q->lock);
+  ddsrt_cond_broadcast (&q->cond);
+  ddsrt_mutex_unlock (&q->lock);
+}
+
 void nn_dqueue_enqueue (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres)
 {
   assert (rres > 0);
@@ -2622,6 +2642,8 @@ void nn_dqueue_wait_until_empty_if_full (struct nn_dqueue *q)
   if (count >= q->max_samples)
   {
     ddsrt_mutex_lock (&q->lock);
+    /* In case the wakeups were all deferred */
+    ddsrt_cond_broadcast (&q->cond);
     while (ddsrt_atomic_ld32 (&q->nof_samples) > 0)
       ddsrt_cond_wait (&q->cond, &q->lock);
     ddsrt_mutex_unlock (&q->lock);

q_receive.c

@@ -2151,7 +2151,7 @@ static void clean_defrag (struct proxy_writer *pwr)
   nn_defrag_notegap (pwr->defrag, 1, seq);
 }
 
-static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata)
+static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup)
 {
   struct proxy_writer *pwr;
   struct nn_rsample *rsample;
@@ -2251,10 +2251,23 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct
          receive thread's data gets interleaved -- arguably delivery
          needn't be exactly in-order, which would allow us to do this
          without pwr->e.lock held. */
-      if (!pwr->deliver_synchronously)
-        nn_dqueue_enqueue (pwr->dqueue, &sc, rres);
-      else
+      if (pwr->deliver_synchronously)
+      {
+        /* FIXME: just in case the synchronous delivery runs into a delay caused
+           by the current mishandling of resource limits */
+        if (*deferred_wakeup)
+          dd_dqueue_enqueue_trigger (*deferred_wakeup);
         deliver_user_data_synchronously (&sc);
+      }
+      else
+      {
+        if (nn_dqueue_enqueue_deferred_wakeup (pwr->dqueue, &sc, rres))
+        {
+          if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue)
+            dd_dqueue_enqueue_trigger (*deferred_wakeup);
+          *deferred_wakeup = pwr->dqueue;
+        }
+      }
       if (pwr->n_readers_out_of_sync > 0)
       {
         /* Those readers catching up with TL but in sync with the proxy
@@ -2384,7 +2397,7 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con
   }
 }
 
-static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap)
+static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup)
 {
   DDS_TRACE("DATA(%x:%x:%x:%x -> %x:%x:%x:%x #%"PRId64"",
             PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u,
@@ -2421,14 +2434,14 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r
     }
     else
     {
-      handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata);
+      handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup);
     }
   }
   DDS_TRACE(")");
   return 1;
 }
 
-static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap)
+static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup)
 {
   DDS_TRACE("DATAFRAG(%x:%x:%x:%x -> %x:%x:%x:%x #%"PRId64"/[%u..%u]",
             PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u,
@@ -2494,7 +2507,7 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct
        wrong, it'll simply generate a request for retransmitting a
        non-existent fragment. The other side SHOULD be capable of
        dealing with that. */
-    handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata);
+    handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup);
   }
   DDS_TRACE(")");
   return 1;
@@ -2681,6 +2694,7 @@ static int handle_submsg_sequence
   nn_ddsi_time_t timestamp;
   size_t submsg_size = 0;
   unsigned char * end = msg + len;
+  struct nn_dqueue *deferred_wakeup = NULL;
 
   /* Receiver state is dynamically allocated with lifetime bound to
      the message. Updates cause a new copy to be created if the
@@ -2845,7 +2859,7 @@ static int handle_submsg_sequence
            goto malformed;
          sampleinfo.timestamp = timestamp;
          sampleinfo.reception_timestamp = tnowWC;
-         handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap);
+         handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap, &deferred_wakeup);
          rst_live = 1;
          ts_for_latmeas = 0;
        }
@@ -2860,7 +2874,7 @@ static int handle_submsg_sequence
            goto malformed;
          sampleinfo.timestamp = timestamp;
          sampleinfo.reception_timestamp = tnowWC;
-         handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap);
+         handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap, &deferred_wakeup);
          rst_live = 1;
          ts_for_latmeas = 0;
        }
@@ -2958,6 +2972,8 @@ static int handle_submsg_sequence
   }
   thread_state_asleep (ts1);
   assert (thread_is_asleep ());
+  if (deferred_wakeup)
+    dd_dqueue_enqueue_trigger (deferred_wakeup);
   return 0;
 
 malformed:
@@ -2966,6 +2982,8 @@ malformed:
 malformed_asleep:
   assert (thread_is_asleep ());
   malformed_packet_received (msg, submsg, len, state, state_smkind, hdr->vendorid);
+  if (deferred_wakeup)
+    dd_dqueue_enqueue_trigger (deferred_wakeup);
   return -1;
 }
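
One subtlety in the receive-path changes above: the samples in one packet
may be destined for different delivery queues, yet only a single wakeup is
kept pending at a time.  A sketch of that policy, reusing the invented
queue_t, q_push_deferred and q_trigger from the earlier example (again an
illustration, not the actual API):

/* Single-slot deferral, mirroring the deferred_wakeup pointer threaded
   through handle_submsg_sequence: switching to a different queue flushes
   the wakeup owed to the previous one, and the caller still triggers
   *deferred once at end-of-packet. */
static void enqueue_deferring (queue_t **deferred, queue_t *q, node_t *n)
{
  if (q_push_deferred (q, n))
  {
    if (*deferred && *deferred != q)
      q_trigger (*deferred);
    *deferred = q;
  }
}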