diff --git a/src/core/ddsi/src/q_config.c b/src/core/ddsi/src/q_config.c index e318181..596fb3e 100644 --- a/src/core/ddsi/src/q_config.c +++ b/src/core/ddsi/src/q_config.c @@ -248,7 +248,7 @@ static const struct cfgelem general_cfgelems[] = { "

This element controls whether DDSI2E uses multicasts for data traffic.

\n\

It is a comma-separated list of some of the following keywords: \"spdp\", \"asm\", \"ssm\", or either of \"false\" or \"true\".

\n\ \n\ @@ -654,7 +654,7 @@ END_MARKER static const struct cfgelem sizing_cfgelems[] = { { LEAF("ReceiveBufferSize"), 1, "1 MiB", ABSOFF(rbuf_size), 0, uf_memsize, 0, pf_memsize, - "

This element sets the size of a single receive buffer. Many receive buffers may be needed. Their size must be greater than ReceiveBufferChunkSize by a modest amount.

" }, + "

This element sets the size of a single receive buffer. Many receive buffers may be needed. The minimum workable size a little bit larger than Sizing/ReceiveBufferChunkSize, and the value used is taken as the configured value and the actual minimum workable size.

" }, { LEAF("ReceiveBufferChunkSize"), 1, "128 KiB", ABSOFF(rmsg_chunk_size), 0, uf_memsize, 0, pf_memsize, "

This element specifies the size of one allocation unit in the receive buffer. Must be greater than the maximum packet size by a modest amount (too large packets are dropped). Each allocation is shrunk immediately after processing a message, or freed straightaway.

" }, END_MARKER diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index c6f4a22..20eaa66 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -330,7 +330,13 @@ struct nn_rbufpool *nn_rbufpool_new (uint32_t rbuf_size, uint32_t max_rmsg_size) struct nn_rbufpool *rbp; assert (max_rmsg_size > 0); - assert (rbuf_size >= max_rmsg_size_w_hdr (max_rmsg_size)); + + /* raise rbuf_size to minimum possible considering max_rmsg_size, there is + no reason to bother the user with the small difference between the two + when he tries to configure things, and the crash is horrible when + rbuf_size is too small */ + if (rbuf_size < max_rmsg_size_w_hdr (max_rmsg_size)) + rbuf_size = max_rmsg_size_w_hdr (max_rmsg_size); if ((rbp = ddsrt_malloc (sizeof (*rbp))) == NULL) goto fail_rbp; @@ -633,11 +639,10 @@ static void nn_rmsg_addbias (struct nn_rmsg *rmsg) static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust) { /* This can happen to any rmsg referenced by an sample still - progressing through the pipeline, but only by the receive + progressing through the pipeline, but only by a receive thread. Can't require it to be uncommitted. */ uint32_t sub; DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_and_adjust(%p, %d)\n", (void *) rmsg, adjust); - ASSERT_RBUFPOOL_OWNER (rmsg->chunk.rbuf->rbufpool); assert (adjust >= 0); assert ((uint32_t) adjust < RMSG_REFCOUNT_RDATA_BIAS); sub = RMSG_REFCOUNT_RDATA_BIAS - (uint32_t) adjust; @@ -646,15 +651,6 @@ static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust) nn_rmsg_free (rmsg); } -static void nn_rmsg_rmbias_anythread (struct nn_rmsg *rmsg) -{ - /* For removing garbage when freeing a nn_defrag. */ - uint32_t sub = RMSG_REFCOUNT_RDATA_BIAS; - DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_anythread(%p)\n", (void *) rmsg); - assert (ddsrt_atomic_ld32 (&rmsg->refcount) >= sub); - if (ddsrt_atomic_sub32_nv (&rmsg->refcount, sub) == 0) - nn_rmsg_free (rmsg); -} static void nn_rmsg_unref (struct nn_rmsg *rmsg) { DDS_LOG(DDS_LC_RADMIN, "rmsg_unref(%p)\n", (void *) rmsg); @@ -746,16 +742,6 @@ static void nn_rdata_rmbias_and_adjust (struct nn_rdata *rdata, int adjust) nn_rmsg_rmbias_and_adjust (rdata->rmsg, adjust); } -static void nn_rdata_rmbias_anythread (struct nn_rdata *rdata) -{ - DDS_LOG(DDS_LC_RADMIN, "rdata_rmbias_anythread(%p)\n", (void *) rdata); -#ifndef NDEBUG - if (ddsrt_atomic_dec32_ov (&rdata->refcount_bias_added) != 1) - abort (); -#endif - nn_rmsg_rmbias_anythread (rdata->rmsg); -} - static void nn_rdata_unref (struct nn_rdata *rdata) { DDS_LOG(DDS_LC_RADMIN, "rdata_rdata_unref(%p)\n", (void *) rdata); @@ -894,29 +880,21 @@ struct nn_defrag *nn_defrag_new (enum nn_defrag_drop_mode drop_mode, uint32_t ma void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust) { - struct nn_rdata *frag1; DDS_LOG(DDS_LC_RADMIN, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust); while (frag) { - frag1 = frag->nextfrag; + struct nn_rdata * const frag1 = frag->nextfrag; nn_rdata_rmbias_and_adjust (frag, adjust); frag = frag1; } } -static void nn_fragchain_rmbias_anythread (struct nn_rdata *frag, UNUSED_ARG (int adjust)) +static void nn_fragchain_rmbias (struct nn_rdata *frag) { - struct nn_rdata *frag1; - DDS_LOG(DDS_LC_RADMIN, "fragchain_rmbias_anythread(%p)\n", (void *) frag); - while (frag) - { - frag1 = frag->nextfrag; - nn_rdata_rmbias_anythread (frag); - frag = frag1; - } + nn_fragchain_adjust_refcount (frag, 0); } -static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample, void (*fragchain_free) (struct nn_rdata *frag, int adjust)) +static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample) { /* Can't reference rsample after the first fragchain_free, because we don't know which rdata/rmsg provides the storage for the @@ -932,7 +910,7 @@ static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rs assert (defrag->n_samples > 0); defrag->n_samples--; for (iv = ut_avlIterFirst (&rsample_defrag_fragtree_treedef, &rsample->u.defrag.fragtree, &iter); iv; iv = ut_avlIterNext (&iter)) - fragchain_free (iv->first, 0); + nn_fragchain_rmbias (iv->first); } void nn_defrag_free (struct nn_defrag *defrag) @@ -942,7 +920,7 @@ void nn_defrag_free (struct nn_defrag *defrag) while (s) { DDS_LOG(DDS_LC_RADMIN, "defrag_free(%p, sample %p seq %"PRId64")\n", (void *) defrag, (void *) s, s->u.defrag.seq); - defrag_rsample_drop (defrag, s, nn_fragchain_rmbias_anythread); + defrag_rsample_drop (defrag, s); s = ut_avlFindMin (&defrag_sampletree_treedef, &defrag->sampletree); } assert (defrag->n_samples == 0); @@ -1306,7 +1284,7 @@ static int defrag_limit_samples (struct nn_defrag *defrag, seqno_t seq, seqno_t break; } assert (sample_to_drop != NULL); - defrag_rsample_drop (defrag, sample_to_drop, nn_fragchain_adjust_refcount); + defrag_rsample_drop (defrag, sample_to_drop); if (sample_to_drop == defrag->max_sample) { defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree); @@ -1438,7 +1416,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1) while (s && s->u.defrag.seq < maxp1) { struct nn_rsample *s1 = ut_avlFindSucc (&defrag_sampletree_treedef, &defrag->sampletree, s); - defrag_rsample_drop (defrag, s, nn_fragchain_adjust_refcount); + defrag_rsample_drop (defrag, s); s = s1; } defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree); diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 7de04a6..d09c89d 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -2304,8 +2304,8 @@ static int handle_SPDP (const struct nn_rsample_info *sampleinfo, struct nn_rdat fragchain = nn_rsample_fragchain (rsample); if ((rres = nn_reorder_rsample (&sc, gv.spdp_reorder, rsample, &refc_adjust, nn_dqueue_is_full (gv.builtins_dqueue))) > 0) nn_dqueue_enqueue (gv.builtins_dqueue, &sc, rres); - ddsrt_mutex_unlock (&gv.spdp_lock); nn_fragchain_adjust_refcount (fragchain, refc_adjust); + ddsrt_mutex_unlock (&gv.spdp_lock); return 0; } @@ -2342,8 +2342,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con ddsrt_mutex_lock (&pwr->e.lock); wn = ut_avlLookup (&pwr_readers_treedef, &pwr->readers, &dst); gap_was_valuable = handle_one_gap (pwr, wn, sampleinfo->seq, sampleinfo->seq+1, gap, &refc_adjust); - ddsrt_mutex_unlock (&pwr->e.lock); nn_fragchain_adjust_refcount (gap, refc_adjust); + ddsrt_mutex_unlock (&pwr->e.lock); if (gap_was_valuable) {