diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index 5b1c71c..069c9d7 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -257,10 +257,12 @@ void dds__builtin_init (struct dds_domain *dom) dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv); dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv); + thread_state_awake (lookup_thread_state (), &dom->gv); const struct ephash *gh = dom->gv.guid_hash; dom->builtintopic_writer_participant = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dom->builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT, gh)); dom->builtintopic_writer_publications = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dom->builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER, gh)); dom->builtintopic_writer_subscriptions = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dom->builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER, gh)); + thread_state_asleep (lookup_thread_state ()); dds_delete_qos (qos); } diff --git a/src/core/ddsc/src/dds_matched.c b/src/core/ddsc/src/dds_matched.c index 0fe4006..5a35173 100644 --- a/src/core/ddsc/src/dds_matched.c +++ b/src/core/ddsc/src/dds_matched.c @@ -17,6 +17,7 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_bswap.h" #include "dds__writer.h" diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 31b5c82..9f87daf 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -18,6 +18,7 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_plist.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_ephash.h" #include "dds/version.h" #include "dds__init.h" #include "dds__domain.h" diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index c969768..430f84b 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -28,6 +28,7 @@ #include "dds/ddsi/q_globals.h" #include "dds__builtin.h" #include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/q_ephash.h" DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader) diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index 4bedb15..64b4574 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -24,6 +24,7 @@ #include "dds__serdata_builtintopic.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" static const uint64_t unihashconsts[] = { UINT64_C (16292676669999574021), diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 4630336..58293b4 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -25,6 +25,7 @@ #include "dds__get_status.h" #include "dds__qos.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/q_ddsi_discovery.h" diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index f0c0bc4..533ac5d 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -19,6 +19,7 @@ #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_xmsg.h" +#include "dds/ddsi/q_ephash.h" #include "dds__writer.h" #include "dds__listener.h" #include "dds__init.h" diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index 32c5b3a..c2c6963 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -20,6 +20,7 @@ #include "dds/version.h" #include "dds__entity.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsrt/cdtors.h" #include "dds/ddsrt/misc.h" #include "dds/ddsrt/process.h" diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index c899296..c73fdf6 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -20,7 +20,6 @@ #include "dds/ddsi/q_rtps.h" #include "dds/ddsi/q_protocol.h" #include "dds/ddsi/q_lat_estim.h" -#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_hbcontrol.h" #include "dds/ddsi/q_feature_check.h" #include "dds/ddsi/q_inverse_uint32_set.h" @@ -48,6 +47,15 @@ struct proxy_group; struct proxy_endpoint_common; typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg); +enum entity_kind { + EK_PARTICIPANT, + EK_PROXY_PARTICIPANT, + EK_WRITER, + EK_PROXY_WRITER, + EK_READER, + EK_PROXY_READER +}; + /* Liveliness changed is more complicated than just add/remove. Encode the event in status_cb_data_t::extra and ignore status_cb_data_t::add */ enum liveliness_changed_data_extra { @@ -157,6 +165,7 @@ struct entity_common { ddsrt_mutex_t lock; bool onlylocal; struct q_globals *gv; + ddsrt_avl_node_t all_entities_avlnode; /* QoS changes always lock the entity itself, and additionally (and within the scope of the entity lock) acquire qos_lock @@ -369,6 +378,11 @@ struct proxy_endpoint_common seqno_t seq; /* sequence number of most recent SEDP message */ }; +struct generic_proxy_endpoint { + struct entity_common e; + struct proxy_endpoint_common c; +}; + struct proxy_writer { struct entity_common e; struct proxy_endpoint_common c; @@ -671,6 +685,8 @@ int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify); void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify); int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); + +struct ephash; void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); /* Call this to empty all address sets of all writers to stop all outgoing traffic, or to diff --git a/src/core/ddsi/include/dds/ddsi/q_ephash.h b/src/core/ddsi/include/dds/ddsi/q_ephash.h index 630d6ad..41fb559 100644 --- a/src/core/ddsi/include/dds/ddsi/q_ephash.h +++ b/src/core/ddsi/include/dds/ddsi/q_ephash.h @@ -13,35 +13,35 @@ #define Q_EPHASH_H #include "dds/ddsrt/hopscotch.h" +#include "dds/ddsi/q_entity.h" #if defined (__cplusplus) extern "C" { #endif struct ephash; -struct participant; -struct reader; -struct writer; -struct proxy_participant; -struct proxy_reader; -struct proxy_writer; struct ddsi_guid; - enum entity_kind { - EK_PARTICIPANT, - EK_PROXY_PARTICIPANT, - EK_WRITER, - EK_PROXY_WRITER, - EK_READER, - EK_PROXY_READER - }; -#define EK_NKINDS ((int) EK_PROXY_READER + 1) +struct match_entities_range_key { + union { + struct writer wr; + struct reader rd; + struct proxy_writer pwr; + struct proxy_reader prd; + struct entity_common e; + struct generic_proxy_endpoint gpe; + } entity; + struct dds_qos xqos; +}; struct ephash_enum { - struct ddsrt_chh_iter it; + struct ephash *gh; enum entity_kind kind; struct entity_common *cur; +#ifndef NDEBUG + vtime_t vtime; +#endif }; /* Readers & writers are both in a GUID- and in a GID-keyed table. If @@ -112,6 +112,8 @@ struct ephash_enum_proxy_writer { struct ephash_enum st; }; struct ephash_enum_proxy_reader { struct ephash_enum st; }; void ephash_enum_init (struct ephash_enum *st, const struct ephash *eh, enum entity_kind kind); +void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max); +void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max); void *ephash_enum_next (struct ephash_enum *st); void ephash_enum_fini (struct ephash_enum *st); diff --git a/src/core/ddsi/src/ddsi_pmd.c b/src/core/ddsi/src/ddsi_pmd.c index 035a9bf..8588e8f 100644 --- a/src/core/ddsi/src/ddsi_pmd.c +++ b/src/core/ddsi/src/ddsi_pmd.c @@ -17,6 +17,7 @@ #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_lease.h" #include "dds/ddsi/q_log.h" diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index bb1430c..1c888b8 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2324,7 +2324,8 @@ static void connect_writer_with_proxy_reader_wrapper (struct entity_common *vwr, struct proxy_reader *prd = (struct proxy_reader *) vprd; assert (wr->e.kind == EK_WRITER); assert (prd->e.kind == EK_PROXY_READER); - connect_writer_with_proxy_reader(wr, prd, tnow); + assert (is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) == is_builtin_endpoint (prd->e.guid.entityid, prd->c.vendor)); + connect_writer_with_proxy_reader (wr, prd, tnow); } static void connect_proxy_writer_with_reader_wrapper (struct entity_common *vpwr, struct entity_common *vrd, nn_mtime_t tnow) @@ -2333,7 +2334,8 @@ static void connect_proxy_writer_with_reader_wrapper (struct entity_common *vpwr struct reader *rd = (struct reader *) vrd; assert (pwr->e.kind == EK_PROXY_WRITER); assert (rd->e.kind == EK_READER); - connect_proxy_writer_with_reader(pwr, rd, tnow); + assert (is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) == is_builtin_endpoint (pwr->e.guid.entityid, pwr->c.vendor)); + connect_proxy_writer_with_reader (pwr, rd, tnow); } static void connect_writer_with_reader_wrapper (struct entity_common *vwr, struct entity_common *vrd, nn_mtime_t tnow) @@ -2342,17 +2344,19 @@ static void connect_writer_with_reader_wrapper (struct entity_common *vwr, struc struct reader *rd = (struct reader *) vrd; assert (wr->e.kind == EK_WRITER); assert (rd->e.kind == EK_READER); - connect_writer_with_reader(wr, rd, tnow); + assert (!is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || is_local_orphan_endpoint (&wr->e)); + assert (!is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE)); + connect_writer_with_reader (wr, rd, tnow); } -static enum entity_kind generic_do_match_mkind (enum entity_kind kind) +static enum entity_kind generic_do_match_mkind (enum entity_kind kind, bool local) { switch (kind) { - case EK_WRITER: return EK_PROXY_READER; - case EK_READER: return EK_PROXY_WRITER; - case EK_PROXY_WRITER: return EK_READER; - case EK_PROXY_READER: return EK_WRITER; + case EK_WRITER: return local ? EK_READER : EK_PROXY_READER; + case EK_READER: return local ? EK_WRITER : EK_PROXY_WRITER; + case EK_PROXY_WRITER: assert (!local); return EK_READER; + case EK_PROXY_READER: assert (!local); return EK_WRITER; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: assert(0); @@ -2362,88 +2366,29 @@ static enum entity_kind generic_do_match_mkind (enum entity_kind kind) return EK_WRITER; } -static enum entity_kind generic_do_local_match_mkind (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return EK_READER; - case EK_READER: return EK_WRITER; - case EK_PROXY_WRITER: - case EK_PROXY_READER: - case EK_PARTICIPANT: - case EK_PROXY_PARTICIPANT: - assert(0); - return EK_WRITER; - } - assert(0); - return EK_WRITER; -} - -static const char *generic_do_match_kindstr_us (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "writer"; - case EK_READER: return "reader"; - case EK_PROXY_WRITER: return "proxy_writer"; - case EK_PROXY_READER: return "proxy_reader"; - case EK_PARTICIPANT: return "participant"; - case EK_PROXY_PARTICIPANT: return "proxy_participant"; - } - assert(0); - return "?"; -} - -static const char *generic_do_match_kindstr (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "writer"; - case EK_READER: return "reader"; - case EK_PROXY_WRITER: return "proxy writer"; - case EK_PROXY_READER: return "proxy reader"; - case EK_PARTICIPANT: return "participant"; - case EK_PROXY_PARTICIPANT: return "proxy participant"; - } - assert(0); - return "?"; -} - -static const char *generic_do_match_kindabbrev (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "wr"; - case EK_READER: return "rd"; - case EK_PROXY_WRITER: return "pwr"; - case EK_PROXY_READER: return "prd"; - case EK_PARTICIPANT: return "pp"; - case EK_PROXY_PARTICIPANT: return "proxypp"; - } - assert(0); - return "?"; -} - -static int generic_do_match_isproxy (const struct entity_common *e) -{ - return e->kind == EK_PROXY_WRITER || e->kind == EK_PROXY_READER || e->kind == EK_PROXY_PARTICIPANT; -} - -static void generic_do_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow) +static void generic_do_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow, bool local) { switch (e->kind) { case EK_WRITER: - connect_writer_with_proxy_reader_wrapper(e, em, tnow); + if (local) + connect_writer_with_reader_wrapper (e, em, tnow); + else + connect_writer_with_proxy_reader_wrapper (e, em, tnow); break; case EK_READER: - connect_proxy_writer_with_reader_wrapper(em, e, tnow); + if (local) + connect_writer_with_reader_wrapper (em, e, tnow); + else + connect_proxy_writer_with_reader_wrapper (em, e, tnow); break; case EK_PROXY_WRITER: - connect_proxy_writer_with_reader_wrapper(e, em, tnow); + assert (!local); + connect_proxy_writer_with_reader_wrapper (e, em, tnow); break; case EK_PROXY_READER: - connect_writer_with_proxy_reader_wrapper(em, e, tnow); + assert (!local); + connect_writer_with_proxy_reader_wrapper (em, e, tnow); break; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: @@ -2451,125 +2396,118 @@ static void generic_do_match_connect (struct entity_common *e, struct entity_com } } -static void generic_do_local_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow) +static const char *entity_topic_name (const struct entity_common *e) { switch (e->kind) { case EK_WRITER: - connect_writer_with_reader_wrapper(e, em, tnow); - break; + return ((const struct writer *) e)->xqos->topic_name; case EK_READER: - connect_writer_with_reader_wrapper(em, e, tnow); - break; + return ((const struct reader *) e)->xqos->topic_name; case EK_PROXY_WRITER: case EK_PROXY_READER: + return ((const struct generic_proxy_endpoint *) e)->c.xqos->topic_name; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: - assert(0); + assert (0); } + return ""; } -static void generic_do_match (struct entity_common *e, nn_mtime_t tnow) +static void generic_do_match (struct entity_common *e, nn_mtime_t tnow, bool local) { - struct ephash * const guid_hash = e->gv->guid_hash; - struct ephash_enum est; + static const struct { const char *full; const char *full_us; const char *abbrev; } kindstr[] = { + [EK_WRITER] = { "writer", "writer", "wr" }, + [EK_READER] = { "reader", "reader", "rd" }, + [EK_PROXY_WRITER] = { "proxy writer", "proxy_writer", "pwr" }, + [EK_PROXY_READER] = { "proxy reader", "proxy_reader", "prd" }, + [EK_PARTICIPANT] = { "participant", "participant", "pp" }, + [EK_PROXY_PARTICIPANT] = { "proxy participant", "proxy_participant", "proxypp" } + }; + + enum entity_kind mkind = generic_do_match_mkind (e->kind, local); + struct ephash const * const guid_hash = e->gv->guid_hash; + struct ephash_enum it; struct entity_common *em; - enum entity_kind mkind = generic_do_match_mkind(e->kind); - if (!is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE)) + + if (!is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE) || (local && is_local_orphan_endpoint (e))) { - EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_kindstr(mkind)); + /* Non-builtins need matching on topics, the local orphan endpoints + are a bit weird because they reuse the builtin entityids but + otherwise need to be treated as normal readers */ + struct match_entities_range_key max; + const char *tp = entity_topic_name (e); + EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss%s%s\n", + kindstr[e->kind].full_us, kindstr[mkind].full_us, + kindstr[e->kind].abbrev, PGUID (e->guid), + kindstr[mkind].abbrev, + tp ? " of topic " : "", tp ? tp : ""); /* Note: we visit at least all proxies that existed when we called - init (with the -- possible -- exception of ones that were - deleted between our calling init and our reaching it while - enumerating), but we may visit a single proxy reader multiple - times. */ - ephash_enum_init (&est, guid_hash, mkind); - while ((em = ephash_enum_next (&est)) != NULL) - generic_do_match_connect(e, em, tnow); - ephash_enum_fini (&est); + init (with the -- possible -- exception of ones that were + deleted between our calling init and our reaching it while + enumerating), but we may visit a single proxy reader multiple + times. */ + ephash_enum_init_topic (&it, guid_hash, mkind, tp, &max); + while ((em = ephash_enum_next_max (&it, &max)) != NULL) + generic_do_match_connect (e, em, tnow, local); + ephash_enum_fini (&it); } - else + else if (!local) { - /* Built-ins have fixed QoS */ - ddsi_entityid_t tgt_ent = builtin_entityid_match (e->guid.entityid); - enum entity_kind pkind = generic_do_match_isproxy (e) ? EK_PARTICIPANT : EK_PROXY_PARTICIPANT; + /* Built-ins have fixed QoS and a known entity id to use, so instead of + looking for the right topic, just probe the matching GUIDs for all + (proxy) participants. Local matching never needs to look at the + discovery endpoints */ + const ddsi_entityid_t tgt_ent = builtin_entityid_match (e->guid.entityid); + const bool isproxy = (e->kind == EK_PROXY_WRITER || e->kind == EK_PROXY_READER || e->kind == EK_PROXY_PARTICIPANT); + enum entity_kind pkind = isproxy ? EK_PARTICIPANT : EK_PROXY_PARTICIPANT; EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning %sparticipants tgt=%"PRIx32"\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_isproxy (e) ? "" : "proxy ", - tgt_ent.u); + kindstr[e->kind].full_us, kindstr[mkind].full_us, + kindstr[e->kind].abbrev, PGUID (e->guid), + isproxy ? "" : "proxy ", tgt_ent.u); if (tgt_ent.u != NN_ENTITYID_UNKNOWN) { - struct entity_common *ep; - ephash_enum_init (&est, guid_hash, pkind); - while ((ep = ephash_enum_next (&est)) != NULL) + ephash_enum_init (&it, guid_hash, pkind); + while ((em = ephash_enum_next (&it)) != NULL) { - ddsi_guid_t tgt_guid; - tgt_guid.prefix = ep->guid.prefix; - tgt_guid.entityid = tgt_ent; - if ((em = ephash_lookup_guid (guid_hash, &tgt_guid, mkind)) != NULL) - generic_do_match_connect(e, em, tnow); + const ddsi_guid_t tgt_guid = { em->guid.prefix, tgt_ent }; + struct entity_common *ep; + if ((ep = ephash_lookup_guid (guid_hash, &tgt_guid, mkind)) != NULL) + generic_do_match_connect (e, ep, tnow, local); } - ephash_enum_fini (&est); + ephash_enum_fini (&it); } } } -static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow) -{ - struct ephash_enum est; - struct entity_common *em; - enum entity_kind mkind; - if (is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE) && !is_local_orphan_endpoint (e)) - /* never a need for local matches on discovery endpoints */ - return; - mkind = generic_do_local_match_mkind (e->kind); - EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_kindstr(mkind)); - /* Note: we visit at least all proxies that existed when we called - init (with the -- possible -- exception of ones that were - deleted between our calling init and our reaching it while - enumerating), but we may visit a single proxy reader multiple - times. */ - ephash_enum_init (&est, e->gv->guid_hash, mkind); - while ((em = ephash_enum_next (&est)) != NULL) - generic_do_local_match_connect (e, em, tnow); - ephash_enum_fini (&est); -} - static void match_writer_with_proxy_readers (struct writer *wr, nn_mtime_t tnow) { - generic_do_match (&wr->e, tnow); + generic_do_match (&wr->e, tnow, false); } static void match_writer_with_local_readers (struct writer *wr, nn_mtime_t tnow) { - generic_do_local_match (&wr->e, tnow); + generic_do_match (&wr->e, tnow, true); } static void match_reader_with_proxy_writers (struct reader *rd, nn_mtime_t tnow) { - generic_do_match (&rd->e, tnow); + generic_do_match (&rd->e, tnow, false); } static void match_reader_with_local_writers (struct reader *rd, nn_mtime_t tnow) { - generic_do_local_match (&rd->e, tnow); + generic_do_match (&rd->e, tnow, true); } static void match_proxy_writer_with_readers (struct proxy_writer *pwr, nn_mtime_t tnow) { - generic_do_match (&pwr->e, tnow); + generic_do_match (&pwr->e, tnow, false); } static void match_proxy_reader_with_writers (struct proxy_reader *prd, nn_mtime_t tnow) { - generic_do_match(&prd->e, tnow); + generic_do_match(&prd->e, tnow, false); } /* ENDPOINT --------------------------------------------------------- */ diff --git a/src/core/ddsi/src/q_ephash.c b/src/core/ddsi/src/q_ephash.c index 0e30e0b..63a0004 100644 --- a/src/core/ddsi/src/q_ephash.c +++ b/src/core/ddsi/src/q_ephash.c @@ -11,11 +11,13 @@ */ #include #include +#include #include "dds/ddsrt/heap.h" #include "dds/ddsrt/misc.h" #include "dds/ddsrt/hopscotch.h" +#include "dds/ddsrt/avl.h" #include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_globals.h" @@ -26,6 +28,8 @@ struct ephash { struct ddsrt_chh *hash; + ddsrt_mutex_t all_entities_lock; + ddsrt_avl_tree_t all_entities; }; static const uint64_t unihashconsts[] = { @@ -35,6 +39,10 @@ static const uint64_t unihashconsts[] = { UINT64_C (16728792139623414127) }; +static int all_entities_compare (const void *va, const void *vb); +static const ddsrt_avl_treedef_t all_entities_treedef = + DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct entity_common, all_entities_avlnode), 0, all_entities_compare, 0); + static uint32_t hash_entity_guid (const struct entity_common *c) { return @@ -62,6 +70,147 @@ static int entity_guid_eq_wrapper (const void *a, const void *b) return entity_guid_eq (a, b); } +static int all_entities_compare_isbuiltin (const struct entity_common *e, nn_vendorid_t vendor) +{ + const unsigned char *guid_bytes = (const unsigned char *) &e->guid; + if (guid_bytes[0] != 0 && guid_bytes[0] != 0xff) + return is_builtin_endpoint (e->guid.entityid, vendor); + else + { + for (size_t i = 1; i < sizeof (e->guid); i++) + if (guid_bytes[i] != guid_bytes[0]) + return is_builtin_endpoint (e->guid.entityid, vendor) && !is_local_orphan_endpoint (e); + return 0; + } +} + +static int all_entities_compare (const void *va, const void *vb) +{ + const struct entity_common *a = va; + const struct entity_common *b = vb; + const char *tp_a = ""; + const char *tp_b = ""; + int cmpres; + + if (a->kind != b->kind) + return (int) a->kind - (int) b->kind; + + switch (a->kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + + case EK_WRITER: { + const struct writer *wra = va; + const struct writer *wrb = vb; + if (!all_entities_compare_isbuiltin (a, NN_VENDORID_ECLIPSE)) { + assert ((wra->xqos->present & QP_TOPIC_NAME) && wra->xqos->topic_name); + tp_a = wra->xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, NN_VENDORID_ECLIPSE)) { + assert ((wrb->xqos->present & QP_TOPIC_NAME) && wrb->xqos->topic_name); + tp_b = wrb->xqos->topic_name; + } + break; + } + + case EK_READER: { + const struct reader *rda = va; + const struct reader *rdb = vb; + if (!all_entities_compare_isbuiltin (a, NN_VENDORID_ECLIPSE)) { + assert ((rda->xqos->present & QP_TOPIC_NAME) && rda->xqos->topic_name); + tp_a = rda->xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, NN_VENDORID_ECLIPSE)) { + assert ((rdb->xqos->present & QP_TOPIC_NAME) && rdb->xqos->topic_name); + tp_b = rdb->xqos->topic_name; + } + break; + } + + case EK_PROXY_WRITER: + case EK_PROXY_READER: { + const struct generic_proxy_endpoint *ga = va; + const struct generic_proxy_endpoint *gb = vb; + if (!all_entities_compare_isbuiltin (a, ga->c.vendor)) { + assert ((ga->c.xqos->present & QP_TOPIC_NAME) && ga->c.xqos->topic_name); + tp_a = ga->c.xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, gb->c.vendor)) { + assert ((gb->c.xqos->present & QP_TOPIC_NAME) && gb->c.xqos->topic_name); + tp_b = gb->c.xqos->topic_name; + } + break; + } + } + + if ((cmpres = strcmp (tp_a, tp_b)) != 0) + return cmpres; + else + return memcmp (&a->guid, &b->guid, sizeof (a->guid)); +} + +static void match_endpoint_range (enum entity_kind kind, const char *tp, struct match_entities_range_key *min, struct match_entities_range_key *max) +{ + /* looking for entities of kind KIND; initialize fake entities such that they are + valid input to all_entities_compare and that span the range of all possibly + matching endpoints. */ + min->entity.e.kind = max->entity.e.kind = kind; + memset (&min->entity.e.guid, 0x00, sizeof (min->entity.e.guid)); + memset (&max->entity.e.guid, 0xff, sizeof (max->entity.e.guid)); + min->xqos.present = max->xqos.present = QP_TOPIC_NAME; + min->xqos.topic_name = max->xqos.topic_name = (char *) tp; + switch (kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + case EK_WRITER: + min->entity.wr.xqos = &min->xqos; + max->entity.wr.xqos = &max->xqos; + break; + case EK_READER: + min->entity.rd.xqos = &min->xqos; + max->entity.rd.xqos = &max->xqos; + break; + case EK_PROXY_WRITER: + case EK_PROXY_READER: + min->entity.gpe.c.vendor = max->entity.gpe.c.vendor = NN_VENDORID_ECLIPSE; + min->entity.gpe.c.xqos = &min->xqos; + max->entity.gpe.c.xqos = &max->xqos; + break; + } +} + +static void match_entity_kind_min (enum entity_kind kind, struct match_entities_range_key *min) +{ + /* looking for entities of kind KIND; initialize fake entities such that they are + valid input to all_entities_compare and that span the range of all possibly + matching endpoints. */ + min->entity.e.kind = kind; + memset (&min->entity.e.guid, 0x00, sizeof (min->entity.e.guid)); + min->xqos.present = QP_TOPIC_NAME; + min->xqos.topic_name = ""; + switch (kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + case EK_WRITER: + min->entity.wr.xqos = &min->xqos; + break; + case EK_READER: + min->entity.rd.xqos = &min->xqos; + break; + case EK_PROXY_WRITER: + case EK_PROXY_READER: + min->entity.gpe.c.vendor = NN_VENDORID_ECLIPSE; + min->entity.gpe.c.xqos = &min->xqos; + break; + } +} + static void gc_buckets_cb (struct gcreq *gcreq) { void *bs = gcreq->arg; @@ -86,28 +235,50 @@ struct ephash *ephash_new (struct q_globals *gv) ddsrt_free (ephash); return NULL; } else { + ddsrt_mutex_init (&ephash->all_entities_lock); + ddsrt_avl_init (&all_entities_treedef, &ephash->all_entities); return ephash; } } void ephash_free (struct ephash *ephash) { + ddsrt_avl_free (&all_entities_treedef, &ephash->all_entities, 0); + ddsrt_mutex_destroy (&ephash->all_entities_lock); ddsrt_chh_free (ephash->hash); ephash->hash = NULL; ddsrt_free (ephash); } +static void add_to_all_entities (struct ephash *gh, struct entity_common *e) +{ + ddsrt_mutex_lock (&gh->all_entities_lock); + assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) == NULL); + ddsrt_avl_insert (&all_entities_treedef, &gh->all_entities, e); + ddsrt_mutex_unlock (&gh->all_entities_lock); +} + +static void remove_from_all_entities (struct ephash *gh, struct entity_common *e) +{ + ddsrt_mutex_lock (&gh->all_entities_lock); + assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) != NULL); + ddsrt_avl_delete (&all_entities_treedef, &gh->all_entities, e); + ddsrt_mutex_unlock (&gh->all_entities_lock); +} + static void ephash_guid_insert (struct ephash *gh, struct entity_common *e) { int x; x = ddsrt_chh_add (gh->hash, e); (void)x; assert (x); + add_to_all_entities (gh, e); } static void ephash_guid_remove (struct ephash *gh, struct entity_common *e) { int x; + remove_from_all_entities (gh, e); x = ddsrt_chh_remove (gh->hash, e); (void)x; assert (x); @@ -240,17 +411,41 @@ struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *gh, c /* Enumeration */ -static void ephash_enum_init_int (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind) +static void ephash_enum_init_minmax_int (struct ephash_enum *st, const struct ephash *gh, struct match_entities_range_key *min) { - st->kind = kind; - st->cur = ddsrt_chh_iter_first (gh->hash, &st->it); - while (st->cur && st->cur->kind != st->kind) - st->cur = ddsrt_chh_iter_next (&st->it); + /* Use a lock to protect against concurrent modification and rely on the GC not deleting + any entities while enumerating so we can rely on the (kind, topic, GUID) triple to + remain valid for looking up the next entity. With a bit of additional effort it would + be possible to allow the GC to reclaim any entities already visited, but I don't think + that additional effort is worth it. */ +#ifndef NDEBUG + assert (thread_is_awake ()); + st->vtime = ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime); +#endif + st->gh = (struct ephash *) gh; + st->kind = min->entity.e.kind; + ddsrt_mutex_lock (&st->gh->all_entities_lock); + st->cur = ddsrt_avl_lookup_succ_eq (&all_entities_treedef, &st->gh->all_entities, min); + ddsrt_mutex_unlock (&st->gh->all_entities_lock); +} + +void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max) +{ + assert (kind == EK_READER || kind == EK_WRITER || kind == EK_PROXY_READER || kind == EK_PROXY_WRITER); + struct match_entities_range_key min; + match_endpoint_range (kind, topic, &min, max); + ephash_enum_init_minmax_int (st, gh, &min); + if (st->cur && all_entities_compare (st->cur, &max->entity) > 0) + st->cur = NULL; } void ephash_enum_init (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind) { - ephash_enum_init_int(st, gh, kind); + struct match_entities_range_key min; + match_entity_kind_min (kind, &min); + ephash_enum_init_minmax_int (st, gh, &min); + if (st->cur && st->cur->kind != st->kind) + st->cur = NULL; } void ephash_enum_writer_init (struct ephash_enum_writer *st, const struct ephash *gh) @@ -285,16 +480,31 @@ void ephash_enum_proxy_participant_init (struct ephash_enum_proxy_participant *s void *ephash_enum_next (struct ephash_enum *st) { + /* st->cur can not have been freed yet, but it may have been removed from the index */ + assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime); void *res = st->cur; if (st->cur) { - st->cur = ddsrt_chh_iter_next (&st->it); - while (st->cur && st->cur->kind != st->kind) - st->cur = ddsrt_chh_iter_next (&st->it); + ddsrt_mutex_lock (&st->gh->all_entities_lock); + st->cur = ddsrt_avl_lookup_succ (&all_entities_treedef, &st->gh->all_entities, st->cur); + ddsrt_mutex_unlock (&st->gh->all_entities_lock); + if (st->cur && st->cur->kind != st->kind) + st->cur = NULL; } return res; } +void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max) +{ + void *res = ephash_enum_next (st); + + /* max may only make the bounds tighter */ + assert (max->entity.e.kind == st->kind); + if (st->cur && all_entities_compare (st->cur, &max->entity) > 0) + st->cur = NULL; + return res; +} + struct writer *ephash_enum_writer_next (struct ephash_enum_writer *st) { assert (offsetof (struct writer, e) == 0); @@ -333,7 +543,8 @@ struct proxy_participant *ephash_enum_proxy_participant_next (struct ephash_enum void ephash_enum_fini (struct ephash_enum *st) { - DDSRT_UNUSED_ARG(st); + assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime); + (void) st; } void ephash_enum_writer_fini (struct ephash_enum_writer *st) diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 6d2a69a..b9c65b2 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -18,6 +18,7 @@ #include "dds/ddsrt/avl.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_addrset.h" #include "dds/ddsi/q_xmsg.h" #include "dds/ddsi/q_bswap.h"