Refactor handling of an SPDP-republish event

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-12-30 10:39:19 +01:00 committed by Erik Boasson
parent 0064def595
commit ca99fd10aa

View file

@ -959,6 +959,42 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent
resched_xevent_if_earlier (ev, add_duration_to_mtime (tnow, 100 * T_MILLISECOND));
}
static bool resend_spdp_sample_by_guid_key (struct writer *wr, const nn_guid_t *guid, struct proxy_reader *prd)
{
/* Look up data in (transient-local) WHC by key value -- FIXME: clearly
a slightly more efficient and elegant way of looking up the key value
is to be preferred */
bool sample_found;
nn_plist_t ps;
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID;
ps.participant_guid = *guid;
struct nn_xmsg *mpayload = nn_xmsg_new (gv.xmsgpool, &guid->prefix, 0, NN_XMSG_KIND_DATA);
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
struct ddsi_plist_sample plist_sample;
nn_xmsg_payload_to_plistsample (&plist_sample, PID_PARTICIPANT_GUID, mpayload);
struct ddsi_serdata *sd = ddsi_serdata_from_sample (gv.plist_topic, SDK_KEY, &plist_sample);
struct whc_borrowed_sample sample;
nn_xmsg_free (mpayload);
os_mutexLock (&wr->e.lock);
sample_found = whc_borrow_sample_key (wr->whc, sd, &sample);
if (sample_found)
{
/* Claiming it is new rather than a retransmit so that the rexmit
limiting won't kick in. It is best-effort and therefore the
updating of the last transmitted sequence number won't take
place anyway. Nor is it necessary to fiddle with heartbeat
control stuff. */
enqueue_sample_wrlock_held (wr, sample.seq, sample.plist, sample.serdata, prd, 1);
whc_return_sample(wr->whc, &sample, false);
}
os_mutexUnlock (&wr->e.lock);
ddsi_serdata_unref (sd);
return sample_found;
}
static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *ev, nn_mtime_t tnow)
{
@ -966,10 +1002,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
struct participant *pp;
struct proxy_reader *prd;
struct writer *spdp_wr;
struct whc_borrowed_sample sample;
#ifndef NDEBUG
bool sample_found;
#endif
bool do_write;
if ((pp = ephash_lookup_participant_guid (&ev->u.spdp.pp_guid)) == NULL)
{
@ -994,62 +1027,22 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
/* memset is for tracing output */
memset (&ev->u.spdp.dest_proxypp_guid_prefix, 0, sizeof (ev->u.spdp.dest_proxypp_guid_prefix));
prd = NULL;
do_write = true;
}
else
{
nn_guid_t guid;
guid.prefix = ev->u.spdp.dest_proxypp_guid_prefix;
guid.entityid.u = NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER;
if ((prd = ephash_lookup_proxy_reader_guid (&guid)) == NULL)
{
prd = ephash_lookup_proxy_reader_guid (&guid);
do_write = (prd != NULL);
if (!do_write)
DDS_TRACE("xmit spdp: no proxy reader %x:%x:%x:%x\n", PGUID (guid));
goto skip;
}
}
/* Look up data in (transient-local) WHC by key value -- FIXME: clearly
a slightly more efficient and elegant way of looking up the key value
is to be preferred */
nn_plist_t ps;
nn_plist_init_empty (&ps);
ps.present |= PP_PARTICIPANT_GUID;
ps.participant_guid = ev->u.spdp.pp_guid;
struct nn_xmsg *mpayload = nn_xmsg_new (gv.xmsgpool, &ev->u.spdp.pp_guid.prefix, 0, NN_XMSG_KIND_DATA);
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
nn_xmsg_addpar_sentinel (mpayload);
nn_plist_fini (&ps);
struct ddsi_plist_sample plist_sample;
nn_xmsg_payload_to_plistsample (&plist_sample, PID_PARTICIPANT_GUID, mpayload);
struct ddsi_serdata *sd = ddsi_serdata_from_sample (gv.plist_topic, SDK_KEY, &plist_sample);
nn_xmsg_free (mpayload);
os_mutexLock (&spdp_wr->e.lock);
if (whc_borrow_sample_key (spdp_wr->whc, sd, &sample))
if (do_write && !resend_spdp_sample_by_guid_key (spdp_wr, &ev->u.spdp.pp_guid, prd))
{
/* Claiming it is new rather than a retransmit so that the rexmit
limiting won't kick in. It is best-effort and therefore the
updating of the last transmitted sequence number won't take
place anyway. Nor is it necessary to fiddle with heartbeat
control stuff. */
enqueue_sample_wrlock_held (spdp_wr, sample.seq, sample.plist, sample.serdata, prd, 1);
whc_return_sample(spdp_wr->whc, &sample, false);
#ifndef NDEBUG
sample_found = true;
#endif
}
#ifndef NDEBUG
else
{
sample_found = false;
}
#endif
os_mutexUnlock (&spdp_wr->e.lock);
ddsi_serdata_unref (sd);
#ifndef NDEBUG
if (!sample_found)
{
/* If undirected, it is pp->spdp_xevent, and that one must never
run into an empty WHC unless it is already marked for deletion.
@ -1072,10 +1065,9 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
DDS_TRACE("xmit spdp: suppressing early spdp response from %x:%x:%x:%x to %x:%x:%x:%x\n",
PGUID (pp->e.guid), PGUIDPREFIX (ev->u.spdp.dest_proxypp_guid_prefix), NN_ENTITYID_PARTICIPANT);
}
}
#endif
}
skip:
if (ev->u.spdp.directed)
{
/* Directed events are used to send SPDP packets to newly