Merge pull request #140 from eboasson/master

Remove incorrect assert on thread ids in receive buffer management
commit bd3188af5b
Author: eboasson
Date:   2019-04-01 09:05:04 +02:00 (committed by GitHub)
3 changed files with 20 additions and 42 deletions


@@ -248,7 +248,7 @@ static const struct cfgelem general_cfgelems[] = {
     "<p>This element controls whether DDSI2E uses multicasts for data traffic.</p>\n\
 <p>It is a comma-separated list of some of the following keywords: \"spdp\", \"asm\", \"ssm\", or either of \"false\" or \"true\".</p>\n\
 <ul>\n\
-<li><i>spdp</i>: enables the use of ASM (any-source multicast) for participant discovery</li>\n\
+<li><i>spdp</i>: enables the use of ASM (any-source multicast) for participant discovery, joining the multicast group on the discovery socket, transmitting SPDP messages to this group, but never advertising nor using any multicast address in any discovery message, thus forcing unicast communications for all endpoint discovery and user data.</li>\n\
 <li><i>asm</i>: enables the use of ASM for all traffic (including SPDP)</li>\n\
 <li><i>ssm</i>: enables the use of SSM (source-specific multicast) for all non-SPDP traffic (if supported)</li>\n\
 </ul>\n\
@@ -654,7 +654,7 @@ END_MARKER
 static const struct cfgelem sizing_cfgelems[] =
 {
   { LEAF("ReceiveBufferSize"), 1, "1 MiB", ABSOFF(rbuf_size), 0, uf_memsize, 0, pf_memsize,
-    "<p>This element sets the size of a single receive buffer. Many receive buffers may be needed. Their size must be greater than ReceiveBufferChunkSize by a modest amount.</p>" },
+    "<p>This element sets the size of a single receive buffer. Many receive buffers may be needed. The minimum workable size a little bit larger than Sizing/ReceiveBufferChunkSize, and the value used is taken as the configured value and the actual minimum workable size.</p>" },
   { LEAF("ReceiveBufferChunkSize"), 1, "128 KiB", ABSOFF(rmsg_chunk_size), 0, uf_memsize, 0, pf_memsize,
     "<p>This element specifies the size of one allocation unit in the receive buffer. Must be greater than the maximum packet size by a modest amount (too large packets are dropped). Each allocation is shrunk immediately after processing a message, or freed straightaway.</p>" },
 END_MARKER


@@ -330,7 +330,13 @@ struct nn_rbufpool *nn_rbufpool_new (uint32_t rbuf_size, uint32_t max_rmsg_size)
   struct nn_rbufpool *rbp;
   assert (max_rmsg_size > 0);
-  assert (rbuf_size >= max_rmsg_size_w_hdr (max_rmsg_size));
+  /* raise rbuf_size to minimum possible considering max_rmsg_size, there is
+     no reason to bother the user with the small difference between the two
+     when he tries to configure things, and the crash is horrible when
+     rbuf_size is too small */
+  if (rbuf_size < max_rmsg_size_w_hdr (max_rmsg_size))
+    rbuf_size = max_rmsg_size_w_hdr (max_rmsg_size);
   if ((rbp = ddsrt_malloc (sizeof (*rbp))) == NULL)
     goto fail_rbp;
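The hunk above replaces a hard assertion with a clamp: a ReceiveBufferSize only marginally smaller than the minimum workable value is silently raised rather than tripping an assert (or, in release builds, crashing later). Below is a minimal sketch of the same idiom; min_workable_rbuf_size() and RMSG_HDR_OVERHEAD are invented stand-ins for the internal max_rmsg_size_w_hdr(), whose actual overhead is not shown in this diff.

#include <inttypes.h>
#include <stdio.h>

/* Invented stand-in for max_rmsg_size_w_hdr(): one maximum-size allocation
   plus some fixed administration overhead (the real value is internal). */
#define RMSG_HDR_OVERHEAD 128u

static uint32_t min_workable_rbuf_size (uint32_t max_rmsg_size)
{
  return max_rmsg_size + RMSG_HDR_OVERHEAD;
}

int main (void)
{
  uint32_t rbuf_size = 1048576;     /* ReceiveBufferSize default: 1 MiB */
  uint32_t max_rmsg_size = 131072;  /* ReceiveBufferChunkSize default: 128 KiB */

  /* clamp instead of assert: a slightly-too-small configuration is raised
     to the minimum workable size instead of aborting */
  if (rbuf_size < min_workable_rbuf_size (max_rmsg_size))
    rbuf_size = min_workable_rbuf_size (max_rmsg_size);

  printf ("effective receive buffer size: %" PRIu32 " bytes\n", rbuf_size);
  return 0;
}

With the default sizes the clamp never fires; it only matters when the two configuration values are set close together.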
@@ -633,11 +639,10 @@ static void nn_rmsg_addbias (struct nn_rmsg *rmsg)
 static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust)
 {
   /* This can happen to any rmsg referenced by an sample still
-     progressing through the pipeline, but only by the receive
+     progressing through the pipeline, but only by a receive
      thread. Can't require it to be uncommitted. */
   uint32_t sub;
   DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_and_adjust(%p, %d)\n", (void *) rmsg, adjust);
-  ASSERT_RBUFPOOL_OWNER (rmsg->chunk.rbuf->rbufpool);
   assert (adjust >= 0);
   assert ((uint32_t) adjust < RMSG_REFCOUNT_RDATA_BIAS);
   sub = RMSG_REFCOUNT_RDATA_BIAS - (uint32_t) adjust;
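ASSERT_RBUFPOOL_OWNER, deleted above, restricts this path to the thread that owns the buffer pool. The following is only a sketch of what such an owner check typically looks like (names and layout are illustrative, not the actual cyclonedds macro); the point is that it is valid only as long as every bias removal really happens on the owning receive thread, which the rest of this commit no longer requires.

#include <assert.h>
#include <pthread.h>

/* Sketch of an owner-thread assertion: remember which thread created the
   pool and assert that only that thread touches it. */
struct rbufpool_sketch {
  unsigned rbuf_size;
#ifndef NDEBUG
  pthread_t owner;            /* thread that created the pool */
#endif
};

#ifndef NDEBUG
#define ASSERT_POOL_OWNER(p) assert (pthread_equal (pthread_self (), (p)->owner))
#else
#define ASSERT_POOL_OWNER(p) ((void) 0)
#endif

static void rbufpool_sketch_init (struct rbufpool_sketch *p, unsigned rbuf_size)
{
  p->rbuf_size = rbuf_size;
#ifndef NDEBUG
  p->owner = pthread_self ();
#endif
}

static void rbufpool_sketch_touch (struct rbufpool_sketch *p)
{
  ASSERT_POOL_OWNER (p);      /* fires if any other thread calls this */
  (void) p;
}

Once dropping the bias is allowed from any thread (see the removal of the *_anythread variants below), a check of this kind becomes the "incorrect assert on thread ids" named in the commit message.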
@@ -646,15 +651,6 @@ static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust)
     nn_rmsg_free (rmsg);
 }
-static void nn_rmsg_rmbias_anythread (struct nn_rmsg *rmsg)
-{
-  /* For removing garbage when freeing a nn_defrag. */
-  uint32_t sub = RMSG_REFCOUNT_RDATA_BIAS;
-  DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_anythread(%p)\n", (void *) rmsg);
-  assert (ddsrt_atomic_ld32 (&rmsg->refcount) >= sub);
-  if (ddsrt_atomic_sub32_nv (&rmsg->refcount, sub) == 0)
-    nn_rmsg_free (rmsg);
-}
 static void nn_rmsg_unref (struct nn_rmsg *rmsg)
 {
   DDS_LOG(DDS_LC_RADMIN, "rmsg_unref(%p)\n", (void *) rmsg);
@@ -746,16 +742,6 @@ static void nn_rdata_rmbias_and_adjust (struct nn_rdata *rdata, int adjust)
   nn_rmsg_rmbias_and_adjust (rdata->rmsg, adjust);
 }
-static void nn_rdata_rmbias_anythread (struct nn_rdata *rdata)
-{
-  DDS_LOG(DDS_LC_RADMIN, "rdata_rmbias_anythread(%p)\n", (void *) rdata);
-#ifndef NDEBUG
-  if (ddsrt_atomic_dec32_ov (&rdata->refcount_bias_added) != 1)
-    abort ();
-#endif
-  nn_rmsg_rmbias_anythread (rdata->rmsg);
-}
 static void nn_rdata_unref (struct nn_rdata *rdata)
 {
   DDS_LOG(DDS_LC_RADMIN, "rdata_rdata_unref(%p)\n", (void *) rdata);
@@ -894,29 +880,21 @@ struct nn_defrag *nn_defrag_new (enum nn_defrag_drop_mode drop_mode, uint32_t ma
 void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust)
 {
-  struct nn_rdata *frag1;
   DDS_LOG(DDS_LC_RADMIN, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust);
   while (frag)
   {
-    frag1 = frag->nextfrag;
+    struct nn_rdata * const frag1 = frag->nextfrag;
     nn_rdata_rmbias_and_adjust (frag, adjust);
     frag = frag1;
   }
 }
-static void nn_fragchain_rmbias_anythread (struct nn_rdata *frag, UNUSED_ARG (int adjust))
+static void nn_fragchain_rmbias (struct nn_rdata *frag)
 {
-  struct nn_rdata *frag1;
-  DDS_LOG(DDS_LC_RADMIN, "fragchain_rmbias_anythread(%p)\n", (void *) frag);
-  while (frag)
-  {
-    frag1 = frag->nextfrag;
-    nn_rdata_rmbias_anythread (frag);
-    frag = frag1;
-  }
+  nn_fragchain_adjust_refcount (frag, 0);
 }
-static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample, void (*fragchain_free) (struct nn_rdata *frag, int adjust))
+static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample)
 {
   /* Can't reference rsample after the first fragchain_free, because
      we don't know which rdata/rmsg provides the storage for the
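Two details in the hunk above are worth noting. First, nn_fragchain_rmbias now just forwards to nn_fragchain_adjust_refcount with an adjustment of zero, which removes the need for the function-pointer parameter that defrag_rsample_drop used to take. Second, the fragment-chain walk must read nextfrag before releasing the current fragment, because releasing it may free the rmsg that provides its storage (as the comment in defrag_rsample_drop explains). A minimal sketch of that walk, with illustrative types in place of the real rdata/rmsg structures:

#include <stdlib.h>

/* Because releasing a fragment may free the memory that holds it, the next
   pointer must be read before the current fragment is released. */
struct frag_sketch {
  struct frag_sketch *nextfrag;
  /* ... payload administration ... */
};

static void frag_release (struct frag_sketch *f)
{
  free (f);                    /* stand-in for nn_rdata_rmbias_and_adjust */
}

static void fragchain_release (struct frag_sketch *frag)
{
  while (frag)
  {
    struct frag_sketch * const frag1 = frag->nextfrag;  /* save before release */
    frag_release (frag);
    frag = frag1;
  }
}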
@@ -932,7 +910,7 @@ static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rs
   assert (defrag->n_samples > 0);
   defrag->n_samples--;
   for (iv = ut_avlIterFirst (&rsample_defrag_fragtree_treedef, &rsample->u.defrag.fragtree, &iter); iv; iv = ut_avlIterNext (&iter))
-    fragchain_free (iv->first, 0);
+    nn_fragchain_rmbias (iv->first);
 }
 void nn_defrag_free (struct nn_defrag *defrag)
@@ -942,7 +920,7 @@ void nn_defrag_free (struct nn_defrag *defrag)
   while (s)
   {
     DDS_LOG(DDS_LC_RADMIN, "defrag_free(%p, sample %p seq %"PRId64")\n", (void *) defrag, (void *) s, s->u.defrag.seq);
-    defrag_rsample_drop (defrag, s, nn_fragchain_rmbias_anythread);
+    defrag_rsample_drop (defrag, s);
     s = ut_avlFindMin (&defrag_sampletree_treedef, &defrag->sampletree);
   }
   assert (defrag->n_samples == 0);
@@ -1306,7 +1284,7 @@ static int defrag_limit_samples (struct nn_defrag *defrag, seqno_t seq, seqno_t
       break;
   }
   assert (sample_to_drop != NULL);
-  defrag_rsample_drop (defrag, sample_to_drop, nn_fragchain_adjust_refcount);
+  defrag_rsample_drop (defrag, sample_to_drop);
   if (sample_to_drop == defrag->max_sample)
   {
     defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree);
@@ -1438,7 +1416,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1)
   while (s && s->u.defrag.seq < maxp1)
   {
     struct nn_rsample *s1 = ut_avlFindSucc (&defrag_sampletree_treedef, &defrag->sampletree, s);
-    defrag_rsample_drop (defrag, s, nn_fragchain_adjust_refcount);
+    defrag_rsample_drop (defrag, s);
     s = s1;
   }
   defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree);


@@ -2304,8 +2304,8 @@ static int handle_SPDP (const struct nn_rsample_info *sampleinfo, struct nn_rdat
     fragchain = nn_rsample_fragchain (rsample);
     if ((rres = nn_reorder_rsample (&sc, gv.spdp_reorder, rsample, &refc_adjust, nn_dqueue_is_full (gv.builtins_dqueue))) > 0)
       nn_dqueue_enqueue (gv.builtins_dqueue, &sc, rres);
-    ddsrt_mutex_unlock (&gv.spdp_lock);
     nn_fragchain_adjust_refcount (fragchain, refc_adjust);
+    ddsrt_mutex_unlock (&gv.spdp_lock);
     return 0;
   }
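In handle_SPDP (and in drop_oversize below) the per-fragment reference bookkeeping is deferred: nn_reorder_rsample and handle_one_gap only accumulate a net change in refc_adjust, and a single nn_fragchain_adjust_refcount call settles it. The hunks here reorder that settling call relative to the unlock so that it is made before the lock is released. A sketch of the accumulate-then-apply pattern, with illustrative names only:

#include <pthread.h>

struct chain;                                               /* opaque fragment chain */

int  process_under_lock (struct chain *c, int *refc_adjust);   /* e.g. reorder/gap handling */
void chain_adjust_refcount (struct chain *c, int adjust);      /* settles the biased count */

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void deliver (struct chain *c)
{
  int refc_adjust = 0;
  pthread_mutex_lock (&lock);
  (void) process_under_lock (c, &refc_adjust);     /* may take or drop references */
  chain_adjust_refcount (c, refc_adjust);          /* apply once, still holding the lock */
  pthread_mutex_unlock (&lock);
}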
@@ -2342,8 +2342,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con
     ddsrt_mutex_lock (&pwr->e.lock);
     wn = ut_avlLookup (&pwr_readers_treedef, &pwr->readers, &dst);
     gap_was_valuable = handle_one_gap (pwr, wn, sampleinfo->seq, sampleinfo->seq+1, gap, &refc_adjust);
-    ddsrt_mutex_unlock (&pwr->e.lock);
     nn_fragchain_adjust_refcount (gap, refc_adjust);
+    ddsrt_mutex_unlock (&pwr->e.lock);
     if (gap_was_valuable)
     {