From 18d4bc96990b7387f968aee5ea96d7537bb7fc25 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 19 Jun 2020 08:36:25 +0200 Subject: [PATCH] Allow old NACKFRAG count after a delay An asymmetrical disconnect where the reader undiscovers and rediscovers the writer, but the reader remains alive all the time for the writer results in the "count" field of NACKFRAGs restarting. According to the spec these must be ignored to protect against multi-pathing, but in this scenario, ignoring them results in ignoring valid retransmit requests until the "count" value catches up, which can take a very long time. For ACKNACKs and HEARTBEATs the same problem exists, there it was already handled by accepting backward jumps after some time has passed. This reuses the same logic for NACKFRAGs. This also changes the "count" fields to uint32_t throughout: the spec defines them as int32_t, requires them to be strictly monotonically increasing and omits any mention of a valid range or at what value the counter should start. Thus, everything in [-2^31,2^31-1] is allowed, switching to an uint32_t merely shifts the range. It also appears that all implementations start at 0 or 1. The "strictly monotonically" part was impossible to do without disconnecting anyway. Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/q_entity.h | 7 ++--- src/core/ddsi/include/dds/ddsi/q_protocol.h | 4 +-- src/core/ddsi/src/q_entity.c | 25 ++++++++--------- src/core/ddsi/src/q_receive.c | 30 ++++++++++++--------- src/core/ddsi/src/q_transmit.c | 4 +-- src/core/ddsi/src/q_xevent.c | 15 ++++++----- 6 files changed, 46 insertions(+), 39 deletions(-) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index ce1c7c3..7b6ac19 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -131,9 +131,10 @@ struct wr_prd_match { seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ int32_t num_reliable_readers_where_seq_equals_max; ddsi_guid_t arbitrary_unacked_reader; - nn_count_t next_acknack; /* next acceptable acknack sequence number */ - nn_count_t next_nackfrag; /* next acceptable nackfrag sequence number */ + nn_count_t prev_acknack; /* latest accepted acknack sequence number */ + nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */ ddsrt_etime_t t_acknack_accepted; /* (local) time an acknack was last accepted */ + ddsrt_etime_t t_nackfrag_accepted; /* (local) time a nackfrag was last accepted */ struct nn_lat_estim hb_to_ack_latency; ddsrt_wctime_t hb_to_ack_latency_tlastlog; uint32_t non_responsive_count; @@ -154,7 +155,7 @@ struct pwr_rd_match { ddsi_guid_t rd_guid; ddsrt_mtime_t tcreate; nn_count_t count; /* most recent acknack sequence number */ - nn_count_t next_heartbeat; /* next acceptable heartbeat (see also add_proxy_writer_to_reader) */ + nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */ ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */ ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index 688a8a1..c26ed10 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -55,9 +55,7 @@ typedef struct nn_fragment_number_set_header { #define NN_FRAGMENT_NUMBER_SET_MAX_BITS (256u) #define NN_FRAGMENT_NUMBER_SET_BITS_SIZE(numbits) ((unsigned) (4 * (((numbits) + 31) / 32))) #define NN_FRAGMENT_NUMBER_SET_SIZE(numbits) (sizeof (nn_fragment_number_set_header_t) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits)) -typedef int32_t nn_count_t; -#define DDSI_COUNT_MIN (-2147483647 - 1) -#define DDSI_COUNT_MAX (2147483647) +typedef uint32_t nn_count_t; /* address field in locator maintained in network byte order, the rest in host */ typedef struct { const struct ddsi_tran_factory *tran; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 51ef631..a47a840 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2102,14 +2102,14 @@ static void update_reader_init_acknack_count (const ddsrt_log_cfg_t *logcfg, con /* Update the initial acknack sequence number for the reader. See also reader_add_connection(). */ - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRId32"): ", PGUID (*rd_guid), count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRIu32"): ", PGUID (*rd_guid), count); if ((rd = entidx_lookup_reader_guid (entidx, rd_guid)) != NULL) { ddsrt_mutex_lock (&rd->e.lock); - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32" -> ", rd->init_acknack_count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32" -> ", rd->init_acknack_count); if (count > rd->init_acknack_count) rd->init_acknack_count = count; - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32"\n", count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32"\n", count); ddsrt_mutex_unlock (&rd->e.lock); } else @@ -2208,11 +2208,12 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd, pretend_everything_acked = 0; } ddsrt_mutex_unlock (&prd->e.lock); - m->next_acknack = DDSI_COUNT_MIN; - m->next_nackfrag = DDSI_COUNT_MIN; + m->prev_acknack = 0; + m->prev_nackfrag = 0; nn_lat_estim_init (&m->hb_to_ack_latency); m->hb_to_ack_latency_tlastlog = ddsrt_time_wallclock (); m->t_acknack_accepted.v = 0; + m->t_nackfrag_accepted.v = 0; ddsrt_mutex_lock (&wr->e.lock); if (pretend_everything_acked) @@ -2364,7 +2365,7 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, writer will always see monotonically increasing sequence numbers from one particular reader. This is then used for the pwr_rd_match initialization */ - ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRId32"\n", + ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRIu32"\n", PGUID (rd->e.guid), rd->init_acknack_count); *init_count = rd->init_acknack_count; @@ -2494,7 +2495,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader If we don't mind those extra AckNacks, we could track the count at the proxy-writer and simply treat all incoming heartbeats as undirected. */ - m->next_heartbeat = DDSI_COUNT_MIN; + m->prev_heartbeat = 0; m->hb_timestamp.v = 0; m->t_heartbeat_accepted.v = 0; m->t_last_nack.v = 0; @@ -3578,9 +3579,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->seq = 0; wr->cs_seq = 0; ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0); - wr->hbcount = 0; + wr->hbcount = 1; wr->state = WRST_OPERATIONAL; - wr->hbfragcount = 0; + wr->hbfragcount = 1; writer_hbcontrol_init (&wr->hbcontrol); wr->throttling = 0; wr->retransmitting = 0; @@ -4288,7 +4289,7 @@ static dds_return_t new_reader_guid rd->topic = ddsi_sertopic_ref (topic); rd->ddsi2direct_cb = 0; rd->ddsi2direct_cbarg = 0; - rd->init_acknack_count = 0; + rd->init_acknack_count = 1; rd->num_writers = 0; #ifdef DDSI_INCLUDE_SSM rd->favours_ssm = 0; @@ -5436,8 +5437,8 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid, pwr->n_reliable_readers = 0; pwr->n_readers_out_of_sync = 0; pwr->last_seq = 0; - pwr->last_fragnum = ~0u; - pwr->nackfragcount = 0; + pwr->last_fragnum = UINT32_MAX; + pwr->nackfragcount = 1; pwr->last_fragnum_reset = 0; pwr->alive = 1; pwr->alive_vclock = 0; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 7b6e53e..6099c12 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -152,7 +152,7 @@ static int valid_AckNack (const struct receiver_state *rst, AckNack_t *msg, size if (byteswap) { bswap_sequence_number_set_bitmap (&msg->readerSNState, msg->bits); - *count = ddsrt_bswap4 (*count); + *count = ddsrt_bswap4u (*count); } return 1; } @@ -221,7 +221,7 @@ static int valid_Heartbeat (Heartbeat_t *msg, size_t size, int byteswap) { bswapSN (&msg->firstSN); bswapSN (&msg->lastSN); - msg->count = ddsrt_bswap4 (msg->count); + msg->count = ddsrt_bswap4u (msg->count); } msg->readerId = nn_ntoh_entityid (msg->readerId); msg->writerId = nn_ntoh_entityid (msg->writerId); @@ -239,7 +239,7 @@ static int valid_HeartbeatFrag (HeartbeatFrag_t *msg, size_t size, int byteswap) { bswapSN (&msg->writerSN); msg->lastFragmentNum = ddsrt_bswap4u (msg->lastFragmentNum); - msg->count = ddsrt_bswap4 (msg->count); + msg->count = ddsrt_bswap4u (msg->count); } msg->readerId = nn_ntoh_entityid (msg->readerId); msg->writerId = nn_ntoh_entityid (msg->writerId); @@ -274,7 +274,7 @@ static int valid_NackFrag (NackFrag_t *msg, size_t size, int byteswap) if (byteswap) { bswap_fragment_number_set_bitmap (&msg->fragmentNumberState, msg->bits); - *count = ddsrt_bswap4 (*count); + *count = ddsrt_bswap4u (*count); } return 1; } @@ -577,7 +577,7 @@ static int acknack_is_nack (const AckNack_t *msg) return x != 0; } -static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept) +static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *prev_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept) { /* AckNacks and Heartbeats with a sequence number (called "count" for some reason) equal to or less than the highest one received @@ -590,17 +590,22 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou 8.4.15.7 says: "New HEARTBEATS should have Counts greater than all older HEARTBEATs. Then, received HEARTBEATs with Counts not greater than any previously received can be ignored." But it - isn't clear whether that is about connections or entities, and - besides there is an issue with the wrap around after 2**31-1. + isn't clear whether that is about connections or entities. + + The type is defined in the spec as signed but without limiting + them to, e.g., positive numbers. Instead of implementing them as + spec'd, we implement it as unsigned to avoid integer overflow (and + the consequence undefined behaviour). Serial number arithmetic + deals with the wrap-around after 2**31-1. This combined procedure should give the best of all worlds, and is not more expensive in the common case. */ const int64_t timeout = DDS_SECS (2); - if (new_count < *exp_count && tnow.v - t_last_accepted->v < timeout && !force_accept) + if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept) return 0; - *exp_count = new_count + 1; + *prev_count = new_count; *t_last_accepted = tnow; return 1; } @@ -772,7 +777,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const rn->rexmit_requests++; } - if (!accept_ack_or_hb_w_timeout (*countp, &rn->next_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack)) + if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack)) { RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"])", PGUID (src), PGUID (dst)); goto out; @@ -1100,7 +1105,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand ASSERT_MUTEX_HELD (&pwr->e.lock); /* Not supposed to respond to repeats and old heartbeats. */ - if (!accept_ack_or_hb_w_timeout (msg->count, &wn->next_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) + if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) { RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid)); return; @@ -1528,12 +1533,11 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons } /* Ignore old NackFrags (see also handle_AckNack) */ - if (*countp < rn->next_nackfrag) + if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_nackfrag, tnow, &rn->t_nackfrag_accepted, false)) { RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"]", PGUID (src), PGUID (dst)); goto out; } - rn->next_nackfrag = *countp + 1; RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst)); /* Resend the requested fragments if we still have the sample, send diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 4a52fdb..90fd213 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -431,7 +431,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta hb->firstSN = toSN (min); hb->lastSN = toSN (max); - hb->count = ++wr->hbcount; + hb->count = wr->hbcount++; nn_xmsg_submsg_setnext (msg, sm_marker); encode_datawriter_submsg(msg, sm_marker, wr); @@ -726,7 +726,7 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn hbf->writerSN = toSN (seq); hbf->lastFragmentNum = fragnum + 1; /* network format is 1 based */ - hbf->count = ++wr->hbfragcount; + hbf->count = wr->hbfragcount++; nn_xmsg_submsg_setnext (*pmsg, sm_marker); encode_datawriter_submsg(*pmsg, sm_marker, wr); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 443a9fc..c1f7313 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -943,11 +943,14 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); nn_xmsg_submsg_setnext (msg, sm_marker); - ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRId32":%"PRId64"/%"PRIu32":", - PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, - base, an->readerSNState.numbits); - for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":", + PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, + base, an->readerSNState.numbits); + for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); + } /* Encode the sub-message when needed. */ encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); @@ -977,7 +980,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p *countp = ++pwr->nackfragcount; nn_xmsg_submsg_setnext (msg, sm_marker); - ETRACE (pwr, " + nackfrag #%"PRId32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); + ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); }