Retain less data in keep-last WHC in absence of ACKs
A keep-last volatile WHC retained data already overwritten by the writer
in the absence of ACKs, introduced by 231cb8c9
.
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
6ed190ce2a
commit
701c6f5a5c
2 changed files with 36 additions and 5 deletions
|
@ -1311,7 +1311,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
|
||||||
newn->idxnode = idxn;
|
newn->idxnode = idxn;
|
||||||
newn->idxnode_pos = idxn->headidx;
|
newn->idxnode_pos = idxn->headidx;
|
||||||
|
|
||||||
if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && whc->wrinfo.tldepth > 0)
|
if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && (!whc->wrinfo.is_transient_local || whc->wrinfo.tldepth > 0))
|
||||||
{
|
{
|
||||||
TRACE (" prune whcn %p", (void *)oldn);
|
TRACE (" prune whcn %p", (void *)oldn);
|
||||||
assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline);
|
assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline);
|
||||||
|
|
|
@ -110,27 +110,41 @@ static dds_entity_t create_and_sync_reader(dds_entity_t subscriber, dds_entity_t
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
|
static void get_writer_whc_state (dds_entity_t writer, struct whc_state *whcst)
|
||||||
{
|
{
|
||||||
struct dds_entity *wr_entity;
|
struct dds_entity *wr_entity;
|
||||||
struct writer *wr;
|
struct writer *wr;
|
||||||
struct whc_state whcst;
|
|
||||||
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
|
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0);
|
||||||
thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
|
thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv);
|
||||||
wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
|
wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid);
|
||||||
CU_ASSERT_FATAL(wr != NULL);
|
CU_ASSERT_FATAL(wr != NULL);
|
||||||
assert(wr != NULL); /* for Clang's static analyzer */
|
assert(wr != NULL); /* for Clang's static analyzer */
|
||||||
whc_get_state(wr->whc, &whcst);
|
whc_get_state(wr->whc, whcst);
|
||||||
thread_state_asleep(lookup_thread_state());
|
thread_state_asleep(lookup_thread_state());
|
||||||
dds_entity_unpin(wr_entity);
|
dds_entity_unpin(wr_entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void check_intermediate_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
|
||||||
|
{
|
||||||
|
struct whc_state whcst;
|
||||||
|
get_writer_whc_state (writer, &whcst);
|
||||||
|
/* WHC must not contain any samples < exp_min and must contain at least exp_max if it
|
||||||
|
contains at least one sample. (We never know for certain when ACKs arrive.) */
|
||||||
|
printf(" -- intermediate state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
|
||||||
|
CU_ASSERT_FATAL (whcst.min_seq >= exp_min || (whcst.min_seq == -1 && whcst.max_seq == -1));
|
||||||
|
CU_ASSERT_FATAL (whcst.max_seq == exp_max || (whcst.min_seq == -1 && whcst.max_seq == -1));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max)
|
||||||
|
{
|
||||||
|
struct whc_state whcst;
|
||||||
|
get_writer_whc_state (writer, &whcst);
|
||||||
printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
|
printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max);
|
||||||
CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0);
|
CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0);
|
||||||
CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
|
CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min);
|
||||||
CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
|
CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#define V DDS_DURABILITY_VOLATILE
|
#define V DDS_DURABILITY_VOLATILE
|
||||||
#define TL DDS_DURABILITY_TRANSIENT_LOCAL
|
#define TL DDS_DURABILITY_TRANSIENT_LOCAL
|
||||||
#define R DDS_RELIABILITY_RELIABLE
|
#define R DDS_RELIABILITY_RELIABLE
|
||||||
|
@ -191,6 +205,23 @@ static void test_whc_end_state(dds_durability_kind_t d, dds_reliability_kind_t r
|
||||||
ret = dds_write (writer, &sample_keyless);
|
ret = dds_write (writer, &sample_keyless);
|
||||||
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
|
CU_ASSERT_FATAL (ret == DDS_RETCODE_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* if history is truly keep last, there may never be more data present than the max of the
|
||||||
|
history depth(s) */
|
||||||
|
if (r == R && h != KA && (d == V || dh != KA))
|
||||||
|
{
|
||||||
|
if (rrd || d != V)
|
||||||
|
{
|
||||||
|
int32_t depth = (d == V || hd >= dhd) ? hd : dhd;
|
||||||
|
int32_t exp_max = ni * (s + 1);
|
||||||
|
int32_t exp_min = exp_max - ni * (depth - 1) - (ni - 1);
|
||||||
|
check_intermediate_whc_state (writer, exp_min, exp_max);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
check_intermediate_whc_state (writer, -1, -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* delete readers, wait until no matching reader */
|
/* delete readers, wait until no matching reader */
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue