limit WHC, serdata, xmsg freelist memory use (#168)

High sample rates require rather high rates of allocating and freeing
WHC nodes, serialised samples (serdata), and RTPS message fragments
(xmsg).  A bunch of dedicated parallel allocators help take some
pressure off the regular malloc/free calls.  However, these used to
gobble up memory like crazy, in part because of rather generous limits,
and in part because there was no restriction on the size of the samples
that would be cached, and it could end up caching large numbers of
multi-MB samples.  It should be noted that there is no benefit to
caching large samples anyway, because the sample rate will be that much
lower.

This commit reduces the maximum number of entries for all three cases,
it furthermore limits the maximum size of a serdata or xmsg that can be
cached, and finally instead of instantiating a separate allocator for
WHC nodes per WHC, it now shares one across all WHCs.  Total memory use
should now be limited to a couple of MB.

The caching can be disabled by setting ``FREELIST_TYPE`` to
``FREELIST_NONE`` in ``q_freelist.h``.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-02 11:23:35 +08:00 committed by eboasson
parent f084498615
commit c16babcd73
6 changed files with 89 additions and 20 deletions

View file

@ -88,7 +88,6 @@ struct whc_impl {
seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */
struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */
struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */
struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */
#if USE_EHH
struct ddsrt_ehh *seq_hash;
#else
@ -124,7 +123,7 @@ static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq);
static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn);
static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn);
static int compare_seq (const void *va, const void *vb);
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list);
static void free_deferred_free_list (struct whc_node *deferred_free_list);
static void get_state_locked(const struct whc_impl *whc, struct whc_state *st);
static unsigned whc_default_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list);
@ -159,6 +158,17 @@ static const struct whc_ops whc_ops = {
.free = whc_default_free
};
/* Number of instantiated WHCs and a global freelist for WHC nodes that gets
initialized lazily and cleaned up automatically when the last WHC is freed.
Protected by dds_global.m_mutex.
sizeof (whc_node) on 64-bit machines ~ 100 bytes, so this is ~1MB
8k entries seems to be roughly the amount needed for minimum samples,
maximum message size and a short round-trip time */
#define MAX_FREELIST_SIZE 8192
static uint32_t whc_count;
static struct nn_freelist whc_node_freelist;
#if USE_EHH
static uint32_t whc_seq_entry_hash (const void *vn)
{
@ -379,8 +389,10 @@ struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth)
whc->open_intv = intv;
whc->maxseq_node = NULL;
/* hack */
nn_freelist_init (&whc->freelist, UINT32_MAX, offsetof (struct whc_node, next_seq));
ddsrt_mutex_lock (&dds_global.m_mutex);
if (whc_count++ == 0)
nn_freelist_init (&whc_node_freelist, MAX_FREELIST_SIZE, offsetof (struct whc_node, next_seq));
ddsrt_mutex_unlock (&dds_global.m_mutex);
check_whc (whc);
return (struct whc *)whc;
@ -425,7 +437,11 @@ DDSRT_WARNING_MSVC_ON(6001);
}
ddsrt_avl_free (&whc_seq_treedef, &whc->seq, ddsrt_free);
nn_freelist_fini (&whc->freelist, ddsrt_free);
ddsrt_mutex_lock (&dds_global.m_mutex);
if (--whc_count == 0)
nn_freelist_fini (&whc_node_freelist, ddsrt_free);
ddsrt_mutex_unlock (&dds_global.m_mutex);
#if USE_EHH
ddsrt_ehh_free (whc->seq_hash);
@ -753,11 +769,11 @@ static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn)
if (whcn_tmp->next_seq)
whcn_tmp->next_seq->prev_seq = whcn_tmp->prev_seq;
whcn_tmp->next_seq = NULL;
free_deferred_free_list (whc, whcn_tmp);
free_deferred_free_list (whcn_tmp);
whc->seq_size--;
}
static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list)
static void free_deferred_free_list (struct whc_node *deferred_free_list)
{
if (deferred_free_list)
{
@ -769,7 +785,7 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe
if (!cur->borrowed)
free_whc_node_contents (cur);
}
cur = nn_freelist_pushmany (&whc->freelist, deferred_free_list, last, n);
cur = nn_freelist_pushmany (&whc_node_freelist, deferred_free_list, last, n);
while (cur)
{
struct whc_node *tmp = cur;
@ -781,8 +797,8 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe
static void whc_default_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list)
{
struct whc_impl * const whc = (struct whc_impl *)whc_generic;
free_deferred_free_list(whc, deferred_free_list);
(void) whc_generic;
free_deferred_free_list (deferred_free_list);
}
static unsigned whc_default_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list)
@ -1028,7 +1044,7 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma
{
struct whc_node *newn = NULL;
if ((newn = nn_freelist_pop (&whc->freelist)) == NULL)
if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL)
newn = ddsrt_malloc (sizeof (*newn));
newn->seq = seq;
newn->plist = plist;