diff --git a/src/core/ddsc/src/dds_whc.c b/src/core/ddsc/src/dds_whc.c index b63ca62..2992990 100644 --- a/src/core/ddsc/src/dds_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -88,7 +88,6 @@ struct whc_impl { seqno_t max_drop_seq; /* samples in whc with seq <= max_drop_seq => transient-local */ struct whc_intvnode *open_intv; /* interval where next sample will go (usually) */ struct whc_node *maxseq_node; /* NULL if empty; if not in open_intv, open_intv is empty */ - struct nn_freelist freelist; /* struct whc_node *; linked via whc_node::next_seq */ #if USE_EHH struct ddsrt_ehh *seq_hash; #else @@ -124,7 +123,7 @@ static struct whc_node *whc_findseq (const struct whc_impl *whc, seqno_t seq); static void insert_whcn_in_hash (struct whc_impl *whc, struct whc_node *whcn); static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn); static int compare_seq (const void *va, const void *vb); -static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list); +static void free_deferred_free_list (struct whc_node *deferred_free_list); static void get_state_locked(const struct whc_impl *whc, struct whc_state *st); static unsigned whc_default_remove_acked_messages_full (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list); @@ -159,6 +158,17 @@ static const struct whc_ops whc_ops = { .free = whc_default_free }; +/* Number of instantiated WHCs and a global freelist for WHC nodes that gets + initialized lazily and cleaned up automatically when the last WHC is freed. + Protected by dds_global.m_mutex. + + sizeof (whc_node) on 64-bit machines ~ 100 bytes, so this is ~1MB + 8k entries seems to be roughly the amount needed for minimum samples, + maximum message size and a short round-trip time */ +#define MAX_FREELIST_SIZE 8192 +static uint32_t whc_count; +static struct nn_freelist whc_node_freelist; + #if USE_EHH static uint32_t whc_seq_entry_hash (const void *vn) { @@ -379,8 +389,10 @@ struct whc *whc_new (int is_transient_local, unsigned hdepth, unsigned tldepth) whc->open_intv = intv; whc->maxseq_node = NULL; - /* hack */ - nn_freelist_init (&whc->freelist, UINT32_MAX, offsetof (struct whc_node, next_seq)); + ddsrt_mutex_lock (&dds_global.m_mutex); + if (whc_count++ == 0) + nn_freelist_init (&whc_node_freelist, MAX_FREELIST_SIZE, offsetof (struct whc_node, next_seq)); + ddsrt_mutex_unlock (&dds_global.m_mutex); check_whc (whc); return (struct whc *)whc; @@ -425,7 +437,11 @@ DDSRT_WARNING_MSVC_ON(6001); } ddsrt_avl_free (&whc_seq_treedef, &whc->seq, ddsrt_free); - nn_freelist_fini (&whc->freelist, ddsrt_free); + + ddsrt_mutex_lock (&dds_global.m_mutex); + if (--whc_count == 0) + nn_freelist_fini (&whc_node_freelist, ddsrt_free); + ddsrt_mutex_unlock (&dds_global.m_mutex); #if USE_EHH ddsrt_ehh_free (whc->seq_hash); @@ -753,11 +769,11 @@ static void whc_delete_one (struct whc_impl *whc, struct whc_node *whcn) if (whcn_tmp->next_seq) whcn_tmp->next_seq->prev_seq = whcn_tmp->prev_seq; whcn_tmp->next_seq = NULL; - free_deferred_free_list (whc, whcn_tmp); + free_deferred_free_list (whcn_tmp); whc->seq_size--; } -static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *deferred_free_list) +static void free_deferred_free_list (struct whc_node *deferred_free_list) { if (deferred_free_list) { @@ -769,7 +785,7 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe if (!cur->borrowed) free_whc_node_contents (cur); } - cur = nn_freelist_pushmany (&whc->freelist, deferred_free_list, last, n); + cur = nn_freelist_pushmany (&whc_node_freelist, deferred_free_list, last, n); while (cur) { struct whc_node *tmp = cur; @@ -781,8 +797,8 @@ static void free_deferred_free_list (struct whc_impl *whc, struct whc_node *defe static void whc_default_free_deferred_free_list (struct whc *whc_generic, struct whc_node *deferred_free_list) { - struct whc_impl * const whc = (struct whc_impl *)whc_generic; - free_deferred_free_list(whc, deferred_free_list); + (void) whc_generic; + free_deferred_free_list (deferred_free_list); } static unsigned whc_default_remove_acked_messages_noidx (struct whc_impl *whc, seqno_t max_drop_seq, struct whc_node **deferred_free_list) @@ -1028,7 +1044,7 @@ static struct whc_node *whc_default_insert_seq (struct whc_impl *whc, seqno_t ma { struct whc_node *newn = NULL; - if ((newn = nn_freelist_pop (&whc->freelist)) == NULL) + if ((newn = nn_freelist_pop (&whc_node_freelist)) == NULL) newn = ddsrt_malloc (sizeof (*newn)); newn->seq = seq; newn->plist = plist; diff --git a/src/core/ddsi/include/dds/ddsi/q_freelist.h b/src/core/ddsi/include/dds/ddsi/q_freelist.h index 8fcafa7..75b4fc0 100644 --- a/src/core/ddsi/include/dds/ddsi/q_freelist.h +++ b/src/core/ddsi/include/dds/ddsi/q_freelist.h @@ -19,7 +19,7 @@ extern "C" { #endif -#define FREELIST_SIMPLE 1 +#define FREELIST_NONE 1 #define FREELIST_ATOMIC_LIFO 2 #define FREELIST_DOUBLE 3 @@ -33,7 +33,13 @@ extern "C" { #endif #endif -#if FREELIST_TYPE == FREELIST_ATOMIC_LIFO +#if FREELIST_TYPE == FREELIST_NONE + +struct nn_freelist { + char dummy; +}; + +#elif FREELIST_TYPE == FREELIST_ATOMIC_LIFO struct nn_freelist { ddsrt_atomic_lifo_t x; diff --git a/src/core/ddsi/src/ddsi_serdata_default.c b/src/core/ddsi/src/ddsi_serdata_default.c index 03e340e..167cc8b 100644 --- a/src/core/ddsi/src/ddsi_serdata_default.c +++ b/src/core/ddsi/src/ddsi_serdata_default.c @@ -27,7 +27,12 @@ #include "dds/ddsi/q_globals.h" #include "dds/ddsi/ddsi_serdata_default.h" -#define MAX_POOL_SIZE 16384 +/* 8k entries in the freelist seems to be roughly the amount needed to send + minimum-size (well, 4 bytes) samples as fast as possible over loopback + while using large messages -- actually, it stands to reason that this would + be the same as the WHC node pool size */ +#define MAX_POOL_SIZE 8192 +#define MAX_SIZE_FOR_POOL 256 #define CLEAR_PADDING 0 #ifndef NDEBUG @@ -205,7 +210,7 @@ static void serdata_default_free(struct ddsi_serdata *dcmn) { struct ddsi_serdata_default *d = (struct ddsi_serdata_default *)dcmn; assert(ddsrt_atomic_ld32(&d->c.refc) == 0); - if (!nn_freelist_push (&gv.serpool->freelist, d)) + if (d->size > MAX_SIZE_FOR_POOL || !nn_freelist_push (&gv.serpool->freelist, d)) dds_free (d); } diff --git a/src/core/ddsi/src/q_freelist.c b/src/core/ddsi/src/q_freelist.c index 78e2a77..fd7aef7 100644 --- a/src/core/ddsi/src/q_freelist.c +++ b/src/core/ddsi/src/q_freelist.c @@ -18,7 +18,37 @@ #include "dds/ddsrt/threads.h" #include "dds/ddsi/q_freelist.h" -#if FREELIST_TYPE == FREELIST_ATOMIC_LIFO +#if FREELIST_TYPE == FREELIST_NONE + +void nn_freelist_init (struct nn_freelist *fl, uint32_t max, off_t linkoff) +{ + (void) fl; (void) max; (void) linkoff; +} + +void nn_freelist_fini (struct nn_freelist *fl, void (*free) (void *elem)) +{ + (void) fl; (void) free; +} + +bool nn_freelist_push (struct nn_freelist *fl, void *elem) +{ + (void) fl; (void) elem; + return false; +} + +void *nn_freelist_pushmany (struct nn_freelist *fl, void *first, void *last, uint32_t n) +{ + (void) fl; (void) first; (void) last; (void) n; + return first; +} + +void *nn_freelist_pop (struct nn_freelist *fl) +{ + (void) fl; + return NULL; +} + +#elif FREELIST_TYPE == FREELIST_ATOMIC_LIFO void nn_freelist_init (struct nn_freelist *fl, uint32_t max, off_t linkoff) { diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 7401d3b..3a208ba 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -400,7 +400,9 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struct ddsi_serdata *serdata, struct nn_xmsg **pmsg) { #define TEST_KEYHASH 0 - const size_t expected_inline_qos_size = 4+8+20+4 + 32; + /* actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make + a difference, so no point in being precise */ + const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4; struct nn_xmsg_marker sm_marker; unsigned char contentflag = 0; Data_t *data; @@ -423,6 +425,7 @@ static int create_fragment_message_simple (struct writer *wr, seqno_t seq, struc ASSERT_MUTEX_HELD (&wr->e.lock); + /* INFO_TS: 12 bytes, Data_t: 24 bytes, expected inline QoS: 32 => should be single chunk */ if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (Data_t) + expected_inline_qos_size, NN_XMSG_KIND_DATA)) == NULL) return Q_ERR_OUT_OF_MEMORY; @@ -484,7 +487,9 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli Expected inline QoS size: header(4) + statusinfo(8) + keyhash(20) + sentinel(4). Plus some spare cos I can't be bothered. */ const int set_smhdr_flags_asif_data = config.buggy_datafrag_flags_mode; - const size_t expected_inline_qos_size = 4+8+20+4 + 32; + /* actual expected_inline_qos_size is typically 0, but always claiming 32 bytes won't make + a difference, so no point in being precise */ + const size_t expected_inline_qos_size = /* statusinfo */ 8 + /* keyhash */ 20 + /* sentinel */ 4; struct nn_xmsg_marker sm_marker; void *sm; Data_DataFrag_common_t *ddcmn; @@ -508,6 +513,7 @@ int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_pli fragging = (config.fragment_size < size); + /* INFO_TS: 12 bytes, DataFrag_t: 36 bytes, expected inline QoS: 32 => should be single chunk */ if ((*pmsg = nn_xmsg_new (gv.xmsgpool, &wr->e.guid.prefix, sizeof (InfoTimestamp_t) + sizeof (DataFrag_t) + expected_inline_qos_size, xmsg_kind)) == NULL) return Q_ERR_OUT_OF_MEMORY; diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index 3e8e0fa..3c0eae0 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -241,13 +241,18 @@ static size_t align4u (size_t x) Great expectations, but so far still wanting. */ +/* We need about as many as will fit in a message; an otherwise unadorned data message is ~ 40 bytes + for a really small sample, no key hash, no status info, and message sizes are (typically) < 64kB + so we can expect not to need more than ~ 1600 xmsg at a time. Powers-of-two are nicer :) */ +#define MAX_FREELIST_SIZE 2048 + static void nn_xmsg_realfree (struct nn_xmsg *m); struct nn_xmsgpool *nn_xmsgpool_new (void) { struct nn_xmsgpool *pool; pool = ddsrt_malloc (sizeof (*pool)); - nn_freelist_init (&pool->freelist, UINT32_MAX, offsetof (struct nn_xmsg, link.older)); + nn_freelist_init (&pool->freelist, MAX_FREELIST_SIZE, offsetof (struct nn_xmsg, link.older)); return pool; } @@ -352,7 +357,8 @@ void nn_xmsg_free (struct nn_xmsg *m) unref_addrset (m->dstaddr.all.as); unref_addrset (m->dstaddr.all.as_group); } - if (!nn_freelist_push (&pool->freelist, m)) + /* Only cache the smallest xmsgs; data messages store the payload by reference and are small */ + if (m->maxsz > NN_XMSG_CHUNK_SIZE || !nn_freelist_push (&pool->freelist, m)) { nn_xmsg_realfree (m); }