Check sequence number of SEDP messages

Historical data can processed after new data, effectively going backward
in time, so only process data that is newer than the current state.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-07-18 19:12:00 +02:00 committed by eboasson
parent 966ec0dda7
commit 103210bf8e
3 changed files with 84 additions and 74 deletions

View file

@ -338,6 +338,7 @@ struct proxy_endpoint_common
struct addrset *as; /* address set to use for communicating with this endpoint */ struct addrset *as; /* address set to use for communicating with this endpoint */
nn_guid_t group_guid; /* 0:0:0:0 if not available */ nn_guid_t group_guid; /* 0:0:0:0 if not available */
nn_vendorid_t vendor; /* cached from proxypp->vendor */ nn_vendorid_t vendor; /* cached from proxypp->vendor */
seqno_t seq; /* sequence number of most recent SEDP message */
}; };
struct proxy_writer { struct proxy_writer {
@ -608,8 +609,8 @@ void purge_proxy_participants (struct q_globals *gv, const nn_locator_t *loc, bo
/* To create a new proxy writer or reader; the proxy participant is /* To create a new proxy writer or reader; the proxy participant is
determined from the GUID and must exist. */ determined from the GUID and must exist. */
int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp); int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq);
int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, nn_wctime_t timestamp int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, nn_wctime_t timestamp, seqno_t seq
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
, int favours_ssm , int favours_ssm
#endif #endif
@ -623,8 +624,8 @@ int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const
int delete_proxy_writer (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); int delete_proxy_writer (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
int delete_proxy_reader (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); int delete_proxy_reader (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);
void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp);
int new_proxy_group (const struct nn_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct nn_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
void delete_proxy_group (struct ephash *guid_hash, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); void delete_proxy_group (struct ephash *guid_hash, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit);

View file

@ -1120,7 +1120,7 @@ static struct proxy_participant *implicitly_create_proxypp (struct q_globals *gv
return ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid); return ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid);
} }
static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp) static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp)
{ {
#define E(msg, lbl) do { GVLOGDISC (msg); goto lbl; } while (0) #define E(msg, lbl) do { GVLOGDISC (msg); goto lbl; } while (0)
struct q_globals * const gv = rst->gv; struct q_globals * const gv = rst->gv;
@ -1275,7 +1275,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
{ {
if (pwr) if (pwr)
{ {
update_proxy_writer (pwr, as, xqos, timestamp); update_proxy_writer (pwr, seq, as, xqos, timestamp);
} }
else else
{ {
@ -1287,7 +1287,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
new_proxy_writer (&ppguid, &datap->endpoint_guid, as, datap, channel->dqueue, channel->evq ? channel->evq : gv->xevents, timestamp); new_proxy_writer (&ppguid, &datap->endpoint_guid, as, datap, channel->dqueue, channel->evq ? channel->evq : gv->xevents, timestamp);
} }
#else #else
new_proxy_writer (gv, &ppguid, &datap->endpoint_guid, as, datap, gv->user_dqueue, gv->xevents, timestamp); new_proxy_writer (gv, &ppguid, &datap->endpoint_guid, as, datap, gv->user_dqueue, gv->xevents, timestamp, seq);
#endif #endif
} }
} }
@ -1295,14 +1295,14 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat
{ {
if (prd) if (prd)
{ {
update_proxy_reader (prd, as, xqos, timestamp); update_proxy_reader (prd, seq, as, xqos, timestamp);
} }
else else
{ {
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, ssm); new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, seq, ssm);
#else #else
new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp); new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, seq);
#endif #endif
} }
} }
@ -1337,7 +1337,7 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data
GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete"); GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete");
} }
static void handle_SEDP (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len) static void handle_SEDP (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len)
{ {
struct q_globals * const gv = rst->gv; struct q_globals * const gv = rst->gv;
const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */ const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */
@ -1370,7 +1370,7 @@ static void handle_SEDP (const struct receiver_state *rst, nn_wctime_t timestamp
switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER)) switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER))
{ {
case 0: case 0:
handle_SEDP_alive (rst, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp); handle_SEDP_alive (rst, seq, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp);
break; break;
case NN_STATUSINFO_DISPOSE: case NN_STATUSINFO_DISPOSE:
@ -1736,7 +1736,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
break; break;
case NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER: case NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER:
case NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER: case NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER:
handle_SEDP (sampleinfo->rst, timestamp, statusinfo, datap, datasz); handle_SEDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz);
break; break;
case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER: case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER:
handle_PMD (sampleinfo->rst, timestamp, statusinfo, datap, datasz); handle_PMD (sampleinfo->rst, timestamp, statusinfo, datap, datasz);

View file

@ -3659,15 +3659,15 @@ void new_proxy_participant
assert (is_builtin_entityid (guid1.entityid, proxypp->vendor)); assert (is_builtin_entityid (guid1.entityid, proxypp->vendor));
if (is_writer_entityid (guid1.entityid)) if (is_writer_entityid (guid1.entityid))
{ {
new_proxy_writer (gv, ppguid, &guid1, proxypp->as_meta, &plist_wr, gv->builtins_dqueue, gv->xevents, timestamp); new_proxy_writer (gv, ppguid, &guid1, proxypp->as_meta, &plist_wr, gv->builtins_dqueue, gv->xevents, timestamp, 0);
} }
else else
{ {
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
const int ssm = addrset_contains_ssm (gv, proxypp->as_meta); const int ssm = addrset_contains_ssm (gv, proxypp->as_meta);
new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, ssm); new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, 0, ssm);
#else #else
new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp); new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, 0);
#endif #endif
} }
} }
@ -3696,7 +3696,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, se
nn_plist_mergein_missing (new_plist, datap, PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | PP_ENTITY_NAME, QP_USER_DATA); nn_plist_mergein_missing (new_plist, datap, PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | PP_ENTITY_NAME, QP_USER_DATA);
nn_plist_mergein_missing (new_plist, &proxypp->e.gv->default_plist_pp, ~(uint64_t)0, ~(uint64_t)0); nn_plist_mergein_missing (new_plist, &proxypp->e.gv->default_plist_pp, ~(uint64_t)0, ~(uint64_t)0);
if (seq && seq > proxypp->seq) if (seq > proxypp->seq)
proxypp->seq = seq; proxypp->seq = seq;
switch (source) switch (source)
@ -3953,7 +3953,7 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct nn_gui
/* PROXY-ENDPOINT --------------------------------------------------- */ /* PROXY-ENDPOINT --------------------------------------------------- */
static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
{ {
const char *name; const char *name;
@ -3968,6 +3968,7 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
c->as = ref_addrset (as); c->as = ref_addrset (as);
c->topic = NULL; /* set from first matching reader/writer */ c->topic = NULL; /* set from first matching reader/writer */
c->vendor = proxypp->vendor; c->vendor = proxypp->vendor;
c->seq = seq;
if (plist->present & PP_GROUP_GUID) if (plist->present & PP_GROUP_GUID)
c->group_guid = plist->group_guid; c->group_guid = plist->group_guid;
@ -3991,7 +3992,7 @@ static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_en
/* PROXY-WRITER ----------------------------------------------------- */ /* PROXY-WRITER ----------------------------------------------------- */
int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp) int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq)
{ {
struct proxy_participant *proxypp; struct proxy_participant *proxypp;
struct proxy_writer *pwr; struct proxy_writer *pwr;
@ -4008,7 +4009,7 @@ int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const
} }
pwr = ddsrt_malloc (sizeof (*pwr)); pwr = ddsrt_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist); proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist);
ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers); ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0; pwr->n_reliable_readers = 0;
@ -4084,7 +4085,7 @@ int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const
return 0; return 0;
} }
void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp)
{ {
struct reader * rd; struct reader * rd;
struct pwr_rd_match * m; struct pwr_rd_match * m;
@ -4093,31 +4094,35 @@ void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const st
/* Update proxy writer endpoints (from SEDP alive) */ /* Update proxy writer endpoints (from SEDP alive) */
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
if (! addrset_eq_onesidederr (pwr->c.as, as)) if (seq > pwr->c.seq)
{ {
#ifdef DDSI_INCLUDE_SSM pwr->c.seq = seq;
pwr->supports_ssm = (addrset_contains_ssm (pwr->e.gv, as) && pwr->e.gv->config.allowMulticast & AMC_SSM) ? 1 : 0; if (! addrset_eq_onesidederr (pwr->c.as, as))
#endif
unref_addrset (pwr->c.as);
ref_addrset (as);
pwr->c.as = as;
m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &iter);
while (m)
{ {
rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid); #ifdef DDSI_INCLUDE_SSM
if (rd) pwr->supports_ssm = (addrset_contains_ssm (pwr->e.gv, as) && pwr->e.gv->config.allowMulticast & AMC_SSM) ? 1 : 0;
#endif
unref_addrset (pwr->c.as);
ref_addrset (as);
pwr->c.as = as;
m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &iter);
while (m)
{ {
qxev_pwr_entityid (pwr, &rd->e.guid.prefix); rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid);
if (rd)
{
qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
}
m = ddsrt_avl_iter_next (&iter);
} }
m = ddsrt_avl_iter_next (&iter);
} }
}
update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp); update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp);
}
ddsrt_mutex_unlock (&pwr->e.lock); ddsrt_mutex_unlock (&pwr->e.lock);
} }
void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp)
{ {
struct prd_wr_match * m; struct prd_wr_match * m;
nn_guid_t wrguid; nn_guid_t wrguid;
@ -4125,49 +4130,53 @@ void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const st
memset (&wrguid, 0, sizeof (wrguid)); memset (&wrguid, 0, sizeof (wrguid));
ddsrt_mutex_lock (&prd->e.lock); ddsrt_mutex_lock (&prd->e.lock);
if (! addrset_eq_onesidederr (prd->c.as, as)) if (seq > prd->c.seq)
{ {
/* Update proxy reader endpoints (from SEDP alive) */ prd->c.seq = seq;
if (! addrset_eq_onesidederr (prd->c.as, as))
unref_addrset (prd->c.as);
ref_addrset (as);
prd->c.as = as;
/* Rebuild writer endpoints */
while ((m = ddsrt_avl_lookup_succ_eq (&prd_writers_treedef, &prd->writers, &wrguid)) != NULL)
{ {
struct prd_wr_match *next; /* Update proxy reader endpoints (from SEDP alive) */
nn_guid_t guid_next;
struct writer * wr;
wrguid = m->wr_guid; unref_addrset (prd->c.as);
next = ddsrt_avl_find_succ (&prd_writers_treedef, &prd->writers, m); ref_addrset (as);
if (next) prd->c.as = as;
{
guid_next = next->wr_guid;
}
else
{
memset (&guid_next, 0xff, sizeof (guid_next));
guid_next.entityid.u = (guid_next.entityid.u & ~(unsigned)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY;
}
ddsrt_mutex_unlock (&prd->e.lock); /* Rebuild writer endpoints */
wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid);
if (wr) while ((m = ddsrt_avl_lookup_succ_eq (&prd_writers_treedef, &prd->writers, &wrguid)) != NULL)
{ {
ddsrt_mutex_lock (&wr->e.lock); struct prd_wr_match *next;
rebuild_writer_addrset (wr); nn_guid_t guid_next;
ddsrt_mutex_unlock (&wr->e.lock); struct writer * wr;
qxev_prd_entityid (prd, &wr->e.guid.prefix);
wrguid = m->wr_guid;
next = ddsrt_avl_find_succ (&prd_writers_treedef, &prd->writers, m);
if (next)
{
guid_next = next->wr_guid;
}
else
{
memset (&guid_next, 0xff, sizeof (guid_next));
guid_next.entityid.u = (guid_next.entityid.u & ~(unsigned)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY;
}
ddsrt_mutex_unlock (&prd->e.lock);
wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid);
if (wr)
{
ddsrt_mutex_lock (&wr->e.lock);
rebuild_writer_addrset (wr);
ddsrt_mutex_unlock (&wr->e.lock);
qxev_prd_entityid (prd, &wr->e.guid.prefix);
}
wrguid = guid_next;
ddsrt_mutex_lock (&prd->e.lock);
} }
wrguid = guid_next;
ddsrt_mutex_lock (&prd->e.lock);
} }
}
update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp); update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp);
}
ddsrt_mutex_unlock (&prd->e.lock); ddsrt_mutex_unlock (&prd->e.lock);
} }
@ -4219,7 +4228,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct nn_guid *guid, nn_wc
/* PROXY-READER ----------------------------------------------------- */ /* PROXY-READER ----------------------------------------------------- */
int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, nn_wctime_t timestamp int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, nn_wctime_t timestamp, seqno_t seq
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
, int favours_ssm , int favours_ssm
#endif #endif
@ -4239,7 +4248,7 @@ int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const
} }
prd = ddsrt_malloc (sizeof (*prd)); prd = ddsrt_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist); proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist);
prd->deleting = 0; prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM