diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e6b23df..6fcb572 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,11 +53,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "VxWorks") endif() if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") - add_definitions(-m64) - add_definitions(-xc99) - add_definitions(-D__restrict=restrict) - set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64") - set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64") + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m64 -xc99 -D__restrict=restrict") endif() # Conan diff --git a/src/core/ddsc/CMakeLists.txt b/src/core/ddsc/CMakeLists.txt index f9c5ff4..919c2c7 100644 --- a/src/core/ddsc/CMakeLists.txt +++ b/src/core/ddsc/CMakeLists.txt @@ -37,6 +37,9 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds_subscriber.c dds_write.c dds_whc.c + dds_whc_builtintopic.c + dds_serdata_builtintopic.c + dds_sertopic_builtintopic.c ) PREPEND(hdrs_public_ddsc "$$" @@ -73,6 +76,8 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src" dds__write.h dds__writer.h dds__whc.h + dds__whc_builtintopic.h + dds__serdata_builtintopic.h ) configure_file( diff --git a/src/core/ddsc/include/ddsc/dds.h b/src/core/ddsc/include/ddsc/dds.h index ec57ce8..65ebead 100644 --- a/src/core/ddsc/include/ddsc/dds.h +++ b/src/core/ddsc/include/ddsc/dds.h @@ -1777,7 +1777,7 @@ dds_write_flush( * @returns A dds_return_t indicating success or failure. */ _Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -DDS_EXPORT int +DDS_EXPORT dds_return_t dds_writecdr( dds_entity_t writer, struct ddsi_serdata *serdata); diff --git a/src/core/ddsc/src/dds__builtin.h b/src/core/ddsc/src/dds__builtin.h index 3601d67..38c7d11 100644 --- a/src/core/ddsc/src/dds__builtin.h +++ b/src/core/ddsc/src/dds__builtin.h @@ -13,49 +13,31 @@ #define _DDS_BUILTIN_H_ #include "ddsi/q_time.h" -#include "ddsi/ddsi_serdata_builtin.h" - #if defined (__cplusplus) extern "C" { #endif - /* Get actual topic in related participant related to topic 'id'. */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_topic( - _In_ dds_entity_t e, - _In_ dds_entity_t topic); - -/* Global publisher singleton (publishes only locally). */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_publisher( - void); +dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic); /* Subscriber singleton within related participant. */ -_Must_inspect_result_ dds_entity_t -dds__get_builtin_subscriber( - _In_ dds_entity_t e); +dds_entity_t dds__get_builtin_subscriber(dds_entity_t e); /* Checks whether the reader QoS is valid for use with built-in topic TOPIC */ bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos); -/* Initialization and finalize functions. */ -void -dds__builtin_init( - void); +struct entity_common; +struct nn_guid; +struct ddsi_tkmap_instance; -void -dds__builtin_fini( - void); - -void -dds__builtin_write( - _In_ enum ddsi_sertopic_builtin_type type, - _In_ const nn_guid_t *guid, - _In_ nn_wctime_t timestamp, - _In_ bool alive); +void dds__builtin_init (void); +void dds__builtin_fini (void); +bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid); +struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid); +struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive); +void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h b/src/core/ddsc/src/dds__serdata_builtintopic.h similarity index 57% rename from src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h rename to src/core/ddsc/src/dds__serdata_builtintopic.h index 68a450d..ff0771d 100644 --- a/src/core/ddsi/include/ddsi/ddsi_serdata_builtin.h +++ b/src/core/ddsc/src/dds__serdata_builtintopic.h @@ -9,35 +9,33 @@ * * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ -#ifndef DDSI_SERDATA_BUILTIN_H -#define DDSI_SERDATA_BUILTIN_H +#ifndef DDSI_SERDATA_BUILTINTOPIC_H +#define DDSI_SERDATA_BUILTINTOPIC_H -#include "os/os.h" -#include "util/ut_avl.h" +#include "ddsi/q_xqos.h" #include "ddsi/ddsi_serdata.h" #include "ddsi/ddsi_sertopic.h" -#include "ddsi/q_xqos.h" -struct ddsi_serdata_builtin { +struct ddsi_serdata_builtintopic { struct ddsi_serdata c; nn_guid_t key; nn_xqos_t xqos; }; -enum ddsi_sertopic_builtin_type { +enum ddsi_sertopic_builtintopic_type { DSBT_PARTICIPANT, DSBT_READER, DSBT_WRITER }; -struct ddsi_sertopic_builtin { +struct ddsi_sertopic_builtintopic { struct ddsi_sertopic c; - enum ddsi_sertopic_builtin_type type; + enum ddsi_sertopic_builtintopic_type type; }; -extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin; -extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtin; +extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic; +extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic; -struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename); +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename); #endif diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index ab6edb0..78b4665 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -52,7 +52,7 @@ struct rhc; * Obviously, it is encouraged to use condition variables and such. But * sometimes it wouldn't make that much of a difference and taking the * easy route is somewhat pragmatic. */ -#define DDS_HEADBANG_TIMEOUT_MS (10) +#define DDS_HEADBANG_TIMEOUT (DDS_MSECS (10)) typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, const void *ctx); diff --git a/src/core/ddsc/src/dds__whc_builtintopic.h b/src/core/ddsc/src/dds__whc_builtintopic.h new file mode 100644 index 0000000..6c38157 --- /dev/null +++ b/src/core/ddsc/src/dds__whc_builtintopic.h @@ -0,0 +1,28 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDS_WHC_BUILTINTOPIC_H +#define DDS_WHC_BUILTINTOPIC_H + +#include "ddsi/q_whc.h" +#include "dds__serdata_builtintopic.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type); + +#if defined (__cplusplus) +} +#endif + +#endif /* Q_WHC_H */ diff --git a/src/core/ddsc/src/dds__write.h b/src/core/ddsc/src/dds__write.h index e78d65e..134b950 100644 --- a/src/core/ddsc/src/dds__write.h +++ b/src/core/ddsc/src/dds__write.h @@ -22,28 +22,16 @@ extern "C" { struct ddsi_serdata; -typedef enum -{ +typedef enum { DDS_WR_ACTION_WRITE = 0, DDS_WR_ACTION_WRITE_DISPOSE = DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_DISPOSE = DDS_WR_KEY_BIT | DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_UNREGISTER = DDS_WR_KEY_BIT | DDS_WR_UNREGISTER_BIT -} -dds_write_action; +} dds_write_action; -int -dds_write_impl( - _In_ dds_writer *wr, - _In_ const void *data, - _In_ dds_time_t tstamp, - _In_ dds_write_action action); - -int -dds_writecdr_impl( - _In_ dds_writer *wr, - _Inout_ struct ddsi_serdata *d, - _In_ dds_time_t tstamp, - _In_ dds_write_action action); +dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action); +dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action); +dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index db0cd90..bff293b 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -24,418 +24,216 @@ #include "dds__subscriber.h" #include "dds__write.h" #include "dds__writer.h" +#include "dds__whc_builtintopic.h" +#include "dds__serdata_builtintopic.h" #include "ddsi/q_qosmatch.h" -#include "ddsi/ddsi_serdata_builtin.h" +#include "ddsi/ddsi_tkmap.h" -static dds_return_t -dds__delete_builtin_participant( - dds_entity *e); +static struct ddsi_sertopic *builtin_participant_topic; +static struct ddsi_sertopic *builtin_reader_topic; +static struct ddsi_sertopic *builtin_writer_topic; +static struct local_orphan_writer *builtintopic_writer_participant; +static struct local_orphan_writer *builtintopic_writer_publications; +static struct local_orphan_writer *builtintopic_writer_subscriptions; -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_participant( - void); - -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_publisher( - _In_ dds_entity_t participant); - -static dds_entity_t -dds__create_builtin_writer( - _In_ dds_entity_t topic); - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_participant( - void); - - - - -static os_mutex g_builtin_mutex; -static os_atomic_uint32_t m_call_count = OS_ATOMIC_UINT32_INIT(0); - -/* Singletons are used to publish builtin data locally. */ -static dds_entity_t g_builtin_local_participant = 0; -static dds_entity_t g_builtin_local_publisher = 0; -static dds_entity_t g_builtin_local_writers[] = { - 0, /* index DDS_BUILTIN_TOPIC_DCPSPARTICIPANT - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSTOPIC - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSPUBLICATION - DDS_KIND_INTERNAL - 1 */ - 0, /* index DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION - DDS_KIND_INTERNAL - 1 */ -}; - -static _Must_inspect_result_ dds_qos_t * -dds__create_builtin_qos( - void) +static dds_qos_t *dds__create_builtin_qos (void) { - const char *partition = "__BUILT-IN PARTITION__"; - dds_qos_t *qos = dds_create_qos(); - dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL); - dds_qset_presentation(qos, DDS_PRESENTATION_TOPIC, false, false); - dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100)); - dds_qset_partition(qos, 1, &partition); - return qos; + const char *partition = "__BUILT-IN PARTITION__"; + dds_qos_t *qos = dds_create_qos (); + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_presentation (qos, DDS_PRESENTATION_TOPIC, false, false); + dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100)); + dds_qset_partition (qos, 1, &partition); + return qos; } -static dds_return_t -dds__delete_builtin_participant( - dds_entity *e) +void dds__builtin_init (void) { - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); + dds_qos_t *qos = dds__create_builtin_qos (); - assert(e); - assert(thr); - assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT); + builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); + builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); + builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); - if (asleep) { - thread_state_awake(thr); - } - dds_domain_free(e->m_domain); - if (asleep) { - thread_state_asleep(thr); - } + builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT)); + builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER)); + builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER)); - return DDS_RETCODE_OK; + dds_delete_qos (qos); } -/* - * We don't use the 'normal' create participant. - * - * This way, the application is not able to access the local builtin writers. - * Also, we can indicate that it should be a 'local only' participant, which - * means that none of the entities under the hierarchy of this participant will - * be exposed to the outside world. This is what we want, because these builtin - * writers are only applicable to local user readers. - */ -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_participant( - void) +void dds__builtin_fini (void) { - int q_rc; - nn_plist_t plist; - struct thread_state1 * thr; - bool asleep; - nn_guid_t guid; - dds_entity_t participant; - dds_participant *pp; + /* No more sources for builtin topic samples */ + struct thread_state1 * const self = lookup_thread_state (); + thread_state_awake (self); + delete_local_orphan_writer (builtintopic_writer_participant); + delete_local_orphan_writer (builtintopic_writer_publications); + delete_local_orphan_writer (builtintopic_writer_subscriptions); + thread_state_asleep (self); - nn_plist_init_empty (&plist); - - thr = lookup_thread_state (); - asleep = !vtime_awake_p (thr->vtime); - if (asleep) { - thread_state_awake (thr); - } - q_rc = new_participant (&guid, RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_ONLY_LOCAL, &plist); - if (asleep) { - thread_state_asleep (thr); - } - - if (q_rc != 0) { - DDS_ERROR("Internal builtin error\n"); - participant = DDS_ERRNO(DDS_RETCODE_ERROR); - goto fail; - } - - pp = dds_alloc (sizeof (*pp)); - participant = dds_entity_init (&pp->m_entity, NULL, DDS_KIND_PARTICIPANT, NULL, NULL, 0); - if (participant < 0) { - goto fail; - } - - pp->m_entity.m_guid = guid; - pp->m_entity.m_domain = dds_domain_create (config.domainId.value); - pp->m_entity.m_domainid = config.domainId.value; - pp->m_entity.m_deriver.delete = dds__delete_builtin_participant; - -fail: - return participant; + ddsi_sertopic_unref (builtin_participant_topic); + ddsi_sertopic_unref (builtin_reader_topic); + ddsi_sertopic_unref (builtin_writer_topic); } -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_publisher( - _In_ dds_entity_t participant) +dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic) { - dds_qos_t *qos = dds__create_builtin_qos(); - dds_entity_t pub = dds_create_publisher(participant, qos, NULL); - dds_delete_qos(qos); - return pub; + dds_entity_t pp; + dds_entity_t tp; + + if ((pp = dds_get_participant (e)) <= 0) + return pp; + + struct ddsi_sertopic *sertopic; + if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { + sertopic = builtin_participant_topic; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { + sertopic = builtin_writer_topic; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { + sertopic = builtin_reader_topic; + } else { + assert (0); + return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + } + + dds_qos_t *qos = dds__create_builtin_qos (); + tp = dds_create_topic_arbitrary (pp, sertopic, sertopic->name, qos, NULL, NULL); + dds_delete_qos (qos); + return tp; } -static _Must_inspect_result_ dds_entity_t -dds__create_builtin_subscriber( - _In_ dds_entity *participant) +static bool qos_has_resource_limits (const dds_qos_t *qos) { - dds_qos_t *qos = dds__create_builtin_qos(); - dds_entity_t sub = dds__create_subscriber_l(participant, qos, NULL); - dds_delete_qos(qos); - return sub; + return (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED || + qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED || + qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED); } -static dds_entity_t -dds__create_builtin_writer( - _In_ dds_entity_t topic) +bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos) { - dds_entity_t wr; - dds_entity_t pub = dds__get_builtin_publisher(); - if (pub > 0) { - dds_entity_t top = dds__get_builtin_topic(pub, topic); - if (top > 0) { - wr = dds_create_writer(pub, top, NULL, NULL); - (void)dds_delete(top); - } else { - wr = top; - } + if (qos == NULL) + /* default QoS inherited from topic is ok by definition */ + return true; + else + { + /* failing writes on built-in topics are unwelcome complications, so we simply forbid the creation of + a reader matching a built-in topics writer that has resource limits */ + struct local_orphan_writer *bwr; + if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { + bwr = builtintopic_writer_participant; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { + bwr = builtintopic_writer_publications; + } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { + bwr = builtintopic_writer_subscriptions; } else { - wr = pub; + assert (0); + return false; } - return wr; + return qos_match_p (qos, bwr->wr.xqos) && !qos_has_resource_limits (qos); + } } - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_participant( - void) +static dds_entity_t dds__create_builtin_subscriber (dds_entity *participant) { - if (g_builtin_local_participant == 0) { - g_builtin_local_participant = dds__create_builtin_participant(); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPARTICIPANT); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPUBLICATION); - (void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION); - } - return g_builtin_local_participant; + dds_qos_t *qos = dds__create_builtin_qos (); + dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL); + dds_delete_qos (qos); + return sub; } - -_Must_inspect_result_ dds_entity_t -dds__get_builtin_publisher( - void) +dds_entity_t dds__get_builtin_subscriber (dds_entity_t e) { - if (g_builtin_local_publisher == 0) { - dds_entity_t par = dds__get_builtin_participant(); - if (par > 0) { - g_builtin_local_publisher = dds__create_builtin_publisher(par); - } - } - return g_builtin_local_publisher; -} + dds_entity_t sub; + dds_return_t ret; + dds_entity_t pp; + dds_participant *p; + dds_entity *part_entity; -_Must_inspect_result_ dds_entity_t -dds__get_builtin_subscriber( - _In_ dds_entity_t e) -{ - dds_entity_t sub; - dds_return_t ret; - dds_entity_t participant; - dds_participant *p; - dds_entity *part_entity; - - participant = dds_get_participant(e); - if (participant <= 0) { - /* error already in participant error; no need to repeat error */ - ret = participant; - goto error; - } - ret = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, (dds_entity **)&part_entity); - if (ret != DDS_RETCODE_OK) { - goto error; - } - p = (dds_participant *)part_entity; - if(p->m_builtin_subscriber <= 0) { - p->m_builtin_subscriber = dds__create_builtin_subscriber(part_entity); - } - sub = p->m_builtin_subscriber; - dds_entity_unlock(part_entity); - - return sub; - - /* Error handling */ -error: - assert(ret < 0); + if ((pp = dds_get_participant (e)) <= 0) + return pp; + if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0) return ret; + + p = (dds_participant *) part_entity; + if (p->m_builtin_subscriber <= 0) { + p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity); + } + sub = p->m_builtin_subscriber; + dds_entity_unlock(part_entity); + return sub; } - -_Must_inspect_result_ dds_entity_t -dds__get_builtin_topic( - _In_ dds_entity_t e, - _In_ dds_entity_t topic) +bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid) { - dds_entity_t participant; - dds_entity_t ret; - - participant = dds_get_participant(e); - if (participant > 0) { - struct ddsi_sertopic *sertopic; - - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - sertopic = gv.builtin_participant_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - sertopic = gv.builtin_writer_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - sertopic = gv.builtin_reader_topic; - } else { - DDS_ERROR("Invalid builtin-topic handle(%d)\n", topic); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err_invalid_topic; - } - - ret = dds_find_topic (participant, sertopic->name); - if (ret < 0 && dds_err_nr(ret) == DDS_RETCODE_PRECONDITION_NOT_MET) { - dds_qos_t *qos = dds__create_builtin_qos(); - ret = dds_create_topic_arbitrary(participant, sertopic, sertopic->name, qos, NULL, NULL); - dds_delete_qos(qos); - } - } else { - /* Failed to get participant of provided entity */ - ret = participant; - } - -err_invalid_topic: - return ret; + return !(onlylocal || is_builtin_endpoint (entityid, vendorid)); } - -static _Must_inspect_result_ dds_entity_t -dds__get_builtin_writer( - _In_ dds_entity_t topic) +struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid) { - dds_entity_t wr; - if ((topic >= DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) && (topic <= DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION)) { - int index = (int)(topic - DDS_KIND_INTERNAL - 1); - os_mutexLock(&g_builtin_mutex); - wr = g_builtin_local_writers[index]; - if (wr == 0) { - wr = dds__create_builtin_writer(topic); - if (wr > 0) { - g_builtin_local_writers[index] = wr; - } - } - os_mutexUnlock(&g_builtin_mutex); - } else { - DDS_ERROR("Given topic is not a builtin topic\n"); - wr = DDS_ERRNO(DDS_RETCODE_ERROR); - } - return wr; + struct ddsi_tkmap_instance *tk; + struct ddsi_serdata *sd; + struct nn_keyhash kh; + memcpy (&kh, guid, sizeof (kh)); + /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ + sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh); + tk = ddsi_tkmap_find (sd, false, true); + ddsi_serdata_unref (sd); + return tk; } -static dds_return_t -dds__builtin_write_int( - _In_ dds_entity_t topic, - _In_ const nn_guid_t *guid, - _In_ dds_time_t timestamp, - _In_ bool alive) +struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive) { - dds_return_t ret = DDS_RETCODE_OK; - if (os_atomic_inc32_nv(&m_call_count) > 1) { - dds_entity_t wr; - wr = dds__get_builtin_writer(topic); - if (wr > 0) { - struct ddsi_sertopic *sertopic; - struct ddsi_serdata *serdata; - struct nn_keyhash keyhash; - struct dds_writer *wraddr; - - if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) { - sertopic = gv.builtin_participant_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) { - sertopic = gv.builtin_writer_topic; - } else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) { - sertopic = gv.builtin_reader_topic; - } else { - sertopic = NULL; - assert (0); - } - - memcpy (&keyhash, guid, sizeof (keyhash)); - serdata = ddsi_serdata_from_keyhash(sertopic, &keyhash); - - ret = dds_writer_lock(wr, &wraddr); - if (ret == DDS_RETCODE_OK) { - ret = dds_writecdr_impl (wraddr, serdata, timestamp, alive ? 0 : (DDS_WR_DISPOSE_BIT | DDS_WR_UNREGISTER_BIT)); - dds_writer_unlock(wraddr); - } - } else { - ret = wr; - } - } - os_atomic_dec32(&m_call_count); - return ret; + /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ + struct ddsi_sertopic *topic = NULL; + struct ddsi_serdata *serdata; + struct nn_keyhash keyhash; + switch (e->kind) + { + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + topic = builtin_participant_topic; + break; + case EK_WRITER: + case EK_PROXY_WRITER: + topic = builtin_writer_topic; + break; + case EK_READER: + case EK_PROXY_READER: + topic = builtin_reader_topic; + break; + } + assert (topic != NULL); + memcpy (&keyhash, &e->guid, sizeof (keyhash)); + serdata = ddsi_serdata_from_keyhash (topic, &keyhash); + serdata->timestamp = timestamp; + serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER; + return serdata; } -void -dds__builtin_write( - _In_ enum ddsi_sertopic_builtin_type type, - _In_ const nn_guid_t *guid, - _In_ nn_wctime_t timestamp, - _In_ bool alive) +void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive) { - /* initialize to avoid compiler warning ultimately caused by C's horrible type system */ - dds_entity_t topic = 0; - switch (type) + if (ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, get_entity_vendorid (e))) + { + /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ + struct local_orphan_writer *bwr = NULL; + struct ddsi_serdata *serdata = dds__builtin_make_sample (e, timestamp, alive); + assert (e->tk != NULL); + switch (e->kind) { - case DSBT_PARTICIPANT: - topic = DDS_BUILTIN_TOPIC_DCPSPARTICIPANT; - break; - case DSBT_WRITER: - topic = DDS_BUILTIN_TOPIC_DCPSPUBLICATION; - break; - case DSBT_READER: - topic = DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION; - break; + case EK_PARTICIPANT: + case EK_PROXY_PARTICIPANT: + bwr = builtintopic_writer_participant; + break; + case EK_WRITER: + case EK_PROXY_WRITER: + bwr = builtintopic_writer_publications; + break; + case EK_READER: + case EK_PROXY_READER: + bwr = builtintopic_writer_subscriptions; + break; } - assert(topic != 0); - (void)dds__builtin_write_int(topic, guid, timestamp.v, alive); -} - -bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos) -{ - if (qos == NULL) { - /* default QoS inherited from topic is ok by definition */ - return true; - } else { - dds_entity_t wr = dds__get_builtin_writer(topic); - dds_qos_t *wrqos = dds_create_qos(); - dds_return_t ret = dds_get_qos(wr, wrqos); - bool match; - assert (ret == DDS_RETCODE_OK); - (void)ret; - if (!qos_match_p (qos, wrqos)) { - match = false; - } else if (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED || - qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED || - qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED) { - /* this means a write on the built-in topic writer can't fail */ - match = false; - } else { - match = true; - } - dds_delete_qos(wrqos); - return match; - } -} - -void -dds__builtin_init( - void) -{ - assert(os_atomic_ld32(&m_call_count) == 0); - os_mutexInit(&g_builtin_mutex); - os_atomic_inc32(&m_call_count); -} - -void -dds__builtin_fini( - void) -{ - assert(os_atomic_ld32(&m_call_count) > 0); - while (os_atomic_dec32_nv(&m_call_count) > 0) { - os_atomic_inc32_nv(&m_call_count); - dds_sleepfor(DDS_MSECS(10)); - } - (void)dds_delete(g_builtin_local_participant); - g_builtin_local_participant = 0; - g_builtin_local_publisher = 0; - memset(g_builtin_local_writers, 0, sizeof(g_builtin_local_writers)); - os_mutexDestroy(&g_builtin_mutex); + dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata); + } } diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 9eae64f..23802d4 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -18,6 +18,7 @@ #include "dds__domain.h" #include "dds__err.h" #include "dds__builtin.h" +#include "dds__whc_builtintopic.h" #include "ddsi/ddsi_iid.h" #include "ddsi/ddsi_tkmap.h" #include "ddsi/ddsi_serdata.h" @@ -106,8 +107,6 @@ dds_init(dds_domainid_t domain) * main configured domain id is. */ dds_global.m_default_domain = config.domainId.value; - dds__builtin_init(); - if (rtps_config_prep(dds_cfgst) != 0) { DDS_ERROR("Failed to configure RTPS\n"); @@ -138,6 +137,8 @@ dds_init(dds_domainid_t domain) goto fail_rtps_init; } + dds__builtin_init (); + if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0) { DDS_ERROR("Failed to start the servicelease\n"); @@ -172,7 +173,8 @@ skip: fail_servicelease_start: if (gv.servicelease) nn_servicelease_stop_renewing (gv.servicelease); - rtps_term (); + rtps_stop (); + rtps_fini (); fail_rtps_init: if (gv.servicelease) { @@ -182,7 +184,6 @@ fail_rtps_init: fail_servicelease_new: thread_states_fini(); fail_rtps_config: - dds__builtin_fini(); fail_config_domainid: dds_global.m_default_domain = DDS_DOMAIN_DEFAULT; config_fini (dds_cfgst); @@ -206,11 +207,11 @@ extern void dds_fini (void) dds_global.m_init_count--; if (dds_global.m_init_count == 0) { - dds__builtin_fini(); - if (gv.servicelease) nn_servicelease_stop_renewing (gv.servicelease); - rtps_term (); + rtps_stop (); + dds__builtin_fini (); + rtps_fini (); if (gv.servicelease) nn_servicelease_free (gv.servicelease); gv.servicelease = NULL; @@ -247,7 +248,9 @@ void ddsi_plugin_init (void) ddsi_plugin.init_fn = dds__init_plugin; ddsi_plugin.fini_fn = dds__fini_plugin; - ddsi_plugin.builtin_write = dds__builtin_write; + ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible; + ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry; + ddsi_plugin.builtintopic_write = dds__builtin_write; ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free; ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini; diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 7c4d399..5daf5a5 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -18,6 +18,7 @@ #include "dds__domain.h" #include "dds__participant.h" #include "dds__err.h" +#include "dds__builtin.h" #define DDS_PARTICIPANT_STATUS_MASK 0u diff --git a/src/core/ddsc/src/dds_qos.c b/src/core/ddsc/src/dds_qos.c index caa65dc..38c61e3 100644 --- a/src/core/ddsc/src/dds_qos.c +++ b/src/core/ddsc/src/dds_qos.c @@ -316,7 +316,7 @@ bool dds_qos_equal ( } else if (a == NULL || b == NULL) { return false; } else { - return nn_xqos_delta(a, b, ~(uint64_t)0) != 0; + return nn_xqos_delta(a, b, ~(uint64_t)0) == 0; } } diff --git a/src/core/ddsi/src/ddsi_serdata_builtin.c b/src/core/ddsc/src/dds_serdata_builtintopic.c similarity index 78% rename from src/core/ddsi/src/ddsi_serdata_builtin.c rename to src/core/ddsc/src/dds_serdata_builtintopic.c index 0623ded..c288a83 100644 --- a/src/core/ddsi/src/ddsi_serdata_builtin.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -23,11 +23,10 @@ #include #include "os/os.h" #include "dds__key.h" -#include "ddsi/ddsi_tkmap.h" #include "dds__stream.h" +#include "dds__serdata_builtintopic.h" +#include "ddsi/ddsi_tkmap.h" #include "ddsi/q_entity.h" -#include "ddsi/ddsi_serdata_builtin.h" -//#include "dds.h" /* FIXME: need the sample types of the built-in topics */ static const uint64_t unihashconsts[] = { UINT64_C (16292676669999574021), @@ -46,7 +45,7 @@ static uint32_t hash_guid (const nn_guid_t *g) >> 32); } -static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, uint32_t basehash) +static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtintopic *d, uint32_t basehash) { d->c.hash = hash_guid (&d->key) ^ basehash; return &d->c; @@ -54,37 +53,37 @@ static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, static bool serdata_builtin_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn) { - const struct ddsi_serdata_builtin *a = (const struct ddsi_serdata_builtin *)acmn; - const struct ddsi_serdata_builtin *b = (const struct ddsi_serdata_builtin *)bcmn; + const struct ddsi_serdata_builtintopic *a = (const struct ddsi_serdata_builtintopic *)acmn; + const struct ddsi_serdata_builtintopic *b = (const struct ddsi_serdata_builtintopic *)bcmn; return memcmp (&a->key, &b->key, sizeof (a->key)) == 0; } static void serdata_builtin_free(struct ddsi_serdata *dcmn) { - struct ddsi_serdata_builtin *d = (struct ddsi_serdata_builtin *)dcmn; + struct ddsi_serdata_builtintopic *d = (struct ddsi_serdata_builtintopic *)dcmn; if (d->c.kind == SDK_DATA) nn_xqos_fini (&d->xqos); os_free (d); } -static struct ddsi_serdata_builtin *serdata_builtin_new(const struct ddsi_sertopic_builtin *tp, enum ddsi_serdata_kind kind) +static struct ddsi_serdata_builtintopic *serdata_builtin_new(const struct ddsi_sertopic_builtintopic *tp, enum ddsi_serdata_kind kind) { - struct ddsi_serdata_builtin *d = os_malloc(sizeof (*d)); + struct ddsi_serdata_builtintopic *d = os_malloc(sizeof (*d)); ddsi_serdata_init (&d->c, &tp->c, kind); return d; } -static void from_entity_pp (struct ddsi_serdata_builtin *d, const struct participant *pp) +static void from_entity_pp (struct ddsi_serdata_builtintopic *d, const struct participant *pp) { nn_xqos_copy(&d->xqos, &pp->plist->qos); } -static void from_entity_proxypp (struct ddsi_serdata_builtin *d, const struct proxy_participant *proxypp) +static void from_entity_proxypp (struct ddsi_serdata_builtintopic *d, const struct proxy_participant *proxypp) { nn_xqos_copy(&d->xqos, &proxypp->plist->qos); } -static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const struct ddsi_sertopic *tp) +static void set_topic_type_from_sertopic (struct ddsi_serdata_builtintopic *d, const struct ddsi_sertopic *tp) { if (!(d->xqos.present & QP_TOPIC_NAME)) { @@ -98,26 +97,26 @@ static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const } } -static void from_entity_rd (struct ddsi_serdata_builtin *d, const struct reader *rd) +static void from_entity_rd (struct ddsi_serdata_builtintopic *d, const struct reader *rd) { nn_xqos_copy(&d->xqos, rd->xqos); set_topic_type_from_sertopic(d, rd->topic); } -static void from_entity_prd (struct ddsi_serdata_builtin *d, const struct proxy_reader *prd) +static void from_entity_prd (struct ddsi_serdata_builtintopic *d, const struct proxy_reader *prd) { nn_xqos_copy(&d->xqos, prd->c.xqos); assert (d->xqos.present & QP_TOPIC_NAME); assert (d->xqos.present & QP_TYPE_NAME); } -static void from_entity_wr (struct ddsi_serdata_builtin *d, const struct writer *wr) +static void from_entity_wr (struct ddsi_serdata_builtintopic *d, const struct writer *wr) { nn_xqos_copy(&d->xqos, wr->xqos); set_topic_type_from_sertopic(d, wr->topic); } -static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_writer *pwr) +static void from_entity_pwr (struct ddsi_serdata_builtintopic *d, const struct proxy_writer *pwr) { nn_xqos_copy(&d->xqos, pwr->c.xqos); assert (d->xqos.present & QP_TOPIC_NAME); @@ -127,10 +126,10 @@ static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_ struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) { /* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */ - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)tpcmn; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn; /* keyhash must in host format (which the GUIDs always are internally) */ const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value); - struct ddsi_serdata_builtin *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); + struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); memcpy (&d->key, keyhash->value, sizeof (d->key)); if (entity) { @@ -188,16 +187,25 @@ static char *dds_string_dup_reuse (char *old, const char *src) static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src) { if (old == NULL) - return nn_xqos_dup (src); + { + old = os_malloc (sizeof (*old)); + nn_xqos_init_empty (old); + old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; + nn_xqos_mergein_missing (old, src); + old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); + } else { nn_xqos_fini (old); + nn_xqos_init_empty (old); + old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; nn_xqos_mergein_missing (old, src); - return old; + old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); } + return old; } -static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_participant *sample) +static bool to_sample_pp (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_participant *sample) { convkey (&sample->key, &d->key); if (d->c.kind == SDK_DATA) @@ -207,7 +215,7 @@ static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_built return true; } -static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_endpoint *sample) +static bool to_sample_endpoint (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_endpoint *sample) { nn_guid_t ppguid; convkey (&sample->key, &d->key); @@ -227,8 +235,8 @@ static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds static bool serdata_builtin_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim) { - const struct ddsi_serdata_builtin *d = (const struct ddsi_serdata_builtin *)serdata_common; - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)topic; + const struct ddsi_serdata_builtintopic *d = (const struct ddsi_serdata_builtintopic *)serdata_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)topic; if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ /* FIXME: completing builtin topic support along these lines requires subscribers, publishers and topics to also become DDSI entities - which is probably a good thing anyway */ switch (tp->type) @@ -270,7 +278,7 @@ static void serdata_builtin_to_ser_unref (struct ddsi_serdata *serdata_common, c (void)serdata_common; (void)ref; } -const struct ddsi_serdata_ops ddsi_serdata_ops_builtin = { +const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = { .get_size = serdata_builtin_get_size, .eqkey = serdata_builtin_eqkey, .free = serdata_builtin_free, diff --git a/src/core/ddsi/src/ddsi_sertopic_builtin.c b/src/core/ddsc/src/dds_sertopic_builtintopic.c similarity index 82% rename from src/core/ddsi/src/ddsi_sertopic_builtin.c rename to src/core/ddsc/src/dds_sertopic_builtintopic.c index 5092a6d..5783ab5 100644 --- a/src/core/ddsi/src/ddsi_sertopic_builtin.c +++ b/src/core/ddsc/src/dds_sertopic_builtintopic.c @@ -20,22 +20,22 @@ #include "ddsi/q_config.h" #include "ddsi/q_freelist.h" #include "ddsi/ddsi_sertopic.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsc/dds.h" +#include "dds__serdata_builtintopic.h" /* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */ -struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename) +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename) { - struct ddsi_sertopic_builtin *tp = os_malloc (sizeof (*tp)); + struct ddsi_sertopic_builtintopic *tp = os_malloc (sizeof (*tp)); tp->c.iid = ddsi_iid_gen(); tp->c.name = dds_string_dup (name); tp->c.typename = dds_string_dup (typename); const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.typename) + 1; tp->c.name_typename = dds_alloc (name_typename_size); snprintf (tp->c.name_typename, name_typename_size, "%s/%s", tp->c.name, tp->c.typename); - tp->c.ops = &ddsi_sertopic_ops_builtin; - tp->c.serdata_ops = &ddsi_serdata_ops_builtin; + tp->c.ops = &ddsi_sertopic_ops_builtintopic; + tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic; tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops); tp->c.status_cb = 0; tp->c.status_cb_entity = NULL; @@ -66,7 +66,7 @@ static void free_endpoint (void *vsample) sample->qos = NULL; } -static size_t get_size (enum ddsi_sertopic_builtin_type type) +static size_t get_size (enum ddsi_sertopic_builtintopic_type type) { switch (type) { @@ -82,14 +82,14 @@ static size_t get_size (enum ddsi_sertopic_builtin_type type) static void sertopic_builtin_zero_samples (const struct ddsi_sertopic *sertopic_common, void *samples, size_t count) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; size_t size = get_size (tp->type); memset (samples, 0, size * count); } static void sertopic_builtin_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; const size_t size = get_size (tp->type); char *new = dds_realloc (old, size * count); if (new && count > oldcount) @@ -105,7 +105,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_ { if (count > 0) { - const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; + const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common; const size_t size = get_size (tp->type); #ifndef NDEBUG for (size_t i = 0, off = 0; i < count; i++, off += size) @@ -139,7 +139,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_ } } -const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin = { +const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = { .deinit = sertopic_builtin_deinit, .zero_samples = sertopic_builtin_zero_samples, .realloc_samples = sertopic_builtin_realloc_samples, diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 00c2194..1466fde 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -316,8 +316,6 @@ static bool dupdef_qos_ok(const dds_qos_t *qos, const struct ddsi_sertopic *st) static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b) { - printf ("sertopic_equivalent %p %p (%s %s; %u %u; %p %p; %p %p)\n", (void*)a, (void*)b, a->name_typename, b->name_typename, a->serdata_basehash, b->serdata_basehash, (void *)a->ops, (void *)b->ops, (void *)a->serdata_ops, (void *)b->serdata_ops); - if (strcmp (a->name_typename, b->name_typename) != 0) return false; if (a->serdata_basehash != b->serdata_basehash) diff --git a/src/core/ddsc/src/dds_whc_builtintopic.c b/src/core/ddsc/src/dds_whc_builtintopic.c new file mode 100644 index 0000000..3453102 --- /dev/null +++ b/src/core/ddsc/src/dds_whc_builtintopic.c @@ -0,0 +1,201 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include + +#include "os/os.h" +#include "ddsi/ddsi_serdata.h" +#include "ddsi/q_unused.h" +#include "ddsi/q_config.h" +#include "ddsi/q_ephash.h" +#include "ddsi/q_entity.h" +#include "ddsi/ddsi_tkmap.h" +#include "dds__serdata_builtintopic.h" +#include "dds__whc_builtintopic.h" +#include "dds__builtin.h" + +struct bwhc { + struct whc common; + enum ddsi_sertopic_builtintopic_type type; +}; + +enum bwhc_iter_state { + BIS_INIT_LOCAL, + BIS_LOCAL, + BIS_INIT_PROXY, + BIS_PROXY +}; + +struct bwhc_iter { + struct whc_sample_iter_base c; + enum bwhc_iter_state st; + bool have_sample; + struct ephash_enum it; +}; + +/* check that our definition of whc_sample_iter fits in the type that callers allocate */ +struct bwhc_sample_iter_sizecheck { + char fits_in_generic_type[sizeof(struct bwhc_iter) <= sizeof(struct whc_sample_iter) ? 1 : -1]; +}; + +static void bwhc_free (struct whc *whc_generic) +{ + os_free (whc_generic); +} + +static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it) +{ + struct bwhc_iter *it = (struct bwhc_iter *) opaque_it; + it->c.whc = (struct whc *) whc_generic; + it->st = BIS_INIT_LOCAL; + it->have_sample = false; +} + +static bool is_visible (const struct entity_common *e) +{ + const nn_vendorid_t vendorid = get_entity_vendorid (e); + return ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, vendorid); +} + +static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample) +{ + struct bwhc_iter * const it = (struct bwhc_iter *) opaque_it; + struct bwhc * const whc = (struct bwhc *) it->c.whc; + enum entity_kind kind = EK_PARTICIPANT; /* pacify gcc */ + struct entity_common *entity; + + if (it->have_sample) + { + ddsi_serdata_unref (sample->serdata); + it->have_sample = false; + } + + /* most fields really don't matter, so memset */ + memset (sample, 0, sizeof (*sample)); + + switch (it->st) + { + case BIS_INIT_LOCAL: + switch (whc->type) { + case DSBT_PARTICIPANT: kind = EK_PARTICIPANT; break; + case DSBT_WRITER: kind = EK_WRITER; break; + case DSBT_READER: kind = EK_READER; break; + } + assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT); + ephash_enum_init (&it->it, kind); + it->st = BIS_LOCAL; + /* FALLS THROUGH */ + case BIS_LOCAL: + while ((entity = ephash_enum_next (&it->it)) != NULL) + if (is_visible (entity)) + break; + if (entity) { + sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true); + it->have_sample = true; + return true; + } else { + ephash_enum_fini (&it->it); + it->st = BIS_INIT_PROXY; + } + /* FALLS THROUGH */ + case BIS_INIT_PROXY: + switch (whc->type) { + case DSBT_PARTICIPANT: kind = EK_PROXY_PARTICIPANT; break; + case DSBT_WRITER: kind = EK_PROXY_WRITER; break; + case DSBT_READER: kind = EK_PROXY_READER; break; + } + assert (kind != EK_PARTICIPANT); + ephash_enum_init (&it->it, kind); + it->st = BIS_PROXY; + /* FALLS THROUGH */ + case BIS_PROXY: + while ((entity = ephash_enum_next (&it->it)) != NULL) + if (is_visible (entity)) + break; + if (entity) { + sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true); + it->have_sample = true; + return true; + } else { + ephash_enum_fini (&it->it); + return false; + } + } + assert (0); + return false; +} + +static void bwhc_get_state (const struct whc *whc, struct whc_state *st) +{ + (void)whc; + st->max_seq = -1; + st->min_seq = -1; + st->unacked_bytes = 0; +} + +static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk) +{ + (void)whc; + (void)max_drop_seq; + (void)seq; + (void)serdata; + (void)tk; + if (plist) + os_free (plist); + return 0; +} + +static unsigned bwhc_downgrade_to_volatile (struct whc *whc, struct whc_state *st) +{ + (void)whc; + (void)st; + return 0; +} + +static unsigned bwhc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list) +{ + (void)whc; + (void)max_drop_seq; + (void)whcst; + *deferred_free_list = NULL; + return 0; +} + +static void bwhc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list) +{ + (void)whc; + (void)deferred_free_list; +} + +static const struct whc_ops bwhc_ops = { + .insert = bwhc_insert, + .remove_acked_messages = bwhc_remove_acked_messages, + .free_deferred_free_list = bwhc_free_deferred_free_list, + .get_state = bwhc_get_state, + .next_seq = 0, + .borrow_sample = 0, + .borrow_sample_key = 0, + .return_sample = 0, + .sample_iter_init = bwhc_sample_iter_init, + .sample_iter_borrow_next = bwhc_sample_iter_borrow_next, + .downgrade_to_volatile = bwhc_downgrade_to_volatile, + .free = bwhc_free +}; + +struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type) +{ + struct bwhc *whc = os_malloc (sizeof (*whc)); + whc->common.ops = &bwhc_ops; + whc->type = type; + return (struct whc *) whc; +} diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 73e5f4d..da7982d 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -26,327 +26,259 @@ #include "ddsi/q_entity.h" #include "ddsi/q_radmin.h" -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -dds_return_t -dds_write( - _In_ dds_entity_t writer, - _In_ const void *data) +dds_return_t dds_write (dds_entity_t writer, const void *data) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; - if (data != NULL) { - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_write_impl(wr, data, dds_time(), 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("No data buffer provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } + if (data == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); - return ret; + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_write_impl (wr, data, dds_time (), 0); + dds_writer_unlock (wr); + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -int -dds_writecdr( - dds_entity_t writer, - struct ddsi_serdata *serdata) +dds_return_t dds_writecdr (dds_entity_t writer, struct ddsi_serdata *serdata) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; - if (serdata != NULL) { - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_writecdr_impl (wr, serdata, dds_time (), 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking writer\n"); - ret = DDS_ERRNO(rc); - } - } else{ - DDS_ERROR("Given cdr has NULL value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - return ret; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; + + if (serdata == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_writecdr_impl (wr, serdata, dds_time (), 0); + dds_writer_unlock (wr); + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -dds_return_t -dds_write_ts( - _In_ dds_entity_t writer, - _In_ const void *data, - _In_ dds_time_t timestamp) +dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t timestamp) { - dds_return_t ret; - dds__retcode_t rc; - dds_writer *wr; + dds_return_t ret; + dds__retcode_t rc; + dds_writer *wr; - if(data == NULL){ - DDS_ERROR("Argument data has NULL value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; - } - if(timestamp < 0){ - DDS_ERROR("Argument timestamp has negative value\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; - } - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - ret = dds_write_impl(wr, data, timestamp, 0); - dds_writer_unlock(wr); - } else { - DDS_ERROR("Error occurred on locking writer\n"); - ret = DDS_ERRNO(rc); - } -err: - return ret; + if (data == NULL || timestamp < 0) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + ret = dds_write_impl (wr, data, timestamp, 0); + dds_writer_unlock (wr); + return ret; } -static int -deliver_locally( - _In_ struct writer *wr, - _In_ struct ddsi_serdata *payload, - _In_ struct ddsi_tkmap_instance *tk) +static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms) { - dds_return_t ret = DDS_RETCODE_OK; - os_mutexLock (&wr->rdary.rdary_lock); - if (wr->rdary.fastpath_ok) { - struct reader ** const rdary = wr->rdary.rdary; - if (rdary[0]) { - struct proxy_writer_info pwr_info; - unsigned i; - make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); - for (i = 0; rdary[i]; i++) { - bool stored; - DDS_TRACE("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)); - dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC; - do { - stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk); - if (!stored) { - if (max_block_ms <= 0) { - DDS_ERROR("The writer could not deliver data on time, probably due to a local reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else { - dds_sleepfor(DDS_MSECS(DDS_HEADBANG_TIMEOUT_MS)); - } - /* Decreasing the block time after the sleep, let's us possibly - * wait a bit too long. But that's preferable compared to waiting - * a bit too short. */ - max_block_ms -= DDS_HEADBANG_TIMEOUT_MS; - } - } while ((!stored) && (ret == DDS_RETCODE_OK)); - } - } - os_mutexUnlock (&wr->rdary.rdary_lock); - } else { - /* When deleting, pwr is no longer accessible via the hash - tables, and consequently, a reader may be deleted without - it being possible to remove it from rdary. The primary - reason rdary exists is to avoid locking the proxy writer - but this is less of an issue when we are deleting it, so - we fall back to using the GUIDs so that we can deliver all - samples we received from it. As writer being deleted any - reliable samples that are rejected are simply discarded. */ - ut_avlIter_t it; - struct pwr_rd_match *m; - struct proxy_writer_info pwr_info; - os_mutexUnlock (&wr->rdary.rdary_lock); - make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); - os_mutexLock (&wr->e.lock); - for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) { - struct reader *rd; - if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { - DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)); - /* Copied the return value ignore from DDSI deliver_user_data() function. */ - (void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); - } - } - os_mutexUnlock (&wr->e.lock); + while (!(ddsi_plugin.rhc_plugin.rhc_store_fn) (rhc, pwr_info, payload, tk)) + { + if (*max_block_ms > 0) + { + dds_sleepfor (DDS_HEADBANG_TIMEOUT); + *max_block_ms -= DDS_HEADBANG_TIMEOUT; } - return ret; + else + { + DDS_ERROR ("The writer could not deliver data on time, probably due to a local reader resources being full\n"); + return DDS_ERRNO (DDS_RETCODE_TIMEOUT); + } + } + return DDS_RETCODE_OK; } -int -dds_write_impl( - _In_ dds_writer *wr, - _In_ const void * data, - _In_ dds_time_t tstamp, - _In_ dds_write_action action) +static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk) { - dds_return_t ret = DDS_RETCODE_OK; - int w_rc; - - assert (wr); - assert (dds_entity_kind(((dds_entity*)wr)->m_hdl) == DDS_KIND_WRITER); - - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - const bool writekey = action & DDS_WR_KEY_BIT; - dds_writer * writer = (dds_writer*) wr; - struct writer * ddsi_wr = writer->m_wr; - struct ddsi_tkmap_instance * tk; - struct ddsi_serdata *d; - - if (data == NULL) { - DDS_ERROR("No data buffer provided\n"); - return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + dds_return_t ret = DDS_RETCODE_OK; + os_mutexLock (&wr->rdary.rdary_lock); + if (wr->rdary.fastpath_ok) + { + struct reader ** const rdary = wr->rdary.rdary; + if (rdary[0]) + { + dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time); + struct proxy_writer_info pwr_info; + unsigned i; + make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); + for (i = 0; rdary[i]; i++) { + DDS_TRACE ("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)); + if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) + break; + } } - - /* Check for topic filter */ - if (wr->m_topic->filter_fn && ! writekey) { - if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) { - return DDS_RETCODE_OK; - } + os_mutexUnlock (&wr->rdary.rdary_lock); + } + else + { + /* When deleting, pwr is no longer accessible via the hash + tables, and consequently, a reader may be deleted without + it being possible to remove it from rdary. The primary + reason rdary exists is to avoid locking the proxy writer + but this is less of an issue when we are deleting it, so + we fall back to using the GUIDs so that we can deliver all + samples we received from it. As writer being deleted any + reliable samples that are rejected are simply discarded. */ + ut_avlIter_t it; + struct pwr_rd_match *m; + struct proxy_writer_info pwr_info; + dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time); + os_mutexUnlock (&wr->rdary.rdary_lock); + make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); + os_mutexLock (&wr->e.lock); + for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) + { + struct reader *rd; + if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) + { + DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)); + /* Copied the return value ignore from DDSI deliver_user_data() function. */ + if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK) + break; + } } - - if (asleep) { - thread_state_awake (thr); - } - - /* Serialize and write data or key */ - d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data); - - /* Set if disposing or unregistering */ - d->statusinfo = ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) | - ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ; - d->timestamp.v = tstamp; - ddsi_serdata_ref(d); - tk = ddsi_tkmap_lookup_instance_ref(d); - w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk); - - if (w_rc >= 0) { - /* Flush out write unless configured to batch */ - if (! config.whc_batch){ - nn_xpack_send (writer->m_xp, false); - } - ret = DDS_RETCODE_OK; - } else if (w_rc == ERR_TIMEOUT) { - DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else if (w_rc == ERR_INVALID_DATA) { - DDS_ERROR("Invalid data provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } else { - DDS_ERROR("Internal error\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } - if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, d, tk); - } - ddsi_serdata_unref(d); - ddsi_tkmap_instance_unref(tk); - - if (asleep) { - thread_state_asleep (thr); - } - - return ret; + os_mutexUnlock (&wr->e.lock); + } + return ret; } -int -dds_writecdr_impl( - _In_ dds_writer *wr, - _Inout_ struct ddsi_serdata *d, - _In_ dds_time_t tstamp, - _In_ dds_write_action action) +dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action) { - int ret = DDS_RETCODE_OK; - int w_rc; + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + const bool writekey = action & DDS_WR_KEY_BIT; + struct writer *ddsi_wr = wr->m_wr; + struct ddsi_tkmap_instance *tk; + struct ddsi_serdata *d; + dds_return_t ret = DDS_RETCODE_OK; + int w_rc; - assert (wr); + if (data == NULL) + { + DDS_ERROR("No data buffer provided\n"); + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + } - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - struct writer * ddsi_wr = wr->m_wr; - struct ddsi_tkmap_instance * tk; + /* Check for topic filter */ + if (wr->m_topic->filter_fn && !writekey) + if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) + return DDS_RETCODE_OK; - if (wr->m_topic->filter_fn) { - abort(); - } + if (asleep) + thread_state_awake (thr); - if (asleep) { - thread_state_awake (thr); - } + /* Serialize and write data or key */ + d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data); + d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0); + d->timestamp.v = tstamp; + ddsi_serdata_ref (d); + tk = ddsi_tkmap_lookup_instance_ref (d); + w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); - /* Set if disposing or unregistering */ - d->statusinfo = - ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) | - ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ; - d->timestamp.v = tstamp; - ddsi_serdata_ref(d); - tk = ddsi_tkmap_lookup_instance_ref(d); - w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); - if (w_rc >= 0) { - /* Flush out write unless configured to batch */ - if (! config.whc_batch) { - nn_xpack_send (wr->m_xp, false); - } - ret = DDS_RETCODE_OK; - } else if (w_rc == ERR_TIMEOUT) { - DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n"); - ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } else if (w_rc == ERR_INVALID_DATA) { - DDS_ERROR("Invalid data provided\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } else { - DDS_ERROR("Internal error\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } + if (w_rc >= 0) + { + /* Flush out write unless configured to batch */ + if (!config.whc_batch) + nn_xpack_send (wr->m_xp, false); + ret = DDS_RETCODE_OK; + } else if (w_rc == ERR_TIMEOUT) { + DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n"); + ret = DDS_ERRNO (DDS_RETCODE_TIMEOUT); + } else if (w_rc == ERR_INVALID_DATA) { + DDS_ERROR ("Invalid data provided\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } else { + DDS_ERROR ("Internal error\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } + if (ret == DDS_RETCODE_OK) + ret = deliver_locally (ddsi_wr, d, tk); + ddsi_serdata_unref (d); + ddsi_tkmap_instance_unref (tk); - if (ret == DDS_RETCODE_OK) { - ret = deliver_locally (ddsi_wr, d, tk); - } - ddsi_serdata_unref(d); - ddsi_tkmap_instance_unref(tk); - - if (asleep) { - thread_state_asleep (thr); - } - - return ret; + if (asleep) + thread_state_asleep (thr); + return ret; } -void -dds_write_set_batch( - bool enable) +dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d) { - config.whc_batch = enable ? 1 : 0; + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + struct ddsi_tkmap_instance * tk; + int ret = DDS_RETCODE_OK; + int w_rc; + + if (asleep) + thread_state_awake (thr); + + ddsi_serdata_ref (d); + tk = ddsi_tkmap_lookup_instance_ref (d); + w_rc = write_sample_gc (xp, ddsi_wr, d, tk); + if (w_rc >= 0) { + /* Flush out write unless configured to batch */ + if (!config.whc_batch && xp != NULL) + nn_xpack_send (xp, false); + ret = DDS_RETCODE_OK; + } else if (w_rc == ERR_TIMEOUT) { + DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n"); + ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT); + } else if (w_rc == ERR_INVALID_DATA) { + DDS_ERROR ("Invalid data provided\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } else { + DDS_ERROR ("Internal error\n"); + ret = DDS_ERRNO (DDS_RETCODE_ERROR); + } + + if (ret == DDS_RETCODE_OK) + ret = deliver_locally (ddsi_wr, d, tk); + ddsi_serdata_unref (d); + ddsi_tkmap_instance_unref (tk); + + if (asleep) + thread_state_asleep (thr); + + return ret; } -_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) -void -dds_write_flush( - dds_entity_t writer) +dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action) { - dds__retcode_t rc; - - struct thread_state1 * const thr = lookup_thread_state (); - const bool asleep = !vtime_awake_p (thr->vtime); - dds_writer *wr; - - if (asleep) { - thread_state_awake (thr); - } - rc = dds_writer_lock(writer, &wr); - if (rc == DDS_RETCODE_OK) { - nn_xpack_send (wr->m_xp, true); - dds_writer_unlock(wr); - } else{ - DDS_ERROR("Error occurred on locking writer\n"); - } - - if (asleep) { - thread_state_asleep (thr); - } - - return ; + if (wr->m_topic->filter_fn) + abort (); + /* Set if disposing or unregistering */ + d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0); + d->timestamp.v = tstamp; + return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d); +} + +void dds_write_set_batch (bool enable) +{ + config.whc_batch = enable ? 1 : 0; +} + +void dds_write_flush (dds_entity_t writer) +{ + struct thread_state1 * const thr = lookup_thread_state (); + const bool asleep = !vtime_awake_p (thr->vtime); + dds_writer *wr; + dds__retcode_t rc; + + if (asleep) + thread_state_awake (thr); + if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK) + DDS_ERROR ("Error occurred on locking writer\n"); + else + { + nn_xpack_send (wr->m_xp, true); + dds_writer_unlock (wr); + } + + if (asleep) + thread_state_asleep (thr); + return; } diff --git a/src/core/ddsc/tests/builtin_topics.c b/src/core/ddsc/tests/builtin_topics.c index ff24221..2395a8f 100644 --- a/src/core/ddsc/tests/builtin_topics.c +++ b/src/core/ddsc/tests/builtin_topics.c @@ -147,20 +147,21 @@ check_default_qos_of_builtin_entity(dds_entity_t entity) CU_Test(ddsc_builtin_topics, availability_builtin_topics, .init = setup, .fini = teardown) { +/* FIXME: Successful lookup doesn't rhyme with them not being returned when looking at the children of the participant ... */ dds_entity_t topic; topic = dds_find_topic(g_participant, "DCPSParticipant"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSTopic"); CU_ASSERT_FATAL(topic < 0); - //TODO CHAM-347: dds_delete(topic); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSSubscription"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); topic = dds_find_topic(g_participant, "DCPSPublication"); - CU_ASSERT_FATAL(topic > 0); - dds_delete(topic); + CU_ASSERT_FATAL(topic < 0); + //dds_delete(topic); } CU_Test(ddsc_builtin_topics, read_publication_data, .init = setup, .fini = teardown) diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 1650a37..0ddb2c3 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -20,10 +20,8 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src" ddsi_mcgroup.c ddsi_serdata.c ddsi_serdata_default.c - ddsi_serdata_builtin.c ddsi_sertopic.c ddsi_sertopic_default.c - ddsi_sertopic_builtin.c ddsi_rhc_plugin.c ddsi_iid.c ddsi_tkmap.c @@ -76,7 +74,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi" ddsi_serdata.h ddsi_sertopic.h ddsi_serdata_default.h - ddsi_serdata_builtin.h ddsi_rhc_plugin.h ddsi_iid.h ddsi_tkmap.h diff --git a/src/core/ddsi/include/ddsi/q_config.h b/src/core/ddsi/include/ddsi/q_config.h index 3605d1f..7810ea4 100644 --- a/src/core/ddsi/include/ddsi/q_config.h +++ b/src/core/ddsi/include/ddsi/q_config.h @@ -22,7 +22,6 @@ #include "ddsi/q_xqos.h" #include "ddsi/ddsi_tran.h" #include "ddsi/q_feature_check.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsi/ddsi_rhc_plugin.h" #if defined (__cplusplus) @@ -412,7 +411,10 @@ struct ddsi_plugin { int (*init_fn) (void); void (*fini_fn) (void); - void (*builtin_write) (enum ddsi_sertopic_builtin_type type, const nn_guid_t *guid, nn_wctime_t timestamp, bool alive); + + bool (*builtintopic_is_visible) (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid); + struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid); + void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive); /* Read cache */ struct ddsi_rhc_plugin rhc_plugin; diff --git a/src/core/ddsi/include/ddsi/q_entity.h b/src/core/ddsi/include/ddsi/q_entity.h index 488952a..fbf99bd 100644 --- a/src/core/ddsi/include/ddsi/q_entity.h +++ b/src/core/ddsi/include/ddsi/q_entity.h @@ -127,12 +127,15 @@ struct pwr_rd_match { struct nn_rsample_info; struct nn_rdata; +struct ddsi_tkmap_instance; struct entity_common { enum entity_kind kind; nn_guid_t guid; + nn_wctime_t tupdate; /* timestamp of last update */ char *name; uint64_t iid; + struct ddsi_tkmap_instance *tk; os_mutex lock; bool onlylocal; }; @@ -390,8 +393,11 @@ int is_deleted_participant_guid (const struct nn_guid *guid, unsigned for_what); nn_entityid_t to_entityid (unsigned u); int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid); +int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid); +bool is_local_orphan_endpoint (const struct entity_common *e); int is_writer_entityid (nn_entityid_t id); int is_reader_entityid (nn_entityid_t id); +nn_vendorid_t get_entity_vendorid (const struct entity_common *e); int pp_allocate_entityid (nn_entityid_t *id, unsigned kind, struct participant *pp); void pp_release_entityid(struct participant *pp, nn_entityid_t id); @@ -473,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity GUID "ppguid". May return NULL if participant unknown or writer/reader already known. */ -struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg); +struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg); -struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg); +struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg); struct whc_node; struct whc_state; @@ -492,6 +498,12 @@ int delete_writer_nolinger_locked (struct writer *wr); int delete_reader (const struct nn_guid *guid); uint64_t reader_instance_id (const struct nn_guid *guid); +struct local_orphan_writer { + struct writer wr; +}; +struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc); +void delete_local_orphan_writer (struct local_orphan_writer *wr); + /* To create or delete a new proxy participant: "guid" MUST have the pre-defined participant entity id. Unlike delete_participant(), deleting a proxy participant will automatically delete all its diff --git a/src/core/ddsi/include/ddsi/q_gc.h b/src/core/ddsi/include/ddsi/q_gc.h index 3602d93..e46cd66 100644 --- a/src/core/ddsi/include/ddsi/q_gc.h +++ b/src/core/ddsi/include/ddsi/q_gc.h @@ -43,6 +43,7 @@ struct gcreq { }; struct gcreq_queue *gcreq_queue_new (void); +void gcreq_queue_drain (struct gcreq_queue *q); void gcreq_queue_free (struct gcreq_queue *q); struct gcreq *gcreq_new (struct gcreq_queue *gcreq_queue, gcreq_cb_t cb); diff --git a/src/core/ddsi/include/ddsi/q_globals.h b/src/core/ddsi/include/ddsi/q_globals.h index c91a029..7c19652 100644 --- a/src/core/ddsi/include/ddsi/q_globals.h +++ b/src/core/ddsi/include/ddsi/q_globals.h @@ -281,11 +281,6 @@ struct q_globals { struct ddsi_sertopic *plist_topic; /* used for all discovery data */ struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */ - /* Sertopics for built-in topics -- FIXME: these really have little to do with topics, but everything with topic types, in other words, they are the type supports in DDS ... so a bit of refactoring is required */ - struct ddsi_sertopic *builtin_participant_topic; - struct ddsi_sertopic *builtin_reader_topic; - struct ddsi_sertopic *builtin_writer_topic; - /* Network ID needed by v_groupWrite -- FIXME: might as well pass it to the receive thread instead of making it global (and that would remove the need to include kernelModule.h) */ diff --git a/src/core/ddsi/include/ddsi/q_rtps.h b/src/core/ddsi/include/ddsi/q_rtps.h index 78f99da..eea7cf9 100644 --- a/src/core/ddsi/include/ddsi/q_rtps.h +++ b/src/core/ddsi/include/ddsi/q_rtps.h @@ -80,7 +80,8 @@ int rtps_config_prep (struct cfgst *cfgst); int rtps_config_open (void); int rtps_init (void); void ddsi_plugin_init (void); -void rtps_term (void); +void rtps_stop (void); +void rtps_fini (void); #if defined (__cplusplus) } diff --git a/src/core/ddsi/include/ddsi/q_whc.h b/src/core/ddsi/include/ddsi/q_whc.h index f5bc549..5f33613 100644 --- a/src/core/ddsi/include/ddsi/q_whc.h +++ b/src/core/ddsi/include/ddsi/q_whc.h @@ -42,7 +42,7 @@ struct whc_state { an iter on the stack without specifying an implementation. If future changes or implementations require more, these can be adjusted. An implementation should check things fit at compile time. */ -#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *)) +#define WHC_SAMPLE_ITER_SIZE (7 * sizeof(void *)) struct whc_sample_iter_base { struct whc *whc; }; diff --git a/src/core/ddsi/src/ddsi_udp.c b/src/core/ddsi/src/ddsi_udp.c index b6cb2f5..da5ceac 100644 --- a/src/core/ddsi/src/ddsi_udp.c +++ b/src/core/ddsi/src/ddsi_udp.c @@ -64,8 +64,13 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s msghdr.msg_namelen = srclen; msghdr.msg_iov = &msg_iov; msghdr.msg_iovlen = 1; +#if !defined(__sun) || defined(_XPG4_2) msghdr.msg_control = NULL; msghdr.msg_controllen = 0; +#else + msghdr.msg_accrights = NULL; + msghdr.msg_accrightslen = 0; +#endif do { ret = recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0); @@ -117,8 +122,13 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d set_msghdr_iov (&msg, (os_iovec_t *) iov, niov); msg.msg_name = &dstaddr; msg.msg_namelen = (socklen_t) os_sockaddr_get_size((os_sockaddr *) &dstaddr); +#if !defined(__sun) || defined(_XPG4_2) msg.msg_control = NULL; msg.msg_controllen = 0; +#else + msg.msg_accrights = NULL; + msg.msg_accrightslen = 0; +#endif #if SYSDEPS_MSGHDR_FLAGS msg.msg_flags = (int) flags; #else diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 6e6f05b..36eaffb 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -149,39 +149,41 @@ int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid) } } -static int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid) +int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid) { return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT; } -static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_vendorid_t vendorid, bool onlylocal, struct ddsi_tkmap_instance **tk) +bool is_local_orphan_endpoint (const struct entity_common *e) +{ + return (e->guid.prefix.u[0] == 0 && e->guid.prefix.u[1] == 0 && e->guid.prefix.u[2] == 0 && + is_builtin_endpoint (e->guid.entityid, ownvendorid)); +} + +static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_wctime_t tcreate, nn_vendorid_t vendorid, bool onlylocal) { e->guid = *guid; e->kind = kind; + e->tupdate = tcreate; e->name = os_strdup (name ? name : ""); e->onlylocal = onlylocal; os_mutexInit (&e->lock); - if (onlylocal || is_builtin_endpoint (guid->entityid, vendorid)) + if (ddsi_plugin.builtintopic_is_visible (guid->entityid, onlylocal, vendorid)) { - e->iid = ddsi_iid_gen (); - *tk = NULL; + e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid); + e->iid = e->tk->m_iid; } else { - struct ddsi_serdata *sd; - struct nn_keyhash kh; - memcpy (&kh, guid, sizeof (kh)); - /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */ - sd = ddsi_serdata_from_keyhash (gv.builtin_participant_topic, &kh); - /* FIXME: this makes the iid for a reincarnation of a proxy entity dependent on whether an application reader kept the corresponding built-in topic instance around, it may be attractive to reconsider and guarantee a new iid in these cases, at least for the publication handle */ - *tk = ddsi_tkmap_find(sd, false, true); - ddsi_serdata_unref (sd); - e->iid = (*tk)->m_iid; + e->tk = NULL; + e->iid = ddsi_iid_gen (); } } static void entity_common_fini (struct entity_common *e) { + if (e->tk) + ddsi_tkmap_instance_unref (e->tk); os_free (e->name); os_mutexDestroy (&e->lock); } @@ -238,38 +240,27 @@ void local_reader_ary_setinvalid (struct local_reader_ary *x) os_mutexUnlock (&x->rdary_lock); } -static void write_builtin_topic_any (const struct entity_common *e, nn_wctime_t timestamp, bool alive, nn_vendorid_t vendorid, struct ddsi_tkmap_instance *tk) +nn_vendorid_t get_entity_vendorid (const struct entity_common *e) { - if (!(e->onlylocal || is_builtin_endpoint(e->guid.entityid, vendorid))) + switch (e->kind) { - /* initialize to avoid gcc warning ultimately caused by C's horrible type system */ - enum ddsi_sertopic_builtin_type type = DSBT_PARTICIPANT; - switch (e->kind) - { - case EK_PARTICIPANT: - case EK_PROXY_PARTICIPANT: - type = DSBT_PARTICIPANT; - break; - case EK_READER: - case EK_PROXY_READER: - type = DSBT_READER; - break; - case EK_WRITER: - case EK_PROXY_WRITER: - type = DSBT_WRITER; - break; - } - assert(type != DSBT_PARTICIPANT || (e->kind == EK_PARTICIPANT || e->kind == EK_PROXY_PARTICIPANT)); - ddsi_plugin.builtin_write (type, &e->guid, timestamp, alive); + case EK_PARTICIPANT: + case EK_READER: + case EK_WRITER: + return (nn_vendorid_t) MY_VENDOR_ID; + break; + case EK_PROXY_PARTICIPANT: + return ((const struct proxy_participant *) e)->vendor; + break; + case EK_PROXY_READER: + return ((const struct proxy_reader *) e)->c.vendor; + break; + case EK_PROXY_WRITER: + return ((const struct proxy_writer *) e)->c.vendor; + break; } - /* tkmap instance only needs to be kept around until the first write of a built-in topic (if none ever happens, it needn't be kept at all): afterward, the WHC of the local built-in topic writer will keep the entry alive. FIXME: the SPDP/SEPD ones currently use default sertopics instead of builtin sertopics, and so use different mappings and different instnace ids. No-one ever sees those ids, so it doesn't matter, but it would nicer if it could actually be the same one. FIXME: it would also be nicer if the local built-in topics and the SPDP/SEDP writers were the same, but I want the locally created endpoints visible in the built-in topics as well, and those don't exist in the discovery writers ... */ - if (tk) - ddsi_tkmap_instance_unref (tk); -} - -static void write_builtin_topic_local (const struct entity_common *e, nn_wctime_t timestamp, bool alive, struct ddsi_tkmap_instance *tk) -{ - write_builtin_topic_any(e, timestamp, alive, ownvendorid, tk); + assert (0); + return (nn_vendorid_t) NN_VENDORID_UNKNOWN; } /* DELETED PARTICIPANTS --------------------------------------------- */ @@ -408,7 +399,6 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis { struct participant *pp; nn_guid_t subguid, group_guid; - struct ddsi_tkmap_instance *tk; /* no reserved bits may be set */ assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0); @@ -451,7 +441,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis pp = os_malloc (sizeof (*pp)); - entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0), &tk); + entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, now (), ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0)); pp->user_refc = 1; pp->builtin_refc = 0; pp->builtins_deleted = 0; @@ -630,7 +620,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis trigger_recv_threads (); } - write_builtin_topic_local(&pp->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&pp->e, now(), true); /* SPDP periodic broadcast uses the retransmit path, so the initial publication must be done differently. Must be later than making @@ -866,7 +856,7 @@ int delete_participant (const struct nn_guid *ppguid) struct participant *pp; if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) return ERR_UNKNOWN_ENTITY; - write_builtin_topic_local(&pp->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&pp->e, now(), false); remember_deleted_participant_guid (&pp->e.guid); ephash_remove_participant_guid (pp); gcreq_participant (pp); @@ -2077,7 +2067,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn { int32_t reason; (void)tnow; - if (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid)) + if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid))) return; if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0) { @@ -2295,7 +2285,7 @@ static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow) struct ephash_enum est; struct entity_common *em; enum entity_kind mkind; - if (is_builtin_entityid (e->guid.entityid, ownvendorid)) + if (is_builtin_entityid (e->guid.entityid, ownvendorid) && !is_local_orphan_endpoint (e)) /* never a need for local matches on discovery endpoints */ return; mkind = generic_do_local_match_mkind(e->kind); @@ -2374,34 +2364,27 @@ static void new_reader_writer_common (const struct nn_guid *guid, const struct d topic ? topic->typename : "(null)"); } -static void endpoint_common_init -( - struct entity_common *e, - struct endpoint_common *c, - enum entity_kind kind, - const struct nn_guid *guid, - const struct nn_guid *group_guid, - struct participant *pp, - struct ddsi_tkmap_instance **tk -) +static void endpoint_common_init (struct entity_common *e, struct endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp) { - entity_common_init (e, guid, NULL, kind, ownvendorid, pp->e.onlylocal, tk); + entity_common_init (e, guid, NULL, kind, now (), ownvendorid, pp->e.onlylocal); c->pp = ref_participant (pp, &e->guid); if (group_guid) - { c->group_guid = *group_guid; - } else - { memset (&c->group_guid, 0, sizeof (c->group_guid)); - } } static void endpoint_common_fini (struct entity_common *e, struct endpoint_common *c) { if (!is_builtin_entityid(e->guid.entityid, ownvendorid)) pp_release_entityid(c->pp, e->guid.entityid); - unref_participant (c->pp, &e->guid); + if (c->pp) + unref_participant (c->pp, &e->guid); + else + { + /* only for the (almost pseudo) writers used for generating the built-in topics */ + assert (is_local_orphan_endpoint (e)); + } entity_common_fini (e); } @@ -2608,25 +2591,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru return n; } -static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) +static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) { - struct writer *wr; - nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - - assert (is_writer_entityid (guid->entityid)); - assert (ephash_lookup_writer_guid (guid) == NULL); - assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0); - - new_reader_writer_common (guid, topic, xqos); - wr = os_malloc (sizeof (*wr)); - - /* want a pointer to the participant so that a parallel call to - delete_participant won't interfere with our ability to address - the participant */ - - endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, &tk); - os_condInit (&wr->throttle_cond, &wr->e.lock); wr->seq = 0; wr->cs_seq = 0; @@ -2658,12 +2624,10 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct assert (wr->xqos->aliased == 0); set_topic_type_name (wr->xqos, topic); - if (dds_get_log_mask() & DDS_LC_DISCOVERY) - { - DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid)); - nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos); - DDS_LOG(DDS_LC_DISCOVERY, "}\n"); - } + DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid)); + nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos); + DDS_LOG(DDS_LC_DISCOVERY, "}\n"); + assert (wr->xqos->present & QP_RELIABILITY); wr->reliable = (wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS); assert (wr->xqos->present & QP_DURABILITY); @@ -2818,23 +2782,43 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct ut_avlInit (&wr_local_readers_treedef, &wr->local_readers); local_reader_ary_init (&wr->rdary); +} + +static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity) +{ + struct writer *wr; + nn_mtime_t tnow = now_mt (); + + assert (is_writer_entityid (guid->entityid)); + assert (ephash_lookup_writer_guid (guid) == NULL); + assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0); + + new_reader_writer_common (guid, topic, xqos); + wr = os_malloc (sizeof (*wr)); + + /* want a pointer to the participant so that a parallel call to + delete_participant won't interfere with our ability to address + the participant */ + + endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp); + new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity); /* guid_hash needed for protocol handling, so add it before we send - out our first message. Also: needed for matching, and swapping - the order if hash insert & matching creates a window during which - neither of two endpoints being created in parallel can discover - the other. */ + out our first message. Also: needed for matching, and swapping + the order if hash insert & matching creates a window during which + neither of two endpoints being created in parallel can discover + the other. */ ephash_insert_writer_guid (wr); /* once it exists, match it with proxy writers and broadcast - existence (I don't think it matters much what the order of these - two is, but it seems likely that match-then-broadcast has a - slightly lower likelihood that a response from a proxy reader - gets dropped) -- but note that without adding a lock it might be - deleted while we do so */ + existence (I don't think it matters much what the order of these + two is, but it seems likely that match-then-broadcast has a + slightly lower likelihood that a response from a proxy reader + gets dropped) -- but note that without adding a lock it might be + deleted while we do so */ match_writer_with_proxy_readers (wr, tnow); match_writer_with_local_readers (wr, tnow); - write_builtin_topic_local(&wr->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&wr->e, now(), true); sedp_write_writer (wr); if (wr->lease_duration != T_NEVER) @@ -2846,7 +2830,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct return wr; } -struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg) +struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg) { struct participant *pp; struct writer * wr; @@ -2866,6 +2850,29 @@ struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_ return wr; } +struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc) +{ + nn_guid_t guid; + struct local_orphan_writer *lowr; + struct writer *wr; + nn_mtime_t tnow = now_mt (); + + DDS_LOG(DDS_LC_DISCOVERY, "new_local_orphan_writer(%s/%s)\n", topic->name, topic->typename); + lowr = os_malloc (sizeof (*lowr)); + wr = &lowr->wr; + + memset (&guid.prefix, 0, sizeof (guid.prefix)); + guid.entityid = entityid; + entity_common_init (&wr->e, &guid, NULL, EK_WRITER, now (), ownvendorid, true); + wr->c.pp = NULL; + memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid)); + new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL); + ephash_insert_writer_guid (wr); + match_writer_with_local_readers (wr, tnow); + ddsi_plugin.builtintopic_write (&wr->e, now(), true); + return lowr; +} + static void gc_delete_writer (struct gcreq *gcreq) { struct writer *wr = gcreq->arg; @@ -2960,7 +2967,7 @@ int delete_writer_nolinger_locked (struct writer *wr) { DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid)); ASSERT_MUTEX_HELD (&wr->e.lock); - write_builtin_topic_local(&wr->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&wr->e, now(), false); local_reader_ary_setinvalid (&wr->rdary); ephash_remove_writer_guid (wr); writer_set_state (wr, WRST_DELETING); @@ -2990,6 +2997,13 @@ int delete_writer_nolinger (const struct nn_guid *guid) return 0; } +void delete_local_orphan_writer (struct local_orphan_writer *lowr) +{ + os_mutexLock (&lowr->wr.e.lock); + delete_writer_nolinger_locked (&lowr->wr); + os_mutexUnlock (&lowr->wr.e.lock); +} + int delete_writer (const struct nn_guid *guid) { struct writer *wr; @@ -3177,7 +3191,6 @@ static struct reader * new_reader_guid struct reader * rd; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; assert (!is_writer_entityid (guid->entityid)); assert (ephash_lookup_reader_guid (guid) == NULL); @@ -3186,7 +3199,7 @@ static struct reader * new_reader_guid new_reader_writer_common (guid, topic, xqos); rd = os_malloc (sizeof (*rd)); - endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, &tk); + endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp); /* Copy QoS, merging in defaults */ rd->xqos = os_malloc (sizeof (*rd->xqos)); @@ -3290,7 +3303,7 @@ static struct reader * new_reader_guid ephash_insert_reader_guid (rd); match_reader_with_proxy_writers (rd, tnow); match_reader_with_local_writers (rd, tnow); - write_builtin_topic_local(&rd->e, now(), true, tk); + ddsi_plugin.builtintopic_write (&rd->e, now(), true); sedp_write_reader (rd); return rd; } @@ -3383,7 +3396,7 @@ int delete_reader (const struct nn_guid *guid) (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc); } DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); - write_builtin_topic_local(&rd->e, now(), false, NULL); + ddsi_plugin.builtintopic_write (&rd->e, now(), false); ephash_remove_reader_guid (rd); gcreq_reader (rd); return 0; @@ -3459,7 +3472,6 @@ void new_proxy_participant runs on a single thread, it can't go wrong. FIXME, maybe? The same holds for the other functions for creating entities. */ struct proxy_participant *proxypp; - struct ddsi_tkmap_instance *tk; assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT); assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL); @@ -3469,7 +3481,7 @@ void new_proxy_participant proxypp = os_malloc (sizeof (*proxypp)); - entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, vendor, false, &tk); + entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false); proxypp->refc = 1; proxypp->lease_expired = 0; proxypp->vendor = vendor; @@ -3625,7 +3637,7 @@ void new_proxy_participant if (proxypp->owns_lease) lease_register (os_atomic_ldvoidp (&proxypp->lease)); - write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, tk); + ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); os_mutexUnlock (&proxypp->e.lock); } @@ -3643,7 +3655,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, co switch (source) { case UPD_PROXYPP_SPDP: - write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, NULL); + ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); proxypp->proxypp_have_spdp = 1; break; case UPD_PROXYPP_CM: @@ -3892,7 +3904,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t return ERR_UNKNOWN_ENTITY; } DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - write_builtin_topic_any(&ppt->e, timestamp, false, ppt->vendor, NULL); + ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false); remember_deleted_participant_guid (&ppt->e.guid); ephash_remove_proxy_participant_guid (ppt); os_mutexUnlock (&gv.lock); @@ -4021,12 +4033,7 @@ void delete_proxy_group (const nn_guid_t *guid, nn_wctime_t timestamp, int isimp /* PROXY-ENDPOINT --------------------------------------------------- */ -static void proxy_endpoint_common_init -( - struct entity_common *e, struct proxy_endpoint_common *c, - enum entity_kind kind, const struct nn_guid *guid, struct proxy_participant *proxypp, - struct addrset *as, const nn_plist_t *plist, struct ddsi_tkmap_instance **tk -) +static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist) { const char *name; @@ -4036,7 +4043,7 @@ static void proxy_endpoint_common_init assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME)); name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : ""; - entity_common_init (e, guid, name, kind, proxypp->vendor, false, tk); + entity_common_init (e, guid, name, kind, tcreate, proxypp->vendor, false); c->xqos = nn_xqos_dup (&plist->qos); c->as = ref_addrset (as); c->topic = NULL; /* set from first matching reader/writer */ @@ -4071,8 +4078,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, struct proxy_writer *pwr; int isreliable; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - (void)timestamp; + assert (is_writer_entityid (guid->entityid)); assert (ephash_lookup_proxy_writer_guid (guid) == NULL); @@ -4083,7 +4089,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, } pwr = os_malloc (sizeof (*pwr)); - proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist, &tk); + proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist); ut_avlInit (&pwr_readers_treedef, &pwr->readers); pwr->n_reliable_readers = 0; @@ -4149,7 +4155,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, local_reader_ary_init (&pwr->rdary); ephash_insert_proxy_writer_guid (pwr); match_proxy_writer_with_readers (pwr, tnow); - write_builtin_topic_any(&pwr->e, timestamp, true, pwr->c.vendor, tk); + ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true); os_mutexLock (&pwr->e.lock); pwr->local_matching_inprogress = 0; @@ -4265,7 +4271,6 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq) int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) { struct proxy_writer *pwr; - (void)timestamp; (void)isimplicit; DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_writer (%x:%x:%x:%x) ", PGUID (*guid)); os_mutexLock (&gv.lock); @@ -4281,7 +4286,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int from removing themselves from the proxy writer's rdary[]. */ local_reader_ary_setinvalid (&pwr->rdary); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); - write_builtin_topic_any(&pwr->e, timestamp, false, pwr->c.vendor, NULL); + ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false); ephash_remove_proxy_writer_guid (pwr); os_mutexUnlock (&gv.lock); gcreq_proxy_writer (pwr); @@ -4299,8 +4304,6 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, struct proxy_participant *proxypp; struct proxy_reader *prd; nn_mtime_t tnow = now_mt (); - struct ddsi_tkmap_instance *tk; - (void)timestamp; assert (!is_writer_entityid (guid->entityid)); assert (ephash_lookup_proxy_reader_guid (guid) == NULL); @@ -4312,7 +4315,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, } prd = os_malloc (sizeof (*prd)); - proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist, &tk); + proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist); prd->deleting = 0; #ifdef DDSI_INCLUDE_SSM @@ -4326,7 +4329,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, ut_avlInit (&prd_writers_treedef, &prd->writers); ephash_insert_proxy_reader_guid (prd); match_proxy_reader_with_writers (prd, tnow); - write_builtin_topic_any(&prd->e, timestamp, true, prd->c.vendor, tk); + ddsi_plugin.builtintopic_write (&prd->e, timestamp, true); return 0; } @@ -4399,7 +4402,6 @@ static void gc_delete_proxy_reader (struct gcreq *gcreq) int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) { struct proxy_reader *prd; - (void)timestamp; (void)isimplicit; DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_reader (%x:%x:%x:%x) ", PGUID (*guid)); os_mutexLock (&gv.lock); @@ -4409,7 +4411,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n"); return ERR_UNKNOWN_ENTITY; } - write_builtin_topic_any(&prd->e, timestamp, false, prd->c.vendor, NULL); + ddsi_plugin.builtintopic_write (&prd->e, timestamp, false); ephash_remove_proxy_reader_guid (prd); os_mutexUnlock (&gv.lock); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); diff --git a/src/core/ddsi/src/q_gc.c b/src/core/ddsi/src/q_gc.c index fdec90e..4ea9d4f 100644 --- a/src/core/ddsi/src/q_gc.c +++ b/src/core/ddsi/src/q_gc.c @@ -180,6 +180,14 @@ struct gcreq_queue *gcreq_queue_new (void) return q; } +void gcreq_queue_drain (struct gcreq_queue *q) +{ + os_mutexLock (&q->lock); + while (q->count != 0) + os_condWait (&q->cond, &q->lock); + os_mutexUnlock (&q->lock); +} + void gcreq_queue_free (struct gcreq_queue *q) { struct gcreq *gcreq; @@ -191,7 +199,8 @@ void gcreq_queue_free (struct gcreq_queue *q) os_mutexLock (&q->lock); q->terminate = 1; /* Wait until there is only request in existence, the one we just - allocated. Then we know the gc system is quiet. */ + allocated (this is also why we can't use "drain" here). Then + we know the gc system is quiet. */ while (q->count != 1) os_condWait (&q->cond, &q->lock); os_mutexUnlock (&q->lock); @@ -227,7 +236,7 @@ void gcreq_free (struct gcreq *gcreq) struct gcreq_queue *gcreq_queue = gcreq->queue; os_mutexLock (&gcreq_queue->lock); --gcreq_queue->count; - if (gcreq_queue->terminate && gcreq_queue->count <= 1) + if (gcreq_queue->count <= 1) os_condBroadcast (&gcreq_queue->cond); os_mutexUnlock (&gcreq_queue->lock); os_free (gcreq); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 3d2b84c..c0758bd 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -54,7 +54,6 @@ #include "ddsi/ddsi_raweth.h" #include "ddsi/ddsi_mcgroup.h" #include "ddsi/ddsi_serdata_default.h" -#include "ddsi/ddsi_serdata_builtin.h" #include "ddsi/ddsi_tkmap.h" #include "dds__whc.h" @@ -772,18 +771,12 @@ static void make_special_topics (void) { gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist); gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr); - gv.builtin_participant_topic = new_sertopic_builtin (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); - gv.builtin_reader_topic = new_sertopic_builtin (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); - gv.builtin_writer_topic = new_sertopic_builtin (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); } static void free_special_topics (void) { ddsi_sertopic_unref (gv.plist_topic); ddsi_sertopic_unref (gv.rawcdr_topic); - ddsi_sertopic_unref (gv.builtin_participant_topic); - ddsi_sertopic_unref (gv.builtin_reader_topic); - ddsi_sertopic_unref (gv.builtin_writer_topic); } static int setup_and_start_recv_threads (void) @@ -1408,7 +1401,7 @@ static void builtins_dqueue_ready_cb (void *varg) os_mutexUnlock (&arg->lock); } -void rtps_term (void) +void rtps_stop (void) { struct thread_state1 *self = lookup_thread_state (); #ifdef DDSI_INCLUDE_NETWORK_CHANNELS @@ -1527,17 +1520,26 @@ void rtps_term (void) } /* Wait until all participants are really gone => by then we can be - certain that no new GC requests will be added */ + certain that no new GC requests will be added, short of what we + do here */ os_mutexLock (&gv.participant_set_lock); while (gv.nparticipants > 0) os_condWait (&gv.participant_set_cond, &gv.participant_set_lock); os_mutexUnlock (&gv.participant_set_lock); + /* Wait until no more GC requests are outstanding -- not really + necessary, but it allows us to claim the stack is quiescent + at this point */ + gcreq_queue_drain (gv.gcreq_queue); + /* Clean up privileged_pp -- it must be NULL now (all participants are gone), but the lock still needs to be destroyed */ assert (gv.privileged_pp == NULL); os_mutexDestroy (&gv.privileged_pp_lock); +} +void rtps_fini (void) +{ /* Shut down the GC system -- no new requests will be added */ gcreq_queue_free (gv.gcreq_queue); diff --git a/src/core/ddsi/src/q_transmit.c b/src/core/ddsi/src/q_transmit.c index d763356..28ebdc9 100644 --- a/src/core/ddsi/src/q_transmit.c +++ b/src/core/ddsi/src/q_transmit.c @@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist res = 1; #ifndef NDEBUG - if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) + if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER && !is_local_orphan_endpoint (&wr->e)) { struct whc_state whcst; whc_get_state(wr->whc, &whcst); diff --git a/src/os/CMakeLists.txt b/src/os/CMakeLists.txt index de0d256..e95ac92 100644 --- a/src/os/CMakeLists.txt +++ b/src/os/CMakeLists.txt @@ -95,7 +95,7 @@ endif() if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") target_link_libraries(OSAPI INTERFACE -lsocket -lnsl) - target_compile_definitions(OSAPI PRIVATE -KPIC) + add_definitions(-KPIC) endif() # Determine if platform is big or little endian. diff --git a/src/os/include/os/posix/os_platform_socket.h b/src/os/include/os/posix/os_platform_socket.h index 7dadd5d..d16dfd9 100644 --- a/src/os/include/os/posix/os_platform_socket.h +++ b/src/os/include/os/posix/os_platform_socket.h @@ -74,8 +74,6 @@ extern "C" { typedef size_t os_iov_len_t; #if defined(__sun) && !defined(_XPG4_2) -#define msg_accrights msg_control -#define msg_accrightslen msg_controllen #define OS_MSGHDR_FLAGS 0 #else #define OS_MSGHDR_FLAGS 1 diff --git a/src/os/tests/mutex.c b/src/os/tests/mutex.c index f33a313..6a03be1 100644 --- a/src/os/tests/mutex.c +++ b/src/os/tests/mutex.c @@ -190,72 +190,6 @@ CU_Test(os_mutex, basic) printf("Ending os_mutex_basic\n"); } -#define RUNTIME_SEC (4) -#define NUM_THREADS (8) -#define OS_STRESS_STOP (0) -#define OS_STRESS_GO (1) -#define THREAD_NAME_LEN (8) - -struct os_mutex_stress { - os_threadId tid; - os_mutex m; - os_atomic_uint32_t * flag; - char name[THREAD_NAME_LEN]; -}; - -static uint32_t -os_mutex_init_thr( - void *args) -{ - struct os_mutex_stress *state = (struct os_mutex_stress *)args; - os_result r; - uint32_t iterations = 0; - - do { - os_mutexInit(&state->m); - r = os_mutexLock_s(&state->m); /* Use the mutex to check that all is OK. */ - CU_ASSERT_EQUAL(r, os_resultSuccess); /* Failure can't be forced. */ - os_mutexUnlock(&state->m); - os_mutexDestroy(&state->m); - iterations++; - } while ( os_atomic_ld32(state->flag) != OS_STRESS_STOP && r == os_resultSuccess); - - printf("%s <%"PRIxMAX">: Performed %u iterations. Stopping now.\n", state->name, os_threadIdToInteger(os_threadIdSelf()), iterations); - return r != os_resultSuccess; /* Return true on faulure */ -} - -CU_Test(os_mutex, init_stress) -{ - struct os_mutex_stress threads[NUM_THREADS]; - os_threadAttr tattr; - unsigned i; - os_atomic_uint32_t flag = OS_ATOMIC_UINT32_INIT(OS_STRESS_GO); - os_time runtime = { .tv_sec = RUNTIME_SEC, .tv_nsec = 0 }; - - printf("Starting os_mutex_init_stress\n"); - - os_threadAttrInit(&tattr); - for ( i = 0; i < NUM_THREADS; i++ ) { - (void) snprintf(&threads[i].name[0], THREAD_NAME_LEN, "thr%u", i); - threads[i].flag = &flag; - os_threadCreate(&threads[i].tid, threads[i].name, &tattr, &os_mutex_init_thr, &threads[i]); - printf("main <%"PRIxMAX">: Started thread '%s' with thread-id %" PRIxMAX "\n", os_threadIdToInteger(os_threadIdSelf()), threads[i].name, os_threadIdToInteger(threads[i].tid)); - } - - printf("main <%"PRIxMAX">: Test will run for ~%ds with %d threads\n", os_threadIdToInteger(os_threadIdSelf()), RUNTIME_SEC, NUM_THREADS); - os_nanoSleep(runtime); - os_atomic_st32(&flag, OS_STRESS_STOP); - - for ( ; i != 0; i-- ) { - uint32_t thread_failed; - os_threadWaitExit(threads[i - 1].tid, &thread_failed); - printf("main <%"PRIxMAX">: Thread %s <%" PRIxMAX "> stopped with result %s.\n", os_threadIdToInteger(os_threadIdSelf()), threads[i - 1].name, os_threadIdToInteger(threads[i - 1].tid), thread_failed ? "FAILED" : "PASS"); - - CU_ASSERT_FALSE(thread_failed); - } - printf("Ending os_mutex_init_stress\n"); -} - CU_Test(os_mutex, lock, false) { /* Test critical section access with locking and PRIVATE scope */ diff --git a/src/tools/ddsls/ddsls.c b/src/tools/ddsls/ddsls.c index 40cb1a3..d6accb8 100644 --- a/src/tools/ddsls/ddsls.c +++ b/src/tools/ddsls/ddsls.c @@ -192,22 +192,22 @@ void qp_duration_qos (const dds_qos_t *q, FILE *fp, const char *what, bool (*qge void qp_lifespan (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "lifespan: duration = ", dds_qget_lifespan); + qp_duration_qos (q, fp, "lifespan: duration", dds_qget_lifespan); } void qp_deadline (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "deadline: period = ", dds_qget_deadline); + qp_duration_qos (q, fp, "deadline: period", dds_qget_deadline); } void qp_latency_budget (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "latency_budget: duration = ", dds_qget_latency_budget); + qp_duration_qos (q, fp, "latency_budget: duration", dds_qget_latency_budget); } void qp_time_based_filter (const dds_qos_t *q, FILE *fp) { - qp_duration_qos (q, fp, "time_based_filter: minimum_separation = ", dds_qget_time_based_filter); + qp_duration_qos (q, fp, "time_based_filter: minimum_separation", dds_qget_time_based_filter); } void qp_ownership (const dds_qos_t *q, FILE *fp)