Rename ephash to entity_index/entidx

It is now more than just a GUID hash so "ephash" really doesn't fit
anymore.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-12-10 10:20:08 +01:00 committed by eboasson
parent 2dfde4bf0d
commit 0271c11144
31 changed files with 523 additions and 527 deletions

View file

@ -19,7 +19,7 @@
extern "C" {
#endif
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash);
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct entity_index *entidx);
#if defined (__cplusplus)
}

View file

@ -258,7 +258,7 @@ void dds__builtin_init (struct dds_domain *dom)
dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv);
thread_state_awake (lookup_thread_state (), &dom->gv);
const struct ephash *gh = dom->gv.guid_hash;
const struct entity_index *gh = dom->gv.entity_index;
dom->builtintopic_writer_participant = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dom->builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT, gh));
dom->builtintopic_writer_publications = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dom->builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER, gh));
dom->builtintopic_writer_subscriptions = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dom->builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER, gh));

View file

@ -16,7 +16,7 @@
#include "dds__guardcond.h"
#include "dds__participant.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"

View file

@ -17,7 +17,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_bswap.h"
#include "dds__writer.h"
@ -34,7 +34,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
return rc;
else
{
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = wr->m_entity.m_domain->gv.entity_index;
size_t nrds_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
@ -45,7 +45,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
m = ddsrt_avl_iter_next (&it))
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
{
if (nrds_act < nrds)
rds[nrds_act] = prd->e.iid;
@ -57,7 +57,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
if (nrds_act < nrds)
rds[nrds_act] = rd->e.iid;
@ -84,7 +84,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
return rc;
else
{
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = rd->m_entity.m_domain->gv.entity_index;
size_t nwrs_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
@ -95,7 +95,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
m = ddsrt_avl_iter_next (&it))
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
{
if (nwrs_act < nwrs)
wrs[nwrs_act] = pwr->e.iid;
@ -107,7 +107,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
m = ddsrt_avl_iter_next (&it))
{
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
{
if (nwrs_act < nwrs)
wrs[nwrs_act] = wr->e.iid;
@ -149,7 +149,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
return NULL;
else
{
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = wr->m_entity.m_domain->gv.entity_index;
dds_builtintopic_endpoint_t *ret = NULL;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
@ -160,7 +160,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
m = ddsrt_avl_iter_next (&it))
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
{
if (prd->e.iid == ih)
ret = make_builtintopic_endpoint (&prd->e.guid, &prd->c.proxypp->e.guid, prd->c.proxypp->e.iid, prd->c.xqos);
@ -171,7 +171,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
if (rd->e.iid == ih)
ret = make_builtintopic_endpoint (&rd->e.guid, &rd->c.pp->e.guid, rd->c.pp->e.iid, rd->xqos);
@ -192,7 +192,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
return NULL;
else
{
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
const struct entity_index *gh = rd->m_entity.m_domain->gv.entity_index;
dds_builtintopic_endpoint_t *ret = NULL;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
@ -203,7 +203,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
m = ddsrt_avl_iter_next (&it))
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
{
if (pwr->e.iid == ih)
ret = make_builtintopic_endpoint (&pwr->e.guid, &pwr->c.proxypp->e.guid, pwr->c.proxypp->e.iid, pwr->c.xqos);
@ -214,7 +214,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
m = ddsrt_avl_iter_next (&it))
{
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
{
if (wr->e.iid == ih)
ret = make_builtintopic_endpoint (&wr->e.guid, &wr->c.pp->e.guid, wr->c.pp->e.iid, wr->xqos);

View file

@ -18,7 +18,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/version.h"
#include "dds__init.h"
#include "dds__domain.h"
@ -56,7 +56,7 @@ static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos
{
struct participant *pp;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((pp = ephash_lookup_participant_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((pp = entidx_lookup_participant_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
{
nn_plist_t plist;
nn_plist_init_empty (&plist);

View file

@ -16,7 +16,7 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsc/dds_rhc.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_sertopic.h"

View file

@ -15,7 +15,7 @@
#include "dds/ddsc/dds_rhc.h"
#include "dds__entity.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"

View file

@ -28,7 +28,7 @@
#include "dds/ddsi/q_globals.h"
#include "dds__builtin.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader)
@ -78,7 +78,7 @@ static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, boo
{
struct reader *rd;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((rd = ephash_lookup_reader_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
update_reader_qos (rd, qos);
thread_state_asleep (lookup_thread_state ());
}
@ -513,7 +513,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
pwrguid_next.entityid.u = (pwrguid_next.entityid.u & ~(uint32_t)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY;
}
ddsrt_mutex_unlock (&rd->e.lock);
if ((pwr = ephash_lookup_proxy_writer_guid (dds_entity->m_domain->gv.guid_hash, &pwrguid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (dds_entity->m_domain->gv.entity_index, &pwrguid)) != NULL)
{
ddsrt_mutex_lock (&pwr->e.lock);
pwr->ddsi2direct_cb = cb;

View file

@ -24,7 +24,7 @@
#include "dds__serdata_builtintopic.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
static const uint64_t unihashconsts[] = {
UINT64_C (16292676669999574021),
@ -132,7 +132,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
/* keyhash must in host format (which the GUIDs always are internally) */
struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const ddsi_guid_t *) keyhash->value);
struct entity_common *entity = entidx_lookup_guid_untyped (tp->gv->entity_index, (const ddsi_guid_t *) keyhash->value);
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
memcpy (&d->key, keyhash->value, sizeof (d->key));
if (entity)

View file

@ -25,7 +25,7 @@
#include "dds__get_status.h"
#include "dds__qos.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/ddsi_sertopic.h"
#include "dds/ddsi/q_ddsi_discovery.h"
@ -438,7 +438,7 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
/* Publish Topic */
thread_state_awake (lookup_thread_state (), &par->m_entity.m_domain->gv);
ddsi_pp = ephash_lookup_participant_guid (par->m_entity.m_domain->gv.guid_hash, &par->m_entity.m_guid);
ddsi_pp = entidx_lookup_participant_guid (par->m_entity.m_domain->gv.entity_index, &par->m_entity.m_guid);
assert (ddsi_pp);
if (sedp_plist)
{

View file

@ -17,7 +17,7 @@
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_tkmap.h"
@ -28,7 +28,7 @@
struct bwhc {
struct whc common;
enum ddsi_sertopic_builtintopic_type type;
const struct ephash *guid_hash;
const struct entity_index *entidx;
};
enum bwhc_iter_state {
@ -42,7 +42,7 @@ struct bwhc_iter {
struct whc_sample_iter_base c;
enum bwhc_iter_state st;
bool have_sample;
struct ephash_enum it;
struct entidx_enum it;
};
/* check that our definition of whc_sample_iter fits in the type that callers allocate */
@ -92,11 +92,11 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
case DSBT_READER: kind = EK_READER; break;
}
assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, whc->guid_hash, kind);
entidx_enum_init (&it->it, whc->entidx, kind);
it->st = BIS_LOCAL;
/* FALLS THROUGH */
case BIS_LOCAL:
while ((entity = ephash_enum_next (&it->it)) != NULL)
while ((entity = entidx_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
@ -104,7 +104,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
entidx_enum_fini (&it->it);
it->st = BIS_INIT_PROXY;
}
/* FALLS THROUGH */
@ -115,11 +115,11 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
case DSBT_READER: kind = EK_PROXY_READER; break;
}
assert (kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, whc->guid_hash, kind);
entidx_enum_init (&it->it, whc->entidx, kind);
it->st = BIS_PROXY;
/* FALLS THROUGH */
case BIS_PROXY:
while ((entity = ephash_enum_next (&it->it)) != NULL)
while ((entity = entidx_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
@ -127,7 +127,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
entidx_enum_fini (&it->it);
return false;
}
}
@ -192,11 +192,11 @@ static const struct whc_ops bwhc_ops = {
.free = bwhc_free
};
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash)
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct entity_index *entidx)
{
struct bwhc *whc = ddsrt_malloc (sizeof (*whc));
whc->common.ops = &bwhc_ops;
whc->type = type;
whc->guid_hash = guid_hash;
whc->entidx = entidx;
return (struct whc *) whc;
}

View file

@ -20,7 +20,7 @@
#include "dds/ddsi/ddsi_serdata.h"
#include "dds__stream.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_radmin.h"
@ -121,7 +121,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
ddsrt_avl_iter_t it;
struct pwr_rd_match *m;
struct ddsi_writer_info wrinfo;
const struct ephash *gh = wr->e.gv->guid_hash;
const struct entity_index *gh = wr->e.gv->entity_index;
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos);
@ -129,7 +129,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data () function. */

View file

@ -19,7 +19,7 @@
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds__writer.h"
#include "dds__listener.h"
#include "dds__init.h"
@ -225,7 +225,7 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo
{
struct writer *wr;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((wr = ephash_lookup_writer_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (e->m_domain->gv.entity_index, &e->m_guid)) != NULL)
update_writer_qos (wr, qos);
thread_state_asleep (lookup_thread_state ());
}

View file

@ -20,7 +20,7 @@
#include "dds/version.h"
#include "dds__entity.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
@ -98,7 +98,7 @@ static seqno_t get_pmd_seqno(dds_entity_t participant)
struct writer *wr;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
pp = entidx_lookup_participant_guid(pp_entity->m_domain->gv.entity_index, &pp_entity->m_guid);
wr = get_builtin_writer(pp, NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
CU_ASSERT_FATAL(wr != NULL);
assert(wr != NULL); /* for Clang's static analyzer */
@ -118,7 +118,7 @@ static dds_duration_t get_pmd_interval(dds_entity_t participant)
struct participant *pp;
CU_ASSERT_EQUAL_FATAL(dds_entity_pin(participant, &pp_entity), 0);
thread_state_awake(lookup_thread_state(), &pp_entity->m_domain->gv);
pp = ephash_lookup_participant_guid(pp_entity->m_domain->gv.guid_hash, &pp_entity->m_guid);
pp = entidx_lookup_participant_guid(pp_entity->m_domain->gv.entity_index, &pp_entity->m_guid);
intv = pp_get_pmd_interval(pp);
thread_state_asleep(lookup_thread_state());
dds_entity_unpin(pp_entity);

View file

@ -29,6 +29,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_threadmon.c
ddsi_rhc.c
ddsi_pmd.c
ddsi_entity_index.c
q_addrset.c
q_bitset_inlines.c
q_bswap.c
@ -36,7 +37,6 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
q_ddsi_discovery.c
q_debmon.c
q_entity.c
q_ephash.c
q_gc.c
q_init.c
q_lat_estim.c
@ -82,6 +82,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
ddsi_builtin_topic_if.h
ddsi_rhc.h
ddsi_guid.h
ddsi_entity_index.h
q_addrset.h
q_bitset.h
q_bswap.h
@ -89,7 +90,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
q_ddsi_discovery.h
q_debmon.h
q_entity.h
q_ephash.h
q_feature_check.h
q_freelist.h
q_gc.h

View file

@ -0,0 +1,144 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_ENTITY_INDEX_H
#define DDSI_ENTITY_INDEX_H
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/q_entity.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct entity_index;
struct ddsi_guid;
struct match_entities_range_key {
union {
struct writer wr;
struct reader rd;
struct proxy_writer pwr;
struct proxy_reader prd;
struct entity_common e;
struct generic_proxy_endpoint gpe;
} entity;
struct dds_qos xqos;
};
struct entidx_enum
{
struct entity_index *entidx;
enum entity_kind kind;
struct entity_common *cur;
#ifndef NDEBUG
vtime_t vtime;
#endif
};
/* Readers & writers are both in a GUID- and in a GID-keyed table. If
they are in the GID-based one, they are also in the GUID-based one,
but not the way around, for two reasons:
- firstly, there are readers & writers that do not have a GID
(built-in endpoints, fictitious transient data readers),
- secondly, they are inserted first in the GUID-keyed one, and then
in the GID-keyed one.
The GID is used solely for the interface with the OpenSplice
kernel, all internal state and protocol handling is done using the
GUID. So all this means is that, e.g., a writer being deleted
becomes invisible to the network reader slightly before it
disappears in the protocol handling, or that a writer might exist
at the protocol level slightly before the network reader can use it
to transmit data. */
struct q_globals;
struct entity_index *entity_index_new (struct q_globals *gv);
void entity_index_free (struct entity_index *ei);
void entidx_insert_participant_guid (struct entity_index *ei, struct participant *pp);
void entidx_insert_proxy_participant_guid (struct entity_index *ei, struct proxy_participant *proxypp);
void entidx_insert_writer_guid (struct entity_index *ei, struct writer *wr);
void entidx_insert_reader_guid (struct entity_index *ei, struct reader *rd);
void entidx_insert_proxy_writer_guid (struct entity_index *ei, struct proxy_writer *pwr);
void entidx_insert_proxy_reader_guid (struct entity_index *ei, struct proxy_reader *prd);
void entidx_remove_participant_guid (struct entity_index *ei, struct participant *pp);
void entidx_remove_proxy_participant_guid (struct entity_index *ei, struct proxy_participant *proxypp);
void entidx_remove_writer_guid (struct entity_index *ei, struct writer *wr);
void entidx_remove_reader_guid (struct entity_index *ei, struct reader *rd);
void entidx_remove_proxy_writer_guid (struct entity_index *ei, struct proxy_writer *pwr);
void entidx_remove_proxy_reader_guid (struct entity_index *ei, struct proxy_reader *prd);
DDS_EXPORT void *entidx_lookup_guid_untyped (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT void *entidx_lookup_guid (const struct entity_index *ei, const struct ddsi_guid *guid, enum entity_kind kind);
DDS_EXPORT struct participant *entidx_lookup_participant_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_participant *entidx_lookup_proxy_participant_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT struct writer *entidx_lookup_writer_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT struct reader *entidx_lookup_reader_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_writer *entidx_lookup_proxy_writer_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_reader *entidx_lookup_proxy_reader_guid (const struct entity_index *ei, const struct ddsi_guid *guid);
/* Enumeration of entries in the hash table:
- "next" visits at least all entries that were in the hash table at
the time of calling init and that have not subsequently been
removed;
- "next" may visit an entry more than once, but will do so only
because of rare events (i.e., resize or so);
- the order in which entries are visited is arbitrary;
- the caller must call init() before it may call next(); it must
call fini() before it may call init() again. */
struct entidx_enum_participant { struct entidx_enum st; };
struct entidx_enum_writer { struct entidx_enum st; };
struct entidx_enum_reader { struct entidx_enum st; };
struct entidx_enum_proxy_participant { struct entidx_enum st; };
struct entidx_enum_proxy_writer { struct entidx_enum st; };
struct entidx_enum_proxy_reader { struct entidx_enum st; };
void entidx_enum_init (struct entidx_enum *st, const struct entity_index *ei, enum entity_kind kind);
void entidx_enum_init_topic (struct entidx_enum *st, const struct entity_index *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max);
void *entidx_enum_next_max (struct entidx_enum *st, const struct match_entities_range_key *max);
void *entidx_enum_next (struct entidx_enum *st);
void entidx_enum_fini (struct entidx_enum *st);
void entidx_enum_writer_init (struct entidx_enum_writer *st, const struct entity_index *ei);
void entidx_enum_reader_init (struct entidx_enum_reader *st, const struct entity_index *ei);
void entidx_enum_proxy_writer_init (struct entidx_enum_proxy_writer *st, const struct entity_index *ei);
void entidx_enum_proxy_reader_init (struct entidx_enum_proxy_reader *st, const struct entity_index *ei);
void entidx_enum_participant_init (struct entidx_enum_participant *st, const struct entity_index *ei);
void entidx_enum_proxy_participant_init (struct entidx_enum_proxy_participant *st, const struct entity_index *ei);
struct writer *entidx_enum_writer_next (struct entidx_enum_writer *st);
struct reader *entidx_enum_reader_next (struct entidx_enum_reader *st);
struct proxy_writer *entidx_enum_proxy_writer_next (struct entidx_enum_proxy_writer *st);
struct proxy_reader *entidx_enum_proxy_reader_next (struct entidx_enum_proxy_reader *st);
struct participant *entidx_enum_participant_next (struct entidx_enum_participant *st);
struct proxy_participant *entidx_enum_proxy_participant_next (struct entidx_enum_proxy_participant *st);
void entidx_enum_writer_fini (struct entidx_enum_writer *st);
void entidx_enum_reader_fini (struct entidx_enum_reader *st);
void entidx_enum_proxy_writer_fini (struct entidx_enum_proxy_writer *st);
void entidx_enum_proxy_reader_fini (struct entidx_enum_proxy_reader *st);
void entidx_enum_participant_fini (struct entidx_enum_participant *st);
void entidx_enum_proxy_participant_fini (struct entidx_enum_proxy_participant *st);
#if defined (__cplusplus)
}
#endif
#endif /* DDSI_ENTITY_INDEX_H */

View file

@ -686,8 +686,8 @@ void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_gui
int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp);
struct ephash;
void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
struct entity_index;
void delete_proxy_group (struct entity_index *entidx, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit);
/* Call this to empty all address sets of all writers to stop all outgoing traffic, or to
rebuild them all (which only makes sense after previously having emptied them all). */

View file

@ -1,145 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef Q_EPHASH_H
#define Q_EPHASH_H
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsi/q_entity.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct ephash;
struct ddsi_guid;
struct match_entities_range_key {
union {
struct writer wr;
struct reader rd;
struct proxy_writer pwr;
struct proxy_reader prd;
struct entity_common e;
struct generic_proxy_endpoint gpe;
} entity;
struct dds_qos xqos;
};
struct ephash_enum
{
struct ephash *gh;
enum entity_kind kind;
struct entity_common *cur;
#ifndef NDEBUG
vtime_t vtime;
#endif
};
/* Readers & writers are both in a GUID- and in a GID-keyed table. If
they are in the GID-based one, they are also in the GUID-based one,
but not the way around, for two reasons:
- firstly, there are readers & writers that do not have a GID
(built-in endpoints, fictitious transient data readers),
- secondly, they are inserted first in the GUID-keyed one, and then
in the GID-keyed one.
The GID is used solely for the interface with the OpenSplice
kernel, all internal state and protocol handling is done using the
GUID. So all this means is that, e.g., a writer being deleted
becomes invisible to the network reader slightly before it
disappears in the protocol handling, or that a writer might exist
at the protocol level slightly before the network reader can use it
to transmit data. */
struct q_globals;
struct ephash *ephash_new (struct q_globals *gv);
void ephash_free (struct ephash *ephash);
void ephash_insert_participant_guid (struct ephash *eh, struct participant *pp);
void ephash_insert_proxy_participant_guid (struct ephash *eh, struct proxy_participant *proxypp);
void ephash_insert_writer_guid (struct ephash *eh, struct writer *wr);
void ephash_insert_reader_guid (struct ephash *eh, struct reader *rd);
void ephash_insert_proxy_writer_guid (struct ephash *eh, struct proxy_writer *pwr);
void ephash_insert_proxy_reader_guid (struct ephash *eh, struct proxy_reader *prd);
void ephash_remove_participant_guid (struct ephash *eh, struct participant *pp);
void ephash_remove_proxy_participant_guid (struct ephash *eh, struct proxy_participant *proxypp);
void ephash_remove_writer_guid (struct ephash *eh, struct writer *wr);
void ephash_remove_reader_guid (struct ephash *eh, struct reader *rd);
void ephash_remove_proxy_writer_guid (struct ephash *eh, struct proxy_writer *pwr);
void ephash_remove_proxy_reader_guid (struct ephash *eh, struct proxy_reader *prd);
DDS_EXPORT void *ephash_lookup_guid_untyped (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT void *ephash_lookup_guid (const struct ephash *eh, const struct ddsi_guid *guid, enum entity_kind kind);
DDS_EXPORT struct participant *ephash_lookup_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct writer *ephash_lookup_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct reader *ephash_lookup_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *eh, const struct ddsi_guid *guid);
DDS_EXPORT struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *eh, const struct ddsi_guid *guid);
/* Enumeration of entries in the hash table:
- "next" visits at least all entries that were in the hash table at
the time of calling init and that have not subsequently been
removed;
- "next" may visit an entry more than once, but will do so only
because of rare events (i.e., resize or so);
- the order in which entries are visited is arbitrary;
- the caller must call init() before it may call next(); it must
call fini() before it may call init() again. */
struct ephash_enum_participant { struct ephash_enum st; };
struct ephash_enum_writer { struct ephash_enum st; };
struct ephash_enum_reader { struct ephash_enum st; };
struct ephash_enum_proxy_participant { struct ephash_enum st; };
struct ephash_enum_proxy_writer { struct ephash_enum st; };
struct ephash_enum_proxy_reader { struct ephash_enum st; };
void ephash_enum_init (struct ephash_enum *st, const struct ephash *eh, enum entity_kind kind);
void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max);
void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max);
void *ephash_enum_next (struct ephash_enum *st);
void ephash_enum_fini (struct ephash_enum *st);
void ephash_enum_writer_init (struct ephash_enum_writer *st, const struct ephash *eh);
void ephash_enum_reader_init (struct ephash_enum_reader *st, const struct ephash *eh);
void ephash_enum_proxy_writer_init (struct ephash_enum_proxy_writer *st, const struct ephash *eh);
void ephash_enum_proxy_reader_init (struct ephash_enum_proxy_reader *st, const struct ephash *eh);
void ephash_enum_participant_init (struct ephash_enum_participant *st, const struct ephash *eh);
void ephash_enum_proxy_participant_init (struct ephash_enum_proxy_participant *st, const struct ephash *eh);
struct writer *ephash_enum_writer_next (struct ephash_enum_writer *st);
struct reader *ephash_enum_reader_next (struct ephash_enum_reader *st);
struct proxy_writer *ephash_enum_proxy_writer_next (struct ephash_enum_proxy_writer *st);
struct proxy_reader *ephash_enum_proxy_reader_next (struct ephash_enum_proxy_reader *st);
struct participant *ephash_enum_participant_next (struct ephash_enum_participant *st);
struct proxy_participant *ephash_enum_proxy_participant_next (struct ephash_enum_proxy_participant *st);
void ephash_enum_writer_fini (struct ephash_enum_writer *st);
void ephash_enum_reader_fini (struct ephash_enum_reader *st);
void ephash_enum_proxy_writer_fini (struct ephash_enum_proxy_writer *st);
void ephash_enum_proxy_reader_fini (struct ephash_enum_proxy_reader *st);
void ephash_enum_participant_fini (struct ephash_enum_participant *st);
void ephash_enum_proxy_participant_fini (struct ephash_enum_proxy_participant *st);
#if defined (__cplusplus)
}
#endif
#endif /* Q_EPHASH_H */

View file

@ -38,7 +38,7 @@ struct nn_defrag;
struct addrset;
struct xeventq;
struct gcreq_queue;
struct ephash;
struct entity_index;
struct lease;
struct ddsi_tran_conn;
struct ddsi_tran_listener;
@ -93,9 +93,8 @@ struct q_globals {
struct ddsi_tkmap * m_tkmap;
/* Hash tables for participants, readers, writers, proxy
participants, proxy readers and proxy writers by GUID
(guid_hash) */
struct ephash *guid_hash;
participants, proxy readers and proxy writers by GUID. */
struct entity_index *entity_index;
/* Timed events admin */
struct xeventq *xevents;

View file

@ -18,7 +18,7 @@
#include "dds/ddsrt/hopscotch.h"
#include "dds/ddsrt/avl.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_entity.h"
@ -26,8 +26,8 @@
#include "dds/ddsi/q_rtps.h" /* guid_t */
#include "dds/ddsi/q_thread.h" /* for assert(thread is awake) */
struct ephash {
struct ddsrt_chh *hash;
struct entity_index {
struct ddsrt_chh *guid_hash;
ddsrt_mutex_t all_entities_lock;
ddsrt_avl_tree_t all_entities;
};
@ -226,192 +226,192 @@ static void gc_buckets (void *bs, void *varg)
gcreq_enqueue (gcreq);
}
struct ephash *ephash_new (struct q_globals *gv)
struct entity_index *entity_index_new (struct q_globals *gv)
{
struct ephash *ephash;
ephash = ddsrt_malloc (sizeof (*ephash));
ephash->hash = ddsrt_chh_new (32, hash_entity_guid_wrapper, entity_guid_eq_wrapper, gc_buckets, gv);
if (ephash->hash == NULL) {
ddsrt_free (ephash);
struct entity_index *entidx;
entidx = ddsrt_malloc (sizeof (*entidx));
entidx->guid_hash = ddsrt_chh_new (32, hash_entity_guid_wrapper, entity_guid_eq_wrapper, gc_buckets, gv);
if (entidx->guid_hash == NULL) {
ddsrt_free (entidx);
return NULL;
} else {
ddsrt_mutex_init (&ephash->all_entities_lock);
ddsrt_avl_init (&all_entities_treedef, &ephash->all_entities);
return ephash;
ddsrt_mutex_init (&entidx->all_entities_lock);
ddsrt_avl_init (&all_entities_treedef, &entidx->all_entities);
return entidx;
}
}
void ephash_free (struct ephash *ephash)
void entity_index_free (struct entity_index *entidx)
{
ddsrt_avl_free (&all_entities_treedef, &ephash->all_entities, 0);
ddsrt_mutex_destroy (&ephash->all_entities_lock);
ddsrt_chh_free (ephash->hash);
ephash->hash = NULL;
ddsrt_free (ephash);
ddsrt_avl_free (&all_entities_treedef, &entidx->all_entities, 0);
ddsrt_mutex_destroy (&entidx->all_entities_lock);
ddsrt_chh_free (entidx->guid_hash);
entidx->guid_hash = NULL;
ddsrt_free (entidx);
}
static void add_to_all_entities (struct ephash *gh, struct entity_common *e)
static void add_to_all_entities (struct entity_index *ei, struct entity_common *e)
{
ddsrt_mutex_lock (&gh->all_entities_lock);
assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) == NULL);
ddsrt_avl_insert (&all_entities_treedef, &gh->all_entities, e);
ddsrt_mutex_unlock (&gh->all_entities_lock);
ddsrt_mutex_lock (&ei->all_entities_lock);
assert (ddsrt_avl_lookup (&all_entities_treedef, &ei->all_entities, e) == NULL);
ddsrt_avl_insert (&all_entities_treedef, &ei->all_entities, e);
ddsrt_mutex_unlock (&ei->all_entities_lock);
}
static void remove_from_all_entities (struct ephash *gh, struct entity_common *e)
static void remove_from_all_entities (struct entity_index *ei, struct entity_common *e)
{
ddsrt_mutex_lock (&gh->all_entities_lock);
assert (ddsrt_avl_lookup (&all_entities_treedef, &gh->all_entities, e) != NULL);
ddsrt_avl_delete (&all_entities_treedef, &gh->all_entities, e);
ddsrt_mutex_unlock (&gh->all_entities_lock);
ddsrt_mutex_lock (&ei->all_entities_lock);
assert (ddsrt_avl_lookup (&all_entities_treedef, &ei->all_entities, e) != NULL);
ddsrt_avl_delete (&all_entities_treedef, &ei->all_entities, e);
ddsrt_mutex_unlock (&ei->all_entities_lock);
}
static void ephash_guid_insert (struct ephash *gh, struct entity_common *e)
static void entity_index_insert (struct entity_index *ei, struct entity_common *e)
{
int x;
x = ddsrt_chh_add (gh->hash, e);
x = ddsrt_chh_add (ei->guid_hash, e);
(void)x;
assert (x);
add_to_all_entities (gh, e);
add_to_all_entities (ei, e);
}
static void ephash_guid_remove (struct ephash *gh, struct entity_common *e)
static void entity_index_remove (struct entity_index *ei, struct entity_common *e)
{
int x;
remove_from_all_entities (gh, e);
x = ddsrt_chh_remove (gh->hash, e);
remove_from_all_entities (ei, e);
x = ddsrt_chh_remove (ei->guid_hash, e);
(void)x;
assert (x);
}
void *ephash_lookup_guid_untyped (const struct ephash *gh, const struct ddsi_guid *guid)
void *entidx_lookup_guid_untyped (const struct entity_index *ei, const struct ddsi_guid *guid)
{
/* FIXME: could (now) require guid to be first in entity_common; entity_common already is first in entity */
struct entity_common e;
e.guid = *guid;
assert (thread_is_awake ());
return ddsrt_chh_lookup (gh->hash, &e);
return ddsrt_chh_lookup (ei->guid_hash, &e);
}
static void *ephash_lookup_guid_int (const struct ephash *gh, const struct ddsi_guid *guid, enum entity_kind kind)
static void *entidx_lookup_guid_int (const struct entity_index *ei, const struct ddsi_guid *guid, enum entity_kind kind)
{
struct entity_common *res;
if ((res = ephash_lookup_guid_untyped (gh, guid)) != NULL && res->kind == kind)
if ((res = entidx_lookup_guid_untyped (ei, guid)) != NULL && res->kind == kind)
return res;
else
return NULL;
}
void *ephash_lookup_guid (const struct ephash *gh, const struct ddsi_guid *guid, enum entity_kind kind)
void *entidx_lookup_guid (const struct entity_index *ei, const struct ddsi_guid *guid, enum entity_kind kind)
{
return ephash_lookup_guid_int (gh, guid, kind);
return entidx_lookup_guid_int (ei, guid, kind);
}
void ephash_insert_participant_guid (struct ephash *gh, struct participant *pp)
void entidx_insert_participant_guid (struct entity_index *ei, struct participant *pp)
{
ephash_guid_insert (gh, &pp->e);
entity_index_insert (ei, &pp->e);
}
void ephash_insert_proxy_participant_guid (struct ephash *gh, struct proxy_participant *proxypp)
void entidx_insert_proxy_participant_guid (struct entity_index *ei, struct proxy_participant *proxypp)
{
ephash_guid_insert (gh, &proxypp->e);
entity_index_insert (ei, &proxypp->e);
}
void ephash_insert_writer_guid (struct ephash *gh, struct writer *wr)
void entidx_insert_writer_guid (struct entity_index *ei, struct writer *wr)
{
ephash_guid_insert (gh, &wr->e);
entity_index_insert (ei, &wr->e);
}
void ephash_insert_reader_guid (struct ephash *gh, struct reader *rd)
void entidx_insert_reader_guid (struct entity_index *ei, struct reader *rd)
{
ephash_guid_insert (gh, &rd->e);
entity_index_insert (ei, &rd->e);
}
void ephash_insert_proxy_writer_guid (struct ephash *gh, struct proxy_writer *pwr)
void entidx_insert_proxy_writer_guid (struct entity_index *ei, struct proxy_writer *pwr)
{
ephash_guid_insert (gh, &pwr->e);
entity_index_insert (ei, &pwr->e);
}
void ephash_insert_proxy_reader_guid (struct ephash *gh, struct proxy_reader *prd)
void entidx_insert_proxy_reader_guid (struct entity_index *ei, struct proxy_reader *prd)
{
ephash_guid_insert (gh, &prd->e);
entity_index_insert (ei, &prd->e);
}
void ephash_remove_participant_guid (struct ephash *gh, struct participant *pp)
void entidx_remove_participant_guid (struct entity_index *ei, struct participant *pp)
{
ephash_guid_remove (gh, &pp->e);
entity_index_remove (ei, &pp->e);
}
void ephash_remove_proxy_participant_guid (struct ephash *gh, struct proxy_participant *proxypp)
void entidx_remove_proxy_participant_guid (struct entity_index *ei, struct proxy_participant *proxypp)
{
ephash_guid_remove (gh, &proxypp->e);
entity_index_remove (ei, &proxypp->e);
}
void ephash_remove_writer_guid (struct ephash *gh, struct writer *wr)
void entidx_remove_writer_guid (struct entity_index *ei, struct writer *wr)
{
ephash_guid_remove (gh, &wr->e);
entity_index_remove (ei, &wr->e);
}
void ephash_remove_reader_guid (struct ephash *gh, struct reader *rd)
void entidx_remove_reader_guid (struct entity_index *ei, struct reader *rd)
{
ephash_guid_remove (gh, &rd->e);
entity_index_remove (ei, &rd->e);
}
void ephash_remove_proxy_writer_guid (struct ephash *gh, struct proxy_writer *pwr)
void entidx_remove_proxy_writer_guid (struct entity_index *ei, struct proxy_writer *pwr)
{
ephash_guid_remove (gh, &pwr->e);
entity_index_remove (ei, &pwr->e);
}
void ephash_remove_proxy_reader_guid (struct ephash *gh, struct proxy_reader *prd)
void entidx_remove_proxy_reader_guid (struct entity_index *ei, struct proxy_reader *prd)
{
ephash_guid_remove (gh, &prd->e);
entity_index_remove (ei, &prd->e);
}
struct participant *ephash_lookup_participant_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct participant *entidx_lookup_participant_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (guid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (offsetof (struct participant, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_PARTICIPANT);
return entidx_lookup_guid_int (ei, guid, EK_PARTICIPANT);
}
struct proxy_participant *ephash_lookup_proxy_participant_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct proxy_participant *entidx_lookup_proxy_participant_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (guid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (offsetof (struct proxy_participant, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_PROXY_PARTICIPANT);
return entidx_lookup_guid_int (ei, guid, EK_PROXY_PARTICIPANT);
}
struct writer *ephash_lookup_writer_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct writer *entidx_lookup_writer_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (is_writer_entityid (guid->entityid));
assert (offsetof (struct writer, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_WRITER);
return entidx_lookup_guid_int (ei, guid, EK_WRITER);
}
struct reader *ephash_lookup_reader_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct reader *entidx_lookup_reader_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (is_reader_entityid (guid->entityid));
assert (offsetof (struct reader, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_READER);
return entidx_lookup_guid_int (ei, guid, EK_READER);
}
struct proxy_writer *ephash_lookup_proxy_writer_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct proxy_writer *entidx_lookup_proxy_writer_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (is_writer_entityid (guid->entityid));
assert (offsetof (struct proxy_writer, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_PROXY_WRITER);
return entidx_lookup_guid_int (ei, guid, EK_PROXY_WRITER);
}
struct proxy_reader *ephash_lookup_proxy_reader_guid (const struct ephash *gh, const struct ddsi_guid *guid)
struct proxy_reader *entidx_lookup_proxy_reader_guid (const struct entity_index *ei, const struct ddsi_guid *guid)
{
assert (is_reader_entityid (guid->entityid));
assert (offsetof (struct proxy_reader, e) == 0);
return ephash_lookup_guid_int (gh, guid, EK_PROXY_READER);
return entidx_lookup_guid_int (ei, guid, EK_PROXY_READER);
}
/* Enumeration */
static void ephash_enum_init_minmax_int (struct ephash_enum *st, const struct ephash *gh, struct match_entities_range_key *min)
static void entidx_enum_init_minmax_int (struct entidx_enum *st, const struct entity_index *ei, struct match_entities_range_key *min)
{
/* Use a lock to protect against concurrent modification and rely on the GC not deleting
any entities while enumerating so we can rely on the (kind, topic, GUID) triple to
@ -422,81 +422,81 @@ static void ephash_enum_init_minmax_int (struct ephash_enum *st, const struct ep
assert (thread_is_awake ());
st->vtime = ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime);
#endif
st->gh = (struct ephash *) gh;
st->entidx = (struct entity_index *) ei;
st->kind = min->entity.e.kind;
ddsrt_mutex_lock (&st->gh->all_entities_lock);
st->cur = ddsrt_avl_lookup_succ_eq (&all_entities_treedef, &st->gh->all_entities, min);
ddsrt_mutex_unlock (&st->gh->all_entities_lock);
ddsrt_mutex_lock (&st->entidx->all_entities_lock);
st->cur = ddsrt_avl_lookup_succ_eq (&all_entities_treedef, &st->entidx->all_entities, min);
ddsrt_mutex_unlock (&st->entidx->all_entities_lock);
}
void ephash_enum_init_topic (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind, const char *topic, struct match_entities_range_key *max)
void entidx_enum_init_topic (struct entidx_enum *st, const struct entity_index *ei, enum entity_kind kind, const char *topic, struct match_entities_range_key *max)
{
assert (kind == EK_READER || kind == EK_WRITER || kind == EK_PROXY_READER || kind == EK_PROXY_WRITER);
struct match_entities_range_key min;
match_endpoint_range (kind, topic, &min, max);
ephash_enum_init_minmax_int (st, gh, &min);
entidx_enum_init_minmax_int (st, ei, &min);
if (st->cur && all_entities_compare (st->cur, &max->entity) > 0)
st->cur = NULL;
}
void ephash_enum_init (struct ephash_enum *st, const struct ephash *gh, enum entity_kind kind)
void entidx_enum_init (struct entidx_enum *st, const struct entity_index *ei, enum entity_kind kind)
{
struct match_entities_range_key min;
match_entity_kind_min (kind, &min);
ephash_enum_init_minmax_int (st, gh, &min);
entidx_enum_init_minmax_int (st, ei, &min);
if (st->cur && st->cur->kind != st->kind)
st->cur = NULL;
}
void ephash_enum_writer_init (struct ephash_enum_writer *st, const struct ephash *gh)
void entidx_enum_writer_init (struct entidx_enum_writer *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_WRITER);
entidx_enum_init (&st->st, ei, EK_WRITER);
}
void ephash_enum_reader_init (struct ephash_enum_reader *st, const struct ephash *gh)
void entidx_enum_reader_init (struct entidx_enum_reader *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_READER);
entidx_enum_init (&st->st, ei, EK_READER);
}
void ephash_enum_proxy_writer_init (struct ephash_enum_proxy_writer *st, const struct ephash *gh)
void entidx_enum_proxy_writer_init (struct entidx_enum_proxy_writer *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_PROXY_WRITER);
entidx_enum_init (&st->st, ei, EK_PROXY_WRITER);
}
void ephash_enum_proxy_reader_init (struct ephash_enum_proxy_reader *st, const struct ephash *gh)
void entidx_enum_proxy_reader_init (struct entidx_enum_proxy_reader *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_PROXY_READER);
entidx_enum_init (&st->st, ei, EK_PROXY_READER);
}
void ephash_enum_participant_init (struct ephash_enum_participant *st, const struct ephash *gh)
void entidx_enum_participant_init (struct entidx_enum_participant *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_PARTICIPANT);
entidx_enum_init (&st->st, ei, EK_PARTICIPANT);
}
void ephash_enum_proxy_participant_init (struct ephash_enum_proxy_participant *st, const struct ephash *gh)
void entidx_enum_proxy_participant_init (struct entidx_enum_proxy_participant *st, const struct entity_index *ei)
{
ephash_enum_init (&st->st, gh, EK_PROXY_PARTICIPANT);
entidx_enum_init (&st->st, ei, EK_PROXY_PARTICIPANT);
}
void *ephash_enum_next (struct ephash_enum *st)
void *entidx_enum_next (struct entidx_enum *st)
{
/* st->cur can not have been freed yet, but it may have been removed from the index */
assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime);
void *res = st->cur;
if (st->cur)
{
ddsrt_mutex_lock (&st->gh->all_entities_lock);
st->cur = ddsrt_avl_lookup_succ (&all_entities_treedef, &st->gh->all_entities, st->cur);
ddsrt_mutex_unlock (&st->gh->all_entities_lock);
ddsrt_mutex_lock (&st->entidx->all_entities_lock);
st->cur = ddsrt_avl_lookup_succ (&all_entities_treedef, &st->entidx->all_entities, st->cur);
ddsrt_mutex_unlock (&st->entidx->all_entities_lock);
if (st->cur && st->cur->kind != st->kind)
st->cur = NULL;
}
return res;
}
void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_range_key *max)
void *entidx_enum_next_max (struct entidx_enum *st, const struct match_entities_range_key *max)
{
void *res = ephash_enum_next (st);
void *res = entidx_enum_next (st);
/* max may only make the bounds tighter */
assert (max->entity.e.kind == st->kind);
@ -505,74 +505,74 @@ void *ephash_enum_next_max (struct ephash_enum *st, const struct match_entities_
return res;
}
struct writer *ephash_enum_writer_next (struct ephash_enum_writer *st)
struct writer *entidx_enum_writer_next (struct entidx_enum_writer *st)
{
assert (offsetof (struct writer, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
struct reader *ephash_enum_reader_next (struct ephash_enum_reader *st)
struct reader *entidx_enum_reader_next (struct entidx_enum_reader *st)
{
assert (offsetof (struct reader, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
struct proxy_writer *ephash_enum_proxy_writer_next (struct ephash_enum_proxy_writer *st)
struct proxy_writer *entidx_enum_proxy_writer_next (struct entidx_enum_proxy_writer *st)
{
assert (offsetof (struct proxy_writer, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
struct proxy_reader *ephash_enum_proxy_reader_next (struct ephash_enum_proxy_reader *st)
struct proxy_reader *entidx_enum_proxy_reader_next (struct entidx_enum_proxy_reader *st)
{
assert (offsetof (struct proxy_reader, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
struct participant *ephash_enum_participant_next (struct ephash_enum_participant *st)
struct participant *entidx_enum_participant_next (struct entidx_enum_participant *st)
{
assert (offsetof (struct participant, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
struct proxy_participant *ephash_enum_proxy_participant_next (struct ephash_enum_proxy_participant *st)
struct proxy_participant *entidx_enum_proxy_participant_next (struct entidx_enum_proxy_participant *st)
{
assert (offsetof (struct proxy_participant, e) == 0);
return ephash_enum_next (&st->st);
return entidx_enum_next (&st->st);
}
void ephash_enum_fini (struct ephash_enum *st)
void entidx_enum_fini (struct entidx_enum *st)
{
assert (ddsrt_atomic_ld32 (&lookup_thread_state ()->vtime) == st->vtime);
(void) st;
}
void ephash_enum_writer_fini (struct ephash_enum_writer *st)
void entidx_enum_writer_fini (struct entidx_enum_writer *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}
void ephash_enum_reader_fini (struct ephash_enum_reader *st)
void entidx_enum_reader_fini (struct entidx_enum_reader *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}
void ephash_enum_proxy_writer_fini (struct ephash_enum_proxy_writer *st)
void entidx_enum_proxy_writer_fini (struct entidx_enum_proxy_writer *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}
void ephash_enum_proxy_reader_fini (struct ephash_enum_proxy_reader *st)
void entidx_enum_proxy_reader_fini (struct entidx_enum_proxy_reader *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}
void ephash_enum_participant_fini (struct ephash_enum_participant *st)
void entidx_enum_participant_fini (struct entidx_enum_participant *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}
void ephash_enum_proxy_participant_fini (struct ephash_enum_proxy_participant *st)
void entidx_enum_proxy_participant_fini (struct entidx_enum_proxy_participant *st)
{
ephash_enum_fini (&st->st);
entidx_enum_fini (&st->st);
}

View file

@ -17,7 +17,7 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_log.h"
@ -50,7 +50,7 @@ void write_pmd_message_guid (struct q_globals * const gv, struct ddsi_guid *pp_g
{
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct participant *pp = ephash_lookup_participant_guid (gv->guid_hash, pp_guid);
struct participant *pp = entidx_lookup_participant_guid (gv->entity_index, pp_guid);
if (pp == NULL)
GVTRACE ("write_pmd_message("PGUIDFMT") - builtin pmd writer not found\n", PGUID (*pp_guid));
else
@ -128,7 +128,7 @@ void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp
debug_print_rawdata (rst->gv, "", pmd->value, length);
ppguid.prefix = p;
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL)
if ((proxypp = entidx_lookup_proxy_participant_guid (rst->gv->entity_index, &ppguid)) == NULL)
RSTTRACE (" PPunknown");
else if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE &&
(l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL)

View file

@ -34,7 +34,7 @@
#include "dds/ddsi/q_ddsi_discovery.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_xmsg.h"
@ -398,11 +398,11 @@ static unsigned pseudo_random_delay (const ddsi_guid_t *x, const ddsi_guid_t *y,
static void respond_to_spdp (const struct q_globals *gv, const ddsi_guid_t *dest_proxypp_guid)
{
struct ephash_enum_participant est;
struct entidx_enum_participant est;
struct participant *pp;
nn_mtime_t tnow = now_mt ();
ephash_enum_participant_init (&est, gv->guid_hash);
while ((pp = ephash_enum_participant_next (&est)) != NULL)
entidx_enum_participant_init (&est, gv->entity_index);
while ((pp = entidx_enum_participant_next (&est)) != NULL)
{
/* delay_base has 32 bits, so delay_norm is approximately 1s max;
delay_max <= 1s by gv.config checks */
@ -418,7 +418,7 @@ static void respond_to_spdp (const struct q_globals *gv, const ddsi_guid_t *dest
else
qxev_spdp (gv->xevents, tsched, &pp->e.guid, dest_proxypp_guid);
}
ephash_enum_participant_fini (&est);
entidx_enum_participant_fini (&est);
}
static int handle_SPDP_dead (const struct receiver_state *rst, nn_wctime_t timestamp, const nn_plist_t *datap, unsigned statusinfo)
@ -469,28 +469,28 @@ static void allowmulticast_aware_add_to_addrset (const struct q_globals *gv, uin
add_to_addrset (gv, as, loc);
}
static struct proxy_participant *find_ddsi2_proxy_participant (const struct ephash *guid_hash, const ddsi_guid_t *ppguid)
static struct proxy_participant *find_ddsi2_proxy_participant (const struct entity_index *entidx, const ddsi_guid_t *ppguid)
{
struct ephash_enum_proxy_participant it;
struct entidx_enum_proxy_participant it;
struct proxy_participant *pp;
ephash_enum_proxy_participant_init (&it, guid_hash);
while ((pp = ephash_enum_proxy_participant_next (&it)) != NULL)
entidx_enum_proxy_participant_init (&it, entidx);
while ((pp = entidx_enum_proxy_participant_next (&it)) != NULL)
{
if (vendor_is_eclipse_or_opensplice (pp->vendor) && pp->e.guid.prefix.u[0] == ppguid->prefix.u[0] && pp->is_ddsi2_pp)
break;
}
ephash_enum_proxy_participant_fini (&it);
entidx_enum_proxy_participant_fini (&it);
return pp;
}
static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const ddsi_guid_t *ddsi2guid, nn_wctime_t timestamp)
{
struct ephash_enum_proxy_participant it;
struct entidx_enum_proxy_participant it;
struct proxy_participant *pp, *d2pp;
if ((d2pp = ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid)) == NULL)
if ((d2pp = entidx_lookup_proxy_participant_guid (gv->entity_index, ddsi2guid)) == NULL)
return;
ephash_enum_proxy_participant_init (&it, gv->guid_hash);
while ((pp = ephash_enum_proxy_participant_next (&it)) != NULL)
entidx_enum_proxy_participant_init (&it, gv->entity_index);
while ((pp = entidx_enum_proxy_participant_next (&it)) != NULL)
{
if (vendor_is_eclipse_or_opensplice (pp->vendor) && pp->e.guid.prefix.u[0] == ddsi2guid->prefix.u[0] && !pp->is_ddsi2_pp)
{
@ -501,7 +501,7 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
proxy_participant_reassign_lease (pp, d2pp->lease);
GVTRACE ("\n");
if (ephash_lookup_proxy_participant_guid (gv->guid_hash, ddsi2guid) == NULL)
if (entidx_lookup_proxy_participant_guid (gv->entity_index, ddsi2guid) == NULL)
{
/* If DDSI2 has been deleted here (i.e., very soon after
having been created), we don't know whether pp will be
@ -510,7 +510,7 @@ static void make_participants_dependent_on_ddsi2 (struct q_globals *gv, const dd
}
}
}
ephash_enum_proxy_participant_fini (&it);
entidx_enum_proxy_participant_fini (&it);
if (pp != NULL)
{
@ -589,7 +589,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
/* Do we know this GUID already? */
{
struct entity_common *existing_entity;
if ((existing_entity = ephash_lookup_guid_untyped (gv->guid_hash, &datap->participant_guid)) == NULL)
if ((existing_entity = entidx_lookup_guid_untyped (gv->entity_index, &datap->participant_guid)) == NULL)
{
/* Local SPDP packets may be looped back, and that can include ones
for participants currently being deleted. The first thing that
@ -674,7 +674,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
the SPDP writer differs from the guid prefix of the new participant,
we make it dependent on the writer's participant. See also the
lease expiration handling. Note that the entityid MUST be
NN_ENTITYID_PARTICIPANT or ephash_lookup will assert. So we only
NN_ENTITYID_PARTICIPANT or entidx_lookup will assert. So we only
zero the prefix. */
privileged_pp_guid.prefix = rst->src_guid_prefix;
privileged_pp_guid.entityid.u = NN_ENTITYID_PARTICIPANT;
@ -691,7 +691,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
/* Non-DDSI2 participants are made dependent on DDSI2 (but DDSI2
itself need not be discovered yet) */
struct proxy_participant *ddsi2;
if ((ddsi2 = find_ddsi2_proxy_participant (gv->guid_hash, &datap->participant_guid)) == NULL)
if ((ddsi2 = find_ddsi2_proxy_participant (gv->entity_index, &datap->participant_guid)) == NULL)
memset (&privileged_pp_guid.prefix, 0, sizeof (privileged_pp_guid.prefix));
else
{
@ -810,7 +810,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_
/* If we just created a participant dependent on DDSI2, make sure
DDSI2 still exists. There is a risk of racing the lease expiry
of DDSI2. */
if (ephash_lookup_proxy_participant_guid (gv->guid_hash, &privileged_pp_guid) == NULL)
if (entidx_lookup_proxy_participant_guid (gv->entity_index, &privileged_pp_guid) == NULL)
{
GVLOGDISC ("make_participants_dependent_on_ddsi2: ddsi2 "PGUIDFMT" is no more, delete "PGUIDFMT"\n",
PGUID (privileged_pp_guid), PGUID (datap->participant_guid));
@ -963,7 +963,7 @@ static int sedp_write_endpoint
the default. */
if (!is_writer_entityid (epguid->entityid))
{
const struct reader *rd = ephash_lookup_reader_guid (gv->guid_hash, epguid);
const struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, epguid);
assert (rd);
if (rd->favours_ssm)
{
@ -1117,7 +1117,7 @@ static struct proxy_participant *implicitly_create_proxypp (struct q_globals *gv
readers or writers, only if remote ddsi2 is provably running
with a minimal built-in endpoint set */
struct proxy_participant *privpp;
if ((privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &privguid)) == NULL) {
if ((privpp = entidx_lookup_proxy_participant_guid (gv->entity_index, &privguid)) == NULL) {
GVTRACE (" unknown-src-proxypp?\n");
goto err;
} else if (!privpp->is_ddsi2_pp) {
@ -1149,7 +1149,7 @@ static struct proxy_participant *implicitly_create_proxypp (struct q_globals *gv
err:
nn_plist_fini (&pp_plist);
return ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid);
return entidx_lookup_proxy_participant_guid (gv->entity_index, ppguid);
}
static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn_plist_t *datap /* note: potentially modifies datap */, const ddsi_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp)
@ -1179,7 +1179,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
if (is_deleted_participant_guid (gv->deleted_participants, &ppguid, DPG_REMOTE))
E (" local dead pp?\n", err);
if (ephash_lookup_participant_guid (gv->guid_hash, &ppguid) != NULL)
if (entidx_lookup_participant_guid (gv->entity_index, &ppguid) != NULL)
E (" local pp?\n", err);
if (is_builtin_entityid (datap->endpoint_guid.entityid, vendorid))
@ -1189,7 +1189,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
if (!(datap->qos.present & QP_TYPE_NAME))
E (" no typename?\n", err);
if ((pp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &ppguid)) == NULL)
if ((pp = entidx_lookup_proxy_participant_guid (gv->entity_index, &ppguid)) == NULL)
{
GVLOGDISC (" unknown-proxypp");
if ((pp = implicitly_create_proxypp (gv, &ppguid, datap, src_guid_prefix, vendorid, timestamp, 0)) == NULL)
@ -1231,11 +1231,11 @@ static void handle_SEDP_alive (const struct receiver_state *rst, seqno_t seq, nn
if (is_writer)
{
pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &datap->endpoint_guid);
pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, &datap->endpoint_guid);
}
else
{
prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, &datap->endpoint_guid);
prd = entidx_lookup_proxy_reader_guid (gv->entity_index, &datap->endpoint_guid);
}
if (pwr || prd)
{
@ -1534,7 +1534,7 @@ static void handle_SEDP_CM (const struct receiver_state *rst, ddsi_entityid_t wr
GVWARNING ("SEDP_CM (vendor %u.%u): missing participant GUID\n", src.vendorid.id[0], src.vendorid.id[1]);
else
{
if ((proxypp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &decoded_data.participant_guid)) == NULL)
if ((proxypp = entidx_lookup_proxy_participant_guid (gv->entity_index, &decoded_data.participant_guid)) == NULL)
proxypp = implicitly_create_proxypp (gv, &decoded_data.participant_guid, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp, 0);
if (proxypp != NULL)
update_proxy_participant_plist (proxypp, 0, &decoded_data, UPD_PROXYPP_CM, timestamp);

View file

@ -26,7 +26,7 @@
#include "dds/ddsi/q_misc.h"
#include "dds/ddsi/q_log.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_addrset.h"
#include "dds/ddsi/q_radmin.h"
@ -144,22 +144,22 @@ static int print_proxy_endpoint_common (ddsi_tran_conn_t conn, const char *label
static int print_participants (struct thread_state1 * const ts1, struct q_globals *gv, ddsi_tran_conn_t conn)
{
struct ephash_enum_participant e;
struct entidx_enum_participant e;
struct participant *p;
int x = 0;
thread_state_awake_fixed_domain (ts1);
ephash_enum_participant_init (&e, gv->guid_hash);
while ((p = ephash_enum_participant_next (&e)) != NULL)
entidx_enum_participant_init (&e, gv->entity_index);
while ((p = entidx_enum_participant_next (&e)) != NULL)
{
ddsrt_mutex_lock (&p->e.lock);
x += cpf (conn, "pp "PGUIDFMT" %s%s\n", PGUID (p->e.guid), p->e.name, p->is_ddsi2_pp ? " [ddsi2]" : "");
ddsrt_mutex_unlock (&p->e.lock);
{
struct ephash_enum_reader er;
struct entidx_enum_reader er;
struct reader *r;
ephash_enum_reader_init (&er, gv->guid_hash);
while ((r = ephash_enum_reader_next (&er)) != NULL)
entidx_enum_reader_init (&er, gv->entity_index);
while ((r = entidx_enum_reader_next (&er)) != NULL)
{
ddsrt_avl_iter_t writ;
struct rd_pwr_match *m;
@ -174,14 +174,14 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global
x += cpf (conn, " pwr "PGUIDFMT"\n", PGUID (m->pwr_guid));
ddsrt_mutex_unlock (&r->e.lock);
}
ephash_enum_reader_fini (&er);
entidx_enum_reader_fini (&er);
}
{
struct ephash_enum_writer ew;
struct entidx_enum_writer ew;
struct writer *w;
ephash_enum_writer_init (&ew, gv->guid_hash);
while ((w = ephash_enum_writer_next (&ew)) != NULL)
entidx_enum_writer_init (&ew, gv->entity_index);
while ((w = entidx_enum_writer_next (&ew)) != NULL)
{
ddsrt_avl_iter_t rdit;
struct wr_prd_match *m;
@ -219,22 +219,22 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global
}
ddsrt_mutex_unlock (&w->e.lock);
}
ephash_enum_writer_fini (&ew);
entidx_enum_writer_fini (&ew);
}
}
ephash_enum_participant_fini (&e);
entidx_enum_participant_fini (&e);
thread_state_asleep (ts1);
return x;
}
static int print_proxy_participants (struct thread_state1 * const ts1, struct q_globals *gv, ddsi_tran_conn_t conn)
{
struct ephash_enum_proxy_participant e;
struct entidx_enum_proxy_participant e;
struct proxy_participant *p;
int x = 0;
thread_state_awake_fixed_domain (ts1);
ephash_enum_proxy_participant_init (&e, gv->guid_hash);
while ((p = ephash_enum_proxy_participant_next (&e)) != NULL)
entidx_enum_proxy_participant_init (&e, gv->entity_index);
while ((p = entidx_enum_proxy_participant_next (&e)) != NULL)
{
ddsrt_mutex_lock (&p->e.lock);
x += cpf (conn, "proxypp "PGUIDFMT"%s\n", PGUID (p->e.guid), p->is_ddsi2_pp ? " [ddsi2]" : "");
@ -243,10 +243,10 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct q_
x += print_addrset (conn, " meta", p->as_default, "\n");
{
struct ephash_enum_proxy_reader er;
struct entidx_enum_proxy_reader er;
struct proxy_reader *r;
ephash_enum_proxy_reader_init (&er, gv->guid_hash);
while ((r = ephash_enum_proxy_reader_next (&er)) != NULL)
entidx_enum_proxy_reader_init (&er, gv->entity_index);
while ((r = entidx_enum_proxy_reader_next (&er)) != NULL)
{
ddsrt_avl_iter_t writ;
struct prd_wr_match *m;
@ -258,14 +258,14 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct q_
x += cpf (conn, " wr "PGUIDFMT"\n", PGUID (m->wr_guid));
ddsrt_mutex_unlock (&r->e.lock);
}
ephash_enum_proxy_reader_fini (&er);
entidx_enum_proxy_reader_fini (&er);
}
{
struct ephash_enum_proxy_writer ew;
struct entidx_enum_proxy_writer ew;
struct proxy_writer *w;
ephash_enum_proxy_writer_init (&ew, gv->guid_hash);
while ((w = ephash_enum_proxy_writer_next (&ew)) != NULL)
entidx_enum_proxy_writer_init (&ew, gv->entity_index);
while ((w = entidx_enum_proxy_writer_next (&ew)) != NULL)
{
ddsrt_avl_iter_t rdit;
struct pwr_rd_match *m;
@ -292,10 +292,10 @@ static int print_proxy_participants (struct thread_state1 * const ts1, struct q_
}
ddsrt_mutex_unlock (&w->e.lock);
}
ephash_enum_proxy_writer_fini (&ew);
entidx_enum_proxy_writer_fini (&ew);
}
}
ephash_enum_proxy_participant_fini (&e);
entidx_enum_proxy_participant_fini (&e);
thread_state_asleep (ts1);
return x;
}

View file

@ -30,7 +30,7 @@
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_qosmatch.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_addrset.h"
#include "dds/ddsi/q_xevent.h" /* qxev_spdp, &c. */
@ -494,7 +494,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
/* Participant may not exist yet, but this test is imprecise: if it
used to exist, but is currently being deleted and we're trying to
recreate it. */
if (ephash_lookup_participant_guid (gv->guid_hash, ppguid) != NULL)
if (entidx_lookup_participant_guid (gv->entity_index, ppguid) != NULL)
return DDS_RETCODE_PRECONDITION_NOT_MET;
if (gv->config.max_participants == 0)
@ -576,7 +576,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
/* But we need the as_disc address set for SPDP, because we need to
send it to everyone regardless of the existence of readers. */
{
struct writer *wr = ephash_lookup_writer_guid (gv->guid_hash, &subguid);
struct writer *wr = entidx_lookup_writer_guid (gv->entity_index, &subguid);
assert (wr != NULL);
ddsrt_mutex_lock (&wr->e.lock);
unref_addrset (wr->as);
@ -589,7 +589,7 @@ dds_return_t new_participant_guid (const ddsi_guid_t *ppguid, struct q_globals *
}
/* Make it globally visible, else the endpoint matching won't work. */
ephash_insert_participant_guid (gv->guid_hash, pp);
entidx_insert_participant_guid (gv->entity_index, pp);
/* SEDP writers: */
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
@ -846,7 +846,7 @@ static void unref_participant (struct participant *pp, const struct ddsi_guid *g
The conditional execution of some of this is so we can use
unref_participant() for some of the error handling in
new_participant(). Non-existent built-in endpoints can't be
found in guid_hash and are simply ignored. */
found in entity_index and are simply ignored. */
pp->builtins_deleted = 1;
ddsrt_mutex_unlock (&pp->refc_lock);
@ -942,11 +942,11 @@ dds_return_t delete_participant (struct q_globals *gv, const struct ddsi_guid *p
{
struct participant *pp;
GVLOGDISC ("delete_participant("PGUIDFMT")\n", PGUID (*ppguid));
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, ppguid)) == NULL)
if ((pp = entidx_lookup_participant_guid (gv->entity_index, ppguid)) == NULL)
return DDS_RETCODE_BAD_PARAMETER;
builtintopic_write (gv->builtin_topic_interface, &pp->e, now(), false);
remember_deleted_participant_guid (gv->deleted_participants, &pp->e.guid);
ephash_remove_participant_guid (gv->guid_hash, pp);
entidx_remove_participant_guid (gv->entity_index, pp);
gcreq_participant (pp);
return 0;
}
@ -1017,7 +1017,7 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
bwr_guid.entityid.u = entityid;
}
return ephash_lookup_writer_guid (pp->e.gv->guid_hash, &bwr_guid);
return entidx_lookup_writer_guid (pp->e.gv->entity_index, &bwr_guid);
}
dds_duration_t pp_get_pmd_interval (struct participant *pp)
@ -1072,7 +1072,7 @@ static int rebuild_compare_locs(const void *va, const void *vb)
static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
{
struct addrset *all_addrs = new_addrset();
struct ephash *gh = wr->e.gv->guid_hash;
struct entity_index *gh = wr->e.gv->entity_index;
struct wr_prd_match *m;
ddsrt_avl_iter_t it;
#ifdef DDSI_INCLUDE_SSM
@ -1083,7 +1083,7 @@ static struct addrset *rebuild_make_all_addrs (int *nreaders, struct writer *wr)
for (m = ddsrt_avl_iter_first (&wr_readers_treedef, &wr->readers, &it); m; m = ddsrt_avl_iter_next (&it))
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
continue;
(*nreaders)++;
copy_addrset_into_addrset(wr->e.gv, all_addrs, prd->c.as);
@ -1132,7 +1132,7 @@ static void rebuild_make_locs(const struct ddsrt_log_cfg *logcfg, int *p_nlocs,
static void rebuild_make_covered(int8_t **covered, const struct writer *wr, int *nreaders, int nlocs, const nn_locator_t *locs)
{
struct rebuild_flatten_locs_arg flarg;
struct ephash *gh = wr->e.gv->guid_hash;
struct entity_index *gh = wr->e.gv->entity_index;
struct wr_prd_match *m;
ddsrt_avl_iter_t it;
int rdidx, i, j;
@ -1148,7 +1148,7 @@ static void rebuild_make_covered(int8_t **covered, const struct writer *wr, int
{
struct proxy_reader *prd;
struct addrset *ass[] = { NULL, NULL, NULL };
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gh, &m->prd_guid)) == NULL)
continue;
ass[0] = prd->c.as;
#ifdef DDSI_INCLUDE_SSM
@ -1349,12 +1349,12 @@ static void rebuild_writer_addrset (struct writer *wr)
void rebuild_or_clear_writer_addrsets (struct q_globals *gv, int rebuild)
{
struct ephash_enum_writer est;
struct entidx_enum_writer est;
struct writer *wr;
struct addrset *empty = rebuild ? NULL : new_addrset();
GVLOGDISC ("rebuild_or_delete_writer_addrsets(%d)\n", rebuild);
ephash_enum_writer_init (&est, gv->guid_hash);
while ((wr = ephash_enum_writer_next (&est)) != NULL)
entidx_enum_writer_init (&est, gv->entity_index);
while ((wr = entidx_enum_writer_next (&est)) != NULL)
{
ddsrt_mutex_lock (&wr->e.lock);
if (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
@ -1377,7 +1377,7 @@ void rebuild_or_clear_writer_addrsets (struct q_globals *gv, int rebuild)
}
ddsrt_mutex_unlock (&wr->e.lock);
}
ephash_enum_writer_fini (&est);
entidx_enum_writer_fini (&est);
unref_addrset(empty);
GVLOGDISC ("rebuild_or_delete_writer_addrsets(%d) done\n", rebuild);
}
@ -1452,7 +1452,7 @@ static void proxy_writer_get_alive_state (struct proxy_writer *pwr, struct proxy
static void writer_drop_connection (const struct ddsi_guid *wr_guid, const struct proxy_reader *prd)
{
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (prd->e.gv->entity_index, wr_guid)) != NULL)
{
struct whc_node *deferred_free_list = NULL;
struct wr_prd_match *m;
@ -1483,7 +1483,7 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc
{
/* Only called by gc_delete_reader, so we actually have a reader pointer */
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (rd->e.gv->guid_hash, wr_guid)) != NULL)
if ((wr = entidx_lookup_writer_guid (rd->e.gv->entity_index, wr_guid)) != NULL)
{
struct wr_rd_match *m;
@ -1551,14 +1551,14 @@ static void reader_update_notify_pwr_alive_state (struct reader *rd, const struc
static void reader_update_notify_pwr_alive_state_guid (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid)) != NULL)
reader_update_notify_pwr_alive_state (rd, pwr, alive_state);
}
static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, rd_guid)) != NULL)
{
struct rd_pwr_match *m;
ddsrt_mutex_lock (&rd->e.lock);
@ -1594,7 +1594,7 @@ static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struc
static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const struct writer *wr)
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (wr->e.gv->guid_hash, rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (wr->e.gv->entity_index, rd_guid)) != NULL)
{
struct rd_wr_match *m;
ddsrt_mutex_lock (&rd->e.lock);
@ -1628,14 +1628,14 @@ static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const
}
}
static void update_reader_init_acknack_count (const ddsrt_log_cfg_t *logcfg, const struct ephash *guid_hash, const struct ddsi_guid *rd_guid, nn_count_t count)
static void update_reader_init_acknack_count (const ddsrt_log_cfg_t *logcfg, const struct entity_index *entidx, const struct ddsi_guid *rd_guid, nn_count_t count)
{
struct reader *rd;
/* Update the initial acknack sequence number for the reader. See
also reader_add_connection(). */
DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "update_reader_init_acknack_count ("PGUIDFMT", %"PRId32"): ", PGUID (*rd_guid), count);
if ((rd = ephash_lookup_reader_guid (guid_hash, rd_guid)) != NULL)
if ((rd = entidx_lookup_reader_guid (entidx, rd_guid)) != NULL)
{
ddsrt_mutex_lock (&rd->e.lock);
DDS_CLOG (DDS_LC_DISCOVERY, logcfg, "%"PRId32" -> ", rd->init_acknack_count);
@ -1654,7 +1654,7 @@ static void proxy_writer_drop_connection (const struct ddsi_guid *pwr_guid, stru
{
/* Only called by gc_delete_reader, so we actually have a reader pointer */
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (rd->e.gv->guid_hash, pwr_guid)) != NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (rd->e.gv->entity_index, pwr_guid)) != NULL)
{
struct pwr_rd_match *m;
@ -1679,7 +1679,7 @@ static void proxy_writer_drop_connection (const struct ddsi_guid *pwr_guid, stru
ddsrt_mutex_unlock (&pwr->e.lock);
if (m != NULL)
{
update_reader_init_acknack_count (&rd->e.gv->logconfig, rd->e.gv->guid_hash, &rd->e.guid, m->count);
update_reader_init_acknack_count (&rd->e.gv->logconfig, rd->e.gv->entity_index, &rd->e.guid, m->count);
}
free_pwr_rd_match (m);
}
@ -1688,7 +1688,7 @@ static void proxy_writer_drop_connection (const struct ddsi_guid *pwr_guid, stru
static void proxy_reader_drop_connection (const struct ddsi_guid *prd_guid, struct writer *wr)
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (wr->e.gv->guid_hash, prd_guid)) != NULL)
if ((prd = entidx_lookup_proxy_reader_guid (wr->e.gv->entity_index, prd_guid)) != NULL)
{
struct prd_wr_match *m;
ddsrt_mutex_lock (&prd->e.lock);
@ -2426,8 +2426,8 @@ static void generic_do_match (struct entity_common *e, nn_mtime_t tnow, bool loc
};
enum entity_kind mkind = generic_do_match_mkind (e->kind, local);
struct ephash const * const guid_hash = e->gv->guid_hash;
struct ephash_enum it;
struct entity_index const * const entidx = e->gv->entity_index;
struct entidx_enum it;
struct entity_common *em;
if (!is_builtin_entityid (e->guid.entityid, NN_VENDORID_ECLIPSE) || (local && is_local_orphan_endpoint (e)))
@ -2447,10 +2447,10 @@ static void generic_do_match (struct entity_common *e, nn_mtime_t tnow, bool loc
deleted between our calling init and our reaching it while
enumerating), but we may visit a single proxy reader multiple
times. */
ephash_enum_init_topic (&it, guid_hash, mkind, tp, &max);
while ((em = ephash_enum_next_max (&it, &max)) != NULL)
entidx_enum_init_topic (&it, entidx, mkind, tp, &max);
while ((em = entidx_enum_next_max (&it, &max)) != NULL)
generic_do_match_connect (e, em, tnow, local);
ephash_enum_fini (&it);
entidx_enum_fini (&it);
}
else if (!local)
{
@ -2467,15 +2467,15 @@ static void generic_do_match (struct entity_common *e, nn_mtime_t tnow, bool loc
isproxy ? "" : "proxy ", tgt_ent.u);
if (tgt_ent.u != NN_ENTITYID_UNKNOWN)
{
ephash_enum_init (&it, guid_hash, pkind);
while ((em = ephash_enum_next (&it)) != NULL)
entidx_enum_init (&it, entidx, pkind);
while ((em = entidx_enum_next (&it)) != NULL)
{
const ddsi_guid_t tgt_guid = { em->guid.prefix, tgt_ent };
struct entity_common *ep;
if ((ep = ephash_lookup_guid (guid_hash, &tgt_guid, mkind)) != NULL)
if ((ep = entidx_lookup_guid (entidx, &tgt_guid, mkind)) != NULL)
generic_do_match_connect (e, ep, tnow, local);
}
ephash_enum_fini (&it);
entidx_enum_fini (&it);
}
}
}
@ -2937,7 +2937,7 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct ddsi_g
nn_mtime_t tnow = now_mt ();
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (pp->e.gv->guid_hash, guid) == NULL);
assert (entidx_lookup_writer_guid (pp->e.gv->entity_index, guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (&pp->e.gv->logconfig, guid, topic, xqos);
@ -2953,13 +2953,13 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct ddsi_g
endpoint_common_init (&wr->e, &wr->c, pp->e.gv, EK_WRITER, guid, group_guid, pp, onlylocal);
new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity);
/* guid_hash needed for protocol handling, so add it before we send
/* entity_index needed for protocol handling, so add it before we send
out our first message. Also: needed for matching, and swapping
the order if hash insert & matching creates a window during which
neither of two endpoints being created in parallel can discover
the other. */
ddsrt_mutex_lock (&wr->e.lock);
ephash_insert_writer_guid (pp->e.gv->guid_hash, wr);
entidx_insert_writer_guid (pp->e.gv->entity_index, wr);
builtintopic_write (wr->e.gv->builtin_topic_interface, &wr->e, now(), true);
ddsrt_mutex_unlock (&wr->e.lock);
@ -2997,14 +2997,14 @@ dds_return_t new_writer (struct writer **wr_out, struct q_globals *gv, struct dd
dds_return_t rc;
uint32_t kind;
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, ppguid)) == NULL)
if ((pp = entidx_lookup_participant_guid (gv->entity_index, ppguid)) == NULL)
{
GVLOGDISC ("new_writer - participant "PGUIDFMT" not found\n", PGUID (*ppguid));
return DDS_RETCODE_BAD_PARAMETER;
}
/* participant can't be freed while we're mucking around cos we are
awake and do not touch the thread's vtime (ephash_lookup already
awake and do not touch the thread's vtime (entidx_lookup already
verifies we're awake) */
wrguid->prefix = pp->e.guid.prefix;
kind = topic->topickind_no_key ? NN_ENTITYID_KIND_WRITER_NO_KEY : NN_ENTITYID_KIND_WRITER_WITH_KEY;
@ -3030,7 +3030,7 @@ struct local_orphan_writer *new_local_orphan_writer (struct q_globals *gv, ddsi_
wr->c.pp = NULL;
memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid));
new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL);
ephash_insert_writer_guid (gv->guid_hash, wr);
entidx_insert_writer_guid (gv->entity_index, wr);
builtintopic_write (gv->builtin_topic_interface, &wr->e, now(), true);
match_writer_with_local_readers (wr, tnow);
return lowr;
@ -3061,7 +3061,7 @@ static void gc_delete_writer (struct gcreq *gcreq)
}
/* Tear down connections -- no proxy reader can be adding/removing
us now, because we can't be found via guid_hash anymore. We
us now, because we can't be found via entity_index anymore. We
therefore need not take lock. */
while (!ddsrt_avl_is_empty (&wr->readers))
@ -3143,7 +3143,7 @@ dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_g
{
struct writer *wr;
assert (is_writer_entityid (guid->entityid));
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, guid)) == NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, guid)) == NULL)
{
GVLOGDISC ("unblock_throttled_writer(guid "PGUIDFMT") - unknown guid\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
@ -3161,7 +3161,7 @@ dds_return_t delete_writer_nolinger_locked (struct writer *wr)
ASSERT_MUTEX_HELD (&wr->e.lock);
builtintopic_write (wr->e.gv->builtin_topic_interface, &wr->e, now(), false);
local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr->e.gv->guid_hash, wr);
entidx_remove_writer_guid (wr->e.gv->entity_index, wr);
writer_set_state (wr, WRST_DELETING);
if (wr->lease_duration != NULL) {
ddsrt_mutex_lock (&wr->c.pp->e.lock);
@ -3184,7 +3184,7 @@ dds_return_t delete_writer_nolinger (struct q_globals *gv, const struct ddsi_gui
DDSI participants. But it would be somewhat more elegant to do it
differently. */
assert (is_writer_entityid (guid->entityid));
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, guid)) == NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, guid)) == NULL)
{
GVLOGDISC ("delete_writer_nolinger(guid "PGUIDFMT") - unknown guid\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
@ -3208,7 +3208,7 @@ dds_return_t delete_writer (struct q_globals *gv, const struct ddsi_guid *guid)
{
struct writer *wr;
struct whc_state whcst;
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, guid)) == NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, guid)) == NULL)
{
GVLOGDISC ("delete_writer(guid "PGUIDFMT") - unknown guid\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
@ -3368,7 +3368,7 @@ static dds_return_t new_reader_guid
nn_mtime_t tnow = now_mt ();
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_reader_guid (pp->e.gv->guid_hash, guid) == NULL);
assert (entidx_lookup_reader_guid (pp->e.gv->entity_index, guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (&pp->e.gv->logconfig, guid, topic, xqos);
@ -3475,7 +3475,7 @@ static dds_return_t new_reader_guid
ddsrt_avl_init (&rd_local_writers_treedef, &rd->local_writers);
ddsrt_mutex_lock (&rd->e.lock);
ephash_insert_reader_guid (pp->e.gv->guid_hash, rd);
entidx_insert_reader_guid (pp->e.gv->entity_index, rd);
builtintopic_write (pp->e.gv->builtin_topic_interface, &rd->e, now(), true);
ddsrt_mutex_unlock (&rd->e.lock);
@ -3503,7 +3503,7 @@ dds_return_t new_reader
dds_return_t rc;
uint32_t kind;
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, ppguid)) == NULL)
if ((pp = entidx_lookup_participant_guid (gv->entity_index, ppguid)) == NULL)
{
GVLOGDISC ("new_reader - participant "PGUIDFMT" not found\n", PGUID (*ppguid));
return DDS_RETCODE_BAD_PARAMETER;
@ -3571,14 +3571,14 @@ dds_return_t delete_reader (struct q_globals *gv, const struct ddsi_guid *guid)
{
struct reader *rd;
assert (!is_writer_entityid (guid->entityid));
if ((rd = ephash_lookup_reader_guid (gv->guid_hash, guid)) == NULL)
if ((rd = entidx_lookup_reader_guid (gv->entity_index, guid)) == NULL)
{
GVLOGDISC ("delete_reader_guid(guid "PGUIDFMT") - unknown guid\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
}
GVLOGDISC ("delete_reader_guid(guid "PGUIDFMT") ...\n", PGUID (*guid));
builtintopic_write (rd->e.gv->builtin_topic_interface, &rd->e, now(), false);
ephash_remove_reader_guid (gv->guid_hash, rd);
entidx_remove_reader_guid (gv->entity_index, rd);
gcreq_reader (rd);
return 0;
}
@ -3746,7 +3746,7 @@ void new_proxy_participant
struct proxy_participant *proxypp;
assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid) == NULL);
assert (entidx_lookup_proxy_participant_guid (gv->entity_index, ppguid) == NULL);
assert (privileged_pp_guid == NULL || privileged_pp_guid->entityid.u == NN_ENTITYID_PARTICIPANT);
prune_deleted_participant_guids (gv->deleted_participants, now_mt ());
@ -3781,7 +3781,7 @@ void new_proxy_participant
{
struct proxy_participant *privpp;
privpp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid);
privpp = entidx_lookup_proxy_participant_guid (gv->entity_index, &proxypp->privileged_pp_guid);
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_auto);
ddsrt_fibheap_init (&lease_fhdef_proxypp, &proxypp->leaseheap_man);
@ -3852,7 +3852,7 @@ void new_proxy_participant
/* Proxy participant must be in the hash tables for
new_proxy_{writer,reader} to work */
ephash_insert_proxy_participant_guid (gv->guid_hash, proxypp);
entidx_insert_proxy_participant_guid (gv->entity_index, proxypp);
/* Add proxy endpoints based on the advertised (& possibly augmented
...) built-in endpoint set. */
@ -3928,7 +3928,7 @@ void new_proxy_participant
/* Register lease for auto liveliness, but be careful not to accidentally re-register
DDSI2's lease, as we may have become dependent on DDSI2 any time after
ephash_insert_proxy_participant_guid even if privileged_pp_guid was NULL originally */
entidx_insert_proxy_participant_guid even if privileged_pp_guid was NULL originally */
ddsrt_mutex_lock (&proxypp->e.lock);
if (proxypp->owns_lease)
lease_register (ddsrt_atomic_ldvoidp (&proxypp->minl_auto));
@ -4110,12 +4110,12 @@ static void delete_ppt (struct proxy_participant *proxypp, nn_wctime_t timestamp
/* if any proxy participants depend on this participant, delete them */
ELOGDISC (proxypp, "delete_ppt("PGUIDFMT") - deleting dependent proxy participants\n", PGUID (proxypp->e.guid));
{
struct ephash_enum_proxy_participant est;
struct entidx_enum_proxy_participant est;
struct proxy_participant *p;
ephash_enum_proxy_participant_init (&est, proxypp->e.gv->guid_hash);
while ((p = ephash_enum_proxy_participant_next (&est)) != NULL)
entidx_enum_proxy_participant_init (&est, proxypp->e.gv->entity_index);
while ((p = entidx_enum_proxy_participant_next (&est)) != NULL)
delete_or_detach_dependent_pp(p, proxypp, timestamp, isimplicit);
ephash_enum_proxy_participant_fini (&est);
entidx_enum_proxy_participant_fini (&est);
}
ddsrt_mutex_lock (&proxypp->e.lock);
@ -4171,16 +4171,16 @@ void purge_proxy_participants (struct q_globals *gv, const nn_locator_t *loc, bo
/* FIXME: check whether addr:port can't be reused for a new connection by the time we get here. */
/* NOTE: This function exists for the sole purpose of cleaning up after closing a TCP connection in ddsi_tcp_close_conn and the state of the calling thread could be anything at this point. Because of that we do the unspeakable and toggle the thread state conditionally. We can't afford to have it in "asleep", as that causes a race with the garbage collector. */
struct thread_state1 * const ts1 = lookup_thread_state ();
struct ephash_enum_proxy_participant est;
struct entidx_enum_proxy_participant est;
struct proxy_purge_data data;
thread_state_awake_fixed_domain (ts1);
data.loc = loc;
data.timestamp = now();
ephash_enum_proxy_participant_init (&est, gv->guid_hash);
while ((data.proxypp = ephash_enum_proxy_participant_next (&est)) != NULL)
entidx_enum_proxy_participant_init (&est, gv->entity_index);
while ((data.proxypp = entidx_enum_proxy_participant_next (&est)) != NULL)
addrset_forall (data.proxypp->as_meta, purge_helper, &data);
ephash_enum_proxy_participant_fini (&est);
entidx_enum_proxy_participant_fini (&est);
/* Shouldn't try to keep pinging clients once they're gone */
if (delete_from_as_disc)
@ -4195,7 +4195,7 @@ int delete_proxy_participant_by_guid (struct q_globals *gv, const struct ddsi_gu
GVLOGDISC ("delete_proxy_participant_by_guid("PGUIDFMT") ", PGUID (*guid));
ddsrt_mutex_lock (&gv->lock);
ppt = ephash_lookup_proxy_participant_guid (gv->guid_hash, guid);
ppt = entidx_lookup_proxy_participant_guid (gv->entity_index, guid);
if (ppt == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
@ -4205,7 +4205,7 @@ int delete_proxy_participant_by_guid (struct q_globals *gv, const struct ddsi_gu
GVLOGDISC ("- deleting\n");
builtintopic_write (gv->builtin_topic_interface, &ppt->e, timestamp, false);
remember_deleted_participant_guid (gv->deleted_participants, &ppt->e.guid);
ephash_remove_proxy_participant_guid (gv->guid_hash, ppt);
entidx_remove_proxy_participant_guid (gv->entity_index, ppt);
ddsrt_mutex_unlock (&gv->lock);
delete_ppt (ppt, timestamp, isimplicit);
@ -4218,7 +4218,7 @@ uint64_t get_entity_instance_id (const struct q_globals *gv, const struct ddsi_g
struct entity_common *e;
uint64_t iid = 0;
thread_state_awake (ts1, gv);
if ((e = ephash_lookup_guid_untyped (gv->guid_hash, guid)) != NULL)
if ((e = entidx_lookup_guid_untyped (gv->entity_index, guid)) != NULL)
iid = e->iid;
thread_state_asleep (ts1);
return iid;
@ -4280,9 +4280,9 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
int ret;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (gv->guid_hash, guid) == NULL);
assert (entidx_lookup_proxy_writer_guid (gv->entity_index, guid) == NULL);
if ((proxypp = ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid)) == NULL)
if ((proxypp = entidx_lookup_proxy_participant_guid (gv->entity_index, ppguid)) == NULL)
{
GVWARNING ("new_proxy_writer("PGUIDFMT"): proxy participant unknown\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
@ -4366,7 +4366,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
/* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&pwr->e.lock);
ephash_insert_proxy_writer_guid (gv->guid_hash, pwr);
entidx_insert_proxy_writer_guid (gv->entity_index, pwr);
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, true);
ddsrt_mutex_unlock (&pwr->e.lock);
@ -4402,7 +4402,7 @@ void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset
m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &iter);
while (m)
{
rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid);
rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid);
if (rd)
{
qxev_pwr_entityid (pwr, &rd->e.guid.prefix);
@ -4456,7 +4456,7 @@ void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset
}
ddsrt_mutex_unlock (&prd->e.lock);
wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid);
wr = entidx_lookup_writer_guid (prd->e.gv->entity_index, &wrguid);
if (wr)
{
ddsrt_mutex_lock (&wr->e.lock);
@ -4484,7 +4484,7 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers);
ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m);
reader_drop_connection (&m->rd_guid, pwr);
update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->guid_hash, &m->rd_guid, m->count);
update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->entity_index, &m->rd_guid, m->count);
free_pwr_rd_match (m);
}
local_reader_ary_fini (&pwr->rdary);
@ -4505,7 +4505,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
GVLOGDISC ("delete_proxy_writer ("PGUIDFMT") ", PGUID (*guid));
ddsrt_mutex_lock (&gv->lock);
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, guid)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, guid)) == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
GVLOGDISC ("- unknown\n");
@ -4519,7 +4519,7 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_
local_reader_ary_setinvalid (&pwr->rdary);
GVLOGDISC ("- deleting\n");
builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (gv->guid_hash, pwr);
entidx_remove_proxy_writer_guid (gv->entity_index, pwr);
ddsrt_mutex_unlock (&gv->lock);
if (pwr->c.xqos->liveliness.lease_duration != T_NEVER &&
pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC)
@ -4558,9 +4558,9 @@ void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify)
assert (!pwr->alive);
/* check that proxy writer still exists (when deleting it is removed from guid hash) */
if (ephash_lookup_proxy_writer_guid (pwr->e.gv->guid_hash, &pwr->e.guid) == NULL)
if (entidx_lookup_proxy_writer_guid (pwr->e.gv->entity_index, &pwr->e.guid) == NULL)
{
ELOGDISC (pwr, "proxy_writer_set_alive_may_unlock("PGUIDFMT") - not in guid_hash, pwr deleting\n", PGUID (pwr->e.guid));
ELOGDISC (pwr, "proxy_writer_set_alive_may_unlock("PGUIDFMT") - not in entity index, pwr deleting\n", PGUID (pwr->e.guid));
return;
}
@ -4602,7 +4602,7 @@ int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify)
void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify)
{
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, pwrguid)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, pwrguid)) == NULL)
GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*pwrguid));
else
{
@ -4627,9 +4627,9 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
int ret;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (gv->guid_hash, guid) == NULL);
assert (entidx_lookup_proxy_reader_guid (gv->entity_index, guid) == NULL);
if ((proxypp = ephash_lookup_proxy_participant_guid (gv->guid_hash, ppguid)) == NULL)
if ((proxypp = entidx_lookup_proxy_participant_guid (gv->entity_index, ppguid)) == NULL)
{
GVWARNING ("new_proxy_reader("PGUIDFMT"): proxy participant unknown\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
@ -4652,7 +4652,7 @@ int new_proxy_reader (struct q_globals *gv, const struct ddsi_guid *ppguid, cons
/* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&prd->e.lock);
ephash_insert_proxy_reader_guid (gv->guid_hash, prd);
entidx_insert_proxy_reader_guid (gv->entity_index, prd);
builtintopic_write (gv->builtin_topic_interface, &prd->e, timestamp, true);
ddsrt_mutex_unlock (&prd->e.lock);
@ -4685,7 +4685,7 @@ static void proxy_reader_set_delete_and_ack_all_messages (struct proxy_reader *p
}
ddsrt_mutex_unlock (&prd->e.lock);
if ((wr = ephash_lookup_writer_guid (prd->e.gv->guid_hash, &wrguid)) != NULL)
if ((wr = entidx_lookup_writer_guid (prd->e.gv->entity_index, &wrguid)) != NULL)
{
struct whc_node *deferred_free_list = NULL;
struct wr_prd_match *m_wr;
@ -4732,14 +4732,14 @@ int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_
(void)isimplicit;
GVLOGDISC ("delete_proxy_reader ("PGUIDFMT") ", PGUID (*guid));
ddsrt_mutex_lock (&gv->lock);
if ((prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, guid)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gv->entity_index, guid)) == NULL)
{
ddsrt_mutex_unlock (&gv->lock);
GVLOGDISC ("- unknown\n");
return DDS_RETCODE_BAD_PARAMETER;
}
builtintopic_write (gv->builtin_topic_interface, &prd->e, timestamp, false);
ephash_remove_proxy_reader_guid (gv->guid_hash, prd);
entidx_remove_proxy_reader_guid (gv->entity_index, prd);
ddsrt_mutex_unlock (&gv->lock);
GVLOGDISC ("- deleting\n");

View file

@ -21,14 +21,12 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_globals.h" /* for mattr, cattr */
#include "dds/ddsi/q_receive.h" /* for trigger_receive_threads */
#include "dds/ddsi/q_rtps.h" /* for guid_hash */
struct gcreq_queue {
struct gcreq *first;
struct gcreq *last;

View file

@ -37,7 +37,7 @@
#include "dds/ddsi/q_ddsi_discovery.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_gc.h"
#include "dds/ddsi/q_entity.h"
@ -1053,7 +1053,7 @@ int rtps_init (struct q_globals *gv)
ddsrt_cond_init (&gv->participant_set_cond);
lease_management_init (gv);
gv->deleted_participants = deleted_participants_admin_new (&gv->logconfig, gv->config.prune_deleted_ppant.delay);
gv->guid_hash = ephash_new (gv);
gv->entity_index = entity_index_new (gv);
ddsrt_mutex_init (&gv->privileged_pp_lock);
gv->privileged_pp = NULL;
@ -1372,8 +1372,8 @@ err_unicast_sockets:
ddsrt_mutex_destroy (&gv->spdp_lock);
ddsrt_mutex_destroy (&gv->lock);
ddsrt_mutex_destroy (&gv->privileged_pp_lock);
ephash_free (gv->guid_hash);
gv->guid_hash = NULL;
entity_index_free (gv->entity_index);
gv->entity_index = NULL;
deleted_participants_admin_free (gv->deleted_participants);
lease_management_term (gv);
ddsrt_cond_destroy (&gv->participant_set_cond);
@ -1548,26 +1548,26 @@ void rtps_stop (struct q_globals *gv)
ddsrt_mutex_destroy (&gv->spdp_lock);
{
struct ephash_enum_proxy_participant est;
struct entidx_enum_proxy_participant est;
struct proxy_participant *proxypp;
const nn_wctime_t tnow = now();
/* Clean up proxy readers, proxy writers and proxy
participants. Deleting a proxy participants deletes all its
readers and writers automatically */
thread_state_awake (ts1, gv);
ephash_enum_proxy_participant_init (&est, gv->guid_hash);
while ((proxypp = ephash_enum_proxy_participant_next (&est)) != NULL)
entidx_enum_proxy_participant_init (&est, gv->entity_index);
while ((proxypp = entidx_enum_proxy_participant_next (&est)) != NULL)
{
delete_proxy_participant_by_guid (gv, &proxypp->e.guid, tnow, 1);
}
ephash_enum_proxy_participant_fini (&est);
entidx_enum_proxy_participant_fini (&est);
thread_state_asleep (ts1);
}
{
struct ephash_enum_writer est_wr;
struct ephash_enum_reader est_rd;
struct ephash_enum_participant est_pp;
struct entidx_enum_writer est_wr;
struct entidx_enum_reader est_rd;
struct entidx_enum_participant est_pp;
struct participant *pp;
struct writer *wr;
struct reader *rd;
@ -1577,28 +1577,28 @@ void rtps_stop (struct q_globals *gv)
out. FIXME: need to keep xevent thread alive for a while
longer. */
thread_state_awake (ts1, gv);
ephash_enum_writer_init (&est_wr, gv->guid_hash);
while ((wr = ephash_enum_writer_next (&est_wr)) != NULL)
entidx_enum_writer_init (&est_wr, gv->entity_index);
while ((wr = entidx_enum_writer_next (&est_wr)) != NULL)
{
if (!is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE))
delete_writer_nolinger (gv, &wr->e.guid);
}
ephash_enum_writer_fini (&est_wr);
entidx_enum_writer_fini (&est_wr);
thread_state_awake_to_awake_no_nest (ts1);
ephash_enum_reader_init (&est_rd, gv->guid_hash);
while ((rd = ephash_enum_reader_next (&est_rd)) != NULL)
entidx_enum_reader_init (&est_rd, gv->entity_index);
while ((rd = entidx_enum_reader_next (&est_rd)) != NULL)
{
if (!is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE))
delete_reader (gv, &rd->e.guid);
}
ephash_enum_reader_fini (&est_rd);
entidx_enum_reader_fini (&est_rd);
thread_state_awake_to_awake_no_nest (ts1);
ephash_enum_participant_init (&est_pp, gv->guid_hash);
while ((pp = ephash_enum_participant_next (&est_pp)) != NULL)
entidx_enum_participant_init (&est_pp, gv->entity_index);
while ((pp = entidx_enum_participant_next (&est_pp)) != NULL)
{
delete_participant (gv, &pp->e.guid);
}
ephash_enum_participant_fini (&est_pp);
entidx_enum_participant_fini (&est_pp);
thread_state_asleep (ts1);
}
@ -1709,8 +1709,8 @@ void rtps_fini (struct q_globals *gv)
ddsi_tkmap_free (gv->m_tkmap);
ephash_free (gv->guid_hash);
gv->guid_hash = NULL;
entity_index_free (gv->entity_index);
gv->entity_index = NULL;
deleted_participants_admin_free (gv->deleted_participants);
lease_management_term (gv);
ddsrt_mutex_destroy (&gv->participant_set_lock);

View file

@ -30,7 +30,7 @@
#include "dds/ddsi/q_addrset.h"
#include "dds/ddsi/q_ddsi_discovery.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_xmsg.h"
@ -262,8 +262,8 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow
if (k == EK_PROXY_PARTICIPANT)
{
struct proxy_participant *proxypp;
if ((proxypp = ephash_lookup_proxy_participant_guid (gv->guid_hash, &g)) != NULL &&
ephash_lookup_proxy_participant_guid (gv->guid_hash, &proxypp->privileged_pp_guid) != NULL)
if ((proxypp = entidx_lookup_proxy_participant_guid (gv->entity_index, &g)) != NULL &&
entidx_lookup_proxy_participant_guid (gv->entity_index, &proxypp->privileged_pp_guid) != NULL)
{
GVLOGDISC ("but postponing because privileged pp "PGUIDFMT" is still live\n", PGUID (proxypp->privileged_pp_guid));
l->tsched = add_duration_to_etime (tnowE, 200 * T_MILLISECOND);

View file

@ -38,7 +38,7 @@
#include "dds/ddsi/q_ddsi_discovery.h"
#include "dds/ddsi/q_radmin.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_gc.h"
#include "dds/ddsi/q_entity.h"
@ -276,7 +276,7 @@ static int valid_NackFrag (NackFrag_t *msg, size_t size, int byteswap)
static void set_sampleinfo_proxy_writer (struct nn_rsample_info *sampleinfo, ddsi_guid_t *pwr_guid)
{
struct proxy_writer * pwr = ephash_lookup_proxy_writer_guid (sampleinfo->rst->gv->guid_hash, pwr_guid);
struct proxy_writer * pwr = entidx_lookup_proxy_writer_guid (sampleinfo->rst->gv->entity_index, pwr_guid);
sampleinfo->pwr = pwr;
}
@ -654,7 +654,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
return 1;
}
if ((wr = ephash_lookup_writer_guid (rst->gv->guid_hash, &dst)) == NULL)
if ((wr = entidx_lookup_writer_guid (rst->gv->entity_index, &dst)) == NULL)
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"?)", PGUID (src), PGUID (dst));
return 1;
@ -663,7 +663,7 @@ static int handle_AckNack (struct receiver_state *rst, nn_etime_t tnow, const Ac
the normal pure ack steady state. If (a big "if"!) this shows up
as a significant portion of the time, we can always rewrite it to
only retrieve it when needed. */
if ((prd = ephash_lookup_proxy_reader_guid (rst->gv->guid_hash, &src)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (rst->gv->entity_index, &src)) == NULL)
{
RSTTRACE (" "PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
@ -1129,7 +1129,7 @@ static int handle_Heartbeat (struct receiver_state *rst, nn_etime_t tnow, struct
return 1;
}
if ((pwr = ephash_lookup_proxy_writer_guid (rst->gv->guid_hash, &src)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (rst->gv->entity_index, &src)) == NULL)
{
RSTTRACE (PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
@ -1270,7 +1270,7 @@ static int handle_HeartbeatFrag (struct receiver_state *rst, UNUSED_ARG(nn_etime
return 1;
}
if ((pwr = ephash_lookup_proxy_writer_guid (rst->gv->guid_hash, &src)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (rst->gv->entity_index, &src)) == NULL)
{
RSTTRACE (" "PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
@ -1388,7 +1388,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
return 1;
}
if ((wr = ephash_lookup_writer_guid (rst->gv->guid_hash, &dst)) == NULL)
if ((wr = entidx_lookup_writer_guid (rst->gv->entity_index, &dst)) == NULL)
{
RSTTRACE (" "PGUIDFMT" -> "PGUIDFMT"?)", PGUID (src), PGUID (dst));
return 1;
@ -1397,7 +1397,7 @@ static int handle_NackFrag (struct receiver_state *rst, nn_etime_t tnow, const N
the normal pure ack steady state. If (a big "if"!) this shows up
as a significant portion of the time, we can always rewrite it to
only retrieve it when needed. */
if ((prd = ephash_lookup_proxy_reader_guid (rst->gv->guid_hash, &src)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (rst->gv->entity_index, &src)) == NULL)
{
RSTTRACE (" "PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
@ -1498,7 +1498,7 @@ static int handle_InfoDST (struct receiver_state *rst, const InfoDST_t *msg, con
ddsi_guid_t dst;
dst.prefix = rst->dst_guid_prefix;
dst.entityid = to_entityid(NN_ENTITYID_PARTICIPANT);
rst->forme = (ephash_lookup_participant_guid (rst->gv->guid_hash, &dst) != NULL ||
rst->forme = (entidx_lookup_participant_guid (rst->gv->entity_index, &dst) != NULL ||
is_deleted_participant_guid (rst->gv->deleted_participants, &dst, DPG_LOCAL));
}
return 1;
@ -1644,7 +1644,7 @@ static int handle_Gap (struct receiver_state *rst, nn_etime_t tnow, struct nn_rm
return 1;
}
if ((pwr = ephash_lookup_proxy_writer_guid (rst->gv->guid_hash, &src)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (rst->gv->entity_index, &src)) == NULL)
{
RSTTRACE (""PGUIDFMT"? -> "PGUIDFMT")", PGUID (src), PGUID (dst));
return 1;
@ -1862,7 +1862,7 @@ static struct reader *proxy_writer_first_in_sync_reader (struct proxy_writer *pw
struct pwr_rd_match *m;
struct reader *rd;
for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, it); m != NULL; m = ddsrt_avl_iter_next (it))
if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL)
return rd;
return NULL;
}
@ -1872,7 +1872,7 @@ static struct reader *proxy_writer_next_in_sync_reader (struct proxy_writer *pwr
struct pwr_rd_match *m;
struct reader *rd;
for (m = ddsrt_avl_iter_next (it); m != NULL; m = ddsrt_avl_iter_next (it))
if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
if (m->in_sync == PRMSS_SYNC && (rd = entidx_lookup_reader_guid (pwr->e.gv->entity_index, &m->rd_guid)) != NULL)
return rd;
return NULL;
}
@ -2027,7 +2027,7 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
}
else
{
struct reader *rd = ephash_lookup_reader_guid (gv->guid_hash, rdguid);
struct reader *rd = entidx_lookup_reader_guid (gv->entity_index, rdguid);
if (rd != NULL)
{
struct ddsi_serdata *payload;
@ -2043,8 +2043,8 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
dds_sleepfor (DDS_MSECS (1));
if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
if (ephash_lookup_reader_guid (gv->guid_hash, rdguid) == NULL ||
ephash_lookup_proxy_writer_guid (gv->guid_hash, &pwr->e.guid) == NULL)
if (entidx_lookup_reader_guid (gv->entity_index, rdguid) == NULL ||
entidx_lookup_proxy_writer_guid (gv->entity_index, &pwr->e.guid) == NULL)
{
/* give up when reader or proxy writer no longer accessible */
break;
@ -2671,7 +2671,7 @@ static int handle_submsg_sequence
/* "forme" is a whether the current submessage is intended for this
instance of DDSI2 and is roughly equivalent to
(dst_prefix == 0) ||
(ephash_lookup_participant_guid(dst_prefix:1c1) != 0)
(entidx_lookup_participant_guid(dst_prefix:1c1) != 0)
they are only roughly equivalent because the second term can become
false at any time. That's ok: it's real purpose is to filter out
discovery data accidentally sent by Cloud */
@ -3092,7 +3092,7 @@ static void local_participant_set_fini (struct local_participant_set *lps)
static void rebuild_local_participant_set (struct thread_state1 * const ts1, struct q_globals *gv, struct local_participant_set *lps)
{
struct ephash_enum_participant est;
struct entidx_enum_participant est;
struct participant *pp;
unsigned nps_alloc;
GVTRACE ("pp set gen changed: local %"PRIu32" global %"PRIu32"\n", lps->gen, ddsrt_atomic_ld32 (&gv->participant_set_generation));
@ -3106,8 +3106,8 @@ static void rebuild_local_participant_set (struct thread_state1 * const ts1, str
ddsrt_free (lps->ps);
lps->nps = 0;
lps->ps = (nps_alloc == 0) ? NULL : ddsrt_malloc (nps_alloc * sizeof (*lps->ps));
ephash_enum_participant_init (&est, gv->guid_hash);
while ((pp = ephash_enum_participant_next (&est)) != NULL)
entidx_enum_participant_init (&est, gv->entity_index);
while ((pp = entidx_enum_participant_next (&est)) != NULL)
{
if (lps->nps == nps_alloc)
{
@ -3115,7 +3115,7 @@ static void rebuild_local_participant_set (struct thread_state1 * const ts1, str
existing ones removed), so we may have to restart if it
turns out we didn't allocate enough memory [an
alternative would be to realloc on the fly]. */
ephash_enum_participant_fini (&est);
entidx_enum_participant_fini (&est);
GVTRACE (" need more memory - restarting\n");
goto restart;
}
@ -3127,7 +3127,7 @@ static void rebuild_local_participant_set (struct thread_state1 * const ts1, str
lps->nps++;
}
}
ephash_enum_participant_fini (&est);
entidx_enum_participant_fini (&est);
/* There is a (very small) probability of a participant
disappearing and new one appearing with the same socket while

View file

@ -18,7 +18,7 @@
#include "dds/ddsrt/avl.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_addrset.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_bswap.h"
@ -200,7 +200,7 @@ struct nn_xmsg *writer_hbcontrol_create_heartbeat (struct writer *wr, const stru
else
{
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, prd_guid)) == NULL)
if ((prd = entidx_lookup_proxy_reader_guid (gv->entity_index, prd_guid)) == NULL)
{
ETRACE (wr, "writer_hbcontrol: wr "PGUIDFMT" unknown prd "PGUIDFMT"\n", PGUID (wr->e.guid), PGUID (*prd_guid));
nn_xmsg_free (msg);
@ -676,7 +676,7 @@ dds_return_t write_hb_liveliness (struct q_globals * const gv, struct ddsi_guid
struct whc_state whcst;
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
struct writer *wr = ephash_lookup_writer_guid (gv->guid_hash, wr_guid);
struct writer *wr = entidx_lookup_writer_guid (gv->entity_index, wr_guid);
if (wr == NULL)
{
GVTRACE ("write_hb_liveliness("PGUIDFMT") - writer not found\n", PGUID (*wr_guid));

View file

@ -28,7 +28,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_unused.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_transmit.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_entity.h"
@ -589,7 +589,7 @@ static void handle_xevk_heartbeat (struct nn_xpack *xp, struct xevent *ev, nn_mt
int hbansreq = 0;
struct whc_state whcst;
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, &ev->u.heartbeat.wr_guid)) == NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, &ev->u.heartbeat.wr_guid)) == NULL)
{
GVTRACE("heartbeat(wr "PGUIDFMT") writer gone\n", PGUID (ev->u.heartbeat.wr_guid));
return;
@ -859,7 +859,7 @@ static void handle_xevk_acknack (UNUSED_ARG (struct nn_xpack *xp), struct xevent
struct pwr_rd_match *rwn;
nn_locator_t loc;
if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &ev->u.acknack.pwr_guid)) == NULL)
if ((pwr = entidx_lookup_proxy_writer_guid (gv->entity_index, &ev->u.acknack.pwr_guid)) == NULL)
{
return;
}
@ -986,7 +986,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
struct writer *spdp_wr;
bool do_write;
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, &ev->u.spdp.pp_guid)) == NULL)
if ((pp = entidx_lookup_participant_guid (gv->entity_index, &ev->u.spdp.pp_guid)) == NULL)
{
GVTRACE ("handle_xevk_spdp "PGUIDFMT" - unknown guid\n", PGUID (ev->u.spdp.pp_guid));
if (ev->u.spdp.directed)
@ -1014,7 +1014,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
ddsi_guid_t guid;
guid.prefix = ev->u.spdp.dest_proxypp_guid_prefix;
guid.entityid.u = NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER;
prd = ephash_lookup_proxy_reader_guid (gv->guid_hash, &guid);
prd = entidx_lookup_proxy_reader_guid (gv->entity_index, &guid);
do_write = (prd != NULL);
if (!do_write)
GVTRACE ("xmit spdp: no proxy reader "PGUIDFMT"\n", PGUID (guid));
@ -1099,7 +1099,7 @@ static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_
dds_duration_t intv;
nn_mtime_t tnext;
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, &ev->u.pmd_update.pp_guid)) == NULL)
if ((pp = entidx_lookup_participant_guid (gv->entity_index, &ev->u.pmd_update.pp_guid)) == NULL)
{
return;
}

View file

@ -38,7 +38,7 @@
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/ddsi_entity_index.h"
#include "dds/ddsi/q_freelist.h"
#include "dds/ddsi/ddsi_serdata_default.h"
@ -694,7 +694,7 @@ int nn_xmsg_merge_rexmit_destinations_wrlock_held (struct q_globals *gv, struct
an addrset in rebuild_writer_addrset: then we don't
need the lock anymore, and the '_wrlock_held' suffix
can go and everyone's life will become easier! */
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, &m->kindspecific.data.wrguid)) == NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, &m->kindspecific.data.wrguid)) == NULL)
{
GVTRACE ("writer-dead)");
return 0;
@ -860,7 +860,7 @@ static void nn_xmsg_chain_release (struct q_globals *gv, struct nn_xmsg_chain *c
struct writer *wr;
assert (m->kindspecific.data.wrseq != 0);
wrguid = m->kindspecific.data.wrguid;
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, &m->kindspecific.data.wrguid)) != NULL)
if ((wr = entidx_lookup_writer_guid (gv->entity_index, &m->kindspecific.data.wrguid)) != NULL)
writer_update_seq_xmit (wr, m->kindspecific.data.wrseq);
}
}