diff --git a/src/core/ddsi/src/q_xevent.c b/src/core/ddsi/src/q_xevent.c index 7f9ee9a..e9313a4 100644 --- a/src/core/ddsi/src/q_xevent.c +++ b/src/core/ddsi/src/q_xevent.c @@ -959,6 +959,42 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent resched_xevent_if_earlier (ev, add_duration_to_mtime (tnow, 100 * T_MILLISECOND)); } +static bool resend_spdp_sample_by_guid_key (struct writer *wr, const nn_guid_t *guid, struct proxy_reader *prd) +{ + /* Look up data in (transient-local) WHC by key value -- FIXME: clearly + a slightly more efficient and elegant way of looking up the key value + is to be preferred */ + bool sample_found; + nn_plist_t ps; + nn_plist_init_empty (&ps); + ps.present |= PP_PARTICIPANT_GUID; + ps.participant_guid = *guid; + struct nn_xmsg *mpayload = nn_xmsg_new (gv.xmsgpool, &guid->prefix, 0, NN_XMSG_KIND_DATA); + nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0); + nn_xmsg_addpar_sentinel (mpayload); + nn_plist_fini (&ps); + struct ddsi_plist_sample plist_sample; + nn_xmsg_payload_to_plistsample (&plist_sample, PID_PARTICIPANT_GUID, mpayload); + struct ddsi_serdata *sd = ddsi_serdata_from_sample (gv.plist_topic, SDK_KEY, &plist_sample); + struct whc_borrowed_sample sample; + nn_xmsg_free (mpayload); + + os_mutexLock (&wr->e.lock); + sample_found = whc_borrow_sample_key (wr->whc, sd, &sample); + if (sample_found) + { + /* Claiming it is new rather than a retransmit so that the rexmit + limiting won't kick in. It is best-effort and therefore the + updating of the last transmitted sequence number won't take + place anyway. Nor is it necessary to fiddle with heartbeat + control stuff. */ + enqueue_sample_wrlock_held (wr, sample.seq, sample.plist, sample.serdata, prd, 1); + whc_return_sample(wr->whc, &sample, false); + } + os_mutexUnlock (&wr->e.lock); + ddsi_serdata_unref (sd); + return sample_found; +} static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, nn_mtime_t tnow) { @@ -966,10 +1002,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e struct participant *pp; struct proxy_reader *prd; struct writer *spdp_wr; - struct whc_borrowed_sample sample; -#ifndef NDEBUG - bool sample_found; -#endif + bool do_write; if ((pp = ephash_lookup_participant_guid (&ev->u.spdp.pp_guid)) == NULL) { @@ -994,62 +1027,22 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e /* memset is for tracing output */ memset (&ev->u.spdp.dest_proxypp_guid_prefix, 0, sizeof (ev->u.spdp.dest_proxypp_guid_prefix)); prd = NULL; + do_write = true; } else { nn_guid_t guid; guid.prefix = ev->u.spdp.dest_proxypp_guid_prefix; guid.entityid.u = NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER; - if ((prd = ephash_lookup_proxy_reader_guid (&guid)) == NULL) - { + prd = ephash_lookup_proxy_reader_guid (&guid); + do_write = (prd != NULL); + if (!do_write) DDS_TRACE("xmit spdp: no proxy reader %x:%x:%x:%x\n", PGUID (guid)); - goto skip; - } } - /* Look up data in (transient-local) WHC by key value -- FIXME: clearly - a slightly more efficient and elegant way of looking up the key value - is to be preferred */ - nn_plist_t ps; - nn_plist_init_empty (&ps); - ps.present |= PP_PARTICIPANT_GUID; - ps.participant_guid = ev->u.spdp.pp_guid; - struct nn_xmsg *mpayload = nn_xmsg_new (gv.xmsgpool, &ev->u.spdp.pp_guid.prefix, 0, NN_XMSG_KIND_DATA); - nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0); - nn_xmsg_addpar_sentinel (mpayload); - nn_plist_fini (&ps); - struct ddsi_plist_sample plist_sample; - nn_xmsg_payload_to_plistsample (&plist_sample, PID_PARTICIPANT_GUID, mpayload); - struct ddsi_serdata *sd = ddsi_serdata_from_sample (gv.plist_topic, SDK_KEY, &plist_sample); - nn_xmsg_free (mpayload); - - os_mutexLock (&spdp_wr->e.lock); - if (whc_borrow_sample_key (spdp_wr->whc, sd, &sample)) + if (do_write && !resend_spdp_sample_by_guid_key (spdp_wr, &ev->u.spdp.pp_guid, prd)) { - /* Claiming it is new rather than a retransmit so that the rexmit - limiting won't kick in. It is best-effort and therefore the - updating of the last transmitted sequence number won't take - place anyway. Nor is it necessary to fiddle with heartbeat - control stuff. */ - enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1); - whc_return_sample(spdp_wr->whc, &sample, false); #ifndef NDEBUG - sample_found = true; -#endif - } -#ifndef NDEBUG - else - { - sample_found = false; - } -#endif - os_mutexUnlock (&spdp_wr->e.lock); - - ddsi_serdata_unref (sd); - -#ifndef NDEBUG - if (!sample_found) - { /* If undirected, it is pp->spdp_xevent, and that one must never run into an empty WHC unless it is already marked for deletion. @@ -1072,10 +1065,9 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e DDS_TRACE("xmit spdp: suppressing early spdp response from %x:%x:%x:%x to %x:%x:%x:%x\n", PGUID (pp->e.guid), PGUIDPREFIX (ev->u.spdp.dest_proxypp_guid_prefix), NN_ENTITYID_PARTICIPANT); } - } #endif + } - skip: if (ev->u.spdp.directed) { /* Directed events are used to send SPDP packets to newly