Promote domains and Cyclone library to entities

This commit adds two entity types: a "domain", which is the parent of
participants and which is instantiated for each domain that has at least
one participant in it; and "cyclonedds", which is a representation of
the (initialized) Cyclone DDS library in the process and that is the
parent of all domain entities.  The handle of the latter is a
compile-constant, DDS_CYCLONEDDS_HANDLE.

This changes the return value from dds_get_parent when executed on a
participant: it now returns the handle of the entity representing the
domain the participant is attached to.  Two participants in the same
domain self-evidently return the same domain entity.

This allows deleting all participants in a domain by calling dds_delete
on the domain entity, or tearing down everything and deinitializing the
library by calling dds_delete on the top-level entity.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-08-28 14:15:28 +02:00 committed by eboasson
parent c6befb48a7
commit 0b12ff5cfc
23 changed files with 690 additions and 305 deletions

View file

@ -66,6 +66,9 @@ struct ddsi_serdata;
#define DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION ((dds_entity_t) (DDS_MIN_PSEUDO_HANDLE + 4))
/** @}*/
/** Special handle representing the entity corresponding to the CycloneDDS library itself */
#define DDS_CYCLONEDDS_HANDLE ((dds_entity_t) (DDS_MIN_PSEUDO_HANDLE + 256))
/** @name Communication Status definitions
@{**/
typedef enum dds_status_id {
@ -860,7 +863,8 @@ dds_get_children(dds_entity_t entity, dds_entity_t *children, size_t size);
* DataReaders), etc are also attached to that domain.
*
* This function will return the original domain ID when called on
* any of the entities within that hierarchy.
* any of the entities within that hierarchy. For entities not associated
* with a domain, the id is set to DDS_DOMAIN_DEFAULT.
*
* @param[in] entity Entity from which to get its children.
* @param[out] id Pointer to put the domain ID in.

View file

@ -108,7 +108,9 @@ typedef enum dds_entity_kind
DDS_KIND_COND_READ,
DDS_KIND_COND_QUERY,
DDS_KIND_COND_GUARD,
DDS_KIND_WAITSET
DDS_KIND_WAITSET,
DDS_KIND_DOMAIN,
DDS_KIND_CYCLONEDDS
} dds_entity_kind_t;
/* Handles are opaque pointers to implementation types */

View file

@ -19,7 +19,6 @@ extern "C" {
#endif
DDS_EXPORT dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id);
DDS_EXPORT void dds_domain_free (dds_domain *domain);
DDS_EXPORT dds_domain *dds_domain_find_locked (dds_domainid_t id);
#if defined (__cplusplus)

View file

@ -78,6 +78,9 @@ DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status);
DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst);
DDS_EXPORT dds_participant *dds_entity_participant (dds_entity *e);
DDS_EXPORT void dds_entity_final_deinit_before_free (dds_entity *e);
DDS_EXPORT dds_return_t
dds_entity_pin (
dds_entity_t hdl,
@ -107,10 +110,6 @@ dds_entity_observer_unregister(
dds_entity *observed,
dds_entity *observer);
DDS_EXPORT dds_domain *
dds__entity_domain(
dds_entity* e);
DDS_EXPORT dds_return_t
dds_generic_unimplemented_operation_manykinds(
dds_entity_t handle,

View file

@ -112,6 +112,14 @@ dds_handle_create(
struct dds_handle_link *link);
/*
* Register a specific handle.
*/
DDS_EXPORT dds_return_t
dds_handle_register_special (
struct dds_handle_link *link, dds_handle_t handle);
/*
* This will close the handle. All information remains, only new claims will
* fail.

View file

@ -19,27 +19,15 @@ extern "C" {
#endif
/**
*Description : Initialization function, called from main. This operation
*initializes all the required DDS resources,
*handles configuration of domainid based on the input passed, parses and
*configures middleware from a xml file and initializes required resources.
*Description : Initializes the library and constructs the global
*pseudo-entity identified by DDS_CYCLONEDDS_HANDLE with one reference
*that must (eventually) be released by calling dds_delete on that handle.
*
*Arguments :
*-# Returns 0 on success or a non-zero error status
**/
dds_return_t dds_init (void);
/* Finalization function, called from main */
/**
*Description : Finalization function, called from main. This operation
*releases all the resources used by DDS.
*
*Arguments :
*-# None
**/
void dds_fini (void);
#if defined (__cplusplus)
}
#endif

View file

@ -93,28 +93,9 @@ struct dds_listener {
#define DDS_ENTITY_ENABLED 0x0001u
#define DDS_ENTITY_IMPLICIT 0x0002u
typedef struct dds_domain {
/* FIXME: protected by dds_global.lock -- for now */
ddsrt_avl_node_t m_node;
dds_domainid_t m_id;
ddsrt_avl_tree_t m_topics;
ddsrt_avl_tree_t m_ppants;
uint32_t m_refc;
struct cfgst *cfgst;
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
struct local_orphan_writer *builtintopic_writer_participant;
struct local_orphan_writer *builtintopic_writer_publications;
struct local_orphan_writer *builtintopic_writer_subscriptions;
struct ddsi_builtin_topic_interface btif;
struct q_globals gv;
} dds_domain;
struct dds_domain;
struct dds_entity;
typedef struct dds_entity_deriver {
/* Close can be used to terminate (blocking) actions on a entity before actually deleting it. */
dds_return_t (*close)(struct dds_entity *e) ddsrt_nonnull_all;
@ -141,7 +122,6 @@ typedef struct dds_entity {
struct dds_entity *m_parent; /* constant */
ddsrt_avl_node_t m_avlnode_child; /* [m_mutex of m_parent] */
ddsrt_avl_tree_t m_children; /* [m_mutex] tree on m_iid using m_avlnode_child */
struct dds_entity *m_participant; /* constant */
struct dds_domain *m_domain; /* constant */
dds_qos_t *m_qos; /* [m_mutex] */
nn_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */
@ -182,6 +162,8 @@ extern const struct dds_entity_deriver dds_entity_deriver_publisher;
extern const struct dds_entity_deriver dds_entity_deriver_readcondition;
extern const struct dds_entity_deriver dds_entity_deriver_guardcondition;
extern const struct dds_entity_deriver dds_entity_deriver_waitset;
extern const struct dds_entity_deriver dds_entity_deriver_domain;
extern const struct dds_entity_deriver dds_entity_deriver_cyclonedds;
extern const struct dds_entity_deriver *dds_entity_deriver_table[];
dds_return_t dds_entity_deriver_dummy_close (struct dds_entity *e);
@ -189,7 +171,6 @@ dds_return_t dds_entity_deriver_dummy_delete (struct dds_entity *e);
dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled);
dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask);
inline dds_return_t dds_entity_deriver_close (struct dds_entity *e) {
return (dds_entity_deriver_table[e->m_kind]->close) (e);
}
@ -209,6 +190,36 @@ inline bool dds_entity_supports_validate_status (struct dds_entity *e) {
return dds_entity_deriver_table[e->m_kind]->validate_status != dds_entity_deriver_dummy_validate_status;
}
typedef struct dds_cyclonedds_entity {
struct dds_entity m_entity;
ddsrt_mutex_t m_mutex;
ddsrt_cond_t m_cond;
ddsrt_avl_tree_t m_domains;
uint32_t threadmon_count;
struct ddsi_threadmon *threadmon;
} dds_cyclonedds_entity;
typedef struct dds_domain {
struct dds_entity m_entity;
ddsrt_avl_node_t m_node; /* for dds_global.m_domains */
dds_domainid_t m_id;
ddsrt_avl_tree_t m_topics;
struct cfgst *cfgst;
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
struct local_orphan_writer *builtintopic_writer_participant;
struct local_orphan_writer *builtintopic_writer_publications;
struct local_orphan_writer *builtintopic_writer_subscriptions;
struct ddsi_builtin_topic_interface btif;
struct q_globals gv;
} dds_domain;
typedef struct dds_subscriber {
struct dds_entity m_entity;
} dds_subscriber;
@ -309,17 +320,7 @@ typedef struct dds_waitset {
dds_attachment *entities; /* [m_entity.m_mutex] 0 .. ntriggered are triggred, ntriggred .. nentities are not */
} dds_waitset;
/* Globals */
typedef struct dds_globals {
int32_t m_init_count;
ddsrt_avl_tree_t m_domains;
ddsrt_mutex_t m_mutex;
uint32_t threadmon_count;
struct ddsi_threadmon *threadmon;
} dds_globals;
DDS_EXPORT extern dds_globals dds_global;
DDS_EXPORT extern dds_cyclonedds_entity dds_global;
#if defined (__cplusplus)
}

View file

@ -22,6 +22,7 @@
#include "dds__participant.h"
#include "dds__types.h"
#include "dds__builtin.h"
#include "dds__entity.h"
#include "dds__subscriber.h"
#include "dds__write.h"
#include "dds__writer.h"
@ -46,9 +47,15 @@ dds_entity_t dds__get_builtin_topic (dds_entity_t entity, dds_entity_t topic)
dds_entity_t tp;
dds_return_t rc;
dds_entity *e;
dds_participant *par;
if ((rc = dds_entity_pin (entity, &e)) < 0)
return rc;
if ((par = dds_entity_participant (e)) == NULL)
{
dds_entity_unpin (e);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
struct ddsi_sertopic *sertopic;
switch (topic)
@ -69,7 +76,7 @@ dds_entity_t dds__get_builtin_topic (dds_entity_t entity, dds_entity_t topic)
}
dds_qos_t *qos = dds__create_builtin_qos ();
tp = dds_create_topic_arbitrary (e->m_participant->m_hdllink.hdl, sertopic, qos, NULL, NULL);
tp = dds_create_topic_arbitrary (par->m_entity.m_hdllink.hdl, sertopic, qos, NULL, NULL);
dds_delete_qos (qos);
dds_entity_unpin (e);
return tp;

View file

@ -19,6 +19,7 @@
#include "dds__domain.h"
#include "dds__builtin.h"
#include "dds__whc_builtintopic.h"
#include "dds__entity.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
@ -29,6 +30,15 @@
#include "dds/ddsi/q_globals.h"
#include "dds/version.h"
static dds_return_t dds_domain_free (dds_entity *vdomain);
const struct dds_entity_deriver dds_entity_deriver_domain = {
.close = dds_entity_deriver_dummy_close,
.delete = dds_domain_free,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
};
static int dds_domain_compare (const void *va, const void *vb)
{
const dds_domainid_t *a = va;
@ -44,9 +54,15 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
dds_return_t ret = DDS_RETCODE_OK;
char * uri = NULL;
uint32_t len;
dds_entity_t domain_handle;
if ((domain_handle = dds_entity_init (&domain->m_entity, &dds_global.m_entity, DDS_KIND_DOMAIN, NULL, NULL, 0)) < 0)
return domain_handle;
domain->m_entity.m_domain = domain;
domain->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
domain->m_entity.m_iid = ddsi_iid_gen ();
domain->gv.tstart = now ();
domain->m_refc = 1;
ddsrt_avl_init (&dds_topictree_def, &domain->m_topics);
/* | domain_id | domain id in config | result
@ -170,30 +186,11 @@ fail_rtps_init:
fail_rtps_config:
config_fini (domain->cfgst);
fail_config:
dds_handle_close (&domain->m_entity.m_hdllink);
dds_handle_delete (&domain->m_entity.m_hdllink, DDS_INFINITY);
return ret;
}
static void dds_domain_fini (struct dds_domain *domain)
{
rtps_stop (&domain->gv);
dds__builtin_fini (domain);
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_unregister_domain (dds_global.threadmon, &domain->gv);
rtps_fini (&domain->gv);
ddsrt_mutex_lock (&dds_global.m_mutex);
if (domain->gv.config.liveliness_monitoring && --dds_global.threadmon_count == 0)
{
ddsi_threadmon_stop (dds_global.threadmon);
ddsi_threadmon_free (dds_global.threadmon);
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
config_fini (domain->cfgst);
}
dds_domain *dds_domain_find_locked (dds_domainid_t id)
{
return ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &id);
@ -201,12 +198,12 @@ dds_domain *dds_domain_find_locked (dds_domainid_t id)
dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
{
struct dds_domain *dom = NULL;
struct dds_domain *dom;
dds_return_t ret;
/* FIXME: should perhaps lock parent object just like everywhere */
ddsrt_mutex_lock (&dds_global.m_mutex);
/* FIXME: hack around default domain ids, not yet being able to handle multiple domains simultaneously */
retry:
if (id != DDS_DOMAIN_DEFAULT)
{
if ((dom = dds_domain_find_locked (id)) == NULL)
@ -225,8 +222,19 @@ dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
switch (ret)
{
case DDS_RETCODE_OK:
dom->m_refc++;
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
if (dds_handle_is_closed (&dom->m_entity.m_hdllink))
{
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
ddsrt_cond_wait (&dds_global.m_cond, &dds_global.m_mutex);
goto retry;
}
else
{
dds_entity_add_ref_locked (&dom->m_entity);
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
break;
case DDS_RETCODE_NOT_FOUND:
dom = dds_alloc (sizeof (*dom));
@ -234,32 +242,46 @@ dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
dds_free (dom);
else
{
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, dom);
dds_entity_register_child (&dds_global.m_entity, &dom->m_entity);
dds_entity_add_ref_locked (&dom->m_entity);
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
break;
case DDS_RETCODE_PRECONDITION_NOT_MET:
DDS_ILOG (DDS_LC_ERROR, id, "Inconsistent domain configuration detected: domain on configuration: %"PRIu32", domain %"PRIu32"\n", dom->m_id, id);
break;
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
return ret;
}
void dds_domain_free (dds_domain *domain)
static dds_return_t dds_domain_free (dds_entity *vdomain)
{
struct dds_domain *domain = (struct dds_domain *) vdomain;
rtps_stop (&domain->gv);
dds__builtin_fini (domain);
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_unregister_domain (dds_global.threadmon, &domain->gv);
rtps_fini (&domain->gv);
/* tearing down the top-level object has more consequences, so it waits until signalled that all
domains have been removed */
ddsrt_mutex_lock (&dds_global.m_mutex);
if (--domain->m_refc != 0)
if (domain->gv.config.liveliness_monitoring && --dds_global.threadmon_count == 0)
{
ddsrt_mutex_unlock (&dds_global.m_mutex);
ddsi_threadmon_stop (dds_global.threadmon);
ddsi_threadmon_free (dds_global.threadmon);
}
else
{
ddsrt_avl_delete (&dds_domaintree_def, &dds_global.m_domains, domain);
dds_entity_final_deinit_before_free (vdomain);
config_fini (domain->cfgst);
dds_free (vdomain);
ddsrt_cond_broadcast (&dds_global.m_cond);
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_domain_fini (domain);
dds_free (domain);
}
return DDS_RETCODE_NO_DATA;
}
#include "dds__entity.h"
@ -306,7 +328,7 @@ void dds_write_set_batch (bool enable)
dds_instance_handle_t last_iid = 0;
struct dds_entity *e;
while (dom && (e = ddsrt_avl_lookup_succ (&dds_entity_children_td, &dom->m_ppants, &last_iid)) != NULL)
while (dom && (e = ddsrt_avl_lookup_succ (&dds_entity_children_td, &dom->m_entity.m_children, &last_iid)) != NULL)
{
struct dds_entity *x;
last_iid = e->m_iid;
@ -321,5 +343,6 @@ void dds_write_set_batch (bool enable)
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_fini ();
/* FIXME */
dds_delete (DDS_CYCLONEDDS_HANDLE);
}

View file

@ -40,6 +40,8 @@ const struct dds_entity_deriver *dds_entity_deriver_table[] = {
[DDS_KIND_COND_QUERY] = &dds_entity_deriver_readcondition,
[DDS_KIND_COND_GUARD] = &dds_entity_deriver_guardcondition,
[DDS_KIND_WAITSET] = &dds_entity_deriver_waitset,
[DDS_KIND_DOMAIN] = &dds_entity_deriver_domain,
[DDS_KIND_CYCLONEDDS] = &dds_entity_deriver_cyclonedds
};
dds_return_t dds_entity_deriver_dummy_close (struct dds_entity *e) {
@ -80,16 +82,6 @@ void dds_entity_add_ref_locked (dds_entity *e)
dds_handle_add_ref (&e->m_hdllink);
}
dds_domain *dds__entity_domain (dds_entity *e)
{
return e->m_domain;
}
static dds_entity *dds__nonself_parent (dds_entity *e)
{
return e->m_parent == e ? NULL : e->m_parent;
}
static bool entity_has_status (const dds_entity *e)
{
switch (e->m_kind)
@ -105,6 +97,8 @@ static bool entity_has_status (const dds_entity *e)
case DDS_KIND_COND_QUERY:
case DDS_KIND_COND_GUARD:
case DDS_KIND_WAITSET:
case DDS_KIND_DOMAIN:
case DDS_KIND_CYCLONEDDS:
break;
case DDS_KIND_DONTCARE:
abort ();
@ -117,7 +111,7 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
{
dds_handle_t handle;
assert ((kind == DDS_KIND_PARTICIPANT) == (parent == NULL));
assert ((kind == DDS_KIND_CYCLONEDDS) == (parent == NULL));
assert (e);
e->m_kind = kind;
@ -143,14 +137,14 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
{
e->m_parent = parent;
e->m_domain = parent->m_domain;
e->m_participant = parent->m_participant;
ddsrt_avl_init (&dds_entity_children_td, &e->m_children);
}
else
{
e->m_participant = e;
e->m_parent = e;
assert (kind == DDS_KIND_CYCLONEDDS);
e->m_parent = NULL;
e->m_domain = NULL;
}
ddsrt_avl_init (&dds_entity_children_td, &e->m_children);
dds_reset_listener (&e->m_listener);
if (listener)
@ -162,8 +156,16 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
ddsrt_mutex_unlock (&parent->m_observers_lock);
}
if (kind == DDS_KIND_CYCLONEDDS)
{
if ((handle = dds_handle_register_special (&e->m_hdllink, DDS_CYCLONEDDS_HANDLE)) <= 0)
return (dds_entity_t) handle;
}
else
{
if ((handle = dds_handle_create (&e->m_hdllink)) <= 0)
return (dds_entity_t) handle;
}
/* An dds_handle_t is directly used as dds_entity_t. */
return (dds_entity_t) handle;
@ -187,31 +189,100 @@ static dds_entity *next_non_topic_child (ddsrt_avl_tree_t *remaining_children)
return NULL;
}
static dds_return_t dds_delete_impl (dds_entity_t entity, bool deleting_parent);
static dds_return_t dds_delete_impl_pinned_and_locked (dds_entity *e, bool deleting_parent);
enum delete_impl_state {
DIS_EXPLICIT, /* explicit delete on this entity */
DIS_FROM_PARENT, /* called because the parent is being deleted */
DIS_IMPLICIT /* called from child; delete if implicit w/o children */
};
#define TRACE_DELETE 0 /* FIXME: use DDS_LOG for this */
#if TRACE_DELETE
static const char *entity_kindstr (dds_entity_kind_t kind)
{
switch (kind)
{
case DDS_KIND_TOPIC: return "topic";
case DDS_KIND_READER: return "reader";
case DDS_KIND_WRITER: return "writer";
case DDS_KIND_PUBLISHER: return "publisher";
case DDS_KIND_SUBSCRIBER: return "subscriber";
case DDS_KIND_PARTICIPANT: return "participant";
case DDS_KIND_COND_READ: return "readcond";
case DDS_KIND_COND_QUERY: return "querycond";
case DDS_KIND_COND_GUARD: return "guardcond";
case DDS_KIND_WAITSET: return "waitset";
case DDS_KIND_DOMAIN: return "domain";
case DDS_KIND_CYCLONEDDS: return "cyclonedds";
case DDS_KIND_DONTCARE: break;
}
return "UNDEF";
}
static void print_delete (const dds_entity *e, enum delete_impl_state delstate , dds_instance_handle_t iid)
{
unsigned cm = ddsrt_atomic_ld32 (&e->m_hdllink.cnt_flags);
printf ("delete(%p, delstate %s, iid %"PRIx64"): %s%s %d pin %u refc %u %s %s\n",
(void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid,
entity_kindstr (e->m_kind), (e->m_flags & DDS_ENTITY_IMPLICIT) ? " [implicit]" : "",
e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0xffff, (cm & 0x80000000) ? "closed" : "open",
ddsrt_avl_is_empty (&e->m_children) ? "childless" : "has-children");
}
#endif
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate, dds_instance_handle_t iid);
dds_return_t dds_delete (dds_entity_t entity)
{
return dds_delete_impl (entity, false);
return dds_delete_impl (entity, DIS_EXPLICIT, 0);
}
static dds_return_t dds_delete_impl (dds_entity_t entity, bool deleting_parent)
void dds_entity_final_deinit_before_free (dds_entity *e)
{
dds_entity *e;
dds_return_t rc;
if ((rc = dds_entity_pin (entity, &e)) < 0)
return rc;
ddsrt_mutex_lock (&e->m_mutex);
return dds_delete_impl_pinned_and_locked (e, deleting_parent);
dds_delete_qos (e->m_qos);
ddsrt_cond_destroy (&e->m_cond);
ddsrt_cond_destroy (&e->m_observers_cond);
ddsrt_mutex_destroy (&e->m_mutex);
ddsrt_mutex_destroy (&e->m_observers_lock);
}
static dds_return_t dds_delete_impl_pinned_and_locked (dds_entity *e, bool deleting_parent)
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate, dds_instance_handle_t iid)
{
dds_time_t timeout = DDS_SECS (10);
dds_entity *child;
dds_return_t ret;
dds_entity *e;
/* iid is used to guarantee that attempts at deleting implicit parents never touch the wrong
entity: there is a tiny chance that the parent got deleted in parallel and the handle has been
reused, but there is no risk of the iid getting reused. There is no such risk for the other
cases, so there require iid = 0. */
assert ((delstate == DIS_IMPLICIT) == (iid != 0));
if ((ret = dds_entity_pin (entity, &e)) < 0)
{
#if TRACE_DELETE
printf ("delete %"PRId32" - pin failed: %s\n", entity, dds_strretcode (ret));
#endif
return ret;
}
ddsrt_mutex_lock (&e->m_mutex);
#if TRACE_DELETE
print_delete (e, delstate, iid);
#endif
if (delstate == DIS_IMPLICIT)
{
/* Called after deleting a child; only delete if implicit & no remaining children. There is a
tiny chance that the parent got deleted in parallel and the handle has been reused, but there
is no risk of the iid getting reused. */
if (!(e->m_iid == iid && (e->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&e->m_children)))
{
ddsrt_mutex_unlock (&e->m_mutex);
dds_entity_unpin (e);
return DDS_RETCODE_OK;
}
}
if (! dds_handle_drop_ref (&e->m_hdllink))
{
@ -256,7 +327,7 @@ static dds_return_t dds_delete_impl_pinned_and_locked (dds_entity *e, bool delet
{
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, true);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT, 0);
ddsrt_mutex_lock (&e->m_mutex);
}
while ((child = ddsrt_avl_find_min (&dds_entity_children_td, &e->m_children)) != NULL && ret == DDS_RETCODE_OK)
@ -264,7 +335,7 @@ static dds_return_t dds_delete_impl_pinned_and_locked (dds_entity *e, bool delet
assert (dds_entity_kind (child) == DDS_KIND_TOPIC);
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, true);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT, 0);
ddsrt_mutex_lock (&e->m_mutex);
}
ddsrt_mutex_unlock (&e->m_mutex);
@ -272,104 +343,93 @@ static dds_return_t dds_delete_impl_pinned_and_locked (dds_entity *e, bool delet
ret = dds_entity_deriver_close (e);
dds_entity_unpin (e);
/* FIXME: deleting shouldn't fail, and bailing out halfway through deleting is also bad */
/* FIXME: deleting shouldn't fail, and bailing out halfway through deleting is also
bad */
if (ret != DDS_RETCODE_OK)
return ret;
/* The dds_handle_delete will wait until the last active claim on that handle
is released. It is possible that this last release will be done by a thread
that was kicked during the close(). */
/* The dds_handle_delete will wait until the last active claim on that handle is
released. It is possible that this last release will be done by a thread that was
kicked during the close(). */
if ((ret = dds_handle_delete (&e->m_hdllink, timeout)) != DDS_RETCODE_OK)
return ret;
/* Remove all possible observers. */
dds_entity_observers_delete (e);
/* Remove from parent; schedule deletion if it was created implicitly and no longer
has any remaining children */
dds_entity *parent_to_delete = NULL;
/* Remove from parent; schedule deletion of parent if it was created implicitly, no
longer has any remaining children, and we didn't arrive here as a consequence of
deleting the parent. */
dds_entity_t parent_handle = 0;
dds_instance_handle_t parent_iid = 0;
if (e->m_parent != NULL)
{
dds_entity * const parent = dds__nonself_parent (e);
if (parent != NULL)
struct dds_entity * const p = e->m_parent;
ddsrt_mutex_lock (&p->m_mutex);
assert (ddsrt_avl_lookup (&dds_entity_children_td, &p->m_children, &e->m_iid) != NULL);
ddsrt_avl_delete (&dds_entity_children_td, &p->m_children, e);
if (delstate != DIS_FROM_PARENT && (p->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&p->m_children))
{
ddsrt_mutex_lock (&parent->m_mutex);
assert (ddsrt_avl_lookup (&dds_entity_children_td, &parent->m_children, &e->m_iid) != NULL);
ddsrt_avl_delete (&dds_entity_children_td, &parent->m_children, e);
if (!deleting_parent && ddsrt_avl_is_empty (&parent->m_children) && (parent->m_flags & DDS_ENTITY_IMPLICIT))
{
/* another thread might be attempting to delete the parent already, so an
error return is acceptable */
if (dds_entity_pin (parent->m_hdllink.hdl, &parent_to_delete) < 0)
parent_to_delete = NULL;
else
assert (parent == parent_to_delete);
}
if (parent_to_delete == NULL)
{
/* 'Tis admittedly ugly to unlock the parent only if we're not going to delete it
but the advantage of keeping it pinned & locked is that no-one else can delete
it or use it until we do; and deferring it means we can do tail recursion (which
might not be worth the bother ...) */
ddsrt_mutex_unlock (&parent->m_mutex);
}
}
parent_handle = p->m_hdllink.hdl;
parent_iid = p->m_iid;
}
/* Do some specific deletion when needed. */
if ((ret = dds_entity_deriver_delete (e)) != DDS_RETCODE_OK)
{
if (parent_to_delete != NULL)
ddsrt_mutex_unlock (&parent_to_delete->m_mutex);
ddsrt_mutex_unlock (&p->m_mutex);
}
/* Do some specific deletion when needed. Bootstrapping and its inverse are always a
tricky business, and here it is no different: deleting the pseudo-top-level object
tears down all kinds of stuff that is supposed to remain in existence (like the
entire platform abstraction) and so it must be the final call. Thus, we rely on it
to call "dds_entity_final_deinit_before_free" and return a special error code. */
ret = dds_entity_deriver_delete (e);
if (ret == DDS_RETCODE_NO_DATA)
ret = DDS_RETCODE_OK;
else if (ret != DDS_RETCODE_OK)
return ret;
else
{
dds_entity_final_deinit_before_free (e);
dds_free (e);
}
dds_delete_qos (e->m_qos);
ddsrt_cond_destroy (&e->m_cond);
ddsrt_cond_destroy (&e->m_observers_cond);
ddsrt_mutex_destroy (&e->m_mutex);
ddsrt_mutex_destroy (&e->m_observers_lock);
dds_free (e);
return (parent_to_delete != NULL) ? dds_delete_impl_pinned_and_locked (parent_to_delete, false) : ret;
return (parent_handle != 0) ? dds_delete_impl (parent_handle, DIS_IMPLICIT, parent_iid) : DDS_RETCODE_OK;
}
dds_entity_t dds_get_parent (dds_entity_t entity)
{
dds_entity *e;
dds_return_t rc;
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
else
{
dds_entity *parent;
dds_entity_t hdl;
if ((parent = dds__nonself_parent(e)) == NULL)
hdl = DDS_ENTITY_NIL;
else
{
dds_entity *x;
hdl = parent->m_hdllink.hdl;
if (dds_entity_lock (hdl, DDS_KIND_DONTCARE, &x) == DDS_RETCODE_OK)
{
parent->m_flags &= ~DDS_ENTITY_IMPLICIT;
dds_entity_unlock (parent);
}
}
dds_entity_unlock (e);
dds_entity_t hdl = e->m_parent ? e->m_parent->m_hdllink.hdl : 0;
dds_entity_unpin (e);
return hdl;
}
}
dds_participant *dds_entity_participant (dds_entity *e)
{
while (e && dds_entity_kind (e) != DDS_KIND_PARTICIPANT)
e = e->m_parent;
return (dds_participant *) e;
}
dds_entity_t dds_get_participant (dds_entity_t entity)
{
dds_entity *e;
dds_return_t rc;
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
else
{
dds_entity_t hdl = e->m_participant->m_hdllink.hdl;
dds_entity_unlock (e);
dds_participant *par = dds_entity_participant (e);
dds_entity_t hdl = par ? par->m_entity.m_hdllink.hdl : 0;
dds_entity_unpin (e);
return hdl;
}
}
@ -379,41 +439,21 @@ dds_return_t dds_get_children (dds_entity_t entity, dds_entity_t *children, size
dds_entity *e;
dds_return_t rc;
if (children != NULL && (size <= 0 || size >= INT32_MAX))
return DDS_RETCODE_BAD_PARAMETER;
if (children == NULL && size != 0)
if ((children != NULL && (size == 0 || size > INT32_MAX)) || (children == NULL && size != 0))
return DDS_RETCODE_BAD_PARAMETER;
if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
else
{
/* FIXME: fix the implicit/explicit stuff so the set_explicit isn't needed; and then this can also be done with a regular iterator & without unlocking */
ddsrt_avl_iter_t it;
size_t n = 0;
dds_instance_handle_t last_iid = 0;
struct dds_entity *c;
ddsrt_mutex_lock (&e->m_mutex);
while ((c = ddsrt_avl_lookup_succ (&dds_entity_children_td, &e->m_children, &last_iid)) != NULL)
for (c = ddsrt_avl_iter_first (&dds_entity_children_td, &e->m_children, &it); c != NULL; c = ddsrt_avl_iter_next (&it))
{
last_iid = c->m_iid;
if (n < size)
{
dds_entity *x;
/* Claim child handle to guarantee the child entity remains valid; as we unlock "e" only when we manage to claim the child, and the child has to remain in existence until we release it, "c" remains a valid pointer despite the unlocking. */
if (dds_entity_pin (c->m_hdllink.hdl, &x) == DDS_RETCODE_OK)
{
assert (x == c);
children[n] = c->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ddsrt_mutex_lock (&c->m_mutex);
c->m_flags &= ~DDS_ENTITY_IMPLICIT;
ddsrt_mutex_unlock (&c->m_mutex);
ddsrt_mutex_lock (&e->m_mutex);
dds_entity_unpin (c);
}
}
n++;
}
ddsrt_mutex_unlock (&e->m_mutex);
@ -445,6 +485,8 @@ static uint64_t entity_kind_qos_mask (dds_entity_kind_t kind)
case DDS_KIND_COND_QUERY:
case DDS_KIND_COND_GUARD:
case DDS_KIND_WAITSET:
case DDS_KIND_DOMAIN:
case DDS_KIND_CYCLONEDDS:
break;
}
return 0;
@ -625,7 +667,8 @@ dds_return_t dds_set_qos (dds_entity_t entity, const dds_qos_t *qos)
{
case DDS_KIND_TOPIC: {
dds_entity *pp;
if (dds_entity_pin (e->m_participant->m_hdllink.hdl, &pp) == DDS_RETCODE_OK)
assert (dds_entity_kind (e->m_parent) == DDS_KIND_PARTICIPANT);
if (dds_entity_pin (e->m_parent->m_hdllink.hdl, &pp) == DDS_RETCODE_OK)
{
pushdown_topic_qos (pp, e);
dds_entity_unpin (pp);
@ -823,7 +866,7 @@ dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listen
if (listener)
dds_merge_listener (&e->m_listener, listener);
x = e;
while (dds_entity_kind (x) != DDS_KIND_PARTICIPANT)
while (dds_entity_kind (x) != DDS_KIND_CYCLONEDDS)
{
x = x->m_parent;
ddsrt_mutex_lock (&x->m_observers_lock);
@ -987,11 +1030,11 @@ dds_return_t dds_get_domainid (dds_entity_t entity, dds_domainid_t *id)
if (id == NULL)
return DDS_RETCODE_BAD_PARAMETER;
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
return rc;
*id = e->m_domain->m_id;
dds_entity_unlock(e);
*id = e->m_domain ? e->m_domain->m_id : DDS_DOMAIN_DEFAULT;
dds_entity_unpin (e);
return DDS_RETCODE_OK;
}

View file

@ -100,6 +100,13 @@ void dds_handle_server_fini (void)
#else /* USE_CHH */
#ifndef NDEBUG
struct ddsrt_hh_iter it;
for (struct dds_handle_link *link = ddsrt_hh_iter_first (handles.ht, &it); link != NULL; link = ddsrt_hh_iter_next (&it))
{
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
DDS_ERROR ("handle %"PRId32" pin %"PRIu32" ref %"PRIu32" %s\n", link->hdl,
cf & HDL_PINCOUNT_MASK, (cf & HDL_REFCOUNT_MASK) >> HDL_REFCOUNT_SHIFT,
cf & HDL_FLAG_CLOSED ? "closed" : "open");
}
assert (ddsrt_hh_iter_first (handles.ht, &it) == NULL);
#endif
ddsrt_hh_free (handles.ht);
@ -158,6 +165,50 @@ dds_handle_t dds_handle_create (struct dds_handle_link *link)
return ret;
}
dds_return_t dds_handle_register_special (struct dds_handle_link *link, dds_handle_t handle)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
dds_return_t ret;
if (handle <= 0)
return DDS_RETCODE_BAD_PARAMETER;
#if USE_CHH
thread_state_awake (ts1);
#endif
ddsrt_mutex_lock (&handles.lock);
if (handles.count == MAX_HANDLES)
{
ddsrt_mutex_unlock (&handles.lock);
ret = DDS_RETCODE_OUT_OF_RESOURCES;
}
else
{
handles.count++;
ddsrt_atomic_st32 (&link->cnt_flags, HDL_REFCOUNT_UNIT);
link->hdl = handle;
#if USE_CHH
ddsrt_mutex_unlock (&handles.lock);
if (hhadd (handles.ht, link))
ret = handle;
else
ret = DDS_RETCODE_BAD_PARAMETER;
return link->hdl;
#else
if (hhadd (handles.ht, link))
ret = handle;
else
ret = DDS_RETCODE_BAD_PARAMETER;
ddsrt_mutex_unlock (&handles.lock);
#endif
assert (ret > 0);
}
#if USE_CHH
thread_state_asleep (ts1);
#endif
return ret;
}
void dds_handle_close (struct dds_handle_link *link)
{
ddsrt_atomic_or32 (&link->cnt_flags, HDL_FLAG_CLOSED);

View file

@ -22,6 +22,7 @@
#include "dds__domain.h"
#include "dds__builtin.h"
#include "dds__whc_builtintopic.h"
#include "dds__entity.h"
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
@ -32,10 +33,61 @@
#include "dds/ddsi/q_globals.h"
#include "dds/version.h"
#define DOMAIN_ID_MIN 0
#define DOMAIN_ID_MAX 230
static dds_return_t dds_close (struct dds_entity *e);
static dds_return_t dds_fini (struct dds_entity *e);
dds_globals dds_global;
const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
.close = dds_close,
.delete = dds_fini,
.set_qos = dds_entity_deriver_dummy_set_qos,
.validate_status = dds_entity_deriver_dummy_validate_status
};
dds_cyclonedds_entity dds_global;
enum dds_cyclonedds_state {
CDDS_STATE_ZERO,
CDDS_STATE_STARTING,
CDDS_STATE_READY,
CDDS_STATE_STOPPING
};
static enum dds_cyclonedds_state dds_state;
static void common_cleanup (void)
{
downgrade_main_thread ();
thread_states_fini ();
ddsi_iid_fini ();
ddsrt_cond_destroy (&dds_global.m_cond);
ddsrt_mutex_destroy (&dds_global.m_mutex);
dds_state = CDDS_STATE_ZERO;
ddsrt_cond_broadcast (ddsrt_get_singleton_cond ());
}
static bool cyclonedds_entity_ready (void)
{
assert (dds_state != CDDS_STATE_ZERO);
if (dds_state == CDDS_STATE_STARTING || dds_state == CDDS_STATE_STOPPING)
return false;
else
{
struct dds_handle_link *x;
bool ready;
if (dds_handle_pin (DDS_CYCLONEDDS_HANDLE, &x) < 0)
ready = false;
else
{
ddsrt_mutex_lock (&dds_global.m_entity.m_mutex);
ready = !dds_handle_is_closed (x);
if (ready)
dds_entity_add_ref_locked (&dds_global.m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
dds_handle_unpin (x);
}
return ready;
}
}
dds_return_t dds_init (void)
{
@ -43,14 +95,28 @@ dds_return_t dds_init (void)
ddsrt_init ();
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
ddsrt_cond_t * const init_cond = ddsrt_get_singleton_cond ();
ddsrt_mutex_lock (init_mutex);
if (dds_global.m_init_count++ != 0)
while (dds_state != CDDS_STATE_ZERO && !cyclonedds_entity_ready ())
ddsrt_cond_wait (init_cond, init_mutex);
switch (dds_state)
{
case CDDS_STATE_READY:
assert (dds_global.m_entity.m_hdllink.hdl == DDS_CYCLONEDDS_HANDLE);
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
case CDDS_STATE_ZERO:
dds_state = CDDS_STATE_STARTING;
break;
default:
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();
return DDS_RETCODE_ERROR;
}
ddsrt_mutex_init (&dds_global.m_mutex);
ddsrt_cond_init (&dds_global.m_cond);
ddsi_iid_init ();
thread_states_init_static ();
thread_states_init (64);
@ -63,30 +129,57 @@ dds_return_t dds_init (void)
goto fail_handleserver;
}
dds_entity_init (&dds_global.m_entity, NULL, DDS_KIND_CYCLONEDDS, NULL, NULL, 0);
dds_global.m_entity.m_iid = ddsi_iid_gen ();
dds_global.m_entity.m_flags = DDS_ENTITY_IMPLICIT;
ddsrt_mutex_lock (&dds_global.m_entity.m_mutex);
dds_entity_add_ref_locked (&dds_global.m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
dds_state = CDDS_STATE_READY;
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
fail_handleserver:
ddsrt_mutex_destroy (&dds_global.m_mutex);
dds_global.m_init_count--;
assert (dds_state == CDDS_STATE_STARTING);
common_cleanup ();
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();
return ret;
}
extern void dds_fini (void)
static dds_return_t dds_close (struct dds_entity *e)
{
(void) e;
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
ddsrt_cond_t * const init_cond = ddsrt_get_singleton_cond ();
ddsrt_mutex_lock (init_mutex);
assert (dds_global.m_init_count > 0);
if (--dds_global.m_init_count == 0)
{
assert (dds_state == CDDS_STATE_READY);
dds_state = CDDS_STATE_STOPPING;
ddsrt_cond_broadcast (init_cond);
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
}
static dds_return_t dds_fini (struct dds_entity *e)
{
(void) e;
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
/* If there are multiple domains shutting down simultaneously, the one "deleting" the top-level
entity (and thus arriving here) may have overtaken another thread that is still in the process
of deleting its domain object. For most entities such races are not an issue, but here we tear
down the run-time, so here we must wait until everyone else is out. */
ddsrt_mutex_lock (&dds_global.m_mutex);
while (!ddsrt_avl_is_empty (&dds_global.m_domains))
ddsrt_cond_wait (&dds_global.m_cond, &dds_global.m_mutex);
ddsrt_mutex_unlock (&dds_global.m_mutex);
ddsrt_mutex_lock (init_mutex);
assert (dds_state == CDDS_STATE_STOPPING);
dds_entity_final_deinit_before_free (e);
dds_handle_server_fini ();
downgrade_main_thread ();
thread_states_fini ();
ddsi_iid_fini ();
ddsrt_mutex_destroy (&dds_global.m_mutex);
}
common_cleanup ();
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();
return DDS_RETCODE_NO_DATA;
}

View file

@ -27,15 +27,14 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
{
dds_writer *wr;
dds_return_t rc;
if (rds == NULL && nrds > 0)
if ((rds != NULL && (nrds == 0 || nrds > INT32_MAX)) || (rds == NULL && nrds != 0))
return DDS_RETCODE_BAD_PARAMETER;
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return rc;
else
{
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
const int32_t nrds_max = (nrds > INT32_MAX) ? INT32_MAX : (int32_t) nrds;
int32_t nrds_act = 0;
size_t nrds_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
thread_state_awake (lookup_thread_state (), &wr->m_entity.m_domain->gv);
@ -47,7 +46,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
struct proxy_reader *prd;
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
{
if (nrds_act < nrds_max)
if (nrds_act < nrds)
rds[nrds_act] = prd->e.iid;
nrds_act++;
}
@ -59,7 +58,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
if (nrds_act < nrds_max)
if (nrds_act < nrds)
rds[nrds_act] = rd->e.iid;
nrds_act++;
}
@ -67,7 +66,10 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
ddsrt_mutex_unlock (&wr->m_wr->e.lock);
thread_state_asleep (lookup_thread_state ());
dds_writer_unlock (wr);
return nrds_act;
/* FIXME: is it really true that there can not be more than INT32_MAX matching readers?
(in practice it'll come to a halt long before that) */
assert (nrds_act <= INT32_MAX);
return (dds_return_t) nrds_act;
}
}
@ -75,15 +77,14 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
{
dds_reader *rd;
dds_return_t rc;
if (wrs == NULL && nwrs > 0)
if ((wrs != NULL && (nwrs == 0 || nwrs > INT32_MAX)) || (wrs == NULL && nwrs != 0))
return DDS_RETCODE_BAD_PARAMETER;
if ((rc = dds_reader_lock (reader, &rd)) != DDS_RETCODE_OK)
return rc;
else
{
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
const int32_t nwrs_max = (nwrs > INT32_MAX) ? INT32_MAX : (int32_t) nwrs;
int32_t nwrs_act = 0;
size_t nwrs_act = 0;
ddsrt_avl_iter_t it;
/* FIXME: this ought not be so tightly coupled to the lower layer */
thread_state_awake (lookup_thread_state (), &rd->m_entity.m_domain->gv);
@ -95,7 +96,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
struct proxy_writer *pwr;
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
{
if (nwrs_act < nwrs_max)
if (nwrs_act < nwrs)
wrs[nwrs_act] = pwr->e.iid;
nwrs_act++;
}
@ -107,7 +108,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
struct writer *wr;
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
{
if (nwrs_act < nwrs_max)
if (nwrs_act < nwrs)
wrs[nwrs_act] = wr->e.iid;
nwrs_act++;
}
@ -115,7 +116,10 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
ddsrt_mutex_unlock (&rd->m_rd->e.lock);
thread_state_asleep (lookup_thread_state ());
dds_reader_unlock (rd);
return nwrs_act;
/* FIXME: is it really true that there can not be more than INT32_MAX matching readers?
(in practice it'll come to a halt long before that) */
assert (nwrs_act <= INT32_MAX);
return (dds_return_t) nwrs_act;
}
}

View file

@ -42,14 +42,7 @@ static dds_return_t dds_participant_delete (dds_entity *e)
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((ret = delete_participant (&e->m_domain->gv, &e->m_guid)) < 0)
DDS_CERROR (&e->m_domain->gv.logconfig, "dds_participant_delete: internal error %"PRId32"\n", ret);
ddsrt_mutex_lock (&dds_global.m_mutex);
ddsrt_avl_delete (&dds_entity_children_td, &e->m_domain->m_ppants, e);
ddsrt_mutex_unlock (&dds_global.m_mutex);
thread_state_asleep (lookup_thread_state ());
/* Every dds_init needs a dds_fini. */
dds_domain_free (e->m_domain);
dds_fini ();
return DDS_RETCODE_OK;
}
@ -118,7 +111,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
}
pp = dds_alloc (sizeof (*pp));
if ((ret = dds_entity_init (&pp->m_entity, NULL, DDS_KIND_PARTICIPANT, new_qos, listener, DDS_PARTICIPANT_STATUS_MASK)) < 0)
if ((ret = dds_entity_init (&pp->m_entity, &dom->m_entity, DDS_KIND_PARTICIPANT, new_qos, listener, DDS_PARTICIPANT_STATUS_MASK)) < 0)
goto err_entity_init;
pp->m_entity.m_guid = guid;
@ -127,9 +120,13 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
pp->m_builtin_subscriber = 0;
/* Add participant to extent */
ddsrt_mutex_lock (&dds_global.m_mutex);
ddsrt_avl_insert (&dds_entity_children_td, &dom->m_ppants, &pp->m_entity);
ddsrt_mutex_unlock (&dds_global.m_mutex);
ddsrt_mutex_lock (&dds_global.m_entity.m_mutex);
dds_entity_register_child (&dom->m_entity, &pp->m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
/* drop temporary extra ref to domain, dds_init */
dds_delete (dom->m_entity.m_hdllink.hdl);
dds_delete (DDS_CYCLONEDDS_HANDLE);
return ret;
err_entity_init:
@ -137,34 +134,33 @@ err_entity_init:
err_new_participant:
err_qos_validation:
dds_delete_qos (new_qos);
dds_domain_free (dom);
dds_delete (dom->m_entity.m_hdllink.hdl);
err_domain_create:
dds_fini ();
dds_delete (DDS_CYCLONEDDS_HANDLE);
err_dds_init:
return ret;
}
dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *participants, size_t size)
dds_return_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *participants, size_t size)
{
if ((participants != NULL && (size <= 0 || size >= INT32_MAX)) || (participants == NULL && size != 0))
return DDS_RETCODE_BAD_PARAMETER;
dds_return_t ret;
ddsrt_init ();
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
if ((participants != NULL && (size == 0 || size >= INT32_MAX)) || (participants == NULL && size != 0))
return DDS_RETCODE_BAD_PARAMETER;
if (participants)
participants[0] = 0;
dds_return_t ret = 0;
ddsrt_mutex_lock (init_mutex);
if (dds_global.m_init_count > 0)
{
if ((ret = dds_init ()) < 0)
return ret;
ret = 0;
struct dds_domain *dom;
ddsrt_mutex_lock (&dds_global.m_mutex);
if ((dom = dds_domain_find_locked (domain_id)) != NULL)
{
ddsrt_avl_iter_t it;
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &dom->m_ppants, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &dom->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
{
if ((size_t) ret < size)
participants[ret] = e->m_hdllink.hdl;
@ -172,8 +168,6 @@ dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
}
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();
dds_delete (DDS_CYCLONEDDS_HANDLE);
return ret;
}

View file

@ -288,6 +288,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
{
dds_qos_t *rqos;
dds_subscriber *sub = NULL;
dds_participant *pp;
dds_entity_t subscriber;
dds_reader *rd;
dds_topic *tp;
@ -340,7 +341,8 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
goto err_tp_lock;
}
assert (tp->m_stopic);
if (sub->m_entity.m_participant != tp->m_entity.m_participant)
pp = dds_entity_participant (&sub->m_entity);
if (pp != dds_entity_participant (&tp->m_entity))
{
reader = DDS_RETCODE_BAD_PARAMETER;
goto err_pp_mismatch;
@ -390,7 +392,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
ddsrt_mutex_unlock (&sub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state (), &sub->m_entity.m_domain->gv);
ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
ddsrt_mutex_lock (&sub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */

View file

@ -131,7 +131,7 @@ static bool dds_find_topic_check_and_add_ref (dds_entity_t participant, dds_enti
return false;
bool ret;
if (tp->m_entity.m_participant->m_hdllink.hdl != participant || strcmp (tp->m_stopic->name, name) != 0)
if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant || strcmp (tp->m_stopic->name, name) != 0)
ret = false;
else
{
@ -251,7 +251,7 @@ static dds_return_t create_topic_topic_arbirary_check_sertopic (dds_entity_t par
if (dds_topic_lock (topic, &tp) < 0)
return DDS_RETCODE_NOT_FOUND;
if (tp->m_entity.m_participant->m_hdllink.hdl != participant)
if (dds_entity_participant (&tp->m_entity)->m_entity.m_hdllink.hdl != participant)
ret = DDS_RETCODE_NOT_FOUND;
else if (!sertopic_equivalent (tp->m_stopic, sertopic))
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
@ -296,6 +296,14 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
existing topic's compatibility */
if ((rc = dds_entity_pin (participant, &par_ent)) < 0)
return rc;
/* Verify that we've been given a participant, not strictly necessary
because dds_participant_lock below checks it, but this is more
obvious */
if (dds_entity_kind (par_ent) != DDS_KIND_PARTICIPANT)
{
dds_entity_unpin (par_ent);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
new_qos = dds_create_qos ();
if (qos)

View file

@ -246,6 +246,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
dds_writer *wr;
dds_entity_t writer;
dds_publisher *pub = NULL;
dds_participant *pp;
dds_topic *tp;
dds_entity_t publisher;
@ -254,7 +255,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
if ((rc = dds_entity_pin (participant_or_publisher, &p_or_p)) != DDS_RETCODE_OK)
return rc;
if (dds_entity_kind (p_or_p) == DDS_KIND_PARTICIPANT)
publisher = dds_create_publisher(participant_or_publisher, qos, NULL);
publisher = dds_create_publisher (participant_or_publisher, qos, NULL);
else
publisher = participant_or_publisher;
dds_entity_unpin (p_or_p);
@ -271,7 +272,8 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
goto err_tp_lock;
assert (tp->m_stopic);
if (pub->m_entity.m_participant != tp->m_entity.m_participant)
pp = dds_entity_participant (&pub->m_entity);
if (pp != dds_entity_participant (&tp->m_entity))
{
rc = DDS_RETCODE_BAD_PARAMETER;
goto err_pp_mismatch;
@ -311,7 +313,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv);
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert(rc == DDS_RETCODE_OK);
@ -329,9 +331,8 @@ err_pp_mismatch:
dds_topic_unlock (tp);
err_tp_lock:
dds_publisher_unlock (pub);
if ((pub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0){
(void )dds_delete (publisher);
}
if ((pub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0)
(void) dds_delete (publisher);
return rc;
}

View file

@ -20,6 +20,7 @@ set(ddsc_test_sources
"builtin_topics.c"
"config.c"
"dispose.c"
"domain.c"
"entity_api.c"
"entity_hierarchy.c"
"entity_status.c"

View file

@ -0,0 +1,142 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <stdlib.h>
#include "dds/dds.h"
#include "CUnit/Test.h"
#include "config_env.h"
#include "dds/version.h"
#include "dds/ddsrt/environ.h"
CU_Test(ddsc_domain, get_domainid)
{
dds_entity_t pp, d, x;
dds_return_t rc;
uint32_t did;
pp = dds_create_participant (0, NULL, NULL);
CU_ASSERT_FATAL (pp > 0);
d = dds_get_parent (pp);
CU_ASSERT_FATAL (d > 0);
x = dds_get_parent (d);
CU_ASSERT_FATAL (x == DDS_CYCLONEDDS_HANDLE);
x = dds_get_parent (x);
CU_ASSERT_FATAL (x == 0);
rc = dds_get_domainid (pp, &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (did == 0);
rc = dds_get_domainid (d, &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (did == 0);
rc = dds_get_domainid (DDS_CYCLONEDDS_HANDLE, &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (did == DDS_DOMAIN_DEFAULT);
rc = dds_delete (pp);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
}
CU_Test(ddsc_domain, delete_domain0)
{
dds_entity_t pp[3], d[3];
dds_return_t rc;
uint32_t did;
for (dds_domainid_t i = 0; i < (dds_domainid_t) (sizeof (pp) / sizeof (pp[0])); i++)
{
pp[i] = dds_create_participant (0, NULL, NULL);
CU_ASSERT_FATAL (pp[i] > 0);
d[i] = dds_get_parent (pp[i]);
CU_ASSERT_FATAL (d[i] > 0);
if (i > 0)
CU_ASSERT_FATAL (d[i] == d[i-1]);
}
rc = dds_delete (pp[0]);
CU_ASSERT_FATAL (rc == 0);
rc = dds_get_domainid (pp[0], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_BAD_PARAMETER);
for (size_t i = 1; i < sizeof (pp) / sizeof (pp[0]); i++)
{
rc = dds_get_domainid (pp[i], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (did == 0);
}
rc = dds_delete (d[1]);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
/* Deleting the domain should delete all participants in it as well,
and as there is only a single domain in this test, that should
de-initialize the library.
A non-initialized library returns PRECONDITION_NOT_MET; an
initialized one given an invalid handle returns BAD_PARAMETER,
so we can distinguish the two cases. */
rc = dds_get_domainid (pp[1], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_PRECONDITION_NOT_MET);
}
CU_Test(ddsc_domain, delete_domainM)
{
dds_entity_t pp[3], d[3], x;
dds_return_t rc;
uint32_t did;
for (dds_domainid_t i = 0; i < (dds_domainid_t) (sizeof (pp) / sizeof (pp[0])); i++)
{
pp[i] = dds_create_participant (i, NULL, NULL);
CU_ASSERT_FATAL (pp[i] > 0);
d[i] = dds_get_parent (pp[i]);
CU_ASSERT_FATAL (d[i] > 0);
for (dds_domainid_t j = 0; j < i; j++)
CU_ASSERT_FATAL (d[i] != d[j]);
}
/* deleting participant 0 should tear down domain 0, but nothing else */
rc = dds_delete (pp[0]);
CU_ASSERT_FATAL (rc == 0);
rc = dds_get_domainid (pp[0], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_BAD_PARAMETER);
rc = dds_get_domainid (d[0], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_BAD_PARAMETER);
/* deleting domain should delete participant 1, but leave domain 2 alone */
rc = dds_delete (d[1]);
CU_ASSERT_FATAL (rc == 0);
rc = dds_get_domainid (pp[1], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_BAD_PARAMETER);
x = dds_get_parent (pp[2]);
CU_ASSERT_FATAL (x == d[2]);
/* after deleting participant 2, everything should be gone */
rc = dds_delete (pp[2]);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_get_domainid (pp[1], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_PRECONDITION_NOT_MET);
}
CU_Test(ddsc_domain, delete_cyclonedds)
{
dds_entity_t pp[3], d[3];
dds_return_t rc;
uint32_t did;
for (dds_domainid_t i = 0; i < (dds_domainid_t) (sizeof (pp) / sizeof (pp[0])); i++)
{
pp[i] = dds_create_participant (i, NULL, NULL);
CU_ASSERT_FATAL (pp[i] > 0);
d[i] = dds_get_parent (pp[i]);
CU_ASSERT_FATAL (d[i] > 0);
for (dds_domainid_t j = 0; j < i; j++)
CU_ASSERT_FATAL (d[i] != d[j]);
}
/* deleting participant 0 should tear down domain 0, but nothing else */
rc = dds_delete (DDS_CYCLONEDDS_HANDLE);
CU_ASSERT_FATAL (rc == 0);
rc = dds_get_domainid (pp[0], &did);
CU_ASSERT_FATAL (rc == DDS_RETCODE_PRECONDITION_NOT_MET);
}

View file

@ -290,9 +290,12 @@ CU_Test(ddsc_entity, get_entities, .init = create_entity, .fini = delete_entity)
par = dds_get_parent (0);
CU_ASSERT_EQUAL_FATAL(par, DDS_RETCODE_BAD_PARAMETER);
/* Get Parent, a participant doesn't have a parent. */
/* Get Parent, a participant always has a parent (the domain). */
par = dds_get_parent (entity);
CU_ASSERT_EQUAL_FATAL(par, DDS_ENTITY_NIL);
CU_ASSERT_NOT_EQUAL_FATAL(par, DDS_HANDLE_NIL);
/* The domain has a parent: the pseudo-entity for the library */
par = dds_get_parent (par);
CU_ASSERT_EQUAL_FATAL(par, DDS_CYCLONEDDS_HANDLE);
/* ---------- Get Participant ------------ */

View file

@ -313,7 +313,11 @@ CU_Test(ddsc_entity_get_parent, participant, .init=hierarchy_init, .fini=hierarc
{
dds_entity_t parent;
parent = dds_get_parent(g_participant);
CU_ASSERT_EQUAL_FATAL(parent, DDS_ENTITY_NIL);
CU_ASSERT_NOT_EQUAL_FATAL(parent, DDS_ENTITY_NIL);
parent = dds_get_parent(parent);
CU_ASSERT_NOT_EQUAL_FATAL(parent, DDS_ENTITY_NIL);
parent = dds_get_parent(parent);
CU_ASSERT_NOT_EQUAL_FATAL(parent, DDS_CYCLONEDDS_HANDLE);
}
/*************************************************************************************************/
@ -366,7 +370,7 @@ CU_Test(ddsc_entity_get_children, invalid_size, .init=hierarchy_init, .fini=hier
{
dds_return_t ret;
dds_entity_t child;
ret = dds_get_children(g_participant, &child, INT32_MAX);
ret = dds_get_children(g_participant, &child, SIZE_MAX);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_BAD_PARAMETER);
}
/*************************************************************************************************/
@ -866,9 +870,8 @@ CU_Test(ddsc_entity_get_children, implicit_publisher)
dds_delete(writer);
ret = dds_get_children(participant, child2, 2);
CU_ASSERT_EQUAL_FATAL(ret, 2);
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_FATAL( (child2[0] == child[0]) || (child2[0] == child[1]) );
CU_ASSERT_FATAL( (child2[1] == child[0]) || (child2[1] == child[1]) );
dds_delete(topic);
dds_delete(participant);
@ -911,9 +914,8 @@ CU_Test(ddsc_entity_get_children, implicit_subscriber)
dds_delete(reader);
ret = dds_get_children(participant, child2, 2);
CU_ASSERT_EQUAL_FATAL(ret, 2);
CU_ASSERT_EQUAL_FATAL(ret, 1);
CU_ASSERT_FATAL( (child2[0] == child[0]) || (child2[0] == child[1]) );
CU_ASSERT_FATAL( (child2[1] == child[0]) || (child2[1] == child[1]) );
dds_delete(topic);
dds_delete(participant);
@ -947,7 +949,7 @@ CU_Test(ddsc_entity_get_parent, implicit_publisher)
dds_delete(writer);
ret = dds_delete(parent);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_BAD_PARAMETER);
dds_delete(participant);
}
/*************************************************************************************************/
@ -978,7 +980,7 @@ CU_Test(ddsc_entity_get_parent, implicit_subscriber)
dds_delete(reader);
ret = dds_delete(parent);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_BAD_PARAMETER);
dds_delete(participant);
}

View file

@ -25,6 +25,8 @@ DDS_EXPORT void ddsrt_fini(void);
DDS_EXPORT ddsrt_mutex_t *ddsrt_get_singleton_mutex(void);
DDS_EXPORT ddsrt_cond_t *ddsrt_get_singleton_cond(void);
#if defined (__cplusplus)
}
#endif

View file

@ -28,6 +28,7 @@ extern void ddsrt_time_fini(void);
#define INIT_STATUS_OK 0x80000000u
static ddsrt_atomic_uint32_t init_status = DDSRT_ATOMIC_UINT32_INIT(0);
static ddsrt_mutex_t init_mutex;
static ddsrt_cond_t init_cond;
void ddsrt_init (void)
{
@ -38,6 +39,7 @@ retry:
return;
else if (v == 1) {
ddsrt_mutex_init(&init_mutex);
ddsrt_cond_init(&init_cond);
#if _WIN32
ddsrt_winsock_init();
ddsrt_time_init();
@ -67,6 +69,7 @@ void ddsrt_fini (void)
} while (!ddsrt_atomic_cas32(&init_status, v, nv));
if (nv == 1)
{
ddsrt_cond_destroy(&init_cond);
ddsrt_mutex_destroy(&init_mutex);
ddsrt_random_fini();
ddsrt_atomics_fini();
@ -83,6 +86,11 @@ ddsrt_mutex_t *ddsrt_get_singleton_mutex(void)
return &init_mutex;
}
ddsrt_cond_t *ddsrt_get_singleton_cond(void)
{
return &init_cond;
}
#ifdef _WIN32
#include "dds/ddsrt/threads.h"