From 14ffab27052d51176f595987f2bc9112b16f6b58 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 7 Oct 2019 16:41:48 +0200 Subject: [PATCH] Always use atomic64 for writer seq_xmit (#270) Signed-off-by: Erik Boasson --- src/core/ddsi/include/dds/ddsi/q_entity.h | 33 +++++++++-------------- src/core/ddsi/src/q_debmon.c | 2 +- src/core/ddsi/src/q_entity.c | 5 +++- src/core/ddsi/src/q_receive.c | 6 ++--- src/core/ddsi/src/q_transmit.c | 6 ++--- src/core/ddsi/src/q_xevent.c | 2 +- src/core/ddsi/src/q_xmsg.c | 2 +- 7 files changed, 25 insertions(+), 31 deletions(-) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 09069c9..a694140 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -203,28 +203,7 @@ enum writer_state { WRST_DELETING /* writer is actually being deleted (removed from hash table) */ }; -#if DDSRT_ATOMIC64_SUPPORT typedef ddsrt_atomic_uint64_t seq_xmit_t; -#define INIT_SEQ_XMIT(wr, v) ddsrt_atomic_st64(&(wr)->seq_xmit, (uint64_t) (v)) -#define READ_SEQ_XMIT(wr) ((seqno_t) ddsrt_atomic_ld64(&(wr)->seq_xmit)) -#define UPDATE_SEQ_XMIT_LOCKED(wr, nv) do { uint64_t ov_; do { \ - ov_ = ddsrt_atomic_ld64(&(wr)->seq_xmit); \ - if ((uint64_t) nv <= ov_) break; \ -} while (!ddsrt_atomic_cas64(&(wr)->seq_xmit, ov_, (uint64_t) nv)); } while (0) -#define UPDATE_SEQ_XMIT_UNLOCKED(sx, nv) UPDATE_SEQ_XMIT_LOCKED(sx, nv) -#else -typedef seqno_t seq_xmit_t; -#define INIT_SEQ_XMIT(wr, v) ((wr)->seq_xmit = (v)) -#define READ_SEQ_XMIT(wr) ((wr)->seq_xmit) -#define UPDATE_SEQ_XMIT_LOCKED(wr, nv) do { \ - if ((nv) > (wr)->seq_xmit) { (wr)->seq_xmit = (nv); } \ -} while (0) -#define UPDATE_SEQ_XMIT_UNLOCKED(wr, nv) do { \ - ddsrt_mutex_lock (&(wr)->e.lock); \ - if ((nv) > (wr)->seq_xmit) { (wr)->seq_xmit = (nv); } \ - ddsrt_mutex_unlock (&(wr)->e.lock); \ -} while (0) -#endif struct writer { @@ -276,6 +255,18 @@ struct writer struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */ }; +inline seqno_t writer_read_seq_xmit (const struct writer *wr) { + return (seqno_t) ddsrt_atomic_ld64 (&wr->seq_xmit); +} + +inline void writer_update_seq_xmit (struct writer *wr, seqno_t nv) { + uint64_t ov; + do { + ov = ddsrt_atomic_ld64 (&wr->seq_xmit); + if ((uint64_t) nv <= ov) break; + } while (!ddsrt_atomic_cas64 (&wr->seq_xmit, ov, (uint64_t) nv)); +} + struct reader { struct entity_common e; diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 8e629b6..58c8971 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -195,7 +195,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global whcst.min_seq, whcst.max_seq, whcst.unacked_bytes, w->throttling ? " THROTTLING" : "", w->whc_low, w->whc_high, - w->seq, READ_SEQ_XMIT(w), w->cs_seq); + w->seq, writer_read_seq_xmit (w), w->cs_seq); if (w->reliable) { x += cpf (conn, " hb %"PRIu32" ackhb %"PRId64" hb %"PRId64" wr %"PRId64" sched %"PRId64" #rel %"PRId32"\n", diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 24add4d..ed13d6c 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -110,6 +110,9 @@ extern inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic extern inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_guid *guid); extern inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive); +extern inline seqno_t writer_read_seq_xmit (const struct writer *wr); +extern inline void writer_update_seq_xmit (struct writer *wr, seqno_t nv); + static int compare_guid (const void *va, const void *vb) { return memcmp (va, vb, sizeof (ddsi_guid_t)); @@ -2726,7 +2729,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se ddsrt_cond_init (&wr->throttle_cond); wr->seq = 0; wr->cs_seq = 0; - INIT_SEQ_XMIT(wr, 0); + ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0); wr->hbcount = 0; wr->state = WRST_OPERATIONAL; wr->hbfragcount = 0; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 65ac23f..ada45a0 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -555,7 +555,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state * static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq) { seqno_t next_seq = whc_next_seq (wr->whc, seq - 1); - seqno_t seq_xmit = READ_SEQ_XMIT(wr); + seqno_t seq_xmit = writer_read_seq_xmit (wr); if (next_seq == MAX_SEQ_NUMBER) /* no next sample */ return seq_xmit + 1; else if (next_seq > seq_xmit) /* next is beyond last actually transmitted */ @@ -836,7 +836,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac that issue; if it has, then the timing is terribly unlucky, but a future request'll fix it. */ enqueued = 1; - seq_xmit = READ_SEQ_XMIT(wr); + seq_xmit = writer_read_seq_xmit (wr); const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq; const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0; for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++) @@ -1460,7 +1460,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N qxev_msg (wr->evq, m); } } - if (seq < READ_SEQ_XMIT(wr)) + if (seq < writer_read_seq_xmit (wr)) { /* Not everything was retransmitted yet, so force a heartbeat out to give the reader a chance to nack the rest and make sure diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 1f2a150..0a99fc9 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -307,7 +307,7 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_ (hbc->tsched.v == T_NEVER) ? INFINITY : (double) (hbc->tsched.v - tnow.v) / 1e9, ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq, ddsrt_avl_is_empty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!", - whcst->max_seq, READ_SEQ_XMIT(wr)); + whcst->max_seq, writer_read_seq_xmit (wr)); } return msg; @@ -354,7 +354,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta seqno_t seq_xmit; min = whcst->min_seq; max = wr->seq; - seq_xmit = READ_SEQ_XMIT(wr); + seq_xmit = writer_read_seq_xmit (wr); assert (min <= max); /* Informing readers of samples that haven't even been transmitted makes little sense, but for transient-local data, we let the first heartbeat determine the time at which @@ -1125,7 +1125,7 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack * (Note that no network destination is very nearly the same as no matching proxy readers. The exception is the SPDP writer.) */ - UPDATE_SEQ_XMIT_LOCKED (wr, seq); + writer_update_seq_xmit (wr, seq); ddsrt_mutex_unlock (&wr->e.lock); if (plist != NULL) { diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 7f0195b..9deee7a 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -622,7 +622,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt (t_next.v == T_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9, ddsrt_avl_is_empty (&wr->readers) ? (seqno_t) -1 : ((struct wr_prd_match *) ddsrt_avl_root_non_empty (&wr_readers_treedef, &wr->readers))->min_seq, ddsrt_avl_is_empty (&wr->readers) || ((struct wr_prd_match *) ddsrt_avl_root_non_empty (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!", - whcst.max_seq, READ_SEQ_XMIT(wr)); + whcst.max_seq, writer_read_seq_xmit (wr)); resched_xevent_if_earlier (ev, t_next); wr->hbcontrol.tsched = t_next; ddsrt_mutex_unlock (&wr->e.lock); diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index 61be691..28c6200 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -861,7 +861,7 @@ static void nn_xmsg_chain_release (struct q_globals *gv, struct nn_xmsg_chain *c assert (m->kindspecific.data.wrseq != 0); wrguid = m->kindspecific.data.wrguid; if ((wr = ephash_lookup_writer_guid (gv->guid_hash, &m->kindspecific.data.wrguid)) != NULL) - UPDATE_SEQ_XMIT_UNLOCKED(wr, m->kindspecific.data.wrseq); + writer_update_seq_xmit (wr, m->kindspecific.data.wrseq); } }