From 24802ecad368c9c89fd10ed51b83ff68a41d5a1d Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 7 Jan 2019 18:55:42 +0100 Subject: [PATCH 1/7] avoid setting topic and type name in the QoS objects returned in the built-in topics Signed-off-by: Erik Boasson --- src/core/ddsi/src/ddsi_serdata_builtin.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/core/ddsi/src/ddsi_serdata_builtin.c b/src/core/ddsi/src/ddsi_serdata_builtin.c index 0623ded..7498721 100644 --- a/src/core/ddsi/src/ddsi_serdata_builtin.c +++ b/src/core/ddsi/src/ddsi_serdata_builtin.c @@ -188,13 +188,22 @@ static char *dds_string_dup_reuse (char *old, const char *src) static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src) { if (old == NULL) - return nn_xqos_dup (src); + { + old = os_malloc (sizeof (*old)); + nn_xqos_init_empty (old); + old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; + nn_xqos_mergein_missing (old, src); + old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); + } else { nn_xqos_fini (old); + nn_xqos_init_empty (old); + old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; nn_xqos_mergein_missing (old, src); - return old; + old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); } + return old; } static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_participant *sample) From 30f421ea9b4110426d8c2e8a11582659d9c17868 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sun, 6 Jan 2019 13:11:02 +0100 Subject: [PATCH 2/7] remove a stray debugging printf when calling create_topic multiple times for the same topic Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_topic.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 00c2194..1466fde 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -316,8 +316,6 @@ static bool dupdef_qos_ok(const dds_qos_t *qos, const struct ddsi_sertopic *st) static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b) { - printf ("sertopic_equivalent %p %p (%s %s; %u %u; %p %p; %p %p)\n", (void*)a, (void*)b, a->name_typename, b->name_typename, a->serdata_basehash, b->serdata_basehash, (void *)a->ops, (void *)b->ops, (void *)a->serdata_ops, (void *)b->serdata_ops); - if (strcmp (a->name_typename, b->name_typename) != 0) return false; if (a->serdata_basehash != b->serdata_basehash) From d6dcb0558daff2fb543bd90f3edd2228a6886822 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 7 Jan 2019 12:52:17 +0100 Subject: [PATCH 3/7] fix incorrect QoS compare that breaks creating topics multiple times Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_qos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ddsc/src/dds_qos.c b/src/core/ddsc/src/dds_qos.c index caa65dc..38c61e3 100644 --- a/src/core/ddsc/src/dds_qos.c +++ b/src/core/ddsc/src/dds_qos.c @@ -316,7 +316,7 @@ bool dds_qos_equal ( } else if (a == NULL || b == NULL) { return false; } else { - return nn_xqos_delta(a, b, ~(uint64_t)0) != 0; + return nn_xqos_delta(a, b, ~(uint64_t)0) == 0; } } From 66076817e1f98163a4dcc913be44eca075828597 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sun, 6 Jan 2019 13:10:24 +0100 Subject: [PATCH 4/7] rework built-in topics again Move details of built-in topics out of the DDSI core (so the only hooks remain). For this, rtps_term had to be split, so now it is "stop" followed by "fini". Add a notion of local writers that are not bound to a participant ("local orphans"), so that the local built-in topic writers can be created during initialization. This eliminates the "builtin" participant. This uncovered in inconsistency in the unit tests: on the one hand, a newly created participant is expected to have no child entities; on the other hand, the built-in topics were expected to be returned by find_topic ... This inconsistency has been resolved by creating them lazily and accepting that find_topic can't return them until they have been created. Special code was in place in dds_create_reader anyway, so it is not expected to have any real consequence for applications. Use a special WHC implementation that regenerates the data on the fly using the internal discovery tables of DDSI, so that the samples are only stored by readers. This eliminates the memory overhead of that existed previously when the WHC of the writers stored the data. No longer return topic name and type name in the built-in topics, they have been extracted already and are not accessible through the normal interface but do cause problems when comparing QoS. Signed-off-by: Erik Boasson --- src/core/ddsc/CMakeLists.txt | 5 + src/core/ddsc/include/ddsc/dds.h | 2 +- src/core/ddsc/src/dds__builtin.h | 40 +- .../src/dds__serdata_builtintopic.h} | 22 +- src/core/ddsc/src/dds__types.h | 2 +- src/core/ddsc/src/dds__whc_builtintopic.h | 28 + src/core/ddsc/src/dds__write.h | 22 +- src/core/ddsc/src/dds_builtin.c | 532 ++++++------------ src/core/ddsc/src/dds_init.c | 19 +- src/core/ddsc/src/dds_participant.c | 1 + .../src/dds_serdata_builtintopic.c} | 45 +- .../src/dds_sertopic_builtintopic.c} | 20 +- src/core/ddsc/src/dds_whc_builtintopic.c | 201 +++++++ src/core/ddsc/src/dds_write.c | 510 ++++++++--------- src/core/ddsc/tests/builtin_topics.c | 15 +- src/core/ddsi/CMakeLists.txt | 3 - src/core/ddsi/include/ddsi/q_config.h | 6 +- src/core/ddsi/include/ddsi/q_entity.h | 16 +- src/core/ddsi/include/ddsi/q_gc.h | 1 + src/core/ddsi/include/ddsi/q_globals.h | 5 - src/core/ddsi/include/ddsi/q_rtps.h | 3 +- src/core/ddsi/include/ddsi/q_whc.h | 2 +- src/core/ddsi/src/q_entity.c | 260 ++++----- src/core/ddsi/src/q_gc.c | 13 +- src/core/ddsi/src/q_init.c | 20 +- src/core/ddsi/src/q_transmit.c | 2 +- 26 files changed, 876 insertions(+), 919 deletions(-) rename src/core/{ddsi/include/ddsi/ddsi_serdata_builtin.h => ddsc/src/dds__serdata_builtintopic.h} (57%) create mode 100644 src/core/ddsc/src/dds__whc_builtintopic.h rename src/core/{ddsi/src/ddsi_serdata_builtin.c => ddsc/src/dds_serdata_builtintopic.c} (82%) rename src/core/{ddsi/src/ddsi_sertopic_builtin.c => ddsc/src/dds_sertopic_builtintopic.c} (82%) create mode 100644 src/core/ddsc/src/dds_whc_builtintopic.c diff --git a/src/core/ddsc/CMakeLists.txt b/src/core/ddsc/CMakeLists.txt index f9c5ff4..919c2c7 100644 --- a/src/core/ddsc/CMakeLists.txt +++ b/src/core/ddsc/CMakeLists.txt @@ -37,6 +37,9 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds_subscriber.c dds_write.c dds_whc.c + dds_whc_builtintopic.c + dds_serdata_builtintopic.c + dds_sertopic_builtintopic.c ) PREPEND(hdrs_public_ddsc "$$" @@ -73,6 +76,8 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds__write.h dds__writer.h dds__whc.h + dds__whc_builtintopic.h + dds__serdata_builtintopic.h ) configure_file( diff --git a/src/core/ddsc/include/ddsc/dds.h b/src/core/ddsc/include/ddsc/dds.h index ec57ce8..65ebead 100644 --- a/src/core/ddsc/include/ddsc/dds.h +++ b/src/core/ddsc/include/ddsc/dds.h @@ -1777,7 +1777,7 @@ dds_write_flush( * @returns A dds_return_t indicating success or failure. */ _Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -DDS_EXPORT int +DDS_EXPORT dds_return_t dds_writecdr( dds_entity_t writer, struct ddsi_serdata *serdata); diff --git a/src/core/ddsc/src/dds__builtin.h b/src/core/ddsc/src/dds__builtin.h index 3601d67..38c7d11 100644 --- a/src/core/ddsc/src/dds__builtin.h +++ b/src/core/ddsc/src/dds__builtin.h @@ -13,49 +13,31 @@ #define _DDS_BUILTIN_H_ #include "ddsi/q_time.h" -#include "ddsi/ddsi_serdata_builtin.h" - #if defined (__cplusplus) extern "C" { #endif - /* Get actual topic in related participant related to topic 'id'. */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_topic( - _In_ dds_entity_t e, - _In_ dds_entity_t topic); - -/* Global publisher singleton (publishes only locally). */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_publisher( - void); +dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic); /* Subscriber singleton within related participant. */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_subscriber( - _In_ dds_entity_t e); +dds_entity_t dds__get_builtin_subscriber(dds_entity_t e); /* Checks whether the reader QoS is valid for use with built-in topic TOPIC */ bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos); -/* Initialization and finalize functions. */ -void -dds__builtin_init( - void); +struct entity_common; +struct nn_guid; +struct ddsi_tkmap_instance; -void -dds__builtin_fini( - void); - -void -dds__builtin_write( - _In_ enum ddsi_sertopic_builtin_type type, - _In_ const nn_guid_t *guid, - _In_ nn_wctime_t timestamp, - _In_ bool alive); +void dds__builtin_init (void); +void dds__builtin_fini (void); +bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid); +struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid); +struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive); +void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h b/src/core/ddsc/src/dds__serdata_builtintopic.h similarity index 57% rename from src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h rename to src/core/ddsc/src/dds__serdata_builtintopic.h index 68a450d..ff0771d 100644 --- a/src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h +++ b/src/core/ddsc/src/dds__serdata_builtintopic.h @@ -9,35 +9,33 @@ * * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ -#ifndef DDSI_SERDATA_BUILTIN_H -#define DDSI_SERDATA_BUILTIN_H +#ifndef DDSI_SERDATA_BUILTINTOPIC_H +#define DDSI_SERDATA_BUILTINTOPIC_H -#include "os/os.h" -#include "util/ut_avl.h" +#include "ddsi/q_xqos.h" #include "ddsi/ddsi_serdata.h" #include "ddsi/ddsi_sertopic.h" -#include "ddsi/q_xqos.h" -struct ddsi_serdata_builtin { +struct ddsi_serdata_builtintopic { struct ddsi_serdata c; nn_guid_t key; nn_xqos_t xqos; }; -enum ddsi_sertopic_builtin_type { +enum ddsi_sertopic_builtintopic_type { DSBT_PARTICIPANT, DSBT_READER, DSBT_WRITER }; -struct ddsi_sertopic_builtin { +struct ddsi_sertopic_builtintopic { struct ddsi_sertopic c; - enum ddsi_sertopic_builtin_type type; + enum ddsi_sertopic_builtintopic_type type; }; -extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin; -extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtin; +extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic; +extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic; -struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename); +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename); #endif diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index ab6edb0..78b4665 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -52,7 +52,7 @@ struct rhc; * Obviously, it is encouraged to use condition variables and such. But * sometimes it wouldn't make that much of a difference and taking the * easy route is somewhat pragmatic. */ -#define DDS_HEADBANG_TIMEOUT_MS (10) +#define DDS_HEADBANG_TIMEOUT (DDS_MSECS (10)) typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, const void *ctx); diff --git a/src/core/ddsc/src/dds__whc_builtintopic.h b/src/core/ddsc/src/dds__whc_builtintopic.h new file mode 100644 index 0000000..6c38157 --- /dev/null +++ b/src/core/ddsc/src/dds__whc_builtintopic.h @@ -0,0 +1,28 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDS_WHC_BUILTINTOPIC_H +#define DDS_WHC_BUILTINTOPIC_H + +#include "ddsi/q_whc.h" +#include "dds__serdata_builtintopic.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type); + +#if defined (__cplusplus) +} +#endif + +#endif /* Q_WHC_H */ diff --git a/src/core/ddsc/src/dds__write.h b/src/core/ddsc/src/dds__write.h index e78d65e..134b950 100644 --- a/src/core/ddsc/src/dds__write.h +++ b/src/core/ddsc/src/dds__write.h @@ -22,28 +22,16 @@ extern "C" { struct ddsi_serdata; -typedef enum -{ +typedef enum { DDS_WR_ACTION_WRITE = 0, DDS_WR_ACTION_WRITE_DISPOSE = DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_DISPOSE = DDS_WR_KEY_BIT | DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_UNREGISTER = DDS_WR_KEY_BIT | DDS_WR_UNREGISTER_BIT -} -dds_write_action; +} dds_write_action; -int -dds_write_impl( - _In_ dds_writer *wr, - _In_ const void *data, - _In_ dds_time_t tstamp, - _In_ dds_write_action action); - -int -dds_writecdr_impl( - _In_ dds_writer *wr, - _Inout_ struct ddsi_serdata *d, - _In_ dds_time_t tstamp, - _In_ dds_write_action action); +dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action); +dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action); +dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index db0cd90..bff293b 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -24,418 +24,216 @@ #include "dds__subscriber.h" #include "dds__write.h" #include "dds__writer.h" +#include "dds__whc_builtintopic.h" +#include "dds__serdata_builtintopic.h" #include "ddsi/q_qosmatch.h" -#include "ddsi/ddsi_serdata_builtin.h" +#include "ddsi/ddsi_tkmap.h" -static dds_return_t -dds__delete_builtin_participant( - dds_entity *e); +static struct ddsi_sertopic *builtin_participant_topic; +static struct ddsi_sertopic *builtin_reader_topic; +static struct ddsi_sertopic *builtin_writer_topic; +static struct local_orphan_writer *builtintopic_writer_participant; +static struct local_orphan_writer *builtintopic_writer_publications; +static struct local_orphan_writer *builtintopic_writer_subscriptions; -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_participant( - void); - -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_publisher( - _In_ dds_entity_t participant); - -static dds_entity_t -dds__create_builtin_writer( - _In_ dds_entity_t topic); - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_participant( - void); - - - - -static os_mutex g_builtin_mutex; -static os_atomic_uint32_t m_call_count = OS_ATOMIC_UINT32_INIT(0); - -/* Singletons are used to publish builtin data locally. */ -static dds_entity_t g_builtin_local_participant = 0; -static dds_entity_t g_builtin_local_publisher = 0; -static dds_entity_t g_builtin_local_writers[] = { - 0, /* index DDS_BUILTIN_TOPIC_DCPSPARTICIPANT - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSTOPIC - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSPUBLICATION - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION - DDS_KIND_INTERNAL - 1 */ -}; - -static _Must_inspect_result_ dds_qos_t * -dds__create_builtin_qos( - void) +static dds_qos_t *dds__create_builtin_qos (void) { - const char *partition = "__BUILT-IN PARTITION__"; - dds_qos_t *qos = dds_create_qos(); - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); - dds_qset_presentation(qos, DDS_PRESENTATION_TOPIC, false, false); - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100)); - dds_qset_partition(qos, 1, &partition); - return qos; + const char *partition = "__BUILT-IN PARTITION__"; + dds_qos_t *qos = dds_create_qos (); + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_presentation (qos, DDS_PRESENTATION_TOPIC, false, false); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100)); + dds_qset_partition (qos, 1, &partition); + return qos; } -static dds_return_t -dds__delete_builtin_participant( - dds_entity *e) +void dds__builtin_init (void) { - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); + dds_qos_t *qos = dds__create_builtin_qos (); - assert(e); - assert(thr); - assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT); + builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); + builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); + builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); - if (asleep) { - thread_state_awake(thr); - } - dds_domain_free(e->m_domain); - if (asleep) { - thread_state_asleep(thr); - } + builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT)); + builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER)); + builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER)); - return DDS_RETCODE_OK; + dds_delete_qos (qos); } -/* - * We don't use the 'normal' create participant. - * - * This way, the application is not able to access the local builtin writers. - * Also, we can indicate that it should be a 'local only' participant, which - * means that none of the entities under the hierarchy of this participant will - * be exposed to the outside world. This is what we want, because these builtin - * writers are only applicable to local user readers. - */ -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_participant( - void) +void dds__builtin_fini (void) { - int q_rc; - nn_plist_t plist; - struct thread_state1 * thr; - bool asleep; - nn_guid_t guid; - dds_entity_t participant; - dds_participant *pp; + /* No more sources for builtin topic samples */ + struct thread_state1 * const self = lookup_thread_state (); + thread_state_awake (self); + delete_local_orphan_writer (builtintopic_writer_participant); + delete_local_orphan_writer (builtintopic_writer_publications); + delete_local_orphan_writer (builtintopic_writer_subscriptions); + thread_state_asleep (self); - nn_plist_init_empty (&plist); - - thr = lookup_thread_state (); - asleep = !vtime_awake_p (thr->vtime); - if (asleep) { - thread_state_awake (thr); - } - q_rc = new_participant (&guid, RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_ONLY_LOCAL, &plist); - if (asleep) { - thread_state_asleep (thr); - } - - if (q_rc != 0) { - DDS_ERROR("Internal builtin error\n"); - participant = DDS_ERRNO(DDS_RETCODE_ERROR); - goto fail; - } - - pp = dds_alloc (sizeof (*pp)); - participant = dds_entity_init (&pp->m_entity, NULL, DDS_KIND_PARTICIPANT, NULL, NULL, 0); - if (participant < 0) { - goto fail; - } - - pp->m_entity.m_guid = guid; - pp->m_entity.m_domain = dds_domain_create (config.domainId.value); - pp->m_entity.m_domainid = config.domainId.value; - pp->m_entity.m_deriver.delete = dds__delete_builtin_participant; - -fail: - return participant; + ddsi_sertopic_unref (builtin_participant_topic); + ddsi_sertopic_unref (builtin_reader_topic); + ddsi_sertopic_unref (builtin_writer_topic); } -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_publisher( - _In_ dds_entity_t participant) +dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic) { - dds_qos_t *qos = dds__create_builtin_qos(); - dds_entity_t pub = dds_create_publisher(participant, qos, NULL); - dds_delete_qos(qos); - return pub; + dds_entity_t pp; + dds_entity_t tp; + + if ((pp = dds_get_participant (e)) <= 0) + return pp; + + struct ddsi_sertopic *sertopic; + if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { + sertopic = builtin_participant_topic; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { + sertopic = builtin_writer_topic; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { + sertopic = builtin_reader_topic; + } else { + assert (0); + return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + } + + dds_qos_t *qos = dds__create_builtin_qos (); + tp = dds_create_topic_arbitrary (pp, sertopic, sertopic->name, qos, NULL, NULL); + dds_delete_qos (qos); + return tp; } -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_subscriber( - _In_ dds_entity *participant) +static bool qos_has_resource_limits (const dds_qos_t *qos) { - dds_qos_t *qos = dds__create_builtin_qos(); - dds_entity_t sub = dds__create_subscriber_l(participant, qos, NULL); - dds_delete_qos(qos); - return sub; + return (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED || + qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED || + qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED); } -static dds_entity_t -dds__create_builtin_writer( - _In_ dds_entity_t topic) +bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos) { - dds_entity_t wr; - dds_entity_t pub = dds__get_builtin_publisher(); - if (pub > 0) { - dds_entity_t top = dds__get_builtin_topic(pub, topic); - if (top > 0) { - wr = dds_create_writer(pub, top, NULL, NULL); - (void)dds_delete(top); - } else { - wr = top; - } + if (qos == NULL) + /* default QoS inherited from topic is ok by definition */ + return true; + else + { + /* failing writes on built-in topics are unwelcome complications, so we simply forbid the creation of + a reader matching a built-in topics writer that has resource limits */ + struct local_orphan_writer *bwr; + if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { + bwr = builtintopic_writer_participant; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { + bwr = builtintopic_writer_publications; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { + bwr = builtintopic_writer_subscriptions; } else { - wr = pub; + assert (0); + return false; } - return wr; + return qos_match_p (qos, bwr->wr.xqos) && !qos_has_resource_limits (qos); + } } - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_participant( - void) +static dds_entity_t dds__create_builtin_subscriber (dds_entity *participant) { - if (g_builtin_local_participant == 0) { - g_builtin_local_participant = dds__create_builtin_participant(); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPARTICIPANT); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPUBLICATION); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION); - } - return g_builtin_local_participant; + dds_qos_t *qos = dds__create_builtin_qos (); + dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL); + dds_delete_qos (qos); + return sub; } - -_Must_inspect_result_ dds_entity_t -dds__get_builtin_publisher( - void) +dds_entity_t dds__get_builtin_subscriber (dds_entity_t e) { - if (g_builtin_local_publisher == 0) { - dds_entity_t par = dds__get_builtin_participant(); - if (par > 0) { - g_builtin_local_publisher = dds__create_builtin_publisher(par); - } - } - return g_builtin_local_publisher; -} + dds_entity_t sub; + dds_return_t ret; + dds_entity_t pp; + dds_participant *p; + dds_entity *part_entity; -_Must_inspect_result_ dds_entity_t -dds__get_builtin_subscriber( - _In_ dds_entity_t e) -{ - dds_entity_t sub; - dds_return_t ret; - dds_entity_t participant; - dds_participant *p; - dds_entity *part_entity; - - participant = dds_get_participant(e); - if (participant <= 0) { - /* error already in participant error; no need to repeat error */ - ret = participant; - goto error; - } - ret = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, (dds_entity **)&part_entity); - if (ret != DDS_RETCODE_OK) { - goto error; - } - p = (dds_participant *)part_entity; - if(p->m_builtin_subscriber <= 0) { - p->m_builtin_subscriber = dds__create_builtin_subscriber(part_entity); - } - sub = p->m_builtin_subscriber; - dds_entity_unlock(part_entity); - - return sub; - - /* Error handling */ -error: - assert(ret < 0); + if ((pp = dds_get_participant (e)) <= 0) + return pp; + if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0) return ret; + + p = (dds_participant *) part_entity; + if (p->m_builtin_subscriber <= 0) { + p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity); + } + sub = p->m_builtin_subscriber; + dds_entity_unlock(part_entity); + return sub; } - -_Must_inspect_result_ dds_entity_t -dds__get_builtin_topic( - _In_ dds_entity_t e, - _In_ dds_entity_t topic) +bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid) { - dds_entity_t participant; - dds_entity_t ret; - - participant = dds_get_participant(e); - if (participant > 0) { - struct ddsi_sertopic *sertopic; - - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - sertopic = gv.builtin_participant_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - sertopic = gv.builtin_writer_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - sertopic = gv.builtin_reader_topic; - } else { - DDS_ERROR("Invalid builtin-topic handle(%d)\n", topic); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err_invalid_topic; - } - - ret = dds_find_topic (participant, sertopic->name); - if (ret < 0 && dds_err_nr(ret) == DDS_RETCODE_PRECONDITION_NOT_MET) { - dds_qos_t *qos = dds__create_builtin_qos(); - ret = dds_create_topic_arbitrary(participant, sertopic, sertopic->name, qos, NULL, NULL); - dds_delete_qos(qos); - } - } else { - /* Failed to get participant of provided entity */ - ret = participant; - } - -err_invalid_topic: - return ret; + return !(onlylocal || is_builtin_endpoint (entityid, vendorid)); } - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_writer( - _In_ dds_entity_t topic) +struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid) { - dds_entity_t wr; - if ((topic >= DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) && (topic <= DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION)) { - int index = (int)(topic - DDS_KIND_INTERNAL - 1); - os_mutexLock(&g_builtin_mutex); - wr = g_builtin_local_writers[index]; - if (wr == 0) { - wr = dds__create_builtin_writer(topic); - if (wr > 0) { - g_builtin_local_writers[index] = wr; - } - } - os_mutexUnlock(&g_builtin_mutex); - } else { - DDS_ERROR("Given topic is not a builtin topic\n"); - wr = DDS_ERRNO(DDS_RETCODE_ERROR); - } - return wr; + struct ddsi_tkmap_instance *tk; + struct ddsi_serdata *sd; + struct nn_keyhash kh; + memcpy (&kh, guid, sizeof (kh)); + /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ + sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh); + tk = ddsi_tkmap_find (sd, false, true); + ddsi_serdata_unref (sd); + return tk; } -static dds_return_t -dds__builtin_write_int( - _In_ dds_entity_t topic, - _In_ const nn_guid_t *guid, - _In_ dds_time_t timestamp, - _In_ bool alive) +struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive) { - dds_return_t ret = DDS_RETCODE_OK; - if (os_atomic_inc32_nv(&m_call_count) > 1) { - dds_entity_t wr; - wr = dds__get_builtin_writer(topic); - if (wr > 0) { - struct ddsi_sertopic *sertopic; - struct ddsi_serdata *serdata; - struct nn_keyhash keyhash; - struct dds_writer *wraddr; - - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - sertopic = gv.builtin_participant_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - sertopic = gv.builtin_writer_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - sertopic = gv.builtin_reader_topic; - } else { - sertopic = NULL; - assert (0); - } - - memcpy (&keyhash, guid, sizeof (keyhash)); - serdata = ddsi_serdata_from_keyhash(sertopic, &keyhash); - - ret = dds_writer_lock(wr, &wraddr); - if (ret == DDS_RETCODE_OK) { - ret = dds_writecdr_impl (wraddr, serdata, timestamp, alive ? 0 : (DDS_WR_DISPOSE_BIT | DDS_WR_UNREGISTER_BIT)); - dds_writer_unlock(wraddr); - } - } else { - ret = wr; - } - } - os_atomic_dec32(&m_call_count); - return ret; + /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ + struct ddsi_sertopic *topic = NULL; + struct ddsi_serdata *serdata; + struct nn_keyhash keyhash; + switch (e->kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + topic = builtin_participant_topic; + break; + case EK_WRITER: + case EK_PROXY_WRITER: + topic = builtin_writer_topic; + break; + case EK_READER: + case EK_PROXY_READER: + topic = builtin_reader_topic; + break; + } + assert (topic != NULL); + memcpy (&keyhash, &e->guid, sizeof (keyhash)); + serdata = ddsi_serdata_from_keyhash (topic, &keyhash); + serdata->timestamp = timestamp; + serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER; + return serdata; } -void -dds__builtin_write( - _In_ enum ddsi_sertopic_builtin_type type, - _In_ const nn_guid_t *guid, - _In_ nn_wctime_t timestamp, - _In_ bool alive) +void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive) { - /* initialize to avoid compiler warning ultimately caused by C's horrible type system */ - dds_entity_t topic = 0; - switch (type) + if (ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, get_entity_vendorid (e))) + { + /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ + struct local_orphan_writer *bwr = NULL; + struct ddsi_serdata *serdata = dds__builtin_make_sample (e, timestamp, alive); + assert (e->tk != NULL); + switch (e->kind) { - case DSBT_PARTICIPANT: - topic = DDS_BUILTIN_TOPIC_DCPSPARTICIPANT; - break; - case DSBT_WRITER: - topic = DDS_BUILTIN_TOPIC_DCPSPUBLICATION; - break; - case DSBT_READER: - topic = DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION; - break; + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + bwr = builtintopic_writer_participant; + break; + case EK_WRITER: + case EK_PROXY_WRITER: + bwr = builtintopic_writer_publications; + break; + case EK_READER: + case EK_PROXY_READER: + bwr = builtintopic_writer_subscriptions; + break; } - assert(topic != 0); - (void)dds__builtin_write_int(topic, guid, timestamp.v, alive); -} - -bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos) -{ - if (qos == NULL) { - /* default QoS inherited from topic is ok by definition */ - return true; - } else { - dds_entity_t wr = dds__get_builtin_writer(topic); - dds_qos_t *wrqos = dds_create_qos(); - dds_return_t ret = dds_get_qos(wr, wrqos); - bool match; - assert (ret == DDS_RETCODE_OK); - (void)ret; - if (!qos_match_p (qos, wrqos)) { - match = false; - } else if (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED || - qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED || - qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED) { - /* this means a write on the built-in topic writer can't fail */ - match = false; - } else { - match = true; - } - dds_delete_qos(wrqos); - return match; - } -} - -void -dds__builtin_init( - void) -{ - assert(os_atomic_ld32(&m_call_count) == 0); - os_mutexInit(&g_builtin_mutex); - os_atomic_inc32(&m_call_count); -} - -void -dds__builtin_fini( - void) -{ - assert(os_atomic_ld32(&m_call_count) > 0); - while (os_atomic_dec32_nv(&m_call_count) > 0) { - os_atomic_inc32_nv(&m_call_count); - dds_sleepfor(DDS_MSECS(10)); - } - (void)dds_delete(g_builtin_local_participant); - g_builtin_local_participant = 0; - g_builtin_local_publisher = 0; - memset(g_builtin_local_writers, 0, sizeof(g_builtin_local_writers)); - os_mutexDestroy(&g_builtin_mutex); + dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata); + } } diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 9eae64f..23802d4 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -18,6 +18,7 @@ #include "dds__domain.h" #include "dds__err.h" #include "dds__builtin.h" +#include "dds__whc_builtintopic.h" #include "ddsi/ddsi_iid.h" #include "ddsi/ddsi_tkmap.h" #include "ddsi/ddsi_serdata.h" @@ -106,8 +107,6 @@ dds_init(dds_domainid_t domain) * main configured domain id is. */ dds_global.m_default_domain = config.domainId.value; - dds__builtin_init(); - if (rtps_config_prep(dds_cfgst) != 0) { DDS_ERROR("Failed to configure RTPS\n"); @@ -138,6 +137,8 @@ dds_init(dds_domainid_t domain) goto fail_rtps_init; } + dds__builtin_init (); + if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0) { DDS_ERROR("Failed to start the servicelease\n"); @@ -172,7 +173,8 @@ skip: fail_servicelease_start: if (gv.servicelease) nn_servicelease_stop_renewing (gv.servicelease); - rtps_term (); + rtps_stop (); + rtps_fini (); fail_rtps_init: if (gv.servicelease) { @@ -182,7 +184,6 @@ fail_rtps_init: fail_servicelease_new: thread_states_fini(); fail_rtps_config: - dds__builtin_fini(); fail_config_domainid: dds_global.m_default_domain = DDS_DOMAIN_DEFAULT; config_fini (dds_cfgst); @@ -206,11 +207,11 @@ extern void dds_fini (void) dds_global.m_init_count--; if (dds_global.m_init_count == 0) { - dds__builtin_fini(); - if (gv.servicelease) nn_servicelease_stop_renewing (gv.servicelease); - rtps_term (); + rtps_stop (); + dds__builtin_fini (); + rtps_fini (); if (gv.servicelease) nn_servicelease_free (gv.servicelease); gv.servicelease = NULL; @@ -247,7 +248,9 @@ void ddsi_plugin_init (void) ddsi_plugin.init_fn = dds__init_plugin; ddsi_plugin.fini_fn = dds__fini_plugin; - ddsi_plugin.builtin_write = dds__builtin_write; + ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible; + ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry; + ddsi_plugin.builtintopic_write = dds__builtin_write; ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free; ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini; diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 7c4d399..5daf5a5 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -18,6 +18,7 @@ #include "dds__domain.h" #include "dds__participant.h" #include "dds__err.h" +#include "dds__builtin.h" #define DDS_PARTICIPANT_STATUS_MASK 0u diff --git a/src/core/ddsi/src/ddsi_serdata_builtin.c b/src/core/ddsc/src/dds_serdata_builtintopic.c similarity index 82% rename from src/core/ddsi/src/ddsi_serdata_builtin.c rename to src/core/ddsc/src/dds_serdata_builtintopic.c index 7498721..c288a83 100644 --- a/src/core/ddsi/src/ddsi_serdata_builtin.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -23,11 +23,10 @@ #include #include "os/os.h" #include "dds__key.h" -#include "ddsi/ddsi_tkmap.h" #include "dds__stream.h" +#include "dds__serdata_builtintopic.h" +#include "ddsi/ddsi_tkmap.h" #include "ddsi/q_entity.h" -#include "ddsi/ddsi_serdata_builtin.h" -//#include "dds.h" /* FIXME: need the sample types of the built-in topics */ static const uint64_t unihashconsts[] = { UINT64_C (16292676669999574021), @@ -46,7 +45,7 @@ static uint32_t hash_guid (const nn_guid_t *g) >> 32); } -static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, uint32_t basehash) +static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtintopic *d, uint32_t basehash) { d->c.hash = hash_guid (&d->key) ^ basehash; return &d->c; @@ -54,37 +53,37 @@ static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, static bool serdata_builtin_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn) { - const struct ddsi_serdata_builtin *a = (const struct ddsi_serdata_builtin *)acmn; - const struct ddsi_serdata_builtin *b = (const struct ddsi_serdata_builtin *)bcmn; + const struct ddsi_serdata_builtintopic *a = (const struct ddsi_serdata_builtintopic *)acmn; + const struct ddsi_serdata_builtintopic *b = (const struct ddsi_serdata_builtintopic *)bcmn; return memcmp (&a->key, &b->key, sizeof (a->key)) == 0; } static void serdata_builtin_free(struct ddsi_serdata *dcmn) { - struct ddsi_serdata_builtin *d = (struct ddsi_serdata_builtin *)dcmn; + struct ddsi_serdata_builtintopic *d = (struct ddsi_serdata_builtintopic *)dcmn; if (d->c.kind == SDK_DATA) nn_xqos_fini (&d->xqos); os_free (d); } -static struct ddsi_serdata_builtin *serdata_builtin_new(const struct ddsi_sertopic_builtin *tp, enum ddsi_serdata_kind kind) +static struct ddsi_serdata_builtintopic *serdata_builtin_new(const struct ddsi_sertopic_builtintopic *tp, enum ddsi_serdata_kind kind) { - struct ddsi_serdata_builtin *d = os_malloc(sizeof (*d)); + struct ddsi_serdata_builtintopic *d = os_malloc(sizeof (*d)); ddsi_serdata_init (&d->c, &tp->c, kind); return d; } -static void from_entity_pp (struct ddsi_serdata_builtin *d, const struct participant *pp) +static void from_entity_pp (struct ddsi_serdata_builtintopic *d, const struct participant *pp) { nn_xqos_copy(&d->xqos, &pp->plist->qos); } -static void from_entity_proxypp (struct ddsi_serdata_builtin *d, const struct proxy_participant *proxypp) +static void from_entity_proxypp (struct ddsi_serdata_builtintopic *d, const struct proxy_participant *proxypp) { nn_xqos_copy(&d->xqos, &proxypp->plist->qos); } -static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const struct ddsi_sertopic *tp) +static void set_topic_type_from_sertopic (struct ddsi_serdata_builtintopic *d, const struct ddsi_sertopic *tp) { if (!(d->xqos.present & QP_TOPIC_NAME)) { @@ -98,26 +97,26 @@ static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const } } -static void from_entity_rd (struct ddsi_serdata_builtin *d, const struct reader *rd) +static void from_entity_rd (struct ddsi_serdata_builtintopic *d, const struct reader *rd) { nn_xqos_copy(&d->xqos, rd->xqos); set_topic_type_from_sertopic(d, rd->topic); } -static void from_entity_prd (struct ddsi_serdata_builtin *d, const struct proxy_reader *prd) +static void from_entity_prd (struct ddsi_serdata_builtintopic *d, const struct proxy_reader *prd) { nn_xqos_copy(&d->xqos, prd->c.xqos); assert (d->xqos.present & QP_TOPIC_NAME); assert (d->xqos.present & QP_TYPE_NAME); } -static void from_entity_wr (struct ddsi_serdata_builtin *d, const struct writer *wr) +static void from_entity_wr (struct ddsi_serdata_builtintopic *d, const struct writer *wr) { nn_xqos_copy(&d->xqos, wr->xqos); set_topic_type_from_sertopic(d, wr->topic); } -static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_writer *pwr) +static void from_entity_pwr (struct ddsi_serdata_builtintopic *d, const struct proxy_writer *pwr) { nn_xqos_copy(&d->xqos, pwr->c.xqos); assert (d->xqos.present & QP_TOPIC_NAME); @@ -127,10 +126,10 @@ static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_ struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) { /* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */ - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)tpcmn; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn; /* keyhash must in host format (which the GUIDs always are internally) */ const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value); - struct ddsi_serdata_builtin *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); + struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); memcpy (&d->key, keyhash->value, sizeof (d->key)); if (entity) { @@ -206,7 +205,7 @@ static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src) return old; } -static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_participant *sample) +static bool to_sample_pp (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_participant *sample) { convkey (&sample->key, &d->key); if (d->c.kind == SDK_DATA) @@ -216,7 +215,7 @@ static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_built return true; } -static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_endpoint *sample) +static bool to_sample_endpoint (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_endpoint *sample) { nn_guid_t ppguid; convkey (&sample->key, &d->key); @@ -236,8 +235,8 @@ static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds static bool serdata_builtin_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim) { - const struct ddsi_serdata_builtin *d = (const struct ddsi_serdata_builtin *)serdata_common; - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)topic; + const struct ddsi_serdata_builtintopic *d = (const struct ddsi_serdata_builtintopic *)serdata_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)topic; if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ /* FIXME: completing builtin topic support along these lines requires subscribers, publishers and topics to also become DDSI entities - which is probably a good thing anyway */ switch (tp->type) @@ -279,7 +278,7 @@ static void serdata_builtin_to_ser_unref (struct ddsi_serdata *serdata_common, c (void)serdata_common; (void)ref; } -const struct ddsi_serdata_ops ddsi_serdata_ops_builtin = { +const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = { .get_size = serdata_builtin_get_size, .eqkey = serdata_builtin_eqkey, .free = serdata_builtin_free, diff --git a/src/core/ddsi/src/ddsi_sertopic_builtin.c b/src/core/ddsc/src/dds_sertopic_builtintopic.c similarity index 82% rename from src/core/ddsi/src/ddsi_sertopic_builtin.c rename to src/core/ddsc/src/dds_sertopic_builtintopic.c index 5092a6d..5783ab5 100644 --- a/src/core/ddsi/src/ddsi_sertopic_builtin.c +++ b/src/core/ddsc/src/dds_sertopic_builtintopic.c @@ -20,22 +20,22 @@ #include "ddsi/q_config.h" #include "ddsi/q_freelist.h" #include "ddsi/ddsi_sertopic.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsc/dds.h" +#include "dds__serdata_builtintopic.h" /* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */ -struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename) +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename) { - struct ddsi_sertopic_builtin *tp = os_malloc (sizeof (*tp)); + struct ddsi_sertopic_builtintopic *tp = os_malloc (sizeof (*tp)); tp->c.iid = ddsi_iid_gen(); tp->c.name = dds_string_dup (name); tp->c.typename = dds_string_dup (typename); const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.typename) + 1; tp->c.name_typename = dds_alloc (name_typename_size); snprintf (tp->c.name_typename, name_typename_size, "%s/%s", tp->c.name, tp->c.typename); - tp->c.ops = &ddsi_sertopic_ops_builtin; - tp->c.serdata_ops = &ddsi_serdata_ops_builtin; + tp->c.ops = &ddsi_sertopic_ops_builtintopic; + tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic; tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops); tp->c.status_cb = 0; tp->c.status_cb_entity = NULL; @@ -66,7 +66,7 @@ static void free_endpoint (void *vsample) sample->qos = NULL; } -static size_t get_size (enum ddsi_sertopic_builtin_type type) +static size_t get_size (enum ddsi_sertopic_builtintopic_type type) { switch (type) { @@ -82,14 +82,14 @@ static size_t get_size (enum ddsi_sertopic_builtin_type type) static void sertopic_builtin_zero_samples (const struct ddsi_sertopic *sertopic_common, void *samples, size_t count) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; size_t size = get_size (tp->type); memset (samples, 0, size * count); } static void sertopic_builtin_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; const size_t size = get_size (tp->type); char *new = dds_realloc (old, size * count); if (new && count > oldcount) @@ -105,7 +105,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_ { if (count > 0) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; const size_t size = get_size (tp->type); #ifndef NDEBUG for (size_t i = 0, off = 0; i < count; i++, off += size) @@ -139,7 +139,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_ } } -const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin = { +const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = { .deinit = sertopic_builtin_deinit, .zero_samples = sertopic_builtin_zero_samples, .realloc_samples = sertopic_builtin_realloc_samples, diff --git a/src/core/ddsc/src/dds_whc_builtintopic.c b/src/core/ddsc/src/dds_whc_builtintopic.c new file mode 100644 index 0000000..3453102 --- /dev/null +++ b/src/core/ddsc/src/dds_whc_builtintopic.c @@ -0,0 +1,201 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include + +#include "os/os.h" +#include "ddsi/ddsi_serdata.h" +#include "ddsi/q_unused.h" +#include "ddsi/q_config.h" +#include "ddsi/q_ephash.h" +#include "ddsi/q_entity.h" +#include "ddsi/ddsi_tkmap.h" +#include "dds__serdata_builtintopic.h" +#include "dds__whc_builtintopic.h" +#include "dds__builtin.h" + +struct bwhc { + struct whc common; + enum ddsi_sertopic_builtintopic_type type; +}; + +enum bwhc_iter_state { + BIS_INIT_LOCAL, + BIS_LOCAL, + BIS_INIT_PROXY, + BIS_PROXY +}; + +struct bwhc_iter { + struct whc_sample_iter_base c; + enum bwhc_iter_state st; + bool have_sample; + struct ephash_enum it; +}; + +/* check that our definition of whc_sample_iter fits in the type that callers allocate */ +struct bwhc_sample_iter_sizecheck { + char fits_in_generic_type[sizeof(struct bwhc_iter) <= sizeof(struct whc_sample_iter) ? 1 : -1]; +}; + +static void bwhc_free (struct whc *whc_generic) +{ + os_free (whc_generic); +} + +static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it) +{ + struct bwhc_iter *it = (struct bwhc_iter *) opaque_it; + it->c.whc = (struct whc *) whc_generic; + it->st = BIS_INIT_LOCAL; + it->have_sample = false; +} + +static bool is_visible (const struct entity_common *e) +{ + const nn_vendorid_t vendorid = get_entity_vendorid (e); + return ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, vendorid); +} + +static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) +{ + struct bwhc_iter * const it = (struct bwhc_iter *) opaque_it; + struct bwhc * const whc = (struct bwhc *) it->c.whc; + enum entity_kind kind = EK_PARTICIPANT; /* pacify gcc */ + struct entity_common *entity; + + if (it->have_sample) + { + ddsi_serdata_unref (sample->serdata); + it->have_sample = false; + } + + /* most fields really don't matter, so memset */ + memset (sample, 0, sizeof (*sample)); + + switch (it->st) + { + case BIS_INIT_LOCAL: + switch (whc->type) { + case DSBT_PARTICIPANT: kind = EK_PARTICIPANT; break; + case DSBT_WRITER: kind = EK_WRITER; break; + case DSBT_READER: kind = EK_READER; break; + } + assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT); + ephash_enum_init (&it->it, kind); + it->st = BIS_LOCAL; + /* FALLS THROUGH */ + case BIS_LOCAL: + while ((entity = ephash_enum_next (&it->it)) != NULL) + if (is_visible (entity)) + break; + if (entity) { + sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true); + it->have_sample = true; + return true; + } else { + ephash_enum_fini (&it->it); + it->st = BIS_INIT_PROXY; + } + /* FALLS THROUGH */ + case BIS_INIT_PROXY: + switch (whc->type) { + case DSBT_PARTICIPANT: kind = EK_PROXY_PARTICIPANT; break; + case DSBT_WRITER: kind = EK_PROXY_WRITER; break; + case DSBT_READER: kind = EK_PROXY_READER; break; + } + assert (kind != EK_PARTICIPANT); + ephash_enum_init (&it->it, kind); + it->st = BIS_PROXY; + /* FALLS THROUGH */ + case BIS_PROXY: + while ((entity = ephash_enum_next (&it->it)) != NULL) + if (is_visible (entity)) + break; + if (entity) { + sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true); + it->have_sample = true; + return true; + } else { + ephash_enum_fini (&it->it); + return false; + } + } + assert (0); + return false; +} + +static void bwhc_get_state (const struct whc *whc, struct whc_state *st) +{ + (void)whc; + st->max_seq = -1; + st->min_seq = -1; + st->unacked_bytes = 0; +} + +static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) +{ + (void)whc; + (void)max_drop_seq; + (void)seq; + (void)serdata; + (void)tk; + if (plist) + os_free (plist); + return 0; +} + +static unsigned bwhc_downgrade_to_volatile (struct whc *whc, struct whc_state *st) +{ + (void)whc; + (void)st; + return 0; +} + +static unsigned bwhc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) +{ + (void)whc; + (void)max_drop_seq; + (void)whcst; + *deferred_free_list = NULL; + return 0; +} + +static void bwhc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list) +{ + (void)whc; + (void)deferred_free_list; +} + +static const struct whc_ops bwhc_ops = { + .insert = bwhc_insert, + .remove_acked_messages = bwhc_remove_acked_messages, + .free_deferred_free_list = bwhc_free_deferred_free_list, + .get_state = bwhc_get_state, + .next_seq = 0, + .borrow_sample = 0, + .borrow_sample_key = 0, + .return_sample = 0, + .sample_iter_init = bwhc_sample_iter_init, + .sample_iter_borrow_next = bwhc_sample_iter_borrow_next, + .downgrade_to_volatile = bwhc_downgrade_to_volatile, + .free = bwhc_free +}; + +struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type) +{ + struct bwhc *whc = os_malloc (sizeof (*whc)); + whc->common.ops = &bwhc_ops; + whc->type = type; + return (struct whc *) whc; +} diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 73e5f4d..da7982d 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -26,327 +26,259 @@ #include "ddsi/q_entity.h" #include "ddsi/q_radmin.h" -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -dds_return_t -dds_write( - _In_ dds_entity_t writer, - _In_ const void *data) +dds_return_t dds_write (dds_entity_t writer, const void *data) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; - if (data != NULL) { - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_write_impl(wr, data, dds_time(), 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("No data buffer provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } + if (data == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); - return ret; + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_write_impl (wr, data, dds_time (), 0); + dds_writer_unlock (wr); + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -int -dds_writecdr( - dds_entity_t writer, - struct ddsi_serdata *serdata) +dds_return_t dds_writecdr (dds_entity_t writer, struct ddsi_serdata *serdata) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; - if (serdata != NULL) { - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_writecdr_impl (wr, serdata, dds_time (), 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking writer\n"); - ret = DDS_ERRNO(rc); - } - } else{ - DDS_ERROR("Given cdr has NULL value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - return ret; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; + + if (serdata == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_writecdr_impl (wr, serdata, dds_time (), 0); + dds_writer_unlock (wr); + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -dds_return_t -dds_write_ts( - _In_ dds_entity_t writer, - _In_ const void *data, - _In_ dds_time_t timestamp) +dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t timestamp) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; - if(data == NULL){ - DDS_ERROR("Argument data has NULL value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; - } - if(timestamp < 0){ - DDS_ERROR("Argument timestamp has negative value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; - } - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_write_impl(wr, data, timestamp, 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking writer\n"); - ret = DDS_ERRNO(rc); - } -err: - return ret; + if (data == NULL || timestamp < 0) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_write_impl (wr, data, timestamp, 0); + dds_writer_unlock (wr); + return ret; } -static int -deliver_locally( - _In_ struct writer *wr, - _In_ struct ddsi_serdata *payload, - _In_ struct ddsi_tkmap_instance *tk) +static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms) { - dds_return_t ret = DDS_RETCODE_OK; - os_mutexLock (&wr->rdary.rdary_lock); - if (wr->rdary.fastpath_ok) { - struct reader ** const rdary = wr->rdary.rdary; - if (rdary[0]) { - struct proxy_writer_info pwr_info; - unsigned i; - make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); - for (i = 0; rdary[i]; i++) { - bool stored; - DDS_TRACE("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)); - dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC; - do { - stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk); - if (!stored) { - if (max_block_ms <= 0) { - DDS_ERROR("The writer could not deliver data on time, probably due to a local reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else { - dds_sleepfor(DDS_MSECS(DDS_HEADBANG_TIMEOUT_MS)); - } - /* Decreasing the block time after the sleep, let's us possibly - * wait a bit too long. But that's preferable compared to waiting - * a bit too short. */ - max_block_ms -= DDS_HEADBANG_TIMEOUT_MS; - } - } while ((!stored) && (ret == DDS_RETCODE_OK)); - } - } - os_mutexUnlock (&wr->rdary.rdary_lock); - } else { - /* When deleting, pwr is no longer accessible via the hash - tables, and consequently, a reader may be deleted without - it being possible to remove it from rdary. The primary - reason rdary exists is to avoid locking the proxy writer - but this is less of an issue when we are deleting it, so - we fall back to using the GUIDs so that we can deliver all - samples we received from it. As writer being deleted any - reliable samples that are rejected are simply discarded. */ - ut_avlIter_t it; - struct pwr_rd_match *m; - struct proxy_writer_info pwr_info; - os_mutexUnlock (&wr->rdary.rdary_lock); - make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); - os_mutexLock (&wr->e.lock); - for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) { - struct reader *rd; - if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { - DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)); - /* Copied the return value ignore from DDSI deliver_user_data() function. */ - (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); - } - } - os_mutexUnlock (&wr->e.lock); + while (!(ddsi_plugin.rhc_plugin.rhc_store_fn) (rhc, pwr_info, payload, tk)) + { + if (*max_block_ms > 0) + { + dds_sleepfor (DDS_HEADBANG_TIMEOUT); + *max_block_ms -= DDS_HEADBANG_TIMEOUT; } - return ret; + else + { + DDS_ERROR ("The writer could not deliver data on time, probably due to a local reader resources being full\n"); + return DDS_ERRNO (DDS_RETCODE_TIMEOUT); + } + } + return DDS_RETCODE_OK; } -int -dds_write_impl( - _In_ dds_writer *wr, - _In_ const void * data, - _In_ dds_time_t tstamp, - _In_ dds_write_action action) +static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk) { - dds_return_t ret = DDS_RETCODE_OK; - int w_rc; - - assert (wr); - assert (dds_entity_kind(((dds_entity*)wr)->m_hdl) == DDS_KIND_WRITER); - - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - const bool writekey = action & DDS_WR_KEY_BIT; - dds_writer * writer = (dds_writer*) wr; - struct writer * ddsi_wr = writer->m_wr; - struct ddsi_tkmap_instance * tk; - struct ddsi_serdata *d; - - if (data == NULL) { - DDS_ERROR("No data buffer provided\n"); - return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + dds_return_t ret = DDS_RETCODE_OK; + os_mutexLock (&wr->rdary.rdary_lock); + if (wr->rdary.fastpath_ok) + { + struct reader ** const rdary = wr->rdary.rdary; + if (rdary[0]) + { + dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time); + struct proxy_writer_info pwr_info; + unsigned i; + make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); + for (i = 0; rdary[i]; i++) { + DDS_TRACE ("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)); + if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) + break; + } } - - /* Check for topic filter */ - if (wr->m_topic->filter_fn && ! writekey) { - if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) { - return DDS_RETCODE_OK; - } + os_mutexUnlock (&wr->rdary.rdary_lock); + } + else + { + /* When deleting, pwr is no longer accessible via the hash + tables, and consequently, a reader may be deleted without + it being possible to remove it from rdary. The primary + reason rdary exists is to avoid locking the proxy writer + but this is less of an issue when we are deleting it, so + we fall back to using the GUIDs so that we can deliver all + samples we received from it. As writer being deleted any + reliable samples that are rejected are simply discarded. */ + ut_avlIter_t it; + struct pwr_rd_match *m; + struct proxy_writer_info pwr_info; + dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time); + os_mutexUnlock (&wr->rdary.rdary_lock); + make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); + os_mutexLock (&wr->e.lock); + for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) + { + struct reader *rd; + if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) + { + DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)); + /* Copied the return value ignore from DDSI deliver_user_data() function. */ + if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) + break; + } } - - if (asleep) { - thread_state_awake (thr); - } - - /* Serialize and write data or key */ - d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data); - - /* Set if disposing or unregistering */ - d->statusinfo = ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) | - ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ; - d->timestamp.v = tstamp; - ddsi_serdata_ref(d); - tk = ddsi_tkmap_lookup_instance_ref(d); - w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk); - - if (w_rc >= 0) { - /* Flush out write unless configured to batch */ - if (! config.whc_batch){ - nn_xpack_send (writer->m_xp, false); - } - ret = DDS_RETCODE_OK; - } else if (w_rc == ERR_TIMEOUT) { - DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else if (w_rc == ERR_INVALID_DATA) { - DDS_ERROR("Invalid data provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } else { - DDS_ERROR("Internal error\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } - if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, d, tk); - } - ddsi_serdata_unref(d); - ddsi_tkmap_instance_unref(tk); - - if (asleep) { - thread_state_asleep (thr); - } - - return ret; + os_mutexUnlock (&wr->e.lock); + } + return ret; } -int -dds_writecdr_impl( - _In_ dds_writer *wr, - _Inout_ struct ddsi_serdata *d, - _In_ dds_time_t tstamp, - _In_ dds_write_action action) +dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action) { - int ret = DDS_RETCODE_OK; - int w_rc; + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + const bool writekey = action & DDS_WR_KEY_BIT; + struct writer *ddsi_wr = wr->m_wr; + struct ddsi_tkmap_instance *tk; + struct ddsi_serdata *d; + dds_return_t ret = DDS_RETCODE_OK; + int w_rc; - assert (wr); + if (data == NULL) + { + DDS_ERROR("No data buffer provided\n"); + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + } - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - struct writer * ddsi_wr = wr->m_wr; - struct ddsi_tkmap_instance * tk; + /* Check for topic filter */ + if (wr->m_topic->filter_fn && !writekey) + if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) + return DDS_RETCODE_OK; - if (wr->m_topic->filter_fn) { - abort(); - } + if (asleep) + thread_state_awake (thr); - if (asleep) { - thread_state_awake (thr); - } + /* Serialize and write data or key */ + d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data); + d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0); + d->timestamp.v = tstamp; + ddsi_serdata_ref (d); + tk = ddsi_tkmap_lookup_instance_ref (d); + w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); - /* Set if disposing or unregistering */ - d->statusinfo = - ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) | - ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ; - d->timestamp.v = tstamp; - ddsi_serdata_ref(d); - tk = ddsi_tkmap_lookup_instance_ref(d); - w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); - if (w_rc >= 0) { - /* Flush out write unless configured to batch */ - if (! config.whc_batch) { - nn_xpack_send (wr->m_xp, false); - } - ret = DDS_RETCODE_OK; - } else if (w_rc == ERR_TIMEOUT) { - DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else if (w_rc == ERR_INVALID_DATA) { - DDS_ERROR("Invalid data provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } else { - DDS_ERROR("Internal error\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } + if (w_rc >= 0) + { + /* Flush out write unless configured to batch */ + if (!config.whc_batch) + nn_xpack_send (wr->m_xp, false); + ret = DDS_RETCODE_OK; + } else if (w_rc == ERR_TIMEOUT) { + DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n"); + ret = DDS_ERRNO (DDS_RETCODE_TIMEOUT); + } else if (w_rc == ERR_INVALID_DATA) { + DDS_ERROR ("Invalid data provided\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } else { + DDS_ERROR ("Internal error\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } + if (ret == DDS_RETCODE_OK) + ret = deliver_locally (ddsi_wr, d, tk); + ddsi_serdata_unref (d); + ddsi_tkmap_instance_unref (tk); - if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, d, tk); - } - ddsi_serdata_unref(d); - ddsi_tkmap_instance_unref(tk); - - if (asleep) { - thread_state_asleep (thr); - } - - return ret; + if (asleep) + thread_state_asleep (thr); + return ret; } -void -dds_write_set_batch( - bool enable) +dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d) { - config.whc_batch = enable ? 1 : 0; + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + struct ddsi_tkmap_instance * tk; + int ret = DDS_RETCODE_OK; + int w_rc; + + if (asleep) + thread_state_awake (thr); + + ddsi_serdata_ref (d); + tk = ddsi_tkmap_lookup_instance_ref (d); + w_rc = write_sample_gc (xp, ddsi_wr, d, tk); + if (w_rc >= 0) { + /* Flush out write unless configured to batch */ + if (!config.whc_batch && xp != NULL) + nn_xpack_send (xp, false); + ret = DDS_RETCODE_OK; + } else if (w_rc == ERR_TIMEOUT) { + DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n"); + ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); + } else if (w_rc == ERR_INVALID_DATA) { + DDS_ERROR ("Invalid data provided\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } else { + DDS_ERROR ("Internal error\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } + + if (ret == DDS_RETCODE_OK) + ret = deliver_locally (ddsi_wr, d, tk); + ddsi_serdata_unref (d); + ddsi_tkmap_instance_unref (tk); + + if (asleep) + thread_state_asleep (thr); + + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -void -dds_write_flush( - dds_entity_t writer) +dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action) { - dds__retcode_t rc; - - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - dds_writer *wr; - - if (asleep) { - thread_state_awake (thr); - } - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - nn_xpack_send (wr->m_xp, true); - dds_writer_unlock(wr); - } else{ - DDS_ERROR("Error occurred on locking writer\n"); - } - - if (asleep) { - thread_state_asleep (thr); - } - - return ; + if (wr->m_topic->filter_fn) + abort (); + /* Set if disposing or unregistering */ + d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0); + d->timestamp.v = tstamp; + return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d); +} + +void dds_write_set_batch (bool enable) +{ + config.whc_batch = enable ? 1 : 0; +} + +void dds_write_flush (dds_entity_t writer) +{ + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + dds_writer *wr; + dds__retcode_t rc; + + if (asleep) + thread_state_awake (thr); + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + DDS_ERROR ("Error occurred on locking writer\n"); + else + { + nn_xpack_send (wr->m_xp, true); + dds_writer_unlock (wr); + } + + if (asleep) + thread_state_asleep (thr); + return; } diff --git a/src/core/ddsc/tests/builtin_topics.c b/src/core/ddsc/tests/builtin_topics.c index ff24221..2395a8f 100644 --- a/src/core/ddsc/tests/builtin_topics.c +++ b/src/core/ddsc/tests/builtin_topics.c @@ -147,20 +147,21 @@ check_default_qos_of_builtin_entity(dds_entity_t entity) CU_Test(ddsc_builtin_topics, availability_builtin_topics, .init = setup, .fini = teardown) { +/* FIXME: Successful lookup doesn't rhyme with them not being returned when looking at the children of the participant ... */ dds_entity_t topic; topic = dds_find_topic(g_participant, "DCPSParticipant"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSTopic"); CU_ASSERT_FATAL(topic < 0); - //TODO CHAM-347: dds_delete(topic); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSSubscription"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSPublication"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); } CU_Test(ddsc_builtin_topics, read_publication_data, .init = setup, .fini = teardown) diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 1650a37..0ddb2c3 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -20,10 +20,8 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_mcgroup.c ddsi_serdata.c ddsi_serdata_default.c - ddsi_serdata_builtin.c ddsi_sertopic.c ddsi_sertopic_default.c - ddsi_sertopic_builtin.c ddsi_rhc_plugin.c ddsi_iid.c ddsi_tkmap.c @@ -76,7 +74,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi" ddsi_serdata.h ddsi_sertopic.h ddsi_serdata_default.h - ddsi_serdata_builtin.h ddsi_rhc_plugin.h ddsi_iid.h ddsi_tkmap.h diff --git a/src/core/ddsi/include/ddsi/q_config.h b/src/core/ddsi/include/ddsi/q_config.h index 3605d1f..7810ea4 100644 --- a/src/core/ddsi/include/ddsi/q_config.h +++ b/src/core/ddsi/include/ddsi/q_config.h @@ -22,7 +22,6 @@ #include "ddsi/q_xqos.h" #include "ddsi/ddsi_tran.h" #include "ddsi/q_feature_check.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsi/ddsi_rhc_plugin.h" #if defined (__cplusplus) @@ -412,7 +411,10 @@ struct ddsi_plugin { int (*init_fn) (void); void (*fini_fn) (void); - void (*builtin_write) (enum ddsi_sertopic_builtin_type type, const nn_guid_t *guid, nn_wctime_t timestamp, bool alive); + + bool (*builtintopic_is_visible) (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid); + struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid); + void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive); /* Read cache */ struct ddsi_rhc_plugin rhc_plugin; diff --git a/src/core/ddsi/include/ddsi/q_entity.h b/src/core/ddsi/include/ddsi/q_entity.h index 488952a..fbf99bd 100644 --- a/src/core/ddsi/include/ddsi/q_entity.h +++ b/src/core/ddsi/include/ddsi/q_entity.h @@ -127,12 +127,15 @@ struct pwr_rd_match { struct nn_rsample_info; struct nn_rdata; +struct ddsi_tkmap_instance; struct entity_common { enum entity_kind kind; nn_guid_t guid; + nn_wctime_t tupdate; /* timestamp of last update */ char *name; uint64_t iid; + struct ddsi_tkmap_instance *tk; os_mutex lock; bool onlylocal; }; @@ -390,8 +393,11 @@ int is_deleted_participant_guid (const struct nn_guid *guid, unsigned for_what); nn_entityid_t to_entityid (unsigned u); int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid); +int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid); +bool is_local_orphan_endpoint (const struct entity_common *e); int is_writer_entityid (nn_entityid_t id); int is_reader_entityid (nn_entityid_t id); +nn_vendorid_t get_entity_vendorid (const struct entity_common *e); int pp_allocate_entityid (nn_entityid_t *id, unsigned kind, struct participant *pp); void pp_release_entityid(struct participant *pp, nn_entityid_t id); @@ -473,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity GUID "ppguid". May return NULL if participant unknown or writer/reader already known. */ -struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg); +struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg); -struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg); +struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg); struct whc_node; struct whc_state; @@ -492,6 +498,12 @@ int delete_writer_nolinger_locked (struct writer *wr); int delete_reader (const struct nn_guid *guid); uint64_t reader_instance_id (const struct nn_guid *guid); +struct local_orphan_writer { + struct writer wr; +}; +struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc); +void delete_local_orphan_writer (struct local_orphan_writer *wr); + /* To create or delete a new proxy participant: "guid" MUST have the pre-defined participant entity id. Unlike delete_participant(), deleting a proxy participant will automatically delete all its diff --git a/src/core/ddsi/include/ddsi/q_gc.h b/src/core/ddsi/include/ddsi/q_gc.h index 3602d93..e46cd66 100644 --- a/src/core/ddsi/include/ddsi/q_gc.h +++ b/src/core/ddsi/include/ddsi/q_gc.h @@ -43,6 +43,7 @@ struct gcreq { }; struct gcreq_queue *gcreq_queue_new (void); +void gcreq_queue_drain (struct gcreq_queue *q); void gcreq_queue_free (struct gcreq_queue *q); struct gcreq *gcreq_new (struct gcreq_queue *gcreq_queue, gcreq_cb_t cb); diff --git a/src/core/ddsi/include/ddsi/q_globals.h b/src/core/ddsi/include/ddsi/q_globals.h index c91a029..7c19652 100644 --- a/src/core/ddsi/include/ddsi/q_globals.h +++ b/src/core/ddsi/include/ddsi/q_globals.h @@ -281,11 +281,6 @@ struct q_globals { struct ddsi_sertopic *plist_topic; /* used for all discovery data */ struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */ - /* Sertopics for built-in topics -- FIXME: these really have little to do with topics, but everything with topic types, in other words, they are the type supports in DDS ... so a bit of refactoring is required */ - struct ddsi_sertopic *builtin_participant_topic; - struct ddsi_sertopic *builtin_reader_topic; - struct ddsi_sertopic *builtin_writer_topic; - /* Network ID needed by v_groupWrite -- FIXME: might as well pass it to the receive thread instead of making it global (and that would remove the need to include kernelModule.h) */ diff --git a/src/core/ddsi/include/ddsi/q_rtps.h b/src/core/ddsi/include/ddsi/q_rtps.h index 78f99da..eea7cf9 100644 --- a/src/core/ddsi/include/ddsi/q_rtps.h +++ b/src/core/ddsi/include/ddsi/q_rtps.h @@ -80,7 +80,8 @@ int rtps_config_prep (struct cfgst *cfgst); int rtps_config_open (void); int rtps_init (void); void ddsi_plugin_init (void); -void rtps_term (void); +void rtps_stop (void); +void rtps_fini (void); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/q_whc.h b/src/core/ddsi/include/ddsi/q_whc.h index f5bc549..5f33613 100644 --- a/src/core/ddsi/include/ddsi/q_whc.h +++ b/src/core/ddsi/include/ddsi/q_whc.h @@ -42,7 +42,7 @@ struct whc_state { an iter on the stack without specifying an implementation. If future changes or implementations require more, these can be adjusted. An implementation should check things fit at compile time. */ -#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *)) +#define WHC_SAMPLE_ITER_SIZE (7 * sizeof(void *)) struct whc_sample_iter_base { struct whc *whc; }; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 3eca5ff..e9222d6 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -149,39 +149,41 @@ int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid) } } -static int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid) +int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid) { return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT; } -static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_vendorid_t vendorid, bool onlylocal, struct ddsi_tkmap_instance **tk) +bool is_local_orphan_endpoint (const struct entity_common *e) +{ + return (e->guid.prefix.u[0] == 0 && e->guid.prefix.u[1] == 0 && e->guid.prefix.u[2] == 0 && + is_builtin_endpoint (e->guid.entityid, ownvendorid)); +} + +static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_wctime_t tcreate, nn_vendorid_t vendorid, bool onlylocal) { e->guid = *guid; e->kind = kind; + e->tupdate = tcreate; e->name = os_strdup (name ? name : ""); e->onlylocal = onlylocal; os_mutexInit (&e->lock); - if (onlylocal || is_builtin_endpoint (guid->entityid, vendorid)) + if (ddsi_plugin.builtintopic_is_visible (guid->entityid, onlylocal, vendorid)) { - e->iid = ddsi_iid_gen (); - *tk = NULL; + e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid); + e->iid = e->tk->m_iid; } else { - struct ddsi_serdata *sd; - struct nn_keyhash kh; - memcpy (&kh, guid, sizeof (kh)); - /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ - sd = ddsi_serdata_from_keyhash (gv.builtin_participant_topic, &kh); - /* FIXME: this makes the iid for a reincarnation of a proxy entity dependent on whether an application reader kept the corresponding built-in topic instance around, it may be attractive to reconsider and guarantee a new iid in these cases, at least for the publication handle */ - *tk = ddsi_tkmap_find(sd, false, true); - ddsi_serdata_unref (sd); - e->iid = (*tk)->m_iid; + e->tk = NULL; + e->iid = ddsi_iid_gen (); } } static void entity_common_fini (struct entity_common *e) { + if (e->tk) + ddsi_tkmap_instance_unref (e->tk); os_free (e->name); os_mutexDestroy (&e->lock); } @@ -238,38 +240,27 @@ void local_reader_ary_setinvalid (struct local_reader_ary *x) os_mutexUnlock (&x->rdary_lock); } -static void write_builtin_topic_any (const struct entity_common *e, nn_wctime_t timestamp, bool alive, nn_vendorid_t vendorid, struct ddsi_tkmap_instance *tk) +nn_vendorid_t get_entity_vendorid (const struct entity_common *e) { - if (!(e->onlylocal || is_builtin_endpoint(e->guid.entityid, vendorid))) + switch (e->kind) { - /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ - enum ddsi_sertopic_builtin_type type = DSBT_PARTICIPANT; - switch (e->kind) - { - case EK_PARTICIPANT: - case EK_PROXY_PARTICIPANT: - type = DSBT_PARTICIPANT; - break; - case EK_READER: - case EK_PROXY_READER: - type = DSBT_READER; - break; - case EK_WRITER: - case EK_PROXY_WRITER: - type = DSBT_WRITER; - break; - } - assert(type != DSBT_PARTICIPANT || (e->kind == EK_PARTICIPANT || e->kind == EK_PROXY_PARTICIPANT)); - ddsi_plugin.builtin_write (type, &e->guid, timestamp, alive); + case EK_PARTICIPANT: + case EK_READER: + case EK_WRITER: + return (nn_vendorid_t) MY_VENDOR_ID; + break; + case EK_PROXY_PARTICIPANT: + return ((const struct proxy_participant *) e)->vendor; + break; + case EK_PROXY_READER: + return ((const struct proxy_reader *) e)->c.vendor; + break; + case EK_PROXY_WRITER: + return ((const struct proxy_writer *) e)->c.vendor; + break; } - /* tkmap instance only needs to be kept around until the first write of a built-in topic (if none ever happens, it needn't be kept at all): afterward, the WHC of the local built-in topic writer will keep the entry alive. FIXME: the SPDP/SEPD ones currently use default sertopics instead of builtin sertopics, and so use different mappings and different instnace ids. No-one ever sees those ids, so it doesn't matter, but it would nicer if it could actually be the same one. FIXME: it would also be nicer if the local built-in topics and the SPDP/SEDP writers were the same, but I want the locally created endpoints visible in the built-in topics as well, and those don't exist in the discovery writers ... */ - if (tk) - ddsi_tkmap_instance_unref (tk); -} - -static void write_builtin_topic_local (const struct entity_common *e, nn_wctime_t timestamp, bool alive, struct ddsi_tkmap_instance *tk) -{ - write_builtin_topic_any(e, timestamp, alive, ownvendorid, tk); + assert (0); + return (nn_vendorid_t) NN_VENDORID_UNKNOWN; } /* DELETED PARTICIPANTS --------------------------------------------- */ @@ -408,7 +399,6 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis { struct participant *pp; nn_guid_t subguid, group_guid; - struct ddsi_tkmap_instance *tk; /* no reserved bits may be set */ assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0); @@ -451,7 +441,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis pp = os_malloc (sizeof (*pp)); - entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0), &tk); + entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, now (), ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0)); pp->user_refc = 1; pp->builtin_refc = 0; pp->builtins_deleted = 0; @@ -630,7 +620,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis trigger_recv_threads (); } - write_builtin_topic_local(&pp->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&pp->e, now(), true); /* SPDP periodic broadcast uses the retransmit path, so the initial publication must be done differently. Must be later than making @@ -866,7 +856,7 @@ int delete_participant (const struct nn_guid *ppguid) struct participant *pp; if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) return ERR_UNKNOWN_ENTITY; - write_builtin_topic_local(&pp->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&pp->e, now(), false); remember_deleted_participant_guid (&pp->e.guid); ephash_remove_participant_guid (pp); gcreq_participant (pp); @@ -2077,7 +2067,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn { int32_t reason; (void)tnow; - if (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid)) + if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid))) return; if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0) { @@ -2295,7 +2285,7 @@ static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow) struct ephash_enum est; struct entity_common *em; enum entity_kind mkind; - if (is_builtin_entityid (e->guid.entityid, ownvendorid)) + if (is_builtin_entityid (e->guid.entityid, ownvendorid) && !is_local_orphan_endpoint (e)) /* never a need for local matches on discovery endpoints */ return; mkind = generic_do_local_match_mkind(e->kind); @@ -2374,34 +2364,27 @@ static void new_reader_writer_common (const struct nn_guid *guid, const struct d topic ? topic->typename : "(null)"); } -static void endpoint_common_init -( - struct entity_common *e, - struct endpoint_common *c, - enum entity_kind kind, - const struct nn_guid *guid, - const struct nn_guid *group_guid, - struct participant *pp, - struct ddsi_tkmap_instance **tk -) +static void endpoint_common_init (struct entity_common *e, struct endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp) { - entity_common_init (e, guid, NULL, kind, ownvendorid, pp->e.onlylocal, tk); + entity_common_init (e, guid, NULL, kind, now (), ownvendorid, pp->e.onlylocal); c->pp = ref_participant (pp, &e->guid); if (group_guid) - { c->group_guid = *group_guid; - } else - { memset (&c->group_guid, 0, sizeof (c->group_guid)); - } } static void endpoint_common_fini (struct entity_common *e, struct endpoint_common *c) { if (!is_builtin_entityid(e->guid.entityid, ownvendorid)) pp_release_entityid(c->pp, e->guid.entityid); - unref_participant (c->pp, &e->guid); + if (c->pp) + unref_participant (c->pp, &e->guid); + else + { + /* only for the (almost pseudo) writers used for generating the built-in topics */ + assert (is_local_orphan_endpoint (e)); + } entity_common_fini (e); } @@ -2608,25 +2591,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru return n; } -static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) +static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) { - struct writer *wr; - nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - - assert (is_writer_entityid (guid->entityid)); - assert (ephash_lookup_writer_guid (guid) == NULL); - assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0); - - new_reader_writer_common (guid, topic, xqos); - wr = os_malloc (sizeof (*wr)); - - /* want a pointer to the participant so that a parallel call to - delete_participant won't interfere with our ability to address - the participant */ - - endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, &tk); - os_condInit (&wr->throttle_cond, &wr->e.lock); wr->seq = 0; wr->cs_seq = 0; @@ -2658,12 +2624,10 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct assert (wr->xqos->aliased == 0); set_topic_type_name (wr->xqos, topic); - if (dds_get_log_mask() & DDS_LC_DISCOVERY) - { - DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid)); - nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos); - DDS_LOG(DDS_LC_DISCOVERY, "}\n"); - } + DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid)); + nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos); + DDS_LOG(DDS_LC_DISCOVERY, "}\n"); + assert (wr->xqos->present & QP_RELIABILITY); wr->reliable = (wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS); assert (wr->xqos->present & QP_DURABILITY); @@ -2818,23 +2782,43 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct ut_avlInit (&wr_local_readers_treedef, &wr->local_readers); local_reader_ary_init (&wr->rdary); +} + +static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity) +{ + struct writer *wr; + nn_mtime_t tnow = now_mt (); + + assert (is_writer_entityid (guid->entityid)); + assert (ephash_lookup_writer_guid (guid) == NULL); + assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0); + + new_reader_writer_common (guid, topic, xqos); + wr = os_malloc (sizeof (*wr)); + + /* want a pointer to the participant so that a parallel call to + delete_participant won't interfere with our ability to address + the participant */ + + endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp); + new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity); /* guid_hash needed for protocol handling, so add it before we send - out our first message. Also: needed for matching, and swapping - the order if hash insert & matching creates a window during which - neither of two endpoints being created in parallel can discover - the other. */ + out our first message. Also: needed for matching, and swapping + the order if hash insert & matching creates a window during which + neither of two endpoints being created in parallel can discover + the other. */ ephash_insert_writer_guid (wr); /* once it exists, match it with proxy writers and broadcast - existence (I don't think it matters much what the order of these - two is, but it seems likely that match-then-broadcast has a - slightly lower likelihood that a response from a proxy reader - gets dropped) -- but note that without adding a lock it might be - deleted while we do so */ + existence (I don't think it matters much what the order of these + two is, but it seems likely that match-then-broadcast has a + slightly lower likelihood that a response from a proxy reader + gets dropped) -- but note that without adding a lock it might be + deleted while we do so */ match_writer_with_proxy_readers (wr, tnow); match_writer_with_local_readers (wr, tnow); - write_builtin_topic_local(&wr->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&wr->e, now(), true); sedp_write_writer (wr); if (wr->lease_duration != T_NEVER) @@ -2846,7 +2830,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct return wr; } -struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg) +struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg) { struct participant *pp; struct writer * wr; @@ -2866,6 +2850,29 @@ struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_ return wr; } +struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc) +{ + nn_guid_t guid; + struct local_orphan_writer *lowr; + struct writer *wr; + nn_mtime_t tnow = now_mt (); + + DDS_LOG(DDS_LC_DISCOVERY, "new_local_orphan_writer(%s/%s)\n", topic->name, topic->typename); + lowr = os_malloc (sizeof (*lowr)); + wr = &lowr->wr; + + memset (&guid.prefix, 0, sizeof (guid.prefix)); + guid.entityid = entityid; + entity_common_init (&wr->e, &guid, NULL, EK_WRITER, now (), ownvendorid, true); + wr->c.pp = NULL; + memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid)); + new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL); + ephash_insert_writer_guid (wr); + match_writer_with_local_readers (wr, tnow); + ddsi_plugin.builtintopic_write (&wr->e, now(), true); + return lowr; +} + static void gc_delete_writer (struct gcreq *gcreq) { struct writer *wr = gcreq->arg; @@ -2960,7 +2967,7 @@ int delete_writer_nolinger_locked (struct writer *wr) { DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid)); ASSERT_MUTEX_HELD (&wr->e.lock); - write_builtin_topic_local(&wr->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&wr->e, now(), false); local_reader_ary_setinvalid (&wr->rdary); ephash_remove_writer_guid (wr); writer_set_state (wr, WRST_DELETING); @@ -2990,6 +2997,13 @@ int delete_writer_nolinger (const struct nn_guid *guid) return 0; } +void delete_local_orphan_writer (struct local_orphan_writer *lowr) +{ + os_mutexLock (&lowr->wr.e.lock); + delete_writer_nolinger_locked (&lowr->wr); + os_mutexUnlock (&lowr->wr.e.lock); +} + int delete_writer (const struct nn_guid *guid) { struct writer *wr; @@ -3177,7 +3191,6 @@ static struct reader * new_reader_guid struct reader * rd; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; assert (!is_writer_entityid (guid->entityid)); assert (ephash_lookup_reader_guid (guid) == NULL); @@ -3186,7 +3199,7 @@ static struct reader * new_reader_guid new_reader_writer_common (guid, topic, xqos); rd = os_malloc (sizeof (*rd)); - endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, &tk); + endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp); /* Copy QoS, merging in defaults */ rd->xqos = os_malloc (sizeof (*rd->xqos)); @@ -3290,7 +3303,7 @@ static struct reader * new_reader_guid ephash_insert_reader_guid (rd); match_reader_with_proxy_writers (rd, tnow); match_reader_with_local_writers (rd, tnow); - write_builtin_topic_local(&rd->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&rd->e, now(), true); sedp_write_reader (rd); return rd; } @@ -3383,7 +3396,7 @@ int delete_reader (const struct nn_guid *guid) (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc); } DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); - write_builtin_topic_local(&rd->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&rd->e, now(), false); ephash_remove_reader_guid (rd); gcreq_reader (rd); return 0; @@ -3459,7 +3472,6 @@ void new_proxy_participant runs on a single thread, it can't go wrong. FIXME, maybe? The same holds for the other functions for creating entities. */ struct proxy_participant *proxypp; - struct ddsi_tkmap_instance *tk; assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT); assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL); @@ -3469,7 +3481,7 @@ void new_proxy_participant proxypp = os_malloc (sizeof (*proxypp)); - entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, vendor, false, &tk); + entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false); proxypp->refc = 1; proxypp->lease_expired = 0; proxypp->vendor = vendor; @@ -3625,7 +3637,7 @@ void new_proxy_participant if (proxypp->owns_lease) lease_register (os_atomic_ldvoidp (&proxypp->lease)); - write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, tk); + ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); os_mutexUnlock (&proxypp->e.lock); } @@ -3643,7 +3655,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, co switch (source) { case UPD_PROXYPP_SPDP: - write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, NULL); + ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); proxypp->proxypp_have_spdp = 1; break; case UPD_PROXYPP_CM: @@ -3892,7 +3904,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t return ERR_UNKNOWN_ENTITY; } DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - write_builtin_topic_any(&ppt->e, timestamp, false, ppt->vendor, NULL); + ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false); remember_deleted_participant_guid (&ppt->e.guid); ephash_remove_proxy_participant_guid (ppt); os_mutexUnlock (&gv.lock); @@ -4021,12 +4033,7 @@ void delete_proxy_group (const nn_guid_t *guid, nn_wctime_t timestamp, int isimp /* PROXY-ENDPOINT --------------------------------------------------- */ -static void proxy_endpoint_common_init -( - struct entity_common *e, struct proxy_endpoint_common *c, - enum entity_kind kind, const struct nn_guid *guid, struct proxy_participant *proxypp, - struct addrset *as, const nn_plist_t *plist, struct ddsi_tkmap_instance **tk -) +static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) { const char *name; @@ -4036,7 +4043,7 @@ static void proxy_endpoint_common_init assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME)); name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : ""; - entity_common_init (e, guid, name, kind, proxypp->vendor, false, tk); + entity_common_init (e, guid, name, kind, tcreate, proxypp->vendor, false); c->xqos = nn_xqos_dup (&plist->qos); c->as = ref_addrset (as); c->topic = NULL; /* set from first matching reader/writer */ @@ -4071,8 +4078,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, struct proxy_writer *pwr; int isreliable; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - (void)timestamp; + assert (is_writer_entityid (guid->entityid)); assert (ephash_lookup_proxy_writer_guid (guid) == NULL); @@ -4083,7 +4089,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, } pwr = os_malloc (sizeof (*pwr)); - proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist, &tk); + proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist); ut_avlInit (&pwr_readers_treedef, &pwr->readers); pwr->n_reliable_readers = 0; @@ -4149,7 +4155,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, local_reader_ary_init (&pwr->rdary); ephash_insert_proxy_writer_guid (pwr); match_proxy_writer_with_readers (pwr, tnow); - write_builtin_topic_any(&pwr->e, timestamp, true, pwr->c.vendor, tk); + ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true); os_mutexLock (&pwr->e.lock); pwr->local_matching_inprogress = 0; @@ -4265,7 +4271,6 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq) int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) { struct proxy_writer *pwr; - (void)timestamp; (void)isimplicit; DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_writer (%x:%x:%x:%x) ", PGUID (*guid)); os_mutexLock (&gv.lock); @@ -4281,7 +4286,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int from removing themselves from the proxy writer's rdary[]. */ local_reader_ary_setinvalid (&pwr->rdary); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - write_builtin_topic_any(&pwr->e, timestamp, false, pwr->c.vendor, NULL); + ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false); ephash_remove_proxy_writer_guid (pwr); os_mutexUnlock (&gv.lock); gcreq_proxy_writer (pwr); @@ -4299,8 +4304,6 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, struct proxy_participant *proxypp; struct proxy_reader *prd; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - (void)timestamp; assert (!is_writer_entityid (guid->entityid)); assert (ephash_lookup_proxy_reader_guid (guid) == NULL); @@ -4312,7 +4315,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, } prd = os_malloc (sizeof (*prd)); - proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist, &tk); + proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist); prd->deleting = 0; #ifdef DDSI_INCLUDE_SSM @@ -4326,7 +4329,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, ut_avlInit (&prd_writers_treedef, &prd->writers); ephash_insert_proxy_reader_guid (prd); match_proxy_reader_with_writers (prd, tnow); - write_builtin_topic_any(&prd->e, timestamp, true, prd->c.vendor, tk); + ddsi_plugin.builtintopic_write (&prd->e, timestamp, true); return 0; } @@ -4399,7 +4402,6 @@ static void gc_delete_proxy_reader (struct gcreq *gcreq) int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) { struct proxy_reader *prd; - (void)timestamp; (void)isimplicit; DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_reader (%x:%x:%x:%x) ", PGUID (*guid)); os_mutexLock (&gv.lock); @@ -4409,7 +4411,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n"); return ERR_UNKNOWN_ENTITY; } - write_builtin_topic_any(&prd->e, timestamp, false, prd->c.vendor, NULL); + ddsi_plugin.builtintopic_write (&prd->e, timestamp, false); ephash_remove_proxy_reader_guid (prd); os_mutexUnlock (&gv.lock); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); diff --git a/src/core/ddsi/src/q_gc.c b/src/core/ddsi/src/q_gc.c index fdec90e..4ea9d4f 100644 --- a/src/core/ddsi/src/q_gc.c +++ b/src/core/ddsi/src/q_gc.c @@ -180,6 +180,14 @@ struct gcreq_queue *gcreq_queue_new (void) return q; } +void gcreq_queue_drain (struct gcreq_queue *q) +{ + os_mutexLock (&q->lock); + while (q->count != 0) + os_condWait (&q->cond, &q->lock); + os_mutexUnlock (&q->lock); +} + void gcreq_queue_free (struct gcreq_queue *q) { struct gcreq *gcreq; @@ -191,7 +199,8 @@ void gcreq_queue_free (struct gcreq_queue *q) os_mutexLock (&q->lock); q->terminate = 1; /* Wait until there is only request in existence, the one we just - allocated. Then we know the gc system is quiet. */ + allocated (this is also why we can't use "drain" here). Then + we know the gc system is quiet. */ while (q->count != 1) os_condWait (&q->cond, &q->lock); os_mutexUnlock (&q->lock); @@ -227,7 +236,7 @@ void gcreq_free (struct gcreq *gcreq) struct gcreq_queue *gcreq_queue = gcreq->queue; os_mutexLock (&gcreq_queue->lock); --gcreq_queue->count; - if (gcreq_queue->terminate && gcreq_queue->count <= 1) + if (gcreq_queue->count <= 1) os_condBroadcast (&gcreq_queue->cond); os_mutexUnlock (&gcreq_queue->lock); os_free (gcreq); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 3d2b84c..c0758bd 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -54,7 +54,6 @@ #include "ddsi/ddsi_raweth.h" #include "ddsi/ddsi_mcgroup.h" #include "ddsi/ddsi_serdata_default.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsi/ddsi_tkmap.h" #include "dds__whc.h" @@ -772,18 +771,12 @@ static void make_special_topics (void) { gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist); gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr); - gv.builtin_participant_topic = new_sertopic_builtin (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); - gv.builtin_reader_topic = new_sertopic_builtin (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); - gv.builtin_writer_topic = new_sertopic_builtin (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); } static void free_special_topics (void) { ddsi_sertopic_unref (gv.plist_topic); ddsi_sertopic_unref (gv.rawcdr_topic); - ddsi_sertopic_unref (gv.builtin_participant_topic); - ddsi_sertopic_unref (gv.builtin_reader_topic); - ddsi_sertopic_unref (gv.builtin_writer_topic); } static int setup_and_start_recv_threads (void) @@ -1408,7 +1401,7 @@ static void builtins_dqueue_ready_cb (void *varg) os_mutexUnlock (&arg->lock); } -void rtps_term (void) +void rtps_stop (void) { struct thread_state1 *self = lookup_thread_state (); #ifdef DDSI_INCLUDE_NETWORK_CHANNELS @@ -1527,17 +1520,26 @@ void rtps_term (void) } /* Wait until all participants are really gone => by then we can be - certain that no new GC requests will be added */ + certain that no new GC requests will be added, short of what we + do here */ os_mutexLock (&gv.participant_set_lock); while (gv.nparticipants > 0) os_condWait (&gv.participant_set_cond, &gv.participant_set_lock); os_mutexUnlock (&gv.participant_set_lock); + /* Wait until no more GC requests are outstanding -- not really + necessary, but it allows us to claim the stack is quiescent + at this point */ + gcreq_queue_drain (gv.gcreq_queue); + /* Clean up privileged_pp -- it must be NULL now (all participants are gone), but the lock still needs to be destroyed */ assert (gv.privileged_pp == NULL); os_mutexDestroy (&gv.privileged_pp_lock); +} +void rtps_fini (void) +{ /* Shut down the GC system -- no new requests will be added */ gcreq_queue_free (gv.gcreq_queue); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index d763356..28ebdc9 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist res = 1; #ifndef NDEBUG - if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) + if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER && !is_local_orphan_endpoint (&wr->e)) { struct whc_state whcst; whc_get_state(wr->whc, &whcst); From de3d3cc8cdafc6472f037b29390fe1fb7b444904 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 8 Jan 2019 11:23:19 +0100 Subject: [PATCH 5/7] fix formatting error for durations in ddsls Signed-off-by: Erik Boasson --- src/tools/ddsls/ddsls.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tools/ddsls/ddsls.c b/src/tools/ddsls/ddsls.c index 40cb1a3..d6accb8 100644 --- a/src/tools/ddsls/ddsls.c +++ b/src/tools/ddsls/ddsls.c @@ -192,22 +192,22 @@ void qp_duration_qos (const dds_qos_t *q, FILE *fp, const char *what, bool (*qge void qp_lifespan (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "lifespan: duration = ", dds_qget_lifespan); + qp_duration_qos (q, fp, "lifespan: duration", dds_qget_lifespan); } void qp_deadline (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "deadline: period = ", dds_qget_deadline); + qp_duration_qos (q, fp, "deadline: period", dds_qget_deadline); } void qp_latency_budget (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "latency_budget: duration = ", dds_qget_latency_budget); + qp_duration_qos (q, fp, "latency_budget: duration", dds_qget_latency_budget); } void qp_time_based_filter (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "time_based_filter: minimum_separation = ", dds_qget_time_based_filter); + qp_duration_qos (q, fp, "time_based_filter: minimum_separation", dds_qget_time_based_filter); } void qp_ownership (const dds_qos_t *q, FILE *fp) From d659b416f2a0d493af57f9d66761064665bc2eed Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 9 Jan 2019 11:15:11 +0100 Subject: [PATCH 6/7] undo breakage of OpenIndiana (Solaris) build Signed-off-by: Erik Boasson --- src/CMakeLists.txt | 6 +----- src/core/ddsi/src/ddsi_udp.c | 10 ++++++++++ src/os/CMakeLists.txt | 2 +- src/os/include/os/posix/os_platform_socket.h | 2 -- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e6b23df..6fcb572 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,11 +53,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "VxWorks") endif() if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") - add_definitions(-m64) - add_definitions(-xc99) - add_definitions(-D__restrict=restrict) - set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64") - set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m64 -xc99 -D__restrict=restrict") endif() # Conan diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index b6cb2f5..da5ceac 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -64,8 +64,13 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s msghdr.msg_namelen = srclen; msghdr.msg_iov = &msg_iov; msghdr.msg_iovlen = 1; +#if !defined(__sun) || defined(_XPG4_2) msghdr.msg_control = NULL; msghdr.msg_controllen = 0; +#else + msghdr.msg_accrights = NULL; + msghdr.msg_accrightslen = 0; +#endif do { ret = recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0); @@ -117,8 +122,13 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d set_msghdr_iov (&msg, (os_iovec_t *) iov, niov); msg.msg_name = &dstaddr; msg.msg_namelen = (socklen_t) os_sockaddr_get_size((os_sockaddr *) &dstaddr); +#if !defined(__sun) || defined(_XPG4_2) msg.msg_control = NULL; msg.msg_controllen = 0; +#else + msg.msg_accrights = NULL; + msg.msg_accrightslen = 0; +#endif #if SYSDEPS_MSGHDR_FLAGS msg.msg_flags = (int) flags; #else diff --git a/src/os/CMakeLists.txt b/src/os/CMakeLists.txt index de0d256..e95ac92 100644 --- a/src/os/CMakeLists.txt +++ b/src/os/CMakeLists.txt @@ -95,7 +95,7 @@ endif() if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") target_link_libraries(OSAPI INTERFACE -lsocket -lnsl) - target_compile_definitions(OSAPI PRIVATE -KPIC) + add_definitions(-KPIC) endif() # Determine if platform is big or little endian. diff --git a/src/os/include/os/posix/os_platform_socket.h b/src/os/include/os/posix/os_platform_socket.h index 7dadd5d..d16dfd9 100644 --- a/src/os/include/os/posix/os_platform_socket.h +++ b/src/os/include/os/posix/os_platform_socket.h @@ -74,8 +74,6 @@ extern "C" { typedef size_t os_iov_len_t; #if defined(__sun) && !defined(_XPG4_2) -#define msg_accrights msg_control -#define msg_accrightslen msg_controllen #define OS_MSGHDR_FLAGS 0 #else #define OS_MSGHDR_FLAGS 1 From 8ab1cd8987fbdc2b84407ea0622f0542e77f7d1a Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 14 Jan 2019 11:45:31 +0100 Subject: [PATCH 7/7] remove bogus mutex_init_stress test Signed-off-by: Erik Boasson --- src/os/tests/mutex.c | 66 -------------------------------------------- 1 file changed, 66 deletions(-) diff --git a/src/os/tests/mutex.c b/src/os/tests/mutex.c index f33a313..6a03be1 100644 --- a/src/os/tests/mutex.c +++ b/src/os/tests/mutex.c @@ -190,72 +190,6 @@ CU_Test(os_mutex, basic) printf("Ending os_mutex_basic\n"); } -#define RUNTIME_SEC (4) -#define NUM_THREADS (8) -#define OS_STRESS_STOP (0) -#define OS_STRESS_GO (1) -#define THREAD_NAME_LEN (8) - -struct os_mutex_stress { - os_threadId tid; - os_mutex m; - os_atomic_uint32_t * flag; - char name[THREAD_NAME_LEN]; -}; - -static uint32_t -os_mutex_init_thr( - void *args) -{ - struct os_mutex_stress *state = (struct os_mutex_stress *)args; - os_result r; - uint32_t iterations = 0; - - do { - os_mutexInit(&state->m); - r = os_mutexLock_s(&state->m); /* Use the mutex to check that all is OK. */ - CU_ASSERT_EQUAL(r, os_resultSuccess); /* Failure can't be forced. */ - os_mutexUnlock(&state->m); - os_mutexDestroy(&state->m); - iterations++; - } while ( os_atomic_ld32(state->flag) != OS_STRESS_STOP && r == os_resultSuccess); - - printf("%s <%"PRIxMAX">: Performed %u iterations. Stopping now.\n", state->name, os_threadIdToInteger(os_threadIdSelf()), iterations); - return r != os_resultSuccess; /* Return true on faulure */ -} - -CU_Test(os_mutex, init_stress) -{ - struct os_mutex_stress threads[NUM_THREADS]; - os_threadAttr tattr; - unsigned i; - os_atomic_uint32_t flag = OS_ATOMIC_UINT32_INIT(OS_STRESS_GO); - os_time runtime = { .tv_sec = RUNTIME_SEC, .tv_nsec = 0 }; - - printf("Starting os_mutex_init_stress\n"); - - os_threadAttrInit(&tattr); - for ( i = 0; i < NUM_THREADS; i++ ) { - (void) snprintf(&threads[i].name[0], THREAD_NAME_LEN, "thr%u", i); - threads[i].flag = &flag; - os_threadCreate(&threads[i].tid, threads[i].name, &tattr, &os_mutex_init_thr, &threads[i]); - printf("main <%"PRIxMAX">: Started thread '%s' with thread-id %" PRIxMAX "\n", os_threadIdToInteger(os_threadIdSelf()), threads[i].name, os_threadIdToInteger(threads[i].tid)); - } - - printf("main <%"PRIxMAX">: Test will run for ~%ds with %d threads\n", os_threadIdToInteger(os_threadIdSelf()), RUNTIME_SEC, NUM_THREADS); - os_nanoSleep(runtime); - os_atomic_st32(&flag, OS_STRESS_STOP); - - for ( ; i != 0; i-- ) { - uint32_t thread_failed; - os_threadWaitExit(threads[i - 1].tid, &thread_failed); - printf("main <%"PRIxMAX">: Thread %s <%" PRIxMAX "> stopped with result %s.\n", os_threadIdToInteger(os_threadIdSelf()), threads[i - 1].name, os_threadIdToInteger(threads[i - 1].tid), thread_failed ? "FAILED" : "PASS"); - - CU_ASSERT_FALSE(thread_failed); - } - printf("Ending os_mutex_init_stress\n"); -} - CU_Test(os_mutex, lock, false) { /* Test critical section access with locking and PRIVATE scope */