diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index ce1c7c3..7b6ac19 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -131,9 +131,10 @@ struct wr_prd_match { seqno_t last_seq; /* highest seq send to this reader used when filter is applied */ int32_t num_reliable_readers_where_seq_equals_max; ddsi_guid_t arbitrary_unacked_reader; - nn_count_t next_acknack; /* next acceptable acknack sequence number */ - nn_count_t next_nackfrag; /* next acceptable nackfrag sequence number */ + nn_count_t prev_acknack; /* latest accepted acknack sequence number */ + nn_count_t prev_nackfrag; /* latest accepted nackfrag sequence number */ ddsrt_etime_t t_acknack_accepted; /* (local) time an acknack was last accepted */ + ddsrt_etime_t t_nackfrag_accepted; /* (local) time a nackfrag was last accepted */ struct nn_lat_estim hb_to_ack_latency; ddsrt_wctime_t hb_to_ack_latency_tlastlog; uint32_t non_responsive_count; @@ -154,7 +155,7 @@ struct pwr_rd_match { ddsi_guid_t rd_guid; ddsrt_mtime_t tcreate; nn_count_t count; /* most recent acknack sequence number */ - nn_count_t next_heartbeat; /* next acceptable heartbeat (see also add_proxy_writer_to_reader) */ + nn_count_t prev_heartbeat; /* latest accepted heartbeat (see also add_proxy_writer_to_reader) */ ddsrt_wctime_t hb_timestamp; /* time of most recent heartbeat that rescheduled the ack event */ ddsrt_etime_t t_heartbeat_accepted; /* (local) time a heartbeat was last accepted */ ddsrt_mtime_t t_last_nack; /* (local) time we last sent a NACK */ /* FIXME: probably elapsed time is better */ diff --git a/src/core/ddsi/include/dds/ddsi/q_protocol.h b/src/core/ddsi/include/dds/ddsi/q_protocol.h index 688a8a1..c26ed10 100644 --- a/src/core/ddsi/include/dds/ddsi/q_protocol.h +++ b/src/core/ddsi/include/dds/ddsi/q_protocol.h @@ -55,9 +55,7 @@ typedef struct nn_fragment_number_set_header { #define NN_FRAGMENT_NUMBER_SET_MAX_BITS (256u) #define NN_FRAGMENT_NUMBER_SET_BITS_SIZE(numbits) ((unsigned) (4 * (((numbits) + 31) / 32))) #define NN_FRAGMENT_NUMBER_SET_SIZE(numbits) (sizeof (nn_fragment_number_set_header_t) + NN_FRAGMENT_NUMBER_SET_BITS_SIZE (numbits)) -typedef int32_t nn_count_t; -#define DDSI_COUNT_MIN (-2147483647 - 1) -#define DDSI_COUNT_MAX (2147483647) +typedef uint32_t nn_count_t; /* address field in locator maintained in network byte order, the rest in host */ typedef struct { const struct ddsi_tran_factory *tran; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 51ef631..a47a840 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2102,14 +2102,14 @@ static void update_reader_init_acknack_count (const ddsrt_log_cfg_t *logcfg, con /* Update the initial acknack sequence number for the reader. See also reader_add_connection(). */ - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRId32"): ", PGUID (*rd_guid), count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRIu32"): ", PGUID (*rd_guid), count); if ((rd = entidx_lookup_reader_guid (entidx, rd_guid)) != NULL) { ddsrt_mutex_lock (&rd->e.lock); - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32" -> ", rd->init_acknack_count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32" -> ", rd->init_acknack_count); if (count > rd->init_acknack_count) rd->init_acknack_count = count; - DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32"\n", count); + DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRIu32"\n", count); ddsrt_mutex_unlock (&rd->e.lock); } else @@ -2208,11 +2208,12 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd, pretend_everything_acked = 0; } ddsrt_mutex_unlock (&prd->e.lock); - m->next_acknack = DDSI_COUNT_MIN; - m->next_nackfrag = DDSI_COUNT_MIN; + m->prev_acknack = 0; + m->prev_nackfrag = 0; nn_lat_estim_init (&m->hb_to_ack_latency); m->hb_to_ack_latency_tlastlog = ddsrt_time_wallclock (); m->t_acknack_accepted.v = 0; + m->t_nackfrag_accepted.v = 0; ddsrt_mutex_lock (&wr->e.lock); if (pretend_everything_acked) @@ -2364,7 +2365,7 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, writer will always see monotonically increasing sequence numbers from one particular reader. This is then used for the pwr_rd_match initialization */ - ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRId32"\n", + ELOGDISC (rd, " reader "PGUIDFMT" init_acknack_count = %"PRIu32"\n", PGUID (rd->e.guid), rd->init_acknack_count); *init_count = rd->init_acknack_count; @@ -2494,7 +2495,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader If we don't mind those extra AckNacks, we could track the count at the proxy-writer and simply treat all incoming heartbeats as undirected. */ - m->next_heartbeat = DDSI_COUNT_MIN; + m->prev_heartbeat = 0; m->hb_timestamp.v = 0; m->t_heartbeat_accepted.v = 0; m->t_last_nack.v = 0; @@ -3578,9 +3579,9 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se wr->seq = 0; wr->cs_seq = 0; ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0); - wr->hbcount = 0; + wr->hbcount = 1; wr->state = WRST_OPERATIONAL; - wr->hbfragcount = 0; + wr->hbfragcount = 1; writer_hbcontrol_init (&wr->hbcontrol); wr->throttling = 0; wr->retransmitting = 0; @@ -4288,7 +4289,7 @@ static dds_return_t new_reader_guid rd->topic = ddsi_sertopic_ref (topic); rd->ddsi2direct_cb = 0; rd->ddsi2direct_cbarg = 0; - rd->init_acknack_count = 0; + rd->init_acknack_count = 1; rd->num_writers = 0; #ifdef DDSI_INCLUDE_SSM rd->favours_ssm = 0; @@ -5436,8 +5437,8 @@ int new_proxy_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *ppguid, pwr->n_reliable_readers = 0; pwr->n_readers_out_of_sync = 0; pwr->last_seq = 0; - pwr->last_fragnum = ~0u; - pwr->nackfragcount = 0; + pwr->last_fragnum = UINT32_MAX; + pwr->nackfragcount = 1; pwr->last_fragnum_reset = 0; pwr->alive = 1; pwr->alive_vclock = 0; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 7b6e53e..6099c12 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -152,7 +152,7 @@ static int valid_AckNack (const struct receiver_state *rst, AckNack_t *msg, size if (byteswap) { bswap_sequence_number_set_bitmap (&msg->readerSNState, msg->bits); - *count = ddsrt_bswap4 (*count); + *count = ddsrt_bswap4u (*count); } return 1; } @@ -221,7 +221,7 @@ static int valid_Heartbeat (Heartbeat_t *msg, size_t size, int byteswap) { bswapSN (&msg->firstSN); bswapSN (&msg->lastSN); - msg->count = ddsrt_bswap4 (msg->count); + msg->count = ddsrt_bswap4u (msg->count); } msg->readerId = nn_ntoh_entityid (msg->readerId); msg->writerId = nn_ntoh_entityid (msg->writerId); @@ -239,7 +239,7 @@ static int valid_HeartbeatFrag (HeartbeatFrag_t *msg, size_t size, int byteswap) { bswapSN (&msg->writerSN); msg->lastFragmentNum = ddsrt_bswap4u (msg->lastFragmentNum); - msg->count = ddsrt_bswap4 (msg->count); + msg->count = ddsrt_bswap4u (msg->count); } msg->readerId = nn_ntoh_entityid (msg->readerId); msg->writerId = nn_ntoh_entityid (msg->writerId); @@ -274,7 +274,7 @@ static int valid_NackFrag (NackFrag_t *msg, size_t size, int byteswap) if (byteswap) { bswap_fragment_number_set_bitmap (&msg->fragmentNumberState, msg->bits); - *count = ddsrt_bswap4 (*count); + *count = ddsrt_bswap4u (*count); } return 1; } @@ -577,7 +577,7 @@ static int acknack_is_nack (const AckNack_t *msg) return x != 0; } -static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept) +static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *prev_count, ddsrt_etime_t tnow, ddsrt_etime_t *t_last_accepted, int force_accept) { /* AckNacks and Heartbeats with a sequence number (called "count" for some reason) equal to or less than the highest one received @@ -590,17 +590,22 @@ static int accept_ack_or_hb_w_timeout (nn_count_t new_count, nn_count_t *exp_cou 8.4.15.7 says: "New HEARTBEATS should have Counts greater than all older HEARTBEATs. Then, received HEARTBEATs with Counts not greater than any previously received can be ignored." But it - isn't clear whether that is about connections or entities, and - besides there is an issue with the wrap around after 2**31-1. + isn't clear whether that is about connections or entities. + + The type is defined in the spec as signed but without limiting + them to, e.g., positive numbers. Instead of implementing them as + spec'd, we implement it as unsigned to avoid integer overflow (and + the consequence undefined behaviour). Serial number arithmetic + deals with the wrap-around after 2**31-1. This combined procedure should give the best of all worlds, and is not more expensive in the common case. */ const int64_t timeout = DDS_SECS (2); - if (new_count < *exp_count && tnow.v - t_last_accepted->v < timeout && !force_accept) + if ((int32_t) (new_count - *prev_count) <= 0 && tnow.v - t_last_accepted->v < timeout && !force_accept) return 0; - *exp_count = new_count + 1; + *prev_count = new_count; *t_last_accepted = tnow; return 1; } @@ -772,7 +777,7 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const rn->rexmit_requests++; } - if (!accept_ack_or_hb_w_timeout (*countp, &rn->next_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack)) + if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_acknack, tnow, &rn->t_acknack_accepted, is_preemptive_ack)) { RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"])", PGUID (src), PGUID (dst)); goto out; @@ -1100,7 +1105,7 @@ static void handle_Heartbeat_helper (struct pwr_rd_match * const wn, struct hand ASSERT_MUTEX_HELD (&pwr->e.lock); /* Not supposed to respond to repeats and old heartbeats. */ - if (!accept_ack_or_hb_w_timeout (msg->count, &wn->next_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) + if (!accept_ack_or_hb_w_timeout (msg->count, &wn->prev_heartbeat, arg->tnow, &wn->t_heartbeat_accepted, 0)) { RSTTRACE (" ("PGUIDFMT")", PGUID (wn->rd_guid)); return; @@ -1528,12 +1533,11 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons } /* Ignore old NackFrags (see also handle_AckNack) */ - if (*countp < rn->next_nackfrag) + if (!accept_ack_or_hb_w_timeout (*countp, &rn->prev_nackfrag, tnow, &rn->t_nackfrag_accepted, false)) { RSTTRACE (" ["PGUIDFMT" -> "PGUIDFMT"]", PGUID (src), PGUID (dst)); goto out; } - rn->next_nackfrag = *countp + 1; RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"", PGUID (src), PGUID (dst)); /* Resend the requested fragments if we still have the sample, send diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 4a52fdb..90fd213 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -431,7 +431,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta hb->firstSN = toSN (min); hb->lastSN = toSN (max); - hb->count = ++wr->hbcount; + hb->count = wr->hbcount++; nn_xmsg_submsg_setnext (msg, sm_marker); encode_datawriter_submsg(msg, sm_marker, wr); @@ -726,7 +726,7 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn hbf->writerSN = toSN (seq); hbf->lastFragmentNum = fragnum + 1; /* network format is 1 based */ - hbf->count = ++wr->hbfragcount; + hbf->count = wr->hbfragcount++; nn_xmsg_submsg_setnext (*pmsg, sm_marker); encode_datawriter_submsg(*pmsg, sm_marker, wr); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 443a9fc..c1f7313 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -943,11 +943,14 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p nn_xmsg_shrink (msg, sm_marker, ACKNACK_SIZE (an->readerSNState.numbits)); nn_xmsg_submsg_setnext (msg, sm_marker); - ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRId32":%"PRId64"/%"PRIu32":", - PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, - base, an->readerSNState.numbits); - for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) - ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); + if (pwr->e.gv->logconfig.c.mask & DDS_LC_TRACE) + { + ETRACE (pwr, "acknack "PGUIDFMT" -> "PGUIDFMT": #%"PRIu32":%"PRId64"/%"PRIu32":", + PGUID (rwn->rd_guid), PGUID (pwr->e.guid), rwn->count, + base, an->readerSNState.numbits); + for (uint32_t ui = 0; ui != an->readerSNState.numbits; ui++) + ETRACE (pwr, "%c", nn_bitset_isset (numbits, an->bits, ui) ? '1' : '0'); + } /* Encode the sub-message when needed. */ encode_datareader_submsg(msg, sm_marker, pwr, &rwn->rd_guid); @@ -977,7 +980,7 @@ static void add_AckNack (struct nn_xmsg *msg, struct proxy_writer *pwr, struct p *countp = ++pwr->nackfragcount; nn_xmsg_submsg_setnext (msg, sm_marker); - ETRACE (pwr, " + nackfrag #%"PRId32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); + ETRACE (pwr, " + nackfrag #%"PRIu32":%"PRId64"/%u/%"PRIu32":", *countp, fromSN (nf->writerSN), nf->fragmentNumberState.bitmap_base, nf->fragmentNumberState.numbits); for (uint32_t ui = 0; ui != nf->fragmentNumberState.numbits; ui++) ETRACE (pwr, "%c", nn_bitset_isset (nf->fragmentNumberState.numbits, nf->bits, ui) ? '1' : '0'); }