diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 1d20d34..0acd3da 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -338,6 +338,7 @@ struct proxy_endpoint_common struct addrset *as; /* address set to use for communicating with this endpoint */ nn_guid_t group_guid; /* 0:0:0:0 if not available */ nn_vendorid_t vendor; /* cached from proxypp->vendor */ + seqno_t seq; /* sequence number of most recent SEDP message */ }; struct proxy_writer { @@ -608,8 +609,8 @@ void purge_proxy_participants (struct q_globals *gv, const nn_locator_t *loc, bo /* To create a new proxy writer or reader; the proxy participant is determined from the GUID and must exist. */ - int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp); -int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, nn_wctime_t timestamp + int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq); +int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const struct nn_plist *plist, nn_wctime_t timestamp, seqno_t seq #ifdef DDSI_INCLUDE_SSM , int favours_ssm #endif @@ -623,8 +624,8 @@ int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const int delete_proxy_writer (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); int delete_proxy_reader (struct q_globals *gv, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); -void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); -void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); +void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); +void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct nn_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); void delete_proxy_group (struct ephash *guid_hash, const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index fa08998..227c961 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -1120,7 +1120,7 @@ static struct proxy_participant *implicitly_create_proxypp (struct q_globals *gv return ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid); } -static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp) +static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp) { #define E(msg, lbl) do { GVLOGDISC (msg); goto lbl; } while (0) struct q_globals * const gv = rst->gv; @@ -1275,7 +1275,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat { if (pwr) { - update_proxy_writer (pwr, as, xqos, timestamp); + update_proxy_writer (pwr, seq, as, xqos, timestamp); } else { @@ -1287,7 +1287,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat new_proxy_writer (&ppguid, &datap->endpoint_guid, as, datap, channel->dqueue, channel->evq ? channel->evq : gv->xevents, timestamp); } #else - new_proxy_writer (gv, &ppguid, &datap->endpoint_guid, as, datap, gv->user_dqueue, gv->xevents, timestamp); + new_proxy_writer (gv, &ppguid, &datap->endpoint_guid, as, datap, gv->user_dqueue, gv->xevents, timestamp, seq); #endif } } @@ -1295,14 +1295,14 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat { if (prd) { - update_proxy_reader (prd, as, xqos, timestamp); + update_proxy_reader (prd, seq, as, xqos, timestamp); } else { #ifdef DDSI_INCLUDE_SSM - new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, ssm); + new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, seq, ssm); #else - new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp); + new_proxy_reader (gv, &ppguid, &datap->endpoint_guid, as, datap, timestamp, seq); #endif } } @@ -1337,7 +1337,7 @@ static void handle_SEDP_dead (const struct receiver_state *rst, nn_plist_t *data GVLOGDISC (" %s\n", (res < 0) ? " unknown" : " delete"); } -static void handle_SEDP (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len) +static void handle_SEDP (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len) { struct q_globals * const gv = rst->gv; const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */ @@ -1370,7 +1370,7 @@ static void handle_SEDP (const struct receiver_state *rst, nn_wctime_t timestamp switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER)) { case 0: - handle_SEDP_alive (rst, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp); + handle_SEDP_alive (rst, seq, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp); break; case NN_STATUSINFO_DISPOSE: @@ -1736,7 +1736,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str break; case NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER: case NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER: - handle_SEDP (sampleinfo->rst, timestamp, statusinfo, datap, datasz); + handle_SEDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz); break; case NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER: handle_PMD (sampleinfo->rst, timestamp, statusinfo, datap, datasz); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 42ccf84..da19224 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3659,15 +3659,15 @@ void new_proxy_participant assert (is_builtin_entityid (guid1.entityid, proxypp->vendor)); if (is_writer_entityid (guid1.entityid)) { - new_proxy_writer (gv, ppguid, &guid1, proxypp->as_meta, &plist_wr, gv->builtins_dqueue, gv->xevents, timestamp); + new_proxy_writer (gv, ppguid, &guid1, proxypp->as_meta, &plist_wr, gv->builtins_dqueue, gv->xevents, timestamp, 0); } else { #ifdef DDSI_INCLUDE_SSM const int ssm = addrset_contains_ssm (gv, proxypp->as_meta); - new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, ssm); + new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, 0, ssm); #else - new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp); + new_proxy_reader (gv, ppguid, &guid1, proxypp->as_meta, &plist_rd, timestamp, 0); #endif } } @@ -3696,7 +3696,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, se nn_plist_mergein_missing (new_plist, datap, PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | PP_ENTITY_NAME, QP_USER_DATA); nn_plist_mergein_missing (new_plist, &proxypp->e.gv->default_plist_pp, ~(uint64_t)0, ~(uint64_t)0); - if (seq && seq > proxypp->seq) + if (seq > proxypp->seq) proxypp->seq = seq; switch (source) @@ -3953,7 +3953,7 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct nn_gui /* PROXY-ENDPOINT --------------------------------------------------- */ -static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) +static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, seqno_t seq, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) { const char *name; @@ -3968,6 +3968,7 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en c->as = ref_addrset (as); c->topic = NULL; /* set from first matching reader/writer */ c->vendor = proxypp->vendor; + c->seq = seq; if (plist->present & PP_GROUP_GUID) c->group_guid = plist->group_guid; @@ -3991,7 +3992,7 @@ static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_en /* PROXY-WRITER ----------------------------------------------------- */ -int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp) +int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, struct nn_dqueue *dqueue, struct xeventq *evq, nn_wctime_t timestamp, seqno_t seq) { struct proxy_participant *proxypp; struct proxy_writer *pwr; @@ -4008,7 +4009,7 @@ int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const } pwr = ddsrt_malloc (sizeof (*pwr)); - proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist); + proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, seq, proxypp, as, plist); ddsrt_avl_init (&pwr_readers_treedef, &pwr->readers); pwr->n_reliable_readers = 0; @@ -4084,7 +4085,7 @@ int new_proxy_writer (struct q_globals *gv, const struct nn_guid *ppguid, const return 0; } -void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) +void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) { struct reader * rd; struct pwr_rd_match * m; @@ -4093,31 +4094,35 @@ void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const st /* Update proxy writer endpoints (from SEDP alive) */ ddsrt_mutex_lock (&pwr->e.lock); - if (! addrset_eq_onesidederr (pwr->c.as, as)) + if (seq > pwr->c.seq) { -#ifdef DDSI_INCLUDE_SSM - pwr->supports_ssm = (addrset_contains_ssm (pwr->e.gv, as) && pwr->e.gv->config.allowMulticast & AMC_SSM) ? 1 : 0; -#endif - unref_addrset (pwr->c.as); - ref_addrset (as); - pwr->c.as = as; - m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &iter); - while (m) + pwr->c.seq = seq; + if (! addrset_eq_onesidederr (pwr->c.as, as)) { - rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid); - if (rd) +#ifdef DDSI_INCLUDE_SSM + pwr->supports_ssm = (addrset_contains_ssm (pwr->e.gv, as) && pwr->e.gv->config.allowMulticast & AMC_SSM) ? 1 : 0; +#endif + unref_addrset (pwr->c.as); + ref_addrset (as); + pwr->c.as = as; + m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &iter); + while (m) { - qxev_pwr_entityid (pwr, &rd->e.guid.prefix); + rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid); + if (rd) + { + qxev_pwr_entityid (pwr, &rd->e.guid.prefix); + } + m = ddsrt_avl_iter_next (&iter); } - m = ddsrt_avl_iter_next (&iter); } - } - update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp); + update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp); + } ddsrt_mutex_unlock (&pwr->e.lock); } -void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) +void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) { struct prd_wr_match * m; nn_guid_t wrguid; @@ -4125,49 +4130,53 @@ void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const st memset (&wrguid, 0, sizeof (wrguid)); ddsrt_mutex_lock (&prd->e.lock); - if (! addrset_eq_onesidederr (prd->c.as, as)) + if (seq > prd->c.seq) { - /* Update proxy reader endpoints (from SEDP alive) */ - - unref_addrset (prd->c.as); - ref_addrset (as); - prd->c.as = as; - - /* Rebuild writer endpoints */ - - while ((m = ddsrt_avl_lookup_succ_eq (&prd_writers_treedef, &prd->writers, &wrguid)) != NULL) + prd->c.seq = seq; + if (! addrset_eq_onesidederr (prd->c.as, as)) { - struct prd_wr_match *next; - nn_guid_t guid_next; - struct writer * wr; + /* Update proxy reader endpoints (from SEDP alive) */ - wrguid = m->wr_guid; - next = ddsrt_avl_find_succ (&prd_writers_treedef, &prd->writers, m); - if (next) - { - guid_next = next->wr_guid; - } - else - { - memset (&guid_next, 0xff, sizeof (guid_next)); - guid_next.entityid.u = (guid_next.entityid.u & ~(unsigned)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY; - } + unref_addrset (prd->c.as); + ref_addrset (as); + prd->c.as = as; - ddsrt_mutex_unlock (&prd->e.lock); - wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid); - if (wr) + /* Rebuild writer endpoints */ + + while ((m = ddsrt_avl_lookup_succ_eq (&prd_writers_treedef, &prd->writers, &wrguid)) != NULL) { - ddsrt_mutex_lock (&wr->e.lock); - rebuild_writer_addrset (wr); - ddsrt_mutex_unlock (&wr->e.lock); - qxev_prd_entityid (prd, &wr->e.guid.prefix); + struct prd_wr_match *next; + nn_guid_t guid_next; + struct writer * wr; + + wrguid = m->wr_guid; + next = ddsrt_avl_find_succ (&prd_writers_treedef, &prd->writers, m); + if (next) + { + guid_next = next->wr_guid; + } + else + { + memset (&guid_next, 0xff, sizeof (guid_next)); + guid_next.entityid.u = (guid_next.entityid.u & ~(unsigned)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY; + } + + ddsrt_mutex_unlock (&prd->e.lock); + wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid); + if (wr) + { + ddsrt_mutex_lock (&wr->e.lock); + rebuild_writer_addrset (wr); + ddsrt_mutex_unlock (&wr->e.lock); + qxev_prd_entityid (prd, &wr->e.guid.prefix); + } + wrguid = guid_next; + ddsrt_mutex_lock (&prd->e.lock); } - wrguid = guid_next; - ddsrt_mutex_lock (&prd->e.lock); } - } - update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp); + update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp); + } ddsrt_mutex_unlock (&prd->e.lock); } @@ -4219,7 +4228,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct nn_guid *guid, nn_wc /* PROXY-READER ----------------------------------------------------- */ -int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, nn_wctime_t timestamp +int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const struct nn_guid *guid, struct addrset *as, const nn_plist_t *plist, nn_wctime_t timestamp, seqno_t seq #ifdef DDSI_INCLUDE_SSM , int favours_ssm #endif @@ -4239,7 +4248,7 @@ int new_proxy_reader (struct q_globals *gv, const struct nn_guid *ppguid, const } prd = ddsrt_malloc (sizeof (*prd)); - proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist); + proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, seq, proxypp, as, plist); prd->deleting = 0; #ifdef DDSI_INCLUDE_SSM