Always use atomic64 for writer seq_xmit (#270)

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-10-07 16:41:48 +02:00 committed by eboasson
parent 8ce389dd85
commit 14ffab2705
7 changed files with 25 additions and 31 deletions

View file

@ -203,28 +203,7 @@ enum writer_state {
WRST_DELETING /* writer is actually being deleted (removed from hash table) */
};
#if DDSRT_ATOMIC64_SUPPORT
typedef ddsrt_atomic_uint64_t seq_xmit_t;
#define INIT_SEQ_XMIT(wr, v) ddsrt_atomic_st64(&(wr)->seq_xmit, (uint64_t) (v))
#define READ_SEQ_XMIT(wr) ((seqno_t) ddsrt_atomic_ld64(&(wr)->seq_xmit))
#define UPDATE_SEQ_XMIT_LOCKED(wr, nv) do { uint64_t ov_; do { \
ov_ = ddsrt_atomic_ld64(&(wr)->seq_xmit); \
if ((uint64_t) nv <= ov_) break; \
} while (!ddsrt_atomic_cas64(&(wr)->seq_xmit, ov_, (uint64_t) nv)); } while (0)
#define UPDATE_SEQ_XMIT_UNLOCKED(sx, nv) UPDATE_SEQ_XMIT_LOCKED(sx, nv)
#else
typedef seqno_t seq_xmit_t;
#define INIT_SEQ_XMIT(wr, v) ((wr)->seq_xmit = (v))
#define READ_SEQ_XMIT(wr) ((wr)->seq_xmit)
#define UPDATE_SEQ_XMIT_LOCKED(wr, nv) do { \
if ((nv) > (wr)->seq_xmit) { (wr)->seq_xmit = (nv); } \
} while (0)
#define UPDATE_SEQ_XMIT_UNLOCKED(wr, nv) do { \
ddsrt_mutex_lock (&(wr)->e.lock); \
if ((nv) > (wr)->seq_xmit) { (wr)->seq_xmit = (nv); } \
ddsrt_mutex_unlock (&(wr)->e.lock); \
} while (0)
#endif
struct writer
{
@ -276,6 +255,18 @@ struct writer
struct local_reader_ary rdary; /* LOCAL readers for fast-pathing; if not fast-pathed, fall back to scanning local_readers */
};
inline seqno_t writer_read_seq_xmit (const struct writer *wr) {
return (seqno_t) ddsrt_atomic_ld64 (&wr->seq_xmit);
}
inline void writer_update_seq_xmit (struct writer *wr, seqno_t nv) {
uint64_t ov;
do {
ov = ddsrt_atomic_ld64 (&wr->seq_xmit);
if ((uint64_t) nv <= ov) break;
} while (!ddsrt_atomic_cas64 (&wr->seq_xmit, ov, (uint64_t) nv));
}
struct reader
{
struct entity_common e;

View file

@ -195,7 +195,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global
whcst.min_seq, whcst.max_seq, whcst.unacked_bytes,
w->throttling ? " THROTTLING" : "",
w->whc_low, w->whc_high,
w->seq, READ_SEQ_XMIT(w), w->cs_seq);
w->seq, writer_read_seq_xmit (w), w->cs_seq);
if (w->reliable)
{
x += cpf (conn, " hb %"PRIu32" ackhb %"PRId64" hb %"PRId64" wr %"PRId64" sched %"PRId64" #rel %"PRId32"\n",

View file

@ -110,6 +110,9 @@ extern inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic
extern inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_guid *guid);
extern inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive);
extern inline seqno_t writer_read_seq_xmit (const struct writer *wr);
extern inline void writer_update_seq_xmit (struct writer *wr, seqno_t nv);
static int compare_guid (const void *va, const void *vb)
{
return memcmp (va, vb, sizeof (ddsi_guid_t));
@ -2726,7 +2729,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
ddsrt_cond_init (&wr->throttle_cond);
wr->seq = 0;
wr->cs_seq = 0;
INIT_SEQ_XMIT(wr, 0);
ddsrt_atomic_st64 (&wr->seq_xmit, (uint64_t) 0);
wr->hbcount = 0;
wr->state = WRST_OPERATIONAL;
wr->hbfragcount = 0;

View file

@ -555,7 +555,7 @@ static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *
static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq)
{
seqno_t next_seq = whc_next_seq (wr->whc, seq - 1);
seqno_t seq_xmit = READ_SEQ_XMIT(wr);
seqno_t seq_xmit = writer_read_seq_xmit (wr);
if (next_seq == MAX_SEQ_NUMBER) /* no next sample */
return seq_xmit + 1;
else if (next_seq > seq_xmit) /* next is beyond last actually transmitted */
@ -836,7 +836,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
that issue; if it has, then the timing is terribly unlucky, but
a future request'll fix it. */
enqueued = 1;
seq_xmit = READ_SEQ_XMIT(wr);
seq_xmit = writer_read_seq_xmit (wr);
const bool gap_for_already_acked = vendor_is_eclipse (rst->vendor) && prd->c.xqos->durability.kind == DDS_DURABILITY_VOLATILE && seqbase <= rn->seq;
const seqno_t min_seq_to_rexmit = gap_for_already_acked ? rn->seq + 1 : 0;
for (uint32_t i = 0; i < numbits && seqbase + i <= seq_xmit && enqueued; i++)
@ -1460,7 +1460,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
qxev_msg (wr->evq, m);
}
}
if (seq < READ_SEQ_XMIT(wr))
if (seq < writer_read_seq_xmit (wr))
{
/* Not everything was retransmitted yet, so force a heartbeat out
to give the reader a chance to nack the rest and make sure

View file

@ -307,7 +307,7 @@ struct nn_xmsg *writer_hbcontrol_piggyback (struct writer *wr, const struct whc_
(hbc->tsched.v == T_NEVER) ? INFINITY : (double) (hbc->tsched.v - tnow.v) / 1e9,
ddsrt_avl_is_empty (&wr->readers) ? -1 : root_rdmatch (wr)->min_seq,
ddsrt_avl_is_empty (&wr->readers) || root_rdmatch (wr)->all_have_replied_to_hb ? "" : "!",
whcst->max_seq, READ_SEQ_XMIT(wr));
whcst->max_seq, writer_read_seq_xmit (wr));
}
return msg;
@ -354,7 +354,7 @@ void add_Heartbeat (struct nn_xmsg *msg, struct writer *wr, const struct whc_sta
seqno_t seq_xmit;
min = whcst->min_seq;
max = wr->seq;
seq_xmit = READ_SEQ_XMIT(wr);
seq_xmit = writer_read_seq_xmit (wr);
assert (min <= max);
/* Informing readers of samples that haven't even been transmitted makes little sense,
but for transient-local data, we let the first heartbeat determine the time at which
@ -1125,7 +1125,7 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *
(Note that no network destination is very nearly the same as no
matching proxy readers. The exception is the SPDP writer.) */
UPDATE_SEQ_XMIT_LOCKED (wr, seq);
writer_update_seq_xmit (wr, seq);
ddsrt_mutex_unlock (&wr->e.lock);
if (plist != NULL)
{

View file

@ -622,7 +622,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
(t_next.v == T_NEVER) ? INFINITY : (double)(t_next.v - tnow.v) / 1e9,
ddsrt_avl_is_empty (&wr->readers) ? (seqno_t) -1 : ((struct wr_prd_match *) ddsrt_avl_root_non_empty (&wr_readers_treedef, &wr->readers))->min_seq,
ddsrt_avl_is_empty (&wr->readers) || ((struct wr_prd_match *) ddsrt_avl_root_non_empty (&wr_readers_treedef, &wr->readers))->all_have_replied_to_hb ? "" : "!",
whcst.max_seq, READ_SEQ_XMIT(wr));
whcst.max_seq, writer_read_seq_xmit (wr));
resched_xevent_if_earlier (ev, t_next);
wr->hbcontrol.tsched = t_next;
ddsrt_mutex_unlock (&wr->e.lock);

View file

@ -861,7 +861,7 @@ static void nn_xmsg_chain_release (struct q_globals *gv, struct nn_xmsg_chain *c
assert (m->kindspecific.data.wrseq != 0);
wrguid = m->kindspecific.data.wrguid;
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, &m->kindspecific.data.wrguid)) != NULL)
UPDATE_SEQ_XMIT_UNLOCKED(wr, m->kindspecific.data.wrseq);
writer_update_seq_xmit (wr, m->kindspecific.data.wrseq);
}
}