diff --git a/src/core/ddsc/src/dds__qos.h b/src/core/ddsc/src/dds__qos.h index 4d66c67..be21eb9 100644 --- a/src/core/ddsc/src/dds__qos.h +++ b/src/core/ddsc/src/dds__qos.h @@ -12,14 +12,43 @@ #ifndef _DDS_QOS_H_ #define _DDS_QOS_H_ -#include "dds__entity.h" #include "dds/ddsi/q_xqos.h" #if defined (__cplusplus) extern "C" { #endif -dds_return_t dds_qos_validate_mutable_common (const dds_qos_t *qos); +#define DDS_TOPIC_QOS_MASK \ + (QP_TOPIC_DATA | QP_DURABILITY | QP_DURABILITY_SERVICE | \ + QP_DEADLINE | QP_LATENCY_BUDGET | QP_OWNERSHIP | QP_LIVELINESS | \ + QP_RELIABILITY | QP_TRANSPORT_PRIORITY | QP_LIFESPAN | \ + QP_DESTINATION_ORDER | QP_HISTORY | QP_RESOURCE_LIMITS) + +#define DDS_PARTICIPANT_QOS_MASK \ + (QP_USER_DATA | QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) + +#define DDS_PUBLISHER_QOS_MASK \ + (QP_PARTITION | QP_PRESENTATION | QP_GROUP_DATA | \ + QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) + +#define DDS_READER_QOS_MASK \ + (QP_USER_DATA | QP_DURABILITY | QP_DEADLINE | QP_LATENCY_BUDGET | \ + QP_OWNERSHIP | QP_LIVELINESS | QP_TIME_BASED_FILTER | \ + QP_RELIABILITY | QP_DESTINATION_ORDER | QP_HISTORY | \ + QP_RESOURCE_LIMITS | QP_PRISMTECH_READER_DATA_LIFECYCLE | \ + QP_CYCLONE_IGNORELOCAL) + +#define DDS_SUBSCRIBER_QOS_MASK \ + (QP_PARTITION | QP_PRESENTATION | QP_GROUP_DATA | \ + QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) + +#define DDS_WRITER_QOS_MASK \ + (QP_USER_DATA | QP_DURABILITY | QP_DURABILITY_SERVICE | QP_DEADLINE | \ + QP_LATENCY_BUDGET | QP_OWNERSHIP | QP_OWNERSHIP_STRENGTH | \ + QP_LIVELINESS | QP_RELIABILITY | QP_TRANSPORT_PRIORITY | \ + QP_LIFESPAN | QP_DESTINATION_ORDER | QP_HISTORY | \ + QP_RESOURCE_LIMITS | QP_PRISMTECH_WRITER_DATA_LIFECYCLE | \ + QP_CYCLONE_IGNORELOCAL) #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index f7cc781..c663bc4 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -17,7 +17,6 @@ #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_plist.h" /* for nn_keyhash */ #include "dds__init.h" -#include "dds__qos.h" #include "dds__domain.h" #include "dds__participant.h" #include "dds__types.h" diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 368f40c..4b51023 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -19,6 +19,8 @@ #include "dds__writer.h" #include "dds__reader.h" #include "dds__listener.h" +#include "dds__qos.h" +#include "dds__topic.h" #include "dds/version.h" #include "dds/ddsi/q_xqos.h" @@ -339,6 +341,32 @@ dds_return_t dds_get_children (dds_entity_t entity, dds_entity_t *children, size } } +static uint64_t entity_kind_qos_mask (dds_entity_kind_t kind) +{ + switch (kind) + { + case DDS_KIND_TOPIC: + return DDS_TOPIC_QOS_MASK; + case DDS_KIND_PARTICIPANT: + return DDS_PARTICIPANT_QOS_MASK; + case DDS_KIND_READER: + return DDS_READER_QOS_MASK; + case DDS_KIND_WRITER: + return DDS_WRITER_QOS_MASK; + case DDS_KIND_SUBSCRIBER: + return DDS_SUBSCRIBER_QOS_MASK; + case DDS_KIND_PUBLISHER: + return DDS_PUBLISHER_QOS_MASK; + case DDS_KIND_DONTCARE: + case DDS_KIND_COND_READ: + case DDS_KIND_COND_QUERY: + case DDS_KIND_COND_GUARD: + case DDS_KIND_WAITSET: + break; + } + return 0; +} + dds_return_t dds_get_qos (dds_entity_t entity, dds_qos_t *qos) { dds_entity *e; @@ -362,29 +390,159 @@ dds_return_t dds_get_qos (dds_entity_t entity, dds_qos_t *qos) return ret; } +static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos, uint64_t mask) +{ + dds_return_t ret; + dds_qos_t *newqos = dds_create_qos (); + nn_xqos_mergein_missing (newqos, qos, mask); + nn_xqos_mergein_missing (newqos, e->m_qos, ~(uint64_t)0); + if ((ret = nn_xqos_valid (newqos)) != DDS_RETCODE_OK) + ; /* oops ... invalid or inconsistent */ + else if (!(e->m_flags & DDS_ENTITY_ENABLED)) + ; /* do as you please while the entity is not enabled (perhaps we should even allow invalid ones?) */ + else + { + const uint64_t delta = nn_xqos_delta (e->m_qos, newqos, ~(uint64_t)0); + if (delta == 0) /* no change */ + ret = DDS_RETCODE_OK; + else if (delta & ~QP_CHANGEABLE_MASK) + ret = DDS_RETCODE_IMMUTABLE_POLICY; + else if (delta & (QP_RXO_MASK | QP_PARTITION)) + ret = DDS_RETCODE_UNSUPPORTED; /* not yet supporting things that affect matching */ + else + { + /* yay! */ + } + } + + if (ret != DDS_RETCODE_OK) + dds_delete_qos (newqos); + else if ((ret = e->m_deriver.set_qos (e, newqos, e->m_flags & DDS_ENTITY_ENABLED)) != DDS_RETCODE_OK) + dds_delete_qos (newqos); + else + { + dds_delete_qos (e->m_qos); + e->m_qos = newqos; + } + return ret; +} + +static void pushdown_pubsub_qos (dds_entity_t entity) +{ + dds_entity_t *cs = NULL; + int ncs, size = 0; + while ((ncs = dds_get_children (entity, cs, (size_t) size)) > size) + { + size = ncs; + cs = ddsrt_realloc (cs, (size_t) size * sizeof (*cs)); + } + for (int i = 0; i < ncs; i++) + { + dds_entity *e; + if (dds_entity_lock (cs[i], DDS_KIND_DONTCARE, &e) == DDS_RETCODE_OK) + { + if (dds_entity_kind (e) == DDS_KIND_READER || dds_entity_kind (e) == DDS_KIND_WRITER) + { + dds_entity *pe; + if (dds_entity_lock (entity, DDS_KIND_DONTCARE, &pe) == DDS_RETCODE_OK) + { + dds_set_qos_locked_impl (e, pe->m_qos, QP_GROUP_DATA | QP_PARTITION); + dds_entity_unlock (pe); + } + } + dds_entity_unlock (e); + } + } + ddsrt_free (cs); +} + +static void pushdown_topic_qos (dds_entity_t parent, dds_entity_t topic) +{ + dds_entity_t *cs = NULL; + int ncs, size = 0; + while ((ncs = dds_get_children (parent, cs, (size_t) size)) > size) + { + size = ncs; + cs = ddsrt_realloc (cs, (size_t) size * sizeof (*cs)); + } + for (int i = 0; i < ncs; i++) + { + dds_entity *e; + if (dds_entity_lock (cs[i], DDS_KIND_DONTCARE, &e) == DDS_RETCODE_OK) + { + enum { NOP, PROP, CHANGE } todo; + switch (dds_entity_kind (e)) + { + case DDS_KIND_READER: { + dds_reader *rd = (dds_reader *) e; + todo = (rd->m_topic->m_entity.m_hdllink.hdl == topic) ? CHANGE : NOP; + break; + } + case DDS_KIND_WRITER: { + dds_writer *wr = (dds_writer *) e; + todo = (wr->m_topic->m_entity.m_hdllink.hdl == topic) ? CHANGE : NOP; + break; + case DDS_KIND_PUBLISHER: + case DDS_KIND_SUBSCRIBER: + todo = PROP; + break; + default: + todo = NOP; + break; + } + } + if (todo == CHANGE) + { + dds_topic *tp; + if (dds_topic_lock (topic, &tp) == DDS_RETCODE_OK) + { + dds_set_qos_locked_impl (e, tp->m_entity.m_qos, QP_TOPIC_DATA); + dds_topic_unlock (tp); + } + } + dds_entity_unlock (e); + if (todo == PROP) + { + pushdown_topic_qos (cs[i], topic); + } + } + } + ddsrt_free (cs); +} + dds_return_t dds_set_qos (dds_entity_t entity, const dds_qos_t *qos) { dds_entity *e; dds_return_t ret; - if (qos == NULL) return DDS_RETCODE_BAD_PARAMETER; - - if ((ret = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + else if ((ret = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) return ret; - - if (e->m_deriver.set_qos == 0) - ret = DDS_RETCODE_ILLEGAL_OPERATION; else { - if ((ret = e->m_deriver.set_qos (e, qos, e->m_flags & DDS_ENTITY_ENABLED)) == DDS_RETCODE_OK) + const dds_entity_kind_t kind = dds_entity_kind (e); + dds_entity_t pphandle = e->m_participant->m_hdllink.hdl; + if (e->m_deriver.set_qos == 0) + ret = DDS_RETCODE_ILLEGAL_OPERATION; + else + ret = dds_set_qos_locked_impl (e, qos, entity_kind_qos_mask (kind)); + dds_entity_unlock (e); + if (ret == DDS_RETCODE_OK) { - if (e->m_qos == NULL) - e->m_qos = dds_create_qos (); - ret = dds_copy_qos (e->m_qos, qos); + switch (dds_entity_kind (e)) + { + case DDS_KIND_TOPIC: + pushdown_topic_qos (pphandle, entity); + break; + case DDS_KIND_PUBLISHER: + case DDS_KIND_SUBSCRIBER: + pushdown_pubsub_qos (entity); + break; + default: + break; + } } } - dds_entity_unlock (e); return ret; } diff --git a/src/core/ddsc/src/dds_matched.c b/src/core/ddsc/src/dds_matched.c index 327ec56..10fb690 100644 --- a/src/core/ddsc/src/dds_matched.c +++ b/src/core/ddsc/src/dds_matched.c @@ -21,7 +21,6 @@ #include "dds/ddsi/q_bswap.h" #include "dds__writer.h" #include "dds__reader.h" -#include "dds__qos.h" #include "dds__topic.h" dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_handle_t *rds, size_t nrds) diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 614ec59..fea399b 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -16,16 +16,18 @@ #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_plist.h" +#include "dds/ddsi/q_globals.h" #include "dds__init.h" -#include "dds__qos.h" #include "dds__domain.h" #include "dds__participant.h" #include "dds__builtin.h" +#include "dds__qos.h" DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_participant) #define DDS_PARTICIPANT_STATUS_MASK (0u) + /* List of created participants */ static dds_entity *dds_pp_head = NULL; @@ -70,28 +72,24 @@ static dds_return_t dds_participant_instance_hdl (dds_entity *e, dds_instance_ha return DDS_RETCODE_OK; } -static dds_return_t dds_participant_qos_validate (const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - -static dds_return_t dds_participant_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - (void)enabled; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - return DDS_RETCODE_OK; -} - -static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - dds_return_t ret; - (void)e; - if ((ret = dds_participant_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - if (enabled) /* FIXME: changing QoS */ - return DDS_RETCODE_UNSUPPORTED; - return ret; + /* note: e->m_qos is still the old one to allow for failure here */ + if (enabled) + { + struct participant *pp; + thread_state_awake (lookup_thread_state ()); + if ((pp = ephash_lookup_participant_guid (&e->m_guid)) != NULL) + { + nn_plist_t plist; + nn_plist_init_empty (&plist); + plist.qos.present = plist.qos.aliased = qos->present; + plist.qos = *qos; + update_participant_plist (pp, &plist); + } + thread_state_asleep (lookup_thread_state ()); + } + return DDS_RETCODE_OK; } dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener) @@ -110,13 +108,11 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_ if ((ret = dds__check_domain (domain)) != DDS_RETCODE_OK) goto err_domain_check; - -#define DDS_QOSMASK_PARTICIPANT (QP_USER_DATA | QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) new_qos = dds_create_qos (); if (qos != NULL) - nn_xqos_mergein_missing (new_qos, qos, DDS_QOSMASK_PARTICIPANT); - /* Validate qos or use default if NULL */ - if ((ret = dds_participant_qos_validate (new_qos, false)) != DDS_RETCODE_OK) + nn_xqos_mergein_missing (new_qos, qos, DDS_PARTICIPANT_QOS_MASK); + nn_xqos_mergein_missing (new_qos, &gv.default_plist_pp.qos, ~(uint64_t)0); + if ((ret = nn_xqos_valid (new_qos)) < 0) goto err_qos_validation; /* Translate qos */ diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index bbaa4c5..307a40b 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -29,34 +29,14 @@ static dds_return_t dds_publisher_instance_hdl (dds_entity *e, dds_instance_hand static dds_return_t dds_publisher_instance_hdl (dds_entity *e, dds_instance_handle_t *i) { /* FIXME: Get/generate proper handle. */ - (void) e; - (void) i; + (void) e; (void) i; return DDS_RETCODE_UNSUPPORTED; } -static dds_return_t dds_publisher_qos_validate (const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - -static dds_return_t dds_publisher_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - /* FIXME: Improve/check immutable check. */ - if (enabled && (qos->present & QP_PRESENTATION)) - return DDS_RETCODE_IMMUTABLE_POLICY; - return DDS_RETCODE_OK; -} - -static dds_return_t dds_publisher_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - static dds_return_t dds_publisher_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - dds_return_t ret; - (void)e; - if ((ret = dds_publisher_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - if (enabled) /* FIXME: QoS changes. */ - return DDS_RETCODE_UNSUPPORTED; + /* note: e->m_qos is still the old one to allow for failure here */ + (void) e; (void) qos; (void) enabled; return DDS_RETCODE_OK; } @@ -73,12 +53,11 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo dds_qos_t *new_qos; dds_return_t ret; -#define DDS_QOSMASK_PUBLISHER (QP_PARTITION | QP_PRESENTATION | QP_GROUP_DATA | QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) new_qos = dds_create_qos (); if (qos) - nn_xqos_mergein_missing (new_qos, qos, DDS_QOSMASK_PUBLISHER); + nn_xqos_mergein_missing (new_qos, qos, DDS_PUBLISHER_QOS_MASK); nn_xqos_mergein_missing (new_qos, &gv.default_xqos_pub, ~(uint64_t)0); - if ((ret = dds_publisher_qos_validate (new_qos, false)) != DDS_RETCODE_OK) + if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK) { dds_delete_qos (new_qos); return ret; diff --git a/src/core/ddsc/src/dds_qos.c b/src/core/ddsc/src/dds_qos.c index a67329e..a9c0296 100644 --- a/src/core/ddsc/src/dds_qos.c +++ b/src/core/ddsc/src/dds_qos.c @@ -11,10 +11,11 @@ */ #include #include -#include "dds__qos.h" +#include +#include "dds/dds.h" #include "dds/ddsrt/heap.h" #include "dds/ddsrt/string.h" -#include "dds/ddsi/q_config.h" +#include "dds/ddsi/q_plist.h" static void dds_qos_data_copy_in (ddsi_octetseq_t *data, const void * __restrict value, size_t sz, bool overwrite) { @@ -47,26 +48,6 @@ static bool dds_qos_data_copy_out (const ddsi_octetseq_t *data, void **value, si return true; } -dds_return_t dds_qos_validate_mutable_common (const dds_qos_t *qos) -{ - /* FIXME: Check whether immutable QoS are changed should actually incorporate change to current QoS */ - if (qos->present & QP_DEADLINE) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_OWNERSHIP) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_LIVELINESS) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_RELIABILITY) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_DESTINATION_ORDER) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_HISTORY) - return DDS_RETCODE_IMMUTABLE_POLICY; - if (qos->present & QP_RESOURCE_LIMITS) - return DDS_RETCODE_IMMUTABLE_POLICY; - return DDS_RETCODE_OK; -} - dds_qos_t *dds_create_qos (void) { dds_qos_t *qos = ddsrt_malloc (sizeof (dds_qos_t)); diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index b5896ad..9fcecd6 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -17,11 +17,11 @@ #include "dds__subscriber.h" #include "dds__reader.h" #include "dds__listener.h" -#include "dds__qos.h" #include "dds__init.h" #include "dds__rhc.h" #include "dds__topic.h" #include "dds__get_status.h" +#include "dds__qos.h" #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/q_globals.h" @@ -77,26 +77,18 @@ static dds_return_t dds_reader_delete (dds_entity *e) return ret; } -static dds_return_t dds_reader_qos_validate (const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - -static dds_return_t dds_reader_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - return (enabled ? dds_qos_validate_mutable_common (qos) : DDS_RETCODE_OK); -} - -static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - dds_return_t ret; - (void) e; - if ((ret = dds_reader_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - /* FIXME: QoS changes. */ - return (enabled ? DDS_RETCODE_UNSUPPORTED : DDS_RETCODE_OK); + /* note: e->m_qos is still the old one to allow for failure here */ + if (enabled) + { + struct reader *rd; + thread_state_awake (lookup_thread_state ()); + if ((rd = ephash_lookup_reader_guid (&e->m_guid)) != NULL) + update_reader_qos (rd, qos); + thread_state_asleep (lookup_thread_state ()); + } + return DDS_RETCODE_OK; } static dds_return_t dds_reader_status_validate (uint32_t mask) @@ -363,17 +355,16 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti /* Merge qos from topic and subscriber, dds_copy_qos only fails when it is passed a null argument, but that isn't the case here */ -#define DDS_QOSMASK_READER (QP_USER_DATA | QP_DURABILITY | QP_DEADLINE | QP_LATENCY_BUDGET | QP_OWNERSHIP | QP_LIVELINESS | QP_TIME_BASED_FILTER | QP_RELIABILITY | QP_DESTINATION_ORDER | QP_HISTORY | QP_RESOURCE_LIMITS | QP_PRISMTECH_READER_DATA_LIFECYCLE | QP_CYCLONE_IGNORELOCAL) rqos = dds_create_qos (); if (qos) - nn_xqos_mergein_missing (rqos, qos, DDS_QOSMASK_READER); + nn_xqos_mergein_missing (rqos, qos, DDS_READER_QOS_MASK); if (sub->m_entity.m_qos) nn_xqos_mergein_missing (rqos, sub->m_entity.m_qos, ~(uint64_t)0); if (tp->m_entity.m_qos) nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0); nn_xqos_mergein_missing (rqos, &gv.default_xqos_rd, ~(uint64_t)0); - if ((ret = dds_reader_qos_validate (rqos, false)) != DDS_RETCODE_OK) + if ((ret = nn_xqos_valid (rqos)) != DDS_RETCODE_OK) { dds_delete_qos (rqos); reader = ret; diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index b95df6e..b1b7e01 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -190,21 +190,13 @@ static char *dds_string_dup_reuse (char *old, const char *src) static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const dds_qos_t *src) { if (old == NULL) - { old = ddsrt_malloc (sizeof (*old)); - nn_xqos_init_empty (old); - old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; - nn_xqos_mergein_missing (old, src, ~(uint64_t)0); - old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); - } else { nn_xqos_fini (old); - nn_xqos_init_empty (old); - old->present |= QP_TOPIC_NAME | QP_TYPE_NAME; - nn_xqos_mergein_missing (old, src, ~(uint64_t)0); - old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME); } + nn_xqos_init_empty (old); + nn_xqos_mergein_missing (old, src, ~(QP_TOPIC_NAME | QP_TYPE_NAME)); return old; } diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index 1054ef6..181a6af 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -12,8 +12,8 @@ #include #include "dds__listener.h" #include "dds__participant.h" -#include "dds__qos.h" #include "dds__subscriber.h" +#include "dds__qos.h" #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_globals.h" #include "dds/version.h" @@ -27,32 +27,16 @@ static dds_return_t dds_subscriber_instance_hdl (dds_entity *e, dds_instance_han static dds_return_t dds_subscriber_instance_hdl (dds_entity *e, dds_instance_handle_t *i) { - (void) e; - (void) i; + (void) e; (void) i; /* FIXME: Get/generate proper handle. */ return DDS_RETCODE_UNSUPPORTED; } -static dds_return_t dds__subscriber_qos_validate (const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - -static dds_return_t dds__subscriber_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - return (enabled && (qos->present & QP_PRESENTATION)) ? DDS_RETCODE_IMMUTABLE_POLICY : DDS_RETCODE_OK; -} - -static dds_return_t dds_subscriber_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - static dds_return_t dds_subscriber_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - /* FIXME: QoS changes. */ - dds_return_t ret; - (void) e; - if ((ret = dds__subscriber_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - return (enabled ? DDS_RETCODE_UNSUPPORTED : DDS_RETCODE_OK); + /* note: e->m_qos is still the old one to allow for failure here */ + (void) e; (void) qos; (void) enabled; + return DDS_RETCODE_OK; } static dds_return_t dds_subscriber_status_validate (uint32_t mask) @@ -68,12 +52,11 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q dds_return_t ret; dds_qos_t *new_qos; -#define DDS_QOSMASK_SUBSCRIBER (QP_PARTITION | QP_PRESENTATION | QP_GROUP_DATA | QP_PRISMTECH_ENTITY_FACTORY | QP_CYCLONE_IGNORELOCAL) new_qos = dds_create_qos (); if (qos) - nn_xqos_mergein_missing (new_qos, qos, DDS_QOSMASK_SUBSCRIBER); + nn_xqos_mergein_missing (new_qos, qos, DDS_SUBSCRIBER_QOS_MASK); nn_xqos_mergein_missing (new_qos, &gv.default_xqos_sub, ~(uint64_t)0); - if ((ret = dds__subscriber_qos_validate (new_qos, false)) != DDS_RETCODE_OK) + if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK) { dds_delete_qos (new_qos); return ret; diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 7804a67..90bf553 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -18,12 +18,12 @@ #include "dds/ddsrt/string.h" #include "dds__topic.h" #include "dds__listener.h" -#include "dds__qos.h" #include "dds__participant.h" #include "dds__stream.h" #include "dds__init.h" #include "dds__domain.h" #include "dds__get_status.h" +#include "dds__qos.h" #include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_thread.h" #include "dds/ddsi/ddsi_sertopic.h" @@ -182,25 +182,11 @@ static dds_return_t dds_topic_delete (dds_entity *e) return DDS_RETCODE_OK; } -static dds_return_t dds_topic_qos_validate (const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - -static dds_return_t dds_topic_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - return enabled ? dds_qos_validate_mutable_common (qos) : DDS_RETCODE_OK; -} - - static dds_return_t dds_topic_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - /* FIXME: QoS changes */ - dds_return_t ret; - (void) e; - if ((ret = dds_topic_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - return enabled ? DDS_RETCODE_UNSUPPORTED : DDS_RETCODE_OK; + /* note: e->m_qos is still the old one to allow for failure here */ + (void) e; (void) qos; (void) enabled; + return DDS_RETCODE_OK; } static bool dupdef_qos_ok (const dds_qos_t *qos, const struct ddsi_sertopic *st) @@ -239,13 +225,25 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s if (sertopic == NULL) return DDS_RETCODE_BAD_PARAMETER; - if (qos && (rc = dds_topic_qos_validate (qos, false)) != DDS_RETCODE_OK) - return rc; + new_qos = dds_create_qos (); + if (qos) + nn_xqos_mergein_missing (new_qos, qos, DDS_TOPIC_QOS_MASK); + /* One would expect this: + * + * nn_xqos_mergein_missing (new_qos, &gv.default_xqos_tp, ~(uint64_t)0); + * + * but the crazy defaults of the DDS specification has a default settings + * for reliability that are dependent on the entity type: readers and + * topics default to best-effort, but writers to reliable. + * + * Leaving the topic QoS sparse means a default-default topic QoS of + * best-effort will do "the right thing" and let a writer still default to + * reliable ... (and keep behaviour unchanged) */ + if ((rc = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK) + goto err_invalid_qos; if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK) - return rc; - - /* FIXME: I find it weird that qos may be NULL in the entity */ + goto err_lock_participant; /* Check if topic already exists with same name */ ddsrt_mutex_lock (&dds_global.m_mutex); @@ -254,27 +252,22 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s /* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */ rc = DDS_RETCODE_PRECONDITION_NOT_MET; goto err_mismatch; - } else if (!dupdef_qos_ok(qos, stgeneric)) { + } else if (!dupdef_qos_ok (new_qos, stgeneric)) { /* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */ rc = DDS_RETCODE_INCONSISTENT_POLICY; goto err_mismatch; } else { /* FIXME: calling addref is wrong because the Cyclone library has no - knowledge of the reference and hence simply deleting the participant - won't make the ref count drop to 0. On the other hand, the DDS spec - says find_topic (and a second call to create_topic) return a new - proxy that must separately be deleted. */ + knowledge of the reference and hence simply deleting the participant + won't make the ref count drop to 0. On the other hand, the DDS spec + says find_topic (and a second call to create_topic) return a new + proxy that must separately be deleted. */ dds_entity_add_ref (&stgeneric->status_cb_entity->m_entity); hdl = stgeneric->status_cb_entity->m_entity.m_hdllink.hdl; + dds_delete_qos (new_qos); } ddsrt_mutex_unlock (&dds_global.m_mutex); } else { - if (qos) - { - new_qos = dds_create_qos (); - (void)dds_copy_qos (new_qos, qos); - } - /* Create topic */ top = dds_alloc (sizeof (*top)); hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK); @@ -293,7 +286,14 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s ddsi_pp = ephash_lookup_participant_guid (&par->m_entity.m_guid); assert (ddsi_pp); if (sedp_plist) - sedp_write_topic (ddsi_pp, sedp_plist); + { + nn_plist_t plist; + nn_plist_init_empty (&plist); + nn_plist_mergein_missing (&plist, sedp_plist, ~(uint64_t)0, ~(uint64_t)0); + nn_xqos_mergein_missing (&plist.qos, new_qos, ~(uint64_t)0); + sedp_write_topic (ddsi_pp, &plist); + nn_plist_fini (&plist); + } thread_state_asleep (lookup_thread_state ()); } dds_participant_unlock (par); @@ -302,6 +302,9 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s err_mismatch: ddsrt_mutex_unlock (&dds_global.m_mutex); dds_participant_unlock (par); +err_lock_participant: +err_invalid_qos: + dds_delete_qos (new_qos); return rc; } @@ -310,7 +313,6 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip char *key = NULL; struct ddsi_sertopic_default *st; const char *typename; - dds_qos_t *new_qos = NULL; nn_plist_t plist; dds_entity_t hdl; size_t keysz; @@ -346,12 +348,7 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip st->opt_size = dds_stream_check_optimize (desc); } -#define DDS_QOSMASK_TOPIC (QP_TOPIC_DATA | QP_DURABILITY | QP_DURABILITY_SERVICE | QP_DEADLINE | QP_LATENCY_BUDGET | QP_OWNERSHIP | QP_LIVELINESS | QP_RELIABILITY | QP_TRANSPORT_PRIORITY | QP_LIFESPAN | QP_DESTINATION_ORDER | QP_HISTORY | QP_RESOURCE_LIMITS) nn_plist_init_empty (&plist); - if (new_qos) - nn_xqos_mergein_missing (&plist.qos, new_qos, DDS_QOSMASK_TOPIC); - nn_xqos_mergein_missing (&plist.qos, &gv.default_xqos_tp, DDS_QOSMASK_TOPIC); - /* Set Topic meta data (for SEDP publication) */ plist.qos.topic_name = ddsrt_strdup (st->c.name); plist.qos.type_name = ddsrt_strdup (st->c.type_name); diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 847e85a..bf55b18 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -20,11 +20,11 @@ #include "dds/ddsi/q_xmsg.h" #include "dds__writer.h" #include "dds__listener.h" -#include "dds__qos.h" #include "dds__init.h" #include "dds__publisher.h" #include "dds__topic.h" #include "dds__get_status.h" +#include "dds__qos.h" #include "dds/ddsi/ddsi_tkmap.h" #include "dds__whc.h" @@ -206,48 +206,18 @@ static dds_return_t dds_writer_delete (dds_entity *e) return ret; } -static dds_return_t dds_writer_qos_validate (const dds_qos_t *qos, bool enabled) -{ - dds_return_t ret; - if ((ret = nn_xqos_valid (qos)) < 0) - return ret; - return enabled ? dds_qos_validate_mutable_common (qos) : DDS_RETCODE_OK; -} - static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - /* FIXME: QoS changes */ - dds_return_t ret; - - if ((ret = dds_writer_qos_validate (qos, enabled)) != DDS_RETCODE_OK) - return ret; - - /* Sort-of support updating ownership strength */ - if ((qos->present & QP_OWNERSHIP_STRENGTH) && (qos->present & ~QP_OWNERSHIP_STRENGTH) == 0) + /* note: e->m_qos is still the old one to allow for failure here */ + if (enabled) { - dds_ownership_kind_t kind; - dds_qget_ownership (e->m_qos, &kind); - - if (kind != DDS_OWNERSHIP_EXCLUSIVE) - return DDS_RETCODE_ERROR; - - struct writer *ddsi_wr = ((dds_writer *) e)->m_wr; - dds_qset_ownership_strength (e->m_qos, qos->ownership_strength.value); - + struct writer *wr; thread_state_awake (lookup_thread_state ()); - - /* FIXME: with QoS changes being unsupported by the underlying stack I wonder what will happen; locking the underlying DDSI writer is of doubtful value as well */ - ddsrt_mutex_lock (&ddsi_wr->e.lock); - ddsi_wr->xqos->ownership_strength.value = qos->ownership_strength.value; - ddsrt_mutex_unlock (&ddsi_wr->e.lock); + if ((wr = ephash_lookup_writer_guid (&e->m_guid)) != NULL) + update_writer_qos (wr, qos); thread_state_asleep (lookup_thread_state ()); } - else - { - if (enabled) - ret = DDS_RETCODE_UNSUPPORTED; - } - return ret; + return DDS_RETCODE_OK; } static struct whc *make_whc (const dds_qos_t *qos) @@ -309,17 +279,16 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit assert (pub->m_entity.m_domain == tp->m_entity.m_domain); /* Merge Topic & Publisher qos */ -#define DDS_QOSMASK_WRITER (QP_USER_DATA | QP_DURABILITY | QP_DURABILITY_SERVICE | QP_DEADLINE | QP_LATENCY_BUDGET | QP_OWNERSHIP | QP_OWNERSHIP_STRENGTH | QP_LIVELINESS | QP_RELIABILITY | QP_TRANSPORT_PRIORITY | QP_LIFESPAN | QP_DESTINATION_ORDER | QP_HISTORY | QP_RESOURCE_LIMITS | QP_PRISMTECH_WRITER_DATA_LIFECYCLE | QP_CYCLONE_IGNORELOCAL) wqos = dds_create_qos (); if (qos) - nn_xqos_mergein_missing (wqos, qos, DDS_QOSMASK_WRITER); + nn_xqos_mergein_missing (wqos, qos, DDS_WRITER_QOS_MASK); if (pub->m_entity.m_qos) nn_xqos_mergein_missing (wqos, pub->m_entity.m_qos, ~(uint64_t)0); if (tp->m_entity.m_qos) nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0); nn_xqos_mergein_missing (wqos, &gv.default_xqos_wr, ~(uint64_t)0); - if ((rc = dds_writer_qos_validate (wqos, false)) != DDS_RETCODE_OK) + if ((rc = nn_xqos_valid (wqos)) < 0) { dds_delete_qos(wqos); goto err_bad_qos; diff --git a/src/core/ddsc/tests/entity_api.c b/src/core/ddsc/tests/entity_api.c index fa098b5..f3241ac 100644 --- a/src/core/ddsc/tests/entity_api.c +++ b/src/core/ddsc/tests/entity_api.c @@ -78,7 +78,7 @@ void entity_qos_get_set(dds_entity_t e, const char* info) CU_ASSERT_EQUAL_FATAL(status, DDS_RETCODE_OK); status = dds_set_qos (e, qos); /* Doesn't change anything, so no need to forbid. But we return NOT_SUPPORTED anyway for now*/ - CU_ASSERT_EQUAL_FATAL(status, DDS_RETCODE_UNSUPPORTED); + CU_ASSERT_EQUAL_FATAL(status, DDS_RETCODE_OK); dds_delete_qos(qos); } diff --git a/src/core/ddsc/tests/unsupported.c b/src/core/ddsc/tests/unsupported.c index 279e8cd..417302a 100644 --- a/src/core/ddsc/tests/unsupported.c +++ b/src/core/ddsc/tests/unsupported.c @@ -76,7 +76,7 @@ CU_Test(ddsc_unsupported, dds_begin_end_coherent, .init = setup, .fini = teardow {BAD, DDS_RETCODE_BAD_PARAMETER} }; - for (int i=0; i < 5; i++) { + for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { result = dds_begin_coherent(e[pars[i].index]); CU_ASSERT_EQUAL(result, pars[i].exp_res); result = dds_end_coherent(e[pars[i].index]); @@ -93,7 +93,7 @@ CU_Test(ddsc_unsupported, dds_wait_for_acks, .init = setup, .fini = teardown) {BAD, DDS_RETCODE_BAD_PARAMETER} }; - for (int i=0; i< 3; i++) { + for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { result = dds_wait_for_acks(e[pars[i].index], 0); CU_ASSERT_EQUAL(result, pars[i].exp_res); } @@ -108,7 +108,7 @@ CU_Test(ddsc_unsupported, dds_suspend_resume, .init = setup, .fini = teardown) {BAD, DDS_RETCODE_BAD_PARAMETER} }; - for (int i=0; i< 3; i++) { + for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { result = dds_suspend(e[pars[i].index]); CU_ASSERT_EQUAL(result, pars[i].exp_res); result = dds_resume(e[pars[i].index]); @@ -128,7 +128,7 @@ CU_Test(ddsc_unsupported, dds_get_instance_handle, .init = setup, .fini = teardo {BAD, DDS_RETCODE_BAD_PARAMETER} }; - for (int i=0; i < 5; i++) { + for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { result = dds_get_instance_handle(e[pars[i].index], &ih); CU_ASSERT_EQUAL(result, pars[i].exp_res); } @@ -139,18 +139,12 @@ CU_Test(ddsc_unsupported, dds_set_qos, .init = setup, .fini = teardown) dds_return_t result; dds_qos_t *qos; static struct index_result pars[] = { - {PAR, DDS_RETCODE_UNSUPPORTED}, - {TOP, DDS_RETCODE_UNSUPPORTED}, - {PUB, DDS_RETCODE_UNSUPPORTED}, - {WRI, DDS_RETCODE_UNSUPPORTED}, - {SUB, DDS_RETCODE_UNSUPPORTED}, - {REA, DDS_RETCODE_UNSUPPORTED}, {RCD, DDS_RETCODE_ILLEGAL_OPERATION}, {BAD, DDS_RETCODE_BAD_PARAMETER} }; qos = dds_create_qos(); - for (int i=0; i < 8;i++) { + for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { result = dds_set_qos(e[pars[i].index], qos); CU_ASSERT_EQUAL(result, pars[i].exp_res); } diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 5011b2e..bb55315 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -301,6 +301,7 @@ struct proxy_participant struct addrset *as_meta; /* default address set to use for discovery traffic */ struct proxy_endpoint_common *endpoints; /* all proxy endpoints can be reached from here */ ddsrt_avl_tree_t groups; /* table of all groups (publisher, subscriber), see struct proxy_group */ + seqno_t seq; /* sequence number of most recent SPDP message */ unsigned kernel_sequence_numbers : 1; /* whether this proxy participant generates OSPL kernel sequence numbers */ unsigned implicitly_created : 1; /* participants are implicitly created for Cloud/Fog discovered endpoints */ unsigned is_ddsi2_pp: 1; /* if this is the federation-leader on the remote node */ @@ -527,6 +528,7 @@ dds_return_t new_participant (struct nn_guid *ppguid, unsigned flags, const stru * ppguid lookup failed. */ dds_return_t delete_participant (const struct nn_guid *ppguid); +void update_participant_plist (struct participant *pp, const struct nn_plist *plist); /* To obtain the builtin writer to be used for publishing SPDP, SEDP, PMD stuff for PP and its endpoints, given the entityid. If PP has @@ -541,6 +543,9 @@ dds_return_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const s dds_return_t new_reader (struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct dds_qos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg); +void update_reader_qos (struct reader *rd, const struct dds_qos *xqos); +void update_writer_qos (struct writer *wr, const struct dds_qos *xqos); + struct whc_node; struct whc_state; unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, struct whc_node **deferred_free_list); @@ -586,7 +591,7 @@ void delete_local_orphan_writer (struct local_orphan_writer *wr); /* Set when this proxy participant is not to be announced on the built-in topics yet */ #define CF_PROXYPP_NO_SPDP (1 << 3) -void new_proxy_participant (const struct nn_guid *guid, unsigned bes, unsigned prismtech_bes, const struct nn_guid *privileged_pp_guid, struct addrset *as_default, struct addrset *as_meta, const struct nn_plist *plist, dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, nn_wctime_t timestamp); +void new_proxy_participant (const struct nn_guid *guid, unsigned bes, unsigned prismtech_bes, const struct nn_guid *privileged_pp_guid, struct addrset *as_default, struct addrset *as_meta, const struct nn_plist *plist, dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, nn_wctime_t timestamp, seqno_t seq); int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t timestamp, int isimplicit); uint64_t participant_instance_id (const struct nn_guid *guid); @@ -595,8 +600,8 @@ enum update_proxy_participant_source { UPD_PROXYPP_CM }; -int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp); -int update_proxy_participant_plist (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp); +int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp); +int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp); void proxy_participant_reassign_lease (struct proxy_participant *proxypp, struct lease *newlease); void purge_proxy_participants (const nn_locator_t *loc, bool delete_from_as_disc); @@ -618,8 +623,8 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid, int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); -void update_proxy_reader (struct proxy_reader * prd, struct addrset *as); -void update_proxy_writer (struct proxy_writer * pwr, struct addrset *as); +void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); +void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); int new_proxy_group (const struct nn_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); void delete_proxy_group (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit); diff --git a/src/core/ddsi/include/dds/ddsi/q_xqos.h b/src/core/ddsi/include/dds/ddsi/q_xqos.h index 7b5e8ea..e34fb58 100644 --- a/src/core/ddsi/include/dds/ddsi/q_xqos.h +++ b/src/core/ddsi/include/dds/ddsi/q_xqos.h @@ -277,6 +277,7 @@ DDS_EXPORT void nn_xqos_init_default_topic (dds_qos_t *xqos); DDS_EXPORT void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src); DDS_EXPORT void nn_xqos_unalias (dds_qos_t *xqos); DDS_EXPORT void nn_xqos_fini (dds_qos_t *xqos); +DDS_EXPORT void nn_xqos_fini_mask (dds_qos_t *xqos, uint64_t mask); DDS_EXPORT dds_return_t nn_xqos_valid (const dds_qos_t *xqos); DDS_EXPORT void nn_xqos_mergein_missing (dds_qos_t *a, const dds_qos_t *b, uint64_t mask); DDS_EXPORT uint64_t nn_xqos_delta (const dds_qos_t *a, const dds_qos_t *b, uint64_t mask); diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index 9218611..5df36a1 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -506,7 +506,7 @@ static void make_participants_dependent_on_ddsi2 (const nn_guid_t *ddsi2guid, nn } } -static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t timestamp, const nn_plist_t *datap) +static int handle_SPDP_alive (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, const nn_plist_t *datap) { const unsigned bes_sedp_announcer_mask = NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER | @@ -593,11 +593,14 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time DDS_LOG(DDS_LC_TRACE, "SPDP ST0 "PGUIDFMT" (known)", PGUID (datap->participant_guid)); lease_renew (ddsrt_atomic_ldvoidp (&proxypp->lease), now_et ()); ddsrt_mutex_lock (&proxypp->e.lock); - if (proxypp->implicitly_created) + if (proxypp->implicitly_created || seq > proxypp->seq) { - DDS_LOG(DDS_LC_DISCOVERY, " (NEW was-implicitly-created)"); + if (proxypp->implicitly_created) + DDS_LOG(DDS_LC_DISCOVERY, " (NEW was-implicitly-created)"); + else + DDS_LOG(DDS_LC_DISCOVERY, " (update)"); proxypp->implicitly_created = 0; - update_proxy_participant_plist_locked (proxypp, datap, UPD_PROXYPP_SPDP, timestamp); + update_proxy_participant_plist_locked (proxypp, seq, datap, UPD_PROXYPP_SPDP, timestamp); } ddsrt_mutex_unlock (&proxypp->e.lock); return 0; @@ -739,7 +742,8 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time lease_duration, rst->vendor, custom_flags, - timestamp + timestamp, + seq ); /* Force transmission of SPDP messages - we're not very careful @@ -779,7 +783,7 @@ static int handle_SPDP_alive (const struct receiver_state *rst, nn_wctime_t time return 1; } -static void handle_SPDP (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len) +static void handle_SPDP (const struct receiver_state *rst, seqno_t seq, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, uint32_t len) { const struct CDRHeader *data = vdata; /* built-ins not deserialized (yet) */ DDS_TRACE("SPDP ST%x", statusinfo); @@ -809,7 +813,7 @@ static void handle_SPDP (const struct receiver_state *rst, nn_wctime_t timestamp switch (statusinfo & (NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER)) { case 0: - interesting = handle_SPDP_alive (rst, timestamp, &decoded_data); + interesting = handle_SPDP_alive (rst, seq, timestamp, &decoded_data); break; case NN_STATUSINFO_DISPOSE: @@ -1019,7 +1023,7 @@ static const char *durability_to_string (dds_durability_kind_t k) return "undefined-durability"; } -static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppguid, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp) +static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppguid, nn_plist_t *datap /* note: potentially modifies datap */, const nn_guid_prefix_t *src_guid_prefix, nn_vendorid_t vendorid, nn_wctime_t timestamp, seqno_t seq) { nn_guid_t privguid; nn_plist_t pp_plist; @@ -1056,7 +1060,7 @@ static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppg doing anything about (1). That means we fall back to the legacy mode of locally generating GIDs but leaving the system id unchanged if the remote is OSPL. */ actual_vendorid = (datap->present & PP_VENDORID) ? datap->vendorid : vendorid; - new_proxy_participant(ppguid, 0, 0, &privguid, new_addrset(), new_addrset(), &pp_plist, T_NEVER, actual_vendorid, CF_IMPLICITLY_CREATED_PROXYPP, timestamp); + new_proxy_participant(ppguid, 0, 0, &privguid, new_addrset(), new_addrset(), &pp_plist, T_NEVER, actual_vendorid, CF_IMPLICITLY_CREATED_PROXYPP, timestamp, seq); } else if (ppguid->prefix.u[0] == src_guid_prefix->u[0] && vendor_is_eclipse_or_opensplice (vendorid)) { @@ -1090,7 +1094,7 @@ static struct proxy_participant *implicitly_create_proxypp (const nn_guid_t *ppg ddsrt_mutex_unlock (&privpp->e.lock); pp_plist.prismtech_participant_version_info.flags &= ~NN_PRISMTECH_FL_PARTICIPANT_IS_DDSI2; - new_proxy_participant (ppguid, 0, 0, &privguid, as_default, as_meta, &pp_plist, T_NEVER, vendorid, CF_IMPLICITLY_CREATED_PROXYPP | CF_PROXYPP_NO_SPDP, timestamp); + new_proxy_participant (ppguid, 0, 0, &privguid, as_default, as_meta, &pp_plist, T_NEVER, vendorid, CF_IMPLICITLY_CREATED_PROXYPP | CF_PROXYPP_NO_SPDP, timestamp, seq); } } @@ -1138,7 +1142,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat if ((pp = ephash_lookup_proxy_participant_guid (&ppguid)) == NULL) { DDS_LOG(DDS_LC_DISCOVERY, " unknown-proxypp"); - if ((pp = implicitly_create_proxypp (&ppguid, datap, src_guid_prefix, vendorid, timestamp)) == NULL) + if ((pp = implicitly_create_proxypp (&ppguid, datap, src_guid_prefix, vendorid, timestamp, 0)) == NULL) E ("?\n", err); /* Repeat regular SEDP trace for convenience */ DDS_LOG(DDS_LC_DISCOVERY, "SEDP ST0 "PGUIDFMT" (cont)", PGUID (datap->endpoint_guid)); @@ -1185,18 +1189,10 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat } if (pwr || prd) { - /* Cloud load balances by updating participant endpoints */ - - if (! vendor_is_cloud (vendorid)) - { - DDS_LOG(DDS_LC_DISCOVERY, " known\n"); - goto err; - } - /* Re-bind the proxy participant to the discovery service - and do this if it is currently bound to another DS instance, because that other DS instance may have already failed and with a new one taking over, without our noticing it. */ - DDS_LOG(DDS_LC_DISCOVERY, " known-DS"); + DDS_LOG(DDS_LC_DISCOVERY, " known%s", vendor_is_cloud (vendorid) ? "-DS" : ""); if (vendor_is_cloud (vendorid) && pp->implicitly_created && memcmp(&pp->privileged_pp_guid.prefix, src_guid_prefix, sizeof(pp->privileged_pp_guid.prefix)) != 0) { nn_etime_t never = { T_NEVER }; @@ -1261,7 +1257,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat { if (pwr) { - update_proxy_writer (pwr, as); + update_proxy_writer (pwr, as, xqos, timestamp); } else { @@ -1281,7 +1277,7 @@ static void handle_SEDP_alive (const struct receiver_state *rst, nn_plist_t *dat { if (prd) { - update_proxy_reader (prd, as); + update_proxy_reader (prd, as, xqos, timestamp); } else { @@ -1484,9 +1480,9 @@ static void handle_SEDP_CM (const struct receiver_state *rst, nn_entityid_t wr_e else { if ((proxypp = ephash_lookup_proxy_participant_guid (&decoded_data.participant_guid)) == NULL) - proxypp = implicitly_create_proxypp (&decoded_data.participant_guid, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp); + proxypp = implicitly_create_proxypp (&decoded_data.participant_guid, &decoded_data, &rst->src_guid_prefix, rst->vendor, timestamp, 0); if (proxypp != NULL) - update_proxy_participant_plist (proxypp, &decoded_data, UPD_PROXYPP_CM, timestamp); + update_proxy_participant_plist (proxypp, 0, &decoded_data, UPD_PROXYPP_CM, timestamp); } } @@ -1864,7 +1860,7 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str switch (srcguid.entityid.u) { case NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER: - handle_SPDP (sampleinfo->rst, timestamp, statusinfo, datap, datasz); + handle_SPDP (sampleinfo->rst, sampleinfo->seq, timestamp, statusinfo, datap, datasz); break; case NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER: case NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER: diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 5870a41..3ae6d2a 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -375,6 +375,36 @@ static void remove_deleted_participant_guid (const struct nn_guid *guid, unsigne /* PARTICIPANT ------------------------------------------------------ */ +static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, const dds_qos_t *xqos, nn_wctime_t timestamp) +{ + uint64_t mask; + + mask = nn_xqos_delta (ent_qos, xqos, QP_CHANGEABLE_MASK & ~(QP_RXO_MASK | QP_PARTITION)) & xqos->present; +#if 0 + int a = (ent_qos->present & QP_TOPIC_DATA) ? (int) ent_qos->topic_data.length : 6; + int b = (xqos->present & QP_TOPIC_DATA) ? (int) xqos->topic_data.length : 6; + char *astr = (ent_qos->present & QP_TOPIC_DATA) ? (char *) ent_qos->topic_data.value : "(null)"; + char *bstr = (xqos->present & QP_TOPIC_DATA) ? (char *) xqos->topic_data.value : "(null)"; + printf ("%d: "PGUIDFMT" ent_qos %d \"%*.*s\" xqos %d \"%*.*s\" => mask %d\n", + (int) getpid (), PGUID (e->guid), + !!(ent_qos->present & QP_TOPIC_DATA), a, a, astr, + !!(xqos->present & QP_TOPIC_DATA), b, b, bstr, + !!(mask & QP_TOPIC_DATA)); +#endif + DDS_LOG (DDS_LC_DISCOVERY, "update_qos_locked "PGUIDFMT" delta=%"PRIu64" QOS={", PGUID(e->guid), mask); + nn_log_xqos(DDS_LC_DISCOVERY, xqos); + DDS_LOG (DDS_LC_DISCOVERY, "}\n"); + + if (mask == 0) + /* no change, or an as-yet unsupported one */ + return false; + + nn_xqos_fini_mask (ent_qos, mask); + nn_xqos_mergein_missing (ent_qos, xqos, mask); + ddsi_plugin.builtintopic_write (e, timestamp, true); + return true; +} + static dds_return_t pp_allocate_entityid(nn_entityid_t *id, unsigned kind, struct participant *pp) { uint32_t id1; @@ -674,6 +704,14 @@ dds_return_t new_participant (nn_guid_t *p_ppguid, unsigned flags, const nn_plis return new_participant_guid (p_ppguid, flags, plist); } +void update_participant_plist (struct participant *pp, const nn_plist_t *plist) +{ + ddsrt_mutex_lock (&pp->e.lock); + if (update_qos_locked (&pp->e, &pp->plist->qos, &plist->qos, now ())) + spdp_write (pp); + ddsrt_mutex_unlock (&pp->e.lock); +} + static void delete_builtin_endpoint (const struct nn_guid *ppguid, unsigned entityid) { nn_guid_t guid; @@ -2909,6 +2947,14 @@ struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, str return lowr; } +void update_writer_qos (struct writer *wr, const dds_qos_t *xqos) +{ + ddsrt_mutex_lock (&wr->e.lock); + if (update_qos_locked (&wr->e, wr->xqos, xqos, now ())) + sedp_write_writer (wr); + ddsrt_mutex_unlock (&wr->e.lock); +} + static void gc_delete_writer (struct gcreq *gcreq) { struct writer *wr = gcreq->arg; @@ -3432,6 +3478,13 @@ uint64_t reader_instance_id (const struct nn_guid *guid) return 0; } +void update_reader_qos (struct reader *rd, const dds_qos_t *xqos) +{ + ddsrt_mutex_lock (&rd->e.lock); + if (update_qos_locked (&rd->e, rd->xqos, xqos, now ())) + sedp_write_reader (rd); + ddsrt_mutex_unlock (&rd->e.lock); +} /* PROXY-PARTICIPANT ------------------------------------------------ */ static void gc_proxy_participant_lease (struct gcreq *gcreq) @@ -3481,7 +3534,8 @@ void new_proxy_participant dds_duration_t tlease_dur, nn_vendorid_t vendor, unsigned custom_flags, - nn_wctime_t timestamp + nn_wctime_t timestamp, + seqno_t seq ) { /* No locking => iff all participants use unique guids, and sedp @@ -3503,6 +3557,7 @@ void new_proxy_participant proxypp->vendor = vendor; proxypp->bes = bes; proxypp->prismtech_bes = prismtech_bes; + proxypp->seq = seq; if (privileged_pp_guid) { proxypp->privileged_pp_guid = *privileged_pp_guid; } else { @@ -3543,6 +3598,7 @@ void new_proxy_participant proxypp->as_meta = as_meta; proxypp->endpoints = NULL; proxypp->plist = nn_plist_dup (plist); + nn_xqos_mergein_missing (&proxypp->plist->qos, &gv.default_plist_pp.qos, ~(uint64_t)0); ddsrt_avl_init (&proxypp_groups_treedef, &proxypp->groups); if (custom_flags & CF_INC_KERNEL_SEQUENCE_NUMBERS) @@ -3656,51 +3712,39 @@ void new_proxy_participant ddsrt_mutex_unlock (&proxypp->e.lock); } -int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp) +int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp) { - /* Currently, built-in processing is single-threaded, and it is only through this function and the proxy participant deletion (which necessarily happens when no-one else potentially references the proxy participant anymore). So at the moment, the lock is superfluous. */ - nn_plist_t *new_plist; + nn_plist_t *new_plist = ddsrt_malloc (sizeof (*new_plist)); + nn_plist_init_empty (new_plist); + nn_plist_mergein_missing (new_plist, datap, PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | PP_ENTITY_NAME, QP_USER_DATA); + nn_plist_mergein_missing (new_plist, &gv.default_plist_pp, ~(uint64_t)0, ~(uint64_t)0); - new_plist = nn_plist_dup (datap); - nn_plist_mergein_missing (new_plist, proxypp->plist, ~(uint64_t)0, ~(uint64_t)0); - nn_plist_fini (proxypp->plist); - ddsrt_free (proxypp->plist); - proxypp->plist = new_plist; + if (seq && seq > proxypp->seq) + proxypp->seq = seq; switch (source) { case UPD_PROXYPP_SPDP: - ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true); + update_qos_locked (&proxypp->e, &proxypp->plist->qos, &new_plist->qos, timestamp); + nn_plist_fini (new_plist); + ddsrt_free (new_plist); proxypp->proxypp_have_spdp = 1; break; + case UPD_PROXYPP_CM: + nn_plist_fini (proxypp->plist); + ddsrt_free (proxypp->plist); + proxypp->plist = new_plist; proxypp->proxypp_have_cm = 1; break; } - return 0; } -int update_proxy_participant_plist (struct proxy_participant *proxypp, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp) +int update_proxy_participant_plist (struct proxy_participant *proxypp, seqno_t seq, const struct nn_plist *datap, enum update_proxy_participant_source source, nn_wctime_t timestamp) { - nn_plist_t tmp; - - /* FIXME: find a better way of restricting which bits can get updated */ ddsrt_mutex_lock (&proxypp->e.lock); - switch (source) - { - case UPD_PROXYPP_SPDP: - update_proxy_participant_plist_locked (proxypp, datap, source, timestamp); - break; - case UPD_PROXYPP_CM: - tmp = *datap; - tmp.present &= - PP_PRISMTECH_NODE_NAME | PP_PRISMTECH_EXEC_NAME | PP_PRISMTECH_PROCESS_ID | - PP_ENTITY_NAME; - tmp.qos.present &= QP_PRISMTECH_ENTITY_FACTORY; - update_proxy_participant_plist_locked (proxypp, &tmp, source, timestamp); - break; - } + update_proxy_participant_plist_locked (proxypp, seq, datap, source, timestamp); ddsrt_mutex_unlock (&proxypp->e.lock); return 0; } @@ -4064,7 +4108,6 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en else memset (&c->group_guid, 0, sizeof (c->group_guid)); - ref_proxy_participant (proxypp, c); } @@ -4175,7 +4218,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid, return 0; } -void update_proxy_writer (struct proxy_writer * pwr, struct addrset * as) +void update_proxy_writer (struct proxy_writer *pwr, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) { struct reader * rd; struct pwr_rd_match * m; @@ -4203,10 +4246,12 @@ void update_proxy_writer (struct proxy_writer * pwr, struct addrset * as) m = ddsrt_avl_iter_next (&iter); } } + + update_qos_locked (&pwr->e, pwr->c.xqos, xqos, timestamp); ddsrt_mutex_unlock (&pwr->e.lock); } -void update_proxy_reader (struct proxy_reader * prd, struct addrset * as) +void update_proxy_reader (struct proxy_reader *prd, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp) { struct prd_wr_match * m; nn_guid_t wrguid; @@ -4255,6 +4300,8 @@ void update_proxy_reader (struct proxy_reader * prd, struct addrset * as) ddsrt_mutex_lock (&prd->e.lock); } } + + update_qos_locked (&prd->e, prd->c.xqos, xqos, timestamp); ddsrt_mutex_unlock (&prd->e.lock); } diff --git a/src/core/ddsi/src/q_plist.c b/src/core/ddsi/src/q_plist.c index 0cf2da1..15f27f4 100644 --- a/src/core/ddsi/src/q_plist.c +++ b/src/core/ddsi/src/q_plist.c @@ -1248,7 +1248,7 @@ void nn_plist_init_tables (void) ddsrt_once (&table_init_control, nn_plist_init_tables_real); } -static void plist_or_xqos_fini (void * __restrict dst, size_t shift) +static void plist_or_xqos_fini (void * __restrict dst, size_t shift, uint64_t pmask, uint64_t qmask) { /* shift == 0: plist, shift > 0: just qos */ struct flagset pfs, qfs; @@ -1277,7 +1277,8 @@ static void plist_or_xqos_fini (void * __restrict dst, size_t shift) assert (shift == 0 || entry->plist_offset - shift < sizeof (dds_qos_t)); size_t dstoff = entry->plist_offset - shift; struct flagset * const fs = (entry->flags & PDF_QOS) ? &qfs : &pfs; - if ((*fs->present & entry->present_flag)) + uint64_t mask = (entry->flags & PDF_QOS) ? qmask : pmask; + if (*fs->present & entry->present_flag & mask) { if (!(entry->flags & PDF_FUNCTION)) fini_generic (dst, &dstoff, fs, entry->present_flag, entry->op.desc); @@ -1285,8 +1286,8 @@ static void plist_or_xqos_fini (void * __restrict dst, size_t shift) entry->op.f.fini (dst, &dstoff, fs, entry->present_flag); } } - if (pfs.present) { *pfs.present = *pfs.aliased = 0; } - *qfs.present = *qfs.aliased = 0; + if (pfs.present) { *pfs.present &= ~pmask; *pfs.aliased &= ~pmask; } + *qfs.present &= ~qmask; *qfs.aliased &= ~qmask; } static void plist_or_xqos_unalias (void * __restrict dst, size_t shift) @@ -1441,7 +1442,7 @@ static void plist_or_xqos_addtomsg (struct nn_xmsg *xmsg, const void * __restric void nn_plist_fini (nn_plist_t *plist) { - plist_or_xqos_fini (plist, 0); + plist_or_xqos_fini (plist, 0, ~(uint64_t)0, ~(uint64_t)0); } void nn_plist_unalias (nn_plist_t *plist) @@ -2322,18 +2323,19 @@ void nn_xqos_init_empty (dds_qos_t *dest) void nn_plist_init_default_participant (nn_plist_t *plist) { nn_plist_init_empty (plist); + plist->qos.present |= QP_PRISMTECH_ENTITY_FACTORY; plist->qos.entity_factory.autoenable_created_entities = 0; + + plist->qos.present |= QP_USER_DATA; + plist->qos.user_data.length = 0; + plist->qos.user_data.value = NULL; } static void xqos_init_default_common (dds_qos_t *xqos) { nn_xqos_init_empty (xqos); - xqos->present |= QP_PARTITION; - xqos->partition.n = 0; - xqos->partition.strs = NULL; - xqos->present |= QP_PRESENTATION; xqos->presentation.access_scope = DDS_PRESENTATION_INSTANCE; xqos->presentation.coherent_access = 0; @@ -2374,10 +2376,31 @@ static void xqos_init_default_common (dds_qos_t *xqos) xqos->ignorelocal.value = DDS_IGNORELOCAL_NONE; } -void nn_xqos_init_default_reader (dds_qos_t *xqos) +static void nn_xqos_init_default_endpoint (dds_qos_t *xqos) { xqos_init_default_common (xqos); + xqos->present |= QP_TOPIC_DATA; + xqos->topic_data.length = 0; + xqos->topic_data.value = NULL; + + xqos->present |= QP_GROUP_DATA; + xqos->group_data.length = 0; + xqos->group_data.value = NULL; + + xqos->present |= QP_USER_DATA; + xqos->user_data.length = 0; + xqos->user_data.value = NULL; + + xqos->present |= QP_PARTITION; + xqos->partition.n = 0; + xqos->partition.strs = NULL; +} + +void nn_xqos_init_default_reader (dds_qos_t *xqos) +{ + nn_xqos_init_default_endpoint (xqos); + xqos->present |= QP_RELIABILITY; xqos->reliability.kind = DDS_RELIABILITY_BEST_EFFORT; @@ -2400,7 +2423,7 @@ void nn_xqos_init_default_reader (dds_qos_t *xqos) void nn_xqos_init_default_writer (dds_qos_t *xqos) { - xqos_init_default_common (xqos); + nn_xqos_init_default_endpoint (xqos); xqos->present |= QP_DURABILITY_SERVICE; xqos->durability_service.service_cleanup_delay = 0; @@ -2461,10 +2484,14 @@ void nn_xqos_init_default_topic (dds_qos_t *xqos) xqos->subscription_keys.key_list.strs = NULL; } -void nn_xqos_init_default_subscriber (dds_qos_t *xqos) +static void nn_xqos_init_default_publisher_subscriber (dds_qos_t *xqos) { nn_xqos_init_empty (xqos); + xqos->present |= QP_GROUP_DATA; + xqos->group_data.length = 0; + xqos->group_data.value = NULL; + xqos->present |= QP_PRISMTECH_ENTITY_FACTORY; xqos->entity_factory.autoenable_created_entities = 1; @@ -2473,16 +2500,14 @@ void nn_xqos_init_default_subscriber (dds_qos_t *xqos) xqos->partition.strs = NULL; } +void nn_xqos_init_default_subscriber (dds_qos_t *xqos) +{ + nn_xqos_init_default_publisher_subscriber (xqos); +} + void nn_xqos_init_default_publisher (dds_qos_t *xqos) { - nn_xqos_init_empty (xqos); - - xqos->present |= QP_PRISMTECH_ENTITY_FACTORY; - xqos->entity_factory.autoenable_created_entities = 1; - - xqos->present |= QP_PARTITION; - xqos->partition.n = 0; - xqos->partition.strs = NULL; + nn_xqos_init_default_publisher_subscriber (xqos); } void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src) @@ -2493,7 +2518,12 @@ void nn_xqos_copy (dds_qos_t *dst, const dds_qos_t *src) void nn_xqos_fini (dds_qos_t *xqos) { - plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos)); + plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos), ~(uint64_t)0, ~(uint64_t)0); +} + +void nn_xqos_fini_mask (dds_qos_t *xqos, uint64_t mask) +{ + plist_or_xqos_fini (xqos, offsetof (nn_plist_t, qos), ~(uint64_t)0, mask); } void nn_xqos_unalias (dds_qos_t *xqos) diff --git a/src/ddsrt/src/expand_envvars.c b/src/ddsrt/src/expand_envvars.c index dbaa33a..f34d424 100644 --- a/src/ddsrt/src/expand_envvars.c +++ b/src/ddsrt/src/expand_envvars.c @@ -19,6 +19,7 @@ #include "dds/ddsrt/heap.h" #include "dds/ddsrt/log.h" #include "dds/ddsrt/string.h" +#include "dds/ddsrt/process.h" typedef char * (*expand_fn)(const char *src0); @@ -34,8 +35,16 @@ static void expand_append (char **dst, size_t *sz, size_t *pos, char c) static char *expand_env (const char *name, char op, const char *alt, expand_fn expand) { + char pidstr[20]; char *env = NULL; - (void)ddsrt_getenv (name, &env); + + if (name[0] == '$' && name[1] == 0) { + snprintf (pidstr, sizeof (pidstr), "%"PRIdPID, ddsrt_getpid ()); + env = pidstr; + } else { + (void) ddsrt_getenv (name, &env); + } + switch (op) { case 0: diff --git a/src/mpt/CMakeLists.txt b/src/mpt/CMakeLists.txt index 5c9ec7f..1a9c4ef 100644 --- a/src/mpt/CMakeLists.txt +++ b/src/mpt/CMakeLists.txt @@ -20,4 +20,3 @@ set(MPT_DEFAULT_TIMEOUT "60") add_subdirectory(mpt) add_subdirectory(tests) - diff --git a/src/mpt/tests/CMakeLists.txt b/src/mpt/tests/CMakeLists.txt index 63e786b..02eadaa 100644 --- a/src/mpt/tests/CMakeLists.txt +++ b/src/mpt/tests/CMakeLists.txt @@ -16,4 +16,4 @@ if(MPT_ENABLE_SELFTEST) endif() add_subdirectory(basic) -add_subdirectory(qosmatch) +add_subdirectory(qos) diff --git a/src/mpt/tests/qosmatch/CMakeLists.txt b/src/mpt/tests/qos/CMakeLists.txt similarity index 53% rename from src/mpt/tests/qosmatch/CMakeLists.txt rename to src/mpt/tests/qos/CMakeLists.txt index efa3324..e2f8109 100644 --- a/src/mpt/tests/qosmatch/CMakeLists.txt +++ b/src/mpt/tests/qos/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright(c) 2006 to 2018 ADLINK Technology Limited and others +# Copyright(c) 2019 ADLINK Technology Limited and others # # This program and the accompanying materials are made available under the # terms of the Eclipse Public License v. 2.0 which is available at @@ -11,13 +11,18 @@ # include(${MPT_CMAKE}) -set(sources +add_compile_options("-I${PROJECT_SOURCE_DIR}/core/ddsi/include") + +idlc_generate(mpt_rwdata_lib "procs/rwdata.idl") + +set(sources_qosmatch "procs/rw.c" "qosmatch.c") +add_mpt_executable(mpt_qosmatch ${sources_qosmatch}) +target_link_libraries(mpt_qosmatch PRIVATE mpt_rwdata_lib) -add_compile_options("-I${PROJECT_SOURCE_DIR}/core/ddsi/include") -add_mpt_executable(mpt_qosmatch ${sources}) - -idlc_generate(mpt_qosmatch_rwdata_lib "procs/rwdata.idl") -target_link_libraries(mpt_qosmatch PRIVATE mpt_qosmatch_rwdata_lib) - +set(sources_ppuserdata + "procs/ppud.c" + "ppuserdata.c") +add_mpt_executable(mpt_ppuserdata ${sources_ppuserdata}) +target_link_libraries(mpt_ppuserdata PRIVATE mpt_rwdata_lib) diff --git a/src/mpt/tests/qos/ppuserdata.c b/src/mpt/tests/qos/ppuserdata.c new file mode 100644 index 0000000..9e52042 --- /dev/null +++ b/src/mpt/tests/qos/ppuserdata.c @@ -0,0 +1,58 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "mpt/mpt.h" +#include "procs/ppud.h" + + +/* + * Checks whether participant user_data QoS changes work. + */ +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, true, 10) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, false, 0) +MPT_TestProcess(qos, ppuserdata, a, ppud, TEST_A_ARGS); +MPT_TestProcess(qos, ppuserdata, b, ppud, TEST_B_ARGS); +MPT_Test(qos, ppuserdata, .init=ppud_init, .fini=ppud_fini); +#undef TEST_A_ARGS +#undef TEST_B_ARGS + +/* + * Checks whether reader/writer user_data QoS changes work. + */ +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", true, 10) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwuserdata", false, 0) +MPT_TestProcess(qos, rwuserdata, a, rwud, TEST_A_ARGS); +MPT_TestProcess(qos, rwuserdata, b, rwud, TEST_B_ARGS); +MPT_Test(qos, rwuserdata, .init=ppud_init, .fini=ppud_fini); +#undef TEST_A_ARGS +#undef TEST_B_ARGS + +/* + * Checks whether topic_data QoS changes become visible in reader/writer. + */ +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", true, 10) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwtopicdata", false, 0) +MPT_TestProcess(qos, rwtopicdata, a, rwtd, TEST_A_ARGS); +MPT_TestProcess(qos, rwtopicdata, b, rwtd, TEST_B_ARGS); +MPT_Test(qos, rwtopicdata, .init=ppud_init, .fini=ppud_fini); +#undef TEST_A_ARGS +#undef TEST_B_ARGS + +/* + * Checks whether group_data QoS changes become visible in reader/writer. + */ +#define TEST_A_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", true, 10) +#define TEST_B_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "rwgroupdata", false, 0) +MPT_TestProcess(qos, rwgroupdata, a, rwgd, TEST_A_ARGS); +MPT_TestProcess(qos, rwgroupdata, b, rwgd, TEST_B_ARGS); +MPT_Test(qos, rwgroupdata, .init=ppud_init, .fini=ppud_fini); +#undef TEST_A_ARGS +#undef TEST_B_ARGS diff --git a/src/mpt/tests/qos/procs/ppud.c b/src/mpt/tests/qos/procs/ppud.c new file mode 100644 index 0000000..0d564fb --- /dev/null +++ b/src/mpt/tests/qos/procs/ppud.c @@ -0,0 +1,569 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include +#include +#include + +#include "mpt/mpt.h" +#include "rwdata.h" + +#include "dds/dds.h" + +#include "dds/ddsrt/time.h" +#include "dds/ddsrt/process.h" +#include "dds/ddsrt/sockets.h" +#include "dds/ddsrt/heap.h" + +void ppud_init (void) { } +void ppud_fini (void) { } + +static const char *exp_ud[] = { + "a", "bc", "def", "" +}; + +MPT_ProcessEntry (ppud, + MPT_Args (dds_domainid_t domainid, + bool active, + unsigned ncycles)) +{ + dds_entity_t dp, rd, ws; + dds_instance_handle_t dpih; + dds_return_t rc; + dds_qos_t *qos; + int id = (int) ddsrt_getpid (); + + printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dp = dds_create_participant (domainid, qos, NULL); + MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + rc = dds_get_instance_handle (dp, &dpih); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not get participant instance handle: %s\n", dds_strretcode (rc)); + rd = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPARTICIPANT, qos, NULL); + MPT_ASSERT_FATAL_GT (rd, 0, "Could not create DCPSParticipant reader: %s\n", dds_strretcode (rd)); + rc = dds_set_status_mask (rd, DDS_DATA_AVAILABLE_STATUS); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not set status mask: %s\n", dds_strretcode (rc)); + ws = dds_create_waitset (dp); + MPT_ASSERT_FATAL_GT (ws, 0, "Could not create waitset: %s\n", dds_strretcode (ws)); + rc = dds_waitset_attach (ws, rd, 0); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach reader to waitset: %s\n", dds_strretcode (rc)); + + bool done = false; + bool first = true; + unsigned exp_index = 0; + unsigned exp_cycle = 0; + while (!done) + { + rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); + MPT_ASSERT_FATAL_GEQ (rc, 0, "Wait failed: %s\n", dds_strretcode (ws)); + + void *raw = NULL; + dds_sample_info_t si; + int32_t n; + while ((n = dds_take (rd, &raw, &si, 1, 1)) == 1) + { + const dds_builtintopic_participant_t *sample = raw; + if (si.instance_state != DDS_IST_ALIVE) + done = true; + else if (si.instance_handle == dpih || !si.valid_data) + continue; + else + { + void *ud = NULL; + size_t usz = 0; + if (!dds_qget_userdata (sample->qos, &ud, &usz)) + printf ("%d: user data not set in QoS\n", id); + if (first && usz == 0) + { + dds_qset_userdata (qos, "X", 1); + rc = dds_set_qos (dp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + } + else + { + const char *exp = exp_ud[exp_index]; + if (first && strcmp (ud, "X") == 0) + exp = "X"; + const size_t expsz = strlen (exp); + bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); + MPT_ASSERT (eq, "User data mismatch: expected %u %zu/%s received %zu/%s\n", + exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); + if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + { + exp_index = 0; + exp_cycle++; + } + + if (active && exp_cycle == ncycles) + done = true; + else + { + const void *newud; + size_t newusz; + if (!active) + { + /* Set user data to the same value in response */ + newud = ud; newusz = usz; + dds_qset_userdata (qos, ud, usz); + } + else /* Set next agreed value */ + { + newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]); + dds_qset_userdata (qos, newud, newusz); + } + + rc = dds_set_qos (dp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (dp, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chkud = NULL; + size_t chkusz = 0; + if (!dds_qget_userdata (chk, &chkud, &chkusz)) + MPT_ASSERT (0, "Check QoS: no user data present\n"); + MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0), + "Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n", + chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)"); + dds_free (chkud); + dds_delete_qos (chk); + first = false; + } + } + dds_free (ud); + } + } + MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); + dds_return_loan (rd, &raw, 1); + } + dds_delete_qos (qos); + rc = dds_delete (dp); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); + printf ("=== [Publisher(%d)] Done\n", id); +} + +MPT_ProcessEntry (rwud, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)) +{ + dds_entity_t dp, tp, ep, rdep, ws; + dds_return_t rc; + dds_qos_t *qos; + int id = (int) ddsrt_getpid (); + + printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dp = dds_create_participant (domainid, NULL, NULL); + MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); + MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); + if (active) + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_writer (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep)); + } + else + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_reader (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep)); + } + rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not set status mask: %s\n", dds_strretcode (rc)); + ws = dds_create_waitset (dp); + MPT_ASSERT_FATAL_GT (ws, 0, "Could not create waitset: %s\n", dds_strretcode (ws)); + rc = dds_waitset_attach (ws, rdep, 0); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach built-in reader to waitset: %s\n", dds_strretcode (rc)); + + bool done = false; + bool first = true; + unsigned exp_index = 0; + unsigned exp_cycle = 0; + while (!done) + { + rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); + MPT_ASSERT_FATAL_GEQ (rc, 0, "Wait failed: %s\n", dds_strretcode (ws)); + + void *raw = NULL; + dds_sample_info_t si; + int32_t n; + while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1) + { + const dds_builtintopic_endpoint_t *sample = raw; + if (si.instance_state != DDS_IST_ALIVE) + done = true; + else if (!si.valid_data || strcmp (sample->topic_name, topic_name) != 0) + continue; + else + { + void *ud = NULL; + size_t usz = 0; + if (!dds_qget_userdata (sample->qos, &ud, &usz)) + printf ("%d: user data not set in QoS\n", id); + if (first && usz == 0) + { + dds_qset_userdata (qos, "X", 1); + rc = dds_set_qos (ep, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + } + else + { + const char *exp = exp_ud[exp_index]; + if (first && strcmp (ud, "X") == 0) + exp = "X"; + const size_t expsz = first ? 1 : strlen (exp); + bool eq = (usz == expsz && (usz == 0 || memcmp (ud, exp, usz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); + MPT_ASSERT (eq, "User data mismatch: expected %u %zu/%s received %zu/%s\n", + exp_index, expsz, exp, usz, ud ? (char *) ud : "(null)"); + if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + { + exp_index = 0; + exp_cycle++; + } + + if (active && exp_cycle == ncycles) + done = true; + else + { + const void *newud; + size_t newusz; + if (!active) + { + /* Set user data to the same value in response */ + newud = ud; newusz = usz; + dds_qset_userdata (qos, ud, usz); + } + else /* Set next agreed value */ + { + newud = exp_ud[exp_index]; newusz = strlen (exp_ud[exp_index]); + dds_qset_userdata (qos, newud, newusz); + } + + rc = dds_set_qos (ep, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (ep, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chkud = NULL; + size_t chkusz = 0; + if (!dds_qget_userdata (chk, &chkud, &chkusz)) + MPT_ASSERT (0, "Check QoS: no user data present\n"); + MPT_ASSERT (chkusz == newusz && (newusz == 0 || memcmp (chkud, newud, newusz) == 0), + "Retrieved user data differs from user data just set (%zu/%s vs %zu/%s)\n", + chkusz, chkud ? (char *) chkud : "(null)", newusz, newud ? (char *) newud : "(null)"); + dds_free (chkud); + dds_delete_qos (chk); + first = false; + } + } + dds_free (ud); + } + } + MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); + dds_return_loan (rdep, &raw, 1); + } + dds_delete_qos (qos); + rc = dds_delete (dp); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); + printf ("=== [Publisher(%d)] Done\n", id); +} + +MPT_ProcessEntry (rwtd, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)) +{ + dds_entity_t dp, tp, ep, rdep, ws; + dds_return_t rc; + dds_qos_t *qos; + int id = (int) ddsrt_getpid (); + + printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dp = dds_create_participant (domainid, NULL, NULL); + MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); + MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); + if (active) + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_writer (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep)); + } + else + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_reader (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep)); + } + rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not set status mask: %s\n", dds_strretcode (rc)); + ws = dds_create_waitset (dp); + MPT_ASSERT_FATAL_GT (ws, 0, "Could not create waitset: %s\n", dds_strretcode (ws)); + rc = dds_waitset_attach (ws, rdep, 0); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach built-in reader to waitset: %s\n", dds_strretcode (rc)); + + bool done = false; + bool first = true; + unsigned exp_index = 0; + unsigned exp_cycle = 0; + while (!done) + { + rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); + MPT_ASSERT_FATAL_GEQ (rc, 0, "Wait failed: %s\n", dds_strretcode (ws)); + + void *raw = NULL; + dds_sample_info_t si; + int32_t n; + while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1) + { + const dds_builtintopic_endpoint_t *sample = raw; + if (si.instance_state != DDS_IST_ALIVE) + done = true; + else if (!si.valid_data || strcmp (sample->topic_name, topic_name) != 0) + continue; + else + { + void *td = NULL; + size_t tsz = 0; + if (!dds_qget_topicdata (sample->qos, &td, &tsz)) + printf ("%d: topic data not set in QoS\n", id); + if (first && tsz == 0) + { + dds_qset_topicdata (qos, "X", 1); + rc = dds_set_qos (tp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + } + else + { + const char *exp = exp_ud[exp_index]; + if (first && strcmp (td, "X") == 0) + exp = "X"; + const size_t expsz = first ? 1 : strlen (exp); + bool eq = (tsz == expsz && (tsz == 0 || memcmp (td, exp, tsz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expsz, exp, tsz, td ? (char *) td : "(null)"); + MPT_ASSERT (eq, "Topic data mismatch: expected %u %zu/%s received %zu/%s\n", + exp_index, expsz, exp, tsz, td ? (char *) td : "(null)"); + if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + { + exp_index = 0; + exp_cycle++; + } + + if (active && exp_cycle == ncycles) + done = true; + else + { + const void *newtd; + size_t newtsz; + if (!active) + { + /* Set topic data to the same value in response */ + newtd = td; newtsz = tsz; + dds_qset_topicdata (qos, td, tsz); + } + else /* Set next agreed value */ + { + newtd = exp_ud[exp_index]; newtsz = strlen (exp_ud[exp_index]); + dds_qset_topicdata (qos, newtd, newtsz); + } + + rc = dds_set_qos (tp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (ep, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chktd = NULL; + size_t chktsz = 0; + if (!dds_qget_topicdata (chk, &chktd, &chktsz)) + MPT_ASSERT (0, "Check QoS: no topic data present\n"); + MPT_ASSERT (chktsz == newtsz && (newtsz == 0 || memcmp (chktd, newtd, newtsz) == 0), + "Retrieved topic data differs from topic data just set (%zu/%s vs %zu/%s)\n", + chktsz, chktd ? (char *) chktd : "(null)", newtsz, newtd ? (char *) newtd : "(null)"); + dds_free (chktd); + dds_delete_qos (chk); + first = false; + } + } + dds_free (td); + } + } + MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); + dds_return_loan (rdep, &raw, 1); + } + dds_delete_qos (qos); + rc = dds_delete (dp); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); + printf ("=== [Publisher(%d)] Done\n", id); +} + +MPT_ProcessEntry (rwgd, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)) +{ + dds_entity_t dp, tp, ep, rdep, grp, ws; + dds_return_t rc; + dds_qos_t *qos; + int id = (int) ddsrt_getpid (); + + printf ("=== [Check(%d)] active=%d ncycles=%u Start(%d) ...\n", id, active, ncycles, (int) domainid); + + qos = dds_create_qos (); + dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + dp = dds_create_participant (domainid, NULL, NULL); + MPT_ASSERT_FATAL_GT (dp, 0, "Could not create participant: %s\n", dds_strretcode (dp)); + tp = dds_create_topic (dp, &RWData_Msg_desc, topic_name, qos, NULL); + MPT_ASSERT_FATAL_GT (tp, 0, "Could not create topic: %s\n", dds_strretcode (tp)); + if (active) + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSSubscription reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_writer (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create writer: %s\n", dds_strretcode (ep)); + } + else + { + rdep = dds_create_reader (dp, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, qos, NULL); + MPT_ASSERT_FATAL_GT (rdep, 0, "Could not create DCPSPublication reader: %s\n", dds_strretcode (rdep)); + ep = dds_create_reader (dp, tp, qos, NULL); + MPT_ASSERT_FATAL_GT (ep, 0, "Could not create reader: %s\n", dds_strretcode (ep)); + } + grp = dds_get_parent (ep); + MPT_ASSERT_FATAL_GT (grp, 0, "Could not get pub/sub from wr/rd: %s\n", dds_strretcode (grp)); + rc = dds_set_status_mask (rdep, DDS_DATA_AVAILABLE_STATUS); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not set status mask: %s\n", dds_strretcode (rc)); + ws = dds_create_waitset (dp); + MPT_ASSERT_FATAL_GT (ws, 0, "Could not create waitset: %s\n", dds_strretcode (ws)); + rc = dds_waitset_attach (ws, rdep, 0); + MPT_ASSERT_FATAL_EQ (rc, 0, "Could not attach built-in reader to waitset: %s\n", dds_strretcode (rc)); + + bool done = false; + bool first = true; + unsigned exp_index = 0; + unsigned exp_cycle = 0; + while (!done) + { + rc = dds_waitset_wait (ws, NULL, 0, DDS_INFINITY); + MPT_ASSERT_FATAL_GEQ (rc, 0, "Wait failed: %s\n", dds_strretcode (ws)); + + void *raw = NULL; + dds_sample_info_t si; + int32_t n; + while ((n = dds_take (rdep, &raw, &si, 1, 1)) == 1) + { + const dds_builtintopic_endpoint_t *sample = raw; + if (si.instance_state != DDS_IST_ALIVE) + done = true; + else if (!si.valid_data || strcmp (sample->topic_name, topic_name) != 0) + continue; + else + { + void *gd = NULL; + size_t gsz = 0; + if (!dds_qget_groupdata (sample->qos, &gd, &gsz)) + printf ("%d: group data not set in QoS\n", id); + if (first && gsz == 0) + { + dds_qset_groupdata (qos, "X", 1); + rc = dds_set_qos (grp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + } + else + { + const char *exp = exp_ud[exp_index]; + if (first && strcmp (gd, "X") == 0) + exp = "X"; + const size_t expsz = first ? 1 : strlen (exp); + bool eq = (gsz == expsz && (gsz == 0 || memcmp (gd, exp, gsz) == 0)); + printf ("%d: expected %u %zu/%s received %zu/%s\n", + id, exp_index, expsz, exp, gsz, gd ? (char *) gd : "(null)"); + MPT_ASSERT (eq, "Group data mismatch: expected %u %zu/%s received %zu/%s\n", + exp_index, expsz, exp, gsz, gd ? (char *) gd : "(null)"); + if (strcmp (exp, "X") != 0 && ++exp_index == sizeof (exp_ud) / sizeof (exp_ud[0])) + { + exp_index = 0; + exp_cycle++; + } + + if (active && exp_cycle == ncycles) + done = true; + else + { + const void *newgd; + size_t newgsz; + if (!active) + { + /* Set group data to the same value in response */ + newgd = gd; newgsz = gsz; + dds_qset_groupdata (qos, gd, gsz); + } + else /* Set next agreed value */ + { + newgd = exp_ud[exp_index]; newgsz = strlen (exp_ud[exp_index]); + dds_qset_groupdata (qos, newgd, newgsz); + } + + rc = dds_set_qos (grp, qos); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Set QoS failed: %s\n", dds_strretcode (rc)); + + dds_qos_t *chk = dds_create_qos (); + rc = dds_get_qos (ep, chk); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "Get QoS failed: %s\n", dds_strretcode (rc)); + + void *chkgd = NULL; + size_t chkgsz = 0; + if (!dds_qget_groupdata (chk, &chkgd, &chkgsz)) + MPT_ASSERT (0, "Check QoS: no group data present\n"); + MPT_ASSERT (chkgsz == newgsz && (newgsz == 0 || memcmp (chkgd, newgd, newgsz) == 0), + "Retrieved group data differs from group data just set (%zu/%s vs %zu/%s)\n", + chkgsz, chkgd ? (char *) chkgd : "(null)", newgsz, newgd ? (char *) newgd : "(null)"); + dds_free (chkgd); + dds_delete_qos (chk); + first = false; + } + } + dds_free (gd); + } + } + MPT_ASSERT_FATAL_EQ (n, 0, "Read failed: %s\n", dds_strretcode (n)); + dds_return_loan (rdep, &raw, 1); + } + dds_delete_qos (qos); + rc = dds_delete (dp); + MPT_ASSERT_EQ (rc, DDS_RETCODE_OK, "teardown failed\n"); + printf ("=== [Publisher(%d)] Done\n", id); +} diff --git a/src/mpt/tests/qos/procs/ppud.h b/src/mpt/tests/qos/procs/ppud.h new file mode 100644 index 0000000..c017aaa --- /dev/null +++ b/src/mpt/tests/qos/procs/ppud.h @@ -0,0 +1,55 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ + +#ifndef MPT_QOSMATCH_PROCS_PPUD_H +#define MPT_QOSMATCH_PROCS_PPUD_H + +#include +#include + +#include "dds/dds.h" +#include "mpt/mpt.h" + +#if defined (__cplusplus) +extern "C" { +#endif + +void ppud_init (void); +void ppud_fini (void); + +MPT_ProcessEntry (ppud, + MPT_Args (dds_domainid_t domainid, + bool active, + unsigned ncycles)); + +MPT_ProcessEntry (rwud, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)); + +MPT_ProcessEntry (rwtd, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)); + +MPT_ProcessEntry (rwgd, + MPT_Args (dds_domainid_t domainid, + const char *topic_name, + bool active, + unsigned ncycles)); +#if defined (__cplusplus) +} +#endif + +#endif diff --git a/src/mpt/tests/qosmatch/procs/rw.c b/src/mpt/tests/qos/procs/rw.c similarity index 96% rename from src/mpt/tests/qosmatch/procs/rw.c rename to src/mpt/tests/qos/procs/rw.c index b4720ff..ba0d3b9 100644 --- a/src/mpt/tests/qosmatch/procs/rw.c +++ b/src/mpt/tests/qos/procs/rw.c @@ -1,3 +1,14 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ #include #include #include diff --git a/src/mpt/tests/qosmatch/procs/rw.h b/src/mpt/tests/qos/procs/rw.h similarity index 93% rename from src/mpt/tests/qosmatch/procs/rw.h rename to src/mpt/tests/qos/procs/rw.h index 851ee20..bfd5b69 100644 --- a/src/mpt/tests/qosmatch/procs/rw.h +++ b/src/mpt/tests/qos/procs/rw.h @@ -1,5 +1,5 @@ /* - * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * Copyright(c) 2019 ADLINK Technology Limited and others * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at diff --git a/src/mpt/tests/qos/procs/rwdata.idl b/src/mpt/tests/qos/procs/rwdata.idl new file mode 100644 index 0000000..3e821a1 --- /dev/null +++ b/src/mpt/tests/qos/procs/rwdata.idl @@ -0,0 +1,19 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +module RWData +{ + struct Msg + { + long k; + }; + #pragma keylist Msg k +}; diff --git a/src/mpt/tests/qos/qosmatch.c b/src/mpt/tests/qos/qosmatch.c new file mode 100644 index 0000000..c6b0c5a --- /dev/null +++ b/src/mpt/tests/qos/qosmatch.c @@ -0,0 +1,21 @@ +/* + * Copyright(c) 2019 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "mpt/mpt.h" +#include "procs/rw.h" + +#define TEST_PUB_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "multi_qosmatch") +#define TEST_SUB_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "multi_qosmatch") +MPT_TestProcess(qos, qosmatch, pub, rw_publisher, TEST_PUB_ARGS); +MPT_TestProcess(qos, qosmatch, sub, rw_subscriber, TEST_SUB_ARGS); +MPT_Test(qos, qosmatch, .init=rw_init, .fini=rw_fini); +#undef TEST_SUB_ARGS +#undef TEST_PUB_ARGS diff --git a/src/mpt/tests/qosmatch/procs/rwdata.idl b/src/mpt/tests/qosmatch/procs/rwdata.idl deleted file mode 100644 index 7722451..0000000 --- a/src/mpt/tests/qosmatch/procs/rwdata.idl +++ /dev/null @@ -1,8 +0,0 @@ -module RWData -{ - struct Msg - { - long k; - }; - #pragma keylist Msg k -}; diff --git a/src/mpt/tests/qosmatch/qosmatch.c b/src/mpt/tests/qosmatch/qosmatch.c deleted file mode 100644 index b249b35..0000000 --- a/src/mpt/tests/qosmatch/qosmatch.c +++ /dev/null @@ -1,20 +0,0 @@ -#include "mpt/mpt.h" -#include "procs/rw.h" - - -/* - * Tests to check communication between multiple publisher(s) and subscriber(s). - */ - - -/* - * The publisher expects 2 publication matched. - * The subscribers expect 1 sample each. - */ -#define TEST_PUB_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "multi_qosmatch") -#define TEST_SUB_ARGS MPT_ArgValues(DDS_DOMAIN_DEFAULT, "multi_qosmatch") -MPT_TestProcess(qosmatch, qosmatch, pub, rw_publisher, TEST_PUB_ARGS); -MPT_TestProcess(qosmatch, qosmatch, sub, rw_subscriber, TEST_SUB_ARGS); -MPT_Test(qosmatch, qosmatch, .init=rw_init, .fini=rw_fini); -#undef TEST_SUB_ARGS -#undef TEST_PUB_ARGS