From ee9a12b469ec4a463a4bb1ff39db7827417a314c Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 24 Jun 2019 12:38:50 +0200 Subject: [PATCH] Allocate xpack::iov once writer has to send data Currently each DDSC (not DDSI) writer has its own "xpack" for packing submessages into larger messages, but that is a bit wasteful, especially when a lot of samples are being generated that never need to go onto the wire. Lazily allocating them and only pushing message into them when they have a destination address saves memory and improves speed for local communications. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_transmit.c | 16 ++++++++++++++++ src/core/ddsi/src/q_xmsg.c | 7 ++++++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 5686fb6..4cfa4ea 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -1103,6 +1103,22 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack * ddsrt_free (plist); } } + else if (addrset_empty (wr->as) && (wr->as_group == NULL || addrset_empty (wr->as_group))) + { + /* No network destination, so no point in doing all the work involved + in going all the way. We do have to record that we "transmitted" + this sample, or it might not be retransmitted on request. + + (Note that no network destination is very nearly the same as no + matching proxy readers. The exception is the SPDP writer.) */ + UPDATE_SEQ_XMIT_LOCKED (wr, seq); + ddsrt_mutex_unlock (&wr->e.lock); + if (plist != NULL) + { + nn_plist_fini (plist); + ddsrt_free (plist); + } + } else { /* Note the subtlety of enqueueing with the lock held but diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index fabe8c0..c38fca7 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -200,7 +200,7 @@ struct nn_xpack ddsi_tran_conn_t conn; ddsi_sem_t sem; size_t niov; - ddsrt_iovec_t iov[NN_XMSG_MAX_MESSAGE_IOVECS]; + ddsrt_iovec_t *iov; enum nn_xmsg_dstmode dstmode; union @@ -978,6 +978,7 @@ struct nn_xpack * nn_xpack_new (ddsi_tran_conn_t conn, uint32_t bw_limit, bool a xp = ddsrt_malloc (sizeof (*xp)); memset (xp, 0, sizeof (*xp)); xp->async_mode = async_mode; + xp->iov = NULL; /* Fixed header fields, initialized just once */ xp->hdr.protocol.id[0] = 'R'; @@ -1030,6 +1031,7 @@ void nn_xpack_free (struct nn_xpack *xp) #endif if (gv.thread_pool) ddsi_sem_destroy (&xp->sem); + ddsrt_free (xp->iov); ddsrt_free (xp); } @@ -1411,6 +1413,9 @@ int nn_xpack_addmsg (struct nn_xpack *xp, struct nn_xmsg *m, const uint32_t flag assert ((m->sz % 4) == 0); assert (m->refd_payload == NULL || (m->refd_payload_iov.iov_len % 4) == 0); + if (xp->iov == NULL) + xp->iov = malloc (NN_XMSG_MAX_MESSAGE_IOVECS * sizeof (*xp->iov)); + if (!nn_xpack_mayaddmsg (xp, m, flags)) { assert (xp->niov > 0);