DDSI stack init/fini in domain_create/free

This moves DDSI stack initialisation and finalisation to the creating
and deleting of a domain, and modifies the related code to trigger all
that from creating/deleting participants.

Built-in topic generation is partially domain-dependent, so that moves
as well.  The underlying ddsi_sertopics can be created are domain
independent and created without initialising DDSI, which necessitates
moving the IID generation (and thus init/fini) out of the DDSI stack and
to what will remain global data.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-06-27 10:45:38 +02:00 committed by eboasson
parent cf46ddbb7b
commit 782f032df8
23 changed files with 500 additions and 423 deletions

View file

@ -12,7 +12,7 @@
#ifndef _DDS_BUILTIN_H_ #ifndef _DDS_BUILTIN_H_
#define _DDS_BUILTIN_H_ #define _DDS_BUILTIN_H_
#include "dds/ddsi/q_time.h" #include "dds/ddsi/ddsi_builtin_topic_if.h"
#if defined (__cplusplus) #if defined (__cplusplus)
extern "C" extern "C"
@ -20,25 +20,23 @@ extern "C"
#endif #endif
/* Get actual topic in related participant related to topic 'id'. */ /* Get actual topic in related participant related to topic 'id'. */
dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic); dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic);
/* Subscriber singleton within related participant. */ /* Subscriber singleton within related participant. */
dds_entity_t dds__get_builtin_subscriber(dds_entity_t e); dds_entity_t dds__get_builtin_subscriber (dds_entity_t e);
/* Checks whether the reader QoS is valid for use with built-in topic TOPIC */ /* Checks whether the reader QoS is valid for use with built-in topic TOPIC */
bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos); bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic, const dds_qos_t *qos);
/* Init/fini for builtin-topic support that is global across domains */
void dds__builtin_init_global (void);
void dds__builtin_fini_global (void);
void dds__builtin_init (struct dds_domain *dom);
void dds__builtin_fini (struct dds_domain *dom);
struct entity_common; struct entity_common;
struct nn_guid;
struct ddsi_tkmap_instance;
void dds__builtin_init (void);
void dds__builtin_fini (void);
bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp);
bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid);
struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive); struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -18,11 +18,9 @@
extern "C" { extern "C" {
#endif #endif
extern DDS_EXPORT const ddsrt_avl_treedef_t dds_domaintree_def; DDS_EXPORT dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id);
DDS_EXPORT void dds_domain_free (dds_domain *domain);
DDS_EXPORT dds_domain * dds_domain_create (dds_domainid_t id); DDS_EXPORT dds_domain *dds_domain_find_locked (dds_domainid_t id);
DDS_EXPORT void dds_domain_free (dds_domain * domain);
DDS_EXPORT dds_domain * dds_domain_find_locked (dds_domainid_t id);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -78,7 +78,7 @@ struct dds_handle_link {
* Initialize handleserver singleton. * Initialize handleserver singleton.
*/ */
DDS_EXPORT dds_return_t DDS_EXPORT dds_return_t
dds_handle_server_init(void (*free_via_gc) (void *x)); dds_handle_server_init(void);
/* /*

View file

@ -18,9 +18,6 @@
extern "C" { extern "C" {
#endif #endif
dds_return_t
dds__check_domain(dds_domainid_t domain);
/** /**
*Description : Initialization function, called from main. This operation *Description : Initialization function, called from main. This operation
*initializes all the required DDS resources, *initializes all the required DDS resources,
@ -30,8 +27,7 @@ dds__check_domain(dds_domainid_t domain);
*Arguments : *Arguments :
*-# Returns 0 on success or a non-zero error status *-# Returns 0 on success or a non-zero error status
**/ **/
dds_return_t dds_return_t dds_init (void);
dds_init(dds_domainid_t domain);
/* Finalization function, called from main */ /* Finalization function, called from main */
@ -42,16 +38,7 @@ dds_init(dds_domainid_t domain);
*Arguments : *Arguments :
*-# None *-# None
**/ **/
void void dds_fini (void);
dds_fini(void);
/**
* Description : Function that provides the explicit ID of default domain
* It should be called after DDS initialization.
* @return Valid domain id. Undetermined behaviour if DDS is not initialized.
*/
dds_domainid_t dds_domain_default (void);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -18,6 +18,7 @@
#include "dds/ddsrt/sync.h" #include "dds/ddsrt/sync.h"
#include "dds/ddsi/q_rtps.h" #include "dds/ddsi/q_rtps.h"
#include "dds/ddsrt/avl.h" #include "dds/ddsrt/avl.h"
#include "dds/ddsi/ddsi_builtin_topic_if.h"
#include "dds__handles.h" #include "dds__handles.h"
#if defined (__cplusplus) #if defined (__cplusplus)
@ -92,10 +93,19 @@ struct dds_listener {
#define DDS_ENTITY_IMPLICIT 0x0002u #define DDS_ENTITY_IMPLICIT 0x0002u
typedef struct dds_domain { typedef struct dds_domain {
/* FIXME: protected by dds_global.lock -- for now */
ddsrt_avl_node_t m_node; ddsrt_avl_node_t m_node;
dds_domainid_t m_id; dds_domainid_t m_id;
ddsrt_avl_tree_t m_topics; ddsrt_avl_tree_t m_topics;
struct dds_entity *ppants;
uint32_t m_refc; uint32_t m_refc;
struct cfgst *cfgst;
struct local_orphan_writer *builtintopic_writer_participant;
struct local_orphan_writer *builtintopic_writer_publications;
struct local_orphan_writer *builtintopic_writer_subscriptions;
struct ddsi_builtin_topic_interface btif;
} dds_domain; } dds_domain;
struct dds_entity; struct dds_entity;
@ -295,10 +305,13 @@ typedef struct dds_waitset {
/* Globals */ /* Globals */
typedef struct dds_globals { typedef struct dds_globals {
dds_domainid_t m_default_domain;
int32_t m_init_count; int32_t m_init_count;
ddsrt_avl_tree_t m_domains; ddsrt_avl_tree_t m_domains;
ddsrt_mutex_t m_mutex; ddsrt_mutex_t m_mutex;
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
} dds_globals; } dds_globals;
DDS_EXPORT extern dds_globals dds_global; DDS_EXPORT extern dds_globals dds_global;

View file

@ -15,6 +15,7 @@
#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_config.h" #include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/q_plist.h" /* for nn_keyhash */ #include "dds/ddsi/q_plist.h" /* for nn_keyhash */
#include "dds__init.h" #include "dds__init.h"
#include "dds__domain.h" #include "dds__domain.h"
@ -29,13 +30,6 @@
#include "dds/ddsi/q_qosmatch.h" #include "dds/ddsi/q_qosmatch.h"
#include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_tkmap.h"
static struct ddsi_sertopic *builtin_participant_topic;
static struct ddsi_sertopic *builtin_reader_topic;
static struct ddsi_sertopic *builtin_writer_topic;
static struct local_orphan_writer *builtintopic_writer_participant;
static struct local_orphan_writer *builtintopic_writer_publications;
static struct local_orphan_writer *builtintopic_writer_subscriptions;
static dds_qos_t *dds__create_builtin_qos (void) static dds_qos_t *dds__create_builtin_qos (void)
{ {
const char *partition = "__BUILT-IN PARTITION__"; const char *partition = "__BUILT-IN PARTITION__";
@ -47,58 +41,37 @@ static dds_qos_t *dds__create_builtin_qos (void)
return qos; return qos;
} }
void dds__builtin_init (void) dds_entity_t dds__get_builtin_topic (dds_entity_t entity, dds_entity_t topic)
{ {
dds_qos_t *qos = dds__create_builtin_qos ();
builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT));
builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER));
builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER));
dds_delete_qos (qos);
}
void dds__builtin_fini (void)
{
/* No more sources for builtin topic samples */
thread_state_awake (lookup_thread_state ());
delete_local_orphan_writer (builtintopic_writer_participant);
delete_local_orphan_writer (builtintopic_writer_publications);
delete_local_orphan_writer (builtintopic_writer_subscriptions);
thread_state_asleep (lookup_thread_state ());
ddsi_sertopic_unref (builtin_participant_topic);
ddsi_sertopic_unref (builtin_reader_topic);
ddsi_sertopic_unref (builtin_writer_topic);
}
dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic)
{
dds_entity_t pp;
dds_entity_t tp; dds_entity_t tp;
dds_return_t rc;
dds_entity *e;
if ((pp = dds_get_participant (e)) <= 0) if ((rc = dds_entity_pin (entity, &e)) < 0)
return pp; return rc;
struct ddsi_sertopic *sertopic; struct ddsi_sertopic *sertopic;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { switch (topic)
sertopic = builtin_participant_topic; {
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT:
sertopic = builtin_writer_topic; sertopic = dds_global.builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { break;
sertopic = builtin_reader_topic; case DDS_BUILTIN_TOPIC_DCPSPUBLICATION:
} else { sertopic = dds_global.builtin_writer_topic;
assert (0); break;
return DDS_RETCODE_BAD_PARAMETER; case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION:
sertopic = dds_global.builtin_reader_topic;
break;
default:
assert (0);
dds_entity_unpin (e);
return DDS_RETCODE_BAD_PARAMETER;
} }
dds_qos_t *qos = dds__create_builtin_qos (); dds_qos_t *qos = dds__create_builtin_qos ();
tp = dds_create_topic_arbitrary (pp, sertopic, qos, NULL, NULL); tp = dds_create_topic_arbitrary (e->m_participant->m_hdllink.hdl, sertopic, qos, NULL, NULL);
dds_delete_qos (qos); dds_delete_qos (qos);
dds_entity_unpin (e);
return tp; return tp;
} }
@ -110,7 +83,7 @@ static bool qos_has_resource_limits (const dds_qos_t *qos)
qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED); qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED);
} }
bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos) bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic, const dds_qos_t *qos)
{ {
if (qos == NULL) if (qos == NULL)
/* default QoS inherited from topic is ok by definition */ /* default QoS inherited from topic is ok by definition */
@ -121,15 +94,20 @@ bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos)
forbid the creation of a reader matching a built-in topics writer that has forbid the creation of a reader matching a built-in topics writer that has
resource limits */ resource limits */
struct local_orphan_writer *bwr; struct local_orphan_writer *bwr;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { switch (topic)
bwr = builtintopic_writer_participant; {
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT:
bwr = builtintopic_writer_publications; bwr = dom->builtintopic_writer_participant;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { break;
bwr = builtintopic_writer_subscriptions; case DDS_BUILTIN_TOPIC_DCPSPUBLICATION:
} else { bwr = dom->builtintopic_writer_publications;
assert (0); break;
return false; case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION:
bwr = dom->builtintopic_writer_subscriptions;
break;
default:
assert (0);
return false;
} }
/* FIXME: DDSI-level readers, writers have topic, type name in their QoS, but /* FIXME: DDSI-level readers, writers have topic, type name in their QoS, but
@ -171,26 +149,29 @@ dds_entity_t dds__get_builtin_subscriber (dds_entity_t e)
return sub; return sub;
} }
bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp) static bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp, void *vdomain)
{ {
(void) vdomain;
return tp->ops == &ddsi_sertopic_ops_builtintopic; return tp->ops == &ddsi_sertopic_ops_builtintopic;
} }
bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid) static bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid, void *vdomain)
{ {
(void) vdomain;
if (is_builtin_endpoint (guid->entityid, vendorid)) if (is_builtin_endpoint (guid->entityid, vendorid))
return false; return false;
return true; return true;
} }
struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid) static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid, void *vdomain)
{ {
struct ddsi_tkmap_instance *tk; struct ddsi_tkmap_instance *tk;
struct ddsi_serdata *sd; struct ddsi_serdata *sd;
struct nn_keyhash kh; struct nn_keyhash kh;
(void) vdomain;
memcpy (&kh, guid, sizeof (kh)); memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh); sd = ddsi_serdata_from_keyhash (dds_global.builtin_participant_topic, &kh);
tk = ddsi_tkmap_find (sd, false, true); tk = ddsi_tkmap_find (sd, false, true);
ddsi_serdata_unref (sd); ddsi_serdata_unref (sd);
return tk; return tk;
@ -206,15 +187,15 @@ struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn
{ {
case EK_PARTICIPANT: case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT: case EK_PROXY_PARTICIPANT:
topic = builtin_participant_topic; topic = dds_global.builtin_participant_topic;
break; break;
case EK_WRITER: case EK_WRITER:
case EK_PROXY_WRITER: case EK_PROXY_WRITER:
topic = builtin_writer_topic; topic = dds_global.builtin_writer_topic;
break; break;
case EK_READER: case EK_READER:
case EK_PROXY_READER: case EK_PROXY_READER:
topic = builtin_reader_topic; topic = dds_global.builtin_reader_topic;
break; break;
} }
assert (topic != NULL); assert (topic != NULL);
@ -225,9 +206,10 @@ struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn
return serdata; return serdata;
} }
void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive) static void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive, void *vdomain)
{ {
if (dds__builtin_is_visible (&e->guid, get_entity_vendorid (e))) struct dds_domain *dom = vdomain;
if (dds__builtin_is_visible (&e->guid, get_entity_vendorid (e), dom))
{ {
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */ /* initialize to avoid gcc warning ultimately caused by C's horrible type system */
struct local_orphan_writer *bwr = NULL; struct local_orphan_writer *bwr = NULL;
@ -237,17 +219,59 @@ void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, b
{ {
case EK_PARTICIPANT: case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT: case EK_PROXY_PARTICIPANT:
bwr = builtintopic_writer_participant; bwr = dom->builtintopic_writer_participant;
break; break;
case EK_WRITER: case EK_WRITER:
case EK_PROXY_WRITER: case EK_PROXY_WRITER:
bwr = builtintopic_writer_publications; bwr = dom->builtintopic_writer_publications;
break; break;
case EK_READER: case EK_READER:
case EK_PROXY_READER: case EK_PROXY_READER:
bwr = builtintopic_writer_subscriptions; bwr = dom->builtintopic_writer_subscriptions;
break; break;
} }
dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata); dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata);
} }
} }
void dds__builtin_init_global (void)
{
dds_global.builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
dds_global.builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
dds_global.builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
}
void dds__builtin_fini_global (void)
{
ddsi_sertopic_unref (dds_global.builtin_participant_topic);
ddsi_sertopic_unref (dds_global.builtin_reader_topic);
ddsi_sertopic_unref (dds_global.builtin_writer_topic);
}
void dds__builtin_init (struct dds_domain *dom)
{
dds_qos_t *qos = dds__create_builtin_qos ();
dom->btif.arg = dom;
dom->btif.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry;
dom->btif.builtintopic_is_builtintopic = dds__builtin_is_builtintopic;
dom->btif.builtintopic_is_visible = dds__builtin_is_visible;
dom->btif.builtintopic_write = dds__builtin_write;
gv.builtin_topic_interface = &dom->btif;
dom->builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dds_global.builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT));
dom->builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dds_global.builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER));
dom->builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dds_global.builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER));
dds_delete_qos (qos);
}
void dds__builtin_fini (struct dds_domain *dom)
{
/* No more sources for builtin topic samples */
thread_state_awake (lookup_thread_state ());
delete_local_orphan_writer (dom->builtintopic_writer_participant);
delete_local_orphan_writer (dom->builtintopic_writer_publications);
delete_local_orphan_writer (dom->builtintopic_writer_subscriptions);
thread_state_asleep (lookup_thread_state ());
}

View file

@ -9,48 +9,253 @@
* *
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/ */
#include <string.h>
#include "dds/ddsrt/environ.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/heap.h"
#include "dds__init.h"
#include "dds__rhc.h"
#include "dds__domain.h" #include "dds__domain.h"
#include "dds__builtin.h"
#include "dds__whc_builtintopic.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/ddsi_threadmon.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_gc.h"
#include "dds/ddsi/q_globals.h"
#include "dds/version.h"
static int dds_domain_compare (const void *va, const void *vb) static int dds_domain_compare (const void *va, const void *vb)
{ {
const int32_t *a = va; const dds_domainid_t *a = va;
const int32_t *b = vb; const dds_domainid_t *b = vb;
return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; return (*a == *b) ? 0 : (*a < *b) ? -1 : 1;
} }
const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER ( static const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER (
offsetof (dds_domain, m_node), offsetof (dds_domain, m_id), dds_domain_compare, 0); offsetof (dds_domain, m_node), offsetof (dds_domain, m_id), dds_domain_compare, 0);
static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id)
{
dds_return_t ret = DDS_RETCODE_OK;
char * uri = NULL;
uint32_t len;
domain->m_id = domain_id;
domain->m_refc = 1;
ddsrt_avl_init (&dds_topictree_def, &domain->m_topics);
gv.tstart = now ();
(void) ddsrt_getenv ("CYCLONEDDS_URI", &uri);
domain->cfgst = config_init (uri);
if (domain->cfgst == NULL)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri);
ret = DDS_RETCODE_ERROR;
goto fail_config;
}
/* if a domain id was explicitly given, check & fix up the configuration */
if (domain_id != DDS_DOMAIN_DEFAULT)
{
if (domain_id < 0 || domain_id > 230)
{
DDS_ERROR ("requested domain id %"PRId32" is out of range\n", domain_id);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
else if (config.domainId.isdefault)
{
config.domainId.value = domain_id;
}
else if (domain_id != config.domainId.value)
{
DDS_ERROR ("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain_id, config.domainId.value);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
}
/* FIXME: The config.domainId can change internally in DDSI. So, remember what the
* main configured domain id is. */
domain->m_id = config.domainId.value;
if (rtps_config_prep (domain->cfgst) != 0)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to configure RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_config;
}
/* Start monitoring the liveliness of all threads. */
if (!config.liveliness_monitoring)
gv.threadmon = NULL;
else
{
gv.threadmon = ddsi_threadmon_new ();
if (gv.threadmon == NULL)
{
DDS_ERROR ("Failed to create a thread monitor\n");
ret = DDS_RETCODE_OUT_OF_RESOURCES;
goto fail_threadmon_new;
}
}
if (rtps_init () < 0)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to initialize RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_init;
}
dds__builtin_init (domain);
if (rtps_start () < 0)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to start RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_start;
}
if (gv.threadmon && ddsi_threadmon_start (gv.threadmon) < 0)
{
DDS_ERROR ("Failed to start the servicelease\n");
ret = DDS_RETCODE_ERROR;
goto fail_threadmon_start;
}
/* Set additional default participant properties */
char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */
char hostname[64];
gv.default_plist_pp.process_id = (unsigned) ddsrt_getpid();
gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID;
gv.default_plist_pp.exec_name = dds_string_alloc(32);
(void) snprintf (gv.default_plist_pp.exec_name, 32, "CycloneDDS: %u", gv.default_plist_pp.process_id);
len = (uint32_t) (13 + strlen (gv.default_plist_pp.exec_name));
gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME;
if (ddsrt_gethostname (hostname, sizeof (hostname)) == DDS_RETCODE_OK)
{
gv.default_plist_pp.node_name = dds_string_dup (hostname);
gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME;
}
gv.default_plist_pp.entity_name = dds_alloc (len);
(void) snprintf (gv.default_plist_pp.entity_name, len, "%s<%u>", progname, gv.default_plist_pp.process_id);
gv.default_plist_pp.present |= PP_ENTITY_NAME;
return DDS_RETCODE_OK;
fail_threadmon_start:
if (gv.threadmon)
ddsi_threadmon_stop (gv.threadmon);
rtps_stop ();
fail_rtps_start:
rtps_fini ();
fail_rtps_init:
if (gv.threadmon)
{
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
}
fail_threadmon_new:
downgrade_main_thread ();
thread_states_fini();
fail_rtps_config:
fail_config_domainid:
config_fini (domain->cfgst);
fail_config:
return ret;
}
static void dds_domain_fini (struct dds_domain *domain)
{
if (gv.threadmon)
ddsi_threadmon_stop (gv.threadmon);
rtps_stop ();
dds__builtin_fini (domain);
rtps_fini ();
if (gv.threadmon)
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
config_fini (domain->cfgst);
}
dds_domain *dds_domain_find_locked (dds_domainid_t id) dds_domain *dds_domain_find_locked (dds_domainid_t id)
{ {
return ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &id); return ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &id);
} }
dds_domain *dds_domain_create (dds_domainid_t id) dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
{ {
dds_domain *domain; struct dds_domain *dom = NULL;
dds_return_t ret;
if (id != DDS_DOMAIN_DEFAULT && (id < 0 || id > 230))
return DDS_RETCODE_BAD_PARAMETER;
ddsrt_mutex_lock (&dds_global.m_mutex); ddsrt_mutex_lock (&dds_global.m_mutex);
domain = dds_domain_find_locked (id);
if (domain == NULL) /* FIXME: hack around default domain ids, not yet being able to handle multiple domains simultaneously */
if (id != DDS_DOMAIN_DEFAULT)
{ {
domain = dds_alloc (sizeof (*domain)); if ((dom = dds_domain_find_locked (id)) == NULL)
domain->m_id = id; ret = DDS_RETCODE_NOT_FOUND;
ddsrt_avl_init (&dds_topictree_def, &domain->m_topics); else if (dom->m_id == id)
ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, domain); ret = DDS_RETCODE_OK;
else
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
}
else
{
if ((dom = ddsrt_avl_find_min (&dds_domaintree_def, &dds_global.m_domains)) != NULL)
ret = DDS_RETCODE_OK;
else
ret = DDS_RETCODE_NOT_FOUND;
}
switch (ret)
{
case DDS_RETCODE_OK:
dom->m_refc++;
*domain_out = dom;
break;
case DDS_RETCODE_NOT_FOUND:
dom = dds_alloc (sizeof (*dom));
if ((ret = dds_domain_init (dom, id)) < 0)
dds_free (dom);
else
{
ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, dom);
*domain_out = dom;
}
break;
case DDS_RETCODE_PRECONDITION_NOT_MET:
DDS_ERROR("Inconsistent domain configuration detected: domain on configuration: %"PRId32", domain %"PRId32"\n", dom->m_id, id);
break;
} }
domain->m_refc++;
ddsrt_mutex_unlock (&dds_global.m_mutex); ddsrt_mutex_unlock (&dds_global.m_mutex);
return domain; return ret;
} }
void dds_domain_free (dds_domain *domain) void dds_domain_free (dds_domain *domain)
{ {
ddsrt_mutex_lock (&dds_global.m_mutex); ddsrt_mutex_lock (&dds_global.m_mutex);
if (--domain->m_refc == 0) if (--domain->m_refc != 0)
{ {
//fprintf(stderr, "domain_free %d %p refcount > 0\n", (int)domain->m_id, domain);
ddsrt_mutex_unlock (&dds_global.m_mutex);
}
else
{
//fprintf(stderr, "domain_free %d %p\n", (int)domain->m_id, domain);
ddsrt_avl_delete (&dds_domaintree_def, &dds_global.m_domains, domain); ddsrt_avl_delete (&dds_domaintree_def, &dds_global.m_domains, domain);
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_domain_fini (domain);
dds_free (domain); dds_free (domain);
} }
ddsrt_mutex_unlock (&dds_global.m_mutex);
} }

View file

@ -76,13 +76,12 @@ static int handle_equal (const void *va, const void *vb)
return a->hdl == b->hdl; return a->hdl == b->hdl;
} }
dds_return_t dds_handle_server_init (void (*free_via_gc) (void *x)) dds_return_t dds_handle_server_init (void)
{ {
#if USE_CHH #if USE_CHH
handles.ht = ddsrt_chh_new (128, handle_hash, handle_equal, free_via_gc); handles.ht = ddsrt_chh_new (128, handle_hash, handle_equal, free_via_gc);
#else #else
handles.ht = ddsrt_hh_new (128, handle_hash, handle_equal); handles.ht = ddsrt_hh_new (128, handle_hash, handle_equal);
(void) free_via_gc;
#endif #endif
handles.count = 0; handles.count = 0;
ddsrt_mutex_init (&handles.lock); ddsrt_mutex_init (&handles.lock);

View file

@ -37,249 +37,60 @@
struct q_globals gv; struct q_globals gv;
dds_globals dds_global = { .m_default_domain = DDS_DOMAIN_DEFAULT }; dds_globals dds_global;
static struct cfgst * dds_cfgst = NULL;
static void free_via_gc_cb (struct gcreq *gcreq) dds_return_t dds_init (void)
{ {
void *bs = gcreq->arg; dds_return_t ret;
gcreq_free (gcreq);
ddsrt_free (bs);
}
static void free_via_gc (void *bs) ddsrt_init ();
{ ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
struct gcreq *gcreq = gcreq_new (gv.gcreq_queue, free_via_gc_cb); ddsrt_mutex_lock (init_mutex);
gcreq->arg = bs; if (dds_global.m_init_count++ != 0)
gcreq_enqueue (gcreq);
}
dds_return_t
dds_init(dds_domainid_t domain)
{
dds_return_t ret = DDS_RETCODE_OK;
char * uri = NULL;
char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */
char hostname[64];
uint32_t len;
ddsrt_mutex_t *init_mutex;
/* Be sure the DDS lifecycle resources are initialized. */
ddsrt_init();
init_mutex = ddsrt_get_singleton_mutex();
ddsrt_mutex_lock(init_mutex);
dds_global.m_init_count++;
if (dds_global.m_init_count > 1)
{ {
goto skip; ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
} }
gv.tstart = now ();
ddsrt_mutex_init (&dds_global.m_mutex); ddsrt_mutex_init (&dds_global.m_mutex);
thread_states_init_static(); ddsi_iid_init ();
thread_states_init_static ();
thread_states_init (64);
upgrade_main_thread ();
dds__builtin_init_global ();
(void)ddsrt_getenv (DDS_PROJECT_NAME_NOSPACE_CAPS"_URI", &uri); if (dds_handle_server_init () != DDS_RETCODE_OK)
dds_cfgst = config_init (uri);
if (dds_cfgst == NULL)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri);
ret = DDS_RETCODE_ERROR;
goto fail_config;
}
/* if a domain id was explicitly given, check & fix up the configuration */
if (domain != DDS_DOMAIN_DEFAULT)
{
if (domain < 0 || domain > 230)
{
DDS_ERROR("requested domain id %"PRId32" is out of range\n", domain);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
else if (config.domainId.isdefault)
{
config.domainId.value = domain;
}
else if (domain != config.domainId.value)
{
DDS_ERROR("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain, config.domainId.value);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
}
/* The config.domainId can change internally in DDSI. So, remember what the
* main configured domain id is. */
dds_global.m_default_domain = config.domainId.value;
if (rtps_config_prep(dds_cfgst) != 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to configure RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_config;
}
upgrade_main_thread();
ddsrt_avl_init(&dds_domaintree_def, &dds_global.m_domains);
/* Start monitoring the liveliness of all threads. */
if (!config.liveliness_monitoring)
gv.threadmon = NULL;
else
{
gv.threadmon = ddsi_threadmon_new ();
if (gv.threadmon == NULL)
{
DDS_ERROR("Failed to create a thread monitor\n");
ret = DDS_RETCODE_OUT_OF_RESOURCES;
goto fail_threadmon_new;
}
}
if (rtps_init () < 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_init;
}
if (dds_handle_server_init (free_via_gc) != DDS_RETCODE_OK)
{ {
DDS_ERROR("Failed to initialize internal handle server\n"); DDS_ERROR("Failed to initialize internal handle server\n");
ret = DDS_RETCODE_ERROR; ret = DDS_RETCODE_ERROR;
goto fail_handleserver; goto fail_handleserver;
} }
dds__builtin_init (); ddsrt_mutex_unlock (init_mutex);
if (rtps_start () < 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_start;
}
if (gv.threadmon && ddsi_threadmon_start(gv.threadmon) < 0)
{
DDS_ERROR("Failed to start the servicelease\n");
ret = DDS_RETCODE_ERROR;
goto fail_threadmon_start;
}
/* Set additional default participant properties */
gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID;
gv.default_plist_pp.exec_name = dds_string_alloc(32);
(void) snprintf(gv.default_plist_pp.exec_name, 32, "%s: %u", DDS_PROJECT_NAME, gv.default_plist_pp.process_id);
len = (uint32_t) (13 + strlen(gv.default_plist_pp.exec_name));
gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME;
if (ddsrt_gethostname(hostname, sizeof(hostname)) == DDS_RETCODE_OK)
{
gv.default_plist_pp.node_name = dds_string_dup(hostname);
gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME;
}
gv.default_plist_pp.entity_name = dds_alloc(len);
(void) snprintf(gv.default_plist_pp.entity_name, len, "%s<%u>", progname,
gv.default_plist_pp.process_id);
gv.default_plist_pp.present |= PP_ENTITY_NAME;
skip:
ddsrt_mutex_unlock(init_mutex);
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
fail_threadmon_start:
if (gv.threadmon)
ddsi_threadmon_stop (gv.threadmon);
dds_handle_server_fini();
fail_handleserver: fail_handleserver:
rtps_stop ();
fail_rtps_start:
dds__builtin_fini ();
rtps_fini ();
fail_rtps_init:
if (gv.threadmon)
{
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
}
fail_threadmon_new:
downgrade_main_thread ();
thread_states_fini();
fail_rtps_config:
fail_config_domainid:
dds_global.m_default_domain = DDS_DOMAIN_DEFAULT;
config_fini (dds_cfgst);
dds_cfgst = NULL;
fail_config:
ddsrt_mutex_destroy (&dds_global.m_mutex); ddsrt_mutex_destroy (&dds_global.m_mutex);
dds_global.m_init_count--; dds_global.m_init_count--;
ddsrt_mutex_unlock(init_mutex); ddsrt_mutex_unlock (init_mutex);
ddsrt_fini(); ddsrt_fini ();
return ret; return ret;
} }
extern void dds_fini (void) extern void dds_fini (void)
{ {
ddsrt_mutex_t *init_mutex; ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
init_mutex = ddsrt_get_singleton_mutex(); ddsrt_mutex_lock (init_mutex);
ddsrt_mutex_lock(init_mutex); assert (dds_global.m_init_count > 0);
assert(dds_global.m_init_count > 0); if (--dds_global.m_init_count == 0)
dds_global.m_init_count--;
if (dds_global.m_init_count == 0)
{ {
if (gv.threadmon) dds_handle_server_fini ();
ddsi_threadmon_stop (gv.threadmon); dds__builtin_fini_global ();
dds_handle_server_fini();
rtps_stop ();
dds__builtin_fini ();
rtps_fini ();
if (gv.threadmon)
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
downgrade_main_thread (); downgrade_main_thread ();
thread_states_fini (); thread_states_fini ();
ddsi_iid_fini ();
config_fini (dds_cfgst);
dds_cfgst = NULL;
ddsrt_mutex_destroy (&dds_global.m_mutex); ddsrt_mutex_destroy (&dds_global.m_mutex);
dds_global.m_default_domain = DDS_DOMAIN_DEFAULT;
} }
ddsrt_mutex_unlock(init_mutex); ddsrt_mutex_unlock (init_mutex);
ddsrt_fini(); ddsrt_fini ();
}
void ddsi_plugin_init (void)
{
ddsi_plugin.builtintopic_is_builtintopic = dds__builtin_is_builtintopic;
ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible;
ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry;
ddsi_plugin.builtintopic_write = dds__builtin_write;
}
//provides explicit default domain id.
dds_domainid_t dds_domain_default (void)
{
return dds_global.m_default_domain;
}
dds_return_t
dds__check_domain(
dds_domainid_t domain)
{
dds_return_t ret = DDS_RETCODE_OK;
/* If domain is default: use configured id. */
if (domain != DDS_DOMAIN_DEFAULT)
{
/* Specific domain has to be the same as the configured domain. */
if (domain != dds_global.m_default_domain)
{
DDS_ERROR("Inconsistent domain configuration detected: domain on "
"configuration: %"PRId32", domain %"PRId32"\n", dds_global.m_default_domain, domain);
ret = DDS_RETCODE_ERROR;
}
}
return ret;
} }

View file

@ -27,10 +27,6 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_participant)
#define DDS_PARTICIPANT_STATUS_MASK (0u) #define DDS_PARTICIPANT_STATUS_MASK (0u)
/* List of created participants */
static dds_entity *dds_pp_head = NULL;
static dds_return_t dds_participant_status_validate (uint32_t mask) static dds_return_t dds_participant_status_validate (uint32_t mask)
{ {
return (mask & ~DDS_PARTICIPANT_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; return (mask & ~DDS_PARTICIPANT_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK;
@ -40,13 +36,17 @@ static dds_return_t dds_participant_delete (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_participant_delete (dds_entity *e) static dds_return_t dds_participant_delete (dds_entity *e)
{ {
struct dds_domain *dom;
dds_return_t ret;
dds_entity *prev, *iter;
assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT); assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT);
thread_state_awake (lookup_thread_state ()); thread_state_awake (lookup_thread_state ());
dds_domain_free (e->m_domain); if ((ret = delete_participant (&e->m_guid)) < 0)
DDS_ERROR ("dds_participant_delete: internal error %"PRId32"\n", ret);
ddsrt_mutex_lock (&dds_global.m_mutex); ddsrt_mutex_lock (&dds_global.m_mutex);
dds_entity *prev, *iter; dom = e->m_domain;
for (iter = dds_pp_head, prev = NULL; iter; prev = iter, iter = iter->m_next) for (iter = dom->ppants, prev = NULL; iter; prev = iter, iter = iter->m_next)
{ {
if (iter == e) if (iter == e)
break; break;
@ -55,11 +55,12 @@ static dds_return_t dds_participant_delete (dds_entity *e)
if (prev) if (prev)
prev->m_next = iter->m_next; prev->m_next = iter->m_next;
else else
dds_pp_head = iter->m_next; dom->ppants = iter->m_next;
ddsrt_mutex_unlock (&dds_global.m_mutex); ddsrt_mutex_unlock (&dds_global.m_mutex);
thread_state_asleep (lookup_thread_state ()); thread_state_asleep (lookup_thread_state ());
/* Every dds_init needs a dds_fini. */ /* Every dds_init needs a dds_fini. */
dds_domain_free (e->m_domain);
dds_fini (); dds_fini ();
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
} }
@ -93,6 +94,7 @@ const struct dds_entity_deriver dds_entity_deriver_participant = {
dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener) dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener)
{ {
dds_domain *dom;
dds_entity_t ret; dds_entity_t ret;
nn_guid_t guid; nn_guid_t guid;
dds_participant * pp; dds_participant * pp;
@ -100,12 +102,11 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
dds_qos_t *new_qos = NULL; dds_qos_t *new_qos = NULL;
/* Make sure DDS instance is initialized. */ /* Make sure DDS instance is initialized. */
if ((ret = dds_init (domain)) != DDS_RETCODE_OK) if ((ret = dds_init ()) < 0)
goto err_dds_init; goto err_dds_init;
/* Check domain id */ if ((ret = dds_domain_create (&dom, domain)) < 0)
if ((ret = dds__check_domain (domain)) != DDS_RETCODE_OK) goto err_domain_create;
goto err_domain_check;
new_qos = dds_create_qos (); new_qos = dds_create_qos ();
if (qos != NULL) if (qos != NULL)
@ -134,13 +135,13 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
pp->m_entity.m_guid = guid; pp->m_entity.m_guid = guid;
pp->m_entity.m_iid = get_entity_instance_id (&guid); pp->m_entity.m_iid = get_entity_instance_id (&guid);
pp->m_entity.m_domain = dds_domain_create (dds_domain_default ()); pp->m_entity.m_domain = dom;
pp->m_builtin_subscriber = 0; pp->m_builtin_subscriber = 0;
/* Add participant to extent */ /* Add participant to extent */
ddsrt_mutex_lock (&dds_global.m_mutex); ddsrt_mutex_lock (&dds_global.m_mutex);
pp->m_entity.m_next = dds_pp_head; pp->m_entity.m_next = pp->m_entity.m_domain->ppants;
dds_pp_head = &pp->m_entity; pp->m_entity.m_domain->ppants = &pp->m_entity;
ddsrt_mutex_unlock (&dds_global.m_mutex); ddsrt_mutex_unlock (&dds_global.m_mutex);
return ret; return ret;
@ -149,7 +150,8 @@ err_entity_init:
err_new_participant: err_new_participant:
err_qos_validation: err_qos_validation:
dds_delete_qos (new_qos); dds_delete_qos (new_qos);
err_domain_check: dds_domain_free (dom);
err_domain_create:
dds_fini (); dds_fini ();
err_dds_init: err_dds_init:
return ret; return ret;
@ -157,16 +159,11 @@ err_dds_init:
dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *participants, size_t size) dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *participants, size_t size)
{ {
ddsrt_mutex_t *init_mutex; if ((participants != NULL && (size <= 0 || size >= INT32_MAX)) || (participants == NULL && size != 0))
return DDS_RETCODE_BAD_PARAMETER;
ddsrt_init (); ddsrt_init ();
init_mutex = ddsrt_get_singleton_mutex (); ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
if ((participants != NULL && (size <= 0 || size >= INT32_MAX)) || (participants == NULL && size != 0))
{
ddsrt_fini ();
return DDS_RETCODE_BAD_PARAMETER;
}
if (participants) if (participants)
participants[0] = 0; participants[0] = 0;
@ -175,10 +172,11 @@ dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
ddsrt_mutex_lock (init_mutex); ddsrt_mutex_lock (init_mutex);
if (dds_global.m_init_count > 0) if (dds_global.m_init_count > 0)
{ {
struct dds_domain *dom;
ddsrt_mutex_lock (&dds_global.m_mutex); ddsrt_mutex_lock (&dds_global.m_mutex);
for (dds_entity *iter = dds_pp_head; iter; iter = iter->m_next) if ((dom = dds_domain_find_locked (domain_id)) != NULL)
{ {
if (iter->m_domain->m_id == domain_id) for (dds_entity *iter = dom->ppants; iter; iter = iter->m_next)
{ {
if ((size_t) ret < size) if ((size_t) ret < size)
participants[ret] = iter->m_hdllink.hdl; participants[ret] = iter->m_hdllink.hdl;

View file

@ -16,6 +16,7 @@
#include "dds__participant.h" #include "dds__participant.h"
#include "dds__publisher.h" #include "dds__publisher.h"
#include "dds__qos.h" #include "dds__qos.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_globals.h"
#include "dds/version.h" #include "dds/version.h"

View file

@ -370,7 +370,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
/* Additional checks required for built-in topics: we don't want to /* Additional checks required for built-in topics: we don't want to
run into a resource limit on a built-in topic, it is a needless run into a resource limit on a built-in topic, it is a needless
complication */ complication */
if (internal_topic && !dds__validate_builtin_reader_qos (topic, rqos)) if (internal_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, topic, rqos))
{ {
dds_delete_qos (rqos); dds_delete_qos (rqos);
reader = DDS_RETCODE_INCONSISTENT_POLICY; reader = DDS_RETCODE_INCONSISTENT_POLICY;

View file

@ -14,6 +14,7 @@
#include "dds__participant.h" #include "dds__participant.h"
#include "dds__subscriber.h" #include "dds__subscriber.h"
#include "dds__qos.h" #include "dds__qos.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_globals.h"
#include "dds/ddsrt/heap.h" #include "dds/ddsrt/heap.h"

View file

@ -19,6 +19,7 @@
#include "dds/ddsi/q_config.h" #include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_ephash.h"
#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_tkmap.h" #include "dds/ddsi/ddsi_tkmap.h"
#include "dds__serdata_builtintopic.h" #include "dds__serdata_builtintopic.h"
#include "dds__whc_builtintopic.h" #include "dds__whc_builtintopic.h"
@ -62,7 +63,7 @@ static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sam
static bool is_visible (const struct entity_common *e) static bool is_visible (const struct entity_common *e)
{ {
const nn_vendorid_t vendorid = get_entity_vendorid (e); const nn_vendorid_t vendorid = get_entity_vendorid (e);
return ddsi_plugin.builtintopic_is_visible (&e->guid, vendorid); return builtintopic_is_visible (gv.builtin_topic_interface, &e->guid, vendorid);
} }
static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample)

View file

@ -77,6 +77,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
ddsi_tkmap.h ddsi_tkmap.h
ddsi_vendor.h ddsi_vendor.h
ddsi_threadmon.h ddsi_threadmon.h
ddsi_builtin_topic_if.h
q_addrset.h q_addrset.h
q_bitset.h q_bitset.h
q_bswap.h q_bswap.h

View file

@ -0,0 +1,53 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef _DDSI_BUILTIN_TOPIC_IF_H_
#define _DDSI_BUILTIN_TOPIC_IF_H_
#include "dds/ddsi/ddsi_vendor.h"
#include "dds/ddsi/q_time.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct entity_common;
struct ddsi_tkmap_instance;
struct ddsi_sertopic;
struct nn_guid;
struct ddsi_builtin_topic_interface {
void *arg;
bool (*builtintopic_is_builtintopic) (const struct ddsi_sertopic *topic, void *arg);
bool (*builtintopic_is_visible) (const struct nn_guid *guid, nn_vendorid_t vendorid, void *arg);
struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid, void *arg);
void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive, void *arg);
};
inline bool builtintopic_is_visible (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid, nn_vendorid_t vendorid) {
return btif ? btif->builtintopic_is_visible (guid, vendorid, btif->arg) : false;
}
inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_sertopic *topic) {
return btif ? btif->builtintopic_is_builtintopic (topic, btif->arg) : false;
}
inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid) {
return btif ? btif->builtintopic_get_tkmap_entry (guid, btif->arg) : NULL;
}
inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive) {
if (btif) btif->builtintopic_write (e, timestamp, alive, btif->arg);
}
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -392,18 +392,7 @@ struct config
struct prune_deleted_ppant prune_deleted_ppant; struct prune_deleted_ppant prune_deleted_ppant;
}; };
struct ddsi_sertopic;
struct entity_common;
struct ddsi_plugin
{
bool (*builtintopic_is_builtintopic) (const struct ddsi_sertopic *topic);
bool (*builtintopic_is_visible) (const nn_guid_t *guid, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid);
void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
};
extern struct config DDS_EXPORT config; extern struct config DDS_EXPORT config;
extern struct ddsi_plugin ddsi_plugin;
struct cfgst; struct cfgst;

View file

@ -24,7 +24,6 @@
#include "dds/ddsi/q_protocol.h" #include "dds/ddsi/q_protocol.h"
#include "dds/ddsi/q_nwif.h" #include "dds/ddsi/q_nwif.h"
#include "dds/ddsi/q_sockwaitset.h" #include "dds/ddsi/q_sockwaitset.h"
#include "dds/ddsi/ddsi_iid.h"
#ifdef DDSI_INCLUDE_ENCRYPTION #ifdef DDSI_INCLUDE_ENCRYPTION
#include "dds/ddsi/q_security.h" /* for q_securityDecoderSet */ #include "dds/ddsi/q_security.h" /* for q_securityDecoderSet */
@ -91,7 +90,6 @@ struct q_globals {
volatile int mute; volatile int mute;
struct ddsi_tkmap * m_tkmap; struct ddsi_tkmap * m_tkmap;
struct ddsi_iid dds_iid;
/* Hash tables for participants, readers, writers, proxy /* Hash tables for participants, readers, writers, proxy
participants, proxy readers and proxy writers by GUID participants, proxy readers and proxy writers by GUID
@ -296,6 +294,8 @@ struct q_globals {
FILE *pcap_fp; FILE *pcap_fp;
ddsrt_mutex_t pcap_lock; ddsrt_mutex_t pcap_lock;
struct ddsi_builtin_topic_interface *builtin_topic_interface;
struct nn_group_membership *mship; struct nn_group_membership *mship;
}; };

View file

@ -78,7 +78,6 @@ int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void); int rtps_config_open (void);
int rtps_init (void); int rtps_init (void);
int rtps_start (void); int rtps_start (void);
void ddsi_plugin_init (void);
void rtps_stop (void); void rtps_stop (void);
void rtps_fini (void); void rtps_fini (void);

View file

@ -11,10 +11,11 @@
*/ */
#include "dds/ddsrt/atomics.h" #include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/process.h" #include "dds/ddsrt/process.h"
#include "dds/ddsrt/random.h"
#include "dds/ddsrt/sync.h" #include "dds/ddsrt/sync.h"
#include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_time.h"
#include "dds/ddsi/q_globals.h" static struct ddsi_iid dds_iid;
static void dds_tea_encrypt (uint32_t v[2], const uint32_t k[4]) static void dds_tea_encrypt (uint32_t v[2], const uint32_t k[4])
{ {
@ -49,14 +50,14 @@ uint64_t ddsi_iid_gen (void)
union { uint64_t u64; uint32_t u32[2]; } tmp; union { uint64_t u64; uint32_t u32[2]; } tmp;
#if DDSRT_ATOMIC64_SUPPORT #if DDSRT_ATOMIC64_SUPPORT
tmp.u64 = ddsrt_atomic_inc64_nv (&gv.dds_iid.counter); tmp.u64 = ddsrt_atomic_inc64_nv (&dds_iid.counter);
#else #else
ddsrt_mutex_lock (&gv.dds_iid.lock); ddsrt_mutex_lock (&dds_iid.lock);
tmp.u64 = ++gv.dds_iid.counter; tmp.u64 = ++dds_iid.counter;
ddsrt_mutex_unlock (&gv.dds_iid.lock); ddsrt_mutex_unlock (&dds_iid.lock);
#endif #endif
dds_tea_encrypt (tmp.u32, gv.dds_iid.key); dds_tea_encrypt (tmp.u32, dds_iid.key);
iid = tmp.u64; iid = tmp.u64;
return iid; return iid;
} }
@ -64,29 +65,26 @@ uint64_t ddsi_iid_gen (void)
void ddsi_iid_init (void) void ddsi_iid_init (void)
{ {
union { uint64_t u64; uint32_t u32[2]; } tmp; union { uint64_t u64; uint32_t u32[2]; } tmp;
nn_wctime_t tnow = now ();
#if ! DDSRT_ATOMIC64_SUPPORT #if ! DDSRT_ATOMIC64_SUPPORT
ddsrt_mutex_init (&gv.dds_iid.lock); ddsrt_mutex_init (&dds_iid.lock);
#endif #endif
gv.dds_iid.key[0] = (uint32_t) ddsrt_getpid(); for (size_t i = 0; i < sizeof (dds_iid.key) / sizeof (dds_iid.key[0]); i++)
gv.dds_iid.key[1] = (uint32_t) tnow.v; dds_iid.key[0] = ddsrt_random ();
gv.dds_iid.key[2] = (uint32_t) (tnow.v >> 32);
gv.dds_iid.key[3] = 0xdeadbeef;
tmp.u64 = 0; tmp.u64 = 0;
dds_tea_decrypt (tmp.u32, gv.dds_iid.key); dds_tea_decrypt (tmp.u32, dds_iid.key);
#if DDSRT_ATOMIC64_SUPPORT #if DDSRT_ATOMIC64_SUPPORT
ddsrt_atomic_st64 (&gv.dds_iid.counter, tmp.u64); ddsrt_atomic_st64 (&dds_iid.counter, tmp.u64);
#else #else
gv.dds_iid.counter = tmp.u64; dds_iid.counter = tmp.u64;
#endif #endif
} }
void ddsi_iid_fini (void) void ddsi_iid_fini (void)
{ {
#if ! DDSRT_ATOMIC64_SUPPORT #if ! DDSRT_ATOMIC64_SUPPORT
ddsrt_mutex_destroy (&gv.dds_iid.lock); ddsrt_mutex_destroy (&dds_iid.lock);
#endif #endif
} }

View file

@ -844,7 +844,6 @@ static const struct cfgelem root_cfgelem = {
#undef CO #undef CO
struct config config; struct config config;
struct ddsi_plugin ddsi_plugin;
static const struct unit unittab_duration[] = { static const struct unit unittab_duration[] = {
{ "ns", 1 }, { "ns", 1 },

View file

@ -104,6 +104,11 @@ static int gcreq_proxy_participant (struct proxy_participant *proxypp);
static int gcreq_proxy_writer (struct proxy_writer *pwr); static int gcreq_proxy_writer (struct proxy_writer *pwr);
static int gcreq_proxy_reader (struct proxy_reader *prd); static int gcreq_proxy_reader (struct proxy_reader *prd);
extern inline bool builtintopic_is_visible (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid, nn_vendorid_t vendorid);
extern inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_sertopic *topic);
extern inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid);
extern inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive);
static int compare_guid (const void *va, const void *vb) static int compare_guid (const void *va, const void *vb)
{ {
return memcmp (va, vb, sizeof (nn_guid_t)); return memcmp (va, vb, sizeof (nn_guid_t));
@ -174,9 +179,9 @@ static void entity_common_init (struct entity_common *e, const struct nn_guid *g
e->name = ddsrt_strdup (name ? name : ""); e->name = ddsrt_strdup (name ? name : "");
e->onlylocal = onlylocal; e->onlylocal = onlylocal;
ddsrt_mutex_init (&e->lock); ddsrt_mutex_init (&e->lock);
if (ddsi_plugin.builtintopic_is_visible (guid, vendorid)) if (builtintopic_is_visible (gv.builtin_topic_interface, guid, vendorid))
{ {
e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid); e->tk = builtintopic_get_tkmap_entry (gv.builtin_topic_interface, guid);
e->iid = e->tk->m_iid; e->iid = e->tk->m_iid;
} }
else else
@ -407,7 +412,7 @@ static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, cons
nn_xqos_fini_mask (ent_qos, mask); nn_xqos_fini_mask (ent_qos, mask);
nn_xqos_mergein_missing (ent_qos, xqos, mask); nn_xqos_mergein_missing (ent_qos, xqos, mask);
ddsi_plugin.builtintopic_write (e, timestamp, true); builtintopic_write (gv.builtin_topic_interface, e, timestamp, true);
return true; return true;
} }
@ -662,7 +667,7 @@ dds_return_t new_participant_guid (const nn_guid_t *ppguid, unsigned flags, cons
trigger_recv_threads (); trigger_recv_threads ();
} }
ddsi_plugin.builtintopic_write (&pp->e, now(), true); builtintopic_write (gv.builtin_topic_interface, &pp->e, now(), true);
/* SPDP periodic broadcast uses the retransmit path, so the initial /* SPDP periodic broadcast uses the retransmit path, so the initial
publication must be done differently. Must be later than making publication must be done differently. Must be later than making
@ -906,7 +911,7 @@ dds_return_t delete_participant (const struct nn_guid *ppguid)
struct participant *pp; struct participant *pp;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
ddsi_plugin.builtintopic_write (&pp->e, now(), false); builtintopic_write (gv.builtin_topic_interface, &pp->e, now(), false);
remember_deleted_participant_guid (gv.deleted_participants, &pp->e.guid); remember_deleted_participant_guid (gv.deleted_participants, &pp->e.guid);
ephash_remove_participant_guid (pp); ephash_remove_participant_guid (pp);
gcreq_participant (pp); gcreq_participant (pp);
@ -2857,7 +2862,7 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct nn_gui
delete_participant won't interfere with our ability to address delete_participant won't interfere with our ability to address
the participant */ the participant */
const bool onlylocal = topic && ddsi_plugin.builtintopic_is_builtintopic (topic); const bool onlylocal = topic && builtintopic_is_builtintopic (gv.builtin_topic_interface, topic);
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, onlylocal); endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, onlylocal);
new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity); new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity);
@ -2868,7 +2873,7 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct nn_gui
the other. */ the other. */
ddsrt_mutex_lock (&wr->e.lock); ddsrt_mutex_lock (&wr->e.lock);
ephash_insert_writer_guid (wr); ephash_insert_writer_guid (wr);
ddsi_plugin.builtintopic_write (&wr->e, now(), true); builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), true);
ddsrt_mutex_unlock (&wr->e.lock); ddsrt_mutex_unlock (&wr->e.lock);
/* once it exists, match it with proxy writers and broadcast /* once it exists, match it with proxy writers and broadcast
@ -2927,7 +2932,7 @@ struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, str
memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid)); memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid));
new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL); new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL);
ephash_insert_writer_guid (wr); ephash_insert_writer_guid (wr);
ddsi_plugin.builtintopic_write (&wr->e, now(), true); builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), true);
match_writer_with_local_readers (wr, tnow); match_writer_with_local_readers (wr, tnow);
return lowr; return lowr;
} }
@ -3034,7 +3039,7 @@ dds_return_t delete_writer_nolinger_locked (struct writer *wr)
{ {
DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid));
ASSERT_MUTEX_HELD (&wr->e.lock); ASSERT_MUTEX_HELD (&wr->e.lock);
ddsi_plugin.builtintopic_write (&wr->e, now(), false); builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), false);
local_reader_ary_setinvalid (&wr->rdary); local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr); ephash_remove_writer_guid (wr);
writer_set_state (wr, WRST_DELETING); writer_set_state (wr, WRST_DELETING);
@ -3235,7 +3240,7 @@ static dds_return_t new_reader_guid
if (rd_out) if (rd_out)
*rd_out = rd; *rd_out = rd;
const bool onlylocal = topic && ddsi_plugin.builtintopic_is_builtintopic (topic); const bool onlylocal = topic && builtintopic_is_builtintopic (gv.builtin_topic_interface, topic);
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, onlylocal); endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, onlylocal);
/* Copy QoS, merging in defaults */ /* Copy QoS, merging in defaults */
@ -3336,7 +3341,7 @@ static dds_return_t new_reader_guid
ddsrt_mutex_lock (&rd->e.lock); ddsrt_mutex_lock (&rd->e.lock);
ephash_insert_reader_guid (rd); ephash_insert_reader_guid (rd);
ddsi_plugin.builtintopic_write (&rd->e, now(), true); builtintopic_write (gv.builtin_topic_interface, &rd->e, now(), true);
ddsrt_mutex_unlock (&rd->e.lock); ddsrt_mutex_unlock (&rd->e.lock);
match_reader_with_proxy_writers (rd, tnow); match_reader_with_proxy_writers (rd, tnow);
@ -3429,7 +3434,7 @@ dds_return_t delete_reader (const struct nn_guid *guid)
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
} }
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid "PGUIDFMT") ...\n", PGUID (*guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid "PGUIDFMT") ...\n", PGUID (*guid));
ddsi_plugin.builtintopic_write (&rd->e, now(), false); builtintopic_write (gv.builtin_topic_interface, &rd->e, now(), false);
ephash_remove_reader_guid (rd); ephash_remove_reader_guid (rd);
gcreq_reader (rd); gcreq_reader (rd);
return 0; return 0;
@ -3665,7 +3670,7 @@ void new_proxy_participant
if (proxypp->owns_lease) if (proxypp->owns_lease)
lease_register (ddsrt_atomic_ldvoidp (&proxypp->lease)); lease_register (ddsrt_atomic_ldvoidp (&proxypp->lease));
ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); builtintopic_write (gv.builtin_topic_interface, &proxypp->e, timestamp, true);
ddsrt_mutex_unlock (&proxypp->e.lock); ddsrt_mutex_unlock (&proxypp->e.lock);
} }
@ -3915,7 +3920,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
} }
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false); builtintopic_write (gv.builtin_topic_interface, &ppt->e, timestamp, false);
remember_deleted_participant_guid (gv.deleted_participants, &ppt->e.guid); remember_deleted_participant_guid (gv.deleted_participants, &ppt->e.guid);
ephash_remove_proxy_participant_guid (ppt); ephash_remove_proxy_participant_guid (ppt);
ddsrt_mutex_unlock (&gv.lock); ddsrt_mutex_unlock (&gv.lock);
@ -4161,7 +4166,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
/* locking the entity prevents matching while the built-in topic hasn't been published yet */ /* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
ephash_insert_proxy_writer_guid (pwr); ephash_insert_proxy_writer_guid (pwr);
ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true); builtintopic_write (gv.builtin_topic_interface, &pwr->e, timestamp, true);
ddsrt_mutex_unlock (&pwr->e.lock); ddsrt_mutex_unlock (&pwr->e.lock);
match_proxy_writer_with_readers (pwr, tnow); match_proxy_writer_with_readers (pwr, tnow);
@ -4299,7 +4304,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int
from removing themselves from the proxy writer's rdary[]. */ from removing themselves from the proxy writer's rdary[]. */
local_reader_ary_setinvalid (&pwr->rdary); local_reader_ary_setinvalid (&pwr->rdary);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false); builtintopic_write (gv.builtin_topic_interface, &pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (pwr); ephash_remove_proxy_writer_guid (pwr);
ddsrt_mutex_unlock (&gv.lock); ddsrt_mutex_unlock (&gv.lock);
gcreq_proxy_writer (pwr); gcreq_proxy_writer (pwr);
@ -4341,7 +4346,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
/* locking the entity prevents matching while the built-in topic hasn't been published yet */ /* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&prd->e.lock); ddsrt_mutex_lock (&prd->e.lock);
ephash_insert_proxy_reader_guid (prd); ephash_insert_proxy_reader_guid (prd);
ddsi_plugin.builtintopic_write (&prd->e, timestamp, true); builtintopic_write (gv.builtin_topic_interface, &prd->e, timestamp, true);
ddsrt_mutex_unlock (&prd->e.lock); ddsrt_mutex_unlock (&prd->e.lock);
match_proxy_reader_with_writers (prd, tnow); match_proxy_reader_with_writers (prd, tnow);
@ -4426,7 +4431,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int
DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n"); DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n");
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
} }
ddsi_plugin.builtintopic_write (&prd->e, timestamp, false); builtintopic_write (gv.builtin_topic_interface, &prd->e, timestamp, false);
ephash_remove_proxy_reader_guid (prd); ephash_remove_proxy_reader_guid (prd);
ddsrt_mutex_unlock (&gv.lock); ddsrt_mutex_unlock (&gv.lock);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");

View file

@ -569,7 +569,7 @@ int rtps_config_prep (struct cfgst *cfgst)
lease, gc, debmon; once thread state admin has been inited, upgrade the lease, gc, debmon; once thread state admin has been inited, upgrade the
main thread one participating in the thread tracking stuff as main thread one participating in the thread tracking stuff as
if it had been created using create_thread(). */ if it had been created using create_thread(). */
#if 0 /* FIXME: threads are per-process, not per-domain */
{ {
/* Temporary: thread states for each application thread is managed using thread_states structure /* Temporary: thread states for each application thread is managed using thread_states structure
*/ */
@ -582,6 +582,7 @@ int rtps_config_prep (struct cfgst *cfgst)
#endif #endif
thread_states_init (max_threads); thread_states_init (max_threads);
} }
#endif
/* Now the per-thread-log-buffers are set up, so print the configuration. After this there /* Now the per-thread-log-buffers are set up, so print the configuration. After this there
is no value to the source information for the various configuration elements, so free those. */ is no value to the source information for the various configuration elements, so free those. */
@ -852,8 +853,6 @@ int rtps_init (void)
gv.tstart = now (); /* wall clock time, used in logs */ gv.tstart = now (); /* wall clock time, used in logs */
ddsi_plugin_init ();
ddsi_iid_init ();
nn_plist_init_tables (); nn_plist_init_tables ();
gv.disc_conn_uc = NULL; gv.disc_conn_uc = NULL;
@ -1327,7 +1326,6 @@ err_unicast_sockets:
nn_plist_fini (&gv.default_plist_pp); nn_plist_fini (&gv.default_plist_pp);
ddsi_serdatapool_free (gv.serpool); ddsi_serdatapool_free (gv.serpool);
nn_xmsgpool_free (gv.xmsgpool); nn_xmsgpool_free (gv.xmsgpool);
ddsi_iid_fini ();
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
err_network_partition_addrset: err_network_partition_addrset:
for (struct config_networkpartition_listelem *np = config.networkPartitions; np; np = np->next) for (struct config_networkpartition_listelem *np = config.networkPartitions; np; np = np->next)
@ -1674,6 +1672,5 @@ void rtps_fini (void)
ddsi_serdatapool_free (gv.serpool); ddsi_serdatapool_free (gv.serpool);
nn_xmsgpool_free (gv.xmsgpool); nn_xmsgpool_free (gv.xmsgpool);
ddsi_iid_fini ();
DDS_LOG(DDS_LC_CONFIG, "Finis.\n"); DDS_LOG(DDS_LC_CONFIG, "Finis.\n");
} }