diff --git a/src/core/ddsi/include/dds/ddsi/q_addrset.h b/src/core/ddsi/include/dds/ddsi/q_addrset.h index 2ded8b9..7a09d0e 100644 --- a/src/core/ddsi/include/dds/ddsi/q_addrset.h +++ b/src/core/ddsi/include/dds/ddsi/q_addrset.h @@ -58,6 +58,7 @@ int addrset_empty_mc (const struct addrset *as); int addrset_empty (const struct addrset *as); int addrset_any_uc (const struct addrset *as, nn_locator_t *dst); int addrset_any_mc (const struct addrset *as, nn_locator_t *dst); +void addrset_any_uc_else_mc_nofail (const struct addrset *as, nn_locator_t *dst); /* Keeps AS locked */ int addrset_forone (struct addrset *as, addrset_forone_fun_t f, void *arg); diff --git a/src/core/ddsi/include/dds/ddsi/q_xmsg.h b/src/core/ddsi/include/dds/ddsi/q_xmsg.h index 0a282fe..25ae3d1 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xmsg.h +++ b/src/core/ddsi/include/dds/ddsi/q_xmsg.h @@ -66,8 +66,8 @@ bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp); /* For sending to a particular proxy reader; this is a convenience routine that extracts a suitable address from the proxy reader's address sets and calls setdst1. */ -dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd); -dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr); +void nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd); +void nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr); /* For sending to all in the address set AS -- typically, the writer's address set to multicast to all matched readers */ diff --git a/src/core/ddsi/src/q_addrset.c b/src/core/ddsi/src/q_addrset.c index 236fb2b..c0453c0 100644 --- a/src/core/ddsi/src/q_addrset.c +++ b/src/core/ddsi/src/q_addrset.c @@ -444,6 +444,23 @@ int addrset_any_mc (const struct addrset *as, nn_locator_t *dst) } } +void addrset_any_uc_else_mc_nofail (const struct addrset *as, nn_locator_t *dst) +{ + LOCK (as); + if (!ddsrt_avl_cis_empty (&as->ucaddrs)) + { + const struct addrset_node *n = ddsrt_avl_croot_non_empty (&addrset_treedef, &as->ucaddrs); + *dst = n->loc; + } + else + { + assert (!ddsrt_avl_cis_empty (&as->mcaddrs)); + const struct addrset_node *n = ddsrt_avl_croot_non_empty (&addrset_treedef, &as->mcaddrs); + *dst = n->loc; + } + UNLOCK (as); +} + struct addrset_forall_helper_arg { addrset_forall_fun_t f; diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 911f384..b69dd1c 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -525,28 +525,6 @@ int add_Gap (struct nn_xmsg *msg, struct writer *wr, struct proxy_reader *prd, s return 0; } -static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq) -{ - struct nn_xmsg *m; - - ASSERT_MUTEX_HELD (&wr->e.lock); - assert (wr->reliable); - - m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); - if (nn_xmsg_setdstPRD (m, prd) < 0) - { - /* If we don't have an address, give up immediately */ - nn_xmsg_free (m); - return; - } - - /* Send a Heartbeat just to this peer */ - add_Heartbeat (m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0); - ETRACE (wr, "force_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n", - PGUID (wr->e.guid), PGUID (prd->e.guid)); - qxev_msg (wr->evq, m); -} - static seqno_t grow_gap_to_next_seq (const struct writer *wr, seqno_t seq) { seqno_t next_seq = whc_next_seq (wr->whc, seq - 1); @@ -653,32 +631,76 @@ struct nn_xmsg * nn_gap_info_create_gap(struct writer *wr, struct proxy_reader * nn_xmsg_setencoderid (m, wr->partition_id); #endif - if (nn_xmsg_setdstPRD (m, prd) < 0) + nn_xmsg_setdstPRD (m, prd); + add_Gap (m, wr, prd, gi->gapstart, gi->gapend, gi->gapnumbits, gi->gapbits); + if (nn_xmsg_size(m) == 0) { nn_xmsg_free (m); m = NULL; } else { - add_Gap (m, wr, prd, gi->gapstart, gi->gapend, gi->gapnumbits, gi->gapbits); - if (nn_xmsg_size(m) == 0) - { - nn_xmsg_free (m); - m = NULL; - } - else - { - unsigned i; - ETRACE (wr, " FXGAP%"PRId64"..%"PRId64"/%d:", gi->gapstart, gi->gapend, gi->gapnumbits); - for (i = 0; i < gi->gapnumbits; i++) - ETRACE (wr, "%c", nn_bitset_isset (gi->gapnumbits, gi->gapbits, i) ? '1' : '0'); - } + unsigned i; + ETRACE (wr, " FXGAP%"PRId64"..%"PRId64"/%d:", gi->gapstart, gi->gapend, gi->gapnumbits); + for (i = 0; i < gi->gapnumbits; i++) + ETRACE (wr, "%c", nn_bitset_isset (gi->gapnumbits, gi->gapbits, i) ? '1' : '0'); } return m; } -static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const AckNack_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid) +struct defer_hb_state { + struct nn_xmsg *m; + struct xeventq *evq; + uint64_t wr_iid; + uint64_t prd_iid; +}; + +static void defer_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq, struct defer_hb_state *defer_hb_state) +{ + ETRACE (wr, "defer_heartbeat_to_peer: "PGUIDFMT" -> "PGUIDFMT" - queue for transmit\n", PGUID (wr->e.guid), PGUID (prd->e.guid)); + + if (defer_hb_state->m != NULL) + { + if (wr->e.iid == defer_hb_state->wr_iid && prd->e.iid == defer_hb_state->prd_iid) + return; + qxev_msg (wr->evq, defer_hb_state->m); + } + + ASSERT_MUTEX_HELD (&wr->e.lock); + assert (wr->reliable); + + defer_hb_state->m = nn_xmsg_new (wr->e.gv->xmsgpool, &wr->e.guid, wr->c.pp, 0, NN_XMSG_KIND_CONTROL); + nn_xmsg_setdstPRD (defer_hb_state->m, prd); + add_Heartbeat (defer_hb_state->m, wr, whcst, hbansreq, 0, prd->e.guid.entityid, 0); + defer_hb_state->evq = wr->evq; + defer_hb_state->wr_iid = wr->e.iid; + defer_hb_state->prd_iid = prd->e.iid; +} + +static void force_heartbeat_to_peer (struct writer *wr, const struct whc_state *whcst, struct proxy_reader *prd, int hbansreq, struct defer_hb_state *defer_hb_state) +{ + defer_heartbeat_to_peer (wr, whcst, prd, hbansreq, defer_hb_state); + qxev_msg (wr->evq, defer_hb_state->m); + defer_hb_state->m = NULL; +} + +static void defer_hb_state_init (struct defer_hb_state *defer_hb_state) +{ + defer_hb_state->m = NULL; +} + +static void defer_hb_state_fini (struct ddsi_domaingv * const gv, struct defer_hb_state *defer_hb_state) +{ + if (defer_hb_state->m) + { + GVTRACE ("send_deferred_heartbeat: %"PRIx64" -> %"PRIx64" - queue for transmit\n", defer_hb_state->wr_iid, defer_hb_state->prd_iid); + qxev_msg (defer_hb_state->evq, defer_hb_state->m); + defer_hb_state->m = NULL; + } +} + +static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const AckNack_t *msg, ddsrt_wctime_t timestamp, SubmessageKind_t prev_smid, struct defer_hb_state *defer_hb_state) { struct proxy_reader *prd; struct wr_prd_match *rn; @@ -862,13 +884,13 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const if (WHCST_ISEMPTY(&whcst)) { RSTTRACE (" whc-empty "); - force_heartbeat_to_peer (wr, &whcst, prd, 0); + force_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state); hb_sent_in_response = 1; } else { RSTTRACE (" rebase "); - force_heartbeat_to_peer (wr, &whcst, prd, 0); + force_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state); hb_sent_in_response = 1; numbits = rst->gv->config.accelerate_rexmit_block_size; seqbase = whcst.min_seq; @@ -1024,7 +1046,8 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const if (msgs_sent && max_seq_in_reply < max_seq_available) { RSTTRACE (" rexmit#%"PRIu32" maxseq:%"PRId64"<%"PRId64"<=%"PRId64"", msgs_sent, max_seq_in_reply, seq_xmit, wr->seq); - force_heartbeat_to_peer (wr, &whcst, prd, 1); + + defer_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state); hb_sent_in_response = 1; /* The primary purpose of hbcontrol_note_asyncwrite is to ensure @@ -1037,7 +1060,9 @@ static int handle_AckNack (struct receiver_state *rst, ddsrt_etime_t tnow, const /* If "final" flag not set, we must respond with a heartbeat. Do it now if we haven't done so already */ if (!(msg->smhdr.flags & ACKNACK_FLAG_FINAL) && !hb_sent_in_response) - force_heartbeat_to_peer (wr, &whcst, prd, 0); + { + defer_heartbeat_to_peer (wr, &whcst, prd, 0, defer_hb_state); + } RSTTRACE (")"); out: ddsrt_mutex_unlock (&wr->e.lock); @@ -1475,7 +1500,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(ddsrt_et return 1; } -static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid) +static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, const NackFrag_t *msg, SubmessageKind_t prev_smid, struct defer_hb_state *defer_hb_state) { struct proxy_reader *prd; struct wr_prd_match *rn; @@ -1579,15 +1604,10 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (m, wr->partition_id); #endif - if (nn_xmsg_setdstPRD (m, prd) < 0) - nn_xmsg_free (m); - else - { - /* length-1 bitmap with the bit clear avoids the illegal case of a - length-0 bitmap */ - add_Gap (m, wr, prd, seq, seq+1, 1, &zero); - qxev_msg (wr->evq, m); - } + nn_xmsg_setdstPRD (m, prd); + /* length-1 bitmap with the bit clear avoids the illegal case of a length-0 bitmap */ + add_Gap (m, wr, prd, seq, seq+1, 0, &zero); + qxev_msg (wr->evq, m); } if (seq <= writer_read_seq_xmit (wr)) { @@ -1596,7 +1616,7 @@ static int handle_NackFrag (struct receiver_state *rst, ddsrt_etime_t tnow, cons hearbeats will go out at a reasonably high rate for a while */ struct whc_state whcst; whc_get_state(wr->whc, &whcst); - force_heartbeat_to_peer (wr, &whcst, prd, 1); + defer_heartbeat_to_peer (wr, &whcst, prd, 1, defer_hb_state); writer_hbcontrol_note_asyncwrite (wr, ddsrt_time_monotonic ()); } @@ -2811,6 +2831,7 @@ static int handle_submsg_sequence unsigned char * end = msg + len; struct nn_dqueue *deferred_wakeup = NULL; SubmessageKind_t prev_smid = SMID_PAD; + struct defer_hb_state defer_hb_state; /* Receiver state is dynamically allocated with lifetime bound to the message. Updates cause a new copy to be created if the @@ -2840,6 +2861,7 @@ static int handle_submsg_sequence rst_live = 0; ts_for_latmeas = 0; timestamp = DDSRT_WCTIME_INVALID; + defer_hb_state_init (&defer_hb_state); assert (thread_is_asleep ()); thread_state_awake_fixed_domain (ts1); @@ -2894,7 +2916,7 @@ static int handle_submsg_sequence state = "parse:acknack"; if (!valid_AckNack (rst, &sm->acknack, submsg_size, byteswap)) goto malformed; - handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : DDSRT_WCTIME_INVALID, prev_smid); + handle_AckNack (rst, tnowE, &sm->acknack, ts_for_latmeas ? timestamp : DDSRT_WCTIME_INVALID, prev_smid, &defer_hb_state); ts_for_latmeas = 0; break; case SMID_HEARTBEAT: @@ -2957,7 +2979,7 @@ static int handle_submsg_sequence state = "parse:nackfrag"; if (!valid_NackFrag (&sm->nackfrag, submsg_size, byteswap)) goto malformed; - handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid); + handle_NackFrag (rst, tnowE, &sm->nackfrag, prev_smid, &defer_hb_state); ts_for_latmeas = 0; break; case SMID_HEARTBEAT_FRAG: @@ -3106,6 +3128,7 @@ static int handle_submsg_sequence } thread_state_asleep (ts1); assert (thread_is_asleep ()); + defer_hb_state_fini (gv, &defer_hb_state); if (deferred_wakeup) dd_dqueue_enqueue_trigger (deferred_wakeup); return 0; @@ -3116,6 +3139,7 @@ malformed: malformed_asleep: assert (thread_is_asleep ()); malformed_packet_received (rst->gv, msg, submsg, len, state, state_smkind, hdr->vendorid); + defer_hb_state_fini (gv, &defer_hb_state); if (deferred_wakeup) dd_dqueue_enqueue_trigger (deferred_wakeup); return -1; diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 7f97039..7bd4168 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -211,11 +211,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru } /* set the destination explicitly to the unicast destination and the fourth param of add_Heartbeat needs to be the guid of the reader */ - if (nn_xmsg_setdstPRD (msg, prd) < 0) - { - nn_xmsg_free (msg); - return NULL; - } + nn_xmsg_setdstPRD (msg, prd); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif @@ -345,11 +341,7 @@ struct nn_xmsg *writer_hbcontrol_p2p(struct writer *wr, const struct whc_state * /* set the destination explicitly to the unicast destination and the fourth param of add_Heartbeat needs to be the guid of the reader */ - if (nn_xmsg_setdstPRD (msg, prd) < 0) - { - nn_xmsg_free (msg); - return NULL; - } + nn_xmsg_setdstPRD (msg, prd); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS nn_xmsg_setencoderid (msg, wr->partition_id); #endif @@ -563,12 +555,7 @@ dds_return_t create_fragment_message (struct writer *wr, seqno_t seq, const stru if (prd) { - if (nn_xmsg_setdstPRD (*pmsg, prd) < 0) - { - nn_xmsg_free (*pmsg); - *pmsg = NULL; - return DDS_RETCODE_PRECONDITION_NOT_MET; - } + nn_xmsg_setdstPRD (*pmsg, prd); /* retransmits: latency budget doesn't apply */ } else @@ -699,19 +686,9 @@ static void create_HeartbeatFrag (struct writer *wr, seqno_t seq, unsigned fragn nn_xmsg_setencoderid (*pmsg, wr->partition_id); #endif if (prd) - { - if (nn_xmsg_setdstPRD (*pmsg, prd) < 0) - { - /* HeartbeatFrag is only advisory anyway */ - nn_xmsg_free (*pmsg); - *pmsg = NULL; - return; - } - } + nn_xmsg_setdstPRD (*pmsg, prd); else - { nn_xmsg_setdstN (*pmsg, wr->as, wr->as_group); - } hbf = nn_xmsg_append (*pmsg, &sm_marker, sizeof (HeartbeatFrag_t)); nn_xmsg_submsg_init (*pmsg, sm_marker, SMID_HEARTBEAT_FRAG); hbf->readerId = nn_hton_entityid (prd ? prd->e.guid.entityid : to_entityid (NN_ENTITYID_UNKNOWN)); diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index eeebbff..ea1a4bd 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -1535,20 +1535,14 @@ void qxev_prd_entityid (struct proxy_reader *prd, const ddsi_guid_t *guid) if (! gv->xevents->tev_conn->m_connless) { msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); - if (nn_xmsg_setdstPRD (msg, prd) == 0) - { - GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); - nn_xmsg_add_entityid (msg); - ddsrt_mutex_lock (&gv->xevents->lock); - ev = qxev_common_nt (gv->xevents, XEVK_ENTITYID); - ev->u.entityid.msg = msg; - qxev_insert_nt (ev); - ddsrt_mutex_unlock (&gv->xevents->lock); - } - else - { - nn_xmsg_free (msg); - } + nn_xmsg_setdstPRD (msg, prd); + GVTRACE (" qxev_prd_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); + nn_xmsg_add_entityid (msg); + ddsrt_mutex_lock (&gv->xevents->lock); + ev = qxev_common_nt (gv->xevents, XEVK_ENTITYID); + ev->u.entityid.msg = msg; + qxev_insert_nt (ev); + ddsrt_mutex_unlock (&gv->xevents->lock); } } @@ -1563,20 +1557,14 @@ void qxev_pwr_entityid (struct proxy_writer *pwr, const ddsi_guid_t *guid) if (! pwr->evq->tev_conn->m_connless) { msg = nn_xmsg_new (gv->xmsgpool, guid, NULL, sizeof (EntityId_t), NN_XMSG_KIND_CONTROL); - if (nn_xmsg_setdstPWR (msg, pwr) == 0) - { - GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); - nn_xmsg_add_entityid (msg); - ddsrt_mutex_lock (&pwr->evq->lock); - ev = qxev_common_nt (pwr->evq, XEVK_ENTITYID); - ev->u.entityid.msg = msg; - qxev_insert_nt (ev); - ddsrt_mutex_unlock (&pwr->evq->lock); - } - else - { - nn_xmsg_free (msg); - } + nn_xmsg_setdstPWR (msg, pwr); + GVTRACE (" qxev_pwr_entityid (%"PRIx32":%"PRIx32":%"PRIx32")\n", PGUIDPREFIX (guid->prefix)); + nn_xmsg_add_entityid (msg); + ddsrt_mutex_lock (&pwr->evq->lock); + ev = qxev_common_nt (pwr->evq, XEVK_ENTITYID); + ev->u.entityid.msg = msg; + qxev_insert_nt (ev); + ddsrt_mutex_unlock (&pwr->evq->lock); } } diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index ae172c7..062c042 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -760,31 +760,20 @@ bool nn_xmsg_getdst1prefix (struct nn_xmsg *m, ddsi_guid_prefix_t *gp) return false; } -dds_return_t nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd) +void nn_xmsg_setdstPRD (struct nn_xmsg *m, const struct proxy_reader *prd) { nn_locator_t loc; - if (addrset_any_uc (prd->c.as, &loc) || addrset_any_mc (prd->c.as, &loc)) - { - nn_xmsg_setdst1 (prd->e.gv, m, &prd->e.guid.prefix, &loc); - return 0; - } - else - { - DDS_CWARNING (&prd->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT"", PGUID (prd->e.guid)); - return DDS_RETCODE_PRECONDITION_NOT_MET; - } + // only accepting endpoints that have an address + addrset_any_uc_else_mc_nofail (prd->c.as, &loc); + nn_xmsg_setdst1 (prd->e.gv, m, &prd->e.guid.prefix, &loc); } -dds_return_t nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr) +void nn_xmsg_setdstPWR (struct nn_xmsg *m, const struct proxy_writer *pwr) { nn_locator_t loc; - if (addrset_any_uc (pwr->c.as, &loc) || addrset_any_mc (pwr->c.as, &loc)) - { - nn_xmsg_setdst1 (pwr->e.gv, m, &pwr->e.guid.prefix, &loc); - return 0; - } - DDS_CWARNING (&pwr->e.gv->logconfig, "nn_xmsg_setdstPRD: no address for "PGUIDFMT, PGUID (pwr->e.guid)); - return DDS_RETCODE_PRECONDITION_NOT_MET; + // only accepting endpoints that have an address + addrset_any_uc_else_mc_nofail (pwr->c.as, &loc); + nn_xmsg_setdst1 (pwr->e.gv, m, &pwr->e.guid.prefix, &loc); } void nn_xmsg_setdstN (struct nn_xmsg *m, struct addrset *as, struct addrset *as_group)