limit WHC, serdata, xmsg freelist memory use (#168)

High sample rates require rather high rates of allocating and freeing
WHC nodes, serialised samples (serdata), and RTPS message fragments
(xmsg).  A bunch of dedicated parallel allocators help take some
pressure off the regular malloc/free calls.  However, these used to
gobble up memory like crazy, partly because of rather generous limits
and partly because there was no restriction on the size of the samples
that would be cached, so they could end up holding large numbers of
multi-MB samples.  Caching large samples brings no benefit anyway,
because at those sizes the sample rate is correspondingly lower.

This commit reduces the maximum number of entries for all three cases,
limits the maximum size of a serdata or xmsg that can be cached, and,
instead of instantiating a separate allocator for WHC nodes per WHC,
shares a single one across all WHCs.  Total memory use of these
freelists should now be limited to a couple of MB.
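
The size cap works by bypassing the freelist entirely for anything above
a threshold and handing it straight back to the regular allocator.  A
minimal sketch of that idea (cf. the serdata_default_free and
nn_xmsg_free hunks in the diff below; ``cache_push``,
``release_small_or_free`` and the constants are illustrative names for
this sketch, not the actual Cyclone DDS API):

    #include <stdbool.h>
    #include <stdlib.h>

    #define MAX_CACHED_SIZE   256  /* objects larger than this are never cached */
    #define MAX_CACHED_COUNT  8    /* tiny bound, just for the sketch */

    static void *cache[MAX_CACHED_COUNT];
    static size_t cache_count;

    /* bounded cache: returns false when full, so the caller can fall back to free() */
    static bool cache_push (void *obj)
    {
      if (cache_count == MAX_CACHED_COUNT)
        return false;
      cache[cache_count++] = obj;
      return true;
    }

    static void release_small_or_free (void *obj, size_t size)
    {
      /* only small objects go back into the bounded cache; anything too large,
         or anything that doesn't fit because the cache is full, is freed */
      if (size > MAX_CACHED_SIZE || !cache_push (obj))
        free (obj);
    }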

The caching can be disabled by setting ``FREELIST_TYPE`` to
``FREELIST_NONE`` in ``q_freelist.h``.
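
For reference, a sketch of what that amounts to (the header's existing
default selection of ``FREELIST_TYPE`` may need to be adjusted rather
than simply overridden, so treat this as illustrative):

    /* q_freelist.h: select the no-op freelist implementation, i.e. disable all
       caching so every allocation goes straight to the regular allocator */
    #define FREELIST_TYPE FREELIST_NONE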

Signed-off-by: Erik Boasson <eb@ilities.com>
Author: Erik Boasson, 2019-05-02 11:23:35 +08:00 (committed by eboasson)
parent 6011422566
commit d693d8eac9
6 changed files with 89 additions and 20 deletions


@@ -88,7 +88,6 @@ struct whc_impl {
seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */
struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */
struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */
struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */
#if USE_EHH
struct ddsrt_ehh *seq_hash;
#else
@@ -124,7 +123,7 @@ static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq);
static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn);
static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn);
static int compare_seq (const void *va, const void *vb);
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list);
static void free_deferred_free_list (struct whc_node *deferred_free_list);
static void get_state_locked(const struct whc_impl *whc, struct whc_state *st);
static unsigned whc_default_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list);
@@ -159,6 +158,17 @@ static const struct whc_ops whc_ops = {
.free = whc_default_free
};
/* Number of instantiated WHCs and a global freelist for WHC nodes that gets
initialized lazily and cleaned up automatically when the last WHC is freed.
Protected by dds_global.m_mutex.
sizeof (whc_node) on 64-bit machines is ~ 100 bytes, so this is ~ 1MB;
8k entries seems to be roughly the amount needed for minimum-size samples,
maximum message size and a short round-trip time */
#define MAX_FREELIST_SIZE 8192
static uint32_t whc_count;
static struct nn_freelist whc_node_freelist;
#if USE_EHH
static uint32_t whc_seq_entry_hash (const void *vn)
{
@@ -379,8 +389,10 @@ struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth)
whc->open_intv = intv;
whc->maxseq_node = NULL;
/* hack */
nn_freelist_init (&whc->freelist, UINT32_MAX, offsetof (struct whc_node, next_seq));
ddsrt_mutex_lock (&dds_global.m_mutex);
if (whc_count++ == 0)
nn_freelist_init (&whc_node_freelist, MAX_FREELIST_SIZE, offsetof (struct whc_node, next_seq));
ddsrt_mutex_unlock (&dds_global.m_mutex);
check_whc (whc);
return (struct whc *)whc;
@@ -425,7 +437,11 @@ DDSRT_WARNING_MSVC_ON(6001);
}
ddsrt_avl_free (&whc_seq_treedef, &whc->seq, ddsrt_free);
nn_freelist_fini (&whc->freelist, ddsrt_free);
ddsrt_mutex_lock (&dds_global.m_mutex);
if (--whc_count == 0)
nn_freelist_fini (&whc_node_freelist, ddsrt_free);
ddsrt_mutex_unlock (&dds_global.m_mutex);
#if USE_EHH
ddsrt_ehh_free (whc->seq_hash);
@@ -753,11 +769,11 @@ static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn)
if (whcn_tmp->next_seq)
whcn_tmp->next_seq->prev_seq = whcn_tmp->prev_seq;
whcn_tmp->next_seq = NULL;
free_deferred_free_list (whc, whcn_tmp);
free_deferred_free_list (whcn_tmp);
whc->seq_size--;
}
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list)
static void free_deferred_free_list (struct whc_node *deferred_free_list)
{
if (deferred_free_list)
{
@@ -769,7 +785,7 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe
if (!cur->borrowed)
free_whc_node_contents (cur);
}
cur = nn_freelist_pushmany (&whc->freelist, deferred_free_list, last, n);
cur = nn_freelist_pushmany (&whc_node_freelist, deferred_free_list, last, n);
while (cur)
{
struct whc_node *tmp = cur;
@@ -781,8 +797,8 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe
static void whc_default_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
free_deferred_free_list(whc, deferred_free_list);
(void) whc_generic;
free_deferred_free_list (deferred_free_list);
}
static unsigned whc_default_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
@@ -1028,7 +1044,7 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
{
struct whc_node *newn = NULL;
if ((newn = nn_freelist_pop (&whc->freelist)) == NULL)
if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL)
newn = ddsrt_malloc (sizeof (*newn));
newn->seq = seq;
newn->plist = plist;
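
In isolation, the shared-freelist lifecycle in the hunks above is a
reference count guarded by dds_global.m_mutex: the first WHC created
initializes the global freelist, the last WHC freed tears it down.  A
standalone sketch of that pattern, using plain pthreads and hypothetical
names (``pool_t``, ``pool_init``, ``pool_fini``, ``on_whc_new``,
``on_whc_free``) rather than the actual nn_freelist/WHC API:

    #include <pthread.h>
    #include <stdint.h>

    typedef struct pool { int dummy; /* cached entries, bookkeeping, ... */ } pool_t;

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; /* stands in for dds_global.m_mutex */
    static uint32_t whc_count;   /* number of live WHCs */
    static pool_t shared_pool;   /* single freelist shared by all WHCs */

    static void pool_init (pool_t *p) { p->dummy = 0; }  /* set up internal state */
    static void pool_fini (pool_t *p) { (void) p; }      /* return cached entries to the allocator */

    void on_whc_new (void)
    {
      pthread_mutex_lock (&lock);
      if (whc_count++ == 0)      /* first WHC: create the shared freelist */
        pool_init (&shared_pool);
      pthread_mutex_unlock (&lock);
    }

    void on_whc_free (void)
    {
      pthread_mutex_lock (&lock);
      if (--whc_count == 0)      /* last WHC gone: clean up the shared freelist */
        pool_fini (&shared_pool);
      pthread_mutex_unlock (&lock);
    }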


@@ -19,7 +19,7 @@
extern "C" {
#endif
#define FREELIST_SIMPLE 1
#define FREELIST_NONE 1
#define FREELIST_ATOMIC_LIFO 2
#define FREELIST_DOUBLE 3
@@ -33,7 +33,13 @@ extern "C" {
#endif
#endif
#if FREELIST_TYPE == FREELIST_ATOMIC_LIFO
#if FREELIST_TYPE == FREELIST_NONE
struct nn_freelist {
char dummy;
};
#elif FREELIST_TYPE == FREELIST_ATOMIC_LIFO
struct nn_freelist {
ddsrt_atomic_lifo_t x;


@@ -27,7 +27,12 @@
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_serdata_default.h"
#define MAX_POOL_SIZE 16384
/* 8k entries in the freelist seems to be roughly the amount needed to send
minimum-size (well, 4 bytes) samples as fast as possible over loopback
while using large messages -- actually, it stands to reason that this would
be the same as the WHC node pool size */
#define MAX_POOL_SIZE 8192
#define MAX_SIZE_FOR_POOL 256
#define CLEAR_PADDING 0
#ifndef NDEBUG
@@ -205,7 +210,7 @@ static void serdata_default_free(struct ddsi_serdata *dcmn)
{
struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn;
assert(ddsrt_atomic_ld32(&d->c.refc) == 0);
if (!nn_freelist_push (&gv.serpool->freelist, d))
if (d->size > MAX_SIZE_FOR_POOL || !nn_freelist_push (&gv.serpool->freelist, d))
dds_free (d);
}


@@ -18,7 +18,37 @@
#include "dds/ddsrt/threads.h"
#include "dds/ddsi/q_freelist.h"
#if FREELIST_TYPE == FREELIST_ATOMIC_LIFO
#if FREELIST_TYPE == FREELIST_NONE
void nn_freelist_init (struct nn_freelist *fl, uint32_t max, off_t linkoff)
{
(void) fl; (void) max; (void) linkoff;
}
void nn_freelist_fini (struct nn_freelist *fl, void (*free) (void *elem))
{
(void) fl; (void) free;
}
bool nn_freelist_push (struct nn_freelist *fl, void *elem)
{
(void) fl; (void) elem;
return false;
}
void *nn_freelist_pushmany (struct nn_freelist *fl, void *first, void *last, uint32_t n)
{
(void) fl; (void) first; (void) last; (void) n;
return first;
}
void *nn_freelist_pop (struct nn_freelist *fl)
{
(void) fl;
return NULL;
}
#elif FREELIST_TYPE == FREELIST_ATOMIC_LIFO
void nn_freelist_init (struct nn_freelist *fl, uint32_t max, off_t linkoff)
{


@@ -400,7 +400,9 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg)
{
#define TEST_KEYHASH 0
const size_t expected_inline_qos_size = 4+8+20+4 + 32;
/* actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make
a difference, so no point in being precise */
const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4;
struct nn_xmsg_marker sm_marker;
unsigned char contentflag = 0;
Data_t *data;
@@ -423,6 +425,7 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc
ASSERT_MUTEX_HELD (&wr->e.lock);
/* INFO_TS: 12 bytes, Data_t: 24 bytes, expected inline QoS: 32 => should be single chunk */
if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL)
return Q_ERR_OUT_OF_MEMORY;
@@ -484,7 +487,9 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
Expected inline QoS size: header(4) + statusinfo(8) + keyhash(20)
+ sentinel(4). Plus some spare cos I can't be bothered. */
const int set_smhdr_flags_asif_data = config.buggy_datafrag_flags_mode;
const size_t expected_inline_qos_size = 4+8+20+4 + 32;
/* actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make
a difference, so no point in being precise */
const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4;
struct nn_xmsg_marker sm_marker;
void *sm;
Data_DataFrag_common_t *ddcmn;
@@ -508,6 +513,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli
fragging = (config.fragment_size < size);
/* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */
if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL)
return Q_ERR_OUT_OF_MEMORY;


@@ -241,13 +241,18 @@ static size_t align4u (size_t x)
Great expectations, but so far still wanting. */
/* We need about as many as will fit in a message; an otherwise unadorned data message is ~ 40 bytes
for a really small sample, no key hash, no status info, and message sizes are (typically) < 64kB
so we can expect not to need more than ~ 1600 xmsg at a time. Powers-of-two are nicer :) */
#define MAX_FREELIST_SIZE 2048
static void nn_xmsg_realfree (struct nn_xmsg *m);
struct nn_xmsgpool *nn_xmsgpool_new (void)
{
struct nn_xmsgpool *pool;
pool = ddsrt_malloc (sizeof (*pool));
nn_freelist_init (&pool->freelist, UINT32_MAX, offsetof (struct nn_xmsg, link.older));
nn_freelist_init (&pool->freelist, MAX_FREELIST_SIZE, offsetof (struct nn_xmsg, link.older));
return pool;
}
@@ -352,7 +357,8 @@ void nn_xmsg_free (struct nn_xmsg *m)
unref_addrset (m->dstaddr.all.as);
unref_addrset (m->dstaddr.all.as_group);
}
if (!nn_freelist_push (&pool->freelist, m))
/* Only cache the smallest xmsgs; data messages store the payload by reference and are small */
if (m->maxsz > NN_XMSG_CHUNK_SIZE || !nn_freelist_push (&pool->freelist, m))
{
nn_xmsg_realfree (m);
}