From 2dfde4bf0dd7450ca67baf9aac7a0068b89192bf Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 9 Dec 2019 15:42:33 +0100 Subject: [PATCH] Only touch endpoints of same topic when matching Maintain an index of all entities on (kind, topic, GUID) in addition to the concurrent hash table on GUID and use that when looking for matching entities. For endpoint matching, restrict the scan to readers/writers of the right topic, for regular iterators, restrict it to the range of the correct entity kind. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_builtin.c | 2 + src/core/ddsc/src/dds_matched.c | 1 + src/core/ddsc/src/dds_participant.c | 1 + src/core/ddsc/src/dds_reader.c | 1 + src/core/ddsc/src/dds_serdata_builtintopic.c | 1 + src/core/ddsc/src/dds_topic.c | 1 + src/core/ddsc/src/dds_writer.c | 1 + src/core/ddsc/tests/liveliness.c | 1 + src/core/ddsi/include/dds/ddsi/q_entity.h | 18 +- src/core/ddsi/include/dds/ddsi/q_ephash.h | 34 +-- src/core/ddsi/src/ddsi_pmd.c | 1 + src/core/ddsi/src/q_entity.c | 238 +++++++------------ src/core/ddsi/src/q_ephash.c | 231 +++++++++++++++++- src/core/ddsi/src/q_transmit.c | 1 + 14 files changed, 355 insertions(+), 177 deletions(-) diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index 5b1c71c..069c9d7 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -257,10 +257,12 @@ void dds__builtin_init (struct dds_domain *dom) dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv); dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv); + thread_state_awake (lookup_thread_state (), &dom->gv); const struct ephash *gh = dom->gv.guid_hash; dom->builtintopic_writer_participant = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dom->builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT, gh)); dom->builtintopic_writer_publications = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dom->builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER, gh)); dom->builtintopic_writer_subscriptions = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dom->builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER, gh)); + thread_state_asleep (lookup_thread_state ()); dds_delete_qos (qos); } diff --git a/src/core/ddsc/src/dds_matched.c b/src/core/ddsc/src/dds_matched.c index 0fe4006..5a35173 100644 --- a/src/core/ddsc/src/dds_matched.c +++ b/src/core/ddsc/src/dds_matched.c @@ -17,6 +17,7 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_bswap.h" #include "dds__writer.h" diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 31b5c82..9f87daf 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -18,6 +18,7 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_plist.h" #include "dds/ddsi/q_globals.h" +#include "dds/ddsi/q_ephash.h" #include "dds/version.h" #include "dds__init.h" #include "dds__domain.h" diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index c969768..430f84b 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -28,6 +28,7 @@ #include "dds/ddsi/q_globals.h" #include "dds__builtin.h" #include "dds/ddsi/ddsi_sertopic.h" +#include "dds/ddsi/q_ephash.h" DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader) diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index 4bedb15..64b4574 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -24,6 +24,7 @@ #include "dds__serdata_builtintopic.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" static const uint64_t unihashconsts[] = { UINT64_C (16292676669999574021), diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 4630336..58293b4 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -25,6 +25,7 @@ #include "dds__get_status.h" #include "dds__qos.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/q_ddsi_discovery.h" diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index f0c0bc4..533ac5d 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -19,6 +19,7 @@ #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_xmsg.h" +#include "dds/ddsi/q_ephash.h" #include "dds__writer.h" #include "dds__listener.h" #include "dds__init.h" diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index 32c5b3a..c2c6963 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -20,6 +20,7 @@ #include "dds/version.h" #include "dds__entity.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsrt/cdtors.h" #include "dds/ddsrt/misc.h" #include "dds/ddsrt/process.h" diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index c899296..c73fdf6 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -20,7 +20,6 @@ #include "dds/ddsi/q_rtps.h" #include "dds/ddsi/q_protocol.h" #include "dds/ddsi/q_lat_estim.h" -#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_hbcontrol.h" #include "dds/ddsi/q_feature_check.h" #include "dds/ddsi/q_inverse_uint32_set.h" @@ -48,6 +47,15 @@ struct proxy_group; struct proxy_endpoint_common; typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg); +enum entity_kind { + EK_PARTICIPANT, + EK_PROXY_PARTICIPANT, + EK_WRITER, + EK_PROXY_WRITER, + EK_READER, + EK_PROXY_READER +}; + /* Liveliness changed is more complicated than just add/remove. Encode the event in status_cb_data_t::extra and ignore status_cb_data_t::add */ enum liveliness_changed_data_extra { @@ -157,6 +165,7 @@ struct entity_common { ddsrt_mutex_t lock; bool onlylocal; struct q_globals *gv; + ddsrt_avl_node_t all_entities_avlnode; /* QoS changes always lock the entity itself, and additionally (and within the scope of the entity lock) acquire qos_lock @@ -369,6 +378,11 @@ struct proxy_endpoint_common seqno_t seq; /* sequence number of most recent SEDP message */ }; +struct generic_proxy_endpoint { + struct entity_common e; + struct proxy_endpoint_common c; +}; + struct proxy_writer { struct entity_common e; struct proxy_endpoint_common c; @@ -671,6 +685,8 @@ int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify); void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify); int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); + +struct ephash; void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); /* Call this to empty all address sets of all writers to stop all outgoing traffic, or to diff --git a/src/core/ddsi/include/dds/ddsi/q_ephash.h b/src/core/ddsi/include/dds/ddsi/q_ephash.h index 630d6ad..41fb559 100644 --- a/src/core/ddsi/include/dds/ddsi/q_ephash.h +++ b/src/core/ddsi/include/dds/ddsi/q_ephash.h @@ -13,35 +13,35 @@ #define Q_EPHASH_H #include "dds/ddsrt/hopscotch.h" +#include "dds/ddsi/q_entity.h" #if defined (__cplusplus) extern "C" { #endif struct ephash; -struct participant; -struct reader; -struct writer; -struct proxy_participant; -struct proxy_reader; -struct proxy_writer; struct ddsi_guid; - enum entity_kind { - EK_PARTICIPANT, - EK_PROXY_PARTICIPANT, - EK_WRITER, - EK_PROXY_WRITER, - EK_READER, - EK_PROXY_READER - }; -#define EK_NKINDS ((int) EK_PROXY_READER + 1) +struct match_entities_range_key { + union { + struct writer wr; + struct reader rd; + struct proxy_writer pwr; + struct proxy_reader prd; + struct entity_common e; + struct generic_proxy_endpoint gpe; + } entity; + struct dds_qos xqos; +}; struct ephash_enum { - struct ddsrt_chh_iter it; + struct ephash *gh; enum entity_kind kind; struct entity_common *cur; +#ifndef NDEBUG + vtime_t vtime; +#endif }; /* Readers & writers are both in a GUID- and in a GID-keyed table. If @@ -112,6 +112,8 @@ struct ephash_enum_proxy_writer { struct ephash_enum st; }; struct ephash_enum_proxy_reader { struct ephash_enum st; }; void ephash_enum_init (struct ephash_enum *st, const struct ephash *eh, enum entity_kind kind); +void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max); +void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max); void *ephash_enum_next (struct ephash_enum *st); void ephash_enum_fini (struct ephash_enum *st); diff --git a/src/core/ddsi/src/ddsi_pmd.c b/src/core/ddsi/src/ddsi_pmd.c index 035a9bf..8588e8f 100644 --- a/src/core/ddsi/src/ddsi_pmd.c +++ b/src/core/ddsi/src/ddsi_pmd.c @@ -17,6 +17,7 @@ #include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_lease.h" #include "dds/ddsi/q_log.h" diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index bb1430c..1c888b8 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2324,7 +2324,8 @@ static void connect_writer_with_proxy_reader_wrapper (struct entity_common *vwr, struct proxy_reader *prd = (struct proxy_reader *) vprd; assert (wr->e.kind == EK_WRITER); assert (prd->e.kind == EK_PROXY_READER); - connect_writer_with_proxy_reader(wr, prd, tnow); + assert (is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) == is_builtin_endpoint (prd->e.guid.entityid, prd->c.vendor)); + connect_writer_with_proxy_reader (wr, prd, tnow); } static void connect_proxy_writer_with_reader_wrapper (struct entity_common *vpwr, struct entity_common *vrd, nn_mtime_t tnow) @@ -2333,7 +2334,8 @@ static void connect_proxy_writer_with_reader_wrapper (struct entity_common *vpwr struct reader *rd = (struct reader *) vrd; assert (pwr->e.kind == EK_PROXY_WRITER); assert (rd->e.kind == EK_READER); - connect_proxy_writer_with_reader(pwr, rd, tnow); + assert (is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) == is_builtin_endpoint (pwr->e.guid.entityid, pwr->c.vendor)); + connect_proxy_writer_with_reader (pwr, rd, tnow); } static void connect_writer_with_reader_wrapper (struct entity_common *vwr, struct entity_common *vrd, nn_mtime_t tnow) @@ -2342,17 +2344,19 @@ static void connect_writer_with_reader_wrapper (struct entity_common *vwr, struc struct reader *rd = (struct reader *) vrd; assert (wr->e.kind == EK_WRITER); assert (rd->e.kind == EK_READER); - connect_writer_with_reader(wr, rd, tnow); + assert (!is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || is_local_orphan_endpoint (&wr->e)); + assert (!is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE)); + connect_writer_with_reader (wr, rd, tnow); } -static enum entity_kind generic_do_match_mkind (enum entity_kind kind) +static enum entity_kind generic_do_match_mkind (enum entity_kind kind, bool local) { switch (kind) { - case EK_WRITER: return EK_PROXY_READER; - case EK_READER: return EK_PROXY_WRITER; - case EK_PROXY_WRITER: return EK_READER; - case EK_PROXY_READER: return EK_WRITER; + case EK_WRITER: return local ? EK_READER : EK_PROXY_READER; + case EK_READER: return local ? EK_WRITER : EK_PROXY_WRITER; + case EK_PROXY_WRITER: assert (!local); return EK_READER; + case EK_PROXY_READER: assert (!local); return EK_WRITER; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: assert(0); @@ -2362,88 +2366,29 @@ static enum entity_kind generic_do_match_mkind (enum entity_kind kind) return EK_WRITER; } -static enum entity_kind generic_do_local_match_mkind (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return EK_READER; - case EK_READER: return EK_WRITER; - case EK_PROXY_WRITER: - case EK_PROXY_READER: - case EK_PARTICIPANT: - case EK_PROXY_PARTICIPANT: - assert(0); - return EK_WRITER; - } - assert(0); - return EK_WRITER; -} - -static const char *generic_do_match_kindstr_us (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "writer"; - case EK_READER: return "reader"; - case EK_PROXY_WRITER: return "proxy_writer"; - case EK_PROXY_READER: return "proxy_reader"; - case EK_PARTICIPANT: return "participant"; - case EK_PROXY_PARTICIPANT: return "proxy_participant"; - } - assert(0); - return "?"; -} - -static const char *generic_do_match_kindstr (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "writer"; - case EK_READER: return "reader"; - case EK_PROXY_WRITER: return "proxy writer"; - case EK_PROXY_READER: return "proxy reader"; - case EK_PARTICIPANT: return "participant"; - case EK_PROXY_PARTICIPANT: return "proxy participant"; - } - assert(0); - return "?"; -} - -static const char *generic_do_match_kindabbrev (enum entity_kind kind) -{ - switch (kind) - { - case EK_WRITER: return "wr"; - case EK_READER: return "rd"; - case EK_PROXY_WRITER: return "pwr"; - case EK_PROXY_READER: return "prd"; - case EK_PARTICIPANT: return "pp"; - case EK_PROXY_PARTICIPANT: return "proxypp"; - } - assert(0); - return "?"; -} - -static int generic_do_match_isproxy (const struct entity_common *e) -{ - return e->kind == EK_PROXY_WRITER || e->kind == EK_PROXY_READER || e->kind == EK_PROXY_PARTICIPANT; -} - -static void generic_do_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow) +static void generic_do_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow, bool local) { switch (e->kind) { case EK_WRITER: - connect_writer_with_proxy_reader_wrapper(e, em, tnow); + if (local) + connect_writer_with_reader_wrapper (e, em, tnow); + else + connect_writer_with_proxy_reader_wrapper (e, em, tnow); break; case EK_READER: - connect_proxy_writer_with_reader_wrapper(em, e, tnow); + if (local) + connect_writer_with_reader_wrapper (em, e, tnow); + else + connect_proxy_writer_with_reader_wrapper (em, e, tnow); break; case EK_PROXY_WRITER: - connect_proxy_writer_with_reader_wrapper(e, em, tnow); + assert (!local); + connect_proxy_writer_with_reader_wrapper (e, em, tnow); break; case EK_PROXY_READER: - connect_writer_with_proxy_reader_wrapper(em, e, tnow); + assert (!local); + connect_writer_with_proxy_reader_wrapper (em, e, tnow); break; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: @@ -2451,125 +2396,118 @@ static void generic_do_match_connect (struct entity_common *e, struct entity_com } } -static void generic_do_local_match_connect (struct entity_common *e, struct entity_common *em, nn_mtime_t tnow) +static const char *entity_topic_name (const struct entity_common *e) { switch (e->kind) { case EK_WRITER: - connect_writer_with_reader_wrapper(e, em, tnow); - break; + return ((const struct writer *) e)->xqos->topic_name; case EK_READER: - connect_writer_with_reader_wrapper(em, e, tnow); - break; + return ((const struct reader *) e)->xqos->topic_name; case EK_PROXY_WRITER: case EK_PROXY_READER: + return ((const struct generic_proxy_endpoint *) e)->c.xqos->topic_name; case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: - assert(0); + assert (0); } + return ""; } -static void generic_do_match (struct entity_common *e, nn_mtime_t tnow) +static void generic_do_match (struct entity_common *e, nn_mtime_t tnow, bool local) { - struct ephash * const guid_hash = e->gv->guid_hash; - struct ephash_enum est; + static const struct { const char *full; const char *full_us; const char *abbrev; } kindstr[] = { + [EK_WRITER] = { "writer", "writer", "wr" }, + [EK_READER] = { "reader", "reader", "rd" }, + [EK_PROXY_WRITER] = { "proxy writer", "proxy_writer", "pwr" }, + [EK_PROXY_READER] = { "proxy reader", "proxy_reader", "prd" }, + [EK_PARTICIPANT] = { "participant", "participant", "pp" }, + [EK_PROXY_PARTICIPANT] = { "proxy participant", "proxy_participant", "proxypp" } + }; + + enum entity_kind mkind = generic_do_match_mkind (e->kind, local); + struct ephash const * const guid_hash = e->gv->guid_hash; + struct ephash_enum it; struct entity_common *em; - enum entity_kind mkind = generic_do_match_mkind(e->kind); - if (!is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE)) + + if (!is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE) || (local && is_local_orphan_endpoint (e))) { - EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_kindstr(mkind)); + /* Non-builtins need matching on topics, the local orphan endpoints + are a bit weird because they reuse the builtin entityids but + otherwise need to be treated as normal readers */ + struct match_entities_range_key max; + const char *tp = entity_topic_name (e); + EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss%s%s\n", + kindstr[e->kind].full_us, kindstr[mkind].full_us, + kindstr[e->kind].abbrev, PGUID (e->guid), + kindstr[mkind].abbrev, + tp ? " of topic " : "", tp ? tp : ""); /* Note: we visit at least all proxies that existed when we called - init (with the -- possible -- exception of ones that were - deleted between our calling init and our reaching it while - enumerating), but we may visit a single proxy reader multiple - times. */ - ephash_enum_init (&est, guid_hash, mkind); - while ((em = ephash_enum_next (&est)) != NULL) - generic_do_match_connect(e, em, tnow); - ephash_enum_fini (&est); + init (with the -- possible -- exception of ones that were + deleted between our calling init and our reaching it while + enumerating), but we may visit a single proxy reader multiple + times. */ + ephash_enum_init_topic (&it, guid_hash, mkind, tp, &max); + while ((em = ephash_enum_next_max (&it, &max)) != NULL) + generic_do_match_connect (e, em, tnow, local); + ephash_enum_fini (&it); } - else + else if (!local) { - /* Built-ins have fixed QoS */ - ddsi_entityid_t tgt_ent = builtin_entityid_match (e->guid.entityid); - enum entity_kind pkind = generic_do_match_isproxy (e) ? EK_PARTICIPANT : EK_PROXY_PARTICIPANT; + /* Built-ins have fixed QoS and a known entity id to use, so instead of + looking for the right topic, just probe the matching GUIDs for all + (proxy) participants. Local matching never needs to look at the + discovery endpoints */ + const ddsi_entityid_t tgt_ent = builtin_entityid_match (e->guid.entityid); + const bool isproxy = (e->kind == EK_PROXY_WRITER || e->kind == EK_PROXY_READER || e->kind == EK_PROXY_PARTICIPANT); + enum entity_kind pkind = isproxy ? EK_PARTICIPANT : EK_PROXY_PARTICIPANT; EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning %sparticipants tgt=%"PRIx32"\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_isproxy (e) ? "" : "proxy ", - tgt_ent.u); + kindstr[e->kind].full_us, kindstr[mkind].full_us, + kindstr[e->kind].abbrev, PGUID (e->guid), + isproxy ? "" : "proxy ", tgt_ent.u); if (tgt_ent.u != NN_ENTITYID_UNKNOWN) { - struct entity_common *ep; - ephash_enum_init (&est, guid_hash, pkind); - while ((ep = ephash_enum_next (&est)) != NULL) + ephash_enum_init (&it, guid_hash, pkind); + while ((em = ephash_enum_next (&it)) != NULL) { - ddsi_guid_t tgt_guid; - tgt_guid.prefix = ep->guid.prefix; - tgt_guid.entityid = tgt_ent; - if ((em = ephash_lookup_guid (guid_hash, &tgt_guid, mkind)) != NULL) - generic_do_match_connect(e, em, tnow); + const ddsi_guid_t tgt_guid = { em->guid.prefix, tgt_ent }; + struct entity_common *ep; + if ((ep = ephash_lookup_guid (guid_hash, &tgt_guid, mkind)) != NULL) + generic_do_match_connect (e, ep, tnow, local); } - ephash_enum_fini (&est); + ephash_enum_fini (&it); } } } -static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow) -{ - struct ephash_enum est; - struct entity_common *em; - enum entity_kind mkind; - if (is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE) && !is_local_orphan_endpoint (e)) - /* never a need for local matches on discovery endpoints */ - return; - mkind = generic_do_local_match_mkind (e->kind); - EELOGDISC (e, "match_%s_with_%ss(%s "PGUIDFMT") scanning all %ss\n", - generic_do_match_kindstr_us (e->kind), generic_do_match_kindstr_us (mkind), - generic_do_match_kindabbrev (e->kind), PGUID (e->guid), - generic_do_match_kindstr(mkind)); - /* Note: we visit at least all proxies that existed when we called - init (with the -- possible -- exception of ones that were - deleted between our calling init and our reaching it while - enumerating), but we may visit a single proxy reader multiple - times. */ - ephash_enum_init (&est, e->gv->guid_hash, mkind); - while ((em = ephash_enum_next (&est)) != NULL) - generic_do_local_match_connect (e, em, tnow); - ephash_enum_fini (&est); -} - static void match_writer_with_proxy_readers (struct writer *wr, nn_mtime_t tnow) { - generic_do_match (&wr->e, tnow); + generic_do_match (&wr->e, tnow, false); } static void match_writer_with_local_readers (struct writer *wr, nn_mtime_t tnow) { - generic_do_local_match (&wr->e, tnow); + generic_do_match (&wr->e, tnow, true); } static void match_reader_with_proxy_writers (struct reader *rd, nn_mtime_t tnow) { - generic_do_match (&rd->e, tnow); + generic_do_match (&rd->e, tnow, false); } static void match_reader_with_local_writers (struct reader *rd, nn_mtime_t tnow) { - generic_do_local_match (&rd->e, tnow); + generic_do_match (&rd->e, tnow, true); } static void match_proxy_writer_with_readers (struct proxy_writer *pwr, nn_mtime_t tnow) { - generic_do_match (&pwr->e, tnow); + generic_do_match (&pwr->e, tnow, false); } static void match_proxy_reader_with_writers (struct proxy_reader *prd, nn_mtime_t tnow) { - generic_do_match(&prd->e, tnow); + generic_do_match(&prd->e, tnow, false); } /* ENDPOINT --------------------------------------------------------- */ diff --git a/src/core/ddsi/src/q_ephash.c b/src/core/ddsi/src/q_ephash.c index 0e30e0b..63a0004 100644 --- a/src/core/ddsi/src/q_ephash.c +++ b/src/core/ddsi/src/q_ephash.c @@ -11,11 +11,13 @@ */ #include #include +#include #include "dds/ddsrt/heap.h" #include "dds/ddsrt/misc.h" #include "dds/ddsrt/hopscotch.h" +#include "dds/ddsrt/avl.h" #include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_globals.h" @@ -26,6 +28,8 @@ struct ephash { struct ddsrt_chh *hash; + ddsrt_mutex_t all_entities_lock; + ddsrt_avl_tree_t all_entities; }; static const uint64_t unihashconsts[] = { @@ -35,6 +39,10 @@ static const uint64_t unihashconsts[] = { UINT64_C (16728792139623414127) }; +static int all_entities_compare (const void *va, const void *vb); +static const ddsrt_avl_treedef_t all_entities_treedef = + DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct entity_common, all_entities_avlnode), 0, all_entities_compare, 0); + static uint32_t hash_entity_guid (const struct entity_common *c) { return @@ -62,6 +70,147 @@ static int entity_guid_eq_wrapper (const void *a, const void *b) return entity_guid_eq (a, b); } +static int all_entities_compare_isbuiltin (const struct entity_common *e, nn_vendorid_t vendor) +{ + const unsigned char *guid_bytes = (const unsigned char *) &e->guid; + if (guid_bytes[0] != 0 && guid_bytes[0] != 0xff) + return is_builtin_endpoint (e->guid.entityid, vendor); + else + { + for (size_t i = 1; i < sizeof (e->guid); i++) + if (guid_bytes[i] != guid_bytes[0]) + return is_builtin_endpoint (e->guid.entityid, vendor) && !is_local_orphan_endpoint (e); + return 0; + } +} + +static int all_entities_compare (const void *va, const void *vb) +{ + const struct entity_common *a = va; + const struct entity_common *b = vb; + const char *tp_a = ""; + const char *tp_b = ""; + int cmpres; + + if (a->kind != b->kind) + return (int) a->kind - (int) b->kind; + + switch (a->kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + + case EK_WRITER: { + const struct writer *wra = va; + const struct writer *wrb = vb; + if (!all_entities_compare_isbuiltin (a, NN_VENDORID_ECLIPSE)) { + assert ((wra->xqos->present & QP_TOPIC_NAME) && wra->xqos->topic_name); + tp_a = wra->xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, NN_VENDORID_ECLIPSE)) { + assert ((wrb->xqos->present & QP_TOPIC_NAME) && wrb->xqos->topic_name); + tp_b = wrb->xqos->topic_name; + } + break; + } + + case EK_READER: { + const struct reader *rda = va; + const struct reader *rdb = vb; + if (!all_entities_compare_isbuiltin (a, NN_VENDORID_ECLIPSE)) { + assert ((rda->xqos->present & QP_TOPIC_NAME) && rda->xqos->topic_name); + tp_a = rda->xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, NN_VENDORID_ECLIPSE)) { + assert ((rdb->xqos->present & QP_TOPIC_NAME) && rdb->xqos->topic_name); + tp_b = rdb->xqos->topic_name; + } + break; + } + + case EK_PROXY_WRITER: + case EK_PROXY_READER: { + const struct generic_proxy_endpoint *ga = va; + const struct generic_proxy_endpoint *gb = vb; + if (!all_entities_compare_isbuiltin (a, ga->c.vendor)) { + assert ((ga->c.xqos->present & QP_TOPIC_NAME) && ga->c.xqos->topic_name); + tp_a = ga->c.xqos->topic_name; + } + if (!all_entities_compare_isbuiltin (b, gb->c.vendor)) { + assert ((gb->c.xqos->present & QP_TOPIC_NAME) && gb->c.xqos->topic_name); + tp_b = gb->c.xqos->topic_name; + } + break; + } + } + + if ((cmpres = strcmp (tp_a, tp_b)) != 0) + return cmpres; + else + return memcmp (&a->guid, &b->guid, sizeof (a->guid)); +} + +static void match_endpoint_range (enum entity_kind kind, const char *tp, struct match_entities_range_key *min, struct match_entities_range_key *max) +{ + /* looking for entities of kind KIND; initialize fake entities such that they are + valid input to all_entities_compare and that span the range of all possibly + matching endpoints. */ + min->entity.e.kind = max->entity.e.kind = kind; + memset (&min->entity.e.guid, 0x00, sizeof (min->entity.e.guid)); + memset (&max->entity.e.guid, 0xff, sizeof (max->entity.e.guid)); + min->xqos.present = max->xqos.present = QP_TOPIC_NAME; + min->xqos.topic_name = max->xqos.topic_name = (char *) tp; + switch (kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + case EK_WRITER: + min->entity.wr.xqos = &min->xqos; + max->entity.wr.xqos = &max->xqos; + break; + case EK_READER: + min->entity.rd.xqos = &min->xqos; + max->entity.rd.xqos = &max->xqos; + break; + case EK_PROXY_WRITER: + case EK_PROXY_READER: + min->entity.gpe.c.vendor = max->entity.gpe.c.vendor = NN_VENDORID_ECLIPSE; + min->entity.gpe.c.xqos = &min->xqos; + max->entity.gpe.c.xqos = &max->xqos; + break; + } +} + +static void match_entity_kind_min (enum entity_kind kind, struct match_entities_range_key *min) +{ + /* looking for entities of kind KIND; initialize fake entities such that they are + valid input to all_entities_compare and that span the range of all possibly + matching endpoints. */ + min->entity.e.kind = kind; + memset (&min->entity.e.guid, 0x00, sizeof (min->entity.e.guid)); + min->xqos.present = QP_TOPIC_NAME; + min->xqos.topic_name = ""; + switch (kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + break; + case EK_WRITER: + min->entity.wr.xqos = &min->xqos; + break; + case EK_READER: + min->entity.rd.xqos = &min->xqos; + break; + case EK_PROXY_WRITER: + case EK_PROXY_READER: + min->entity.gpe.c.vendor = NN_VENDORID_ECLIPSE; + min->entity.gpe.c.xqos = &min->xqos; + break; + } +} + static void gc_buckets_cb (struct gcreq *gcreq) { void *bs = gcreq->arg; @@ -86,28 +235,50 @@ struct ephash *ephash_new (struct q_globals *gv) ddsrt_free (ephash); return NULL; } else { + ddsrt_mutex_init (&ephash->all_entities_lock); + ddsrt_avl_init (&all_entities_treedef, &ephash->all_entities); return ephash; } } void ephash_free (struct ephash *ephash) { + ddsrt_avl_free (&all_entities_treedef, &ephash->all_entities, 0); + ddsrt_mutex_destroy (&ephash->all_entities_lock); ddsrt_chh_free (ephash->hash); ephash->hash = NULL; ddsrt_free (ephash); } +static void add_to_all_entities (struct ephash *gh, struct entity_common *e) +{ + ddsrt_mutex_lock (&gh->all_entities_lock); + assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) == NULL); + ddsrt_avl_insert (&all_entities_treedef, &gh->all_entities, e); + ddsrt_mutex_unlock (&gh->all_entities_lock); +} + +static void remove_from_all_entities (struct ephash *gh, struct entity_common *e) +{ + ddsrt_mutex_lock (&gh->all_entities_lock); + assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) != NULL); + ddsrt_avl_delete (&all_entities_treedef, &gh->all_entities, e); + ddsrt_mutex_unlock (&gh->all_entities_lock); +} + static void ephash_guid_insert (struct ephash *gh, struct entity_common *e) { int x; x = ddsrt_chh_add (gh->hash, e); (void)x; assert (x); + add_to_all_entities (gh, e); } static void ephash_guid_remove (struct ephash *gh, struct entity_common *e) { int x; + remove_from_all_entities (gh, e); x = ddsrt_chh_remove (gh->hash, e); (void)x; assert (x); @@ -240,17 +411,41 @@ struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *gh, c /* Enumeration */ -static void ephash_enum_init_int (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind) +static void ephash_enum_init_minmax_int (struct ephash_enum *st, const struct ephash *gh, struct match_entities_range_key *min) { - st->kind = kind; - st->cur = ddsrt_chh_iter_first (gh->hash, &st->it); - while (st->cur && st->cur->kind != st->kind) - st->cur = ddsrt_chh_iter_next (&st->it); + /* Use a lock to protect against concurrent modification and rely on the GC not deleting + any entities while enumerating so we can rely on the (kind, topic, GUID) triple to + remain valid for looking up the next entity. With a bit of additional effort it would + be possible to allow the GC to reclaim any entities already visited, but I don't think + that additional effort is worth it. */ +#ifndef NDEBUG + assert (thread_is_awake ()); + st->vtime = ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime); +#endif + st->gh = (struct ephash *) gh; + st->kind = min->entity.e.kind; + ddsrt_mutex_lock (&st->gh->all_entities_lock); + st->cur = ddsrt_avl_lookup_succ_eq (&all_entities_treedef, &st->gh->all_entities, min); + ddsrt_mutex_unlock (&st->gh->all_entities_lock); +} + +void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max) +{ + assert (kind == EK_READER || kind == EK_WRITER || kind == EK_PROXY_READER || kind == EK_PROXY_WRITER); + struct match_entities_range_key min; + match_endpoint_range (kind, topic, &min, max); + ephash_enum_init_minmax_int (st, gh, &min); + if (st->cur && all_entities_compare (st->cur, &max->entity) > 0) + st->cur = NULL; } void ephash_enum_init (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind) { - ephash_enum_init_int(st, gh, kind); + struct match_entities_range_key min; + match_entity_kind_min (kind, &min); + ephash_enum_init_minmax_int (st, gh, &min); + if (st->cur && st->cur->kind != st->kind) + st->cur = NULL; } void ephash_enum_writer_init (struct ephash_enum_writer *st, const struct ephash *gh) @@ -285,16 +480,31 @@ void ephash_enum_proxy_participant_init (struct ephash_enum_proxy_participant *s void *ephash_enum_next (struct ephash_enum *st) { + /* st->cur can not have been freed yet, but it may have been removed from the index */ + assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime); void *res = st->cur; if (st->cur) { - st->cur = ddsrt_chh_iter_next (&st->it); - while (st->cur && st->cur->kind != st->kind) - st->cur = ddsrt_chh_iter_next (&st->it); + ddsrt_mutex_lock (&st->gh->all_entities_lock); + st->cur = ddsrt_avl_lookup_succ (&all_entities_treedef, &st->gh->all_entities, st->cur); + ddsrt_mutex_unlock (&st->gh->all_entities_lock); + if (st->cur && st->cur->kind != st->kind) + st->cur = NULL; } return res; } +void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max) +{ + void *res = ephash_enum_next (st); + + /* max may only make the bounds tighter */ + assert (max->entity.e.kind == st->kind); + if (st->cur && all_entities_compare (st->cur, &max->entity) > 0) + st->cur = NULL; + return res; +} + struct writer *ephash_enum_writer_next (struct ephash_enum_writer *st) { assert (offsetof (struct writer, e) == 0); @@ -333,7 +543,8 @@ struct proxy_participant *ephash_enum_proxy_participant_next (struct ephash_enum void ephash_enum_fini (struct ephash_enum *st) { - DDSRT_UNUSED_ARG(st); + assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime); + (void) st; } void ephash_enum_writer_fini (struct ephash_enum_writer *st) diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index 6d2a69a..b9c65b2 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -18,6 +18,7 @@ #include "dds/ddsrt/avl.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_addrset.h" #include "dds/ddsi/q_xmsg.h" #include "dds/ddsi/q_bswap.h"