Merge pull request #92 from eboasson/issue-83
Avoid delivery of history to volatile reader
This commit is contained in:
commit
f863975504
2 changed files with 36 additions and 17 deletions
|
@ -1831,14 +1831,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
|
|||
{
|
||||
m->in_sync = PRMSS_SYNC;
|
||||
}
|
||||
else if (last_deliv_seq == 0)
|
||||
{
|
||||
/* proxy-writer hasn't seen any data yet, in which case this reader is in sync with the proxy writer (i.e., no reader-specific reorder buffer needed), but still should generate a notification when all historical data has been received, except for the built-in ones (for now anyway, it may turn out to be useful for determining discovery status). */
|
||||
m->in_sync = PRMSS_TLCATCHUP;
|
||||
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
|
||||
if (m->in_sync != PRMSS_SYNC)
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " - tlcatchup");
|
||||
}
|
||||
else if (!config.conservative_builtin_reader_startup && is_builtin_entityid (rd->e.guid.entityid, ownvendorid) && !ut_avlIsEmpty (&pwr->readers))
|
||||
{
|
||||
/* builtins really don't care about multiple copies */
|
||||
|
@ -1848,8 +1840,16 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
|
|||
{
|
||||
/* normal transient-local, reader is behind proxy writer */
|
||||
m->in_sync = PRMSS_OUT_OF_SYNC;
|
||||
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq;
|
||||
if (last_deliv_seq == 0)
|
||||
{
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = MAX_SEQ_NUMBER;
|
||||
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
|
||||
}
|
||||
else
|
||||
{
|
||||
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq;
|
||||
}
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync %"PRId64, m->u.not_in_sync.end_of_out_of_sync_seq);
|
||||
}
|
||||
if (m->in_sync != PRMSS_SYNC)
|
||||
|
|
|
@ -1210,6 +1210,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
the range of available sequence numbers is is interpreted here as
|
||||
a gap [1,a). See also handle_Gap. */
|
||||
const seqno_t firstseq = fromSN (msg->firstSN);
|
||||
const seqno_t lastseq = fromSN (msg->lastSN);
|
||||
struct handle_Heartbeat_helper_arg arg;
|
||||
struct proxy_writer *pwr;
|
||||
nn_guid_t src, dst;
|
||||
|
@ -1219,8 +1220,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
dst.prefix = rst->dst_guid_prefix;
|
||||
dst.entityid = msg->readerId;
|
||||
|
||||
DDS_TRACE("HEARTBEAT(%s#%d:%"PRId64"..%"PRId64" ", msg->smhdr.flags & HEARTBEAT_FLAG_FINAL ? "F" : "",
|
||||
msg->count, firstseq, fromSN (msg->lastSN));
|
||||
DDS_TRACE("HEARTBEAT(%s#%d:%"PRId64"..%"PRId64" ", msg->smhdr.flags & HEARTBEAT_FLAG_FINAL ? "F" : "", msg->count, firstseq, lastseq);
|
||||
|
||||
if (!rst->forme)
|
||||
{
|
||||
|
@ -1242,14 +1242,33 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
|
||||
os_mutexLock (&pwr->e.lock);
|
||||
|
||||
pwr->have_seen_heartbeat = 1;
|
||||
if (fromSN (msg->lastSN) > pwr->last_seq)
|
||||
if (!pwr->have_seen_heartbeat)
|
||||
{
|
||||
pwr->last_seq = fromSN (msg->lastSN);
|
||||
struct nn_rdata *gap;
|
||||
struct nn_rsample_chain sc;
|
||||
int refc_adjust = 0;
|
||||
nn_reorder_result_t res;
|
||||
|
||||
nn_defrag_notegap (pwr->defrag, 1, lastseq + 1);
|
||||
gap = nn_rdata_newgap (rmsg);
|
||||
if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, lastseq + 1, &refc_adjust)) > 0)
|
||||
{
|
||||
if (pwr->deliver_synchronously)
|
||||
deliver_user_data_synchronously (&sc);
|
||||
else
|
||||
nn_dqueue_enqueue (pwr->dqueue, &sc, res);
|
||||
}
|
||||
nn_fragchain_adjust_refcount (gap, refc_adjust);
|
||||
pwr->have_seen_heartbeat = 1;
|
||||
}
|
||||
|
||||
if (lastseq > pwr->last_seq)
|
||||
{
|
||||
pwr->last_seq = lastseq;
|
||||
pwr->last_fragnum = ~0u;
|
||||
pwr->last_fragnum_reset = 0;
|
||||
}
|
||||
else if (pwr->last_fragnum != ~0u && fromSN (msg->lastSN) == pwr->last_seq)
|
||||
else if (pwr->last_fragnum != ~0u && lastseq == pwr->last_seq)
|
||||
{
|
||||
if (!pwr->last_fragnum_reset)
|
||||
pwr->last_fragnum_reset = 1;
|
||||
|
@ -1297,7 +1316,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
}
|
||||
if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER)
|
||||
{
|
||||
wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN);
|
||||
wn->u.not_in_sync.end_of_out_of_sync_seq = wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN);
|
||||
DDS_TRACE(" end-of-tl-seq(rd %x:%x:%x:%x #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq);
|
||||
}
|
||||
maybe_set_reader_in_sync (pwr, wn, last_deliv_seq);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue