diff --git a/src/core/ddsc/src/dds__builtin.h b/src/core/ddsc/src/dds__builtin.h index 767396e..5d01346 100644 --- a/src/core/ddsc/src/dds__builtin.h +++ b/src/core/ddsc/src/dds__builtin.h @@ -12,7 +12,7 @@ #ifndef _DDS_BUILTIN_H_ #define _DDS_BUILTIN_H_ -#include "dds/ddsi/q_time.h" +#include "dds/ddsi/ddsi_builtin_topic_if.h" #if defined (__cplusplus) extern "C" @@ -20,25 +20,23 @@ extern "C" #endif /* Get actual topic in related participant related to topic 'id'. */ -dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic); +dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic); /* Subscriber singleton within related participant. */ -dds_entity_t dds__get_builtin_subscriber(dds_entity_t e); +dds_entity_t dds__get_builtin_subscriber (dds_entity_t e); /* Checks whether the reader QoS is valid for use with built-in topic TOPIC */ -bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos); +bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic, const dds_qos_t *qos); + +/* Init/fini for builtin-topic support that is global across domains */ +void dds__builtin_init_global (void); +void dds__builtin_fini_global (void); + +void dds__builtin_init (struct dds_domain *dom); +void dds__builtin_fini (struct dds_domain *dom); struct entity_common; -struct nn_guid; -struct ddsi_tkmap_instance; - -void dds__builtin_init (void); -void dds__builtin_fini (void); -bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp); -bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid); -struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid); struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive); -void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__domain.h b/src/core/ddsc/src/dds__domain.h index 0b13070..2e3baaa 100644 --- a/src/core/ddsc/src/dds__domain.h +++ b/src/core/ddsc/src/dds__domain.h @@ -18,11 +18,9 @@ extern "C" { #endif -extern DDS_EXPORT const ddsrt_avl_treedef_t dds_domaintree_def; - -DDS_EXPORT dds_domain * dds_domain_create (dds_domainid_t id); -DDS_EXPORT void dds_domain_free (dds_domain * domain); -DDS_EXPORT dds_domain * dds_domain_find_locked (dds_domainid_t id); +DDS_EXPORT dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id); +DDS_EXPORT void dds_domain_free (dds_domain *domain); +DDS_EXPORT dds_domain *dds_domain_find_locked (dds_domainid_t id); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__handles.h b/src/core/ddsc/src/dds__handles.h index 70321d3..46bf594 100644 --- a/src/core/ddsc/src/dds__handles.h +++ b/src/core/ddsc/src/dds__handles.h @@ -78,7 +78,7 @@ struct dds_handle_link { * Initialize handleserver singleton. */ DDS_EXPORT dds_return_t -dds_handle_server_init(void (*free_via_gc) (void *x)); +dds_handle_server_init(void); /* diff --git a/src/core/ddsc/src/dds__init.h b/src/core/ddsc/src/dds__init.h index 859a459..57d41a3 100644 --- a/src/core/ddsc/src/dds__init.h +++ b/src/core/ddsc/src/dds__init.h @@ -18,9 +18,6 @@ extern "C" { #endif -dds_return_t -dds__check_domain(dds_domainid_t domain); - /** *Description : Initialization function, called from main. This operation *initializes all the required DDS resources, @@ -30,8 +27,7 @@ dds__check_domain(dds_domainid_t domain); *Arguments : *-# Returns 0 on success or a non-zero error status **/ -dds_return_t -dds_init(dds_domainid_t domain); +dds_return_t dds_init (void); /* Finalization function, called from main */ @@ -42,16 +38,7 @@ dds_init(dds_domainid_t domain); *Arguments : *-# None **/ -void -dds_fini(void); - -/** - * Description : Function that provides the explicit ID of default domain - * It should be called after DDS initialization. - * @return Valid domain id. Undetermined behaviour if DDS is not initialized. - */ -dds_domainid_t dds_domain_default (void); - +void dds_fini (void); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index e105d6a..54d62ba 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -18,6 +18,7 @@ #include "dds/ddsrt/sync.h" #include "dds/ddsi/q_rtps.h" #include "dds/ddsrt/avl.h" +#include "dds/ddsi/ddsi_builtin_topic_if.h" #include "dds__handles.h" #if defined (__cplusplus) @@ -92,10 +93,19 @@ struct dds_listener { #define DDS_ENTITY_IMPLICIT 0x0002u typedef struct dds_domain { + /* FIXME: protected by dds_global.lock -- for now */ ddsrt_avl_node_t m_node; dds_domainid_t m_id; ddsrt_avl_tree_t m_topics; + struct dds_entity *ppants; uint32_t m_refc; + struct cfgst *cfgst; + + struct local_orphan_writer *builtintopic_writer_participant; + struct local_orphan_writer *builtintopic_writer_publications; + struct local_orphan_writer *builtintopic_writer_subscriptions; + + struct ddsi_builtin_topic_interface btif; } dds_domain; struct dds_entity; @@ -295,10 +305,13 @@ typedef struct dds_waitset { /* Globals */ typedef struct dds_globals { - dds_domainid_t m_default_domain; int32_t m_init_count; ddsrt_avl_tree_t m_domains; ddsrt_mutex_t m_mutex; + + struct ddsi_sertopic *builtin_participant_topic; + struct ddsi_sertopic *builtin_reader_topic; + struct ddsi_sertopic *builtin_writer_topic; } dds_globals; DDS_EXPORT extern dds_globals dds_global; diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index c663bc4..6a89b86 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -15,6 +15,7 @@ #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_config.h" +#include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_plist.h" /* for nn_keyhash */ #include "dds__init.h" #include "dds__domain.h" @@ -29,13 +30,6 @@ #include "dds/ddsi/q_qosmatch.h" #include "dds/ddsi/ddsi_tkmap.h" -static struct ddsi_sertopic *builtin_participant_topic; -static struct ddsi_sertopic *builtin_reader_topic; -static struct ddsi_sertopic *builtin_writer_topic; -static struct local_orphan_writer *builtintopic_writer_participant; -static struct local_orphan_writer *builtintopic_writer_publications; -static struct local_orphan_writer *builtintopic_writer_subscriptions; - static dds_qos_t *dds__create_builtin_qos (void) { const char *partition = "__BUILT-IN PARTITION__"; @@ -47,58 +41,37 @@ static dds_qos_t *dds__create_builtin_qos (void) return qos; } -void dds__builtin_init (void) +dds_entity_t dds__get_builtin_topic (dds_entity_t entity, dds_entity_t topic) { - dds_qos_t *qos = dds__create_builtin_qos (); - - builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); - builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); - builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); - - builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT)); - builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER)); - builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER)); - - dds_delete_qos (qos); -} - -void dds__builtin_fini (void) -{ - /* No more sources for builtin topic samples */ - thread_state_awake (lookup_thread_state ()); - delete_local_orphan_writer (builtintopic_writer_participant); - delete_local_orphan_writer (builtintopic_writer_publications); - delete_local_orphan_writer (builtintopic_writer_subscriptions); - thread_state_asleep (lookup_thread_state ()); - - ddsi_sertopic_unref (builtin_participant_topic); - ddsi_sertopic_unref (builtin_reader_topic); - ddsi_sertopic_unref (builtin_writer_topic); -} - -dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic) -{ - dds_entity_t pp; dds_entity_t tp; + dds_return_t rc; + dds_entity *e; - if ((pp = dds_get_participant (e)) <= 0) - return pp; + if ((rc = dds_entity_pin (entity, &e)) < 0) + return rc; struct ddsi_sertopic *sertopic; - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - sertopic = builtin_participant_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - sertopic = builtin_writer_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - sertopic = builtin_reader_topic; - } else { - assert (0); - return DDS_RETCODE_BAD_PARAMETER; + switch (topic) + { + case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: + sertopic = dds_global.builtin_participant_topic; + break; + case DDS_BUILTIN_TOPIC_DCPSPUBLICATION: + sertopic = dds_global.builtin_writer_topic; + break; + case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION: + sertopic = dds_global.builtin_reader_topic; + break; + default: + assert (0); + dds_entity_unpin (e); + return DDS_RETCODE_BAD_PARAMETER; } dds_qos_t *qos = dds__create_builtin_qos (); - tp = dds_create_topic_arbitrary (pp, sertopic, qos, NULL, NULL); + tp = dds_create_topic_arbitrary (e->m_participant->m_hdllink.hdl, sertopic, qos, NULL, NULL); dds_delete_qos (qos); + dds_entity_unpin (e); return tp; } @@ -110,7 +83,7 @@ static bool qos_has_resource_limits (const dds_qos_t *qos) qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED); } -bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos) +bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic, const dds_qos_t *qos) { if (qos == NULL) /* default QoS inherited from topic is ok by definition */ @@ -121,15 +94,20 @@ bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos) forbid the creation of a reader matching a built-in topics writer that has resource limits */ struct local_orphan_writer *bwr; - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - bwr = builtintopic_writer_participant; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - bwr = builtintopic_writer_publications; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - bwr = builtintopic_writer_subscriptions; - } else { - assert (0); - return false; + switch (topic) + { + case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: + bwr = dom->builtintopic_writer_participant; + break; + case DDS_BUILTIN_TOPIC_DCPSPUBLICATION: + bwr = dom->builtintopic_writer_publications; + break; + case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION: + bwr = dom->builtintopic_writer_subscriptions; + break; + default: + assert (0); + return false; } /* FIXME: DDSI-level readers, writers have topic, type name in their QoS, but @@ -171,26 +149,29 @@ dds_entity_t dds__get_builtin_subscriber (dds_entity_t e) return sub; } -bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp) +static bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp, void *vdomain) { + (void) vdomain; return tp->ops == &ddsi_sertopic_ops_builtintopic; } -bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid) +static bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid, void *vdomain) { + (void) vdomain; if (is_builtin_endpoint (guid->entityid, vendorid)) return false; return true; } -struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid) +static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid, void *vdomain) { struct ddsi_tkmap_instance *tk; struct ddsi_serdata *sd; struct nn_keyhash kh; + (void) vdomain; memcpy (&kh, guid, sizeof (kh)); /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ - sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh); + sd = ddsi_serdata_from_keyhash (dds_global.builtin_participant_topic, &kh); tk = ddsi_tkmap_find (sd, false, true); ddsi_serdata_unref (sd); return tk; @@ -206,15 +187,15 @@ struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn { case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: - topic = builtin_participant_topic; + topic = dds_global.builtin_participant_topic; break; case EK_WRITER: case EK_PROXY_WRITER: - topic = builtin_writer_topic; + topic = dds_global.builtin_writer_topic; break; case EK_READER: case EK_PROXY_READER: - topic = builtin_reader_topic; + topic = dds_global.builtin_reader_topic; break; } assert (topic != NULL); @@ -225,9 +206,10 @@ struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn return serdata; } -void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive) +static void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive, void *vdomain) { - if (dds__builtin_is_visible (&e->guid, get_entity_vendorid (e))) + struct dds_domain *dom = vdomain; + if (dds__builtin_is_visible (&e->guid, get_entity_vendorid (e), dom)) { /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ struct local_orphan_writer *bwr = NULL; @@ -237,17 +219,59 @@ void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, b { case EK_PARTICIPANT: case EK_PROXY_PARTICIPANT: - bwr = builtintopic_writer_participant; + bwr = dom->builtintopic_writer_participant; break; case EK_WRITER: case EK_PROXY_WRITER: - bwr = builtintopic_writer_publications; + bwr = dom->builtintopic_writer_publications; break; case EK_READER: case EK_PROXY_READER: - bwr = builtintopic_writer_subscriptions; + bwr = dom->builtintopic_writer_subscriptions; break; } dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata); } } + +void dds__builtin_init_global (void) +{ + dds_global.builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); + dds_global.builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); + dds_global.builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); +} + +void dds__builtin_fini_global (void) +{ + ddsi_sertopic_unref (dds_global.builtin_participant_topic); + ddsi_sertopic_unref (dds_global.builtin_reader_topic); + ddsi_sertopic_unref (dds_global.builtin_writer_topic); +} + +void dds__builtin_init (struct dds_domain *dom) +{ + dds_qos_t *qos = dds__create_builtin_qos (); + + dom->btif.arg = dom; + dom->btif.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry; + dom->btif.builtintopic_is_builtintopic = dds__builtin_is_builtintopic; + dom->btif.builtintopic_is_visible = dds__builtin_is_visible; + dom->btif.builtintopic_write = dds__builtin_write; + gv.builtin_topic_interface = &dom->btif; + + dom->builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dds_global.builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT)); + dom->builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dds_global.builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER)); + dom->builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dds_global.builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER)); + + dds_delete_qos (qos); +} + +void dds__builtin_fini (struct dds_domain *dom) +{ + /* No more sources for builtin topic samples */ + thread_state_awake (lookup_thread_state ()); + delete_local_orphan_writer (dom->builtintopic_writer_participant); + delete_local_orphan_writer (dom->builtintopic_writer_publications); + delete_local_orphan_writer (dom->builtintopic_writer_subscriptions); + thread_state_asleep (lookup_thread_state ()); +} diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index d088217..2cc7f4e 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -9,48 +9,253 @@ * * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ +#include + +#include "dds/ddsrt/environ.h" +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/heap.h" +#include "dds__init.h" +#include "dds__rhc.h" #include "dds__domain.h" +#include "dds__builtin.h" +#include "dds__whc_builtintopic.h" +#include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/ddsi_tkmap.h" +#include "dds/ddsi/ddsi_serdata.h" +#include "dds/ddsi/ddsi_threadmon.h" +#include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_config.h" +#include "dds/ddsi/q_gc.h" +#include "dds/ddsi/q_globals.h" +#include "dds/version.h" static int dds_domain_compare (const void *va, const void *vb) { - const int32_t *a = va; - const int32_t *b = vb; + const dds_domainid_t *a = va; + const dds_domainid_t *b = vb; return (*a == *b) ? 0 : (*a < *b) ? -1 : 1; } -const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER ( +static const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER ( offsetof (dds_domain, m_node), offsetof (dds_domain, m_id), dds_domain_compare, 0); +static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id) +{ + dds_return_t ret = DDS_RETCODE_OK; + char * uri = NULL; + uint32_t len; + + domain->m_id = domain_id; + domain->m_refc = 1; + ddsrt_avl_init (&dds_topictree_def, &domain->m_topics); + + gv.tstart = now (); + + (void) ddsrt_getenv ("CYCLONEDDS_URI", &uri); + domain->cfgst = config_init (uri); + if (domain->cfgst == NULL) + { + DDS_LOG (DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri); + ret = DDS_RETCODE_ERROR; + goto fail_config; + } + + /* if a domain id was explicitly given, check & fix up the configuration */ + if (domain_id != DDS_DOMAIN_DEFAULT) + { + if (domain_id < 0 || domain_id > 230) + { + DDS_ERROR ("requested domain id %"PRId32" is out of range\n", domain_id); + ret = DDS_RETCODE_ERROR; + goto fail_config_domainid; + } + else if (config.domainId.isdefault) + { + config.domainId.value = domain_id; + } + else if (domain_id != config.domainId.value) + { + DDS_ERROR ("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain_id, config.domainId.value); + ret = DDS_RETCODE_ERROR; + goto fail_config_domainid; + } + } + + /* FIXME: The config.domainId can change internally in DDSI. So, remember what the + * main configured domain id is. */ + domain->m_id = config.domainId.value; + + if (rtps_config_prep (domain->cfgst) != 0) + { + DDS_LOG (DDS_LC_CONFIG, "Failed to configure RTPS\n"); + ret = DDS_RETCODE_ERROR; + goto fail_rtps_config; + } + + /* Start monitoring the liveliness of all threads. */ + if (!config.liveliness_monitoring) + gv.threadmon = NULL; + else + { + gv.threadmon = ddsi_threadmon_new (); + if (gv.threadmon == NULL) + { + DDS_ERROR ("Failed to create a thread monitor\n"); + ret = DDS_RETCODE_OUT_OF_RESOURCES; + goto fail_threadmon_new; + } + } + + if (rtps_init () < 0) + { + DDS_LOG (DDS_LC_CONFIG, "Failed to initialize RTPS\n"); + ret = DDS_RETCODE_ERROR; + goto fail_rtps_init; + } + + dds__builtin_init (domain); + + if (rtps_start () < 0) + { + DDS_LOG (DDS_LC_CONFIG, "Failed to start RTPS\n"); + ret = DDS_RETCODE_ERROR; + goto fail_rtps_start; + } + + if (gv.threadmon && ddsi_threadmon_start (gv.threadmon) < 0) + { + DDS_ERROR ("Failed to start the servicelease\n"); + ret = DDS_RETCODE_ERROR; + goto fail_threadmon_start; + } + + /* Set additional default participant properties */ + + char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */ + char hostname[64]; + gv.default_plist_pp.process_id = (unsigned) ddsrt_getpid(); + gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID; + gv.default_plist_pp.exec_name = dds_string_alloc(32); + (void) snprintf (gv.default_plist_pp.exec_name, 32, "CycloneDDS: %u", gv.default_plist_pp.process_id); + len = (uint32_t) (13 + strlen (gv.default_plist_pp.exec_name)); + gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME; + if (ddsrt_gethostname (hostname, sizeof (hostname)) == DDS_RETCODE_OK) + { + gv.default_plist_pp.node_name = dds_string_dup (hostname); + gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME; + } + gv.default_plist_pp.entity_name = dds_alloc (len); + (void) snprintf (gv.default_plist_pp.entity_name, len, "%s<%u>", progname, gv.default_plist_pp.process_id); + gv.default_plist_pp.present |= PP_ENTITY_NAME; + + return DDS_RETCODE_OK; + +fail_threadmon_start: + if (gv.threadmon) + ddsi_threadmon_stop (gv.threadmon); + rtps_stop (); +fail_rtps_start: + rtps_fini (); +fail_rtps_init: + if (gv.threadmon) + { + ddsi_threadmon_free (gv.threadmon); + gv.threadmon = NULL; + } +fail_threadmon_new: + downgrade_main_thread (); + thread_states_fini(); +fail_rtps_config: +fail_config_domainid: + config_fini (domain->cfgst); +fail_config: + return ret; +} + +static void dds_domain_fini (struct dds_domain *domain) +{ + if (gv.threadmon) + ddsi_threadmon_stop (gv.threadmon); + rtps_stop (); + dds__builtin_fini (domain); + rtps_fini (); + if (gv.threadmon) + ddsi_threadmon_free (gv.threadmon); + gv.threadmon = NULL; + config_fini (domain->cfgst); +} + dds_domain *dds_domain_find_locked (dds_domainid_t id) { return ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &id); } -dds_domain *dds_domain_create (dds_domainid_t id) +dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id) { - dds_domain *domain; + struct dds_domain *dom = NULL; + dds_return_t ret; + + if (id != DDS_DOMAIN_DEFAULT && (id < 0 || id > 230)) + return DDS_RETCODE_BAD_PARAMETER; + ddsrt_mutex_lock (&dds_global.m_mutex); - domain = dds_domain_find_locked (id); - if (domain == NULL) + + /* FIXME: hack around default domain ids, not yet being able to handle multiple domains simultaneously */ + if (id != DDS_DOMAIN_DEFAULT) { - domain = dds_alloc (sizeof (*domain)); - domain->m_id = id; - ddsrt_avl_init (&dds_topictree_def, &domain->m_topics); - ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, domain); + if ((dom = dds_domain_find_locked (id)) == NULL) + ret = DDS_RETCODE_NOT_FOUND; + else if (dom->m_id == id) + ret = DDS_RETCODE_OK; + else + ret = DDS_RETCODE_PRECONDITION_NOT_MET; + } + else + { + if ((dom = ddsrt_avl_find_min (&dds_domaintree_def, &dds_global.m_domains)) != NULL) + ret = DDS_RETCODE_OK; + else + ret = DDS_RETCODE_NOT_FOUND; + } + + switch (ret) + { + case DDS_RETCODE_OK: + dom->m_refc++; + *domain_out = dom; + break; + case DDS_RETCODE_NOT_FOUND: + dom = dds_alloc (sizeof (*dom)); + if ((ret = dds_domain_init (dom, id)) < 0) + dds_free (dom); + else + { + ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, dom); + *domain_out = dom; + } + break; + case DDS_RETCODE_PRECONDITION_NOT_MET: + DDS_ERROR("Inconsistent domain configuration detected: domain on configuration: %"PRId32", domain %"PRId32"\n", dom->m_id, id); + break; } - domain->m_refc++; ddsrt_mutex_unlock (&dds_global.m_mutex); - return domain; + return ret; } void dds_domain_free (dds_domain *domain) { ddsrt_mutex_lock (&dds_global.m_mutex); - if (--domain->m_refc == 0) + if (--domain->m_refc != 0) { + //fprintf(stderr, "domain_free %d %p refcount > 0\n", (int)domain->m_id, domain); + ddsrt_mutex_unlock (&dds_global.m_mutex); + } + else + { + //fprintf(stderr, "domain_free %d %p\n", (int)domain->m_id, domain); ddsrt_avl_delete (&dds_domaintree_def, &dds_global.m_domains, domain); + ddsrt_mutex_unlock (&dds_global.m_mutex); + dds_domain_fini (domain); dds_free (domain); } - ddsrt_mutex_unlock (&dds_global.m_mutex); } diff --git a/src/core/ddsc/src/dds_handles.c b/src/core/ddsc/src/dds_handles.c index def0975..7b5afdc 100644 --- a/src/core/ddsc/src/dds_handles.c +++ b/src/core/ddsc/src/dds_handles.c @@ -76,13 +76,12 @@ static int handle_equal (const void *va, const void *vb) return a->hdl == b->hdl; } -dds_return_t dds_handle_server_init (void (*free_via_gc) (void *x)) +dds_return_t dds_handle_server_init (void) { #if USE_CHH handles.ht = ddsrt_chh_new (128, handle_hash, handle_equal, free_via_gc); #else handles.ht = ddsrt_hh_new (128, handle_hash, handle_equal); - (void) free_via_gc; #endif handles.count = 0; ddsrt_mutex_init (&handles.lock); diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 1492c74..d1efc7f 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -37,249 +37,60 @@ struct q_globals gv; -dds_globals dds_global = { .m_default_domain = DDS_DOMAIN_DEFAULT }; -static struct cfgst * dds_cfgst = NULL; +dds_globals dds_global; -static void free_via_gc_cb (struct gcreq *gcreq) +dds_return_t dds_init (void) { - void *bs = gcreq->arg; - gcreq_free (gcreq); - ddsrt_free (bs); -} + dds_return_t ret; -static void free_via_gc (void *bs) -{ - struct gcreq *gcreq = gcreq_new (gv.gcreq_queue, free_via_gc_cb); - gcreq->arg = bs; - gcreq_enqueue (gcreq); -} - -dds_return_t -dds_init(dds_domainid_t domain) -{ - dds_return_t ret = DDS_RETCODE_OK; - char * uri = NULL; - char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */ - char hostname[64]; - uint32_t len; - ddsrt_mutex_t *init_mutex; - - /* Be sure the DDS lifecycle resources are initialized. */ - ddsrt_init(); - init_mutex = ddsrt_get_singleton_mutex(); - - ddsrt_mutex_lock(init_mutex); - - dds_global.m_init_count++; - if (dds_global.m_init_count > 1) + ddsrt_init (); + ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex (); + ddsrt_mutex_lock (init_mutex); + if (dds_global.m_init_count++ != 0) { - goto skip; + ddsrt_mutex_unlock (init_mutex); + return DDS_RETCODE_OK; } - gv.tstart = now (); ddsrt_mutex_init (&dds_global.m_mutex); - thread_states_init_static(); + ddsi_iid_init (); + thread_states_init_static (); + thread_states_init (64); + upgrade_main_thread (); + dds__builtin_init_global (); - (void)ddsrt_getenv (DDS_PROJECT_NAME_NOSPACE_CAPS"_URI", &uri); - dds_cfgst = config_init (uri); - if (dds_cfgst == NULL) - { - DDS_LOG(DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri); - ret = DDS_RETCODE_ERROR; - goto fail_config; - } - - /* if a domain id was explicitly given, check & fix up the configuration */ - if (domain != DDS_DOMAIN_DEFAULT) - { - if (domain < 0 || domain > 230) - { - DDS_ERROR("requested domain id %"PRId32" is out of range\n", domain); - ret = DDS_RETCODE_ERROR; - goto fail_config_domainid; - } - else if (config.domainId.isdefault) - { - config.domainId.value = domain; - } - else if (domain != config.domainId.value) - { - DDS_ERROR("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain, config.domainId.value); - ret = DDS_RETCODE_ERROR; - goto fail_config_domainid; - } - } - - /* The config.domainId can change internally in DDSI. So, remember what the - * main configured domain id is. */ - dds_global.m_default_domain = config.domainId.value; - - if (rtps_config_prep(dds_cfgst) != 0) - { - DDS_LOG(DDS_LC_CONFIG, "Failed to configure RTPS\n"); - ret = DDS_RETCODE_ERROR; - goto fail_rtps_config; - } - - upgrade_main_thread(); - ddsrt_avl_init(&dds_domaintree_def, &dds_global.m_domains); - - /* Start monitoring the liveliness of all threads. */ - if (!config.liveliness_monitoring) - gv.threadmon = NULL; - else - { - gv.threadmon = ddsi_threadmon_new (); - if (gv.threadmon == NULL) - { - DDS_ERROR("Failed to create a thread monitor\n"); - ret = DDS_RETCODE_OUT_OF_RESOURCES; - goto fail_threadmon_new; - } - } - - if (rtps_init () < 0) - { - DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n"); - ret = DDS_RETCODE_ERROR; - goto fail_rtps_init; - } - - if (dds_handle_server_init (free_via_gc) != DDS_RETCODE_OK) + if (dds_handle_server_init () != DDS_RETCODE_OK) { DDS_ERROR("Failed to initialize internal handle server\n"); ret = DDS_RETCODE_ERROR; goto fail_handleserver; } - dds__builtin_init (); - - if (rtps_start () < 0) - { - DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n"); - ret = DDS_RETCODE_ERROR; - goto fail_rtps_start; - } - - if (gv.threadmon && ddsi_threadmon_start(gv.threadmon) < 0) - { - DDS_ERROR("Failed to start the servicelease\n"); - ret = DDS_RETCODE_ERROR; - goto fail_threadmon_start; - } - - /* Set additional default participant properties */ - - gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid(); - gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID; - gv.default_plist_pp.exec_name = dds_string_alloc(32); - (void) snprintf(gv.default_plist_pp.exec_name, 32, "%s: %u", DDS_PROJECT_NAME, gv.default_plist_pp.process_id); - len = (uint32_t) (13 + strlen(gv.default_plist_pp.exec_name)); - gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME; - if (ddsrt_gethostname(hostname, sizeof(hostname)) == DDS_RETCODE_OK) - { - gv.default_plist_pp.node_name = dds_string_dup(hostname); - gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME; - } - gv.default_plist_pp.entity_name = dds_alloc(len); - (void) snprintf(gv.default_plist_pp.entity_name, len, "%s<%u>", progname, - gv.default_plist_pp.process_id); - gv.default_plist_pp.present |= PP_ENTITY_NAME; - -skip: - ddsrt_mutex_unlock(init_mutex); + ddsrt_mutex_unlock (init_mutex); return DDS_RETCODE_OK; -fail_threadmon_start: - if (gv.threadmon) - ddsi_threadmon_stop (gv.threadmon); - dds_handle_server_fini(); fail_handleserver: - rtps_stop (); -fail_rtps_start: - dds__builtin_fini (); - rtps_fini (); -fail_rtps_init: - if (gv.threadmon) - { - ddsi_threadmon_free (gv.threadmon); - gv.threadmon = NULL; - } -fail_threadmon_new: - downgrade_main_thread (); - thread_states_fini(); -fail_rtps_config: -fail_config_domainid: - dds_global.m_default_domain = DDS_DOMAIN_DEFAULT; - config_fini (dds_cfgst); - dds_cfgst = NULL; -fail_config: ddsrt_mutex_destroy (&dds_global.m_mutex); dds_global.m_init_count--; - ddsrt_mutex_unlock(init_mutex); - ddsrt_fini(); + ddsrt_mutex_unlock (init_mutex); + ddsrt_fini (); return ret; } extern void dds_fini (void) { - ddsrt_mutex_t *init_mutex; - init_mutex = ddsrt_get_singleton_mutex(); - ddsrt_mutex_lock(init_mutex); - assert(dds_global.m_init_count > 0); - dds_global.m_init_count--; - if (dds_global.m_init_count == 0) + ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex (); + ddsrt_mutex_lock (init_mutex); + assert (dds_global.m_init_count > 0); + if (--dds_global.m_init_count == 0) { - if (gv.threadmon) - ddsi_threadmon_stop (gv.threadmon); - dds_handle_server_fini(); - rtps_stop (); - dds__builtin_fini (); - rtps_fini (); - if (gv.threadmon) - ddsi_threadmon_free (gv.threadmon); - gv.threadmon = NULL; + dds_handle_server_fini (); + dds__builtin_fini_global (); downgrade_main_thread (); thread_states_fini (); - - config_fini (dds_cfgst); - dds_cfgst = NULL; + ddsi_iid_fini (); ddsrt_mutex_destroy (&dds_global.m_mutex); - dds_global.m_default_domain = DDS_DOMAIN_DEFAULT; } - ddsrt_mutex_unlock(init_mutex); - ddsrt_fini(); -} - -void ddsi_plugin_init (void) -{ - ddsi_plugin.builtintopic_is_builtintopic = dds__builtin_is_builtintopic; - ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible; - ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry; - ddsi_plugin.builtintopic_write = dds__builtin_write; -} - -//provides explicit default domain id. -dds_domainid_t dds_domain_default (void) -{ - return dds_global.m_default_domain; -} - -dds_return_t -dds__check_domain( - dds_domainid_t domain) -{ - dds_return_t ret = DDS_RETCODE_OK; - /* If domain is default: use configured id. */ - if (domain != DDS_DOMAIN_DEFAULT) - { - /* Specific domain has to be the same as the configured domain. */ - if (domain != dds_global.m_default_domain) - { - DDS_ERROR("Inconsistent domain configuration detected: domain on " - "configuration: %"PRId32", domain %"PRId32"\n", dds_global.m_default_domain, domain); - ret = DDS_RETCODE_ERROR; - } - } - return ret; + ddsrt_mutex_unlock (init_mutex); + ddsrt_fini (); } diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 31108d7..bcbf5fe 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -27,10 +27,6 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_participant) #define DDS_PARTICIPANT_STATUS_MASK (0u) - -/* List of created participants */ -static dds_entity *dds_pp_head = NULL; - static dds_return_t dds_participant_status_validate (uint32_t mask) { return (mask & ~DDS_PARTICIPANT_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; @@ -40,13 +36,17 @@ static dds_return_t dds_participant_delete (dds_entity *e) ddsrt_nonnull_all; static dds_return_t dds_participant_delete (dds_entity *e) { + struct dds_domain *dom; + dds_return_t ret; + dds_entity *prev, *iter; assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT); thread_state_awake (lookup_thread_state ()); - dds_domain_free (e->m_domain); + if ((ret = delete_participant (&e->m_guid)) < 0) + DDS_ERROR ("dds_participant_delete: internal error %"PRId32"\n", ret); ddsrt_mutex_lock (&dds_global.m_mutex); - dds_entity *prev, *iter; - for (iter = dds_pp_head, prev = NULL; iter; prev = iter, iter = iter->m_next) + dom = e->m_domain; + for (iter = dom->ppants, prev = NULL; iter; prev = iter, iter = iter->m_next) { if (iter == e) break; @@ -55,11 +55,12 @@ static dds_return_t dds_participant_delete (dds_entity *e) if (prev) prev->m_next = iter->m_next; else - dds_pp_head = iter->m_next; + dom->ppants = iter->m_next; ddsrt_mutex_unlock (&dds_global.m_mutex); thread_state_asleep (lookup_thread_state ()); /* Every dds_init needs a dds_fini. */ + dds_domain_free (e->m_domain); dds_fini (); return DDS_RETCODE_OK; } @@ -93,6 +94,7 @@ const struct dds_entity_deriver dds_entity_deriver_participant = { dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener) { + dds_domain *dom; dds_entity_t ret; nn_guid_t guid; dds_participant * pp; @@ -100,12 +102,11 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_ dds_qos_t *new_qos = NULL; /* Make sure DDS instance is initialized. */ - if ((ret = dds_init (domain)) != DDS_RETCODE_OK) + if ((ret = dds_init ()) < 0) goto err_dds_init; - /* Check domain id */ - if ((ret = dds__check_domain (domain)) != DDS_RETCODE_OK) - goto err_domain_check; + if ((ret = dds_domain_create (&dom, domain)) < 0) + goto err_domain_create; new_qos = dds_create_qos (); if (qos != NULL) @@ -134,13 +135,13 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_ pp->m_entity.m_guid = guid; pp->m_entity.m_iid = get_entity_instance_id (&guid); - pp->m_entity.m_domain = dds_domain_create (dds_domain_default ()); + pp->m_entity.m_domain = dom; pp->m_builtin_subscriber = 0; /* Add participant to extent */ ddsrt_mutex_lock (&dds_global.m_mutex); - pp->m_entity.m_next = dds_pp_head; - dds_pp_head = &pp->m_entity; + pp->m_entity.m_next = pp->m_entity.m_domain->ppants; + pp->m_entity.m_domain->ppants = &pp->m_entity; ddsrt_mutex_unlock (&dds_global.m_mutex); return ret; @@ -149,7 +150,8 @@ err_entity_init: err_new_participant: err_qos_validation: dds_delete_qos (new_qos); -err_domain_check: + dds_domain_free (dom); +err_domain_create: dds_fini (); err_dds_init: return ret; @@ -157,16 +159,11 @@ err_dds_init: dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *participants, size_t size) { - ddsrt_mutex_t *init_mutex; + if ((participants != NULL && (size <= 0 || size >= INT32_MAX)) || (participants == NULL && size != 0)) + return DDS_RETCODE_BAD_PARAMETER; ddsrt_init (); - init_mutex = ddsrt_get_singleton_mutex (); - - if ((participants != NULL && (size <= 0 || size >= INT32_MAX)) || (participants == NULL && size != 0)) - { - ddsrt_fini (); - return DDS_RETCODE_BAD_PARAMETER; - } + ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex (); if (participants) participants[0] = 0; @@ -175,10 +172,11 @@ dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par ddsrt_mutex_lock (init_mutex); if (dds_global.m_init_count > 0) { + struct dds_domain *dom; ddsrt_mutex_lock (&dds_global.m_mutex); - for (dds_entity *iter = dds_pp_head; iter; iter = iter->m_next) + if ((dom = dds_domain_find_locked (domain_id)) != NULL) { - if (iter->m_domain->m_id == domain_id) + for (dds_entity *iter = dom->ppants; iter; iter = iter->m_next) { if ((size_t) ret < size) participants[ret] = iter->m_hdllink.hdl; diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index 067ca1c..64e83a6 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -16,6 +16,7 @@ #include "dds__participant.h" #include "dds__publisher.h" #include "dds__qos.h" +#include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_globals.h" #include "dds/version.h" diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 8c3aef9..b2537ee 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -370,7 +370,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti /* Additional checks required for built-in topics: we don't want to run into a resource limit on a built-in topic, it is a needless complication */ - if (internal_topic && !dds__validate_builtin_reader_qos (topic, rqos)) + if (internal_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, topic, rqos)) { dds_delete_qos (rqos); reader = DDS_RETCODE_INCONSISTENT_POLICY; diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index 59bc7d1..ea4f972 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -14,6 +14,7 @@ #include "dds__participant.h" #include "dds__subscriber.h" #include "dds__qos.h" +#include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_globals.h" #include "dds/ddsrt/heap.h" diff --git a/src/core/ddsc/src/dds_whc_builtintopic.c b/src/core/ddsc/src/dds_whc_builtintopic.c index 76926ed..342c69c 100644 --- a/src/core/ddsc/src/dds_whc_builtintopic.c +++ b/src/core/ddsc/src/dds_whc_builtintopic.c @@ -19,6 +19,7 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_ephash.h" #include "dds/ddsi/q_entity.h" +#include "dds/ddsi/q_globals.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds__serdata_builtintopic.h" #include "dds__whc_builtintopic.h" @@ -62,7 +63,7 @@ static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sam static bool is_visible (const struct entity_common *e) { const nn_vendorid_t vendorid = get_entity_vendorid (e); - return ddsi_plugin.builtintopic_is_visible (&e->guid, vendorid); + return builtintopic_is_visible (gv.builtin_topic_interface, &e->guid, vendorid); } static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index a996d34..c245722 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -77,6 +77,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi" ddsi_tkmap.h ddsi_vendor.h ddsi_threadmon.h + ddsi_builtin_topic_if.h q_addrset.h q_bitset.h q_bswap.h diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_builtin_topic_if.h b/src/core/ddsi/include/dds/ddsi/ddsi_builtin_topic_if.h new file mode 100644 index 0000000..0032160 --- /dev/null +++ b/src/core/ddsi/include/dds/ddsi/ddsi_builtin_topic_if.h @@ -0,0 +1,53 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef _DDSI_BUILTIN_TOPIC_IF_H_ +#define _DDSI_BUILTIN_TOPIC_IF_H_ + +#include "dds/ddsi/ddsi_vendor.h" +#include "dds/ddsi/q_time.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +struct entity_common; +struct ddsi_tkmap_instance; +struct ddsi_sertopic; +struct nn_guid; + +struct ddsi_builtin_topic_interface { + void *arg; + + bool (*builtintopic_is_builtintopic) (const struct ddsi_sertopic *topic, void *arg); + bool (*builtintopic_is_visible) (const struct nn_guid *guid, nn_vendorid_t vendorid, void *arg); + struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid, void *arg); + void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive, void *arg); +}; + +inline bool builtintopic_is_visible (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid, nn_vendorid_t vendorid) { + return btif ? btif->builtintopic_is_visible (guid, vendorid, btif->arg) : false; +} +inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_sertopic *topic) { + return btif ? btif->builtintopic_is_builtintopic (topic, btif->arg) : false; +} +inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid) { + return btif ? btif->builtintopic_get_tkmap_entry (guid, btif->arg) : NULL; +} +inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive) { + if (btif) btif->builtintopic_write (e, timestamp, alive, btif->arg); +} + +#if defined (__cplusplus) +} +#endif + +#endif diff --git a/src/core/ddsi/include/dds/ddsi/q_config.h b/src/core/ddsi/include/dds/ddsi/q_config.h index af3ade8..7a96a76 100644 --- a/src/core/ddsi/include/dds/ddsi/q_config.h +++ b/src/core/ddsi/include/dds/ddsi/q_config.h @@ -392,18 +392,7 @@ struct config struct prune_deleted_ppant prune_deleted_ppant; }; -struct ddsi_sertopic; -struct entity_common; -struct ddsi_plugin -{ - bool (*builtintopic_is_builtintopic) (const struct ddsi_sertopic *topic); - bool (*builtintopic_is_visible) (const nn_guid_t *guid, nn_vendorid_t vendorid); - struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid); - void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive); -}; - extern struct config DDS_EXPORT config; -extern struct ddsi_plugin ddsi_plugin; struct cfgst; diff --git a/src/core/ddsi/include/dds/ddsi/q_globals.h b/src/core/ddsi/include/dds/ddsi/q_globals.h index 55efe7c..51bd170 100644 --- a/src/core/ddsi/include/dds/ddsi/q_globals.h +++ b/src/core/ddsi/include/dds/ddsi/q_globals.h @@ -24,7 +24,6 @@ #include "dds/ddsi/q_protocol.h" #include "dds/ddsi/q_nwif.h" #include "dds/ddsi/q_sockwaitset.h" -#include "dds/ddsi/ddsi_iid.h" #ifdef DDSI_INCLUDE_ENCRYPTION #include "dds/ddsi/q_security.h" /* for q_securityDecoderSet */ @@ -91,7 +90,6 @@ struct q_globals { volatile int mute; struct ddsi_tkmap * m_tkmap; - struct ddsi_iid dds_iid; /* Hash tables for participants, readers, writers, proxy participants, proxy readers and proxy writers by GUID @@ -296,6 +294,8 @@ struct q_globals { FILE *pcap_fp; ddsrt_mutex_t pcap_lock; + struct ddsi_builtin_topic_interface *builtin_topic_interface; + struct nn_group_membership *mship; }; diff --git a/src/core/ddsi/include/dds/ddsi/q_rtps.h b/src/core/ddsi/include/dds/ddsi/q_rtps.h index 57e3d0a..1c219e8 100644 --- a/src/core/ddsi/include/dds/ddsi/q_rtps.h +++ b/src/core/ddsi/include/dds/ddsi/q_rtps.h @@ -78,7 +78,6 @@ int rtps_config_prep (struct cfgst *cfgst); int rtps_config_open (void); int rtps_init (void); int rtps_start (void); -void ddsi_plugin_init (void); void rtps_stop (void); void rtps_fini (void); diff --git a/src/core/ddsi/src/ddsi_iid.c b/src/core/ddsi/src/ddsi_iid.c index 5625973..3b9f023 100644 --- a/src/core/ddsi/src/ddsi_iid.c +++ b/src/core/ddsi/src/ddsi_iid.c @@ -11,10 +11,11 @@ */ #include "dds/ddsrt/atomics.h" #include "dds/ddsrt/process.h" +#include "dds/ddsrt/random.h" #include "dds/ddsrt/sync.h" #include "dds/ddsi/ddsi_iid.h" -#include "dds/ddsi/q_time.h" -#include "dds/ddsi/q_globals.h" + +static struct ddsi_iid dds_iid; static void dds_tea_encrypt (uint32_t v[2], const uint32_t k[4]) { @@ -49,14 +50,14 @@ uint64_t ddsi_iid_gen (void) union { uint64_t u64; uint32_t u32[2]; } tmp; #if DDSRT_ATOMIC64_SUPPORT - tmp.u64 = ddsrt_atomic_inc64_nv (&gv.dds_iid.counter); + tmp.u64 = ddsrt_atomic_inc64_nv (&dds_iid.counter); #else - ddsrt_mutex_lock (&gv.dds_iid.lock); - tmp.u64 = ++gv.dds_iid.counter; - ddsrt_mutex_unlock (&gv.dds_iid.lock); + ddsrt_mutex_lock (&dds_iid.lock); + tmp.u64 = ++dds_iid.counter; + ddsrt_mutex_unlock (&dds_iid.lock); #endif - dds_tea_encrypt (tmp.u32, gv.dds_iid.key); + dds_tea_encrypt (tmp.u32, dds_iid.key); iid = tmp.u64; return iid; } @@ -64,29 +65,26 @@ uint64_t ddsi_iid_gen (void) void ddsi_iid_init (void) { union { uint64_t u64; uint32_t u32[2]; } tmp; - nn_wctime_t tnow = now (); #if ! DDSRT_ATOMIC64_SUPPORT - ddsrt_mutex_init (&gv.dds_iid.lock); + ddsrt_mutex_init (&dds_iid.lock); #endif - gv.dds_iid.key[0] = (uint32_t) ddsrt_getpid(); - gv.dds_iid.key[1] = (uint32_t) tnow.v; - gv.dds_iid.key[2] = (uint32_t) (tnow.v >> 32); - gv.dds_iid.key[3] = 0xdeadbeef; + for (size_t i = 0; i < sizeof (dds_iid.key) / sizeof (dds_iid.key[0]); i++) + dds_iid.key[0] = ddsrt_random (); tmp.u64 = 0; - dds_tea_decrypt (tmp.u32, gv.dds_iid.key); + dds_tea_decrypt (tmp.u32, dds_iid.key); #if DDSRT_ATOMIC64_SUPPORT - ddsrt_atomic_st64 (&gv.dds_iid.counter, tmp.u64); + ddsrt_atomic_st64 (&dds_iid.counter, tmp.u64); #else - gv.dds_iid.counter = tmp.u64; + dds_iid.counter = tmp.u64; #endif } void ddsi_iid_fini (void) { #if ! DDSRT_ATOMIC64_SUPPORT - ddsrt_mutex_destroy (&gv.dds_iid.lock); + ddsrt_mutex_destroy (&dds_iid.lock); #endif } diff --git a/src/core/ddsi/src/q_config.c b/src/core/ddsi/src/q_config.c index de83025..89df843 100644 --- a/src/core/ddsi/src/q_config.c +++ b/src/core/ddsi/src/q_config.c @@ -844,7 +844,6 @@ static const struct cfgelem root_cfgelem = { #undef CO struct config config; -struct ddsi_plugin ddsi_plugin; static const struct unit unittab_duration[] = { { "ns", 1 }, diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index c94cd89..c5fe908 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -104,6 +104,11 @@ static int gcreq_proxy_participant (struct proxy_participant *proxypp); static int gcreq_proxy_writer (struct proxy_writer *pwr); static int gcreq_proxy_reader (struct proxy_reader *prd); +extern inline bool builtintopic_is_visible (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid, nn_vendorid_t vendorid); +extern inline bool builtintopic_is_builtintopic (const struct ddsi_builtin_topic_interface *btif, const struct ddsi_sertopic *topic); +extern inline struct ddsi_tkmap_instance *builtintopic_get_tkmap_entry (const struct ddsi_builtin_topic_interface *btif, const struct nn_guid *guid); +extern inline void builtintopic_write (const struct ddsi_builtin_topic_interface *btif, const struct entity_common *e, nn_wctime_t timestamp, bool alive); + static int compare_guid (const void *va, const void *vb) { return memcmp (va, vb, sizeof (nn_guid_t)); @@ -174,9 +179,9 @@ static void entity_common_init (struct entity_common *e, const struct nn_guid *g e->name = ddsrt_strdup (name ? name : ""); e->onlylocal = onlylocal; ddsrt_mutex_init (&e->lock); - if (ddsi_plugin.builtintopic_is_visible (guid, vendorid)) + if (builtintopic_is_visible (gv.builtin_topic_interface, guid, vendorid)) { - e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid); + e->tk = builtintopic_get_tkmap_entry (gv.builtin_topic_interface, guid); e->iid = e->tk->m_iid; } else @@ -407,7 +412,7 @@ static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, cons nn_xqos_fini_mask (ent_qos, mask); nn_xqos_mergein_missing (ent_qos, xqos, mask); - ddsi_plugin.builtintopic_write (e, timestamp, true); + builtintopic_write (gv.builtin_topic_interface, e, timestamp, true); return true; } @@ -662,7 +667,7 @@ dds_return_t new_participant_guid (const nn_guid_t *ppguid, unsigned flags, cons trigger_recv_threads (); } - ddsi_plugin.builtintopic_write (&pp->e, now(), true); + builtintopic_write (gv.builtin_topic_interface, &pp->e, now(), true); /* SPDP periodic broadcast uses the retransmit path, so the initial publication must be done differently. Must be later than making @@ -906,7 +911,7 @@ dds_return_t delete_participant (const struct nn_guid *ppguid) struct participant *pp; if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) return DDS_RETCODE_BAD_PARAMETER; - ddsi_plugin.builtintopic_write (&pp->e, now(), false); + builtintopic_write (gv.builtin_topic_interface, &pp->e, now(), false); remember_deleted_participant_guid (gv.deleted_participants, &pp->e.guid); ephash_remove_participant_guid (pp); gcreq_participant (pp); @@ -2857,7 +2862,7 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct nn_gui delete_participant won't interfere with our ability to address the participant */ - const bool onlylocal = topic && ddsi_plugin.builtintopic_is_builtintopic (topic); + const bool onlylocal = topic && builtintopic_is_builtintopic (gv.builtin_topic_interface, topic); endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, onlylocal); new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity); @@ -2868,7 +2873,7 @@ static dds_return_t new_writer_guid (struct writer **wr_out, const struct nn_gui the other. */ ddsrt_mutex_lock (&wr->e.lock); ephash_insert_writer_guid (wr); - ddsi_plugin.builtintopic_write (&wr->e, now(), true); + builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), true); ddsrt_mutex_unlock (&wr->e.lock); /* once it exists, match it with proxy writers and broadcast @@ -2927,7 +2932,7 @@ struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, str memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid)); new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL); ephash_insert_writer_guid (wr); - ddsi_plugin.builtintopic_write (&wr->e, now(), true); + builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), true); match_writer_with_local_readers (wr, tnow); return lowr; } @@ -3034,7 +3039,7 @@ dds_return_t delete_writer_nolinger_locked (struct writer *wr) { DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid)); ASSERT_MUTEX_HELD (&wr->e.lock); - ddsi_plugin.builtintopic_write (&wr->e, now(), false); + builtintopic_write (gv.builtin_topic_interface, &wr->e, now(), false); local_reader_ary_setinvalid (&wr->rdary); ephash_remove_writer_guid (wr); writer_set_state (wr, WRST_DELETING); @@ -3235,7 +3240,7 @@ static dds_return_t new_reader_guid if (rd_out) *rd_out = rd; - const bool onlylocal = topic && ddsi_plugin.builtintopic_is_builtintopic (topic); + const bool onlylocal = topic && builtintopic_is_builtintopic (gv.builtin_topic_interface, topic); endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, onlylocal); /* Copy QoS, merging in defaults */ @@ -3336,7 +3341,7 @@ static dds_return_t new_reader_guid ddsrt_mutex_lock (&rd->e.lock); ephash_insert_reader_guid (rd); - ddsi_plugin.builtintopic_write (&rd->e, now(), true); + builtintopic_write (gv.builtin_topic_interface, &rd->e, now(), true); ddsrt_mutex_unlock (&rd->e.lock); match_reader_with_proxy_writers (rd, tnow); @@ -3429,7 +3434,7 @@ dds_return_t delete_reader (const struct nn_guid *guid) return DDS_RETCODE_BAD_PARAMETER; } DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid "PGUIDFMT") ...\n", PGUID (*guid)); - ddsi_plugin.builtintopic_write (&rd->e, now(), false); + builtintopic_write (gv.builtin_topic_interface, &rd->e, now(), false); ephash_remove_reader_guid (rd); gcreq_reader (rd); return 0; @@ -3665,7 +3670,7 @@ void new_proxy_participant if (proxypp->owns_lease) lease_register (ddsrt_atomic_ldvoidp (&proxypp->lease)); - ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); + builtintopic_write (gv.builtin_topic_interface, &proxypp->e, timestamp, true); ddsrt_mutex_unlock (&proxypp->e.lock); } @@ -3915,7 +3920,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t return DDS_RETCODE_BAD_PARAMETER; } DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false); + builtintopic_write (gv.builtin_topic_interface, &ppt->e, timestamp, false); remember_deleted_participant_guid (gv.deleted_participants, &ppt->e.guid); ephash_remove_proxy_participant_guid (ppt); ddsrt_mutex_unlock (&gv.lock); @@ -4161,7 +4166,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, /* locking the entity prevents matching while the built-in topic hasn't been published yet */ ddsrt_mutex_lock (&pwr->e.lock); ephash_insert_proxy_writer_guid (pwr); - ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true); + builtintopic_write (gv.builtin_topic_interface, &pwr->e, timestamp, true); ddsrt_mutex_unlock (&pwr->e.lock); match_proxy_writer_with_readers (pwr, tnow); @@ -4299,7 +4304,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int from removing themselves from the proxy writer's rdary[]. */ local_reader_ary_setinvalid (&pwr->rdary); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false); + builtintopic_write (gv.builtin_topic_interface, &pwr->e, timestamp, false); ephash_remove_proxy_writer_guid (pwr); ddsrt_mutex_unlock (&gv.lock); gcreq_proxy_writer (pwr); @@ -4341,7 +4346,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, /* locking the entity prevents matching while the built-in topic hasn't been published yet */ ddsrt_mutex_lock (&prd->e.lock); ephash_insert_proxy_reader_guid (prd); - ddsi_plugin.builtintopic_write (&prd->e, timestamp, true); + builtintopic_write (gv.builtin_topic_interface, &prd->e, timestamp, true); ddsrt_mutex_unlock (&prd->e.lock); match_proxy_reader_with_writers (prd, tnow); @@ -4426,7 +4431,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n"); return DDS_RETCODE_BAD_PARAMETER; } - ddsi_plugin.builtintopic_write (&prd->e, timestamp, false); + builtintopic_write (gv.builtin_topic_interface, &prd->e, timestamp, false); ephash_remove_proxy_reader_guid (prd); ddsrt_mutex_unlock (&gv.lock); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 7976841..192206b 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -569,7 +569,7 @@ int rtps_config_prep (struct cfgst *cfgst) lease, gc, debmon; once thread state admin has been inited, upgrade the main thread one participating in the thread tracking stuff as if it had been created using create_thread(). */ - +#if 0 /* FIXME: threads are per-process, not per-domain */ { /* Temporary: thread states for each application thread is managed using thread_states structure */ @@ -582,6 +582,7 @@ int rtps_config_prep (struct cfgst *cfgst) #endif thread_states_init (max_threads); } +#endif /* Now the per-thread-log-buffers are set up, so print the configuration. After this there is no value to the source information for the various configuration elements, so free those. */ @@ -852,8 +853,6 @@ int rtps_init (void) gv.tstart = now (); /* wall clock time, used in logs */ - ddsi_plugin_init (); - ddsi_iid_init (); nn_plist_init_tables (); gv.disc_conn_uc = NULL; @@ -1327,7 +1326,6 @@ err_unicast_sockets: nn_plist_fini (&gv.default_plist_pp); ddsi_serdatapool_free (gv.serpool); nn_xmsgpool_free (gv.xmsgpool); - ddsi_iid_fini (); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS err_network_partition_addrset: for (struct config_networkpartition_listelem *np = config.networkPartitions; np; np = np->next) @@ -1674,6 +1672,5 @@ void rtps_fini (void) ddsi_serdatapool_free (gv.serpool); nn_xmsgpool_free (gv.xmsgpool); - ddsi_iid_fini (); DDS_LOG(DDS_LC_CONFIG, "Finis.\n"); }