diff --git a/src/core/ddsc/include/dds/dds.h b/src/core/ddsc/include/dds/dds.h index 089eea8..75f0f44 100644 --- a/src/core/ddsc/include/dds/dds.h +++ b/src/core/ddsc/include/dds/dds.h @@ -555,7 +555,9 @@ dds_set_enabled_status(dds_entity_t entity, uint32_t mask); * @param[in] entity Entity on which to get qos. * @param[out] qos Pointer to the qos structure that returns the set policies. * - * @returns A dds_return_t indicating success or failure. + * @returns A dds_return_t indicating success or failure. The QoS object will have + * at least all QoS relevant for the entity present and the corresponding dds_qget_... + * will return true. * * @retval DDS_RETCODE_OK * The existing set of QoS policy values applied to the entity @@ -962,7 +964,9 @@ dds_lookup_participant( * @brief Creates a new topic with default type handling. * * The type name for the topic is taken from the generated descriptor. Topic - * matching is done on a combination of topic name and type name. + * matching is done on a combination of topic name and type name. Each successful + * call to dds_create_topic creates a new topic entity sharing the same QoS + * settings with all other topics of the same name. * * @param[in] participant Participant on which to create the topic. * @param[in] descriptor An IDL generated topic descriptor. @@ -970,14 +974,20 @@ dds_lookup_participant( * @param[in] qos QoS to set on the new topic (can be NULL). * @param[in] listener Any listener functions associated with the new topic (can be NULL). * - * @returns A valid topic handle or an error code. + * @returns A valid, unique topic handle or an error code. * * @retval >=0 - * A valid topic handle. + * A valid unique topic handle. * @retval DDS_RETCODE_BAD_PARAMETER * Either participant, descriptor, name or qos is invalid. + * @retval DDS_RETCODE_BAD_PARAMETER + * Either participant, descriptor, name or qos is invalid. + * @retval DDS_RETCODE_INCONSISTENT_POLICY + * QoS mismatch between qos and an existing topic's QoS. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * Mismatch between type name in descriptor and pre-existing + * topic's type name. */ -/* TODO: Check list of retcodes is complete. */ DDS_EXPORT dds_entity_t dds_create_topic( dds_entity_t participant, @@ -992,7 +1002,13 @@ struct nn_plist; * @brief Creates a new topic with arbitrary type handling. * * The type name for the topic is taken from the provided "sertopic" object. Topic - * matching is done on a combination of topic name and type name. + * matching is done on a combination of topic name and type name. Each successful + * call to dds_create_topic creates a new topic entity sharing the same QoS + * settings with all other topics of the same name. + * + * If sertopic is not yet known in the domain, it is added and its refcount + * incremented; if an equivalent sertopic object is already known, then the known + * one is used instead. * * @param[in] participant Participant on which to create the topic. * @param[in] sertopic Internal description of the topic type (includes name). @@ -1000,14 +1016,20 @@ struct nn_plist; * @param[in] listener Any listener functions associated with the new topic (can be NULL). * @param[in] sedp_plist Topic description to be published as part of discovery (if NULL, not published). * - * @returns A valid topic handle or an error code. + * @returns A valid, unique topic handle or an error code. * * @retval >=0 - * A valid topic handle. + * A valid unique topic handle. * @retval DDS_RETCODE_BAD_PARAMETER * Either participant, descriptor, name or qos is invalid. + * @retval DDS_RETCODE_BAD_PARAMETER + * Either participant, descriptor, name or qos is invalid. + * @retval DDS_RETCODE_INCONSISTENT_POLICY + * QoS mismatch between qos and an existing topic's QoS. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * Mismatch between type name in sertopic and pre-existing + * topic's type name. */ -/* TODO: Check list of retcodes is complete. */ DDS_EXPORT dds_entity_t dds_create_topic_arbitrary ( dds_entity_t participant, @@ -1030,8 +1052,9 @@ dds_create_topic_arbitrary ( * A valid topic handle. * @retval DDS_RETCODE_BAD_PARAMETER * Participant was invalid. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * No topic of this name existed yet in the participant */ -/* TODO: Check list of retcodes is complete. */ DDS_EXPORT dds_entity_t dds_find_topic(dds_entity_t participant, const char *name); @@ -1047,8 +1070,6 @@ dds_find_topic(dds_entity_t participant, const char *name); * @retval DDS_RETCODE_OK * Success. */ -/* TODO: do we need a convenience version as well that allocates and add a _s suffix to this one? */ -/* TODO: Check annotation. Could be _Out_writes_to_(size, return + 1) as well. */ DDS_EXPORT dds_return_t dds_get_name(dds_entity_t topic, char *name, size_t size); diff --git a/src/core/ddsc/include/dds/ddsc/dds_public_impl.h b/src/core/ddsc/include/dds/ddsc/dds_public_impl.h index 0b0d99d..730020c 100644 --- a/src/core/ddsc/include/dds/ddsc/dds_public_impl.h +++ b/src/core/ddsc/include/dds/ddsc/dds_public_impl.h @@ -112,6 +112,7 @@ typedef enum dds_entity_kind DDS_KIND_DOMAIN, DDS_KIND_CYCLONEDDS } dds_entity_kind_t; +#define DDS_KIND_MAX DDS_KIND_CYCLONEDDS /* Handles are opaque pointers to implementation types */ typedef uint64_t dds_instance_handle_t; diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 3725d83..23025c3 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -86,7 +86,8 @@ DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status); DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst); -DDS_EXPORT dds_participant *dds_entity_participant (dds_entity *e); +DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e); +DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e); DDS_EXPORT void dds_entity_final_deinit_before_free (dds_entity *e); DDS_EXPORT bool dds_entity_in_scope (const dds_entity *e, const dds_entity *root); diff --git a/src/core/ddsc/src/dds__participant.h b/src/core/ddsc/src/dds__participant.h index ef52d86..feac58c 100644 --- a/src/core/ddsc/src/dds__participant.h +++ b/src/core/ddsc/src/dds__participant.h @@ -20,6 +20,8 @@ extern "C" { DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_participant, DDS_KIND_PARTICIPANT) +extern const ddsrt_avl_treedef_t participant_ktopics_treedef; + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsc/src/dds__serdata_builtintopic.h b/src/core/ddsc/src/dds__serdata_builtintopic.h index 803f7e3..e95aa36 100644 --- a/src/core/ddsc/src/dds__serdata_builtintopic.h +++ b/src/core/ddsc/src/dds__serdata_builtintopic.h @@ -37,13 +37,12 @@ struct q_globals; struct ddsi_sertopic_builtintopic { struct ddsi_sertopic c; enum ddsi_sertopic_builtintopic_type type; - struct q_globals *gv; }; extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic; extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic; -struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv); +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__topic.h b/src/core/ddsc/src/dds__topic.h index d59dc1f..c1c794d 100644 --- a/src/core/ddsc/src/dds__topic.h +++ b/src/core/ddsc/src/dds__topic.h @@ -21,9 +21,13 @@ extern "C" { DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_topic, DDS_KIND_TOPIC) -DDS_EXPORT struct ddsi_sertopic * dds_topic_lookup (dds_domain * domain, const char * name) ddsrt_nonnull_all; DDS_EXPORT void dds_topic_free (dds_domainid_t domainid, struct ddsi_sertopic * st) ddsrt_nonnull_all; +DDS_EXPORT dds_return_t dds_topic_pin (dds_entity_t handle, struct dds_topic **tp) ddsrt_nonnull_all; +DDS_EXPORT void dds_topic_unpin (struct dds_topic *tp) ddsrt_nonnull_all; +DDS_EXPORT void dds_topic_defer_set_qos (struct dds_topic *tp) ddsrt_nonnull_all; +DDS_EXPORT void dds_topic_allow_set_qos (struct dds_topic *tp) ddsrt_nonnull_all; + #ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED #define DDS_TOPIC_INTERN_FILTER_FN_DEFINED typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx); diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 4b9eec5..362a65c 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -34,6 +34,7 @@ struct dds_writer; struct dds_publisher; struct dds_subscriber; struct dds_topic; +struct dds_ktopic; struct dds_readcond; struct dds_guardcond; struct dds_statuscond; @@ -127,7 +128,7 @@ typedef struct dds_entity { ddsrt_avl_node_t m_avlnode_child; /* [m_mutex of m_parent] */ ddsrt_avl_tree_t m_children; /* [m_mutex] tree on m_iid using m_avlnode_child */ struct dds_domain *m_domain; /* constant */ - dds_qos_t *m_qos; /* [m_mutex] */ + dds_qos_t *m_qos; /* [m_mutex]; null for topics (they rely on correpsonding "ktopic") (+waitset,domain,&c.) */ ddsi_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */ dds_instance_handle_t m_iid; /* unique for all time, constant; FIXME: like GUID */ uint32_t m_flags; /* [m_mutex] */ @@ -215,7 +216,6 @@ typedef struct dds_domain { ddsrt_avl_node_t m_node; /* for dds_global.m_domains */ dds_domainid_t m_id; - ddsrt_avl_tree_t m_topics; struct cfgst *cfgst; struct ddsi_sertopic *builtin_participant_topic; @@ -238,14 +238,31 @@ typedef struct dds_publisher { struct dds_entity m_entity; } dds_publisher; +typedef struct dds_ktopic { + /* name -> mapping for topics, part of the participant + and protected by the participant's lock (including the actual QoS + setting) + + defer_set_qos is used to implement an intentionally unfair single-writer/ + multiple-reader lock using the participant's lock & cond var: set_qos + "write-locks" it, create_reader and create_writer "read-lock" it. */ + ddsrt_avl_node_t pp_ktopics_avlnode; + uint32_t refc; + uint32_t defer_set_qos; /* set_qos must wait for this to be 0 */ + dds_qos_t *qos; + char *name; /* [constant] */ + char *type_name; /* [constant] */ +} dds_ktopic; + typedef struct dds_participant { struct dds_entity m_entity; dds_entity_t m_builtin_subscriber; + ddsrt_avl_tree_t m_ktopics; /* [m_entity.m_mutex] */ } dds_participant; typedef struct dds_reader { struct dds_entity m_entity; - struct dds_topic *m_topic; + struct dds_topic *m_topic; /* refc'd, constant, lock(rd) -> lock(tp) allowed */ struct dds_rhc *m_rhc; /* aliases m_rd->rhc with a wider interface, FIXME: but m_rd owns it for resource management */ struct reader *m_rd; bool m_data_on_readers; @@ -265,7 +282,7 @@ typedef struct dds_reader { typedef struct dds_writer { struct dds_entity m_entity; - struct dds_topic *m_topic; + struct dds_topic *m_topic; /* refc'd, constant, lock(wr) -> lock(tp) allowed */ struct nn_xpack *m_xp; struct writer *m_wr; struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/ @@ -287,6 +304,7 @@ typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx); typedef struct dds_topic { struct dds_entity m_entity; struct ddsi_sertopic *m_stopic; + struct dds_ktopic *m_ktopic; /* refc'd, constant */ dds_topic_intern_filter_fn filter_fn; void *filter_ctx; diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index 8dd026f..cae0eb3 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -242,6 +242,13 @@ static void dds__builtin_write (const struct entity_common *e, nn_wctime_t times } } +static void unref_builtin_topics (struct dds_domain *dom) +{ + ddsi_sertopic_unref (dom->builtin_participant_topic); + ddsi_sertopic_unref (dom->builtin_reader_topic); + ddsi_sertopic_unref (dom->builtin_writer_topic); +} + void dds__builtin_init (struct dds_domain *dom) { dds_qos_t *qos = dds__create_builtin_qos (); @@ -253,9 +260,15 @@ void dds__builtin_init (struct dds_domain *dom) dom->btif.builtintopic_write = dds__builtin_write; dom->gv.builtin_topic_interface = &dom->btif; - dom->builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant", &dom->gv); - dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv); - dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv); + dom->builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant"); + dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription"); + dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication"); + + ddsrt_mutex_lock (&dom->gv.sertopics_lock); + ddsi_sertopic_register_locked (&dom->gv, dom->builtin_participant_topic); + ddsi_sertopic_register_locked (&dom->gv, dom->builtin_reader_topic); + ddsi_sertopic_register_locked (&dom->gv, dom->builtin_writer_topic); + ddsrt_mutex_unlock (&dom->gv.sertopics_lock); thread_state_awake (lookup_thread_state (), &dom->gv); const struct entity_index *gh = dom->gv.entity_index; @@ -265,6 +278,11 @@ void dds__builtin_init (struct dds_domain *dom) thread_state_asleep (lookup_thread_state ()); dds_delete_qos (qos); + + /* ddsi_sertopic_init initializes the refcount to 1 and dds_sertopic_register_locked increments + it. All "real" references (such as readers and writers) are also accounted for in the + reference count, so we have an excess reference here. */ + unref_builtin_topics (dom); } void dds__builtin_fini (struct dds_domain *dom) @@ -275,8 +293,5 @@ void dds__builtin_fini (struct dds_domain *dom) delete_local_orphan_writer (dom->builtintopic_writer_publications); delete_local_orphan_writer (dom->builtintopic_writer_subscriptions); thread_state_asleep (lookup_thread_state ()); - - ddsi_sertopic_unref (dom->builtin_participant_topic); - ddsi_sertopic_unref (dom->builtin_reader_topic); - ddsi_sertopic_unref (dom->builtin_writer_topic); + unref_builtin_topics (dom); } diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index 01d8085..0a1ffe1 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -13,6 +13,7 @@ #include "dds/ddsrt/process.h" #include "dds/ddsrt/heap.h" +#include "dds/ddsrt/hopscotch.h" #include "dds__init.h" #include "dds/ddsc/dds_rhc.h" #include "dds__domain.h" @@ -59,7 +60,6 @@ static dds_entity_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i domain->m_entity.m_iid = ddsi_iid_gen (); domain->gv.tstart = now (); - ddsrt_avl_init (&dds_topictree_def, &domain->m_topics); /* | domain_id | domain id in config | result +-----------+---------------------+---------- diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 4e6a225..7a80ddb 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -135,11 +135,65 @@ static bool entity_has_status (const dds_entity *e) return false; } +static bool entity_may_have_children (const dds_entity *e) +{ + switch (e->m_kind) + { + case DDS_KIND_TOPIC: + return false; + case DDS_KIND_READER: + case DDS_KIND_WRITER: + case DDS_KIND_PUBLISHER: + case DDS_KIND_SUBSCRIBER: + case DDS_KIND_PARTICIPANT: + case DDS_KIND_COND_READ: + case DDS_KIND_COND_QUERY: + case DDS_KIND_COND_GUARD: + case DDS_KIND_WAITSET: + case DDS_KIND_DOMAIN: + case DDS_KIND_CYCLONEDDS: + break; + case DDS_KIND_DONTCARE: + abort (); + break; + } + return true; +} + +#ifndef NDEBUG +static bool entity_kind_has_qos (dds_entity_kind_t kind) +{ + switch (kind) + { + case DDS_KIND_READER: + case DDS_KIND_WRITER: + case DDS_KIND_PUBLISHER: + case DDS_KIND_SUBSCRIBER: + case DDS_KIND_PARTICIPANT: + return true; + case DDS_KIND_TOPIC: + case DDS_KIND_COND_READ: + case DDS_KIND_COND_QUERY: + case DDS_KIND_COND_GUARD: + case DDS_KIND_WAITSET: + case DDS_KIND_DOMAIN: + case DDS_KIND_CYCLONEDDS: + break; + case DDS_KIND_DONTCARE: + abort (); + break; + } + return false; +} +#endif + dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind_t kind, bool implicit, dds_qos_t *qos, const dds_listener_t *listener, status_mask_t mask) { dds_handle_t handle; + /* CycloneDDS is at the root of the hierarchy */ assert ((kind == DDS_KIND_CYCLONEDDS) == (parent == NULL)); + assert (entity_kind_has_qos (kind) == (qos != NULL)); assert (e); e->m_kind = kind; @@ -196,7 +250,7 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind { /* for topics, refc counts readers/writers, for all others, it counts children (this we can get away with as long as topics can't have children) */ - if ((handle = dds_handle_create (&e->m_hdllink, implicit, (kind != DDS_KIND_TOPIC))) <= 0) + if ((handle = dds_handle_create (&e->m_hdllink, implicit, entity_may_have_children (e))) <= 0) return (dds_entity_t) handle; } @@ -219,41 +273,40 @@ void dds_entity_register_child (dds_entity *parent, dds_entity *child) dds_entity_add_ref_locked (parent); } -static dds_entity *get_first_child (ddsrt_avl_tree_t *remaining_children, bool ignore_topics) +static dds_entity *get_next_child (ddsrt_avl_tree_t *remaining_children, uint32_t allowed_kinds, uint64_t *cursor) { ddsrt_avl_iter_t it; - for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, remaining_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it)) + for (dds_entity *e = ddsrt_avl_iter_succ (&dds_entity_children_td, remaining_children, &it, cursor); e != NULL; e = ddsrt_avl_iter_next (&it)) { - if ((!ignore_topics) || (dds_entity_kind(e) != DDS_KIND_TOPIC)) + dds_entity_kind_t kind = dds_entity_kind (e); + if ((1u << (uint32_t) kind) & allowed_kinds) return e; } return NULL; } -static void delete_children(struct dds_entity *parent, bool ignore_topics) +static void delete_children (struct dds_entity *parent, uint32_t allowed_kinds) { dds_entity *child; dds_return_t ret; + uint64_t cursor = 0; ddsrt_mutex_lock (&parent->m_mutex); - while ((child = get_first_child(&parent->m_children, ignore_topics)) != NULL) + while ((child = get_next_child (&parent->m_children, allowed_kinds, &cursor)) != NULL) { dds_entity_t child_handle = child->m_hdllink.hdl; + cursor = child->m_iid; /* The child will remove itself from the parent->m_children list. */ ddsrt_mutex_unlock (&parent->m_mutex); ret = dds_delete_impl (child_handle, DIS_FROM_PARENT); assert (ret == DDS_RETCODE_OK || ret == DDS_RETCODE_BAD_PARAMETER); + (void) ret; ddsrt_mutex_lock (&parent->m_mutex); - /* The dds_delete can fail if the child is being deleted in parallel, - * in which case: wait when its not deleted yet. - * The child will trigger the condition after it removed itself from - * the childrens list. */ - if ((ret == DDS_RETCODE_BAD_PARAMETER) && - (get_first_child(&parent->m_children, ignore_topics) == child)) - { + /* The dds_delete can fail if the child is being deleted in parallel, in which case: + wait until it is has gone. */ + if (ddsrt_avl_lookup (&dds_entity_children_td, &parent->m_children, &cursor) != NULL) ddsrt_cond_wait (&parent->m_cond, &parent->m_mutex); - } } ddsrt_mutex_unlock (&parent->m_mutex); } @@ -283,12 +336,20 @@ static const char *entity_kindstr (dds_entity_kind_t kind) static void print_delete (const dds_entity *e, enum delete_impl_state delstate , dds_instance_handle_t iid) { - unsigned cm = ddsrt_atomic_ld32 (&e->m_hdllink.cnt_flags); - printf ("delete(%p, delstate %s, iid %"PRIx64"): %s%s %d pin %u refc %u %s %s\n", - (void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid, - entity_kindstr (e->m_kind), (e->m_flags & DDS_ENTITY_IMPLICIT) ? " [implicit]" : "", - e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0x7fff, (cm & 0x80000000) ? "closed" : "open", - ddsrt_avl_is_empty (&e->m_children) ? "childless" : "has-children"); + if (e) + { + unsigned cm = ddsrt_atomic_ld32 (&e->m_hdllink.cnt_flags); + printf ("delete(%p, delstate %s, iid %"PRIx64"): %s%s %d pin %u refc %u %s %s\n", + (void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid, + entity_kindstr (e->m_kind), (e->m_flags & DDS_ENTITY_IMPLICIT) ? " [implicit]" : "", + e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0x7fff, (cm & 0x80000000) ? "closed" : "open", + ddsrt_avl_is_empty (&e->m_children) ? "childless" : "has-children"); + } + else + { + printf ("delete(%p, delstate %s, handle %"PRId64"): pin failed\n", + (void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid); + } } #endif @@ -315,7 +376,12 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state else if (ret == DDS_RETCODE_TRY_AGAIN) /* non-child refs exist */ return DDS_RETCODE_OK; else + { +#if TRACE_DELETE + print_delete (NULL, delstate, (uint64_t) entity); +#endif return ret; + } } dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delstate) @@ -392,8 +458,15 @@ static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, en * * To circumvent the problem. We ignore topics in the first loop. */ - delete_children(e, true /* ignore topics */); - delete_children(e, false /* delete topics */); + DDSRT_STATIC_ASSERT ((uint32_t) DDS_KIND_MAX < 32); + static const uint32_t disallowed_kinds[] = { + 1u << (uint32_t) DDS_KIND_TOPIC, + (uint32_t) 0 + }; + for (size_t i = 0; i < sizeof (disallowed_kinds) / sizeof (disallowed_kinds[0]); i++) + { + delete_children (e, ~disallowed_kinds[i]); + } /* The dds_handle_delete will wait until the last active claim on that handle is released. It is possible that this last release will be done by a thread that was @@ -476,13 +549,20 @@ dds_entity_t dds_get_parent (dds_entity_t entity) } } -dds_participant *dds_entity_participant (dds_entity *e) +dds_participant *dds_entity_participant (const dds_entity *e) { while (e && dds_entity_kind (e) != DDS_KIND_PARTICIPANT) e = e->m_parent; return (dds_participant *) e; } +const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e) +{ + struct dds_participant const * const pp = dds_entity_participant (e); + assert (pp != NULL); + return &pp->m_entity.m_guid; +} + dds_entity_t dds_get_participant (dds_entity_t entity) { dds_entity *e; @@ -577,51 +657,124 @@ dds_return_t dds_get_qos (dds_entity_t entity, dds_qos_t *qos) ret = DDS_RETCODE_ILLEGAL_OPERATION; else { + dds_qos_t *entity_qos; + if (dds_entity_kind (e) != DDS_KIND_TOPIC) + entity_qos = e->m_qos; + else + { + struct dds_topic * const tp = (dds_topic *) e; + struct dds_participant * const pp = dds_entity_participant (e); + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + entity_qos = tp->m_ktopic->qos; + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); + } + dds_reset_qos (qos); - nn_xqos_mergein_missing (qos, e->m_qos, ~(QP_TOPIC_NAME | QP_TYPE_NAME)); + nn_xqos_mergein_missing (qos, entity_qos, ~(QP_TOPIC_NAME | QP_TYPE_NAME)); ret = DDS_RETCODE_OK; } dds_entity_unlock(e); return ret; } -static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos, uint64_t mask) +static dds_return_t dds_set_qos_locked_raw (dds_entity *e, dds_qos_t **e_qos_ptr, bool e_enabled, const dds_qos_t *qos, uint64_t mask, const struct ddsrt_log_cfg *logcfg, dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all) { dds_return_t ret; + + /* Any attempt to do this on a topic ends up doing it on the ktopic instead, so that there is + but a single QoS for a topic in a participant while there can be multiple definitions of it, + and hence, multiple sertopics. Those are needed for multi-language support. */ dds_qos_t *newqos = dds_create_qos (); nn_xqos_mergein_missing (newqos, qos, mask); - nn_xqos_mergein_missing (newqos, e->m_qos, ~(uint64_t)0); - if ((ret = nn_xqos_valid (&e->m_domain->gv.logconfig, newqos)) != DDS_RETCODE_OK) - ; /* oops ... invalid or inconsistent */ - else if (!(e->m_flags & DDS_ENTITY_ENABLED)) - ; /* do as you please while the entity is not enabled (perhaps we should even allow invalid ones?) */ + nn_xqos_mergein_missing (newqos, *e_qos_ptr, ~(uint64_t)0); + if ((ret = nn_xqos_valid (logcfg, newqos)) != DDS_RETCODE_OK) + { + /* invalid or inconsistent QoS settings */ + goto error_or_nochange; + } + else if (!e_enabled) + { + /* do as you please while the entity is not enabled */ + } else { - const uint64_t delta = nn_xqos_delta (e->m_qos, newqos, ~(uint64_t)0); - if (delta == 0) /* no change */ - ret = DDS_RETCODE_OK; - else if (delta & ~QP_CHANGEABLE_MASK) - ret = DDS_RETCODE_IMMUTABLE_POLICY; - else if (delta & (QP_RXO_MASK | QP_PARTITION)) - ret = DDS_RETCODE_UNSUPPORTED; /* not yet supporting things that affect matching */ - else + const uint64_t delta = nn_xqos_delta (*e_qos_ptr, newqos, ~(uint64_t)0); + if (delta == 0) { - /* yay! */ + /* new settings are identical to the old */ + goto error_or_nochange; + } + else if (delta & ~QP_CHANGEABLE_MASK) + { + /* not all QoS may be changed according to the spec */ + ret = DDS_RETCODE_IMMUTABLE_POLICY; + goto error_or_nochange; + } + else if (delta & (QP_RXO_MASK | QP_PARTITION)) + { + /* Cyclone doesn't (yet) support changing QoS that affect matching. Simply re-doing the + matching is easy enough, but the consequences are very weird. E.g., what is the + expectation if a transient-local writer has published data while its partition QoS is set + to A, and then changes its partition to B? Should a reader in B get the data originally + published in A? + + One can do the same thing with other RxO QoS settings, e.g., the latency budget setting. + I find that weird, and I'd rather have sane answers to these questions than set up these + traps and pitfalls for people to walk into ... + */ + ret = DDS_RETCODE_UNSUPPORTED; + goto error_or_nochange; } } - if (ret != DDS_RETCODE_OK) - dds_delete_qos (newqos); - else if ((ret = dds_entity_deriver_set_qos (e, newqos, e->m_flags & DDS_ENTITY_ENABLED)) != DDS_RETCODE_OK) - dds_delete_qos (newqos); + assert (ret == DDS_RETCODE_OK); + if ((ret = set_qos (e, newqos, e_enabled)) != DDS_RETCODE_OK) + goto error_or_nochange; else { - dds_delete_qos (e->m_qos); - e->m_qos = newqos; + dds_delete_qos (*e_qos_ptr); + *e_qos_ptr = newqos; } + return DDS_RETCODE_OK; + +error_or_nochange: + dds_delete_qos (newqos); return ret; } +static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos, uint64_t mask) +{ + const struct ddsrt_log_cfg *logcfg = &e->m_domain->gv.logconfig; + dds_entity_kind_t kind = dds_entity_kind (e); + if (kind != DDS_KIND_TOPIC) + { + return dds_set_qos_locked_raw (e, &e->m_qos, (e->m_flags & DDS_ENTITY_ENABLED) != 0, qos, mask, logcfg, dds_entity_deriver_table[kind]->set_qos); + } + else + { + /* Topics must be enabled for now (all are currently, so for now it is not a meaningful limitation): + there can only be a single QoS (or different versions with the same name can have different QoS - + in particular a different value for TOPIC_DATA - and therefore the idea that it is a free-for-all + on the QoS for a disabled entity falls apart for topics. + + FIXME: topic should have a QoS object while still disabled */ + assert (e->m_flags & DDS_ENTITY_ENABLED); + struct dds_topic * const tp = (struct dds_topic *) e; + struct dds_participant * const pp = dds_entity_participant (e); + struct dds_ktopic * const ktp = tp->m_ktopic; + dds_return_t rc; + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + while (ktp->defer_set_qos != 0) + ddsrt_cond_wait (&pp->m_entity.m_cond, &pp->m_entity.m_mutex); + + /* dds_entity_deriver_table[kind]->set_qos had better avoid looking at the entity! */ + rc = dds_set_qos_locked_raw (NULL, &ktp->qos, (e->m_flags & DDS_ENTITY_ENABLED) != 0, qos, mask, logcfg, dds_entity_deriver_table[kind]->set_qos); + + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); + return rc; + } +} + static void pushdown_pubsub_qos (dds_entity *e) { /* e claimed but no mutex held */ @@ -650,7 +803,7 @@ static void pushdown_pubsub_qos (dds_entity *e) ddsrt_mutex_unlock (&e->m_mutex); } -static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp) +static void pushdown_topic_qos (dds_entity *e, struct dds_ktopic *ktp) { /* on input: both entities claimed but no mutexes held */ enum { NOP, PROP, CHANGE } todo; @@ -658,12 +811,12 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp) { case DDS_KIND_READER: { dds_reader *rd = (dds_reader *) e; - todo = (&rd->m_topic->m_entity == tp) ? CHANGE : NOP; + todo = (rd->m_topic->m_ktopic == ktp) ? CHANGE : NOP; break; } case DDS_KIND_WRITER: { dds_writer *wr = (dds_writer *) e; - todo = (&wr->m_topic->m_entity == tp) ? CHANGE : NOP; + todo = (wr->m_topic->m_ktopic == ktp) ? CHANGE : NOP; break; } default: { @@ -677,10 +830,11 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp) break; case CHANGE: { /* may lock topic while holding reader/writer lock */ + struct dds_participant * const pp = dds_entity_participant (e); ddsrt_mutex_lock (&e->m_mutex); - ddsrt_mutex_lock (&tp->m_mutex); - dds_set_qos_locked_impl (e, tp->m_qos, QP_TOPIC_DATA); - ddsrt_mutex_unlock (&tp->m_mutex); + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + dds_set_qos_locked_impl (e, ktp->qos, QP_TOPIC_DATA); + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); ddsrt_mutex_unlock (&e->m_mutex); break; } @@ -697,7 +851,7 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp) assert (x == c); /* see dds_get_children for why "c" remains valid despite unlocking m_mutex */ ddsrt_mutex_unlock (&e->m_mutex); - pushdown_topic_qos (c, tp); + pushdown_topic_qos (c, ktp); ddsrt_mutex_lock (&e->m_mutex); dds_entity_unpin (c); } @@ -740,7 +894,8 @@ dds_return_t dds_set_qos (dds_entity_t entity, const dds_qos_t *qos) assert (dds_entity_kind (e->m_parent) == DDS_KIND_PARTICIPANT); if (dds_entity_pin (e->m_parent->m_hdllink.hdl, &pp) == DDS_RETCODE_OK) { - pushdown_topic_qos (pp, e); + struct dds_topic *tp = (struct dds_topic *) e; + pushdown_topic_qos (pp, tp->m_ktopic); dds_entity_unpin (pp); } break; diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index e9cdacb..47ee768 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -10,6 +10,7 @@ * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause */ #include +#include #include "dds/ddsrt/cdtors.h" #include "dds/ddsrt/environ.h" @@ -30,6 +31,13 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_participant) #define DDS_PARTICIPANT_STATUS_MASK (0u) +static int cmp_ktopic_name (const void *a, const void *b) +{ + return strcmp (a, b); +} + +const ddsrt_avl_treedef_t participant_ktopics_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY(offsetof (struct dds_ktopic, pp_ktopics_avlnode), offsetof (struct dds_ktopic, name), cmp_ktopic_name, 0); + static dds_return_t dds_participant_status_validate (uint32_t mask) { return (mask & ~DDS_PARTICIPANT_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; @@ -42,6 +50,9 @@ static dds_return_t dds_participant_delete (dds_entity *e) dds_return_t ret; assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT); + /* ktopics & topics are children and therefore must all have been deleted by the time we get here */ + assert (ddsrt_avl_is_empty (&((struct dds_participant *) e)->m_ktopics)); + thread_state_awake (lookup_thread_state (), &e->m_domain->gv); if ((ret = delete_participant (&e->m_domain->gv, &e->m_guid)) < 0) DDS_CERROR (&e->m_domain->gv.logconfig, "dds_participant_delete: internal error %"PRId32"\n", ret); @@ -125,6 +136,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_ pp->m_entity.m_iid = get_entity_instance_id (&dom->gv, &guid); pp->m_entity.m_domain = dom; pp->m_builtin_subscriber = 0; + ddsrt_avl_init (&participant_ktopics_treedef, &pp->m_ktopics); /* Add participant to extent */ ddsrt_mutex_lock (&dom->m_entity.m_mutex); diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 71bfd3d..3a6b385 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -14,6 +14,7 @@ #include "dds/dds.h" #include "dds/version.h" #include "dds/ddsrt/static_assert.h" +#include "dds__participant.h" #include "dds__subscriber.h" #include "dds__reader.h" #include "dds__listener.h" @@ -359,32 +360,34 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe { dds_qos_t *rqos; dds_subscriber *sub = NULL; - dds_participant *pp; dds_entity_t subscriber; - dds_reader *rd; dds_topic *tp; - dds_entity_t reader; - dds_entity_t t; - dds_return_t ret = DDS_RETCODE_OK; - bool internal_topic; + dds_return_t rc; + dds_entity_t pseudo_topic = 0; + bool created_implicit_sub = false; switch (topic) { - case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: case DDS_BUILTIN_TOPIC_DCPSTOPIC: + /* not implemented yet */ + return DDS_RETCODE_BAD_PARAMETER; + + case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT: case DDS_BUILTIN_TOPIC_DCPSPUBLICATION: case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION: - internal_topic = true; - subscriber = dds__get_builtin_subscriber (participant_or_subscriber); - if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK) - return ret; - t = dds__get_builtin_topic (subscriber, topic); + /* translate provided pseudo-topic to a real one */ + pseudo_topic = topic; + if ((subscriber = dds__get_builtin_subscriber (participant_or_subscriber)) < 0) + return subscriber; + if ((rc = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK) + return rc; + topic = dds__get_builtin_topic (subscriber, topic); break; default: { dds_entity *p_or_s; - if ((ret = dds_entity_lock (participant_or_subscriber, DDS_KIND_DONTCARE, &p_or_s)) != DDS_RETCODE_OK) - return ret; + if ((rc = dds_entity_lock (participant_or_subscriber, DDS_KIND_DONTCARE, &p_or_s)) != DDS_RETCODE_OK) + return rc; switch (dds_entity_kind (p_or_s)) { case DDS_KIND_SUBSCRIBER: @@ -392,34 +395,39 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe sub = (dds_subscriber *) p_or_s; break; case DDS_KIND_PARTICIPANT: + created_implicit_sub = true; subscriber = dds__create_subscriber_l ((dds_participant *) p_or_s, true, qos, NULL); dds_entity_unlock (p_or_s); - if ((ret = dds_subscriber_lock (subscriber, &sub)) < 0) - return ret; + if ((rc = dds_subscriber_lock (subscriber, &sub)) < 0) + return rc; break; default: dds_entity_unlock (p_or_s); return DDS_RETCODE_ILLEGAL_OPERATION; } - internal_topic = false; - t = topic; break; } } - if ((ret = dds_topic_lock (t, &tp)) != DDS_RETCODE_OK) - { - reader = ret; - goto err_tp_lock; - } + if ((rc = dds_topic_pin (topic, &tp)) < 0) + goto err_pin_topic; assert (tp->m_stopic); - pp = dds_entity_participant (&sub->m_entity); - if (pp != dds_entity_participant (&tp->m_entity)) + if (dds_entity_participant (&sub->m_entity) != dds_entity_participant (&tp->m_entity)) { - reader = DDS_RETCODE_BAD_PARAMETER; + rc = DDS_RETCODE_BAD_PARAMETER; goto err_pp_mismatch; } + /* Prevent set_qos on the topic until reader has been created and registered: we can't + allow a TOPIC_DATA change to ccur before the reader has been created because that + change would then not be published in the discovery/built-in topics. + + Don't keep the participant (which protects the topic's QoS) locked because that + can cause deadlocks for applications creating a reader/writer from within a + subscription matched listener (whether the restrictions on what one can do in + listeners are reasonable or not, it used to work so it can be broken arbitrarily). */ + dds_topic_defer_set_qos (tp); + /* Merge qos from topic and subscriber, dds_copy_qos only fails when it is passed a null argument, but that isn't the case here */ rqos = dds_create_qos (); @@ -427,31 +435,30 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe nn_xqos_mergein_missing (rqos, qos, DDS_READER_QOS_MASK); if (sub->m_entity.m_qos) nn_xqos_mergein_missing (rqos, sub->m_entity.m_qos, ~(uint64_t)0); - if (tp->m_entity.m_qos) - nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0); + if (tp->m_ktopic->qos) + nn_xqos_mergein_missing (rqos, tp->m_ktopic->qos, ~(uint64_t)0); nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0); - if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 || - (ret = validate_reader_qos(rqos)) != DDS_RETCODE_OK) + if ((rc = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 || + (rc = validate_reader_qos(rqos)) != DDS_RETCODE_OK) { dds_delete_qos (rqos); - reader = ret; goto err_bad_qos; } /* Additional checks required for built-in topics: we don't want to run into a resource limit on a built-in topic, it is a needless complication */ - if (internal_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, topic, rqos)) + if (pseudo_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, pseudo_topic, rqos)) { dds_delete_qos (rqos); - reader = DDS_RETCODE_INCONSISTENT_POLICY; + rc = DDS_RETCODE_INCONSISTENT_POLICY; goto err_bad_qos; } /* Create reader and associated read cache (if not provided by caller) */ - rd = dds_alloc (sizeof (*rd)); - reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, false, rqos, listener, DDS_READER_STATUS_MASK); + struct dds_reader * const rd = dds_alloc (sizeof (*rd)); + const dds_entity_t reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, false, rqos, listener, DDS_READER_STATUS_MASK); rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; rd->m_topic = tp; rd->m_rhc = rhc ? rhc : dds_rhc_default_new (rd, tp->m_stopic); @@ -468,25 +475,27 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe dds_entity_init_complete (&rd->m_entity); thread_state_awake (lookup_thread_state (), &sub->m_entity.m_domain->gv); - ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd); - assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */ + rc = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, dds_entity_participant_guid (&sub->m_entity), tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd); + assert (rc == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */ thread_state_asleep (lookup_thread_state ()); rd->m_entity.m_iid = get_entity_instance_id (&rd->m_entity.m_domain->gv, &rd->m_entity.m_guid); dds_entity_register_child (&sub->m_entity, &rd->m_entity); - dds_topic_unlock (tp); + dds_topic_allow_set_qos (tp); + dds_topic_unpin (tp); dds_subscriber_unlock (sub); return reader; err_bad_qos: + dds_topic_allow_set_qos (tp); err_pp_mismatch: - dds_topic_unlock (tp); -err_tp_lock: + dds_topic_unpin (tp); +err_pin_topic: dds_subscriber_unlock (sub); - if ((sub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0) + if (created_implicit_sub) (void) dds_delete (subscriber); - return reader; + return rc; } void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb, void *cbarg) diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index 1425c60..f9d3122 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -132,7 +132,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi /* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */ const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn; /* keyhash must in host format (which the GUIDs always are internally) */ - struct entity_common *entity = entidx_lookup_guid_untyped (tp->gv->entity_index, (const ddsi_guid_t *) keyhash->value); + struct entity_common *entity = entidx_lookup_guid_untyped (tp->c.gv->entity_index, (const ddsi_guid_t *) keyhash->value); struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); memcpy (&d->key, keyhash->value, sizeof (d->key)); if (entity) diff --git a/src/core/ddsc/src/dds_sertopic_builtintopic.c b/src/core/ddsc/src/dds_sertopic_builtintopic.c index 07f730c..c5ace24 100644 --- a/src/core/ddsc/src/dds_sertopic_builtintopic.c +++ b/src/core/ddsc/src/dds_sertopic_builtintopic.c @@ -25,12 +25,11 @@ /* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */ -struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv) +struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename) { struct ddsi_sertopic_builtintopic *tp = ddsrt_malloc (sizeof (*tp)); ddsi_sertopic_init (&tp->c, name, typename, &ddsi_sertopic_ops_builtintopic, &ddsi_serdata_ops_builtintopic, false); tp->type = type; - tp->gv = gv; return &tp->c; } @@ -40,6 +39,19 @@ static void sertopic_builtin_free (struct ddsi_sertopic *tp) ddsrt_free (tp); } +static bool sertopic_builtin_equal (const struct ddsi_sertopic *acmn, const struct ddsi_sertopic *bcmn) +{ + const struct ddsi_sertopic_builtintopic *a = (struct ddsi_sertopic_builtintopic *) acmn; + const struct ddsi_sertopic_builtintopic *b = (struct ddsi_sertopic_builtintopic *) bcmn; + return a->type == b->type; +} + +static uint32_t sertopic_builtin_hash (const struct ddsi_sertopic *tpcmn) +{ + const struct ddsi_sertopic_builtintopic *tp = (struct ddsi_sertopic_builtintopic *) tpcmn; + return tp->type; +} + static void free_pp (void *vsample) { dds_builtintopic_participant_t *sample = vsample; @@ -131,6 +143,8 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_ } const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = { + .equal = sertopic_builtin_equal, + .hash = sertopic_builtin_hash, .free = sertopic_builtin_free, .zero_samples = sertopic_builtin_zero_samples, .realloc_samples = sertopic_builtin_realloc_samples, diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 9c4c724..7b40f9b 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -45,15 +45,6 @@ struct topic_sertopic_node { const struct ddsi_sertopic *st; }; -static int topic_sertopic_node_cmp (const void *va, const void *vb) -{ - const struct ddsi_sertopic *a = va; - const struct ddsi_sertopic *b = vb; - return strcmp (a->name, b->name); -} - -const ddsrt_avl_treedef_t dds_topictree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct topic_sertopic_node, avlnode), offsetof (struct topic_sertopic_node, st), topic_sertopic_node_cmp, 0); - static bool is_valid_name (const char *name) ddsrt_nonnull_all; static bool is_valid_name (const char *name) @@ -112,164 +103,88 @@ static void dds_topic_status_cb (struct dds_topic *tp) } #endif -struct ddsi_sertopic *dds_topic_lookup (dds_domain *domain, const char *name) +dds_return_t dds_topic_pin (dds_entity_t handle, struct dds_topic **tp) { - const struct ddsi_sertopic key = { .name = (char *) name }; - struct ddsi_sertopic *st; - struct topic_sertopic_node *nst; - ddsrt_mutex_lock (&dds_global.m_mutex); - if ((nst = ddsrt_avl_lookup (&dds_topictree_def, &domain->m_topics, &key)) == NULL) - st = NULL; - else - st = ddsi_sertopic_ref (nst->st); - ddsrt_mutex_unlock (&dds_global.m_mutex); - return st; -} - -static bool dds_find_topic_check_and_add_ref (dds_entity_t participant, dds_entity_t topic, const char *name) -{ - dds_topic *tp; - if (dds_topic_lock (topic, &tp) != DDS_RETCODE_OK) - return false; - - bool ret; - if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant || strcmp (tp->m_stopic->name, name) != 0) - ret = false; - else - { - /* Simply return the same topic, though that is different to the spirit - of the DDS specification, which gives you a unique copy. Giving that - unique copy means there potentially many versions of exactly the same - topic around, and that two entities can be dealing with the same data - even though they have different topics objects (though with the same - name). That I find a confusing model. - - As far as I can tell, the only benefit is the ability to set different - listeners on the various copies of the topic. And that seems to be a - really small benefit. */ - ret = true; - } - dds_topic_unlock (tp); - return ret; -} - -dds_entity_t dds_find_topic (dds_entity_t participant, const char *name) -{ - dds_entity *pe; + struct dds_entity *e; dds_return_t ret; - dds_entity_t topic; - - if (name == NULL) - return DDS_RETCODE_BAD_PARAMETER; - - /* claim participant handle to guarantee the handle remains valid after - unlocking the participant prior to verifying the found topic still - exists */ - if ((ret = dds_entity_pin (participant, &pe)) < 0) + if ((ret = dds_entity_pin (handle, &e)) < 0) return ret; - if (dds_entity_kind (pe) != DDS_KIND_PARTICIPANT) + if (dds_entity_kind (e) != DDS_KIND_TOPIC) { - dds_entity_unpin (pe); + dds_entity_unpin (e); return DDS_RETCODE_ILLEGAL_OPERATION; } + *tp = (struct dds_topic *) e; + return DDS_RETCODE_OK; +} - do { - dds_participant *p; - topic = DDS_RETCODE_PRECONDITION_NOT_MET; - if ((ret = dds_participant_lock (participant, &p)) == DDS_RETCODE_OK) - { - ddsrt_avl_iter_t it; - for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &p->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it)) - { - if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, name) == 0) - { - topic = e->m_hdllink.hdl; - break; - } - } - dds_participant_unlock (p); - } - } while (topic > 0 && !dds_find_topic_check_and_add_ref (participant, topic, name)); +void dds_topic_unpin (struct dds_topic *tp) +{ + dds_entity_unpin (&tp->m_entity); +} - dds_entity_unpin (pe); - return topic; +void dds_topic_defer_set_qos (struct dds_topic *tp) +{ + struct dds_ktopic * const ktp = tp->m_ktopic; + struct dds_participant * const pp = dds_entity_participant (&tp->m_entity); + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + ++ktp->defer_set_qos; + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); +} + +void dds_topic_allow_set_qos (struct dds_topic *tp) +{ + struct dds_ktopic * const ktp = tp->m_ktopic; + struct dds_participant * const pp = dds_entity_participant (&tp->m_entity); + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + assert (ktp->defer_set_qos > 0); + if (--ktp->defer_set_qos == 0) + ddsrt_cond_broadcast (&pp->m_entity.m_cond); + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); } static dds_return_t dds_topic_delete (dds_entity *e) ddsrt_nonnull_all; static dds_return_t dds_topic_delete (dds_entity *e) { - dds_topic *tp = (dds_topic *) e; - dds_domain *domain = tp->m_entity.m_domain; - ddsrt_avl_dpath_t dp; - struct topic_sertopic_node *stn; - - ddsrt_mutex_lock (&dds_global.m_mutex); - - stn = ddsrt_avl_lookup_dpath (&dds_topictree_def, &domain->m_topics, tp->m_stopic, &dp); - assert (stn != NULL); - if (--stn->refc == 0) - { - ddsrt_avl_delete_dpath (&dds_topictree_def, &domain->m_topics, stn, &dp); - ddsrt_free (stn); - } - + struct dds_topic * const tp = (dds_topic *) e; + struct dds_ktopic * const ktp = tp->m_ktopic; + assert (dds_entity_kind (e->m_parent) == DDS_KIND_PARTICIPANT); + dds_participant * const pp = (dds_participant *) e->m_parent; ddsi_sertopic_unref (tp->m_stopic); - ddsrt_mutex_unlock (&dds_global.m_mutex); + + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + if (--ktp->refc == 0) + { + ddsrt_avl_delete (&participant_ktopics_treedef, &pp->m_ktopics, ktp); + dds_delete_qos (ktp->qos); + ddsrt_free (ktp->name); + ddsrt_free (ktp->type_name); + dds_free (ktp); + } + ddsrt_mutex_unlock (&pp->m_entity.m_mutex); return DDS_RETCODE_OK; } static dds_return_t dds_topic_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled) { - /* note: e->m_qos is still the old one to allow for failure here */ + /* We never actually set the qos of a struct dds_topic and really shouldn't be here, + but the code to check whether set_qos is supported uses the entity's qos_set + function as a proxy. One of the weird things about the topic's set_qos is that + this is called with e == NULL. */ (void) e; (void) qos; (void) enabled; + assert (e == NULL); return DDS_RETCODE_OK; } -static bool dupdef_qos_ok (const dds_qos_t *qos, const dds_topic *tp) +static bool dupdef_qos_ok (const dds_qos_t *qos, const dds_ktopic *ktp) { - if ((qos == NULL) != (tp->m_entity.m_qos == NULL)) + if ((qos == NULL) != (ktp->qos == NULL)) return false; else if (qos == NULL) return true; else - return dds_qos_equal (tp->m_entity.m_qos, qos); -} - -static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b) -{ - if (strcmp (a->name_type_name, b->name_type_name) != 0) - return false; - if (a->serdata_basehash != b->serdata_basehash) - return false; - if (a->ops != b->ops) - return false; - if (a->serdata_ops != b->serdata_ops) - return false; - return true; -} - -static dds_return_t create_topic_topic_arbitrary_check_sertopic (dds_entity_t participant, dds_entity_t topic, struct ddsi_sertopic *sertopic, const dds_qos_t *qos) -{ - dds_topic *tp; - dds_return_t ret; - - if (dds_topic_lock (topic, &tp) < 0) - return DDS_RETCODE_NOT_FOUND; - - if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant) - ret = DDS_RETCODE_NOT_FOUND; - else if (!sertopic_equivalent (tp->m_stopic, sertopic)) - ret = DDS_RETCODE_PRECONDITION_NOT_MET; - else if (!dupdef_qos_ok (qos, tp)) - ret = DDS_RETCODE_INCONSISTENT_POLICY; - else - { - /* See dds_find_topic_check_and_add_ref */ - ret = DDS_RETCODE_OK; - } - dds_topic_unlock (tp); - return ret; + return dds_qos_equal (ktp->qos, qos); } const struct dds_entity_deriver dds_entity_deriver_topic = { @@ -280,31 +195,110 @@ const struct dds_entity_deriver dds_entity_deriver_topic = { .validate_status = dds_topic_status_validate }; +/** +* @brief Checks whether a ktopic with the same name exists in the participant, +* and if so, whether it's QoS matches or not. +* +* The set of ktopics is stored in the participant, protected by the participant's +* mutex and the internal state of these ktopics (including the QoS) is also +* protected by that mutex. +* +* @param[out] ktp_out matching ktopic if call was successful, or NULL if no +* ktopic with this name exists +* @param[in] pp pinned & locked participant +* @param[in] name topic name to look for +* @param[in] type_name type name the topic must have +* @param[in] new_qos QoS for the new topic (can be NULL) +* +* @returns success + ktopic, success + NULL or error. +* +* @retval DDS_RETCODE_OK +* ktp_out is either NULL (first attempt at creating this topic), or +* the matching ktopic entity +* @retval DDS_RETCODE_INCONSISTENT_POLICY +* a ktopic exists with differing QoS +* @retval DDS_RETCODE_PRECONDITION_NOT_MET +* a ktopic exists with a different type name +*/ +static dds_return_t lookup_and_check_ktopic (struct dds_ktopic **ktp_out, dds_participant *pp, const char *name, const char *type_name, const dds_qos_t *new_qos) +{ + struct q_globals * const gv = &pp->m_entity.m_domain->gv; + struct dds_ktopic *ktp; + if ((ktp = *ktp_out = ddsrt_avl_lookup (&participant_ktopics_treedef, &pp->m_ktopics, name)) == NULL) + { + GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: no such ktopic\n"); + return DDS_RETCODE_OK; + } + else if (strcmp (ktp->type_name, type_name) != 0) + { + GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p typename %s mismatch\n", (void *) ktp, ktp->type_name); + return DDS_RETCODE_PRECONDITION_NOT_MET; + } + else if (!dupdef_qos_ok (new_qos, ktp)) + { + GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p qos mismatch\n", (void *) ktp); + return DDS_RETCODE_INCONSISTENT_POLICY; + } + else + { + GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p reuse\n", (void *) ktp); + return DDS_RETCODE_OK; + } +} + +static dds_entity_t create_topic_pp_locked (struct dds_participant *pp, struct dds_ktopic *ktp, bool implicit, struct ddsi_sertopic *sertopic_registered, const dds_listener_t *listener, const nn_plist_t *sedp_plist) +{ + dds_entity_t hdl; + dds_topic *tp = dds_alloc (sizeof (*tp)); + hdl = dds_entity_init (&tp->m_entity, &pp->m_entity, DDS_KIND_TOPIC, implicit, NULL, listener, DDS_TOPIC_STATUS_MASK); + tp->m_entity.m_iid = ddsi_iid_gen (); + dds_entity_register_child (&pp->m_entity, &tp->m_entity); + tp->m_ktopic = ktp; + tp->m_stopic = sertopic_registered; + + /* Publish Topic */ + if (sedp_plist) + { + struct participant *ddsi_pp; + nn_plist_t plist; + + thread_state_awake (lookup_thread_state (), &pp->m_entity.m_domain->gv); + ddsi_pp = entidx_lookup_participant_guid (pp->m_entity.m_domain->gv.entity_index, &pp->m_entity.m_guid); + assert (ddsi_pp); + + nn_plist_init_empty (&plist); + nn_plist_mergein_missing (&plist, sedp_plist, ~(uint64_t)0, ~(uint64_t)0); + nn_xqos_mergein_missing (&plist.qos, ktp->qos, ~(uint64_t)0); + sedp_write_topic (ddsi_pp, &plist); + nn_plist_fini (&plist); + thread_state_asleep (lookup_thread_state ()); + } + + dds_entity_init_complete (&tp->m_entity); + return hdl; +} + dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_sertopic *sertopic, const dds_qos_t *qos, const dds_listener_t *listener, const nn_plist_t *sedp_plist) { dds_return_t rc; - dds_participant *par; - dds_entity *par_ent; - dds_topic *top; + dds_participant *pp; dds_qos_t *new_qos = NULL; dds_entity_t hdl; - struct participant *ddsi_pp; + struct ddsi_sertopic *sertopic_registered; if (sertopic == NULL) return DDS_RETCODE_BAD_PARAMETER; - /* Claim participant handle so we can be sure the handle will not be - reused if we temporarily unlock the participant to check the an - existing topic's compatibility */ - if ((rc = dds_entity_pin (participant, &par_ent)) < 0) - return rc; - /* Verify that we've been given a participant, not strictly necessary - because dds_participant_lock below checks it, but this is more - obvious */ - if (dds_entity_kind (par_ent) != DDS_KIND_PARTICIPANT) { - dds_entity_unpin (par_ent); - return DDS_RETCODE_ILLEGAL_OPERATION; + dds_entity *par_ent; + if ((rc = dds_entity_pin (participant, &par_ent)) < 0) + return rc; + if (dds_entity_kind (par_ent) != DDS_KIND_PARTICIPANT) + { + dds_entity_unpin (par_ent); + return DDS_RETCODE_ILLEGAL_OPERATION; + } + pp = (struct dds_participant *) par_ent; } new_qos = dds_create_qos (); @@ -314,155 +308,76 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s * * nn_xqos_mergein_missing (new_qos, &gv.default_xqos_tp, ~(uint64_t)0); * - * but the crazy defaults of the DDS specification has a default settings - * for reliability that are dependent on the entity type: readers and + * but the crazy defaults of the DDS specification has a default setting + * for reliability that is dependent on the entity type: readers and * topics default to best-effort, but writers to reliable. * * Leaving the topic QoS sparse means a default-default topic QoS of * best-effort will do "the right thing" and let a writer still default to * reliable ... (and keep behaviour unchanged) */ - if ((rc = nn_xqos_valid (&par_ent->m_domain->gv.logconfig, new_qos)) != DDS_RETCODE_OK) - goto err_invalid_qos; - - /* FIXME: just mutex_lock ought to be good enough, but there is the - pesky "closed" check still ... */ - if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK) - goto err_lock_participant; - - bool retry_lookup; - do { - dds_entity_t topic; - - /* claim participant handle to guarantee the handle remains valid after - unlocking the participant prior to verifying the found topic still - exists */ - topic = DDS_RETCODE_PRECONDITION_NOT_MET; - ddsrt_avl_iter_t it; - for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &par->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it)) - { - if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, sertopic->name) == 0) - { - topic = e->m_hdllink.hdl; - break; - } - } - if (topic < 0) - { - /* no topic with the name exists; we have locked the participant, and - so we can proceed with creating the topic */ - retry_lookup = false; - } - else - { - /* some topic with the same name exists; need to lock the topic to - perform the checks, but locking the topic while holding the - participant lock violates the lock order (child -> parent). So - unlock that participant and check the topic while accounting - for the various scary cases. */ - dds_participant_unlock (par); - - rc = create_topic_topic_arbitrary_check_sertopic (participant, topic, sertopic, new_qos); - switch (rc) - { - case DDS_RETCODE_OK: /* duplicate definition */ - dds_entity_unpin (par_ent); - dds_delete_qos (new_qos); - return topic; - - case DDS_RETCODE_NOT_FOUND: - /* either participant is now being deleted, topic was deleted, or - topic was deleted & the handle reused for something else -- so */ - retry_lookup = true; - break; - - case DDS_RETCODE_PRECONDITION_NOT_MET: /* incompatible sertopic */ - case DDS_RETCODE_INCONSISTENT_POLICY: /* different QoS */ - /* inconsistent definition */ - dds_entity_unpin (par_ent); - dds_delete_qos (new_qos); - return rc; - - default: - abort (); - } - - if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK) - goto err_lock_participant; - } - } while (retry_lookup); - - /* FIXME: make this a function - Add sertopic to domain -- but note that it may have been created by another thread - on another participant that is attached to the same domain */ + struct q_globals * const gv = &pp->m_entity.m_domain->gv; + if ((rc = nn_xqos_valid (&gv->logconfig, new_qos)) != DDS_RETCODE_OK) { - struct dds_domain *domain = par->m_entity.m_domain; - - ddsrt_avl_ipath_t ip; - struct topic_sertopic_node *stn; - - ddsrt_mutex_lock (&dds_global.m_mutex); - - stn = ddsrt_avl_lookup_ipath (&dds_topictree_def, &domain->m_topics, sertopic, &ip); - if (stn == NULL) - { - /* no existing definition: use new */ - stn = ddsrt_malloc (sizeof (*stn)); - stn->refc = 1; - stn->st = ddsi_sertopic_ref (sertopic); - ddsrt_avl_insert (&dds_topictree_def, &domain->m_topics, stn); - ddsrt_mutex_unlock (&dds_global.m_mutex); - } - else if (sertopic_equivalent (stn->st, sertopic)) - { - /* ok -- same definition, so use existing one instead */ - sertopic = ddsi_sertopic_ref (stn->st); - stn->refc++; - ddsrt_mutex_unlock (&dds_global.m_mutex); - } - else - { - /* bummer, delete */ - ddsrt_mutex_unlock (&dds_global.m_mutex); - rc = DDS_RETCODE_PRECONDITION_NOT_MET; - goto err_sertopic_reuse; - } + dds_delete_qos (new_qos); + dds_entity_unpin (&pp->m_entity); + return rc; } - /* Create topic */ - top = dds_alloc (sizeof (*top)); + /* See if we're allowed to create the topic; ktp is returned pinned & locked + so we can be sure it doesn't disappear and its QoS can't change */ + GVTRACE ("dds_create_topic_arbitrary (pp %p "PGUIDFMT" sertopic %p reg?%s refc %"PRIu32" %s/%s)\n", + (void *) pp, PGUID (pp->m_entity.m_guid), (void *) sertopic, sertopic->gv ? "yes" : "no", + ddsrt_atomic_ld32 (&sertopic->refc), sertopic->name, sertopic->type_name); + ddsrt_mutex_lock (&pp->m_entity.m_mutex); + struct dds_ktopic *ktp; + if ((rc = lookup_and_check_ktopic (&ktp, pp, sertopic->name, sertopic->type_name, new_qos)) != DDS_RETCODE_OK) + { + GVTRACE ("dds_create_topic_arbitrary: failed after compatibility check: %s\n", dds_strretcode (rc)); + dds_participant_unlock (pp); + dds_delete_qos (new_qos); + return rc; + } + + /* Create a ktopic if it doesn't exist yet, else reference existing one and delete the + unneeded "new_qos". */ + if (ktp == NULL) + { + ktp = dds_alloc (sizeof (*ktp)); + ktp->refc = 1; + ktp->defer_set_qos = 0; + ktp->qos = new_qos; + /* have to copy these because the ktopic can outlast any specific sertopic */ + ktp->name = ddsrt_strdup (sertopic->name); + ktp->type_name = ddsrt_strdup (sertopic->type_name); + ddsrt_avl_insert (&participant_ktopics_treedef, &pp->m_ktopics, ktp); + GVTRACE ("create_and_lock_ktopic: ktp %p\n", (void *) ktp); + } + else + { + ktp->refc++; + dds_delete_qos (new_qos); + } + + /* Sertopic: re-use a previously registered one if possible, else register this one */ + { + ddsrt_mutex_lock (&gv->sertopics_lock); + if ((sertopic_registered = ddsi_sertopic_lookup_locked (gv, sertopic)) != NULL) + GVTRACE ("dds_create_topic_arbitrary: reuse sertopic %p\n", (void *) sertopic_registered); + else + { + GVTRACE ("dds_create_topic_arbitrary: register new sertopic %p\n", (void *) sertopic); + ddsi_sertopic_register_locked (gv, sertopic); + sertopic_registered = sertopic; + } + ddsrt_mutex_unlock (&gv->sertopics_lock); + } + + /* Create topic referencing ktopic & sertopic_registered */ /* FIXME: setting "implicit" based on sertopic->ops is a hack */ - hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, (sertopic->ops == &ddsi_sertopic_ops_builtintopic), new_qos, listener, DDS_TOPIC_STATUS_MASK); - top->m_entity.m_iid = ddsi_iid_gen (); - dds_entity_register_child (&par->m_entity, &top->m_entity); - top->m_stopic = sertopic; - - /* Publish Topic */ - thread_state_awake (lookup_thread_state (), &par->m_entity.m_domain->gv); - ddsi_pp = entidx_lookup_participant_guid (par->m_entity.m_domain->gv.entity_index, &par->m_entity.m_guid); - assert (ddsi_pp); - if (sedp_plist) - { - nn_plist_t plist; - nn_plist_init_empty (&plist); - nn_plist_mergein_missing (&plist, sedp_plist, ~(uint64_t)0, ~(uint64_t)0); - nn_xqos_mergein_missing (&plist.qos, new_qos, ~(uint64_t)0); - sedp_write_topic (ddsi_pp, &plist); - nn_plist_fini (&plist); - } - thread_state_asleep (lookup_thread_state ()); - - dds_entity_init_complete (&top->m_entity); - dds_participant_unlock (par); - dds_entity_unpin (par_ent); + hdl = create_topic_pp_locked (pp, ktp, (sertopic_registered->ops == &ddsi_sertopic_ops_builtintopic), sertopic_registered, listener, sedp_plist); + dds_participant_unlock (pp); + GVTRACE ("dds_create_topic_arbitrary: new topic %"PRId32"\n", hdl); return hdl; - -err_sertopic_reuse: - dds_participant_unlock (par); -err_lock_participant: -err_invalid_qos: - dds_delete_qos (new_qos); - dds_entity_unpin (par_ent); - return rc; } dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descriptor_t *desc, const char *name, const dds_qos_t *qos, const dds_listener_t *listener) @@ -482,7 +397,6 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip st = dds_alloc (sizeof (*st)); ddsi_sertopic_init (&st->c, name, desc->m_typename, &ddsi_sertopic_ops_default, desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey, (desc->m_nkeys == 0)); - st->gv = &ppent->m_domain->gv; st->native_encoding_identifier = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE); st->serpool = ppent->m_domain->gv.serpool; st->type = (void*) desc; @@ -522,6 +436,47 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip return hdl; } +dds_entity_t dds_find_topic (dds_entity_t participant, const char *name) +{ + dds_participant *pp; + dds_return_t rc; + + if (name == NULL) + return DDS_RETCODE_BAD_PARAMETER; + + if ((rc = dds_participant_lock (participant, &pp)) < 0) + return rc; + + ddsrt_avl_iter_t it; + for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &pp->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it)) + { + if (dds_entity_kind (e) != DDS_KIND_TOPIC) + continue; + + struct dds_entity *x; + if (dds_entity_pin (e->m_hdllink.hdl, &x) != DDS_RETCODE_OK) + continue; + + struct dds_topic * const tp = (struct dds_topic *) e; + if (x != e || strcmp (tp->m_ktopic->name, name) != 0) + { + dds_entity_unpin (x); + continue; + } + + struct ddsi_sertopic * const sertopic = ddsi_sertopic_ref (tp->m_stopic); + struct dds_ktopic * const ktp = tp->m_ktopic; + ktp->refc++; + dds_entity_unpin (x); + + dds_entity_t hdl = create_topic_pp_locked (pp, ktp, false, sertopic, NULL, NULL); + dds_participant_unlock (pp); + return hdl; + } + dds_participant_unlock (pp); + return DDS_RETCODE_PRECONDITION_NOT_MET; +} + static bool dds_topic_chaining_filter (const void *sample, void *ctx) { dds_topic_filter_fn realf = (dds_topic_filter_fn) ctx; @@ -594,10 +549,10 @@ dds_return_t dds_get_name (dds_entity_t topic, char *name, size_t size) if (size <= 0 || name == NULL) return DDS_RETCODE_BAD_PARAMETER; name[0] = '\0'; - if ((ret = dds_topic_lock (topic, &t)) != DDS_RETCODE_OK) + if ((ret = dds_topic_pin (topic, &t)) != DDS_RETCODE_OK) return ret; (void) snprintf (name, size, "%s", t->m_stopic->name); - dds_topic_unlock (t); + dds_topic_unpin (t); return DDS_RETCODE_OK; } @@ -608,10 +563,10 @@ dds_return_t dds_get_type_name (dds_entity_t topic, char *name, size_t size) if (size <= 0 || name == NULL) return DDS_RETCODE_BAD_PARAMETER; name[0] = '\0'; - if ((ret = dds_topic_lock (topic, &t)) != DDS_RETCODE_OK) + if ((ret = dds_topic_pin (topic, &t)) != DDS_RETCODE_OK) return ret; (void) snprintf (name, size, "%s", t->m_stopic->type_name); - dds_topic_unlock (t); + dds_topic_unpin (t); return DDS_RETCODE_OK; } diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 163d4e1..7c5040b 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -263,13 +263,11 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit { dds_return_t rc; dds_qos_t *wqos; - dds_writer *wr; - dds_entity_t writer; dds_publisher *pub = NULL; - dds_participant *pp; dds_topic *tp; dds_entity_t publisher; struct whc_writer_info *wrinfo; + bool created_implicit_pub = false; { dds_entity *p_or_p; @@ -286,6 +284,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit dds_entity_unlock (p_or_p); if ((rc = dds_publisher_lock (publisher, &pub)) < 0) return rc; + created_implicit_pub = true; break; default: dds_entity_unlock (p_or_p); @@ -295,25 +294,33 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc; - if ((rc = dds_topic_lock (topic, &tp)) != DDS_RETCODE_OK) - goto err_tp_lock; + if ((rc = dds_topic_pin (topic, &tp)) != DDS_RETCODE_OK) + goto err_pin_topic; assert (tp->m_stopic); - - pp = dds_entity_participant (&pub->m_entity); - if (pp != dds_entity_participant (&tp->m_entity)) + if (dds_entity_participant (&pub->m_entity) != dds_entity_participant (&tp->m_entity)) { rc = DDS_RETCODE_BAD_PARAMETER; goto err_pp_mismatch; } + /* Prevent set_qos on the topic until writer has been created and registered: we can't + allow a TOPIC_DATA change to ccur before the writer has been created because that + change would then not be published in the discovery/built-in topics. + + Don't keep the participant (which protects the topic's QoS) locked because that + can cause deadlocks for applications creating a reader/writer from within a + publication matched listener (whether the restrictions on what one can do in + listeners are reasonable or not, it used to work so it can be broken arbitrarily). */ + dds_topic_defer_set_qos (tp); + /* Merge Topic & Publisher qos */ wqos = dds_create_qos (); if (qos) nn_xqos_mergein_missing (wqos, qos, DDS_WRITER_QOS_MASK); if (pub->m_entity.m_qos) nn_xqos_mergein_missing (wqos, pub->m_entity.m_qos, ~(uint64_t)0); - if (tp->m_entity.m_qos) - nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0); + if (tp->m_ktopic->qos) + nn_xqos_mergein_missing (wqos, tp->m_ktopic->qos, ~(uint64_t)0); nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0); if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0 || @@ -324,9 +331,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit } /* Create writer */ - wr = dds_alloc (sizeof (*wr)); - writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK); - + struct dds_writer * const wr = dds_alloc (sizeof (*wr)); + const dds_entity_t writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK); wr->m_topic = tp; dds_entity_add_ref_locked (&tp->m_entity); wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), pub->m_entity.m_domain->gv.config.xpack_send_async); @@ -336,7 +342,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch; thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv); - rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); + rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, dds_entity_participant_guid (&pub->m_entity), tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); assert(rc == DDS_RETCODE_OK); thread_state_asleep (lookup_thread_state ()); @@ -344,16 +350,19 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit dds_entity_register_child (&pub->m_entity, &wr->m_entity); dds_entity_init_complete (&wr->m_entity); - dds_topic_unlock (tp); + + dds_topic_allow_set_qos (tp); + dds_topic_unpin (tp); dds_publisher_unlock (pub); return writer; err_bad_qos: + dds_topic_allow_set_qos (tp); err_pp_mismatch: - dds_topic_unlock (tp); -err_tp_lock: + dds_topic_unpin (tp); +err_pin_topic: dds_publisher_unlock (pub); - if ((pub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0) + if (created_implicit_pub) (void) dds_delete (publisher); return rc; } diff --git a/src/core/ddsc/tests/topic.c b/src/core/ddsc/tests/topic.c index d71c8d5..0142a4d 100644 --- a/src/core/ddsc/tests/topic.c +++ b/src/core/ddsc/tests/topic.c @@ -130,8 +130,12 @@ CU_Test(ddsc_topic_create, duplicate, .init=ddsc_topic_init, .fini=ddsc_topic_fi /* Creating the same topic should succeed. */ topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, g_topicRtmDataTypeName, NULL, NULL); CU_ASSERT_FATAL(topic > 0); + CU_ASSERT_FATAL(topic != g_topicRtmDataType); ret = dds_delete(topic); CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); + /* Old topic entity should remain in existence */ + ret = dds_get_parent(g_topicRtmDataType); + CU_ASSERT(ret > 0); } /*************************************************************************************************/ @@ -201,7 +205,7 @@ CU_Test(ddsc_topic_find, valid, .init=ddsc_topic_init, .fini=ddsc_topic_fini) dds_return_t ret; topic = dds_find_topic(g_participant, g_topicRtmDataTypeName); - CU_ASSERT_EQUAL_FATAL(topic, g_topicRtmDataType); + CU_ASSERT_NOT_EQUAL_FATAL(topic, g_topicRtmDataType); ret = dds_delete(topic); CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK); diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h index 4d9b64b..0074141 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h @@ -132,6 +132,8 @@ typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct returning bufsize-1) if it had to truncate) */ typedef size_t (*ddsi_serdata_print_t) (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, char *buf, size_t size); +#define DDSI_SERDATA_HAS_FROM_SER_IOV 1 + struct ddsi_serdata_ops { ddsi_serdata_eqkey_t eqkey; ddsi_serdata_size_t get_size; diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_serdata_default.h b/src/core/ddsi/include/dds/ddsi/ddsi_serdata_default.h index 4aa7f7b..ae582ff 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_serdata_default.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_serdata_default.h @@ -107,7 +107,6 @@ struct ddsi_sertopic_default { struct ddsi_sertopic c; uint16_t native_encoding_identifier; /* (PL_)?CDR_(LE|BE) */ struct serdatapool *serpool; - struct q_globals *gv; struct dds_topic_descriptor * type; unsigned nkeys; diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_sertopic.h b/src/core/ddsi/include/dds/ddsi/ddsi_sertopic.h index e23333c..1085ddd 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_sertopic.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_sertopic.h @@ -23,17 +23,18 @@ extern "C" { struct ddsi_serdata; struct ddsi_serdata_ops; struct ddsi_sertopic_ops; +struct q_globals; struct ddsi_sertopic { const struct ddsi_sertopic_ops *ops; const struct ddsi_serdata_ops *serdata_ops; uint32_t serdata_basehash; bool topickind_no_key; - char *name_type_name; char *name; char *type_name; uint64_t iid; - ddsrt_atomic_uint32_t refc; /* counts refs from entities, not from data */ + struct q_globals *gv; + ddsrt_atomic_uint32_t refc; /* counts refs from entities (topic, reader, writer), not from data */ }; /* The old and the new happen to have the same memory layout on a 64-bit machine @@ -48,6 +49,27 @@ struct ddsi_sertopic { binary compatible. */ #define DDSI_SERTOPIC_HAS_TOPICKIND_NO_KEY 1 +/* Type changed: name_type_name and ii removed and gv added; and the set of + operations got extended by the a predicate for testing to sertopics (with the + same "ops") for equality ("equal") as well as a function for hashing the + non-generic part of the sertopic definition (via "hash"). These two operations + make it possible to intern sertopics without duplicates, which has become + relevant now that multiple ddsi_sertopics can be associated with a single topic + name. + + Testing for DDSI_SERTOPIC_HAS_EQUAL_AND_HASH allows one to have a single source + that can handle both variants, but there's no binary compatbility. */ +#define DDSI_SERTOPIC_HAS_EQUAL_AND_HASH 1 + +/* Called to compare two sertopics for equality, if it is already known that name, + type name, topickind_no_Key, and operations are all the same. (serdata_basehash + is computed from the set of operations.) */ +typedef bool (*ddsi_sertopic_equal_t) (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b); + +/* Hash the custom components of a sertopic (this XOR'd with a hash computed from + the fields that are defined in struct ddsi_sertopic) */ +typedef uint32_t (*ddsi_sertopic_hash_t) (const struct ddsi_sertopic *tp); + /* Called when the refcount dropped to zero */ typedef void (*ddsi_sertopic_free_t) (struct ddsi_sertopic *tp); @@ -66,15 +88,22 @@ struct ddsi_sertopic_ops { ddsi_sertopic_zero_samples_t zero_samples; ddsi_sertopic_realloc_samples_t realloc_samples; ddsi_sertopic_free_samples_t free_samples; + ddsi_sertopic_equal_t equal; + ddsi_sertopic_hash_t hash; }; +struct ddsi_sertopic *ddsi_sertopic_lookup_locked (struct q_globals *gv, const struct ddsi_sertopic *sertopic_template); +void ddsi_sertopic_register_locked (struct q_globals *gv, struct ddsi_sertopic *sertopic); + DDS_EXPORT void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key); -DDS_EXPORT void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key); DDS_EXPORT void ddsi_sertopic_fini (struct ddsi_sertopic *tp); DDS_EXPORT struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp); DDS_EXPORT void ddsi_sertopic_unref (struct ddsi_sertopic *tp); DDS_EXPORT uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops); +DDS_EXPORT bool ddsi_sertopic_equal (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b); +DDS_EXPORT uint32_t ddsi_sertopic_hash (const struct ddsi_sertopic *tp); + DDS_EXPORT inline void ddsi_sertopic_free (struct ddsi_sertopic *tp) { tp->ops->free (tp); } diff --git a/src/core/ddsi/include/dds/ddsi/q_globals.h b/src/core/ddsi/include/dds/ddsi/q_globals.h index 12f5bb0..954867e 100644 --- a/src/core/ddsi/include/dds/ddsi/q_globals.h +++ b/src/core/ddsi/include/dds/ddsi/q_globals.h @@ -290,6 +290,9 @@ struct q_globals { struct ddsi_builtin_topic_interface *builtin_topic_interface; struct nn_group_membership *mship; + + ddsrt_mutex_t sertopics_lock; + struct ddsrt_hh *sertopics; }; #if defined (__cplusplus) diff --git a/src/core/ddsi/src/ddsi_serdata_default.c b/src/core/ddsi/src/ddsi_serdata_default.c index a2f842e..a33d3c1 100644 --- a/src/core/ddsi/src/ddsi_serdata_default.c +++ b/src/core/ddsi/src/ddsi_serdata_default.c @@ -17,6 +17,7 @@ #include "dds/ddsrt/heap.h" #include "dds/ddsrt/log.h" #include "dds/ddsrt/md5.h" +#include "dds/ddsrt/mh3.h" #include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_config.h" #include "dds/ddsi/q_freelist.h" @@ -117,59 +118,10 @@ static void serdata_default_append_blob (struct ddsi_serdata_default **d, size_t memcpy (p, data, sz); } -/* Fixed seed and length */ - -#define DDS_MH3_LEN 16 -#define DDS_MH3_SEED 0 - -#define DDS_MH3_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r)))) - -/* Really - http://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp, - MurmurHash3_x86_32 - */ - -static uint32_t dds_mh3 (const void * key) -{ - const uint8_t *data = (const uint8_t *) key; - const intptr_t nblocks = (intptr_t) (DDS_MH3_LEN / 4); - const uint32_t c1 = 0xcc9e2d51; - const uint32_t c2 = 0x1b873593; - - uint32_t h1 = DDS_MH3_SEED; - - const uint32_t *blocks = (const uint32_t *) (data + nblocks * 4); - register intptr_t i; - - for (i = -nblocks; i; i++) - { - uint32_t k1 = blocks[i]; - - k1 *= c1; - k1 = DDS_MH3_ROTL32 (k1, 15); - k1 *= c2; - - h1 ^= k1; - h1 = DDS_MH3_ROTL32 (h1, 13); - h1 = h1 * 5+0xe6546b64; - } - - /* finalization */ - - h1 ^= DDS_MH3_LEN; - h1 ^= h1 >> 16; - h1 *= 0x85ebca6b; - h1 ^= h1 >> 13; - h1 *= 0xc2b2ae35; - h1 ^= h1 >> 16; - - return h1; -} - static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint32_t basehash) { if (d->keyhash.m_iskey) - d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ basehash; + d->c.hash = ddsrt_mh3 (d->keyhash.m_hash, 16, 0) ^ basehash; else d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ basehash; return &d->c; @@ -651,8 +603,8 @@ static size_t serdata_default_print_plist (const struct ddsi_sertopic *sertopic_ src.buf = (const unsigned char *) d->data; src.bufsz = d->pos; src.encoding = d->hdr.identifier; - src.factory = tp->gv->m_factory; - src.logconfig = &tp->gv->logconfig; + src.factory = tp->c.gv->m_factory; + src.logconfig = &tp->c.gv->logconfig; src.protocol_version.major = RTPS_MAJOR; src.protocol_version.minor = RTPS_MINOR; src.strict = false; diff --git a/src/core/ddsi/src/ddsi_sertopic.c b/src/core/ddsi/src/ddsi_sertopic.c index 968a1de..4dda872 100644 --- a/src/core/ddsi/src/ddsi_sertopic.c +++ b/src/core/ddsi/src/ddsi_sertopic.c @@ -16,6 +16,8 @@ #include "dds/ddsrt/heap.h" #include "dds/ddsrt/md5.h" +#include "dds/ddsrt/mh3.h" +#include "dds/ddsrt/hopscotch.h" #include "dds/ddsrt/string.h" #include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_config.h" @@ -23,21 +25,82 @@ #include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_serdata.h" +#include "dds/ddsi/q_globals.h" + +bool ddsi_sertopic_equal (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b) +{ + if (strcmp (a->name, b->name) != 0) + return false; + if (strcmp (a->type_name, b->type_name) != 0) + return false; + if (a->serdata_basehash != b->serdata_basehash) + return false; + if (a->ops != b->ops) + return false; + if (a->serdata_ops != b->serdata_ops) + return false; + return a->ops->equal (a, b); +} + +uint32_t ddsi_sertopic_hash (const struct ddsi_sertopic *a) +{ + uint32_t h; + h = ddsrt_mh3 (a->name, strlen (a->name), a->serdata_basehash); + h = ddsrt_mh3 (a->type_name, strlen (a->type_name), h); + return h ^ a->ops->hash (a); +} struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const) { - struct ddsi_sertopic *sertopic = (struct ddsi_sertopic *)sertopic_const; + struct ddsi_sertopic *sertopic = (struct ddsi_sertopic *) sertopic_const; if (sertopic) ddsrt_atomic_inc32 (&sertopic->refc); return sertopic; } +struct ddsi_sertopic *ddsi_sertopic_lookup_locked (struct q_globals *gv, const struct ddsi_sertopic *sertopic_template) +{ + struct ddsi_sertopic *sertopic = ddsrt_hh_lookup (gv->sertopics, sertopic_template); +#ifndef NDEBUG + if (sertopic != NULL) + { + assert (sertopic->gv != NULL); + assert (sertopic->iid != 0); + } +#endif + return ddsi_sertopic_ref (sertopic); +} + +void ddsi_sertopic_register_locked (struct q_globals *gv, struct ddsi_sertopic *sertopic) +{ + assert (sertopic->gv == NULL); + assert (sertopic->iid == 0); + assert (ddsrt_atomic_ld32 (&sertopic->refc) == 1); + + (void) ddsi_sertopic_ref (sertopic); + sertopic->gv = gv; + sertopic->iid = ddsi_iid_gen (); + int x = ddsrt_hh_add (gv->sertopics, sertopic); + assert (x); + (void) x; +} + void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic) { if (sertopic) { if (ddsrt_atomic_dec32_ov (&sertopic->refc) == 1) { + /* if registered, drop from set of registered sertopics */ + if (sertopic->gv) + { + ddsrt_mutex_lock (&sertopic->gv->sertopics_lock); + (void) ddsrt_hh_remove (sertopic->gv->sertopics, sertopic); + ddsrt_mutex_unlock (&sertopic->gv->sertopics_lock); + sertopic->gv = NULL; + sertopic->iid = 0; + } + ddsi_sertopic_free (sertopic); } } @@ -46,36 +109,21 @@ void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic) void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key) { ddsrt_atomic_st32 (&tp->refc, 1); - tp->iid = ddsi_iid_gen (); tp->name = ddsrt_strdup (name); tp->type_name = ddsrt_strdup (type_name); - size_t ntn_sz = strlen (tp->name) + 1 + strlen (tp->type_name) + 1; - tp->name_type_name = ddsrt_malloc (ntn_sz); - (void) snprintf (tp->name_type_name, ntn_sz, "%s/%s", tp->name, tp->type_name); - tp->ops = sertopic_ops; - tp->serdata_ops = serdata_ops; - tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops); - tp->topickind_no_key = topickind_no_key; -} - -void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key) -{ - ddsrt_atomic_st32 (&tp->refc, 1); - tp->iid = ddsi_iid_gen (); - tp->name = NULL; - tp->type_name = NULL; - tp->name_type_name = NULL; tp->ops = sertopic_ops; tp->serdata_ops = serdata_ops; tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops); tp->topickind_no_key = topickind_no_key; + /* set later, on registration */ + tp->iid = 0; + tp->gv = NULL; } void ddsi_sertopic_fini (struct ddsi_sertopic *tp) { ddsrt_free (tp->name); ddsrt_free (tp->type_name); - ddsrt_free (tp->name_type_name); } uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops) diff --git a/src/core/ddsi/src/ddsi_sertopic_default.c b/src/core/ddsi/src/ddsi_sertopic_default.c index 0fbfec6..0da5c5c 100644 --- a/src/core/ddsi/src/ddsi_sertopic_default.c +++ b/src/core/ddsi/src/ddsi_sertopic_default.c @@ -15,6 +15,7 @@ #include #include "dds/ddsrt/md5.h" +#include "dds/ddsrt/mh3.h" #include "dds/ddsrt/heap.h" #include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_config.h" @@ -22,6 +23,25 @@ #include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_serdata_default.h" +static bool sertopic_default_equal (const struct ddsi_sertopic *acmn, const struct ddsi_sertopic *bcmn) +{ + const struct ddsi_sertopic_default *a = (struct ddsi_sertopic_default *) acmn; + const struct ddsi_sertopic_default *b = (struct ddsi_sertopic_default *) bcmn; + return a->type == b->type; +} + +static uint32_t sertopic_default_hash (const struct ddsi_sertopic *tpcmn) +{ + const struct ddsi_sertopic_default *tp = (struct ddsi_sertopic_default *) tpcmn; + if (tp->type == NULL) + return 0; + else + { + return ddsrt_mh3 (tp->type->m_keys, tp->type->m_nkeys * sizeof (*tp->type->m_keys), + ddsrt_mh3 (tp->type->m_ops, tp->type->m_nops * sizeof (*tp->type->m_ops), 0)); + } +} + static void sertopic_default_free (struct ddsi_sertopic *tp) { ddsi_sertopic_fini (tp); @@ -76,6 +96,8 @@ static void sertopic_default_free_samples (const struct ddsi_sertopic *sertopic_ } const struct ddsi_sertopic_ops ddsi_sertopic_ops_default = { + .equal = sertopic_default_equal, + .hash = sertopic_default_hash, .free = sertopic_default_free, .zero_samples = sertopic_default_zero_samples, .realloc_samples = sertopic_default_realloc_samples, diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 8e62b22..1deb503 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -767,7 +767,7 @@ static void wait_for_receive_threads (struct q_globals *gv) } } -static struct ddsi_sertopic *make_special_topic (struct q_globals *gv, struct serdatapool *serpool, uint16_t enc_id, const struct ddsi_serdata_ops *ops) +static struct ddsi_sertopic *make_special_topic (const char *name, struct serdatapool *serpool, uint16_t enc_id, const struct ddsi_serdata_ops *ops) { /* FIXME: two things (at least) - it claims there is a key, but the underlying type description is missing @@ -779,26 +779,34 @@ static struct ddsi_sertopic *make_special_topic (struct q_globals *gv, struct se (kinda natural if they stop being "default" ones) */ struct ddsi_sertopic_default *st = ddsrt_malloc (sizeof (*st)); memset (st, 0, sizeof (*st)); - ddsi_sertopic_init_anon (&st->c, &ddsi_sertopic_ops_default, ops, false); - st->gv = gv; + ddsi_sertopic_init (&st->c, name, name, &ddsi_sertopic_ops_default, ops, false); st->native_encoding_identifier = enc_id; st->serpool = serpool; st->nkeys = 1; return (struct ddsi_sertopic *) st; } -static void make_special_topics (struct q_globals *gv) -{ - gv->plist_topic = make_special_topic (gv, gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist); - gv->rawcdr_topic = make_special_topic (gv, gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr); -} - static void free_special_topics (struct q_globals *gv) { ddsi_sertopic_unref (gv->plist_topic); ddsi_sertopic_unref (gv->rawcdr_topic); } +static void make_special_topics (struct q_globals *gv) +{ + gv->plist_topic = make_special_topic ("plist", gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist); + gv->rawcdr_topic = make_special_topic ("rawcdr", gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr); + + ddsrt_mutex_lock (&gv->sertopics_lock); + ddsi_sertopic_register_locked (gv, gv->plist_topic); + ddsi_sertopic_register_locked (gv, gv->rawcdr_topic); + ddsrt_mutex_unlock (&gv->sertopics_lock); + + /* register increments refcount (which is reasonable), but at some point + one needs to get rid of that reference */ + free_special_topics (gv); +} + static bool use_multiple_receive_threads (const struct config *cfg) { /* Under some unknown circumstances Windows (at least Windows 10) exhibits @@ -909,6 +917,16 @@ fail: return -1; } +static int ddsi_sertopic_equal_wrap (const void *a, const void *b) +{ + return ddsi_sertopic_equal (a, b); +} + +static uint32_t ddsi_sertopic_hash_wrap (const void *tp) +{ + return ddsi_sertopic_hash (tp); +} + int rtps_init (struct q_globals *gv) { uint32_t port_disc_uc = 0; @@ -1078,6 +1096,8 @@ int rtps_init (struct q_globals *gv) make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_rd, &gv->default_xqos_rd); make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_wr, &gv->default_xqos_wr); + ddsrt_mutex_init (&gv->sertopics_lock); + gv->sertopics = ddsrt_hh_new (1, ddsi_sertopic_hash_wrap, ddsi_sertopic_equal_wrap); make_special_topics (gv); ddsrt_mutex_init (&gv->participant_set_lock); @@ -1410,6 +1430,14 @@ err_unicast_sockets: ddsrt_cond_destroy (&gv->participant_set_cond); ddsrt_mutex_destroy (&gv->participant_set_lock); free_special_topics (gv); +#ifndef NDEBUG + { + struct ddsrt_hh_iter it; + assert (ddsrt_hh_iter_first (gv->sertopics, &it) == NULL); + } +#endif + ddsrt_hh_free (gv->sertopics); + ddsrt_mutex_destroy (&gv->sertopics_lock); nn_xqos_fini (&gv->builtin_endpoint_xqos_wr); nn_xqos_fini (&gv->builtin_endpoint_xqos_rd); nn_xqos_fini (&gv->spdp_endpoint_xqos); @@ -1748,6 +1776,15 @@ void rtps_fini (struct q_globals *gv) ddsrt_cond_destroy (&gv->participant_set_cond); free_special_topics (gv); +#ifndef NDEBUG + { + struct ddsrt_hh_iter it; + assert (ddsrt_hh_iter_first (gv->sertopics, &it) == NULL); + } +#endif + ddsrt_hh_free (gv->sertopics); + ddsrt_mutex_destroy (&gv->sertopics_lock); + nn_xqos_fini (&gv->builtin_endpoint_xqos_wr); nn_xqos_fini (&gv->builtin_endpoint_xqos_rd); nn_xqos_fini (&gv->spdp_endpoint_xqos); diff --git a/src/core/xtests/rhc_torture/rhc_torture.c b/src/core/xtests/rhc_torture/rhc_torture.c index 8568eeb..757cb03 100644 --- a/src/core/xtests/rhc_torture/rhc_torture.c +++ b/src/core/xtests/rhc_torture/rhc_torture.c @@ -921,10 +921,10 @@ int main (int argc, char **argv) mainthread = lookup_thread_state (); assert (ddsrt_atomic_ldvoidp (&mainthread->gv) != NULL); { - struct dds_entity *x; - if (dds_entity_lock(tp, DDS_KIND_TOPIC, &x) < 0) abort(); - mdtopic = dds_topic_lookup(x->m_domain, "RhcTypes_T"); - dds_entity_unlock(x); + struct dds_topic *x; + if (dds_topic_pin (tp, &x) < 0) abort(); + mdtopic = ddsi_sertopic_ref (x->m_stopic); + dds_topic_unpin (x); } if (0 >= first) diff --git a/src/ddsrt/CMakeLists.txt b/src/ddsrt/CMakeLists.txt index 8525a94..db0deaa 100644 --- a/src/ddsrt/CMakeLists.txt +++ b/src/ddsrt/CMakeLists.txt @@ -111,6 +111,7 @@ list(APPEND headers "${include_path}/dds/ddsrt/endian.h" "${include_path}/dds/ddsrt/arch.h" "${include_path}/dds/ddsrt/misc.h" + "${include_path}/dds/ddsrt/mh3.h" "${include_path}/dds/ddsrt/io.h" "${include_path}/dds/ddsrt/process.h" "${include_path}/dds/ddsrt/strtod.h" @@ -127,6 +128,7 @@ list(APPEND sources "${source_path}/retcode.c" "${source_path}/strtod.c" "${source_path}/strtol.c" + "${source_path}/mh3.c" "${source_path}/avl.c" "${source_path}/expand_envvars.c" "${source_path}/fibheap.c" diff --git a/src/ddsrt/include/dds/ddsrt/mh3.h b/src/ddsrt/include/dds/ddsrt/mh3.h new file mode 100644 index 0000000..d22e387 --- /dev/null +++ b/src/ddsrt/include/dds/ddsrt/mh3.h @@ -0,0 +1,34 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#ifndef DDSRT_MH3_H +#define DDSRT_MH3_H + +#include +#include + +#include "dds/export.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +DDS_EXPORT uint32_t +ddsrt_mh3( + const void *key, + size_t len, + uint32_t seed); + +#if defined(__cplusplus) +} +#endif + +#endif /* DDSRT_MH3_H */ diff --git a/src/ddsrt/src/mh3.c b/src/ddsrt/src/mh3.c new file mode 100644 index 0000000..f186e95 --- /dev/null +++ b/src/ddsrt/src/mh3.c @@ -0,0 +1,69 @@ +/* + * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v. 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License + * v. 1.0 which is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause + */ +#include "dds/ddsrt/mh3.h" + +#define DDSRT_MH3_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r)))) + +/* Really + http://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp, + MurmurHash3_x86_32 +*/ +uint32_t ddsrt_mh3 (const void *key, size_t len, uint32_t seed) +{ + const uint8_t *data = (const uint8_t *) key; + const intptr_t nblocks = (intptr_t) (len / 4); + const uint32_t c1 = 0xcc9e2d51; + const uint32_t c2 = 0x1b873593; + + uint32_t h1 = seed; + const uint32_t *blocks = (const uint32_t *) (data + nblocks * 4); + for (intptr_t i = -nblocks; i; i++) + { + uint32_t k1 = blocks[i]; + + k1 *= c1; + k1 = DDSRT_MH3_ROTL32 (k1, 15); + k1 *= c2; + + h1 ^= k1; + h1 = DDSRT_MH3_ROTL32 (h1, 13); + h1 = h1 * 5 + 0xe6546b64; + } + + const uint8_t *tail = data + nblocks * 4; + uint32_t k1 = 0; + switch (len & 3) + { + case 3: + k1 ^= (uint32_t) tail[2] << 16; + /* FALLS THROUGH */ + case 2: + k1 ^= (uint32_t) tail[1] << 8; + /* FALLS THROUGH */ + case 1: + k1 ^= (uint32_t) tail[0]; + k1 *= c1; + k1 = DDSRT_MH3_ROTL32 (k1, 15); + k1 *= c2; + h1 ^= k1; + /* FALLS THROUGH */ + }; + + /* finalization */ + h1 ^= (uint32_t) len; + h1 ^= h1 >> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >> 16; + return h1; +}