diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 41c6192..20eaa66 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -639,11 +639,10 @@ static void nn_rmsg_addbias (struct nn_rmsg *rmsg) static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust) { /* This can happen to any rmsg referenced by an sample still - progressing through the pipeline, but only by the receive + progressing through the pipeline, but only by a receive thread. Can't require it to be uncommitted. */ uint32_t sub; DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_and_adjust(%p, %d)\n", (void *) rmsg, adjust); - ASSERT_RBUFPOOL_OWNER (rmsg->chunk.rbuf->rbufpool); assert (adjust >= 0); assert ((uint32_t) adjust < RMSG_REFCOUNT_RDATA_BIAS); sub = RMSG_REFCOUNT_RDATA_BIAS - (uint32_t) adjust; @@ -652,15 +651,6 @@ static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust) nn_rmsg_free (rmsg); } -static void nn_rmsg_rmbias_anythread (struct nn_rmsg *rmsg) -{ - /* For removing garbage when freeing a nn_defrag. */ - uint32_t sub = RMSG_REFCOUNT_RDATA_BIAS; - DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_anythread(%p)\n", (void *) rmsg); - assert (ddsrt_atomic_ld32 (&rmsg->refcount) >= sub); - if (ddsrt_atomic_sub32_nv (&rmsg->refcount, sub) == 0) - nn_rmsg_free (rmsg); -} static void nn_rmsg_unref (struct nn_rmsg *rmsg) { DDS_LOG(DDS_LC_RADMIN, "rmsg_unref(%p)\n", (void *) rmsg); @@ -752,16 +742,6 @@ static void nn_rdata_rmbias_and_adjust (struct nn_rdata *rdata, int adjust) nn_rmsg_rmbias_and_adjust (rdata->rmsg, adjust); } -static void nn_rdata_rmbias_anythread (struct nn_rdata *rdata) -{ - DDS_LOG(DDS_LC_RADMIN, "rdata_rmbias_anythread(%p)\n", (void *) rdata); -#ifndef NDEBUG - if (ddsrt_atomic_dec32_ov (&rdata->refcount_bias_added) != 1) - abort (); -#endif - nn_rmsg_rmbias_anythread (rdata->rmsg); -} - static void nn_rdata_unref (struct nn_rdata *rdata) { DDS_LOG(DDS_LC_RADMIN, "rdata_rdata_unref(%p)\n", (void *) rdata); @@ -900,29 +880,21 @@ struct nn_defrag *nn_defrag_new (enum nn_defrag_drop_mode drop_mode, uint32_t ma void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust) { - struct nn_rdata *frag1; DDS_LOG(DDS_LC_RADMIN, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust); while (frag) { - frag1 = frag->nextfrag; + struct nn_rdata * const frag1 = frag->nextfrag; nn_rdata_rmbias_and_adjust (frag, adjust); frag = frag1; } } -static void nn_fragchain_rmbias_anythread (struct nn_rdata *frag, UNUSED_ARG (int adjust)) +static void nn_fragchain_rmbias (struct nn_rdata *frag) { - struct nn_rdata *frag1; - DDS_LOG(DDS_LC_RADMIN, "fragchain_rmbias_anythread(%p)\n", (void *) frag); - while (frag) - { - frag1 = frag->nextfrag; - nn_rdata_rmbias_anythread (frag); - frag = frag1; - } + nn_fragchain_adjust_refcount (frag, 0); } -static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample, void (*fragchain_free) (struct nn_rdata *frag, int adjust)) +static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample) { /* Can't reference rsample after the first fragchain_free, because we don't know which rdata/rmsg provides the storage for the @@ -938,7 +910,7 @@ static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rs assert (defrag->n_samples > 0); defrag->n_samples--; for (iv = ut_avlIterFirst (&rsample_defrag_fragtree_treedef, &rsample->u.defrag.fragtree, &iter); iv; iv = ut_avlIterNext (&iter)) - fragchain_free (iv->first, 0); + nn_fragchain_rmbias (iv->first); } void nn_defrag_free (struct nn_defrag *defrag) @@ -948,7 +920,7 @@ void nn_defrag_free (struct nn_defrag *defrag) while (s) { DDS_LOG(DDS_LC_RADMIN, "defrag_free(%p, sample %p seq %"PRId64")\n", (void *) defrag, (void *) s, s->u.defrag.seq); - defrag_rsample_drop (defrag, s, nn_fragchain_rmbias_anythread); + defrag_rsample_drop (defrag, s); s = ut_avlFindMin (&defrag_sampletree_treedef, &defrag->sampletree); } assert (defrag->n_samples == 0); @@ -1312,7 +1284,7 @@ static int defrag_limit_samples (struct nn_defrag *defrag, seqno_t seq, seqno_t break; } assert (sample_to_drop != NULL); - defrag_rsample_drop (defrag, sample_to_drop, nn_fragchain_adjust_refcount); + defrag_rsample_drop (defrag, sample_to_drop); if (sample_to_drop == defrag->max_sample) { defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree); @@ -1444,7 +1416,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1) while (s && s->u.defrag.seq < maxp1) { struct nn_rsample *s1 = ut_avlFindSucc (&defrag_sampletree_treedef, &defrag->sampletree, s); - defrag_rsample_drop (defrag, s, nn_fragchain_adjust_refcount); + defrag_rsample_drop (defrag, s); s = s1; } defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree); diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 7de04a6..d09c89d 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -2304,8 +2304,8 @@ static int handle_SPDP (const struct nn_rsample_info *sampleinfo, struct nn_rdat fragchain = nn_rsample_fragchain (rsample); if ((rres = nn_reorder_rsample (&sc, gv.spdp_reorder, rsample, &refc_adjust, nn_dqueue_is_full (gv.builtins_dqueue))) > 0) nn_dqueue_enqueue (gv.builtins_dqueue, &sc, rres); - ddsrt_mutex_unlock (&gv.spdp_lock); nn_fragchain_adjust_refcount (fragchain, refc_adjust); + ddsrt_mutex_unlock (&gv.spdp_lock); return 0; } @@ -2342,8 +2342,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con ddsrt_mutex_lock (&pwr->e.lock); wn = ut_avlLookup (&pwr_readers_treedef, &pwr->readers, &dst); gap_was_valuable = handle_one_gap (pwr, wn, sampleinfo->seq, sampleinfo->seq+1, gap, &refc_adjust); - ddsrt_mutex_unlock (&pwr->e.lock); nn_fragchain_adjust_refcount (gap, refc_adjust); + ddsrt_mutex_unlock (&pwr->e.lock); if (gap_was_valuable) {