Allow old NACKFRAG count after a delay

An asymmetrical disconnect where the reader undiscovers and rediscovers
the writer, but the reader remains alive all the time for the writer
results in the "count" field of NACKFRAGs restarting.  According to the
spec these must be ignored to protect against multi-pathing, but in this
scenario, ignoring them results in ignoring valid retransmit requests
until the "count" value catches up, which can take a very long time.

For ACKNACKs and HEARTBEATs the same problem exists, there it was
already handled by accepting backward jumps after some time has passed.
This reuses the same logic for NACKFRAGs.

This also changes the "count" fields to uint32_t throughout: the spec
defines them as int32_t, requires them to be strictly monotonically
increasing and omits any mention of a valid range or at what value the
counter should start.  Thus, everything in [-2^31,2^31-1] is allowed,
switching to an uint32_t merely shifts the range.  It also appears that
all implementations start at 0 or 1.  The "strictly monotonically" part
was impossible to do without disconnecting anyway.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-06-19 08:36:25 +02:00 committed by eboasson
parent fb7034e28f
commit 18d4bc9699
6 changed files with 46 additions and 39 deletions

View file

@ -131,9 +131,10 @@ struct wr_prd_match {
seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ seqno_t last_seq; /* highest seq send to this reader used when filter is applied */
int32_t num_reliable_readers_where_seq_equals_max; int32_t num_reliable_readers_where_seq_equals_max;
ddsi_guid_t arbitrary_unacked_reader; ddsi_guid_t arbitrary_unacked_reader;
nn_count_t next_acknack; /* next acceptable acknack sequence number */ nn_count_t prev_acknack; /* latest accepted acknack sequence number */
nn_count_t next_nackfrag; /* next acceptable nackfrag sequence number */ nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */
ddsrt_etime_t t_acknack_accepted; /* (local) time an acknack was last accepted */ ddsrt_etime_t t_acknack_accepted; /* (local) time an acknack was last accepted */
ddsrt_etime_t t_nackfrag_accepted; /* (local) time a nackfrag was last accepted */
struct nn_lat_estim hb_to_ack_latency; struct nn_lat_estim hb_to_ack_latency;
ddsrt_wctime_t hb_to_ack_latency_tlastlog; ddsrt_wctime_t hb_to_ack_latency_tlastlog;
uint32_t non_responsive_count; uint32_t non_responsive_count;
@ -154,7 +155,7 @@ struct pwr_rd_match {
ddsi_guid_t rd_guid; ddsi_guid_t rd_guid;
ddsrt_mtime_t tcreate; ddsrt_mtime_t tcreate;
nn_count_t count; /* most recent acknack sequence number */ nn_count_t count; /* most recent acknack sequence number */
nn_count_t next_heartbeat; /* next acceptable heartbeat (see also add_proxy_writer_to_reader) */ nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */
ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */ ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */
ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */
ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */

View file

@ -55,9 +55,7 @@ typedef struct nn_fragment_number_set_header {
#define NN_FRAGMENT_NUMBER_SET_MAX_BITS (256u) #define NN_FRAGMENT_NUMBER_SET_MAX_BITS (256u)
#define NN_FRAGMENT_NUMBER_SET_BITS_SIZE(numbits) ((unsigned) (4 * (((numbits) + 31) / 32))) #define NN_FRAGMENT_NUMBER_SET_BITS_SIZE(numbits) ((unsigned) (4 * (((numbits) + 31) / 32)))
#define NN_FRAGMENT_NUMBER_SET_SIZE(numbits) (sizeof (nn_fragment_number_set_header_t) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits)) #define NN_FRAGMENT_NUMBER_SET_SIZE(numbits) (sizeof (nn_fragment_number_set_header_t) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits))
typedef int32_t nn_count_t; typedef uint32_t nn_count_t;
#define DDSI_COUNT_MIN (-2147483647 - 1)
#define DDSI_COUNT_MAX (2147483647)
/* address field in locator maintained in network byte order, the rest in host */ /* address field in locator maintained in network byte order, the rest in host */
typedef struct { typedef struct {
const struct ddsi_tran_factory *tran; const struct ddsi_tran_factory *tran;

View file

@ -2102,14 +2102,14 @@ static void update_reader_init_acknack_count (const ddsrt_log_cfg_t *logcfg, con
/* Update the initial acknack sequence number for the reader. See /* Update the initial acknack sequence number for the reader. See
also reader_add_connection(). */ also reader_add_connection(). */
DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRId32"): ", PGUID (*rd_guid), count); DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRIu32"): ", PGUID (*rd_guid), count);
if ((rd = entidx_lookup_reader_guid (entidx, rd_guid)) != NULL) if ((rd = entidx_lookup_reader_guid (entidx, rd_guid)) != NULL)
{ {
ddsrt_mutex_lock (&rd->e.lock); ddsrt_mutex_lock (&rd->e.lock);
DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32" -> ", rd->init_acknack_count); DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32" -> ", rd->init_acknack_count);
if (count > rd->init_acknack_count) if (count > rd->init_acknack_count)
rd->init_acknack_count = count; rd->init_acknack_count = count;
DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32"\n", count); DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32"\n", count);
ddsrt_mutex_unlock (&rd->e.lock); ddsrt_mutex_unlock (&rd->e.lock);
} }
else else
@ -2208,11 +2208,12 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd,
pretend_everything_acked = 0; pretend_everything_acked = 0;
} }
ddsrt_mutex_unlock (&prd->e.lock); ddsrt_mutex_unlock (&prd->e.lock);
m->next_acknack = DDSI_COUNT_MIN; m->prev_acknack = 0;
m->next_nackfrag = DDSI_COUNT_MIN; m->prev_nackfrag = 0;
nn_lat_estim_init (&m->hb_to_ack_latency); nn_lat_estim_init (&m->hb_to_ack_latency);
m->hb_to_ack_latency_tlastlog = ddsrt_time_wallclock (); m->hb_to_ack_latency_tlastlog = ddsrt_time_wallclock ();
m->t_acknack_accepted.v = 0; m->t_acknack_accepted.v = 0;
m->t_nackfrag_accepted.v = 0;
ddsrt_mutex_lock (&wr->e.lock); ddsrt_mutex_lock (&wr->e.lock);
if (pretend_everything_acked) if (pretend_everything_acked)
@ -2364,7 +2365,7 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr,
writer will always see monotonically increasing sequence numbers writer will always see monotonically increasing sequence numbers
from one particular reader. This is then used for the from one particular reader. This is then used for the
pwr_rd_match initialization */ pwr_rd_match initialization */
ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRId32"\n", ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRIu32"\n",
PGUID (rd->e.guid), rd->init_acknack_count); PGUID (rd->e.guid), rd->init_acknack_count);
*init_count = rd->init_acknack_count; *init_count = rd->init_acknack_count;
@ -2494,7 +2495,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
If we don't mind those extra AckNacks, we could track the count If we don't mind those extra AckNacks, we could track the count
at the proxy-writer and simply treat all incoming heartbeats as at the proxy-writer and simply treat all incoming heartbeats as
undirected. */ undirected. */
m->next_heartbeat = DDSI_COUNT_MIN; m->prev_heartbeat = 0;
m->hb_timestamp.v = 0; m->hb_timestamp.v = 0;
m->t_heartbeat_accepted.v = 0; m->t_heartbeat_accepted.v = 0;
m->t_last_nack.v = 0; m->t_last_nack.v = 0;
@ -3578,9 +3579,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
wr->seq = 0; wr->seq = 0;
wr->cs_seq = 0; wr->cs_seq = 0;
ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0); ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0);
wr->hbcount = 0; wr->hbcount = 1;
wr->state = WRST_OPERATIONAL; wr->state = WRST_OPERATIONAL;
wr->hbfragcount = 0; wr->hbfragcount = 1;
writer_hbcontrol_init (&wr->hbcontrol); writer_hbcontrol_init (&wr->hbcontrol);
wr->throttling = 0; wr->throttling = 0;
wr->retransmitting = 0; wr->retransmitting = 0;
@ -4288,7 +4289,7 @@ static dds_return_t new_reader_guid
rd->topic = ddsi_sertopic_ref (topic); rd->topic = ddsi_sertopic_ref (topic);
rd->ddsi2direct_cb = 0; rd->ddsi2direct_cb = 0;
rd->ddsi2direct_cbarg = 0; rd->ddsi2direct_cbarg = 0;
rd->init_acknack_count = 0; rd->init_acknack_count = 1;
rd->num_writers = 0; rd->num_writers = 0;
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
rd->favours_ssm = 0; rd->favours_ssm = 0;
@ -5436,8 +5437,8 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid,
pwr->n_reliable_readers = 0; pwr->n_reliable_readers = 0;
pwr->n_readers_out_of_sync = 0; pwr->n_readers_out_of_sync = 0;
pwr->last_seq = 0; pwr->last_seq = 0;
pwr->last_fragnum = ~0u; pwr->last_fragnum = UINT32_MAX;
pwr->nackfragcount = 0; pwr->nackfragcount = 1;
pwr->last_fragnum_reset = 0; pwr->last_fragnum_reset = 0;
pwr->alive = 1; pwr->alive = 1;
pwr->alive_vclock = 0; pwr->alive_vclock = 0;

View file

@ -152,7 +152,7 @@ static int valid_AckNack (const struct receiver_state *rst, AckNack_t *msg, size
if (byteswap) if (byteswap)
{ {
bswap_sequence_number_set_bitmap (&msg->readerSNState, msg->bits); bswap_sequence_number_set_bitmap (&msg->readerSNState, msg->bits);
*count = ddsrt_bswap4 (*count); *count = ddsrt_bswap4u (*count);
} }
return 1; return 1;
} }
@ -221,7 +221,7 @@ static int valid_Heartbeat (Heartbeat_t *msg, size_t size, int byteswap)
{ {
bswapSN (&msg->firstSN); bswapSN (&msg->firstSN);
bswapSN (&msg->lastSN); bswapSN (&msg->lastSN);
msg->count = ddsrt_bswap4 (msg->count); msg->count = ddsrt_bswap4u (msg->count);
} }
msg->readerId = nn_ntoh_entityid (msg->readerId); msg->readerId = nn_ntoh_entityid (msg->readerId);
msg->writerId = nn_ntoh_entityid (msg->writerId); msg->writerId = nn_ntoh_entityid (msg->writerId);
@ -239,7 +239,7 @@ static int valid_HeartbeatFrag (HeartbeatFrag_t *msg, size_t size, int byteswap)
{ {
bswapSN (&msg->writerSN); bswapSN (&msg->writerSN);
msg->lastFragmentNum = ddsrt_bswap4u (msg->lastFragmentNum); msg->lastFragmentNum = ddsrt_bswap4u (msg->lastFragmentNum);
msg->count = ddsrt_bswap4 (msg->count); msg->count = ddsrt_bswap4u (msg->count);
} }
msg->readerId = nn_ntoh_entityid (msg->readerId); msg->readerId = nn_ntoh_entityid (msg->readerId);
msg->writerId = nn_ntoh_entityid (msg->writerId); msg->writerId = nn_ntoh_entityid (msg->writerId);
@ -274,7 +274,7 @@ static int valid_NackFrag (NackFrag_t *msg, size_t size, int byteswap)
if (byteswap) if (byteswap)
{ {
bswap_fragment_number_set_bitmap (&msg->fragmentNumberState, msg->bits); bswap_fragment_number_set_bitmap (&msg->fragmentNumberState, msg->bits);
*count = ddsrt_bswap4 (*count); *count = ddsrt_bswap4u (*count);
} }
return 1; return 1;
} }
@ -577,7 +577,7 @@ static int acknack_is_nack (const AckNack_t *msg)
return x != 0; return x != 0;
} }
static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept) static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *prev_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept)
{ {
/* AckNacks and Heartbeats with a sequence number (called "count" /* AckNacks and Heartbeats with a sequence number (called "count"
for some reason) equal to or less than the highest one received for some reason) equal to or less than the highest one received
@ -590,17 +590,22 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou
8.4.15.7 says: "New HEARTBEATS should have Counts greater than 8.4.15.7 says: "New HEARTBEATS should have Counts greater than
all older HEARTBEATs. Then, received HEARTBEATs with Counts not all older HEARTBEATs. Then, received HEARTBEATs with Counts not
greater than any previously received can be ignored." But it greater than any previously received can be ignored." But it
isn't clear whether that is about connections or entities, and isn't clear whether that is about connections or entities.
besides there is an issue with the wrap around after 2**31-1.
The type is defined in the spec as signed but without limiting
them to, e.g., positive numbers. Instead of implementing them as
spec'd, we implement it as unsigned to avoid integer overflow (and
the consequence undefined behaviour). Serial number arithmetic
deals with the wrap-around after 2**31-1.
This combined procedure should give the best of all worlds, and This combined procedure should give the best of all worlds, and
is not more expensive in the common case. */ is not more expensive in the common case. */
const int64_t timeout = DDS_SECS (2); const int64_t timeout = DDS_SECS (2);
if (new_count < *exp_count && tnow.v - t_last_accepted->v < timeout && !force_accept) if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept)
return 0; return 0;
*exp_count = new_count + 1; *prev_count = new_count;
*t_last_accepted = tnow; *t_last_accepted = tnow;
return 1; return 1;
} }
@ -772,7 +777,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
rn->rexmit_requests++; rn->rexmit_requests++;
} }
if (!accept_ack_or_hb_w_timeout (*countp, &rn->next_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack)) if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack))
{ {
RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"])", PGUID (src), PGUID (dst)); RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"])", PGUID (src), PGUID (dst));
goto out; goto out;
@ -1100,7 +1105,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand
ASSERT_MUTEX_HELD (&pwr->e.lock); ASSERT_MUTEX_HELD (&pwr->e.lock);
/* Not supposed to respond to repeats and old heartbeats. */ /* Not supposed to respond to repeats and old heartbeats. */
if (!accept_ack_or_hb_w_timeout (msg->count, &wn->next_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0))
{ {
RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid)); RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid));
return; return;
@ -1528,12 +1533,11 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
} }
/* Ignore old NackFrags (see also handle_AckNack) */ /* Ignore old NackFrags (see also handle_AckNack) */
if (*countp < rn->next_nackfrag) if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_nackfrag, tnow, &rn->t_nackfrag_accepted, false))
{ {
RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"]", PGUID (src), PGUID (dst)); RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"]", PGUID (src), PGUID (dst));
goto out; goto out;
} }
rn->next_nackfrag = *countp + 1;
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst)); RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst));
/* Resend the requested fragments if we still have the sample, send /* Resend the requested fragments if we still have the sample, send

View file

@ -431,7 +431,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
hb->firstSN = toSN (min); hb->firstSN = toSN (min);
hb->lastSN = toSN (max); hb->lastSN = toSN (max);
hb->count = ++wr->hbcount; hb->count = wr->hbcount++;
nn_xmsg_submsg_setnext (msg, sm_marker); nn_xmsg_submsg_setnext (msg, sm_marker);
encode_datawriter_submsg(msg, sm_marker, wr); encode_datawriter_submsg(msg, sm_marker, wr);
@ -726,7 +726,7 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
hbf->writerSN = toSN (seq); hbf->writerSN = toSN (seq);
hbf->lastFragmentNum = fragnum + 1; /* network format is 1 based */ hbf->lastFragmentNum = fragnum + 1; /* network format is 1 based */
hbf->count = ++wr->hbfragcount; hbf->count = wr->hbfragcount++;
nn_xmsg_submsg_setnext (*pmsg, sm_marker); nn_xmsg_submsg_setnext (*pmsg, sm_marker);
encode_datawriter_submsg(*pmsg, sm_marker, wr); encode_datawriter_submsg(*pmsg, sm_marker, wr);

View file

@ -943,11 +943,14 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits));
nn_xmsg_submsg_setnext (msg, sm_marker); nn_xmsg_submsg_setnext (msg, sm_marker);
ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRId32":%"PRId64"/%"PRIu32":", if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE)
{
ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":",
PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count,
base, an->readerSNState.numbits); base, an->readerSNState.numbits);
for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0');
}
/* Encode the sub-message when needed. */ /* Encode the sub-message when needed. */
encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid);
@ -977,7 +980,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p
*countp = ++pwr->nackfragcount; *countp = ++pwr->nackfragcount;
nn_xmsg_submsg_setnext (msg, sm_marker); nn_xmsg_submsg_setnext (msg, sm_marker);
ETRACE (pwr, " + nackfrag #%"PRId32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits);
for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++)
ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0');
} }