diff --git a/src/core/ddsc/src/dds_whc.c b/src/core/ddsc/src/dds_whc.c index de50ba4..d9b46e7 100644 --- a/src/core/ddsc/src/dds_whc.c +++ b/src/core/ddsc/src/dds_whc.c @@ -1311,7 +1311,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se newn->idxnode = idxn; newn->idxnode_pos = idxn->headidx; - if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && whc->wrinfo.tldepth > 0) + if (oldn && (whc->wrinfo.hdepth > 0 || oldn->seq <= max_drop_seq) && (!whc->wrinfo.is_transient_local || whc->wrinfo.tldepth > 0)) { TRACE (" prune whcn %p", (void *)oldn); assert (oldn != whc->maxseq_node || whc->wrinfo.has_deadline); diff --git a/src/core/ddsc/tests/whc.c b/src/core/ddsc/tests/whc.c index 212c51a..cdade5e 100644 --- a/src/core/ddsc/tests/whc.c +++ b/src/core/ddsc/tests/whc.c @@ -110,27 +110,41 @@ static dds_entity_t create_and_sync_reader(dds_entity_t subscriber, dds_entity_t return reader; } -static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max) +static void get_writer_whc_state (dds_entity_t writer, struct whc_state *whcst) { struct dds_entity *wr_entity; struct writer *wr; - struct whc_state whcst; CU_ASSERT_EQUAL_FATAL(dds_entity_pin(writer, &wr_entity), 0); thread_state_awake(lookup_thread_state(), &wr_entity->m_domain->gv); wr = entidx_lookup_writer_guid(wr_entity->m_domain->gv.entity_index, &wr_entity->m_guid); CU_ASSERT_FATAL(wr != NULL); assert(wr != NULL); /* for Clang's static analyzer */ - whc_get_state(wr->whc, &whcst); + whc_get_state(wr->whc, whcst); thread_state_asleep(lookup_thread_state()); dds_entity_unpin(wr_entity); +} +static void check_intermediate_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max) +{ + struct whc_state whcst; + get_writer_whc_state (writer, &whcst); + /* WHC must not contain any samples < exp_min and must contain at least exp_max if it + contains at least one sample. (We never know for certain when ACKs arrive.) */ + printf(" -- intermediate state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max); + CU_ASSERT_FATAL (whcst.min_seq >= exp_min || (whcst.min_seq == -1 && whcst.max_seq == -1)); + CU_ASSERT_FATAL (whcst.max_seq == exp_max || (whcst.min_seq == -1 && whcst.max_seq == -1)); +} + +static void check_whc_state(dds_entity_t writer, seqno_t exp_min, seqno_t exp_max) +{ + struct whc_state whcst; + get_writer_whc_state (writer, &whcst); printf(" -- final state: unacked: %zu; min %"PRId64" (exp %"PRId64"); max %"PRId64" (exp %"PRId64")\n", whcst.unacked_bytes, whcst.min_seq, exp_min, whcst.max_seq, exp_max); CU_ASSERT_EQUAL_FATAL (whcst.unacked_bytes, 0); CU_ASSERT_EQUAL_FATAL (whcst.min_seq, exp_min); CU_ASSERT_EQUAL_FATAL (whcst.max_seq, exp_max); } - #define V DDS_DURABILITY_VOLATILE #define TL DDS_DURABILITY_TRANSIENT_LOCAL #define R DDS_RELIABILITY_RELIABLE @@ -191,6 +205,23 @@ static void test_whc_end_state(dds_durability_kind_t d, dds_reliability_kind_t r ret = dds_write (writer, &sample_keyless); CU_ASSERT_FATAL (ret == DDS_RETCODE_OK); } + + /* if history is truly keep last, there may never be more data present than the max of the + history depth(s) */ + if (r == R && h != KA && (d == V || dh != KA)) + { + if (rrd || d != V) + { + int32_t depth = (d == V || hd >= dhd) ? hd : dhd; + int32_t exp_max = ni * (s + 1); + int32_t exp_min = exp_max - ni * (depth - 1) - (ni - 1); + check_intermediate_whc_state (writer, exp_min, exp_max); + } + else + { + check_intermediate_whc_state (writer, -1, -1); + } + } } /* delete readers, wait until no matching reader */