Combine heartbeats in response to ACKNACK/NACKFRAG

This adds tracking of whether a heartbeat should be generated until
processing of the message is complete or an ACKNACK or NACKFRAG from
another reader requires a response.  This way, an ACKNACK + NACKFRAG
pair does not trigger multiple heartbeat messages.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-06-22 17:42:41 +02:00 committed by eboasson
parent b5c0eab2fb
commit 0a4c863f11
7 changed files with 126 additions and 130 deletions

View file

@ -58,6 +58,7 @@ int addrset_empty_mc (const struct addrset *as);
int addrset_empty (const struct addrset *as);
int addrset_any_uc (const struct addrset *as, nn_locator_t *dst);
int addrset_any_mc (const struct addrset *as, nn_locator_t *dst);
void addrset_any_uc_else_mc_nofail (const struct addrset *as, nn_locator_t *dst);
/* Keeps AS locked */
int addrset_forone (struct addrset *as, addrset_forone_fun_t f, void *arg);

View file

@ -66,8 +66,8 @@ bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp);
/* For sending to a particular proxy reader; this is a convenience
routine that extracts a suitable address from the proxy reader's
address sets and calls setdst1. */
dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd);
dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr);
void nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd);
void nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr);
/* For sending to all in the address set AS -- typically, the writer's
address set to multicast to all matched readers */

View file

@ -444,6 +444,23 @@ int addrset_any_mc (const struct addrset *as, nn_locator_t *dst)
}
}
void addrset_any_uc_else_mc_nofail (const struct addrset *as, nn_locator_t *dst)
{
LOCK (as);
if (!ddsrt_avl_cis_empty (&as->ucaddrs))
{
const struct addrset_node *n = ddsrt_avl_croot_non_empty (&addrset_treedef, &as->ucaddrs);
*dst = n->loc;
}
else
{
assert (!ddsrt_avl_cis_empty (&as->mcaddrs));
const struct addrset_node *n = ddsrt_avl_croot_non_empty (&addrset_treedef, &as->mcaddrs);
*dst = n->loc;
}
UNLOCK (as);
}
struct addrset_forall_helper_arg
{
addrset_forall_fun_t f;

View file

@ -525,28 +525,6 @@ int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, s
return 0;
}
static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq)
{
struct nn_xmsg *m;
ASSERT_MUTEX_HELD (&wr->e.lock);
assert (wr->reliable);
m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPRD (m, prd) < 0)
{
/* If we don't have an address, give up immediately */
nn_xmsg_free (m);
return;
}
/* Send a Heartbeat just to this peer */
add_Heartbeat (m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n",
PGUID (wr->e.guid), PGUID (prd->e.guid));
qxev_msg (wr->evq, m);
}
static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq)
{
seqno_t next_seq = whc_next_seq (wr->whc, seq - 1);
@ -653,13 +631,7 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *
nn_xmsg_setencoderid (m, wr->partition_id);
#endif
if (nn_xmsg_setdstPRD (m, prd) < 0)
{
nn_xmsg_free (m);
m = NULL;
}
else
{
nn_xmsg_setdstPRD (m, prd);
add_Gap (m, wr, prd, gi->gapstart, gi->gapend, gi->gapnumbits, gi->gapbits);
if (nn_xmsg_size(m) == 0)
{
@ -673,12 +645,62 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader *
for (i = 0; i < gi->gapnumbits; i++)
ETRACE (wr, "%c", nn_bitset_isset (gi->gapnumbits, gi->gapbits, i) ? '1' : '0');
}
}
return m;
}
static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const AckNack_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid)
struct defer_hb_state {
struct nn_xmsg *m;
struct xeventq *evq;
uint64_t wr_iid;
uint64_t prd_iid;
};
static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq, struct defer_hb_state *defer_hb_state)
{
ETRACE (wr, "defer_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n", PGUID (wr->e.guid), PGUID (prd->e.guid));
if (defer_hb_state->m != NULL)
{
if (wr->e.iid == defer_hb_state->wr_iid && prd->e.iid == defer_hb_state->prd_iid)
return;
qxev_msg (wr->evq, defer_hb_state->m);
}
ASSERT_MUTEX_HELD (&wr->e.lock);
assert (wr->reliable);
defer_hb_state->m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL);
nn_xmsg_setdstPRD (defer_hb_state->m, prd);
add_Heartbeat (defer_hb_state->m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0);
defer_hb_state->evq = wr->evq;
defer_hb_state->wr_iid = wr->e.iid;
defer_hb_state->prd_iid = prd->e.iid;
}
static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq, struct defer_hb_state *defer_hb_state)
{
defer_heartbeat_to_peer (wr, whcst, prd, hbansreq, defer_hb_state);
qxev_msg (wr->evq, defer_hb_state->m);
defer_hb_state->m = NULL;
}
static void defer_hb_state_init (struct defer_hb_state *defer_hb_state)
{
defer_hb_state->m = NULL;
}
static void defer_hb_state_fini (struct ddsi_domaingv * const gv, struct defer_hb_state *defer_hb_state)
{
if (defer_hb_state->m)
{
GVTRACE ("send_deferred_heartbeat: %"PRIx64" -> %"PRIx64" - queue for transmit\n", defer_hb_state->wr_iid, defer_hb_state->prd_iid);
qxev_msg (defer_hb_state->evq, defer_hb_state->m);
defer_hb_state->m = NULL;
}
}
static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const AckNack_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid, struct defer_hb_state *defer_hb_state)
{
struct proxy_reader *prd;
struct wr_prd_match *rn;
@ -862,13 +884,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
if (WHCST_ISEMPTY(&whcst))
{
RSTTRACE (" whc-empty ");
force_heartbeat_to_peer (wr, &whcst, prd, 0);
force_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state);
hb_sent_in_response = 1;
}
else
{
RSTTRACE (" rebase ");
force_heartbeat_to_peer (wr, &whcst, prd, 0);
force_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state);
hb_sent_in_response = 1;
numbits = rst->gv->config.accelerate_rexmit_block_size;
seqbase = whcst.min_seq;
@ -1024,7 +1046,8 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
if (msgs_sent && max_seq_in_reply < max_seq_available)
{
RSTTRACE (" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq);
force_heartbeat_to_peer (wr, &whcst, prd, 1);
defer_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state);
hb_sent_in_response = 1;
/* The primary purpose of hbcontrol_note_asyncwrite is to ensure
@ -1037,7 +1060,9 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const
/* If "final" flag not set, we must respond with a heartbeat. Do it
now if we haven't done so already */
if (!(msg->smhdr.flags & ACKNACK_FLAG_FINAL) && !hb_sent_in_response)
force_heartbeat_to_peer (wr, &whcst, prd, 0);
{
defer_heartbeat_to_peer (wr, &whcst, prd, 0, defer_hb_state);
}
RSTTRACE (")");
out:
ddsrt_mutex_unlock (&wr->e.lock);
@ -1475,7 +1500,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et
return 1;
}
static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid)
static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid, struct defer_hb_state *defer_hb_state)
{
struct proxy_reader *prd;
struct wr_prd_match *rn;
@ -1579,16 +1604,11 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (m, wr->partition_id);
#endif
if (nn_xmsg_setdstPRD (m, prd) < 0)
nn_xmsg_free (m);
else
{
/* length-1 bitmap with the bit clear avoids the illegal case of a
length-0 bitmap */
add_Gap (m, wr, prd, seq, seq+1, 1, &zero);
nn_xmsg_setdstPRD (m, prd);
/* length-1 bitmap with the bit clear avoids the illegal case of a length-0 bitmap */
add_Gap (m, wr, prd, seq, seq+1, 0, &zero);
qxev_msg (wr->evq, m);
}
}
if (seq <= writer_read_seq_xmit (wr))
{
/* Not everything was retransmitted yet, so force a heartbeat out
@ -1596,7 +1616,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons
hearbeats will go out at a reasonably high rate for a while */
struct whc_state whcst;
whc_get_state(wr->whc, &whcst);
force_heartbeat_to_peer (wr, &whcst, prd, 1);
defer_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state);
writer_hbcontrol_note_asyncwrite (wr, ddsrt_time_monotonic ());
}
@ -2811,6 +2831,7 @@ static int handle_submsg_sequence
unsigned char * end = msg + len;
struct nn_dqueue *deferred_wakeup = NULL;
SubmessageKind_t prev_smid = SMID_PAD;
struct defer_hb_state defer_hb_state;
/* Receiver state is dynamically allocated with lifetime bound to
the message. Updates cause a new copy to be created if the
@ -2840,6 +2861,7 @@ static int handle_submsg_sequence
rst_live = 0;
ts_for_latmeas = 0;
timestamp = DDSRT_WCTIME_INVALID;
defer_hb_state_init (&defer_hb_state);
assert (thread_is_asleep ());
thread_state_awake_fixed_domain (ts1);
@ -2894,7 +2916,7 @@ static int handle_submsg_sequence
state = "parse:acknack";
if (!valid_AckNack (rst, &sm->acknack, submsg_size, byteswap))
goto malformed;
handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : DDSRT_WCTIME_INVALID, prev_smid);
handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : DDSRT_WCTIME_INVALID, prev_smid, &defer_hb_state);
ts_for_latmeas = 0;
break;
case SMID_HEARTBEAT:
@ -2957,7 +2979,7 @@ static int handle_submsg_sequence
state = "parse:nackfrag";
if (!valid_NackFrag (&sm->nackfrag, submsg_size, byteswap))
goto malformed;
handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid);
handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid, &defer_hb_state);
ts_for_latmeas = 0;
break;
case SMID_HEARTBEAT_FRAG:
@ -3106,6 +3128,7 @@ static int handle_submsg_sequence
}
thread_state_asleep (ts1);
assert (thread_is_asleep ());
defer_hb_state_fini (gv, &defer_hb_state);
if (deferred_wakeup)
dd_dqueue_enqueue_trigger (deferred_wakeup);
return 0;
@ -3116,6 +3139,7 @@ malformed:
malformed_asleep:
assert (thread_is_asleep ());
malformed_packet_received (rst->gv, msg, submsg, len, state, state_smkind, hdr->vendorid);
defer_hb_state_fini (gv, &defer_hb_state);
if (deferred_wakeup)
dd_dqueue_enqueue_trigger (deferred_wakeup);
return -1;

View file

@ -211,11 +211,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
}
/* set the destination explicitly to the unicast destination and the fourth
param of add_Heartbeat needs to be the guid of the reader */
if (nn_xmsg_setdstPRD (msg, prd) < 0)
{
nn_xmsg_free (msg);
return NULL;
}
nn_xmsg_setdstPRD (msg, prd);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
@ -345,11 +341,7 @@ struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state *
/* set the destination explicitly to the unicast destination and the fourth
param of add_Heartbeat needs to be the guid of the reader */
if (nn_xmsg_setdstPRD (msg, prd) < 0)
{
nn_xmsg_free (msg);
return NULL;
}
nn_xmsg_setdstPRD (msg, prd);
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
nn_xmsg_setencoderid (msg, wr->partition_id);
#endif
@ -563,12 +555,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru
if (prd)
{
if (nn_xmsg_setdstPRD (*pmsg, prd) < 0)
{
nn_xmsg_free (*pmsg);
*pmsg = NULL;
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
nn_xmsg_setdstPRD (*pmsg, prd);
/* retransmits: latency budget doesn't apply */
}
else
@ -699,19 +686,9 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn
nn_xmsg_setencoderid (*pmsg, wr->partition_id);
#endif
if (prd)
{
if (nn_xmsg_setdstPRD (*pmsg, prd) < 0)
{
/* HeartbeatFrag is only advisory anyway */
nn_xmsg_free (*pmsg);
*pmsg = NULL;
return;
}
}
nn_xmsg_setdstPRD (*pmsg, prd);
else
{
nn_xmsg_setdstN (*pmsg, wr->as, wr->as_group);
}
hbf = nn_xmsg_append (*pmsg, &sm_marker, sizeof (HeartbeatFrag_t));
nn_xmsg_submsg_init (*pmsg, sm_marker, SMID_HEARTBEAT_FRAG);
hbf->readerId = nn_hton_entityid (prd ? prd->e.guid.entityid : to_entityid (NN_ENTITYID_UNKNOWN));

View file

@ -1535,8 +1535,7 @@ void qxev_prd_entityid (struct proxy_reader *prd, const ddsi_guid_t *guid)
if (! gv->xevents->tev_conn->m_connless)
{
msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPRD (msg, prd) == 0)
{
nn_xmsg_setdstPRD (msg, prd);
GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix));
nn_xmsg_add_entityid (msg);
ddsrt_mutex_lock (&gv->xevents->lock);
@ -1545,11 +1544,6 @@ void qxev_prd_entityid (struct proxy_reader *prd, const ddsi_guid_t *guid)
qxev_insert_nt (ev);
ddsrt_mutex_unlock (&gv->xevents->lock);
}
else
{
nn_xmsg_free (msg);
}
}
}
void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid)
@ -1563,8 +1557,7 @@ void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid)
if (! pwr->evq->tev_conn->m_connless)
{
msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL);
if (nn_xmsg_setdstPWR (msg, pwr) == 0)
{
nn_xmsg_setdstPWR (msg, pwr);
GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix));
nn_xmsg_add_entityid (msg);
ddsrt_mutex_lock (&pwr->evq->lock);
@ -1573,11 +1566,6 @@ void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid)
qxev_insert_nt (ev);
ddsrt_mutex_unlock (&pwr->evq->lock);
}
else
{
nn_xmsg_free (msg);
}
}
}
int qxev_msg_rexmit_wrlock_held (struct xeventq *evq, struct nn_xmsg *msg, int force)

View file

@ -760,31 +760,20 @@ bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp)
return false;
}
dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd)
void nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd)
{
nn_locator_t loc;
if (addrset_any_uc (prd->c.as, &loc) || addrset_any_mc (prd->c.as, &loc))
{
// only accepting endpoints that have an address
addrset_any_uc_else_mc_nofail (prd->c.as, &loc);
nn_xmsg_setdst1 (prd->e.gv, m, &prd->e.guid.prefix, &loc);
return 0;
}
else
{
DDS_CWARNING (&prd->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT"", PGUID (prd->e.guid));
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
}
dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr)
void nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr)
{
nn_locator_t loc;
if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc))
{
// only accepting endpoints that have an address
addrset_any_uc_else_mc_nofail (pwr->c.as, &loc);
nn_xmsg_setdst1 (pwr->e.gv, m, &pwr->e.guid.prefix, &loc);
return 0;
}
DDS_CWARNING (&pwr->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT, PGUID (pwr->e.guid));
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
void nn_xmsg_setdstN (struct nn_xmsg *m, struct addrset *as, struct addrset *as_group)