From ec0062542c92d28956bfae1fc4336178a8a6577b Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sat, 13 Apr 2019 17:08:07 +0200 Subject: [PATCH] defer triggering dqueue thread until end-of-packet There appears to be a minor performance benefit to not waking up the delivery thread (if used) immediately upon enqueueing the first sample, but rather to wait (typically) until the end of the packet. In a latency measurement it probably makes little difference: one shouldn't use asynchronous delivery if one needs the lowest possible latency, and the end of the packet is reached rather quickly normally. Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/q_radmin.h | 2 ++ src/core/ddsi/src/q_radmin.c | 22 +++++++++++++ src/core/ddsi/src/q_receive.c | 38 +++++++++++++++++------ 3 files changed, 52 insertions(+), 10 deletions(-) diff --git a/src/core/ddsi/include/dds/ddsi/q_radmin.h b/src/core/ddsi/include/dds/ddsi/q_radmin.h index 829cdcd..e61c029 100644 --- a/src/core/ddsi/include/dds/ddsi/q_radmin.h +++ b/src/core/ddsi/include/dds/ddsi/q_radmin.h @@ -228,6 +228,8 @@ seqno_t nn_reorder_next_seq (const struct nn_reorder *reorder); struct nn_dqueue *nn_dqueue_new (const char *name, uint32_t max_samples, nn_dqueue_handler_t handler, void *arg); void nn_dqueue_free (struct nn_dqueue *q); +bool nn_dqueue_enqueue_deferred_wakeup (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres); +void dd_dqueue_enqueue_trigger (struct nn_dqueue *q); void nn_dqueue_enqueue (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres); void nn_dqueue_enqueue1 (struct nn_dqueue *q, const nn_guid_t *rdguid, struct nn_rsample_chain *sc, nn_reorder_result_t rres); void nn_dqueue_enqueue_callback (struct nn_dqueue *q, nn_dqueue_callback_t cb, void *arg); diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 1ca2e83..8b49c64 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -2541,6 +2541,26 @@ 
static int nn_dqueue_enqueue_locked (struct nn_dqueue *q, struct nn_rsample_chai return must_signal; } +bool nn_dqueue_enqueue_deferred_wakeup (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres) +{ + bool signal; + assert (rres > 0); + assert (sc->first); + assert (sc->last->next == NULL); + ddsrt_mutex_lock (&q->lock); + ddsrt_atomic_add32 (&q->nof_samples, (uint32_t) rres); + signal = nn_dqueue_enqueue_locked (q, sc); + ddsrt_mutex_unlock (&q->lock); + return signal; +} + +void dd_dqueue_enqueue_trigger (struct nn_dqueue *q) +{ + ddsrt_mutex_lock (&q->lock); + ddsrt_cond_broadcast (&q->cond); + ddsrt_mutex_unlock (&q->lock); +} + void nn_dqueue_enqueue (struct nn_dqueue *q, struct nn_rsample_chain *sc, nn_reorder_result_t rres) { assert (rres > 0); @@ -2622,6 +2642,8 @@ void nn_dqueue_wait_until_empty_if_full (struct nn_dqueue *q) if (count >= q->max_samples) { ddsrt_mutex_lock (&q->lock); + /* In case the wakeups were all deferred */ + ddsrt_cond_broadcast (&q->cond); while (ddsrt_atomic_ld32 (&q->nof_samples) > 0) ddsrt_cond_wait (&q->cond, &q->lock); ddsrt_mutex_unlock (&q->lock); diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index a442e86..8ea8c47 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -2151,7 +2151,7 @@ static void clean_defrag (struct proxy_writer *pwr) nn_defrag_notegap (pwr->defrag, 1, seq); } -static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata) +static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_DataFrag_common_t *msg, const struct nn_rsample_info *sampleinfo, uint32_t fragnum, struct nn_rdata *rdata, struct nn_dqueue **deferred_wakeup) { struct proxy_writer *pwr; struct nn_rsample *rsample; @@ -2251,10 +2251,23 @@ static void handle_regular 
(struct receiver_state *rst, nn_etime_t tnow, struct receive thread's data gets interleaved -- arguably delivery needn't be exactly in-order, which would allow us to do this without pwr->e.lock held. */ - if (!pwr->deliver_synchronously) - nn_dqueue_enqueue (pwr->dqueue, &sc, rres); - else + if (pwr->deliver_synchronously) + { + /* FIXME: just in case the synchronous delivery runs into a delay caused + by the current mishandling of resource limits */ + if (*deferred_wakeup) + dd_dqueue_enqueue_trigger (*deferred_wakeup); deliver_user_data_synchronously (&sc); + } + else + { + if (nn_dqueue_enqueue_deferred_wakeup (pwr->dqueue, &sc, rres)) + { + if (*deferred_wakeup && *deferred_wakeup != pwr->dqueue) + dd_dqueue_enqueue_trigger (*deferred_wakeup); + *deferred_wakeup = pwr->dqueue; + } + } if (pwr->n_readers_out_of_sync > 0) { /* Those readers catching up with TL but in sync with the proxy @@ -2384,7 +2397,7 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con } } -static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap) +static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const Data_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup) { DDS_TRACE("DATA(%x:%x:%x:%x -> %x:%x:%x:%x #%"PRId64"", PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, @@ -2421,14 +2434,14 @@ static int handle_Data (struct receiver_state *rst, nn_etime_t tnow, struct nn_r } else { - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, ~0u, rdata, deferred_wakeup); } } DDS_TRACE(")"); return 1; } -static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned 
char *datap) +static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct nn_rmsg *rmsg, const DataFrag_t *msg, size_t size, struct nn_rsample_info *sampleinfo, unsigned char *datap, struct nn_dqueue **deferred_wakeup) { DDS_TRACE("DATAFRAG(%x:%x:%x:%x -> %x:%x:%x:%x #%"PRId64"/[%u..%u]", PGUIDPREFIX (rst->src_guid_prefix), msg->x.writerId.u, @@ -2494,7 +2507,7 @@ static int handle_DataFrag (struct receiver_state *rst, nn_etime_t tnow, struct wrong, it'll simply generate a request for retransmitting a non-existent fragment. The other side SHOULD be capable of dealing with that. */ - handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata); + handle_regular (rst, tnow, rmsg, &msg->x, sampleinfo, msg->fragmentStartingNum + msg->fragmentsInSubmessage - 2, rdata, deferred_wakeup); } DDS_TRACE(")"); return 1; @@ -2681,6 +2694,7 @@ static int handle_submsg_sequence nn_ddsi_time_t timestamp; size_t submsg_size = 0; unsigned char * end = msg + len; + struct nn_dqueue *deferred_wakeup = NULL; /* Receiver state is dynamically allocated with lifetime bound to the message. 
Updates cause a new copy to be created if the @@ -2845,7 +2859,7 @@ static int handle_submsg_sequence goto malformed; sampleinfo.timestamp = timestamp; sampleinfo.reception_timestamp = tnowWC; - handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap); + handle_DataFrag (rst, tnowE, rmsg, &sm->datafrag, submsg_size, &sampleinfo, datap, &deferred_wakeup); rst_live = 1; ts_for_latmeas = 0; } @@ -2860,7 +2874,7 @@ static int handle_submsg_sequence goto malformed; sampleinfo.timestamp = timestamp; sampleinfo.reception_timestamp = tnowWC; - handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap); + handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap, &deferred_wakeup); rst_live = 1; ts_for_latmeas = 0; } @@ -2958,6 +2972,8 @@ static int handle_submsg_sequence } thread_state_asleep (ts1); assert (thread_is_asleep ()); + if (deferred_wakeup) + dd_dqueue_enqueue_trigger (deferred_wakeup); return 0; malformed: @@ -2966,6 +2982,8 @@ malformed: malformed_asleep: assert (thread_is_asleep ()); malformed_packet_received (msg, submsg, len, state, state_smkind, hdr->vendorid); + if (deferred_wakeup) + dd_dqueue_enqueue_trigger (deferred_wakeup); return -1; }