Relax constraints on topic entities
This commit changes the implementation of topics so that multiple topic entities can exist in a single participant for the same topic. Different entities may refer to different topic implementations (sertopics, akin to a type support in the DDS specification). All entities (for the same participant) always have the same QoS, via the new "ktopic" table in the participant. Readers and writers are bound to a topic entity and inherit its properties. If a topic comes in two definitions, say one for C and one for C++, one can have a single participant with a reader delivering the data in C representation and another reader delivering it in C++ representation. This changes the behaviour of create_topic and find_topic: these now (on successful return) always return a new entity (and thus with a unique handle), where previously these would simply return a existing one when possible. This also requires some small additions to the sertopic/serdata interface. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
08c9db0934
commit
27d7c72626
29 changed files with 946 additions and 530 deletions
|
@ -555,7 +555,9 @@ dds_set_enabled_status(dds_entity_t entity, uint32_t mask);
|
|||
* @param[in] entity Entity on which to get qos.
|
||||
* @param[out] qos Pointer to the qos structure that returns the set policies.
|
||||
*
|
||||
* @returns A dds_return_t indicating success or failure.
|
||||
* @returns A dds_return_t indicating success or failure. The QoS object will have
|
||||
* at least all QoS relevant for the entity present and the corresponding dds_qget_...
|
||||
* will return true.
|
||||
*
|
||||
* @retval DDS_RETCODE_OK
|
||||
* The existing set of QoS policy values applied to the entity
|
||||
|
@ -962,7 +964,9 @@ dds_lookup_participant(
|
|||
* @brief Creates a new topic with default type handling.
|
||||
*
|
||||
* The type name for the topic is taken from the generated descriptor. Topic
|
||||
* matching is done on a combination of topic name and type name.
|
||||
* matching is done on a combination of topic name and type name. Each successful
|
||||
* call to dds_create_topic creates a new topic entity sharing the same QoS
|
||||
* settings with all other topics of the same name.
|
||||
*
|
||||
* @param[in] participant Participant on which to create the topic.
|
||||
* @param[in] descriptor An IDL generated topic descriptor.
|
||||
|
@ -970,14 +974,20 @@ dds_lookup_participant(
|
|||
* @param[in] qos QoS to set on the new topic (can be NULL).
|
||||
* @param[in] listener Any listener functions associated with the new topic (can be NULL).
|
||||
*
|
||||
* @returns A valid topic handle or an error code.
|
||||
* @returns A valid, unique topic handle or an error code.
|
||||
*
|
||||
* @retval >=0
|
||||
* A valid topic handle.
|
||||
* A valid unique topic handle.
|
||||
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||
* Either participant, descriptor, name or qos is invalid.
|
||||
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||
* Either participant, descriptor, name or qos is invalid.
|
||||
* @retval DDS_RETCODE_INCONSISTENT_POLICY
|
||||
* QoS mismatch between qos and an existing topic's QoS.
|
||||
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
|
||||
* Mismatch between type name in descriptor and pre-existing
|
||||
* topic's type name.
|
||||
*/
|
||||
/* TODO: Check list of retcodes is complete. */
|
||||
DDS_EXPORT dds_entity_t
|
||||
dds_create_topic(
|
||||
dds_entity_t participant,
|
||||
|
@ -992,7 +1002,13 @@ struct nn_plist;
|
|||
* @brief Creates a new topic with arbitrary type handling.
|
||||
*
|
||||
* The type name for the topic is taken from the provided "sertopic" object. Topic
|
||||
* matching is done on a combination of topic name and type name.
|
||||
* matching is done on a combination of topic name and type name. Each successful
|
||||
* call to dds_create_topic creates a new topic entity sharing the same QoS
|
||||
* settings with all other topics of the same name.
|
||||
*
|
||||
* If sertopic is not yet known in the domain, it is added and its refcount
|
||||
* incremented; if an equivalent sertopic object is already known, then the known
|
||||
* one is used instead.
|
||||
*
|
||||
* @param[in] participant Participant on which to create the topic.
|
||||
* @param[in] sertopic Internal description of the topic type (includes name).
|
||||
|
@ -1000,14 +1016,20 @@ struct nn_plist;
|
|||
* @param[in] listener Any listener functions associated with the new topic (can be NULL).
|
||||
* @param[in] sedp_plist Topic description to be published as part of discovery (if NULL, not published).
|
||||
*
|
||||
* @returns A valid topic handle or an error code.
|
||||
* @returns A valid, unique topic handle or an error code.
|
||||
*
|
||||
* @retval >=0
|
||||
* A valid topic handle.
|
||||
* A valid unique topic handle.
|
||||
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||
* Either participant, descriptor, name or qos is invalid.
|
||||
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||
* Either participant, descriptor, name or qos is invalid.
|
||||
* @retval DDS_RETCODE_INCONSISTENT_POLICY
|
||||
* QoS mismatch between qos and an existing topic's QoS.
|
||||
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
|
||||
* Mismatch between type name in sertopic and pre-existing
|
||||
* topic's type name.
|
||||
*/
|
||||
/* TODO: Check list of retcodes is complete. */
|
||||
DDS_EXPORT dds_entity_t
|
||||
dds_create_topic_arbitrary (
|
||||
dds_entity_t participant,
|
||||
|
@ -1030,8 +1052,9 @@ dds_create_topic_arbitrary (
|
|||
* A valid topic handle.
|
||||
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||
* Participant was invalid.
|
||||
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
|
||||
* No topic of this name existed yet in the participant
|
||||
*/
|
||||
/* TODO: Check list of retcodes is complete. */
|
||||
DDS_EXPORT dds_entity_t
|
||||
dds_find_topic(dds_entity_t participant, const char *name);
|
||||
|
||||
|
@ -1047,8 +1070,6 @@ dds_find_topic(dds_entity_t participant, const char *name);
|
|||
* @retval DDS_RETCODE_OK
|
||||
* Success.
|
||||
*/
|
||||
/* TODO: do we need a convenience version as well that allocates and add a _s suffix to this one? */
|
||||
/* TODO: Check annotation. Could be _Out_writes_to_(size, return + 1) as well. */
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_get_name(dds_entity_t topic, char *name, size_t size);
|
||||
|
||||
|
|
|
@ -112,6 +112,7 @@ typedef enum dds_entity_kind
|
|||
DDS_KIND_DOMAIN,
|
||||
DDS_KIND_CYCLONEDDS
|
||||
} dds_entity_kind_t;
|
||||
#define DDS_KIND_MAX DDS_KIND_CYCLONEDDS
|
||||
|
||||
/* Handles are opaque pointers to implementation types */
|
||||
typedef uint64_t dds_instance_handle_t;
|
||||
|
|
|
@ -86,7 +86,8 @@ DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status);
|
|||
|
||||
DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst);
|
||||
|
||||
DDS_EXPORT dds_participant *dds_entity_participant (dds_entity *e);
|
||||
DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e);
|
||||
DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e);
|
||||
DDS_EXPORT void dds_entity_final_deinit_before_free (dds_entity *e);
|
||||
DDS_EXPORT bool dds_entity_in_scope (const dds_entity *e, const dds_entity *root);
|
||||
|
||||
|
|
|
@ -20,6 +20,8 @@ extern "C" {
|
|||
|
||||
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_participant, DDS_KIND_PARTICIPANT)
|
||||
|
||||
extern const ddsrt_avl_treedef_t participant_ktopics_treedef;
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -37,13 +37,12 @@ struct q_globals;
|
|||
struct ddsi_sertopic_builtintopic {
|
||||
struct ddsi_sertopic c;
|
||||
enum ddsi_sertopic_builtintopic_type type;
|
||||
struct q_globals *gv;
|
||||
};
|
||||
|
||||
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic;
|
||||
extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic;
|
||||
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv);
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -21,9 +21,13 @@ extern "C" {
|
|||
|
||||
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_topic, DDS_KIND_TOPIC)
|
||||
|
||||
DDS_EXPORT struct ddsi_sertopic * dds_topic_lookup (dds_domain * domain, const char * name) ddsrt_nonnull_all;
|
||||
DDS_EXPORT void dds_topic_free (dds_domainid_t domainid, struct ddsi_sertopic * st) ddsrt_nonnull_all;
|
||||
|
||||
DDS_EXPORT dds_return_t dds_topic_pin (dds_entity_t handle, struct dds_topic **tp) ddsrt_nonnull_all;
|
||||
DDS_EXPORT void dds_topic_unpin (struct dds_topic *tp) ddsrt_nonnull_all;
|
||||
DDS_EXPORT void dds_topic_defer_set_qos (struct dds_topic *tp) ddsrt_nonnull_all;
|
||||
DDS_EXPORT void dds_topic_allow_set_qos (struct dds_topic *tp) ddsrt_nonnull_all;
|
||||
|
||||
#ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||
#define DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||
typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx);
|
||||
|
|
|
@ -34,6 +34,7 @@ struct dds_writer;
|
|||
struct dds_publisher;
|
||||
struct dds_subscriber;
|
||||
struct dds_topic;
|
||||
struct dds_ktopic;
|
||||
struct dds_readcond;
|
||||
struct dds_guardcond;
|
||||
struct dds_statuscond;
|
||||
|
@ -127,7 +128,7 @@ typedef struct dds_entity {
|
|||
ddsrt_avl_node_t m_avlnode_child; /* [m_mutex of m_parent] */
|
||||
ddsrt_avl_tree_t m_children; /* [m_mutex] tree on m_iid using m_avlnode_child */
|
||||
struct dds_domain *m_domain; /* constant */
|
||||
dds_qos_t *m_qos; /* [m_mutex] */
|
||||
dds_qos_t *m_qos; /* [m_mutex]; null for topics (they rely on correpsonding "ktopic") (+waitset,domain,&c.) */
|
||||
ddsi_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */
|
||||
dds_instance_handle_t m_iid; /* unique for all time, constant; FIXME: like GUID */
|
||||
uint32_t m_flags; /* [m_mutex] */
|
||||
|
@ -215,7 +216,6 @@ typedef struct dds_domain {
|
|||
|
||||
ddsrt_avl_node_t m_node; /* for dds_global.m_domains */
|
||||
dds_domainid_t m_id;
|
||||
ddsrt_avl_tree_t m_topics;
|
||||
struct cfgst *cfgst;
|
||||
|
||||
struct ddsi_sertopic *builtin_participant_topic;
|
||||
|
@ -238,14 +238,31 @@ typedef struct dds_publisher {
|
|||
struct dds_entity m_entity;
|
||||
} dds_publisher;
|
||||
|
||||
typedef struct dds_ktopic {
|
||||
/* name -> <type_name, QoS> mapping for topics, part of the participant
|
||||
and protected by the participant's lock (including the actual QoS
|
||||
setting)
|
||||
|
||||
defer_set_qos is used to implement an intentionally unfair single-writer/
|
||||
multiple-reader lock using the participant's lock & cond var: set_qos
|
||||
"write-locks" it, create_reader and create_writer "read-lock" it. */
|
||||
ddsrt_avl_node_t pp_ktopics_avlnode;
|
||||
uint32_t refc;
|
||||
uint32_t defer_set_qos; /* set_qos must wait for this to be 0 */
|
||||
dds_qos_t *qos;
|
||||
char *name; /* [constant] */
|
||||
char *type_name; /* [constant] */
|
||||
} dds_ktopic;
|
||||
|
||||
typedef struct dds_participant {
|
||||
struct dds_entity m_entity;
|
||||
dds_entity_t m_builtin_subscriber;
|
||||
ddsrt_avl_tree_t m_ktopics; /* [m_entity.m_mutex] */
|
||||
} dds_participant;
|
||||
|
||||
typedef struct dds_reader {
|
||||
struct dds_entity m_entity;
|
||||
struct dds_topic *m_topic;
|
||||
struct dds_topic *m_topic; /* refc'd, constant, lock(rd) -> lock(tp) allowed */
|
||||
struct dds_rhc *m_rhc; /* aliases m_rd->rhc with a wider interface, FIXME: but m_rd owns it for resource management */
|
||||
struct reader *m_rd;
|
||||
bool m_data_on_readers;
|
||||
|
@ -265,7 +282,7 @@ typedef struct dds_reader {
|
|||
|
||||
typedef struct dds_writer {
|
||||
struct dds_entity m_entity;
|
||||
struct dds_topic *m_topic;
|
||||
struct dds_topic *m_topic; /* refc'd, constant, lock(wr) -> lock(tp) allowed */
|
||||
struct nn_xpack *m_xp;
|
||||
struct writer *m_wr;
|
||||
struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/
|
||||
|
@ -287,6 +304,7 @@ typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx);
|
|||
typedef struct dds_topic {
|
||||
struct dds_entity m_entity;
|
||||
struct ddsi_sertopic *m_stopic;
|
||||
struct dds_ktopic *m_ktopic; /* refc'd, constant */
|
||||
|
||||
dds_topic_intern_filter_fn filter_fn;
|
||||
void *filter_ctx;
|
||||
|
|
|
@ -242,6 +242,13 @@ static void dds__builtin_write (const struct entity_common *e, nn_wctime_t times
|
|||
}
|
||||
}
|
||||
|
||||
static void unref_builtin_topics (struct dds_domain *dom)
|
||||
{
|
||||
ddsi_sertopic_unref (dom->builtin_participant_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_reader_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_writer_topic);
|
||||
}
|
||||
|
||||
void dds__builtin_init (struct dds_domain *dom)
|
||||
{
|
||||
dds_qos_t *qos = dds__create_builtin_qos ();
|
||||
|
@ -253,9 +260,15 @@ void dds__builtin_init (struct dds_domain *dom)
|
|||
dom->btif.builtintopic_write = dds__builtin_write;
|
||||
dom->gv.builtin_topic_interface = &dom->btif;
|
||||
|
||||
dom->builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant", &dom->gv);
|
||||
dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv);
|
||||
dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv);
|
||||
dom->builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
|
||||
dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
|
||||
dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
|
||||
|
||||
ddsrt_mutex_lock (&dom->gv.sertopics_lock);
|
||||
ddsi_sertopic_register_locked (&dom->gv, dom->builtin_participant_topic);
|
||||
ddsi_sertopic_register_locked (&dom->gv, dom->builtin_reader_topic);
|
||||
ddsi_sertopic_register_locked (&dom->gv, dom->builtin_writer_topic);
|
||||
ddsrt_mutex_unlock (&dom->gv.sertopics_lock);
|
||||
|
||||
thread_state_awake (lookup_thread_state (), &dom->gv);
|
||||
const struct entity_index *gh = dom->gv.entity_index;
|
||||
|
@ -265,6 +278,11 @@ void dds__builtin_init (struct dds_domain *dom)
|
|||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
dds_delete_qos (qos);
|
||||
|
||||
/* ddsi_sertopic_init initializes the refcount to 1 and dds_sertopic_register_locked increments
|
||||
it. All "real" references (such as readers and writers) are also accounted for in the
|
||||
reference count, so we have an excess reference here. */
|
||||
unref_builtin_topics (dom);
|
||||
}
|
||||
|
||||
void dds__builtin_fini (struct dds_domain *dom)
|
||||
|
@ -275,8 +293,5 @@ void dds__builtin_fini (struct dds_domain *dom)
|
|||
delete_local_orphan_writer (dom->builtintopic_writer_publications);
|
||||
delete_local_orphan_writer (dom->builtintopic_writer_subscriptions);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
ddsi_sertopic_unref (dom->builtin_participant_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_reader_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_writer_topic);
|
||||
unref_builtin_topics (dom);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
#include "dds/ddsrt/process.h"
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/ddsrt/hopscotch.h"
|
||||
#include "dds__init.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__domain.h"
|
||||
|
@ -59,7 +60,6 @@ static dds_entity_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
domain->m_entity.m_iid = ddsi_iid_gen ();
|
||||
|
||||
domain->gv.tstart = now ();
|
||||
ddsrt_avl_init (&dds_topictree_def, &domain->m_topics);
|
||||
|
||||
/* | domain_id | domain id in config | result
|
||||
+-----------+---------------------+----------
|
||||
|
|
|
@ -135,11 +135,65 @@ static bool entity_has_status (const dds_entity *e)
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool entity_may_have_children (const dds_entity *e)
|
||||
{
|
||||
switch (e->m_kind)
|
||||
{
|
||||
case DDS_KIND_TOPIC:
|
||||
return false;
|
||||
case DDS_KIND_READER:
|
||||
case DDS_KIND_WRITER:
|
||||
case DDS_KIND_PUBLISHER:
|
||||
case DDS_KIND_SUBSCRIBER:
|
||||
case DDS_KIND_PARTICIPANT:
|
||||
case DDS_KIND_COND_READ:
|
||||
case DDS_KIND_COND_QUERY:
|
||||
case DDS_KIND_COND_GUARD:
|
||||
case DDS_KIND_WAITSET:
|
||||
case DDS_KIND_DOMAIN:
|
||||
case DDS_KIND_CYCLONEDDS:
|
||||
break;
|
||||
case DDS_KIND_DONTCARE:
|
||||
abort ();
|
||||
break;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
#ifndef NDEBUG
|
||||
static bool entity_kind_has_qos (dds_entity_kind_t kind)
|
||||
{
|
||||
switch (kind)
|
||||
{
|
||||
case DDS_KIND_READER:
|
||||
case DDS_KIND_WRITER:
|
||||
case DDS_KIND_PUBLISHER:
|
||||
case DDS_KIND_SUBSCRIBER:
|
||||
case DDS_KIND_PARTICIPANT:
|
||||
return true;
|
||||
case DDS_KIND_TOPIC:
|
||||
case DDS_KIND_COND_READ:
|
||||
case DDS_KIND_COND_QUERY:
|
||||
case DDS_KIND_COND_GUARD:
|
||||
case DDS_KIND_WAITSET:
|
||||
case DDS_KIND_DOMAIN:
|
||||
case DDS_KIND_CYCLONEDDS:
|
||||
break;
|
||||
case DDS_KIND_DONTCARE:
|
||||
abort ();
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind_t kind, bool implicit, dds_qos_t *qos, const dds_listener_t *listener, status_mask_t mask)
|
||||
{
|
||||
dds_handle_t handle;
|
||||
|
||||
/* CycloneDDS is at the root of the hierarchy */
|
||||
assert ((kind == DDS_KIND_CYCLONEDDS) == (parent == NULL));
|
||||
assert (entity_kind_has_qos (kind) == (qos != NULL));
|
||||
assert (e);
|
||||
|
||||
e->m_kind = kind;
|
||||
|
@ -196,7 +250,7 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
|
|||
{
|
||||
/* for topics, refc counts readers/writers, for all others, it counts children (this we can get away with
|
||||
as long as topics can't have children) */
|
||||
if ((handle = dds_handle_create (&e->m_hdllink, implicit, (kind != DDS_KIND_TOPIC))) <= 0)
|
||||
if ((handle = dds_handle_create (&e->m_hdllink, implicit, entity_may_have_children (e))) <= 0)
|
||||
return (dds_entity_t) handle;
|
||||
}
|
||||
|
||||
|
@ -219,42 +273,41 @@ void dds_entity_register_child (dds_entity *parent, dds_entity *child)
|
|||
dds_entity_add_ref_locked (parent);
|
||||
}
|
||||
|
||||
static dds_entity *get_first_child (ddsrt_avl_tree_t *remaining_children, bool ignore_topics)
|
||||
static dds_entity *get_next_child (ddsrt_avl_tree_t *remaining_children, uint32_t allowed_kinds, uint64_t *cursor)
|
||||
{
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, remaining_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
for (dds_entity *e = ddsrt_avl_iter_succ (&dds_entity_children_td, remaining_children, &it, cursor); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if ((!ignore_topics) || (dds_entity_kind(e) != DDS_KIND_TOPIC))
|
||||
dds_entity_kind_t kind = dds_entity_kind (e);
|
||||
if ((1u << (uint32_t) kind) & allowed_kinds)
|
||||
return e;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void delete_children(struct dds_entity *parent, bool ignore_topics)
|
||||
static void delete_children (struct dds_entity *parent, uint32_t allowed_kinds)
|
||||
{
|
||||
dds_entity *child;
|
||||
dds_return_t ret;
|
||||
uint64_t cursor = 0;
|
||||
ddsrt_mutex_lock (&parent->m_mutex);
|
||||
while ((child = get_first_child(&parent->m_children, ignore_topics)) != NULL)
|
||||
while ((child = get_next_child (&parent->m_children, allowed_kinds, &cursor)) != NULL)
|
||||
{
|
||||
dds_entity_t child_handle = child->m_hdllink.hdl;
|
||||
cursor = child->m_iid;
|
||||
|
||||
/* The child will remove itself from the parent->m_children list. */
|
||||
ddsrt_mutex_unlock (&parent->m_mutex);
|
||||
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
|
||||
assert (ret == DDS_RETCODE_OK || ret == DDS_RETCODE_BAD_PARAMETER);
|
||||
(void) ret;
|
||||
ddsrt_mutex_lock (&parent->m_mutex);
|
||||
|
||||
/* The dds_delete can fail if the child is being deleted in parallel,
|
||||
* in which case: wait when its not deleted yet.
|
||||
* The child will trigger the condition after it removed itself from
|
||||
* the childrens list. */
|
||||
if ((ret == DDS_RETCODE_BAD_PARAMETER) &&
|
||||
(get_first_child(&parent->m_children, ignore_topics) == child))
|
||||
{
|
||||
/* The dds_delete can fail if the child is being deleted in parallel, in which case:
|
||||
wait until it is has gone. */
|
||||
if (ddsrt_avl_lookup (&dds_entity_children_td, &parent->m_children, &cursor) != NULL)
|
||||
ddsrt_cond_wait (&parent->m_cond, &parent->m_mutex);
|
||||
}
|
||||
}
|
||||
ddsrt_mutex_unlock (&parent->m_mutex);
|
||||
}
|
||||
|
||||
|
@ -283,12 +336,20 @@ static const char *entity_kindstr (dds_entity_kind_t kind)
|
|||
|
||||
static void print_delete (const dds_entity *e, enum delete_impl_state delstate , dds_instance_handle_t iid)
|
||||
{
|
||||
if (e)
|
||||
{
|
||||
unsigned cm = ddsrt_atomic_ld32 (&e->m_hdllink.cnt_flags);
|
||||
printf ("delete(%p, delstate %s, iid %"PRIx64"): %s%s %d pin %u refc %u %s %s\n",
|
||||
(void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid,
|
||||
entity_kindstr (e->m_kind), (e->m_flags & DDS_ENTITY_IMPLICIT) ? " [implicit]" : "",
|
||||
e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0x7fff, (cm & 0x80000000) ? "closed" : "open",
|
||||
ddsrt_avl_is_empty (&e->m_children) ? "childless" : "has-children");
|
||||
}
|
||||
else
|
||||
{
|
||||
printf ("delete(%p, delstate %s, handle %"PRId64"): pin failed\n",
|
||||
(void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -315,7 +376,12 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state
|
|||
else if (ret == DDS_RETCODE_TRY_AGAIN) /* non-child refs exist */
|
||||
return DDS_RETCODE_OK;
|
||||
else
|
||||
{
|
||||
#if TRACE_DELETE
|
||||
print_delete (NULL, delstate, (uint64_t) entity);
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delstate)
|
||||
|
@ -392,8 +458,15 @@ static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, en
|
|||
*
|
||||
* To circumvent the problem. We ignore topics in the first loop.
|
||||
*/
|
||||
delete_children(e, true /* ignore topics */);
|
||||
delete_children(e, false /* delete topics */);
|
||||
DDSRT_STATIC_ASSERT ((uint32_t) DDS_KIND_MAX < 32);
|
||||
static const uint32_t disallowed_kinds[] = {
|
||||
1u << (uint32_t) DDS_KIND_TOPIC,
|
||||
(uint32_t) 0
|
||||
};
|
||||
for (size_t i = 0; i < sizeof (disallowed_kinds) / sizeof (disallowed_kinds[0]); i++)
|
||||
{
|
||||
delete_children (e, ~disallowed_kinds[i]);
|
||||
}
|
||||
|
||||
/* The dds_handle_delete will wait until the last active claim on that handle is
|
||||
released. It is possible that this last release will be done by a thread that was
|
||||
|
@ -476,13 +549,20 @@ dds_entity_t dds_get_parent (dds_entity_t entity)
|
|||
}
|
||||
}
|
||||
|
||||
dds_participant *dds_entity_participant (dds_entity *e)
|
||||
dds_participant *dds_entity_participant (const dds_entity *e)
|
||||
{
|
||||
while (e && dds_entity_kind (e) != DDS_KIND_PARTICIPANT)
|
||||
e = e->m_parent;
|
||||
return (dds_participant *) e;
|
||||
}
|
||||
|
||||
const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e)
|
||||
{
|
||||
struct dds_participant const * const pp = dds_entity_participant (e);
|
||||
assert (pp != NULL);
|
||||
return &pp->m_entity.m_guid;
|
||||
}
|
||||
|
||||
dds_entity_t dds_get_participant (dds_entity_t entity)
|
||||
{
|
||||
dds_entity *e;
|
||||
|
@ -577,51 +657,124 @@ dds_return_t dds_get_qos (dds_entity_t entity, dds_qos_t *qos)
|
|||
ret = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
else
|
||||
{
|
||||
dds_qos_t *entity_qos;
|
||||
if (dds_entity_kind (e) != DDS_KIND_TOPIC)
|
||||
entity_qos = e->m_qos;
|
||||
else
|
||||
{
|
||||
struct dds_topic * const tp = (dds_topic *) e;
|
||||
struct dds_participant * const pp = dds_entity_participant (e);
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
entity_qos = tp->m_ktopic->qos;
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
}
|
||||
|
||||
dds_reset_qos (qos);
|
||||
nn_xqos_mergein_missing (qos, e->m_qos, ~(QP_TOPIC_NAME | QP_TYPE_NAME));
|
||||
nn_xqos_mergein_missing (qos, entity_qos, ~(QP_TOPIC_NAME | QP_TYPE_NAME));
|
||||
ret = DDS_RETCODE_OK;
|
||||
}
|
||||
dds_entity_unlock(e);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos, uint64_t mask)
|
||||
static dds_return_t dds_set_qos_locked_raw (dds_entity *e, dds_qos_t **e_qos_ptr, bool e_enabled, const dds_qos_t *qos, uint64_t mask, const struct ddsrt_log_cfg *logcfg, dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all)
|
||||
{
|
||||
dds_return_t ret;
|
||||
|
||||
/* Any attempt to do this on a topic ends up doing it on the ktopic instead, so that there is
|
||||
but a single QoS for a topic in a participant while there can be multiple definitions of it,
|
||||
and hence, multiple sertopics. Those are needed for multi-language support. */
|
||||
dds_qos_t *newqos = dds_create_qos ();
|
||||
nn_xqos_mergein_missing (newqos, qos, mask);
|
||||
nn_xqos_mergein_missing (newqos, e->m_qos, ~(uint64_t)0);
|
||||
if ((ret = nn_xqos_valid (&e->m_domain->gv.logconfig, newqos)) != DDS_RETCODE_OK)
|
||||
; /* oops ... invalid or inconsistent */
|
||||
else if (!(e->m_flags & DDS_ENTITY_ENABLED))
|
||||
; /* do as you please while the entity is not enabled (perhaps we should even allow invalid ones?) */
|
||||
nn_xqos_mergein_missing (newqos, *e_qos_ptr, ~(uint64_t)0);
|
||||
if ((ret = nn_xqos_valid (logcfg, newqos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
/* invalid or inconsistent QoS settings */
|
||||
goto error_or_nochange;
|
||||
}
|
||||
else if (!e_enabled)
|
||||
{
|
||||
/* do as you please while the entity is not enabled */
|
||||
}
|
||||
else
|
||||
{
|
||||
const uint64_t delta = nn_xqos_delta (e->m_qos, newqos, ~(uint64_t)0);
|
||||
if (delta == 0) /* no change */
|
||||
ret = DDS_RETCODE_OK;
|
||||
const uint64_t delta = nn_xqos_delta (*e_qos_ptr, newqos, ~(uint64_t)0);
|
||||
if (delta == 0)
|
||||
{
|
||||
/* new settings are identical to the old */
|
||||
goto error_or_nochange;
|
||||
}
|
||||
else if (delta & ~QP_CHANGEABLE_MASK)
|
||||
ret = DDS_RETCODE_IMMUTABLE_POLICY;
|
||||
else if (delta & (QP_RXO_MASK | QP_PARTITION))
|
||||
ret = DDS_RETCODE_UNSUPPORTED; /* not yet supporting things that affect matching */
|
||||
else
|
||||
{
|
||||
/* yay! */
|
||||
/* not all QoS may be changed according to the spec */
|
||||
ret = DDS_RETCODE_IMMUTABLE_POLICY;
|
||||
goto error_or_nochange;
|
||||
}
|
||||
else if (delta & (QP_RXO_MASK | QP_PARTITION))
|
||||
{
|
||||
/* Cyclone doesn't (yet) support changing QoS that affect matching. Simply re-doing the
|
||||
matching is easy enough, but the consequences are very weird. E.g., what is the
|
||||
expectation if a transient-local writer has published data while its partition QoS is set
|
||||
to A, and then changes its partition to B? Should a reader in B get the data originally
|
||||
published in A?
|
||||
|
||||
One can do the same thing with other RxO QoS settings, e.g., the latency budget setting.
|
||||
I find that weird, and I'd rather have sane answers to these questions than set up these
|
||||
traps and pitfalls for people to walk into ...
|
||||
*/
|
||||
ret = DDS_RETCODE_UNSUPPORTED;
|
||||
goto error_or_nochange;
|
||||
}
|
||||
}
|
||||
|
||||
if (ret != DDS_RETCODE_OK)
|
||||
dds_delete_qos (newqos);
|
||||
else if ((ret = dds_entity_deriver_set_qos (e, newqos, e->m_flags & DDS_ENTITY_ENABLED)) != DDS_RETCODE_OK)
|
||||
dds_delete_qos (newqos);
|
||||
assert (ret == DDS_RETCODE_OK);
|
||||
if ((ret = set_qos (e, newqos, e_enabled)) != DDS_RETCODE_OK)
|
||||
goto error_or_nochange;
|
||||
else
|
||||
{
|
||||
dds_delete_qos (e->m_qos);
|
||||
e->m_qos = newqos;
|
||||
dds_delete_qos (*e_qos_ptr);
|
||||
*e_qos_ptr = newqos;
|
||||
}
|
||||
return DDS_RETCODE_OK;
|
||||
|
||||
error_or_nochange:
|
||||
dds_delete_qos (newqos);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos, uint64_t mask)
|
||||
{
|
||||
const struct ddsrt_log_cfg *logcfg = &e->m_domain->gv.logconfig;
|
||||
dds_entity_kind_t kind = dds_entity_kind (e);
|
||||
if (kind != DDS_KIND_TOPIC)
|
||||
{
|
||||
return dds_set_qos_locked_raw (e, &e->m_qos, (e->m_flags & DDS_ENTITY_ENABLED) != 0, qos, mask, logcfg, dds_entity_deriver_table[kind]->set_qos);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Topics must be enabled for now (all are currently, so for now it is not a meaningful limitation):
|
||||
there can only be a single QoS (or different versions with the same name can have different QoS -
|
||||
in particular a different value for TOPIC_DATA - and therefore the idea that it is a free-for-all
|
||||
on the QoS for a disabled entity falls apart for topics.
|
||||
|
||||
FIXME: topic should have a QoS object while still disabled */
|
||||
assert (e->m_flags & DDS_ENTITY_ENABLED);
|
||||
struct dds_topic * const tp = (struct dds_topic *) e;
|
||||
struct dds_participant * const pp = dds_entity_participant (e);
|
||||
struct dds_ktopic * const ktp = tp->m_ktopic;
|
||||
dds_return_t rc;
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
while (ktp->defer_set_qos != 0)
|
||||
ddsrt_cond_wait (&pp->m_entity.m_cond, &pp->m_entity.m_mutex);
|
||||
|
||||
/* dds_entity_deriver_table[kind]->set_qos had better avoid looking at the entity! */
|
||||
rc = dds_set_qos_locked_raw (NULL, &ktp->qos, (e->m_flags & DDS_ENTITY_ENABLED) != 0, qos, mask, logcfg, dds_entity_deriver_table[kind]->set_qos);
|
||||
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
static void pushdown_pubsub_qos (dds_entity *e)
|
||||
{
|
||||
/* e claimed but no mutex held */
|
||||
|
@ -650,7 +803,7 @@ static void pushdown_pubsub_qos (dds_entity *e)
|
|||
ddsrt_mutex_unlock (&e->m_mutex);
|
||||
}
|
||||
|
||||
static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp)
|
||||
static void pushdown_topic_qos (dds_entity *e, struct dds_ktopic *ktp)
|
||||
{
|
||||
/* on input: both entities claimed but no mutexes held */
|
||||
enum { NOP, PROP, CHANGE } todo;
|
||||
|
@ -658,12 +811,12 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp)
|
|||
{
|
||||
case DDS_KIND_READER: {
|
||||
dds_reader *rd = (dds_reader *) e;
|
||||
todo = (&rd->m_topic->m_entity == tp) ? CHANGE : NOP;
|
||||
todo = (rd->m_topic->m_ktopic == ktp) ? CHANGE : NOP;
|
||||
break;
|
||||
}
|
||||
case DDS_KIND_WRITER: {
|
||||
dds_writer *wr = (dds_writer *) e;
|
||||
todo = (&wr->m_topic->m_entity == tp) ? CHANGE : NOP;
|
||||
todo = (wr->m_topic->m_ktopic == ktp) ? CHANGE : NOP;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
|
@ -677,10 +830,11 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp)
|
|||
break;
|
||||
case CHANGE: {
|
||||
/* may lock topic while holding reader/writer lock */
|
||||
struct dds_participant * const pp = dds_entity_participant (e);
|
||||
ddsrt_mutex_lock (&e->m_mutex);
|
||||
ddsrt_mutex_lock (&tp->m_mutex);
|
||||
dds_set_qos_locked_impl (e, tp->m_qos, QP_TOPIC_DATA);
|
||||
ddsrt_mutex_unlock (&tp->m_mutex);
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
dds_set_qos_locked_impl (e, ktp->qos, QP_TOPIC_DATA);
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&e->m_mutex);
|
||||
break;
|
||||
}
|
||||
|
@ -697,7 +851,7 @@ static void pushdown_topic_qos (dds_entity *e, struct dds_entity *tp)
|
|||
assert (x == c);
|
||||
/* see dds_get_children for why "c" remains valid despite unlocking m_mutex */
|
||||
ddsrt_mutex_unlock (&e->m_mutex);
|
||||
pushdown_topic_qos (c, tp);
|
||||
pushdown_topic_qos (c, ktp);
|
||||
ddsrt_mutex_lock (&e->m_mutex);
|
||||
dds_entity_unpin (c);
|
||||
}
|
||||
|
@ -740,7 +894,8 @@ dds_return_t dds_set_qos (dds_entity_t entity, const dds_qos_t *qos)
|
|||
assert (dds_entity_kind (e->m_parent) == DDS_KIND_PARTICIPANT);
|
||||
if (dds_entity_pin (e->m_parent->m_hdllink.hdl, &pp) == DDS_RETCODE_OK)
|
||||
{
|
||||
pushdown_topic_qos (pp, e);
|
||||
struct dds_topic *tp = (struct dds_topic *) e;
|
||||
pushdown_topic_qos (pp, tp->m_ktopic);
|
||||
dds_entity_unpin (pp);
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||
*/
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "dds/ddsrt/cdtors.h"
|
||||
#include "dds/ddsrt/environ.h"
|
||||
|
@ -30,6 +31,13 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_participant)
|
|||
|
||||
#define DDS_PARTICIPANT_STATUS_MASK (0u)
|
||||
|
||||
static int cmp_ktopic_name (const void *a, const void *b)
|
||||
{
|
||||
return strcmp (a, b);
|
||||
}
|
||||
|
||||
const ddsrt_avl_treedef_t participant_ktopics_treedef = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY(offsetof (struct dds_ktopic, pp_ktopics_avlnode), offsetof (struct dds_ktopic, name), cmp_ktopic_name, 0);
|
||||
|
||||
static dds_return_t dds_participant_status_validate (uint32_t mask)
|
||||
{
|
||||
return (mask & ~DDS_PARTICIPANT_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK;
|
||||
|
@ -42,6 +50,9 @@ static dds_return_t dds_participant_delete (dds_entity *e)
|
|||
dds_return_t ret;
|
||||
assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT);
|
||||
|
||||
/* ktopics & topics are children and therefore must all have been deleted by the time we get here */
|
||||
assert (ddsrt_avl_is_empty (&((struct dds_participant *) e)->m_ktopics));
|
||||
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if ((ret = delete_participant (&e->m_domain->gv, &e->m_guid)) < 0)
|
||||
DDS_CERROR (&e->m_domain->gv.logconfig, "dds_participant_delete: internal error %"PRId32"\n", ret);
|
||||
|
@ -125,6 +136,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
pp->m_entity.m_iid = get_entity_instance_id (&dom->gv, &guid);
|
||||
pp->m_entity.m_domain = dom;
|
||||
pp->m_builtin_subscriber = 0;
|
||||
ddsrt_avl_init (&participant_ktopics_treedef, &pp->m_ktopics);
|
||||
|
||||
/* Add participant to extent */
|
||||
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "dds/dds.h"
|
||||
#include "dds/version.h"
|
||||
#include "dds/ddsrt/static_assert.h"
|
||||
#include "dds__participant.h"
|
||||
#include "dds__subscriber.h"
|
||||
#include "dds__reader.h"
|
||||
#include "dds__listener.h"
|
||||
|
@ -359,32 +360,34 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
|
|||
{
|
||||
dds_qos_t *rqos;
|
||||
dds_subscriber *sub = NULL;
|
||||
dds_participant *pp;
|
||||
dds_entity_t subscriber;
|
||||
dds_reader *rd;
|
||||
dds_topic *tp;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t t;
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
bool internal_topic;
|
||||
dds_return_t rc;
|
||||
dds_entity_t pseudo_topic = 0;
|
||||
bool created_implicit_sub = false;
|
||||
|
||||
switch (topic)
|
||||
{
|
||||
case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT:
|
||||
case DDS_BUILTIN_TOPIC_DCPSTOPIC:
|
||||
/* not implemented yet */
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT:
|
||||
case DDS_BUILTIN_TOPIC_DCPSPUBLICATION:
|
||||
case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION:
|
||||
internal_topic = true;
|
||||
subscriber = dds__get_builtin_subscriber (participant_or_subscriber);
|
||||
if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
t = dds__get_builtin_topic (subscriber, topic);
|
||||
/* translate provided pseudo-topic to a real one */
|
||||
pseudo_topic = topic;
|
||||
if ((subscriber = dds__get_builtin_subscriber (participant_or_subscriber)) < 0)
|
||||
return subscriber;
|
||||
if ((rc = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
topic = dds__get_builtin_topic (subscriber, topic);
|
||||
break;
|
||||
|
||||
default: {
|
||||
dds_entity *p_or_s;
|
||||
if ((ret = dds_entity_lock (participant_or_subscriber, DDS_KIND_DONTCARE, &p_or_s)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
if ((rc = dds_entity_lock (participant_or_subscriber, DDS_KIND_DONTCARE, &p_or_s)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
switch (dds_entity_kind (p_or_s))
|
||||
{
|
||||
case DDS_KIND_SUBSCRIBER:
|
||||
|
@ -392,34 +395,39 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
|
|||
sub = (dds_subscriber *) p_or_s;
|
||||
break;
|
||||
case DDS_KIND_PARTICIPANT:
|
||||
created_implicit_sub = true;
|
||||
subscriber = dds__create_subscriber_l ((dds_participant *) p_or_s, true, qos, NULL);
|
||||
dds_entity_unlock (p_or_s);
|
||||
if ((ret = dds_subscriber_lock (subscriber, &sub)) < 0)
|
||||
return ret;
|
||||
if ((rc = dds_subscriber_lock (subscriber, &sub)) < 0)
|
||||
return rc;
|
||||
break;
|
||||
default:
|
||||
dds_entity_unlock (p_or_s);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
internal_topic = false;
|
||||
t = topic;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((ret = dds_topic_lock (t, &tp)) != DDS_RETCODE_OK)
|
||||
{
|
||||
reader = ret;
|
||||
goto err_tp_lock;
|
||||
}
|
||||
if ((rc = dds_topic_pin (topic, &tp)) < 0)
|
||||
goto err_pin_topic;
|
||||
assert (tp->m_stopic);
|
||||
pp = dds_entity_participant (&sub->m_entity);
|
||||
if (pp != dds_entity_participant (&tp->m_entity))
|
||||
if (dds_entity_participant (&sub->m_entity) != dds_entity_participant (&tp->m_entity))
|
||||
{
|
||||
reader = DDS_RETCODE_BAD_PARAMETER;
|
||||
rc = DDS_RETCODE_BAD_PARAMETER;
|
||||
goto err_pp_mismatch;
|
||||
}
|
||||
|
||||
/* Prevent set_qos on the topic until reader has been created and registered: we can't
|
||||
allow a TOPIC_DATA change to ccur before the reader has been created because that
|
||||
change would then not be published in the discovery/built-in topics.
|
||||
|
||||
Don't keep the participant (which protects the topic's QoS) locked because that
|
||||
can cause deadlocks for applications creating a reader/writer from within a
|
||||
subscription matched listener (whether the restrictions on what one can do in
|
||||
listeners are reasonable or not, it used to work so it can be broken arbitrarily). */
|
||||
dds_topic_defer_set_qos (tp);
|
||||
|
||||
/* Merge qos from topic and subscriber, dds_copy_qos only fails when it is passed a null
|
||||
argument, but that isn't the case here */
|
||||
rqos = dds_create_qos ();
|
||||
|
@ -427,31 +435,30 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
|
|||
nn_xqos_mergein_missing (rqos, qos, DDS_READER_QOS_MASK);
|
||||
if (sub->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (rqos, sub->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_ktopic->qos)
|
||||
nn_xqos_mergein_missing (rqos, tp->m_ktopic->qos, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0);
|
||||
|
||||
if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 ||
|
||||
(ret = validate_reader_qos(rqos)) != DDS_RETCODE_OK)
|
||||
if ((rc = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) < 0 ||
|
||||
(rc = validate_reader_qos(rqos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
dds_delete_qos (rqos);
|
||||
reader = ret;
|
||||
goto err_bad_qos;
|
||||
}
|
||||
|
||||
/* Additional checks required for built-in topics: we don't want to
|
||||
run into a resource limit on a built-in topic, it is a needless
|
||||
complication */
|
||||
if (internal_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, topic, rqos))
|
||||
if (pseudo_topic && !dds__validate_builtin_reader_qos (tp->m_entity.m_domain, pseudo_topic, rqos))
|
||||
{
|
||||
dds_delete_qos (rqos);
|
||||
reader = DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
rc = DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
goto err_bad_qos;
|
||||
}
|
||||
|
||||
/* Create reader and associated read cache (if not provided by caller) */
|
||||
rd = dds_alloc (sizeof (*rd));
|
||||
reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, false, rqos, listener, DDS_READER_STATUS_MASK);
|
||||
struct dds_reader * const rd = dds_alloc (sizeof (*rd));
|
||||
const dds_entity_t reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, false, rqos, listener, DDS_READER_STATUS_MASK);
|
||||
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
|
||||
rd->m_topic = tp;
|
||||
rd->m_rhc = rhc ? rhc : dds_rhc_default_new (rd, tp->m_stopic);
|
||||
|
@ -468,25 +475,27 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
|
|||
dds_entity_init_complete (&rd->m_entity);
|
||||
|
||||
thread_state_awake (lookup_thread_state (), &sub->m_entity.m_domain->gv);
|
||||
ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
|
||||
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
|
||||
rc = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, dds_entity_participant_guid (&sub->m_entity), tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
|
||||
assert (rc == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
rd->m_entity.m_iid = get_entity_instance_id (&rd->m_entity.m_domain->gv, &rd->m_entity.m_guid);
|
||||
dds_entity_register_child (&sub->m_entity, &rd->m_entity);
|
||||
|
||||
dds_topic_unlock (tp);
|
||||
dds_topic_allow_set_qos (tp);
|
||||
dds_topic_unpin (tp);
|
||||
dds_subscriber_unlock (sub);
|
||||
return reader;
|
||||
|
||||
err_bad_qos:
|
||||
dds_topic_allow_set_qos (tp);
|
||||
err_pp_mismatch:
|
||||
dds_topic_unlock (tp);
|
||||
err_tp_lock:
|
||||
dds_topic_unpin (tp);
|
||||
err_pin_topic:
|
||||
dds_subscriber_unlock (sub);
|
||||
if ((sub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0)
|
||||
if (created_implicit_sub)
|
||||
(void) dds_delete (subscriber);
|
||||
return reader;
|
||||
return rc;
|
||||
}
|
||||
|
||||
void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb, void *cbarg)
|
||||
|
|
|
@ -132,7 +132,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
|
|||
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
|
||||
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
|
||||
/* keyhash must in host format (which the GUIDs always are internally) */
|
||||
struct entity_common *entity = entidx_lookup_guid_untyped (tp->gv->entity_index, (const ddsi_guid_t *) keyhash->value);
|
||||
struct entity_common *entity = entidx_lookup_guid_untyped (tp->c.gv->entity_index, (const ddsi_guid_t *) keyhash->value);
|
||||
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
|
||||
memcpy (&d->key, keyhash->value, sizeof (d->key));
|
||||
if (entity)
|
||||
|
|
|
@ -25,12 +25,11 @@
|
|||
|
||||
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
|
||||
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv)
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename)
|
||||
{
|
||||
struct ddsi_sertopic_builtintopic *tp = ddsrt_malloc (sizeof (*tp));
|
||||
ddsi_sertopic_init (&tp->c, name, typename, &ddsi_sertopic_ops_builtintopic, &ddsi_serdata_ops_builtintopic, false);
|
||||
tp->type = type;
|
||||
tp->gv = gv;
|
||||
return &tp->c;
|
||||
}
|
||||
|
||||
|
@ -40,6 +39,19 @@ static void sertopic_builtin_free (struct ddsi_sertopic *tp)
|
|||
ddsrt_free (tp);
|
||||
}
|
||||
|
||||
static bool sertopic_builtin_equal (const struct ddsi_sertopic *acmn, const struct ddsi_sertopic *bcmn)
|
||||
{
|
||||
const struct ddsi_sertopic_builtintopic *a = (struct ddsi_sertopic_builtintopic *) acmn;
|
||||
const struct ddsi_sertopic_builtintopic *b = (struct ddsi_sertopic_builtintopic *) bcmn;
|
||||
return a->type == b->type;
|
||||
}
|
||||
|
||||
static uint32_t sertopic_builtin_hash (const struct ddsi_sertopic *tpcmn)
|
||||
{
|
||||
const struct ddsi_sertopic_builtintopic *tp = (struct ddsi_sertopic_builtintopic *) tpcmn;
|
||||
return tp->type;
|
||||
}
|
||||
|
||||
static void free_pp (void *vsample)
|
||||
{
|
||||
dds_builtintopic_participant_t *sample = vsample;
|
||||
|
@ -131,6 +143,8 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_
|
|||
}
|
||||
|
||||
const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = {
|
||||
.equal = sertopic_builtin_equal,
|
||||
.hash = sertopic_builtin_hash,
|
||||
.free = sertopic_builtin_free,
|
||||
.zero_samples = sertopic_builtin_zero_samples,
|
||||
.realloc_samples = sertopic_builtin_realloc_samples,
|
||||
|
|
|
@ -45,15 +45,6 @@ struct topic_sertopic_node {
|
|||
const struct ddsi_sertopic *st;
|
||||
};
|
||||
|
||||
static int topic_sertopic_node_cmp (const void *va, const void *vb)
|
||||
{
|
||||
const struct ddsi_sertopic *a = va;
|
||||
const struct ddsi_sertopic *b = vb;
|
||||
return strcmp (a->name, b->name);
|
||||
}
|
||||
|
||||
const ddsrt_avl_treedef_t dds_topictree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct topic_sertopic_node, avlnode), offsetof (struct topic_sertopic_node, st), topic_sertopic_node_cmp, 0);
|
||||
|
||||
static bool is_valid_name (const char *name) ddsrt_nonnull_all;
|
||||
|
||||
static bool is_valid_name (const char *name)
|
||||
|
@ -112,164 +103,88 @@ static void dds_topic_status_cb (struct dds_topic *tp)
|
|||
}
|
||||
#endif
|
||||
|
||||
struct ddsi_sertopic *dds_topic_lookup (dds_domain *domain, const char *name)
|
||||
dds_return_t dds_topic_pin (dds_entity_t handle, struct dds_topic **tp)
|
||||
{
|
||||
const struct ddsi_sertopic key = { .name = (char *) name };
|
||||
struct ddsi_sertopic *st;
|
||||
struct topic_sertopic_node *nst;
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
if ((nst = ddsrt_avl_lookup (&dds_topictree_def, &domain->m_topics, &key)) == NULL)
|
||||
st = NULL;
|
||||
else
|
||||
st = ddsi_sertopic_ref (nst->st);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
return st;
|
||||
}
|
||||
|
||||
static bool dds_find_topic_check_and_add_ref (dds_entity_t participant, dds_entity_t topic, const char *name)
|
||||
{
|
||||
dds_topic *tp;
|
||||
if (dds_topic_lock (topic, &tp) != DDS_RETCODE_OK)
|
||||
return false;
|
||||
|
||||
bool ret;
|
||||
if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant || strcmp (tp->m_stopic->name, name) != 0)
|
||||
ret = false;
|
||||
else
|
||||
{
|
||||
/* Simply return the same topic, though that is different to the spirit
|
||||
of the DDS specification, which gives you a unique copy. Giving that
|
||||
unique copy means there potentially many versions of exactly the same
|
||||
topic around, and that two entities can be dealing with the same data
|
||||
even though they have different topics objects (though with the same
|
||||
name). That I find a confusing model.
|
||||
|
||||
As far as I can tell, the only benefit is the ability to set different
|
||||
listeners on the various copies of the topic. And that seems to be a
|
||||
really small benefit. */
|
||||
ret = true;
|
||||
}
|
||||
dds_topic_unlock (tp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
dds_entity_t dds_find_topic (dds_entity_t participant, const char *name)
|
||||
{
|
||||
dds_entity *pe;
|
||||
struct dds_entity *e;
|
||||
dds_return_t ret;
|
||||
dds_entity_t topic;
|
||||
|
||||
if (name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
/* claim participant handle to guarantee the handle remains valid after
|
||||
unlocking the participant prior to verifying the found topic still
|
||||
exists */
|
||||
if ((ret = dds_entity_pin (participant, &pe)) < 0)
|
||||
if ((ret = dds_entity_pin (handle, &e)) < 0)
|
||||
return ret;
|
||||
if (dds_entity_kind (pe) != DDS_KIND_PARTICIPANT)
|
||||
if (dds_entity_kind (e) != DDS_KIND_TOPIC)
|
||||
{
|
||||
dds_entity_unpin (pe);
|
||||
dds_entity_unpin (e);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
*tp = (struct dds_topic *) e;
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
do {
|
||||
dds_participant *p;
|
||||
topic = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
if ((ret = dds_participant_lock (participant, &p)) == DDS_RETCODE_OK)
|
||||
{
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &p->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, name) == 0)
|
||||
{
|
||||
topic = e->m_hdllink.hdl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
dds_participant_unlock (p);
|
||||
}
|
||||
} while (topic > 0 && !dds_find_topic_check_and_add_ref (participant, topic, name));
|
||||
void dds_topic_unpin (struct dds_topic *tp)
|
||||
{
|
||||
dds_entity_unpin (&tp->m_entity);
|
||||
}
|
||||
|
||||
dds_entity_unpin (pe);
|
||||
return topic;
|
||||
void dds_topic_defer_set_qos (struct dds_topic *tp)
|
||||
{
|
||||
struct dds_ktopic * const ktp = tp->m_ktopic;
|
||||
struct dds_participant * const pp = dds_entity_participant (&tp->m_entity);
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
++ktp->defer_set_qos;
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
}
|
||||
|
||||
void dds_topic_allow_set_qos (struct dds_topic *tp)
|
||||
{
|
||||
struct dds_ktopic * const ktp = tp->m_ktopic;
|
||||
struct dds_participant * const pp = dds_entity_participant (&tp->m_entity);
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
assert (ktp->defer_set_qos > 0);
|
||||
if (--ktp->defer_set_qos == 0)
|
||||
ddsrt_cond_broadcast (&pp->m_entity.m_cond);
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
}
|
||||
|
||||
static dds_return_t dds_topic_delete (dds_entity *e) ddsrt_nonnull_all;
|
||||
|
||||
static dds_return_t dds_topic_delete (dds_entity *e)
|
||||
{
|
||||
dds_topic *tp = (dds_topic *) e;
|
||||
dds_domain *domain = tp->m_entity.m_domain;
|
||||
ddsrt_avl_dpath_t dp;
|
||||
struct topic_sertopic_node *stn;
|
||||
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
|
||||
stn = ddsrt_avl_lookup_dpath (&dds_topictree_def, &domain->m_topics, tp->m_stopic, &dp);
|
||||
assert (stn != NULL);
|
||||
if (--stn->refc == 0)
|
||||
{
|
||||
ddsrt_avl_delete_dpath (&dds_topictree_def, &domain->m_topics, stn, &dp);
|
||||
ddsrt_free (stn);
|
||||
}
|
||||
|
||||
struct dds_topic * const tp = (dds_topic *) e;
|
||||
struct dds_ktopic * const ktp = tp->m_ktopic;
|
||||
assert (dds_entity_kind (e->m_parent) == DDS_KIND_PARTICIPANT);
|
||||
dds_participant * const pp = (dds_participant *) e->m_parent;
|
||||
ddsi_sertopic_unref (tp->m_stopic);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
if (--ktp->refc == 0)
|
||||
{
|
||||
ddsrt_avl_delete (&participant_ktopics_treedef, &pp->m_ktopics, ktp);
|
||||
dds_delete_qos (ktp->qos);
|
||||
ddsrt_free (ktp->name);
|
||||
ddsrt_free (ktp->type_name);
|
||||
dds_free (ktp);
|
||||
}
|
||||
ddsrt_mutex_unlock (&pp->m_entity.m_mutex);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
static dds_return_t dds_topic_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
|
||||
{
|
||||
/* note: e->m_qos is still the old one to allow for failure here */
|
||||
/* We never actually set the qos of a struct dds_topic and really shouldn't be here,
|
||||
but the code to check whether set_qos is supported uses the entity's qos_set
|
||||
function as a proxy. One of the weird things about the topic's set_qos is that
|
||||
this is called with e == NULL. */
|
||||
(void) e; (void) qos; (void) enabled;
|
||||
assert (e == NULL);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
static bool dupdef_qos_ok (const dds_qos_t *qos, const dds_topic *tp)
|
||||
static bool dupdef_qos_ok (const dds_qos_t *qos, const dds_ktopic *ktp)
|
||||
{
|
||||
if ((qos == NULL) != (tp->m_entity.m_qos == NULL))
|
||||
if ((qos == NULL) != (ktp->qos == NULL))
|
||||
return false;
|
||||
else if (qos == NULL)
|
||||
return true;
|
||||
else
|
||||
return dds_qos_equal (tp->m_entity.m_qos, qos);
|
||||
}
|
||||
|
||||
static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b)
|
||||
{
|
||||
if (strcmp (a->name_type_name, b->name_type_name) != 0)
|
||||
return false;
|
||||
if (a->serdata_basehash != b->serdata_basehash)
|
||||
return false;
|
||||
if (a->ops != b->ops)
|
||||
return false;
|
||||
if (a->serdata_ops != b->serdata_ops)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
static dds_return_t create_topic_topic_arbitrary_check_sertopic (dds_entity_t participant, dds_entity_t topic, struct ddsi_sertopic *sertopic, const dds_qos_t *qos)
|
||||
{
|
||||
dds_topic *tp;
|
||||
dds_return_t ret;
|
||||
|
||||
if (dds_topic_lock (topic, &tp) < 0)
|
||||
return DDS_RETCODE_NOT_FOUND;
|
||||
|
||||
if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant)
|
||||
ret = DDS_RETCODE_NOT_FOUND;
|
||||
else if (!sertopic_equivalent (tp->m_stopic, sertopic))
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
else if (!dupdef_qos_ok (qos, tp))
|
||||
ret = DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
else
|
||||
{
|
||||
/* See dds_find_topic_check_and_add_ref */
|
||||
ret = DDS_RETCODE_OK;
|
||||
}
|
||||
dds_topic_unlock (tp);
|
||||
return ret;
|
||||
return dds_qos_equal (ktp->qos, qos);
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_topic = {
|
||||
|
@ -280,32 +195,111 @@ const struct dds_entity_deriver dds_entity_deriver_topic = {
|
|||
.validate_status = dds_topic_status_validate
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Checks whether a ktopic with the same name exists in the participant,
|
||||
* and if so, whether it's QoS matches or not.
|
||||
*
|
||||
* The set of ktopics is stored in the participant, protected by the participant's
|
||||
* mutex and the internal state of these ktopics (including the QoS) is also
|
||||
* protected by that mutex.
|
||||
*
|
||||
* @param[out] ktp_out matching ktopic if call was successful, or NULL if no
|
||||
* ktopic with this name exists
|
||||
* @param[in] pp pinned & locked participant
|
||||
* @param[in] name topic name to look for
|
||||
* @param[in] type_name type name the topic must have
|
||||
* @param[in] new_qos QoS for the new topic (can be NULL)
|
||||
*
|
||||
* @returns success + ktopic, success + NULL or error.
|
||||
*
|
||||
* @retval DDS_RETCODE_OK
|
||||
* ktp_out is either NULL (first attempt at creating this topic), or
|
||||
* the matching ktopic entity
|
||||
* @retval DDS_RETCODE_INCONSISTENT_POLICY
|
||||
* a ktopic exists with differing QoS
|
||||
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
|
||||
* a ktopic exists with a different type name
|
||||
*/
|
||||
static dds_return_t lookup_and_check_ktopic (struct dds_ktopic **ktp_out, dds_participant *pp, const char *name, const char *type_name, const dds_qos_t *new_qos)
|
||||
{
|
||||
struct q_globals * const gv = &pp->m_entity.m_domain->gv;
|
||||
struct dds_ktopic *ktp;
|
||||
if ((ktp = *ktp_out = ddsrt_avl_lookup (&participant_ktopics_treedef, &pp->m_ktopics, name)) == NULL)
|
||||
{
|
||||
GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: no such ktopic\n");
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
else if (strcmp (ktp->type_name, type_name) != 0)
|
||||
{
|
||||
GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p typename %s mismatch\n", (void *) ktp, ktp->type_name);
|
||||
return DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
}
|
||||
else if (!dupdef_qos_ok (new_qos, ktp))
|
||||
{
|
||||
GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p qos mismatch\n", (void *) ktp);
|
||||
return DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
}
|
||||
else
|
||||
{
|
||||
GVTRACE ("lookup_and_check_ktopic_may_unlock_pp: ktp %p reuse\n", (void *) ktp);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
}
|
||||
|
||||
static dds_entity_t create_topic_pp_locked (struct dds_participant *pp, struct dds_ktopic *ktp, bool implicit, struct ddsi_sertopic *sertopic_registered, const dds_listener_t *listener, const nn_plist_t *sedp_plist)
|
||||
{
|
||||
dds_entity_t hdl;
|
||||
dds_topic *tp = dds_alloc (sizeof (*tp));
|
||||
hdl = dds_entity_init (&tp->m_entity, &pp->m_entity, DDS_KIND_TOPIC, implicit, NULL, listener, DDS_TOPIC_STATUS_MASK);
|
||||
tp->m_entity.m_iid = ddsi_iid_gen ();
|
||||
dds_entity_register_child (&pp->m_entity, &tp->m_entity);
|
||||
tp->m_ktopic = ktp;
|
||||
tp->m_stopic = sertopic_registered;
|
||||
|
||||
/* Publish Topic */
|
||||
if (sedp_plist)
|
||||
{
|
||||
struct participant *ddsi_pp;
|
||||
nn_plist_t plist;
|
||||
|
||||
thread_state_awake (lookup_thread_state (), &pp->m_entity.m_domain->gv);
|
||||
ddsi_pp = entidx_lookup_participant_guid (pp->m_entity.m_domain->gv.entity_index, &pp->m_entity.m_guid);
|
||||
assert (ddsi_pp);
|
||||
|
||||
nn_plist_init_empty (&plist);
|
||||
nn_plist_mergein_missing (&plist, sedp_plist, ~(uint64_t)0, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (&plist.qos, ktp->qos, ~(uint64_t)0);
|
||||
sedp_write_topic (ddsi_pp, &plist);
|
||||
nn_plist_fini (&plist);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
}
|
||||
|
||||
dds_entity_init_complete (&tp->m_entity);
|
||||
return hdl;
|
||||
}
|
||||
|
||||
dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_sertopic *sertopic, const dds_qos_t *qos, const dds_listener_t *listener, const nn_plist_t *sedp_plist)
|
||||
{
|
||||
dds_return_t rc;
|
||||
dds_participant *par;
|
||||
dds_entity *par_ent;
|
||||
dds_topic *top;
|
||||
dds_participant *pp;
|
||||
dds_qos_t *new_qos = NULL;
|
||||
dds_entity_t hdl;
|
||||
struct participant *ddsi_pp;
|
||||
struct ddsi_sertopic *sertopic_registered;
|
||||
|
||||
if (sertopic == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
/* Claim participant handle so we can be sure the handle will not be
|
||||
reused if we temporarily unlock the participant to check the an
|
||||
existing topic's compatibility */
|
||||
{
|
||||
dds_entity *par_ent;
|
||||
if ((rc = dds_entity_pin (participant, &par_ent)) < 0)
|
||||
return rc;
|
||||
/* Verify that we've been given a participant, not strictly necessary
|
||||
because dds_participant_lock below checks it, but this is more
|
||||
obvious */
|
||||
if (dds_entity_kind (par_ent) != DDS_KIND_PARTICIPANT)
|
||||
{
|
||||
dds_entity_unpin (par_ent);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
pp = (struct dds_participant *) par_ent;
|
||||
}
|
||||
|
||||
new_qos = dds_create_qos ();
|
||||
if (qos)
|
||||
|
@ -314,155 +308,76 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
|
|||
*
|
||||
* nn_xqos_mergein_missing (new_qos, &gv.default_xqos_tp, ~(uint64_t)0);
|
||||
*
|
||||
* but the crazy defaults of the DDS specification has a default settings
|
||||
* for reliability that are dependent on the entity type: readers and
|
||||
* but the crazy defaults of the DDS specification has a default setting
|
||||
* for reliability that is dependent on the entity type: readers and
|
||||
* topics default to best-effort, but writers to reliable.
|
||||
*
|
||||
* Leaving the topic QoS sparse means a default-default topic QoS of
|
||||
* best-effort will do "the right thing" and let a writer still default to
|
||||
* reliable ... (and keep behaviour unchanged) */
|
||||
if ((rc = nn_xqos_valid (&par_ent->m_domain->gv.logconfig, new_qos)) != DDS_RETCODE_OK)
|
||||
goto err_invalid_qos;
|
||||
|
||||
/* FIXME: just mutex_lock ought to be good enough, but there is the
|
||||
pesky "closed" check still ... */
|
||||
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
goto err_lock_participant;
|
||||
|
||||
bool retry_lookup;
|
||||
do {
|
||||
dds_entity_t topic;
|
||||
|
||||
/* claim participant handle to guarantee the handle remains valid after
|
||||
unlocking the participant prior to verifying the found topic still
|
||||
exists */
|
||||
topic = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &par->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
struct q_globals * const gv = &pp->m_entity.m_domain->gv;
|
||||
if ((rc = nn_xqos_valid (&gv->logconfig, new_qos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, sertopic->name) == 0)
|
||||
{
|
||||
topic = e->m_hdllink.hdl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (topic < 0)
|
||||
{
|
||||
/* no topic with the name exists; we have locked the participant, and
|
||||
so we can proceed with creating the topic */
|
||||
retry_lookup = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* some topic with the same name exists; need to lock the topic to
|
||||
perform the checks, but locking the topic while holding the
|
||||
participant lock violates the lock order (child -> parent). So
|
||||
unlock that participant and check the topic while accounting
|
||||
for the various scary cases. */
|
||||
dds_participant_unlock (par);
|
||||
|
||||
rc = create_topic_topic_arbitrary_check_sertopic (participant, topic, sertopic, new_qos);
|
||||
switch (rc)
|
||||
{
|
||||
case DDS_RETCODE_OK: /* duplicate definition */
|
||||
dds_entity_unpin (par_ent);
|
||||
dds_delete_qos (new_qos);
|
||||
return topic;
|
||||
dds_entity_unpin (&pp->m_entity);
|
||||
return rc;
|
||||
}
|
||||
|
||||
case DDS_RETCODE_NOT_FOUND:
|
||||
/* either participant is now being deleted, topic was deleted, or
|
||||
topic was deleted & the handle reused for something else -- so */
|
||||
retry_lookup = true;
|
||||
break;
|
||||
|
||||
case DDS_RETCODE_PRECONDITION_NOT_MET: /* incompatible sertopic */
|
||||
case DDS_RETCODE_INCONSISTENT_POLICY: /* different QoS */
|
||||
/* inconsistent definition */
|
||||
dds_entity_unpin (par_ent);
|
||||
/* See if we're allowed to create the topic; ktp is returned pinned & locked
|
||||
so we can be sure it doesn't disappear and its QoS can't change */
|
||||
GVTRACE ("dds_create_topic_arbitrary (pp %p "PGUIDFMT" sertopic %p reg?%s refc %"PRIu32" %s/%s)\n",
|
||||
(void *) pp, PGUID (pp->m_entity.m_guid), (void *) sertopic, sertopic->gv ? "yes" : "no",
|
||||
ddsrt_atomic_ld32 (&sertopic->refc), sertopic->name, sertopic->type_name);
|
||||
ddsrt_mutex_lock (&pp->m_entity.m_mutex);
|
||||
struct dds_ktopic *ktp;
|
||||
if ((rc = lookup_and_check_ktopic (&ktp, pp, sertopic->name, sertopic->type_name, new_qos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
GVTRACE ("dds_create_topic_arbitrary: failed after compatibility check: %s\n", dds_strretcode (rc));
|
||||
dds_participant_unlock (pp);
|
||||
dds_delete_qos (new_qos);
|
||||
return rc;
|
||||
|
||||
default:
|
||||
abort ();
|
||||
}
|
||||
|
||||
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
goto err_lock_participant;
|
||||
}
|
||||
} while (retry_lookup);
|
||||
|
||||
/* FIXME: make this a function
|
||||
Add sertopic to domain -- but note that it may have been created by another thread
|
||||
on another participant that is attached to the same domain */
|
||||
/* Create a ktopic if it doesn't exist yet, else reference existing one and delete the
|
||||
unneeded "new_qos". */
|
||||
if (ktp == NULL)
|
||||
{
|
||||
struct dds_domain *domain = par->m_entity.m_domain;
|
||||
|
||||
ddsrt_avl_ipath_t ip;
|
||||
struct topic_sertopic_node *stn;
|
||||
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
|
||||
stn = ddsrt_avl_lookup_ipath (&dds_topictree_def, &domain->m_topics, sertopic, &ip);
|
||||
if (stn == NULL)
|
||||
{
|
||||
/* no existing definition: use new */
|
||||
stn = ddsrt_malloc (sizeof (*stn));
|
||||
stn->refc = 1;
|
||||
stn->st = ddsi_sertopic_ref (sertopic);
|
||||
ddsrt_avl_insert (&dds_topictree_def, &domain->m_topics, stn);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
}
|
||||
else if (sertopic_equivalent (stn->st, sertopic))
|
||||
{
|
||||
/* ok -- same definition, so use existing one instead */
|
||||
sertopic = ddsi_sertopic_ref (stn->st);
|
||||
stn->refc++;
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
ktp = dds_alloc (sizeof (*ktp));
|
||||
ktp->refc = 1;
|
||||
ktp->defer_set_qos = 0;
|
||||
ktp->qos = new_qos;
|
||||
/* have to copy these because the ktopic can outlast any specific sertopic */
|
||||
ktp->name = ddsrt_strdup (sertopic->name);
|
||||
ktp->type_name = ddsrt_strdup (sertopic->type_name);
|
||||
ddsrt_avl_insert (&participant_ktopics_treedef, &pp->m_ktopics, ktp);
|
||||
GVTRACE ("create_and_lock_ktopic: ktp %p\n", (void *) ktp);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* bummer, delete */
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
rc = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
goto err_sertopic_reuse;
|
||||
}
|
||||
ktp->refc++;
|
||||
dds_delete_qos (new_qos);
|
||||
}
|
||||
|
||||
/* Create topic */
|
||||
top = dds_alloc (sizeof (*top));
|
||||
/* Sertopic: re-use a previously registered one if possible, else register this one */
|
||||
{
|
||||
ddsrt_mutex_lock (&gv->sertopics_lock);
|
||||
if ((sertopic_registered = ddsi_sertopic_lookup_locked (gv, sertopic)) != NULL)
|
||||
GVTRACE ("dds_create_topic_arbitrary: reuse sertopic %p\n", (void *) sertopic_registered);
|
||||
else
|
||||
{
|
||||
GVTRACE ("dds_create_topic_arbitrary: register new sertopic %p\n", (void *) sertopic);
|
||||
ddsi_sertopic_register_locked (gv, sertopic);
|
||||
sertopic_registered = sertopic;
|
||||
}
|
||||
ddsrt_mutex_unlock (&gv->sertopics_lock);
|
||||
}
|
||||
|
||||
/* Create topic referencing ktopic & sertopic_registered */
|
||||
/* FIXME: setting "implicit" based on sertopic->ops is a hack */
|
||||
hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, (sertopic->ops == &ddsi_sertopic_ops_builtintopic), new_qos, listener, DDS_TOPIC_STATUS_MASK);
|
||||
top->m_entity.m_iid = ddsi_iid_gen ();
|
||||
dds_entity_register_child (&par->m_entity, &top->m_entity);
|
||||
top->m_stopic = sertopic;
|
||||
|
||||
/* Publish Topic */
|
||||
thread_state_awake (lookup_thread_state (), &par->m_entity.m_domain->gv);
|
||||
ddsi_pp = entidx_lookup_participant_guid (par->m_entity.m_domain->gv.entity_index, &par->m_entity.m_guid);
|
||||
assert (ddsi_pp);
|
||||
if (sedp_plist)
|
||||
{
|
||||
nn_plist_t plist;
|
||||
nn_plist_init_empty (&plist);
|
||||
nn_plist_mergein_missing (&plist, sedp_plist, ~(uint64_t)0, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (&plist.qos, new_qos, ~(uint64_t)0);
|
||||
sedp_write_topic (ddsi_pp, &plist);
|
||||
nn_plist_fini (&plist);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
dds_entity_init_complete (&top->m_entity);
|
||||
dds_participant_unlock (par);
|
||||
dds_entity_unpin (par_ent);
|
||||
hdl = create_topic_pp_locked (pp, ktp, (sertopic_registered->ops == &ddsi_sertopic_ops_builtintopic), sertopic_registered, listener, sedp_plist);
|
||||
dds_participant_unlock (pp);
|
||||
GVTRACE ("dds_create_topic_arbitrary: new topic %"PRId32"\n", hdl);
|
||||
return hdl;
|
||||
|
||||
err_sertopic_reuse:
|
||||
dds_participant_unlock (par);
|
||||
err_lock_participant:
|
||||
err_invalid_qos:
|
||||
dds_delete_qos (new_qos);
|
||||
dds_entity_unpin (par_ent);
|
||||
return rc;
|
||||
}
|
||||
|
||||
dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descriptor_t *desc, const char *name, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
|
@ -482,7 +397,6 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
st = dds_alloc (sizeof (*st));
|
||||
|
||||
ddsi_sertopic_init (&st->c, name, desc->m_typename, &ddsi_sertopic_ops_default, desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey, (desc->m_nkeys == 0));
|
||||
st->gv = &ppent->m_domain->gv;
|
||||
st->native_encoding_identifier = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
|
||||
st->serpool = ppent->m_domain->gv.serpool;
|
||||
st->type = (void*) desc;
|
||||
|
@ -522,6 +436,47 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
return hdl;
|
||||
}
|
||||
|
||||
dds_entity_t dds_find_topic (dds_entity_t participant, const char *name)
|
||||
{
|
||||
dds_participant *pp;
|
||||
dds_return_t rc;
|
||||
|
||||
if (name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((rc = dds_participant_lock (participant, &pp)) < 0)
|
||||
return rc;
|
||||
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &pp->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if (dds_entity_kind (e) != DDS_KIND_TOPIC)
|
||||
continue;
|
||||
|
||||
struct dds_entity *x;
|
||||
if (dds_entity_pin (e->m_hdllink.hdl, &x) != DDS_RETCODE_OK)
|
||||
continue;
|
||||
|
||||
struct dds_topic * const tp = (struct dds_topic *) e;
|
||||
if (x != e || strcmp (tp->m_ktopic->name, name) != 0)
|
||||
{
|
||||
dds_entity_unpin (x);
|
||||
continue;
|
||||
}
|
||||
|
||||
struct ddsi_sertopic * const sertopic = ddsi_sertopic_ref (tp->m_stopic);
|
||||
struct dds_ktopic * const ktp = tp->m_ktopic;
|
||||
ktp->refc++;
|
||||
dds_entity_unpin (x);
|
||||
|
||||
dds_entity_t hdl = create_topic_pp_locked (pp, ktp, false, sertopic, NULL, NULL);
|
||||
dds_participant_unlock (pp);
|
||||
return hdl;
|
||||
}
|
||||
dds_participant_unlock (pp);
|
||||
return DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
}
|
||||
|
||||
static bool dds_topic_chaining_filter (const void *sample, void *ctx)
|
||||
{
|
||||
dds_topic_filter_fn realf = (dds_topic_filter_fn) ctx;
|
||||
|
@ -594,10 +549,10 @@ dds_return_t dds_get_name (dds_entity_t topic, char *name, size_t size)
|
|||
if (size <= 0 || name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
name[0] = '\0';
|
||||
if ((ret = dds_topic_lock (topic, &t)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_topic_pin (topic, &t)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
(void) snprintf (name, size, "%s", t->m_stopic->name);
|
||||
dds_topic_unlock (t);
|
||||
dds_topic_unpin (t);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
|
@ -608,10 +563,10 @@ dds_return_t dds_get_type_name (dds_entity_t topic, char *name, size_t size)
|
|||
if (size <= 0 || name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
name[0] = '\0';
|
||||
if ((ret = dds_topic_lock (topic, &t)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_topic_pin (topic, &t)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
(void) snprintf (name, size, "%s", t->m_stopic->type_name);
|
||||
dds_topic_unlock (t);
|
||||
dds_topic_unpin (t);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
|
|
|
@ -263,13 +263,11 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
{
|
||||
dds_return_t rc;
|
||||
dds_qos_t *wqos;
|
||||
dds_writer *wr;
|
||||
dds_entity_t writer;
|
||||
dds_publisher *pub = NULL;
|
||||
dds_participant *pp;
|
||||
dds_topic *tp;
|
||||
dds_entity_t publisher;
|
||||
struct whc_writer_info *wrinfo;
|
||||
bool created_implicit_pub = false;
|
||||
|
||||
{
|
||||
dds_entity *p_or_p;
|
||||
|
@ -286,6 +284,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
dds_entity_unlock (p_or_p);
|
||||
if ((rc = dds_publisher_lock (publisher, &pub)) < 0)
|
||||
return rc;
|
||||
created_implicit_pub = true;
|
||||
break;
|
||||
default:
|
||||
dds_entity_unlock (p_or_p);
|
||||
|
@ -295,25 +294,33 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
|
||||
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
|
||||
|
||||
if ((rc = dds_topic_lock (topic, &tp)) != DDS_RETCODE_OK)
|
||||
goto err_tp_lock;
|
||||
if ((rc = dds_topic_pin (topic, &tp)) != DDS_RETCODE_OK)
|
||||
goto err_pin_topic;
|
||||
assert (tp->m_stopic);
|
||||
|
||||
pp = dds_entity_participant (&pub->m_entity);
|
||||
if (pp != dds_entity_participant (&tp->m_entity))
|
||||
if (dds_entity_participant (&pub->m_entity) != dds_entity_participant (&tp->m_entity))
|
||||
{
|
||||
rc = DDS_RETCODE_BAD_PARAMETER;
|
||||
goto err_pp_mismatch;
|
||||
}
|
||||
|
||||
/* Prevent set_qos on the topic until writer has been created and registered: we can't
|
||||
allow a TOPIC_DATA change to ccur before the writer has been created because that
|
||||
change would then not be published in the discovery/built-in topics.
|
||||
|
||||
Don't keep the participant (which protects the topic's QoS) locked because that
|
||||
can cause deadlocks for applications creating a reader/writer from within a
|
||||
publication matched listener (whether the restrictions on what one can do in
|
||||
listeners are reasonable or not, it used to work so it can be broken arbitrarily). */
|
||||
dds_topic_defer_set_qos (tp);
|
||||
|
||||
/* Merge Topic & Publisher qos */
|
||||
wqos = dds_create_qos ();
|
||||
if (qos)
|
||||
nn_xqos_mergein_missing (wqos, qos, DDS_WRITER_QOS_MASK);
|
||||
if (pub->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (wqos, pub->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_ktopic->qos)
|
||||
nn_xqos_mergein_missing (wqos, tp->m_ktopic->qos, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0);
|
||||
|
||||
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0 ||
|
||||
|
@ -324,9 +331,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
}
|
||||
|
||||
/* Create writer */
|
||||
wr = dds_alloc (sizeof (*wr));
|
||||
writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK);
|
||||
|
||||
struct dds_writer * const wr = dds_alloc (sizeof (*wr));
|
||||
const dds_entity_t writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK);
|
||||
wr->m_topic = tp;
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), pub->m_entity.m_domain->gv.config.xpack_send_async);
|
||||
|
@ -336,7 +342,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch;
|
||||
|
||||
thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv);
|
||||
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, dds_entity_participant_guid (&pub->m_entity), tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
assert(rc == DDS_RETCODE_OK);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
|
@ -344,16 +350,19 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
dds_entity_register_child (&pub->m_entity, &wr->m_entity);
|
||||
|
||||
dds_entity_init_complete (&wr->m_entity);
|
||||
dds_topic_unlock (tp);
|
||||
|
||||
dds_topic_allow_set_qos (tp);
|
||||
dds_topic_unpin (tp);
|
||||
dds_publisher_unlock (pub);
|
||||
return writer;
|
||||
|
||||
err_bad_qos:
|
||||
dds_topic_allow_set_qos (tp);
|
||||
err_pp_mismatch:
|
||||
dds_topic_unlock (tp);
|
||||
err_tp_lock:
|
||||
dds_topic_unpin (tp);
|
||||
err_pin_topic:
|
||||
dds_publisher_unlock (pub);
|
||||
if ((pub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0)
|
||||
if (created_implicit_pub)
|
||||
(void) dds_delete (publisher);
|
||||
return rc;
|
||||
}
|
||||
|
|
|
@ -130,8 +130,12 @@ CU_Test(ddsc_topic_create, duplicate, .init=ddsc_topic_init, .fini=ddsc_topic_fi
|
|||
/* Creating the same topic should succeed. */
|
||||
topic = dds_create_topic(g_participant, &RoundTripModule_DataType_desc, g_topicRtmDataTypeName, NULL, NULL);
|
||||
CU_ASSERT_FATAL(topic > 0);
|
||||
CU_ASSERT_FATAL(topic != g_topicRtmDataType);
|
||||
ret = dds_delete(topic);
|
||||
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
|
||||
/* Old topic entity should remain in existence */
|
||||
ret = dds_get_parent(g_topicRtmDataType);
|
||||
CU_ASSERT(ret > 0);
|
||||
}
|
||||
/*************************************************************************************************/
|
||||
|
||||
|
@ -201,7 +205,7 @@ CU_Test(ddsc_topic_find, valid, .init=ddsc_topic_init, .fini=ddsc_topic_fini)
|
|||
dds_return_t ret;
|
||||
|
||||
topic = dds_find_topic(g_participant, g_topicRtmDataTypeName);
|
||||
CU_ASSERT_EQUAL_FATAL(topic, g_topicRtmDataType);
|
||||
CU_ASSERT_NOT_EQUAL_FATAL(topic, g_topicRtmDataType);
|
||||
|
||||
ret = dds_delete(topic);
|
||||
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
|
||||
|
|
|
@ -132,6 +132,8 @@ typedef bool (*ddsi_serdata_eqkey_t) (const struct ddsi_serdata *a, const struct
|
|||
returning bufsize-1) if it had to truncate) */
|
||||
typedef size_t (*ddsi_serdata_print_t) (const struct ddsi_sertopic *topic, const struct ddsi_serdata *d, char *buf, size_t size);
|
||||
|
||||
#define DDSI_SERDATA_HAS_FROM_SER_IOV 1
|
||||
|
||||
struct ddsi_serdata_ops {
|
||||
ddsi_serdata_eqkey_t eqkey;
|
||||
ddsi_serdata_size_t get_size;
|
||||
|
|
|
@ -107,7 +107,6 @@ struct ddsi_sertopic_default {
|
|||
struct ddsi_sertopic c;
|
||||
uint16_t native_encoding_identifier; /* (PL_)?CDR_(LE|BE) */
|
||||
struct serdatapool *serpool;
|
||||
struct q_globals *gv;
|
||||
|
||||
struct dds_topic_descriptor * type;
|
||||
unsigned nkeys;
|
||||
|
|
|
@ -23,17 +23,18 @@ extern "C" {
|
|||
struct ddsi_serdata;
|
||||
struct ddsi_serdata_ops;
|
||||
struct ddsi_sertopic_ops;
|
||||
struct q_globals;
|
||||
|
||||
struct ddsi_sertopic {
|
||||
const struct ddsi_sertopic_ops *ops;
|
||||
const struct ddsi_serdata_ops *serdata_ops;
|
||||
uint32_t serdata_basehash;
|
||||
bool topickind_no_key;
|
||||
char *name_type_name;
|
||||
char *name;
|
||||
char *type_name;
|
||||
uint64_t iid;
|
||||
ddsrt_atomic_uint32_t refc; /* counts refs from entities, not from data */
|
||||
struct q_globals *gv;
|
||||
ddsrt_atomic_uint32_t refc; /* counts refs from entities (topic, reader, writer), not from data */
|
||||
};
|
||||
|
||||
/* The old and the new happen to have the same memory layout on a 64-bit machine
|
||||
|
@ -48,6 +49,27 @@ struct ddsi_sertopic {
|
|||
binary compatible. */
|
||||
#define DDSI_SERTOPIC_HAS_TOPICKIND_NO_KEY 1
|
||||
|
||||
/* Type changed: name_type_name and ii removed and gv added; and the set of
|
||||
operations got extended by the a predicate for testing to sertopics (with the
|
||||
same "ops") for equality ("equal") as well as a function for hashing the
|
||||
non-generic part of the sertopic definition (via "hash"). These two operations
|
||||
make it possible to intern sertopics without duplicates, which has become
|
||||
relevant now that multiple ddsi_sertopics can be associated with a single topic
|
||||
name.
|
||||
|
||||
Testing for DDSI_SERTOPIC_HAS_EQUAL_AND_HASH allows one to have a single source
|
||||
that can handle both variants, but there's no binary compatbility. */
|
||||
#define DDSI_SERTOPIC_HAS_EQUAL_AND_HASH 1
|
||||
|
||||
/* Called to compare two sertopics for equality, if it is already known that name,
|
||||
type name, topickind_no_Key, and operations are all the same. (serdata_basehash
|
||||
is computed from the set of operations.) */
|
||||
typedef bool (*ddsi_sertopic_equal_t) (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b);
|
||||
|
||||
/* Hash the custom components of a sertopic (this XOR'd with a hash computed from
|
||||
the fields that are defined in struct ddsi_sertopic) */
|
||||
typedef uint32_t (*ddsi_sertopic_hash_t) (const struct ddsi_sertopic *tp);
|
||||
|
||||
/* Called when the refcount dropped to zero */
|
||||
typedef void (*ddsi_sertopic_free_t) (struct ddsi_sertopic *tp);
|
||||
|
||||
|
@ -66,15 +88,22 @@ struct ddsi_sertopic_ops {
|
|||
ddsi_sertopic_zero_samples_t zero_samples;
|
||||
ddsi_sertopic_realloc_samples_t realloc_samples;
|
||||
ddsi_sertopic_free_samples_t free_samples;
|
||||
ddsi_sertopic_equal_t equal;
|
||||
ddsi_sertopic_hash_t hash;
|
||||
};
|
||||
|
||||
struct ddsi_sertopic *ddsi_sertopic_lookup_locked (struct q_globals *gv, const struct ddsi_sertopic *sertopic_template);
|
||||
void ddsi_sertopic_register_locked (struct q_globals *gv, struct ddsi_sertopic *sertopic);
|
||||
|
||||
DDS_EXPORT void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key);
|
||||
DDS_EXPORT void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key);
|
||||
DDS_EXPORT void ddsi_sertopic_fini (struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT void ddsi_sertopic_unref (struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops);
|
||||
|
||||
DDS_EXPORT bool ddsi_sertopic_equal (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b);
|
||||
DDS_EXPORT uint32_t ddsi_sertopic_hash (const struct ddsi_sertopic *tp);
|
||||
|
||||
DDS_EXPORT inline void ddsi_sertopic_free (struct ddsi_sertopic *tp) {
|
||||
tp->ops->free (tp);
|
||||
}
|
||||
|
|
|
@ -290,6 +290,9 @@ struct q_globals {
|
|||
struct ddsi_builtin_topic_interface *builtin_topic_interface;
|
||||
|
||||
struct nn_group_membership *mship;
|
||||
|
||||
ddsrt_mutex_t sertopics_lock;
|
||||
struct ddsrt_hh *sertopics;
|
||||
};
|
||||
|
||||
#if defined (__cplusplus)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/ddsrt/log.h"
|
||||
#include "dds/ddsrt/md5.h"
|
||||
#include "dds/ddsrt/mh3.h"
|
||||
#include "dds/ddsi/q_bswap.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_freelist.h"
|
||||
|
@ -117,59 +118,10 @@ static void serdata_default_append_blob (struct ddsi_serdata_default **d, size_t
|
|||
memcpy (p, data, sz);
|
||||
}
|
||||
|
||||
/* Fixed seed and length */
|
||||
|
||||
#define DDS_MH3_LEN 16
|
||||
#define DDS_MH3_SEED 0
|
||||
|
||||
#define DDS_MH3_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r))))
|
||||
|
||||
/* Really
|
||||
http://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp,
|
||||
MurmurHash3_x86_32
|
||||
*/
|
||||
|
||||
static uint32_t dds_mh3 (const void * key)
|
||||
{
|
||||
const uint8_t *data = (const uint8_t *) key;
|
||||
const intptr_t nblocks = (intptr_t) (DDS_MH3_LEN / 4);
|
||||
const uint32_t c1 = 0xcc9e2d51;
|
||||
const uint32_t c2 = 0x1b873593;
|
||||
|
||||
uint32_t h1 = DDS_MH3_SEED;
|
||||
|
||||
const uint32_t *blocks = (const uint32_t *) (data + nblocks * 4);
|
||||
register intptr_t i;
|
||||
|
||||
for (i = -nblocks; i; i++)
|
||||
{
|
||||
uint32_t k1 = blocks[i];
|
||||
|
||||
k1 *= c1;
|
||||
k1 = DDS_MH3_ROTL32 (k1, 15);
|
||||
k1 *= c2;
|
||||
|
||||
h1 ^= k1;
|
||||
h1 = DDS_MH3_ROTL32 (h1, 13);
|
||||
h1 = h1 * 5+0xe6546b64;
|
||||
}
|
||||
|
||||
/* finalization */
|
||||
|
||||
h1 ^= DDS_MH3_LEN;
|
||||
h1 ^= h1 >> 16;
|
||||
h1 *= 0x85ebca6b;
|
||||
h1 ^= h1 >> 13;
|
||||
h1 *= 0xc2b2ae35;
|
||||
h1 ^= h1 >> 16;
|
||||
|
||||
return h1;
|
||||
}
|
||||
|
||||
static struct ddsi_serdata *fix_serdata_default(struct ddsi_serdata_default *d, uint32_t basehash)
|
||||
{
|
||||
if (d->keyhash.m_iskey)
|
||||
d->c.hash = dds_mh3 (d->keyhash.m_hash) ^ basehash;
|
||||
d->c.hash = ddsrt_mh3 (d->keyhash.m_hash, 16, 0) ^ basehash;
|
||||
else
|
||||
d->c.hash = *((uint32_t *)d->keyhash.m_hash) ^ basehash;
|
||||
return &d->c;
|
||||
|
@ -651,8 +603,8 @@ static size_t serdata_default_print_plist (const struct ddsi_sertopic *sertopic_
|
|||
src.buf = (const unsigned char *) d->data;
|
||||
src.bufsz = d->pos;
|
||||
src.encoding = d->hdr.identifier;
|
||||
src.factory = tp->gv->m_factory;
|
||||
src.logconfig = &tp->gv->logconfig;
|
||||
src.factory = tp->c.gv->m_factory;
|
||||
src.logconfig = &tp->c.gv->logconfig;
|
||||
src.protocol_version.major = RTPS_MAJOR;
|
||||
src.protocol_version.minor = RTPS_MINOR;
|
||||
src.strict = false;
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/ddsrt/md5.h"
|
||||
#include "dds/ddsrt/mh3.h"
|
||||
#include "dds/ddsrt/hopscotch.h"
|
||||
#include "dds/ddsrt/string.h"
|
||||
#include "dds/ddsi/q_bswap.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
|
@ -23,21 +25,82 @@
|
|||
#include "dds/ddsi/ddsi_iid.h"
|
||||
#include "dds/ddsi/ddsi_sertopic.h"
|
||||
#include "dds/ddsi/ddsi_serdata.h"
|
||||
#include "dds/ddsi/q_globals.h"
|
||||
|
||||
bool ddsi_sertopic_equal (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b)
|
||||
{
|
||||
if (strcmp (a->name, b->name) != 0)
|
||||
return false;
|
||||
if (strcmp (a->type_name, b->type_name) != 0)
|
||||
return false;
|
||||
if (a->serdata_basehash != b->serdata_basehash)
|
||||
return false;
|
||||
if (a->ops != b->ops)
|
||||
return false;
|
||||
if (a->serdata_ops != b->serdata_ops)
|
||||
return false;
|
||||
return a->ops->equal (a, b);
|
||||
}
|
||||
|
||||
uint32_t ddsi_sertopic_hash (const struct ddsi_sertopic *a)
|
||||
{
|
||||
uint32_t h;
|
||||
h = ddsrt_mh3 (a->name, strlen (a->name), a->serdata_basehash);
|
||||
h = ddsrt_mh3 (a->type_name, strlen (a->type_name), h);
|
||||
return h ^ a->ops->hash (a);
|
||||
}
|
||||
|
||||
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *sertopic_const)
|
||||
{
|
||||
struct ddsi_sertopic *sertopic = (struct ddsi_sertopic *)sertopic_const;
|
||||
struct ddsi_sertopic *sertopic = (struct ddsi_sertopic *) sertopic_const;
|
||||
if (sertopic)
|
||||
ddsrt_atomic_inc32 (&sertopic->refc);
|
||||
return sertopic;
|
||||
}
|
||||
|
||||
struct ddsi_sertopic *ddsi_sertopic_lookup_locked (struct q_globals *gv, const struct ddsi_sertopic *sertopic_template)
|
||||
{
|
||||
struct ddsi_sertopic *sertopic = ddsrt_hh_lookup (gv->sertopics, sertopic_template);
|
||||
#ifndef NDEBUG
|
||||
if (sertopic != NULL)
|
||||
{
|
||||
assert (sertopic->gv != NULL);
|
||||
assert (sertopic->iid != 0);
|
||||
}
|
||||
#endif
|
||||
return ddsi_sertopic_ref (sertopic);
|
||||
}
|
||||
|
||||
void ddsi_sertopic_register_locked (struct q_globals *gv, struct ddsi_sertopic *sertopic)
|
||||
{
|
||||
assert (sertopic->gv == NULL);
|
||||
assert (sertopic->iid == 0);
|
||||
assert (ddsrt_atomic_ld32 (&sertopic->refc) == 1);
|
||||
|
||||
(void) ddsi_sertopic_ref (sertopic);
|
||||
sertopic->gv = gv;
|
||||
sertopic->iid = ddsi_iid_gen ();
|
||||
int x = ddsrt_hh_add (gv->sertopics, sertopic);
|
||||
assert (x);
|
||||
(void) x;
|
||||
}
|
||||
|
||||
void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic)
|
||||
{
|
||||
if (sertopic)
|
||||
{
|
||||
if (ddsrt_atomic_dec32_ov (&sertopic->refc) == 1)
|
||||
{
|
||||
/* if registered, drop from set of registered sertopics */
|
||||
if (sertopic->gv)
|
||||
{
|
||||
ddsrt_mutex_lock (&sertopic->gv->sertopics_lock);
|
||||
(void) ddsrt_hh_remove (sertopic->gv->sertopics, sertopic);
|
||||
ddsrt_mutex_unlock (&sertopic->gv->sertopics_lock);
|
||||
sertopic->gv = NULL;
|
||||
sertopic->iid = 0;
|
||||
}
|
||||
|
||||
ddsi_sertopic_free (sertopic);
|
||||
}
|
||||
}
|
||||
|
@ -46,36 +109,21 @@ void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic)
|
|||
void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key)
|
||||
{
|
||||
ddsrt_atomic_st32 (&tp->refc, 1);
|
||||
tp->iid = ddsi_iid_gen ();
|
||||
tp->name = ddsrt_strdup (name);
|
||||
tp->type_name = ddsrt_strdup (type_name);
|
||||
size_t ntn_sz = strlen (tp->name) + 1 + strlen (tp->type_name) + 1;
|
||||
tp->name_type_name = ddsrt_malloc (ntn_sz);
|
||||
(void) snprintf (tp->name_type_name, ntn_sz, "%s/%s", tp->name, tp->type_name);
|
||||
tp->ops = sertopic_ops;
|
||||
tp->serdata_ops = serdata_ops;
|
||||
tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops);
|
||||
tp->topickind_no_key = topickind_no_key;
|
||||
}
|
||||
|
||||
void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key)
|
||||
{
|
||||
ddsrt_atomic_st32 (&tp->refc, 1);
|
||||
tp->iid = ddsi_iid_gen ();
|
||||
tp->name = NULL;
|
||||
tp->type_name = NULL;
|
||||
tp->name_type_name = NULL;
|
||||
tp->ops = sertopic_ops;
|
||||
tp->serdata_ops = serdata_ops;
|
||||
tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops);
|
||||
tp->topickind_no_key = topickind_no_key;
|
||||
/* set later, on registration */
|
||||
tp->iid = 0;
|
||||
tp->gv = NULL;
|
||||
}
|
||||
|
||||
void ddsi_sertopic_fini (struct ddsi_sertopic *tp)
|
||||
{
|
||||
ddsrt_free (tp->name);
|
||||
ddsrt_free (tp->type_name);
|
||||
ddsrt_free (tp->name_type_name);
|
||||
}
|
||||
|
||||
uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <string.h>
|
||||
|
||||
#include "dds/ddsrt/md5.h"
|
||||
#include "dds/ddsrt/mh3.h"
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/ddsi/q_bswap.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
|
@ -22,6 +23,25 @@
|
|||
#include "dds/ddsi/ddsi_sertopic.h"
|
||||
#include "dds/ddsi/ddsi_serdata_default.h"
|
||||
|
||||
static bool sertopic_default_equal (const struct ddsi_sertopic *acmn, const struct ddsi_sertopic *bcmn)
|
||||
{
|
||||
const struct ddsi_sertopic_default *a = (struct ddsi_sertopic_default *) acmn;
|
||||
const struct ddsi_sertopic_default *b = (struct ddsi_sertopic_default *) bcmn;
|
||||
return a->type == b->type;
|
||||
}
|
||||
|
||||
static uint32_t sertopic_default_hash (const struct ddsi_sertopic *tpcmn)
|
||||
{
|
||||
const struct ddsi_sertopic_default *tp = (struct ddsi_sertopic_default *) tpcmn;
|
||||
if (tp->type == NULL)
|
||||
return 0;
|
||||
else
|
||||
{
|
||||
return ddsrt_mh3 (tp->type->m_keys, tp->type->m_nkeys * sizeof (*tp->type->m_keys),
|
||||
ddsrt_mh3 (tp->type->m_ops, tp->type->m_nops * sizeof (*tp->type->m_ops), 0));
|
||||
}
|
||||
}
|
||||
|
||||
static void sertopic_default_free (struct ddsi_sertopic *tp)
|
||||
{
|
||||
ddsi_sertopic_fini (tp);
|
||||
|
@ -76,6 +96,8 @@ static void sertopic_default_free_samples (const struct ddsi_sertopic *sertopic_
|
|||
}
|
||||
|
||||
const struct ddsi_sertopic_ops ddsi_sertopic_ops_default = {
|
||||
.equal = sertopic_default_equal,
|
||||
.hash = sertopic_default_hash,
|
||||
.free = sertopic_default_free,
|
||||
.zero_samples = sertopic_default_zero_samples,
|
||||
.realloc_samples = sertopic_default_realloc_samples,
|
||||
|
|
|
@ -767,7 +767,7 @@ static void wait_for_receive_threads (struct q_globals *gv)
|
|||
}
|
||||
}
|
||||
|
||||
static struct ddsi_sertopic *make_special_topic (struct q_globals *gv, struct serdatapool *serpool, uint16_t enc_id, const struct ddsi_serdata_ops *ops)
|
||||
static struct ddsi_sertopic *make_special_topic (const char *name, struct serdatapool *serpool, uint16_t enc_id, const struct ddsi_serdata_ops *ops)
|
||||
{
|
||||
/* FIXME: two things (at least)
|
||||
- it claims there is a key, but the underlying type description is missing
|
||||
|
@ -779,26 +779,34 @@ static struct ddsi_sertopic *make_special_topic (struct q_globals *gv, struct se
|
|||
(kinda natural if they stop being "default" ones) */
|
||||
struct ddsi_sertopic_default *st = ddsrt_malloc (sizeof (*st));
|
||||
memset (st, 0, sizeof (*st));
|
||||
ddsi_sertopic_init_anon (&st->c, &ddsi_sertopic_ops_default, ops, false);
|
||||
st->gv = gv;
|
||||
ddsi_sertopic_init (&st->c, name, name, &ddsi_sertopic_ops_default, ops, false);
|
||||
st->native_encoding_identifier = enc_id;
|
||||
st->serpool = serpool;
|
||||
st->nkeys = 1;
|
||||
return (struct ddsi_sertopic *) st;
|
||||
}
|
||||
|
||||
static void make_special_topics (struct q_globals *gv)
|
||||
{
|
||||
gv->plist_topic = make_special_topic (gv, gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
|
||||
gv->rawcdr_topic = make_special_topic (gv, gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
|
||||
}
|
||||
|
||||
static void free_special_topics (struct q_globals *gv)
|
||||
{
|
||||
ddsi_sertopic_unref (gv->plist_topic);
|
||||
ddsi_sertopic_unref (gv->rawcdr_topic);
|
||||
}
|
||||
|
||||
static void make_special_topics (struct q_globals *gv)
|
||||
{
|
||||
gv->plist_topic = make_special_topic ("plist", gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
|
||||
gv->rawcdr_topic = make_special_topic ("rawcdr", gv->serpool, DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
|
||||
|
||||
ddsrt_mutex_lock (&gv->sertopics_lock);
|
||||
ddsi_sertopic_register_locked (gv, gv->plist_topic);
|
||||
ddsi_sertopic_register_locked (gv, gv->rawcdr_topic);
|
||||
ddsrt_mutex_unlock (&gv->sertopics_lock);
|
||||
|
||||
/* register increments refcount (which is reasonable), but at some point
|
||||
one needs to get rid of that reference */
|
||||
free_special_topics (gv);
|
||||
}
|
||||
|
||||
static bool use_multiple_receive_threads (const struct config *cfg)
|
||||
{
|
||||
/* Under some unknown circumstances Windows (at least Windows 10) exhibits
|
||||
|
@ -909,6 +917,16 @@ fail:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int ddsi_sertopic_equal_wrap (const void *a, const void *b)
|
||||
{
|
||||
return ddsi_sertopic_equal (a, b);
|
||||
}
|
||||
|
||||
static uint32_t ddsi_sertopic_hash_wrap (const void *tp)
|
||||
{
|
||||
return ddsi_sertopic_hash (tp);
|
||||
}
|
||||
|
||||
int rtps_init (struct q_globals *gv)
|
||||
{
|
||||
uint32_t port_disc_uc = 0;
|
||||
|
@ -1078,6 +1096,8 @@ int rtps_init (struct q_globals *gv)
|
|||
make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_rd, &gv->default_xqos_rd);
|
||||
make_builtin_endpoint_xqos (&gv->builtin_endpoint_xqos_wr, &gv->default_xqos_wr);
|
||||
|
||||
ddsrt_mutex_init (&gv->sertopics_lock);
|
||||
gv->sertopics = ddsrt_hh_new (1, ddsi_sertopic_hash_wrap, ddsi_sertopic_equal_wrap);
|
||||
make_special_topics (gv);
|
||||
|
||||
ddsrt_mutex_init (&gv->participant_set_lock);
|
||||
|
@ -1410,6 +1430,14 @@ err_unicast_sockets:
|
|||
ddsrt_cond_destroy (&gv->participant_set_cond);
|
||||
ddsrt_mutex_destroy (&gv->participant_set_lock);
|
||||
free_special_topics (gv);
|
||||
#ifndef NDEBUG
|
||||
{
|
||||
struct ddsrt_hh_iter it;
|
||||
assert (ddsrt_hh_iter_first (gv->sertopics, &it) == NULL);
|
||||
}
|
||||
#endif
|
||||
ddsrt_hh_free (gv->sertopics);
|
||||
ddsrt_mutex_destroy (&gv->sertopics_lock);
|
||||
nn_xqos_fini (&gv->builtin_endpoint_xqos_wr);
|
||||
nn_xqos_fini (&gv->builtin_endpoint_xqos_rd);
|
||||
nn_xqos_fini (&gv->spdp_endpoint_xqos);
|
||||
|
@ -1748,6 +1776,15 @@ void rtps_fini (struct q_globals *gv)
|
|||
ddsrt_cond_destroy (&gv->participant_set_cond);
|
||||
free_special_topics (gv);
|
||||
|
||||
#ifndef NDEBUG
|
||||
{
|
||||
struct ddsrt_hh_iter it;
|
||||
assert (ddsrt_hh_iter_first (gv->sertopics, &it) == NULL);
|
||||
}
|
||||
#endif
|
||||
ddsrt_hh_free (gv->sertopics);
|
||||
ddsrt_mutex_destroy (&gv->sertopics_lock);
|
||||
|
||||
nn_xqos_fini (&gv->builtin_endpoint_xqos_wr);
|
||||
nn_xqos_fini (&gv->builtin_endpoint_xqos_rd);
|
||||
nn_xqos_fini (&gv->spdp_endpoint_xqos);
|
||||
|
|
|
@ -921,10 +921,10 @@ int main (int argc, char **argv)
|
|||
mainthread = lookup_thread_state ();
|
||||
assert (ddsrt_atomic_ldvoidp (&mainthread->gv) != NULL);
|
||||
{
|
||||
struct dds_entity *x;
|
||||
if (dds_entity_lock(tp, DDS_KIND_TOPIC, &x) < 0) abort();
|
||||
mdtopic = dds_topic_lookup(x->m_domain, "RhcTypes_T");
|
||||
dds_entity_unlock(x);
|
||||
struct dds_topic *x;
|
||||
if (dds_topic_pin (tp, &x) < 0) abort();
|
||||
mdtopic = ddsi_sertopic_ref (x->m_stopic);
|
||||
dds_topic_unpin (x);
|
||||
}
|
||||
|
||||
if (0 >= first)
|
||||
|
|
|
@ -111,6 +111,7 @@ list(APPEND headers
|
|||
"${include_path}/dds/ddsrt/endian.h"
|
||||
"${include_path}/dds/ddsrt/arch.h"
|
||||
"${include_path}/dds/ddsrt/misc.h"
|
||||
"${include_path}/dds/ddsrt/mh3.h"
|
||||
"${include_path}/dds/ddsrt/io.h"
|
||||
"${include_path}/dds/ddsrt/process.h"
|
||||
"${include_path}/dds/ddsrt/strtod.h"
|
||||
|
@ -127,6 +128,7 @@ list(APPEND sources
|
|||
"${source_path}/retcode.c"
|
||||
"${source_path}/strtod.c"
|
||||
"${source_path}/strtol.c"
|
||||
"${source_path}/mh3.c"
|
||||
"${source_path}/avl.c"
|
||||
"${source_path}/expand_envvars.c"
|
||||
"${source_path}/fibheap.c"
|
||||
|
|
34
src/ddsrt/include/dds/ddsrt/mh3.h
Normal file
34
src/ddsrt/include/dds/ddsrt/mh3.h
Normal file
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License v. 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
|
||||
* v. 1.0 which is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||
*/
|
||||
#ifndef DDSRT_MH3_H
|
||||
#define DDSRT_MH3_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stddef.h>
|
||||
|
||||
#include "dds/export.h"
|
||||
|
||||
#if defined(__cplusplus)
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
DDS_EXPORT uint32_t
|
||||
ddsrt_mh3(
|
||||
const void *key,
|
||||
size_t len,
|
||||
uint32_t seed);
|
||||
|
||||
#if defined(__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* DDSRT_MH3_H */
|
69
src/ddsrt/src/mh3.c
Normal file
69
src/ddsrt/src/mh3.c
Normal file
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
|
||||
*
|
||||
* This program and the accompanying materials are made available under the
|
||||
* terms of the Eclipse Public License v. 2.0 which is available at
|
||||
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
|
||||
* v. 1.0 which is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||
*/
|
||||
#include "dds/ddsrt/mh3.h"
|
||||
|
||||
#define DDSRT_MH3_ROTL32(x,r) (((x) << (r)) | ((x) >> (32 - (r))))
|
||||
|
||||
/* Really
|
||||
http://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp,
|
||||
MurmurHash3_x86_32
|
||||
*/
|
||||
uint32_t ddsrt_mh3 (const void *key, size_t len, uint32_t seed)
|
||||
{
|
||||
const uint8_t *data = (const uint8_t *) key;
|
||||
const intptr_t nblocks = (intptr_t) (len / 4);
|
||||
const uint32_t c1 = 0xcc9e2d51;
|
||||
const uint32_t c2 = 0x1b873593;
|
||||
|
||||
uint32_t h1 = seed;
|
||||
const uint32_t *blocks = (const uint32_t *) (data + nblocks * 4);
|
||||
for (intptr_t i = -nblocks; i; i++)
|
||||
{
|
||||
uint32_t k1 = blocks[i];
|
||||
|
||||
k1 *= c1;
|
||||
k1 = DDSRT_MH3_ROTL32 (k1, 15);
|
||||
k1 *= c2;
|
||||
|
||||
h1 ^= k1;
|
||||
h1 = DDSRT_MH3_ROTL32 (h1, 13);
|
||||
h1 = h1 * 5 + 0xe6546b64;
|
||||
}
|
||||
|
||||
const uint8_t *tail = data + nblocks * 4;
|
||||
uint32_t k1 = 0;
|
||||
switch (len & 3)
|
||||
{
|
||||
case 3:
|
||||
k1 ^= (uint32_t) tail[2] << 16;
|
||||
/* FALLS THROUGH */
|
||||
case 2:
|
||||
k1 ^= (uint32_t) tail[1] << 8;
|
||||
/* FALLS THROUGH */
|
||||
case 1:
|
||||
k1 ^= (uint32_t) tail[0];
|
||||
k1 *= c1;
|
||||
k1 = DDSRT_MH3_ROTL32 (k1, 15);
|
||||
k1 *= c2;
|
||||
h1 ^= k1;
|
||||
/* FALLS THROUGH */
|
||||
};
|
||||
|
||||
/* finalization */
|
||||
h1 ^= (uint32_t) len;
|
||||
h1 ^= h1 >> 16;
|
||||
h1 *= 0x85ebca6b;
|
||||
h1 ^= h1 >> 13;
|
||||
h1 *= 0xc2b2ae35;
|
||||
h1 ^= h1 >> 16;
|
||||
return h1;
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue