From 4778d6c5dfb6853f07f81ac59ae0d65afc5e3c93 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 15 Apr 2019 11:42:56 +0200 Subject: [PATCH] add QoS to ignore local readers/writers (#78) Adds a new "ignorelocal" QoS to the readers/writers to ignore local matching readers/writers, with three settings: * DDS_IGNORELOCAL_NONE: default * DDS_IGNORELOCAL_PARTICIPANT: ignores readers/writers in the same participant * DDS_IGNORELOCAL_PROCESS: ignores readers/writers in the same process These can be set/got using dds_qset_ignorelocal and dds_qget_ignorelocal. If a matching reader or writer is ignored because of this setting, it is as-if that reader or writer doesn't exist. No traffic will be generated or data retained on its behalf. There are no consequences for interoperability as this is (by definition) a local affair. Signed-off-by: Erik Boasson --- .../ddsc/include/dds/ddsc/dds_public_qos.h | 33 +++++++++++++++++++ src/core/ddsc/src/dds_qos.c | 21 ++++++++++++ src/core/ddsi/include/dds/ddsi/q_xqos.h | 12 +++++++ src/core/ddsi/src/q_entity.c | 27 +++++++++++++++ src/core/ddsi/src/q_plist.c | 10 ++++++ 5 files changed, 103 insertions(+) diff --git a/src/core/ddsc/include/dds/ddsc/dds_public_qos.h b/src/core/ddsc/include/dds/ddsc/dds_public_qos.h index 4ffceb4..8f319c1 100644 --- a/src/core/ddsc/include/dds/ddsc/dds_public_qos.h +++ b/src/core/ddsc/include/dds/ddsc/dds_public_qos.h @@ -136,6 +136,16 @@ typedef enum dds_presentation_access_scope_kind } dds_presentation_access_scope_kind_t; +/** Ignore-local QoS: Applies to DataReader, DataWriter */ +typedef enum dds_ignorelocal_kind +{ + DDS_IGNORELOCAL_NONE, + DDS_IGNORELOCAL_PARTICIPANT, + DDS_IGNORELOCAL_PROCESS +} +dds_ignorelocal_kind_t; + + /** * @brief Allocate memory and initialize default QoS-policies * @@ -465,6 +475,16 @@ dds_qset_durability_service ( int32_t max_instances, int32_t max_samples_per_instance); +/** + * @brief Set the ignore-local policy of a qos structure + * + * @param[in,out] qos - Pointer to a dds_qos_t structure that will store the policy + * @param[in] ignore - True if readers and writers owned by the same participant should be ignored + */ +DDS_EXPORT void dds_qset_ignorelocal ( + dds_qos_t * __restrict qos, + dds_ignorelocal_kind_t ignore); + /** * @brief Get the userdata from a qos structure * @@ -753,6 +773,19 @@ dds_qget_durability_service ( int32_t *max_instances, int32_t *max_samples_per_instance); + /** + * @brief Get the ignore-local qos policy + * + * @param[in] qos - Pointer to a dds_qos_t structure storing the policy + * @param[in,out] ignore - Pointer that will store whether to ignore readers/writers owned by the same participant (optional) + * + * @returns - false iff any of the arguments is invalid or the qos is not present in the qos object + */ +DDS_EXPORT bool +dds_qget_ignorelocal ( + const dds_qos_t * __restrict qos, + dds_ignorelocal_kind_t *ignore); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsc/src/dds_qos.c b/src/core/ddsc/src/dds_qos.c index bc1869c..25a3ce6 100644 --- a/src/core/ddsc/src/dds_qos.c +++ b/src/core/ddsc/src/dds_qos.c @@ -658,6 +658,16 @@ void dds_qset_durability_service } } +void dds_qset_ignorelocal (dds_qos_t * __restrict qos, dds_ignorelocal_kind_t ignore) +{ + if (qos) { + qos->ignorelocal.value = (nn_ignorelocal_kind_t) ignore; + qos->present |= QP_CYCLONE_IGNORELOCAL; + } else { + DDS_ERROR("Argument QoS is NULL\n"); + } +} + bool dds_qget_userdata (const dds_qos_t * __restrict qos, void **value, size_t *sz) { if (!qos || !(qos->present & QP_USER_DATA)) { @@ -931,3 +941,14 @@ bool dds_qget_durability_service (const dds_qos_t * __restrict qos, dds_duration } return true; } + +bool dds_qget_ignorelocal (const dds_qos_t * __restrict qos, dds_ignorelocal_kind_t *ignore) +{ + if (!qos || !(qos->present & QP_CYCLONE_IGNORELOCAL)) { + return false; + } + if (ignore) { + *ignore = (dds_ignorelocal_kind_t) qos->ignorelocal.value; + } + return true; +} diff --git a/src/core/ddsi/include/dds/ddsi/q_xqos.h b/src/core/ddsi/include/dds/ddsi/q_xqos.h index 46a15bf..d13207d 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xqos.h +++ b/src/core/ddsi/include/dds/ddsi/q_xqos.h @@ -205,6 +205,16 @@ typedef struct nn_share_qospolicy { char *name; } nn_share_qospolicy_t; +typedef enum nn_ignorelocal_kind { + NN_NONE_IGNORELOCAL_QOS, + NN_PARTICIPANT_IGNORELOCAL_QOS, + NN_PROCESS_IGNORELOCAL_QOS +} nn_ignorelocal_kind_t; + +typedef struct nn_ignorelocal_qospolicy { + nn_ignorelocal_kind_t value; +} nn_ignorelocal_qospolicy_t; + /***/ /* Qos Present bit indices */ @@ -237,6 +247,7 @@ typedef struct nn_share_qospolicy { #define QP_PRISMTECH_ENTITY_FACTORY ((uint64_t)1 << 27) #define QP_PRISMTECH_SYNCHRONOUS_ENDPOINT ((uint64_t)1 << 28) #define QP_RTI_TYPECODE ((uint64_t)1 << 29) +#define QP_CYCLONE_IGNORELOCAL ((uint64_t)1 << 30) /* Partition QoS is not RxO according to the specification (DDS 1.2, section 7.1.3), but communication will not take place unless it @@ -290,6 +301,7 @@ typedef struct nn_xqos { /*x xR*/nn_reader_lifespan_qospolicy_t reader_lifespan; /*x xR*/nn_share_qospolicy_t share; /*xxx */nn_synchronous_endpoint_qospolicy_t synchronous_endpoint; + /* x */nn_ignorelocal_qospolicy_t ignorelocal; /* X*/nn_octetseq_t rti_typecode; } nn_xqos_t; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 565c72b..3626afd 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2058,12 +2058,39 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r proxy_writer_add_connection (pwr, rd, tnow, init_count); } +static bool ignore_local_p (const nn_guid_t *guid1, const nn_guid_t *guid2, const struct nn_xqos *xqos1, const struct nn_xqos *xqos2) +{ + assert (xqos1->present & QP_CYCLONE_IGNORELOCAL); + assert (xqos2->present & QP_CYCLONE_IGNORELOCAL); + switch (xqos1->ignorelocal.value) + { + case NN_NONE_IGNORELOCAL_QOS: + break; + case NN_PARTICIPANT_IGNORELOCAL_QOS: + return memcmp (&guid1->prefix, &guid2->prefix, sizeof (guid1->prefix)) == 0; + case NN_PROCESS_IGNORELOCAL_QOS: + return true; + } + switch (xqos2->ignorelocal.value) + { + case NN_NONE_IGNORELOCAL_QOS: + break; + case NN_PARTICIPANT_IGNORELOCAL_QOS: + return memcmp (&guid1->prefix, &guid2->prefix, sizeof (guid1->prefix)) == 0; + case NN_PROCESS_IGNORELOCAL_QOS: + return true; + } + return false; +} + static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn_mtime_t tnow) { int32_t reason; (void)tnow; if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE))) return; + if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos)) + return; if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0) { writer_qos_mismatch (wr, (uint32_t)reason); diff --git a/src/core/ddsi/src/q_plist.c b/src/core/ddsi/src/q_plist.c index ad97995..c7a8222 100644 --- a/src/core/ddsi/src/q_plist.c +++ b/src/core/ddsi/src/q_plist.c @@ -2656,6 +2656,9 @@ static void xqos_init_default_common (nn_xqos_t *xqos) xqos->present |= QP_PRISMTECH_SYNCHRONOUS_ENDPOINT; xqos->synchronous_endpoint.value = 0; + + xqos->present |= QP_CYCLONE_IGNORELOCAL; + xqos->ignorelocal.value = NN_NONE_IGNORELOCAL_QOS; } void nn_xqos_init_default_reader (nn_xqos_t *xqos) @@ -2810,6 +2813,7 @@ void nn_xqos_mergein_missing (nn_xqos_t *a, const nn_xqos_t *b) CQ (PRISMTECH_READER_LIFESPAN, reader_lifespan); CQ (PRISMTECH_ENTITY_FACTORY, entity_factory); CQ (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint); + CQ (CYCLONE_IGNORELOCAL, ignorelocal); #undef CQ /* For allocated ones it is Not strictly necessary to use tmp, as @@ -3197,6 +3201,10 @@ uint64_t nn_xqos_delta (const nn_xqos_t *a, const nn_xqos_t *b, uint64_t mask) if (octetseqs_differ (&a->rti_typecode, &b->rti_typecode)) delta |= QP_RTI_TYPECODE; } + if (check & QP_CYCLONE_IGNORELOCAL) { + if (a->ignorelocal.value != b->ignorelocal.value) + delta |= QP_CYCLONE_IGNORELOCAL; + } return delta; } @@ -3257,6 +3265,7 @@ void nn_xqos_addtomsg (struct nn_xmsg *m, const nn_xqos_t *xqos, uint64_t wanted SIMPLE (PRISMTECH_ENTITY_FACTORY, entity_factory); SIMPLE (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint); FUNC_BY_REF (RTI_TYPECODE, rti_typecode, octetseq); + /* CYCLONE_IGNORELOCAL is not visible on the wire */ #undef FUNC_BY_REF #undef FUNC_BY_VAL #undef SIMPLE @@ -3467,6 +3476,7 @@ void nn_log_xqos (uint32_t cat, const nn_xqos_t *xqos) log_octetseq (cat, xqos->rti_typecode.length, xqos->rti_typecode.value); DDS_LOG(cat, ">"); }); + DO (CYCLONE_IGNORELOCAL, { LOGB1 ("ignorelocal=%u", xqos->ignorelocal.value); }); #undef PRINTARG_DUR #undef FMT_DUR