Avoid delivery of history to volatile reader
When a remote writer is discovery, a proxy_writer object representation that writer is created without yet having any knowledge of what the current sequence number for that writer is. If a local reader is matched with that proxy writer before a Heartbeat has been recevied and this sequence number information is known, all historical data will be made available to that reader, even if it is volatile. By treating the first Heartbeat specially, by moving the next sequence number to be delivered as fresh data forward to the next sequence number, retrieval of historical data is avoided. Transient-local readers have a separate ("out-of-sync") route to request it anyway. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
e518f9d0e2
commit
72c1920eed
2 changed files with 36 additions and 17 deletions
|
@ -1831,14 +1831,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
|
|||
{
|
||||
m->in_sync = PRMSS_SYNC;
|
||||
}
|
||||
else if (last_deliv_seq == 0)
|
||||
{
|
||||
/* proxy-writer hasn't seen any data yet, in which case this reader is in sync with the proxy writer (i.e., no reader-specific reorder buffer needed), but still should generate a notification when all historical data has been received, except for the built-in ones (for now anyway, it may turn out to be useful for determining discovery status). */
|
||||
m->in_sync = PRMSS_TLCATCHUP;
|
||||
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
|
||||
if (m->in_sync != PRMSS_SYNC)
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " - tlcatchup");
|
||||
}
|
||||
else if (!config.conservative_builtin_reader_startup && is_builtin_entityid (rd->e.guid.entityid, ownvendorid) && !ut_avlIsEmpty (&pwr->readers))
|
||||
{
|
||||
/* builtins really don't care about multiple copies */
|
||||
|
@ -1848,8 +1840,16 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
|
|||
{
|
||||
/* normal transient-local, reader is behind proxy writer */
|
||||
m->in_sync = PRMSS_OUT_OF_SYNC;
|
||||
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq;
|
||||
if (last_deliv_seq == 0)
|
||||
{
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = MAX_SEQ_NUMBER;
|
||||
m->u.not_in_sync.end_of_tl_seq = MAX_SEQ_NUMBER;
|
||||
}
|
||||
else
|
||||
{
|
||||
m->u.not_in_sync.end_of_tl_seq = pwr->last_seq;
|
||||
m->u.not_in_sync.end_of_out_of_sync_seq = last_deliv_seq;
|
||||
}
|
||||
DDS_LOG(DDS_LC_DISCOVERY, " - out-of-sync %"PRId64, m->u.not_in_sync.end_of_out_of_sync_seq);
|
||||
}
|
||||
if (m->in_sync != PRMSS_SYNC)
|
||||
|
|
|
@ -1210,6 +1210,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
the range of available sequence numbers is is interpreted here as
|
||||
a gap [1,a). See also handle_Gap. */
|
||||
const seqno_t firstseq = fromSN (msg->firstSN);
|
||||
const seqno_t lastseq = fromSN (msg->lastSN);
|
||||
struct handle_Heartbeat_helper_arg arg;
|
||||
struct proxy_writer *pwr;
|
||||
nn_guid_t src, dst;
|
||||
|
@ -1219,8 +1220,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
dst.prefix = rst->dst_guid_prefix;
|
||||
dst.entityid = msg->readerId;
|
||||
|
||||
DDS_TRACE("HEARTBEAT(%s#%d:%"PRId64"..%"PRId64" ", msg->smhdr.flags & HEARTBEAT_FLAG_FINAL ? "F" : "",
|
||||
msg->count, firstseq, fromSN (msg->lastSN));
|
||||
DDS_TRACE("HEARTBEAT(%s#%d:%"PRId64"..%"PRId64" ", msg->smhdr.flags & HEARTBEAT_FLAG_FINAL ? "F" : "", msg->count, firstseq, lastseq);
|
||||
|
||||
if (!rst->forme)
|
||||
{
|
||||
|
@ -1242,14 +1242,33 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
|
||||
os_mutexLock (&pwr->e.lock);
|
||||
|
||||
pwr->have_seen_heartbeat = 1;
|
||||
if (fromSN (msg->lastSN) > pwr->last_seq)
|
||||
if (!pwr->have_seen_heartbeat)
|
||||
{
|
||||
pwr->last_seq = fromSN (msg->lastSN);
|
||||
struct nn_rdata *gap;
|
||||
struct nn_rsample_chain sc;
|
||||
int refc_adjust = 0;
|
||||
nn_reorder_result_t res;
|
||||
|
||||
nn_defrag_notegap (pwr->defrag, 1, lastseq + 1);
|
||||
gap = nn_rdata_newgap (rmsg);
|
||||
if ((res = nn_reorder_gap (&sc, pwr->reorder, gap, 1, lastseq + 1, &refc_adjust)) > 0)
|
||||
{
|
||||
if (pwr->deliver_synchronously)
|
||||
deliver_user_data_synchronously (&sc);
|
||||
else
|
||||
nn_dqueue_enqueue (pwr->dqueue, &sc, res);
|
||||
}
|
||||
nn_fragchain_adjust_refcount (gap, refc_adjust);
|
||||
pwr->have_seen_heartbeat = 1;
|
||||
}
|
||||
|
||||
if (lastseq > pwr->last_seq)
|
||||
{
|
||||
pwr->last_seq = lastseq;
|
||||
pwr->last_fragnum = ~0u;
|
||||
pwr->last_fragnum_reset = 0;
|
||||
}
|
||||
else if (pwr->last_fragnum != ~0u && fromSN (msg->lastSN) == pwr->last_seq)
|
||||
else if (pwr->last_fragnum != ~0u && lastseq == pwr->last_seq)
|
||||
{
|
||||
if (!pwr->last_fragnum_reset)
|
||||
pwr->last_fragnum_reset = 1;
|
||||
|
@ -1297,7 +1316,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
|
|||
}
|
||||
if (wn->u.not_in_sync.end_of_tl_seq == MAX_SEQ_NUMBER)
|
||||
{
|
||||
wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN);
|
||||
wn->u.not_in_sync.end_of_out_of_sync_seq = wn->u.not_in_sync.end_of_tl_seq = fromSN (msg->lastSN);
|
||||
DDS_TRACE(" end-of-tl-seq(rd %x:%x:%x:%x #%"PRId64")", PGUID(wn->rd_guid), wn->u.not_in_sync.end_of_tl_seq);
|
||||
}
|
||||
maybe_set_reader_in_sync (pwr, wn, last_deliv_seq);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue