remove rmbias_and_adjust assert on threadid (#121)

The introduction of multiple receive threads could trigger the assertion
because a set of samples ready for delivery may have been received by
multiple threads (the problem manifests itself most easily with
fragmented samples). This is actually a non-issue:

* while synchronously processing a packet, there is a bias of 2**31
  added to the refcount, to prevent any thread under any circumstance
  from ever freeing the data;
* while data lives in the defragment buffers or reorder buffer of the
  proxy writer, a bias of 2**20 is added to it until this particular
  function is called, after delivery of the data to the readers, and
  (if needed) after inserting the samples in the reorder buffer of
  any readers that are out-of-sync with the proxy writer;
* the relevant refcount is updated atomically in such a manner that this
  particular operation atomically removes the bias and performs the
  delayed increment of the refcount to account for the data being stored
  in any of the defragmenting or reorder buffers;
* the only ordinary decrementing of the refcount happens either
  synchronously (if synchronous delivery is chosen), or asynchronously
  in a delivery queue thread, and so the entire mechanism exists to
  avoid premature freeing of the underlying data because the data is
  delivered very quickly (possibly synchronously);
* as the biases are removed after all the delayed refcount increments
  are taken into account and there are no increments following the call
  to rmbias_and_adjust, the "ordinary" decrements can do no harm;
* the case of data from multiple writers being combined in a single
  packet is dealt with by the 2**20 bias, and so there is potentially a
  problem if there are more than 2**20 out-of-sync readers attached to
  a single proxy writer, or data submessages from more than 2**11
  writers in a single packet. The minimum possible data message is 32
  bytes (headers, encoding, data, padding), so packets up to 64kB are
  safe.

None of this is in any way related to which threads originally accepted
the packets, and therefore I see no argument for the existence of the
assertion.

That said, it is a rather complicated mechanism of unknown benefit, and
a major simplification is definitely something to be considered. In UDP
mode I see no chance of abuse, but there may be network protocols (TCP,
for sure) where there might be packets larger than 64kB and those could,
under worst-case assumptions, cause trouble. That, too, is a reason to
rethink it.

rmbias_and_adjust was sometimes called with the proxy writer locked,
and sometimes after unlocking it. This commit changes the code to
consistently call it with the lock held.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-03-27 17:29:26 +01:00
parent 44406ef6a7
commit 30142d1812
2 changed files with 11 additions and 39 deletions

View file

@ -639,11 +639,10 @@ static void nn_rmsg_addbias (struct nn_rmsg *rmsg)
static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust)
{
/* This can happen to any rmsg referenced by an sample still
progressing through the pipeline, but only by the receive
progressing through the pipeline, but only by a receive
thread. Can't require it to be uncommitted. */
uint32_t sub;
DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_and_adjust(%p, %d)\n", (void *) rmsg, adjust);
ASSERT_RBUFPOOL_OWNER (rmsg->chunk.rbuf->rbufpool);
assert (adjust >= 0);
assert ((uint32_t) adjust < RMSG_REFCOUNT_RDATA_BIAS);
sub = RMSG_REFCOUNT_RDATA_BIAS - (uint32_t) adjust;
@ -652,15 +651,6 @@ static void nn_rmsg_rmbias_and_adjust (struct nn_rmsg *rmsg, int adjust)
nn_rmsg_free (rmsg);
}
/* Remove the per-rdata refcount bias (RMSG_REFCOUNT_RDATA_BIAS) from an
   rmsg without requiring the caller to own the rbuf pool — unlike
   nn_rmsg_rmbias_and_adjust, no thread-ownership assertion is made.
   Used only on the defragmenter teardown path (nn_defrag_free), where
   leftover samples must be released from an arbitrary thread.  Frees
   the rmsg once the refcount drops to zero. */
static void nn_rmsg_rmbias_anythread (struct nn_rmsg *rmsg)
{
/* For removing garbage when freeing a nn_defrag. */
uint32_t sub = RMSG_REFCOUNT_RDATA_BIAS;
DDS_LOG(DDS_LC_RADMIN, "rmsg_rmbias_anythread(%p)\n", (void *) rmsg);
/* The bias must still be present: the count may never underflow. */
assert (ddsrt_atomic_ld32 (&rmsg->refcount) >= sub);
if (ddsrt_atomic_sub32_nv (&rmsg->refcount, sub) == 0)
nn_rmsg_free (rmsg);
}
static void nn_rmsg_unref (struct nn_rmsg *rmsg)
{
DDS_LOG(DDS_LC_RADMIN, "rmsg_unref(%p)\n", (void *) rmsg);
@ -752,16 +742,6 @@ static void nn_rdata_rmbias_and_adjust (struct nn_rdata *rdata, int adjust)
nn_rmsg_rmbias_and_adjust (rdata->rmsg, adjust);
}
/* Remove the refcount bias from the rmsg backing this rdata, from any
   thread (see nn_rmsg_rmbias_anythread); used only when discarding
   garbage while freeing a defragmenter. */
static void nn_rdata_rmbias_anythread (struct nn_rdata *rdata)
{
DDS_LOG(DDS_LC_RADMIN, "rdata_rmbias_anythread(%p)\n", (void *) rdata);
#ifndef NDEBUG
/* Debug bookkeeping: the bias counter must transition 1 -> 0 here,
   i.e. the bias was added exactly once and not yet removed; abort on
   double removal or removal without a prior addbias. */
if (ddsrt_atomic_dec32_ov (&rdata->refcount_bias_added) != 1)
abort ();
#endif
nn_rmsg_rmbias_anythread (rdata->rmsg);
}
static void nn_rdata_unref (struct nn_rdata *rdata)
{
DDS_LOG(DDS_LC_RADMIN, "rdata_rdata_unref(%p)\n", (void *) rdata);
@ -900,29 +880,21 @@ struct nn_defrag *nn_defrag_new (enum nn_defrag_drop_mode drop_mode, uint32_t ma
void nn_fragchain_adjust_refcount (struct nn_rdata *frag, int adjust)
{
struct nn_rdata *frag1;
DDS_LOG(DDS_LC_RADMIN, "fragchain_adjust_refcount(%p, %d)\n", (void *) frag, adjust);
while (frag)
{
frag1 = frag->nextfrag;
struct nn_rdata * const frag1 = frag->nextfrag;
nn_rdata_rmbias_and_adjust (frag, adjust);
frag = frag1;
}
}
static void nn_fragchain_rmbias_anythread (struct nn_rdata *frag, UNUSED_ARG (int adjust))
static void nn_fragchain_rmbias (struct nn_rdata *frag)
{
struct nn_rdata *frag1;
DDS_LOG(DDS_LC_RADMIN, "fragchain_rmbias_anythread(%p)\n", (void *) frag);
while (frag)
{
frag1 = frag->nextfrag;
nn_rdata_rmbias_anythread (frag);
frag = frag1;
}
nn_fragchain_adjust_refcount (frag, 0);
}
static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample, void (*fragchain_free) (struct nn_rdata *frag, int adjust))
static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rsample)
{
/* Can't reference rsample after the first fragchain_free, because
we don't know which rdata/rmsg provides the storage for the
@ -938,7 +910,7 @@ static void defrag_rsample_drop (struct nn_defrag *defrag, struct nn_rsample *rs
assert (defrag->n_samples > 0);
defrag->n_samples--;
for (iv = ut_avlIterFirst (&rsample_defrag_fragtree_treedef, &rsample->u.defrag.fragtree, &iter); iv; iv = ut_avlIterNext (&iter))
fragchain_free (iv->first, 0);
nn_fragchain_rmbias (iv->first);
}
void nn_defrag_free (struct nn_defrag *defrag)
@ -948,7 +920,7 @@ void nn_defrag_free (struct nn_defrag *defrag)
while (s)
{
DDS_LOG(DDS_LC_RADMIN, "defrag_free(%p, sample %p seq %"PRId64")\n", (void *) defrag, (void *) s, s->u.defrag.seq);
defrag_rsample_drop (defrag, s, nn_fragchain_rmbias_anythread);
defrag_rsample_drop (defrag, s);
s = ut_avlFindMin (&defrag_sampletree_treedef, &defrag->sampletree);
}
assert (defrag->n_samples == 0);
@ -1312,7 +1284,7 @@ static int defrag_limit_samples (struct nn_defrag *defrag, seqno_t seq, seqno_t
break;
}
assert (sample_to_drop != NULL);
defrag_rsample_drop (defrag, sample_to_drop, nn_fragchain_adjust_refcount);
defrag_rsample_drop (defrag, sample_to_drop);
if (sample_to_drop == defrag->max_sample)
{
defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree);
@ -1444,7 +1416,7 @@ void nn_defrag_notegap (struct nn_defrag *defrag, seqno_t min, seqno_t maxp1)
while (s && s->u.defrag.seq < maxp1)
{
struct nn_rsample *s1 = ut_avlFindSucc (&defrag_sampletree_treedef, &defrag->sampletree, s);
defrag_rsample_drop (defrag, s, nn_fragchain_adjust_refcount);
defrag_rsample_drop (defrag, s);
s = s1;
}
defrag->max_sample = ut_avlFindMax (&defrag_sampletree_treedef, &defrag->sampletree);

View file

@ -2304,8 +2304,8 @@ static int handle_SPDP (const struct nn_rsample_info *sampleinfo, struct nn_rdat
fragchain = nn_rsample_fragchain (rsample);
if ((rres = nn_reorder_rsample (&sc, gv.spdp_reorder, rsample, &refc_adjust, nn_dqueue_is_full (gv.builtins_dqueue))) > 0)
nn_dqueue_enqueue (gv.builtins_dqueue, &sc, rres);
ddsrt_mutex_unlock (&gv.spdp_lock);
nn_fragchain_adjust_refcount (fragchain, refc_adjust);
ddsrt_mutex_unlock (&gv.spdp_lock);
return 0;
}
@ -2342,8 +2342,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con
ddsrt_mutex_lock (&pwr->e.lock);
wn = ut_avlLookup (&pwr_readers_treedef, &pwr->readers, &dst);
gap_was_valuable = handle_one_gap (pwr, wn, sampleinfo->seq, sampleinfo->seq+1, gap, &refc_adjust);
ddsrt_mutex_unlock (&pwr->e.lock);
nn_fragchain_adjust_refcount (gap, refc_adjust);
ddsrt_mutex_unlock (&pwr->e.lock);
if (gap_was_valuable)
{