Fix dds_create_domain return and error handling

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-10-30 14:57:55 +01:00 committed by eboasson
parent bd858ea97f
commit 33ba911192
23 changed files with 562 additions and 302 deletions

View file

@ -755,11 +755,8 @@ dds_create_participant(
* @param[in] domain The domain to be created. DEFAULT_DOMAIN is not allowed.
* @param[in] config A configuration string containing file names and/or XML fragments representing the configuration.
*
* @returns A return code
* @returns A valid entity handle or an error code.
*
* @retval DDS_RETCODE_OK
* The domain with the domain identifier has been created from
* given configuration string.
* @retval DDS_RETCODE_BAD_PARAMETER
* Illegal value for domain id or the configfile parameter is NULL.
* @retval DDS_PRECONDITION_NOT_MET
@ -767,7 +764,7 @@ dds_create_participant(
* @retval DDS_RETCODE_ERROR
* An internal error has occurred.
*/
DDS_EXPORT dds_return_t
DDS_EXPORT dds_entity_t
dds_create_domain(const dds_domainid_t domain, const char *config);
/**

View file

@ -18,7 +18,7 @@
extern "C" {
#endif
DDS_EXPORT dds_return_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t id, bool use_existing, const char *config) ddsrt_nonnull((1,4));
DDS_EXPORT dds_entity_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t id, bool implicit, const char *config) ddsrt_nonnull((1,4));
DDS_EXPORT dds_domain *dds_domain_find_locked (dds_domainid_t id);
#if defined (__cplusplus)

View file

@ -24,6 +24,7 @@ dds_entity_init(
dds_entity * e,
dds_entity * parent,
dds_entity_kind_t kind,
bool implicit,
dds_qos_t * qos,
const dds_listener_t *listener,
status_mask_t mask);
@ -38,6 +39,12 @@ dds_entity_register_child (
DDS_EXPORT void
dds_entity_add_ref_locked(dds_entity *e);
DDS_EXPORT void
dds_entity_drop_ref(dds_entity *e);
DDS_EXPORT void
dds_entity_unpin_and_drop_ref (dds_entity *e);
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
qualifier_ dds_return_t type_##_lock (dds_entity_t hdl, type_ **x) \
{ \
@ -96,6 +103,8 @@ dds_entity_pin (
dds_entity_t hdl,
dds_entity **eptr);
DDS_EXPORT dds_return_t dds_entity_pin_for_delete (dds_entity_t hdl, bool explicit, dds_entity **eptr);
DDS_EXPORT void dds_entity_unpin (
dds_entity *e);

View file

@ -73,8 +73,10 @@ typedef int32_t dds_handle_t;
/* Closing & closed can be combined, but having two gives a means for enforcing
that close() be called first, then close_wait(), and then delete(). */
#define HDL_FLAG_CLOSING (0x80000000u)
#define HDL_FLAG_CLOSED (0x40000000u)
#define HDL_FLAG_DELETE_DEFERRED (0x40000000u)
#define HDL_FLAG_PENDING (0x20000000u)
#define HDL_FLAG_IMPLICIT (0x10000000u)
#define HDL_FLAG_ALLOW_CHILDREN (0x08000000u) /* refc counts children */
struct dds_handle_link {
dds_handle_t hdl;
@ -116,7 +118,9 @@ dds_handle_server_fini(void);
*/
DDS_EXPORT dds_handle_t
dds_handle_create(
struct dds_handle_link *link);
struct dds_handle_link *link,
bool implicit,
bool allow_children);
/*
@ -124,7 +128,7 @@ dds_handle_create(
*/
DDS_EXPORT dds_return_t
dds_handle_register_special (
struct dds_handle_link *link, dds_handle_t handle);
struct dds_handle_link *link, bool implicit, bool allow_children, dds_handle_t handle);
DDS_EXPORT void dds_handle_unpend (struct dds_handle_link *link);
@ -181,6 +185,8 @@ DDS_EXPORT void
dds_handle_unpin(
struct dds_handle_link *link);
int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_handle_link **link);
bool dds_handle_drop_childref_and_pin (struct dds_handle_link *link, bool may_delete_parent);
/*
* Check if the handle is closed.
@ -195,11 +201,15 @@ dds_handle_unpin(
DDS_EXPORT void dds_handle_add_ref (struct dds_handle_link *link);
DDS_EXPORT bool dds_handle_drop_ref (struct dds_handle_link *link);
DDS_EXPORT bool dds_handle_close (struct dds_handle_link *link);
DDS_EXPORT bool dds_handle_unpin_and_drop_ref (struct dds_handle_link *link);
DDS_EXPORT inline bool dds_handle_is_closed (struct dds_handle_link *link) {
return (ddsrt_atomic_ld32 (&link->cnt_flags) & (HDL_FLAG_CLOSED | HDL_FLAG_CLOSING)) != 0;
return (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSING) != 0;
}
DDS_EXPORT bool dds_handle_is_not_refd (struct dds_handle_link *link);
#if defined (__cplusplus)
}
#endif

View file

@ -21,6 +21,13 @@ extern "C" {
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_publisher, DDS_KIND_PUBLISHER)
dds_entity_t
dds__create_publisher_l(
struct dds_participant *participant, /* entity-lock must be held */
bool implicit,
const dds_qos_t *qos,
const dds_listener_t *listener);
dds_return_t dds_publisher_begin_coherent (dds_entity_t e);
dds_return_t dds_publisher_end_coherent (dds_entity_t e);

View file

@ -24,6 +24,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_subscriber, DDS_KIND_SUBSCRIBER)
dds_entity_t
dds__create_subscriber_l(
struct dds_participant *participant, /* entity-lock must be held */
bool implicit,
const dds_qos_t *qos,
const dds_listener_t *listener);

View file

@ -245,7 +245,7 @@ typedef struct dds_participant {
typedef struct dds_reader {
struct dds_entity m_entity;
const struct dds_topic *m_topic;
struct dds_topic *m_topic;
struct dds_rhc *m_rhc; /* aliases m_rd->rhc with a wider interface, FIXME: but m_rd owns it for resource management */
struct reader *m_rd;
bool m_data_on_readers;
@ -265,7 +265,7 @@ typedef struct dds_reader {
typedef struct dds_writer {
struct dds_entity m_entity;
const struct dds_topic *m_topic;
struct dds_topic *m_topic;
struct nn_xpack *m_xp;
struct writer *m_wr;
struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/

View file

@ -131,7 +131,7 @@ bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic
static dds_entity_t dds__create_builtin_subscriber (dds_participant *participant)
{
dds_qos_t *qos = dds__create_builtin_qos ();
dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL);
dds_entity_t sub = dds__create_subscriber_l (participant, false, qos, NULL);
dds_delete_qos (qos);
return sub;
}

View file

@ -48,16 +48,14 @@ static int dds_domain_compare (const void *va, const void *vb)
static const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER (
offsetof (dds_domain, m_node), offsetof (dds_domain, m_id), dds_domain_compare, 0);
static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id, const char *config)
static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id, const char *config, bool implicit)
{
dds_return_t ret = DDS_RETCODE_OK;
dds_entity_t domh;
uint32_t len;
dds_entity_t domain_handle;
if ((domain_handle = dds_entity_init (&domain->m_entity, &dds_global.m_entity, DDS_KIND_DOMAIN, NULL, NULL, 0)) < 0)
return domain_handle;
if ((domh = dds_entity_init (&domain->m_entity, &dds_global.m_entity, DDS_KIND_DOMAIN, implicit, NULL, NULL, 0)) < 0)
return domh;
domain->m_entity.m_domain = domain;
domain->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
domain->m_entity.m_iid = ddsi_iid_gen ();
domain->gv.tstart = now ();
@ -90,7 +88,7 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (domain->cfgst == NULL)
{
DDS_ILOG (DDS_LC_CONFIG, domain_id, "Failed to parse configuration\n");
ret = DDS_RETCODE_ERROR;
domh = DDS_RETCODE_ERROR;
goto fail_config;
}
@ -100,14 +98,14 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (rtps_config_prep (&domain->gv, domain->cfgst) != 0)
{
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to configure RTPS\n");
ret = DDS_RETCODE_ERROR;
domh = DDS_RETCODE_ERROR;
goto fail_rtps_config;
}
if (rtps_init (&domain->gv) < 0)
{
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to initialize RTPS\n");
ret = DDS_RETCODE_ERROR;
domh = DDS_RETCODE_ERROR;
goto fail_rtps_init;
}
@ -122,14 +120,14 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (dds_global.threadmon == NULL)
{
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to create a thread liveliness monitor\n");
ret = DDS_RETCODE_OUT_OF_RESOURCES;
domh = DDS_RETCODE_OUT_OF_RESOURCES;
goto fail_threadmon_new;
}
/* FIXME: thread properties */
if (ddsi_threadmon_start (dds_global.threadmon, "threadmon") < 0)
{
DDS_ILOG (DDS_LC_ERROR, domain->m_id, "Failed to start the thread liveliness monitor\n");
ret = DDS_RETCODE_ERROR;
domh = DDS_RETCODE_ERROR;
goto fail_threadmon_start;
}
}
@ -159,14 +157,14 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (rtps_start (&domain->gv) < 0)
{
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to start RTPS\n");
ret = DDS_RETCODE_ERROR;
domh = DDS_RETCODE_ERROR;
goto fail_rtps_start;
}
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_register_domain (dds_global.threadmon, &domain->gv);
dds_entity_init_complete (&domain->m_entity);
return DDS_RETCODE_OK;
return domh;
fail_rtps_start:
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
@ -184,7 +182,7 @@ fail_rtps_config:
config_fini (domain->cfgst);
fail_config:
dds_handle_delete (&domain->m_entity.m_hdllink);
return ret;
return domh;
}
dds_domain *dds_domain_find_locked (dds_domainid_t id)
@ -192,10 +190,10 @@ dds_domain *dds_domain_find_locked (dds_domainid_t id)
return ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &id);
}
dds_return_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t id, bool use_existing, const char *config)
dds_entity_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t id, bool implicit, const char *config)
{
struct dds_domain *dom;
dds_return_t ret;
dds_entity_t domh;
/* FIXME: should perhaps lock parent object just like everywhere */
ddsrt_mutex_lock (&dds_global.m_mutex);
@ -203,26 +201,48 @@ dds_return_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t
if (id != DDS_DOMAIN_DEFAULT)
{
if ((dom = dds_domain_find_locked (id)) == NULL)
ret = DDS_RETCODE_NOT_FOUND;
domh = DDS_RETCODE_NOT_FOUND;
else
ret = DDS_RETCODE_OK;
domh = dom->m_entity.m_hdllink.hdl;
}
else
{
if ((dom = ddsrt_avl_find_min (&dds_domaintree_def, &dds_global.m_domains)) != NULL)
ret = DDS_RETCODE_OK;
domh = dom->m_entity.m_hdllink.hdl;
else
ret = DDS_RETCODE_NOT_FOUND;
domh = DDS_RETCODE_NOT_FOUND;
}
switch (ret)
if (domh == DDS_RETCODE_NOT_FOUND)
{
case DDS_RETCODE_OK:
if (!use_existing)
dom = dds_alloc (sizeof (*dom));
if ((domh = dds_domain_init (dom, id, config, implicit)) < 0)
dds_free (dom);
else
{
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
break;
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, dom);
dds_entity_register_child (&dds_global.m_entity, &dom->m_entity);
if (implicit)
{
dds_entity_add_ref_locked (&dom->m_entity);
dds_handle_repin (&dom->m_entity.m_hdllink);
}
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
}
else if (domh <= DDS_RETCODE_OK)
{
assert (0);
domh = DDS_RETCODE_ERROR;
}
else if (!implicit)
{
domh = DDS_RETCODE_PRECONDITION_NOT_MET;
}
else
{
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
if (dds_handle_is_closed (&dom->m_entity.m_hdllink))
{
@ -232,31 +252,20 @@ dds_return_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t
}
else
{
dds_entity_add_ref_locked (&dom->m_entity);
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
break;
case DDS_RETCODE_NOT_FOUND:
dom = dds_alloc (sizeof (*dom));
if ((ret = dds_domain_init (dom, id, config)) < 0)
dds_free (dom);
else
if (implicit)
{
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
ddsrt_avl_insert (&dds_domaintree_def, &dds_global.m_domains, dom);
dds_entity_register_child (&dds_global.m_entity, &dom->m_entity);
dds_entity_add_ref_locked (&dom->m_entity);
dds_handle_repin (&dom->m_entity.m_hdllink);
}
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
break;
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
return ret;
return domh;
}
dds_return_t dds_create_domain(const dds_domainid_t domain, const char *config)
dds_entity_t dds_create_domain (const dds_domainid_t domain, const char *config)
{
dds_domain *dom;
dds_entity_t ret;
@ -266,16 +275,10 @@ dds_return_t dds_create_domain(const dds_domainid_t domain, const char *config)
/* Make sure DDS instance is initialized. */
if ((ret = dds_init ()) < 0)
goto err_dds_init;
return ret;
if ((ret = dds_domain_create_internal (&dom, domain, false, config)) < 0)
goto err_domain_create;
return DDS_RETCODE_OK;
err_domain_create:
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
err_dds_init:
ret = dds_domain_create_internal (&dom, domain, false, config);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return ret;
}
@ -368,5 +371,5 @@ void dds_write_set_batch (bool enable)
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
}

View file

@ -80,11 +80,40 @@ const ddsrt_avl_treedef_t dds_entity_children_td = DDSRT_AVL_TREEDEF_INITIALIZER
static void dds_entity_observers_signal (dds_entity *observed, uint32_t status);
static void dds_entity_observers_signal_delete (dds_entity *observed);
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate);
static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, enum delete_impl_state delstate);
void dds_entity_add_ref_locked (dds_entity *e)
{
dds_handle_add_ref (&e->m_hdllink);
}
void dds_entity_drop_ref (dds_entity *e)
{
if (dds_handle_drop_ref (&e->m_hdllink))
{
/* increment pin count unconditionally to satisfy the "pinned" requirement */
dds_handle_repin (&e->m_hdllink);
ddsrt_mutex_lock (&e->m_mutex);
dds_return_t ret = really_delete_pinned_closed_locked (e, DIS_EXPLICIT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
}
}
void dds_entity_unpin_and_drop_ref (dds_entity *e)
{
if (dds_handle_unpin_and_drop_ref (&e->m_hdllink))
{
/* increment pin count unconditionally to satisfy the "pinned" requirement */
dds_handle_repin (&e->m_hdllink);
ddsrt_mutex_lock (&e->m_mutex);
dds_return_t ret = really_delete_pinned_closed_locked (e, DIS_EXPLICIT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
}
}
static bool entity_has_status (const dds_entity *e)
{
switch (e->m_kind)
@ -110,7 +139,7 @@ static bool entity_has_status (const dds_entity *e)
return false;
}
dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind_t kind, dds_qos_t *qos, const dds_listener_t *listener, status_mask_t mask)
dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind_t kind, bool implicit, dds_qos_t *qos, const dds_listener_t *listener, status_mask_t mask)
{
dds_handle_t handle;
@ -125,6 +154,8 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
/* TODO: CHAM-96: Implement dynamic enabling of entity. */
e->m_flags |= DDS_ENTITY_ENABLED;
if (implicit)
e->m_flags |= DDS_ENTITY_IMPLICIT;
/* set the status enable based on kind */
if (entity_has_status (e))
@ -162,12 +193,14 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
if (kind == DDS_KIND_CYCLONEDDS)
{
if ((handle = dds_handle_register_special (&e->m_hdllink, DDS_CYCLONEDDS_HANDLE)) <= 0)
if ((handle = dds_handle_register_special (&e->m_hdllink, implicit, true, DDS_CYCLONEDDS_HANDLE)) <= 0)
return (dds_entity_t) handle;
}
else
{
if ((handle = dds_handle_create (&e->m_hdllink)) <= 0)
/* for topics, refc counts readers/writers, for all others, it counts children (this we can get away with
as long as topics can't have children) */
if ((handle = dds_handle_create (&e->m_hdllink, implicit, (kind != DDS_KIND_TOPIC))) <= 0)
return (dds_entity_t) handle;
}
@ -182,9 +215,12 @@ void dds_entity_init_complete (dds_entity *entity)
void dds_entity_register_child (dds_entity *parent, dds_entity *child)
{
/* parent must be tracking children in its refc, or children can't be added */
assert (ddsrt_atomic_ld32 (&parent->m_hdllink.cnt_flags) & HDL_FLAG_ALLOW_CHILDREN);
assert (child->m_iid != 0);
assert (ddsrt_avl_lookup (&dds_entity_children_td, &parent->m_children, &child->m_iid) == NULL);
ddsrt_avl_insert (&dds_entity_children_td, &parent->m_children, child);
dds_entity_add_ref_locked (parent);
}
static dds_entity *next_non_topic_child (ddsrt_avl_tree_t *remaining_children)
@ -227,13 +263,11 @@ static void print_delete (const dds_entity *e, enum delete_impl_state delstate ,
printf ("delete(%p, delstate %s, iid %"PRIx64"): %s%s %d pin %u refc %u %s %s\n",
(void *) e, (delstate == DIS_IMPLICIT) ? "implicit" : (delstate == DIS_EXPLICIT) ? "explicit" : "from_parent", iid,
entity_kindstr (e->m_kind), (e->m_flags & DDS_ENTITY_IMPLICIT) ? " [implicit]" : "",
e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0xffff, (cm & 0x80000000) ? "closed" : "open",
e->m_hdllink.hdl, cm & 0xfff, (cm >> 12) & 0x7fff, (cm & 0x80000000) ? "closed" : "open",
ddsrt_avl_is_empty (&e->m_children) ? "childless" : "has-children");
}
#endif
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate);
dds_return_t dds_delete (dds_entity_t entity)
{
return dds_delete_impl (entity, DIS_EXPLICIT);
@ -252,53 +286,34 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state
{
dds_entity *e;
dds_return_t ret;
if ((ret = dds_entity_pin (entity, &e)) < 0)
return ret;
else
if ((ret = dds_entity_pin_for_delete (entity, (delstate != DIS_IMPLICIT), &e)) == DDS_RETCODE_OK)
return dds_delete_impl_pinned (e, delstate);
else if (ret == DDS_RETCODE_TRY_AGAIN) /* non-child refs exist */
return DDS_RETCODE_OK;
else
return ret;
}
dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delstate)
{
dds_entity *child;
dds_return_t ret;
/* Any number of threads pinning it, possibly in delete, or having pinned it and
trying to acquire m_mutex */
ddsrt_mutex_lock (&e->m_mutex);
#if TRACE_DELETE
print_delete (e, delstate, iid);
print_delete (e, delstate, e->m_iid);
#endif
/* If another thread was racing us in delete, it will have set the CLOSING flag
while holding m_mutex and we had better bail out. */
if (dds_handle_is_closed (&e->m_hdllink))
{
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
assert (dds_handle_is_closed (&e->m_hdllink));
return really_delete_pinned_closed_locked (e, delstate);
}
/* Ignore children calling up to delete an implicit parent if there are still
(or again) children */
if (delstate == DIS_IMPLICIT)
{
if (!((e->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&e->m_children)))
{
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
}
/* Drop reference, atomically setting CLOSING if no other references remain.
FIXME: that's not quite right: this is really only for topics. After a call
to delete, the handle ought to become invalid even if the topic stays (and
should perhaps even be revivable via find_topic). */
if (! dds_handle_drop_ref (&e->m_hdllink))
{
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, enum delete_impl_state delstate)
{
dds_entity *child;
dds_return_t ret;
/* No threads pinning it anymore, no need to worry about other threads deleting
it, but there can still be plenty of threads that have it pinned and are
@ -360,15 +375,23 @@ dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delst
/* FIXME: dds_delete can fail if the child is being deleted in parallel, in which case: wait */
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
(void) dds_delete_impl (child_handle, DIS_FROM_PARENT);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
assert (ret == DDS_RETCODE_OK || ret == DDS_RETCODE_BAD_PARAMETER);
(void) ret;
ddsrt_mutex_lock (&e->m_mutex);
if (ret == DDS_RETCODE_BAD_PARAMETER && child == next_non_topic_child (&e->m_children))
{
ddsrt_cond_wait (&e->m_cond, &e->m_mutex);
}
}
while ((child = ddsrt_avl_find_min (&dds_entity_children_td, &e->m_children)) != NULL)
{
assert (dds_entity_kind (child) == DDS_KIND_TOPIC);
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
(void) dds_delete_impl (child_handle, DIS_FROM_PARENT);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
ddsrt_mutex_lock (&e->m_mutex);
}
ddsrt_mutex_unlock (&e->m_mutex);
@ -391,15 +414,15 @@ dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delst
ddsrt_mutex_lock (&p->m_mutex);
assert (ddsrt_avl_lookup (&dds_entity_children_td, &p->m_children, &e->m_iid) != NULL);
ddsrt_avl_delete (&dds_entity_children_td, &p->m_children, e);
if (dds_handle_drop_childref_and_pin (&p->m_hdllink, delstate != DIS_FROM_PARENT))
{
assert (dds_handle_is_closed (&p->m_hdllink));
assert (dds_handle_is_not_refd (&p->m_hdllink));
assert (ddsrt_avl_is_empty (&p->m_children));
parent_to_delete = p;
}
/* trigger parent in case it is waiting in delete */
ddsrt_cond_broadcast (&p->m_cond);
if (delstate != DIS_FROM_PARENT && (p->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&p->m_children))
{
if ((ret = dds_entity_pin (p->m_hdllink.hdl, &parent_to_delete)) < 0)
parent_to_delete = NULL;
}
ddsrt_mutex_unlock (&p->m_mutex);
}
@ -1121,6 +1144,19 @@ dds_return_t dds_entity_pin (dds_entity_t hdl, dds_entity **eptr)
}
}
dds_return_t dds_entity_pin_for_delete (dds_entity_t hdl, bool explicit, dds_entity **eptr)
{
dds_return_t hres;
struct dds_handle_link *hdllink;
if ((hres = dds_handle_pin_for_delete (hdl, explicit, &hdllink)) < 0)
return hres;
else
{
*eptr = dds_entity_from_handle_link (hdllink);
return DDS_RETCODE_OK;
}
}
void dds_entity_unpin (dds_entity *e)
{
dds_handle_unpin (&e->m_hdllink);

View file

@ -57,18 +57,18 @@ dds_entity_t dds_create_guardcondition (dds_entity_t owner)
}
dds_guardcond *gcond = dds_alloc (sizeof (*gcond));
dds_entity_t hdl = dds_entity_init (&gcond->m_entity, e, DDS_KIND_COND_GUARD, NULL, NULL, 0);
dds_entity_t hdl = dds_entity_init (&gcond->m_entity, e, DDS_KIND_COND_GUARD, false, NULL, NULL, 0);
gcond->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (e, &gcond->m_entity);
dds_entity_init_complete (&gcond->m_entity);
dds_entity_unlock (e);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return hdl;
err_entity_kind:
dds_entity_unlock (e);
err_entity_lock:
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return rc;
}

View file

@ -20,11 +20,36 @@
#include "dds__handles.h"
#include "dds__types.h"
#define HDL_REFCOUNT_MASK (0x0ffff000u)
#define HDL_REFCOUNT_MASK (0x07fff000u)
#define HDL_REFCOUNT_UNIT (0x00001000u)
#define HDL_REFCOUNT_SHIFT 12
#define HDL_PINCOUNT_MASK (0x00000fffu)
/*
"regular" entities other than topics:
- create makes it
- delete deletes it and its children immediately
- explicit domain: additional protection for bootstrapping complications need extra care
implicit entities other than topics (pub, sub, domain, cyclonedds):
- created "spontaneously" as a consequence of creating the writer/reader/participant
- delete of last child causes it to disappear
- explicit delete treated like a delete of a "regular" entity
- domain, cyclonedds: bootstrapping complications require additional protection
topics:
- create makes it
- never has children (so the handle's cnt_flags can have a different meaning)
- readers, writers keep it in existence
- delete deferred until no readers/writers exist
- an attempt at deleting it fails if in "deferred delete" state (or should it simply
return ok while doing nothing?), other operations keep going so, e.g., listeners
remain useful
built-in topics:
- implicit variant of a topic
*/
/* Maximum number of handles is INT32_MAX - 1, but as the allocator relies on a
random generator for finding a free one, the time spent in the dds_handle_create
increases with an increasing number of handles. 16M handles seems likely to be
@ -82,7 +107,7 @@ void dds_handle_server_fini (void)
cf & HDL_PINCOUNT_MASK, (cf & HDL_REFCOUNT_MASK) >> HDL_REFCOUNT_SHIFT,
cf & HDL_FLAG_PENDING ? " pending" : "",
cf & HDL_FLAG_CLOSING ? " closing" : "",
cf & HDL_FLAG_CLOSED ? " closed" : "");
cf & HDL_FLAG_DELETE_DEFERRED ? " delete-deferred" : "");
}
assert (ddsrt_hh_iter_first (handles.ht, &it) == NULL);
#endif
@ -94,9 +119,9 @@ void dds_handle_server_fini (void)
}
static bool hhadd (struct ddsrt_hh *ht, void *elem) { return ddsrt_hh_add (ht, elem); }
static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
static dds_handle_t dds_handle_create_int (struct dds_handle_link *link, bool implicit, bool refc_counts_children)
{
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | HDL_REFCOUNT_UNIT | 1u);
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | (implicit ? HDL_FLAG_IMPLICIT : HDL_REFCOUNT_UNIT) | (refc_counts_children ? HDL_FLAG_ALLOW_CHILDREN : 0) | 1u);
do {
do {
link->hdl = (int32_t) (ddsrt_random () & INT32_MAX);
@ -105,7 +130,7 @@ static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
return link->hdl;
}
dds_handle_t dds_handle_create (struct dds_handle_link *link)
dds_handle_t dds_handle_create (struct dds_handle_link *link, bool implicit, bool allow_children)
{
dds_handle_t ret;
ddsrt_mutex_lock (&handles.lock);
@ -117,14 +142,14 @@ dds_handle_t dds_handle_create (struct dds_handle_link *link)
else
{
handles.count++;
ret = dds_handle_create_int (link);
ret = dds_handle_create_int (link, implicit, allow_children);
ddsrt_mutex_unlock (&handles.lock);
assert (ret > 0);
}
return ret;
}
dds_return_t dds_handle_register_special (struct dds_handle_link *link, dds_handle_t handle)
dds_return_t dds_handle_register_special (struct dds_handle_link *link, bool implicit, bool allow_children, dds_handle_t handle)
{
dds_return_t ret;
if (handle <= 0)
@ -138,7 +163,7 @@ dds_return_t dds_handle_register_special (struct dds_handle_link *link, dds_hand
else
{
handles.count++;
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | HDL_REFCOUNT_UNIT | 1u);
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | (implicit ? HDL_FLAG_IMPLICIT : HDL_REFCOUNT_UNIT) | (allow_children ? HDL_FLAG_ALLOW_CHILDREN : 0) | 1u);
link->hdl = handle;
if (hhadd (handles.ht, link))
ret = handle;
@ -155,9 +180,9 @@ void dds_handle_unpend (struct dds_handle_link *link)
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((cf & HDL_FLAG_PENDING));
assert (!(cf & HDL_FLAG_CLOSED));
assert (!(cf & HDL_FLAG_DELETE_DEFERRED));
assert (!(cf & HDL_FLAG_CLOSING));
assert ((cf & HDL_REFCOUNT_MASK) >= HDL_REFCOUNT_UNIT);
assert ((cf & HDL_REFCOUNT_MASK) >= HDL_REFCOUNT_UNIT || (cf & HDL_FLAG_IMPLICIT));
assert ((cf & HDL_PINCOUNT_MASK) >= 1u);
#endif
ddsrt_atomic_and32 (&link->cnt_flags, ~HDL_FLAG_PENDING);
@ -171,7 +196,6 @@ int32_t dds_handle_delete (struct dds_handle_link *link)
if (!(cf & HDL_FLAG_PENDING))
{
assert (cf & HDL_FLAG_CLOSING);
assert (cf & HDL_FLAG_CLOSED);
assert ((cf & HDL_REFCOUNT_MASK) == 0u);
}
assert ((cf & HDL_PINCOUNT_MASK) == 1u);
@ -213,7 +237,7 @@ static int32_t dds_handle_pin_int (dds_handle_t hdl, uint32_t delta, struct dds_
rc = DDS_RETCODE_OK;
do {
cf = ddsrt_atomic_ld32 (&(*link)->cnt_flags);
if (cf & (HDL_FLAG_CLOSED | HDL_FLAG_CLOSING | HDL_FLAG_PENDING))
if (cf & (HDL_FLAG_CLOSING | HDL_FLAG_PENDING))
{
rc = DDS_RETCODE_BAD_PARAMETER;
break;
@ -229,6 +253,149 @@ int32_t dds_handle_pin (dds_handle_t hdl, struct dds_handle_link **link)
return dds_handle_pin_int (hdl, 1u, link);
}
int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_handle_link **link)
{
struct dds_handle_link dummy = { .hdl = hdl };
int32_t rc;
/* it makes sense to check here for initialization: the first thing any operation
(other than create_participant) does is to call dds_handle_pin on the supplied
entity, so checking here whether the library has been initialised helps avoid
crashes if someone forgets to create a participant (or allows a program to
continue after failing to create one).
One could check that the handle is > 0, but that would catch fewer errors
without any advantages. */
if (handles.ht == NULL)
return DDS_RETCODE_PRECONDITION_NOT_MET;
ddsrt_mutex_lock (&handles.lock);
*link = ddsrt_hh_lookup (handles.ht, &dummy);
if (*link == NULL)
rc = DDS_RETCODE_BAD_PARAMETER;
else
{
uint32_t cf, cf1;
/* Assume success; bail out if the object turns out to be in the process of
being deleted */
rc = DDS_RETCODE_OK;
do {
cf = ddsrt_atomic_ld32 (&(*link)->cnt_flags);
if (cf & (HDL_FLAG_CLOSING | HDL_FLAG_PENDING))
{
/* Only one can succeed (and if closing is already set, the handle's reference has
already been dropped) */
rc = DDS_RETCODE_BAD_PARAMETER;
break;
}
else if (cf & HDL_FLAG_DELETE_DEFERRED)
{
/* Someone already called delete, but the operation was deferred becauses there are still
outstanding references. This implies that there are no children, because then the
entire hierarchy would simply have been deleted. */
assert (!(cf & HDL_FLAG_ALLOW_CHILDREN));
rc = DDS_RETCODE_ALREADY_DELETED;
break;
}
else if (explicit)
{
/* Explicit call to dds_delete (either by application or by parent deleting its children) */
if (cf & HDL_FLAG_IMPLICIT)
{
/* Entity is implicit, so handle doesn't hold a reference */
cf1 = (cf + 1u) | HDL_FLAG_CLOSING;
}
else if (cf & HDL_FLAG_ALLOW_CHILDREN)
{
/* Entity is explicit, so handle held a reference, refc only counts children as so is not our concern */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
}
else
{
/* Entity is explicit, so handle held a reference, refc counts non-children, refc > 1 means drop ref and error (so don't pin) */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
else
{
cf1 = (cf - HDL_REFCOUNT_UNIT) | HDL_FLAG_DELETE_DEFERRED;
}
}
}
else
{
/* Implicit call to dds_delete (child invoking delete on its parent) */
if (cf & HDL_FLAG_IMPLICIT)
{
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
else
{
assert ((cf & HDL_REFCOUNT_MASK) > 0);
cf1 = (cf - HDL_REFCOUNT_UNIT);
}
}
else
{
/* Child can't delete an explicit parent */
rc = DDS_RETCODE_ILLEGAL_OPERATION;
break;
}
}
rc = ((cf1 & HDL_REFCOUNT_MASK) == 0 || (cf1 & HDL_FLAG_ALLOW_CHILDREN)) ? DDS_RETCODE_OK : DDS_RETCODE_TRY_AGAIN;
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cf, cf1));
}
ddsrt_mutex_unlock (&handles.lock);
return rc;
}
bool dds_handle_drop_childref_and_pin (struct dds_handle_link *link, bool may_delete_parent)
{
bool del_parent = false;
ddsrt_mutex_lock (&handles.lock);
uint32_t cf, cf1;
do {
cf = ddsrt_atomic_ld32 (&link->cnt_flags);
if (cf & (HDL_FLAG_CLOSING | HDL_FLAG_PENDING))
{
/* Only one can succeed; child ref still to be removed */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
cf1 = (cf - HDL_REFCOUNT_UNIT);
del_parent = false;
}
else
{
if (cf & HDL_FLAG_IMPLICIT)
{
/* Implicit parent: delete if last ref */
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT && may_delete_parent)
{
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
del_parent = true;
}
else
{
assert ((cf & HDL_REFCOUNT_MASK) > 0);
cf1 = (cf - HDL_REFCOUNT_UNIT);
del_parent = false;
}
}
else
{
/* Child can't delete an explicit parent; child ref still to be removed */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
cf1 = (cf - HDL_REFCOUNT_UNIT);
del_parent = false;
}
}
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, cf, cf1));
ddsrt_mutex_unlock (&handles.lock);
return del_parent;
}
int32_t dds_handle_pin_and_ref (dds_handle_t hdl, struct dds_handle_link **link)
{
return dds_handle_pin_int (hdl, HDL_REFCOUNT_UNIT + 1u, link);
@ -237,7 +404,6 @@ int32_t dds_handle_pin_and_ref (dds_handle_t hdl, struct dds_handle_link **link)
void dds_handle_repin (struct dds_handle_link *link)
{
uint32_t x = ddsrt_atomic_inc32_nv (&link->cnt_flags);
assert (!(x & HDL_FLAG_CLOSED));
(void) x;
}
@ -245,7 +411,6 @@ void dds_handle_unpin (struct dds_handle_link *link)
{
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert (!(cf & HDL_FLAG_CLOSED));
if (cf & HDL_FLAG_CLOSING)
assert ((cf & HDL_PINCOUNT_MASK) > 1u);
else
@ -266,16 +431,47 @@ void dds_handle_add_ref (struct dds_handle_link *link)
bool dds_handle_drop_ref (struct dds_handle_link *link)
{
assert ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_REFCOUNT_MASK) != 0);
uint32_t old, new;
do {
old = ddsrt_atomic_ld32 (&link->cnt_flags);
if ((old & HDL_REFCOUNT_MASK) != HDL_REFCOUNT_UNIT)
assert ((old & HDL_REFCOUNT_MASK) > 0);
new = old - HDL_REFCOUNT_UNIT;
else
new = (old - HDL_REFCOUNT_UNIT) | HDL_FLAG_CLOSING;
if ((old & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
new |= HDL_FLAG_CLOSING;
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
return (new & HDL_REFCOUNT_MASK) == 0;
ddsrt_mutex_lock (&handles.lock);
if ((new & (HDL_FLAG_CLOSING | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSING | 1u))
{
ddsrt_cond_broadcast (&handles.cond);
}
ddsrt_mutex_unlock (&handles.lock);
return (new & (HDL_FLAG_CLOSING | HDL_REFCOUNT_MASK)) == (HDL_FLAG_CLOSING | 0);
}
bool dds_handle_unpin_and_drop_ref (struct dds_handle_link *link)
{
uint32_t old, new;
do {
old = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((old & HDL_REFCOUNT_MASK) > 0);
assert ((old & HDL_PINCOUNT_MASK) > 0);
new = old - HDL_REFCOUNT_UNIT - 1u;
if ((old & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT && (old & HDL_FLAG_IMPLICIT))
new |= HDL_FLAG_CLOSING;
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
ddsrt_mutex_lock (&handles.lock);
if ((new & (HDL_FLAG_CLOSING | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSING | 1u))
{
ddsrt_cond_broadcast (&handles.cond);
}
ddsrt_mutex_unlock (&handles.lock);
return (new & (HDL_FLAG_CLOSING | HDL_REFCOUNT_MASK)) == (HDL_FLAG_CLOSING | 0);
}
bool dds_handle_close (struct dds_handle_link *link)
{
uint32_t old = ddsrt_atomic_or32_ov (&link->cnt_flags, HDL_FLAG_CLOSING);
return (old & HDL_REFCOUNT_MASK) == 0;
}
void dds_handle_close_wait (struct dds_handle_link *link)
@ -283,17 +479,18 @@ void dds_handle_close_wait (struct dds_handle_link *link)
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((cf & HDL_FLAG_CLOSING));
assert (!(cf & HDL_FLAG_CLOSED));
assert ((cf & HDL_REFCOUNT_MASK) == 0u);
assert ((cf & HDL_PINCOUNT_MASK) >= 1u);
#endif
ddsrt_mutex_lock (&handles.lock);
while ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 1u)
ddsrt_cond_wait (&handles.cond, &handles.lock);
/* only one thread may call close_wait on a given handle */
assert (!(ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED));
ddsrt_atomic_or32 (&link->cnt_flags, HDL_FLAG_CLOSED);
ddsrt_mutex_unlock (&handles.lock);
}
bool dds_handle_is_not_refd (struct dds_handle_link *link)
{
return ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_REFCOUNT_MASK) == 0);
}
extern inline bool dds_handle_is_closed (struct dds_handle_link *link);

View file

@ -119,9 +119,8 @@ dds_return_t dds_init (void)
goto fail_handleserver;
}
dds_entity_init (&dds_global.m_entity, NULL, DDS_KIND_CYCLONEDDS, NULL, NULL, 0);
dds_entity_init (&dds_global.m_entity, NULL, DDS_KIND_CYCLONEDDS, true, NULL, NULL, 0);
dds_global.m_entity.m_iid = ddsi_iid_gen ();
dds_global.m_entity.m_flags = DDS_ENTITY_IMPLICIT;
dds_handle_repin (&dds_global.m_entity.m_hdllink);
dds_entity_add_ref_locked (&dds_global.m_entity);
dds_entity_init_complete (&dds_global.m_entity);

View file

@ -117,7 +117,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
}
pp = dds_alloc (sizeof (*pp));
if ((ret = dds_entity_init (&pp->m_entity, &dom->m_entity, DDS_KIND_PARTICIPANT, new_qos, listener, DDS_PARTICIPANT_STATUS_MASK)) < 0)
if ((ret = dds_entity_init (&pp->m_entity, &dom->m_entity, DDS_KIND_PARTICIPANT, false, new_qos, listener, DDS_PARTICIPANT_STATUS_MASK)) < 0)
goto err_entity_init;
pp->m_entity.m_guid = guid;
@ -132,8 +132,8 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
dds_entity_init_complete (&pp->m_entity);
/* drop temporary extra ref to domain, dds_init */
dds_delete (dom->m_entity.m_hdllink.hdl);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dom->m_entity);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return ret;
err_entity_init:
@ -141,9 +141,9 @@ err_entity_init:
err_new_participant:
err_qos_validation:
dds_delete_qos (new_qos);
dds_delete (dom->m_entity.m_hdllink.hdl);
dds_entity_unpin_and_drop_ref (&dom->m_entity);
err_domain_create:
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
err_dds_init:
return ret;
}
@ -175,6 +175,6 @@ dds_return_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return ret;
}

View file

@ -45,17 +45,13 @@ const struct dds_entity_deriver dds_entity_deriver_publisher = {
.validate_status = dds_publisher_status_validate
};
dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qos, const dds_listener_t *listener)
dds_entity_t dds__create_publisher_l (dds_participant *par, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
{
dds_participant *par;
dds_publisher *pub;
dds_entity_t hdl;
dds_qos_t *new_qos;
dds_return_t ret;
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
return ret;
new_qos = dds_create_qos ();
if (qos)
nn_xqos_mergein_missing (new_qos, qos, DDS_PUBLISHER_QOS_MASK);
@ -67,10 +63,21 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
}
pub = dds_alloc (sizeof (*pub));
hdl = dds_entity_init (&pub->m_entity, &par->m_entity, DDS_KIND_PUBLISHER, new_qos, listener, DDS_PUBLISHER_STATUS_MASK);
hdl = dds_entity_init (&pub->m_entity, &par->m_entity, DDS_KIND_PUBLISHER, implicit, new_qos, listener, DDS_PUBLISHER_STATUS_MASK);
pub->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&par->m_entity, &pub->m_entity);
dds_entity_init_complete (&pub->m_entity);
return hdl;
}
dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qos, const dds_listener_t *listener)
{
dds_participant *par;
dds_entity_t hdl;
dds_return_t ret;
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
return ret;
hdl = dds__create_publisher_l (par, false, qos, listener);
dds_participant_unlock (par);
return hdl;
}

View file

@ -41,7 +41,7 @@ dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint3
{
dds_readcond *cond = dds_alloc (sizeof (*cond));
assert ((kind == DDS_KIND_COND_READ && filter == 0) || (kind == DDS_KIND_COND_QUERY && filter != 0));
(void) dds_entity_init (&cond->m_entity, &rd->m_entity, kind, NULL, NULL, 0);
(void) dds_entity_init (&cond->m_entity, &rd->m_entity, kind, false, NULL, NULL, 0);
cond->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&rd->m_entity, &cond->m_entity);
cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE;

View file

@ -62,11 +62,11 @@ static dds_return_t dds_reader_delete (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_reader_delete (dds_entity *e)
{
dds_reader * const rd = (dds_reader *) e;
(void) dds_delete (rd->m_topic->m_entity.m_hdllink.hdl);
dds_free (rd->m_loan);
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
dds_rhc_free (rd->m_rhc);
thread_state_asleep (lookup_thread_state ());
dds_entity_drop_ref (&rd->m_topic->m_entity);
return DDS_RETCODE_OK;
}
@ -332,36 +332,37 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION:
internal_topic = true;
subscriber = dds__get_builtin_subscriber (participant_or_subscriber);
if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
return ret;
t = dds__get_builtin_topic (subscriber, topic);
break;
default: {
dds_entity *p_or_s;
if ((ret = dds_entity_pin (participant_or_subscriber, &p_or_s)) != DDS_RETCODE_OK)
if ((ret = dds_entity_lock (participant_or_subscriber, DDS_KIND_DONTCARE, &p_or_s)) != DDS_RETCODE_OK)
return ret;
if (dds_entity_kind (p_or_s) == DDS_KIND_PARTICIPANT)
subscriber = dds_create_subscriber (participant_or_subscriber, qos, NULL);
else
switch (dds_entity_kind (p_or_s))
{
case DDS_KIND_SUBSCRIBER:
subscriber = participant_or_subscriber;
dds_entity_unpin (p_or_s);
sub = (dds_subscriber *) p_or_s;
break;
case DDS_KIND_PARTICIPANT:
subscriber = dds__create_subscriber_l ((dds_participant *) p_or_s, true, qos, NULL);
dds_entity_unlock (p_or_s);
if ((ret = dds_subscriber_lock (subscriber, &sub)) < 0)
return ret;
break;
default:
dds_entity_unlock (p_or_s);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
internal_topic = false;
t = topic;
break;
}
}
if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
{
reader = ret;
goto err_sub_lock;
}
if (subscriber != participant_or_subscriber && !internal_topic)
{
/* Delete implicit subscriber if reader creation fails */
sub->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
}
if ((ret = dds_topic_lock (t, &tp)) != DDS_RETCODE_OK)
{
reader = ret;
@ -405,7 +406,7 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
/* Create reader and associated read cache (if not provided by caller) */
rd = dds_alloc (sizeof (*rd));
reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK);
reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, false, rqos, listener, DDS_READER_STATUS_MASK);
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
rd->m_topic = tp;
rd->m_rhc = rhc ? rhc : dds_rhc_default_new (rd, tp->m_stopic);
@ -431,12 +432,6 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
dds_topic_unlock (tp);
dds_subscriber_unlock (sub);
if (internal_topic)
{
/* If topic is builtin, then the topic entity is local and should be deleted because the application won't. */
dds_delete (t);
}
return reader;
err_bad_qos:
@ -446,9 +441,6 @@ err_tp_lock:
dds_subscriber_unlock (sub);
if ((sub->m_entity.m_flags & DDS_ENTITY_IMPLICIT) != 0)
(void) dds_delete (subscriber);
err_sub_lock:
if (internal_topic)
dds_delete (t);
return reader;
}

View file

@ -45,7 +45,7 @@ const struct dds_entity_deriver dds_entity_deriver_subscriber = {
.validate_status = dds_subscriber_status_validate
};
dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_qos_t *qos, const dds_listener_t *listener)
dds_entity_t dds__create_subscriber_l (dds_participant *participant, bool implicit, const dds_qos_t *qos, const dds_listener_t *listener)
{
/* participant entity lock must be held */
dds_subscriber *sub;
@ -64,7 +64,7 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q
}
sub = dds_alloc (sizeof (*sub));
subscriber = dds_entity_init (&sub->m_entity, &participant->m_entity, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK);
subscriber = dds_entity_init (&sub->m_entity, &participant->m_entity, DDS_KIND_SUBSCRIBER, implicit, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK);
sub->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&participant->m_entity, &sub->m_entity);
dds_entity_init_complete (&sub->m_entity);
@ -78,7 +78,7 @@ dds_entity_t dds_create_subscriber (dds_entity_t participant, const dds_qos_t *q
dds_return_t ret;
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
return ret;
hdl = dds__create_subscriber_l (par, qos, listener);
hdl = dds__create_subscriber_l (par, false, qos, listener);
dds_participant_unlock (par);
return hdl;
}

View file

@ -31,6 +31,7 @@
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_plist.h"
#include "dds/ddsi/q_globals.h"
#include "dds__serdata_builtintopic.h"
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_topic)
@ -135,12 +136,16 @@ static bool dds_find_topic_check_and_add_ref (dds_entity_t participant, dds_enti
ret = false;
else
{
/* FIXME: calling addref is wrong because the Cyclone library has no
knowledge of the reference and hence simply deleting the participant
won't make the ref count drop to 0. On the other hand, the DDS spec
says find_topic (and a second call to create_topic) return a new
proxy that must separately be deleted. */
dds_entity_add_ref_locked (&tp->m_entity);
/* Simply return the same topic, though that is different to the spirit
of the DDS specification, which gives you a unique copy. Giving that
unique copy means there potentially many versions of exactly the same
topic around, and that two entities can be dealing with the same data
even though they have different topics objects (though with the same
name). That I find a confusing model.
As far as I can tell, the only benefit is the ability to set different
listeners on the various copies of the topic. And that seems to be a
really small benefit. */
ret = true;
}
dds_topic_unlock (tp);
@ -259,12 +264,7 @@ static dds_return_t create_topic_topic_arbitrary_check_sertopic (dds_entity_t pa
ret = DDS_RETCODE_INCONSISTENT_POLICY;
else
{
/* FIXME: calling addref is wrong because the Cyclone library has no
knowledge of the reference and hence simply deleting the participant
won't make the ref count drop to 0. On the other hand, the DDS spec
says find_topic (and a second call to create_topic) return a new
proxy that must separately be deleted. */
dds_entity_add_ref_locked (&tp->m_entity);
/* See dds_find_topic_check_and_add_ref */
ret = DDS_RETCODE_OK;
}
dds_topic_unlock (tp);
@ -429,7 +429,8 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
/* Create topic */
top = dds_alloc (sizeof (*top));
hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
/* FIXME: setting "implicit" based on sertopic->ops is a hack */
hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, (sertopic->ops == &ddsi_sertopic_ops_builtintopic), new_qos, listener, DDS_TOPIC_STATUS_MASK);
top->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&par->m_entity, &top->m_entity);
top->m_stopic = sertopic;

View file

@ -166,7 +166,7 @@ dds_entity_t dds_create_waitset (dds_entity_t owner)
}
dds_waitset *waitset = dds_alloc (sizeof (*waitset));
dds_entity_t hdl = dds_entity_init (&waitset->m_entity, e, DDS_KIND_WAITSET, NULL, NULL, 0);
dds_entity_t hdl = dds_entity_init (&waitset->m_entity, e, DDS_KIND_WAITSET, false, NULL, NULL, 0);
ddsrt_mutex_init (&waitset->wait_lock);
ddsrt_cond_init (&waitset->wait_cond);
waitset->m_entity.m_iid = ddsi_iid_gen ();
@ -176,13 +176,13 @@ dds_entity_t dds_create_waitset (dds_entity_t owner)
waitset->entities = NULL;
dds_entity_init_complete (&waitset->m_entity);
dds_entity_unlock (e);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return hdl;
err_entity_kind:
dds_entity_unlock (e);
err_entity_lock:
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
dds_entity_unpin_and_drop_ref (&dds_global.m_entity);
return rc;
}

View file

@ -209,13 +209,12 @@ static dds_return_t dds_writer_delete (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_writer_delete (dds_entity *e)
{
dds_writer * const wr = (dds_writer *) e;
dds_return_t ret;
/* FIXME: not freeing WHC here because it is owned by the DDSI entity */
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
nn_xpack_free (wr->m_xp);
thread_state_asleep (lookup_thread_state ());
ret = dds_delete (wr->m_topic->m_entity.m_hdllink.hdl);
return ret;
dds_entity_drop_ref (&wr->m_topic->m_entity);
return DDS_RETCODE_OK;
}
static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
@ -277,21 +276,27 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
{
dds_entity *p_or_p;
if ((rc = dds_entity_pin (participant_or_publisher, &p_or_p)) != DDS_RETCODE_OK)
if ((rc = dds_entity_lock (participant_or_publisher, DDS_KIND_DONTCARE, &p_or_p)) != DDS_RETCODE_OK)
return rc;
if (dds_entity_kind (p_or_p) == DDS_KIND_PARTICIPANT)
publisher = dds_create_publisher (participant_or_publisher, qos, NULL);
else
switch (dds_entity_kind (p_or_p))
{
case DDS_KIND_PUBLISHER:
publisher = participant_or_publisher;
dds_entity_unpin (p_or_p);
pub = (dds_publisher *) p_or_p;
break;
case DDS_KIND_PARTICIPANT:
publisher = dds__create_publisher_l ((dds_participant *) p_or_p, true, qos, NULL);
dds_entity_unlock (p_or_p);
if ((rc = dds_publisher_lock (publisher, &pub)) < 0)
return rc;
break;
default:
dds_entity_unlock (p_or_p);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
}
if ((rc = dds_publisher_lock (publisher, &pub)) != DDS_RETCODE_OK)
return rc;
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
if (publisher != participant_or_publisher)
pub->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
if ((rc = dds_topic_lock (topic, &tp)) != DDS_RETCODE_OK)
goto err_tp_lock;
@ -322,7 +327,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
/* Create writer */
wr = dds_alloc (sizeof (*wr));
writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, wqos, listener, DDS_WRITER_STATUS_MASK);
writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, false, wqos, listener, DDS_WRITER_STATUS_MASK);
wr->m_topic = tp;
dds_entity_add_ref_locked (&tp->m_entity);

View file

@ -25,96 +25,88 @@
#define URI_VARIABLE DDS_PROJECT_NAME_NOSPACE_CAPS"_URI"
#define MAX_PARTICIPANTS_VARIABLE "MAX_PARTICIPANTS"
static void config__check_env(
const char * env_variable,
const char * expected_value)
static void config__check_env (const char *env_variable, const char *expected_value)
{
char * env_uri = NULL;
ddsrt_getenv(env_variable, &env_uri);
#if 0
const char * const env_not_set = "Environment variable '%s' isn't set. This needs to be set to '%s' for this test to run.";
const char * const env_not_as_expected = "Environment variable '%s' has an unexpected value: '%s' (expected: '%s')";
#endif
char *env_uri = NULL;
ddsrt_getenv (env_variable, &env_uri);
#ifdef FORCE_ENV
{
bool env_ok;
if ( env_uri == NULL ) {
if (env_uri == NULL)
env_ok = false;
} else if ( strncmp(env_uri, expected_value, strlen(expected_value)) != 0 ) {
else if (strncmp (env_uri, expected_value, strlen (expected_value)) != 0)
env_ok = false;
} else {
else
env_ok = true;
}
if ( !env_ok ) {
dds_return_t r;
r = ddsrt_setenv(env_variable, expected_value);
CU_ASSERT_EQUAL_FATAL(r, DDS_RETCODE_OK);
if (!env_ok)
{
dds_return_t r = ddsrt_setenv (env_variable, expected_value);
CU_ASSERT_EQUAL_FATAL (r, DDS_RETCODE_OK);
}
}
#else
CU_ASSERT_PTR_NOT_NULL_FATAL(env_uri);
CU_ASSERT_STRING_EQUAL_FATAL(env_uri, expected_value);
CU_ASSERT_PTR_NOT_NULL_FATAL (env_uri);
CU_ASSERT_STRING_EQUAL_FATAL (env_uri, expected_value);
#endif /* FORCE_ENV */
}
CU_Test(ddsc_config, simple_udp, .init = ddsrt_init, .fini = ddsrt_fini) {
CU_Test (ddsc_config, simple_udp, .init = ddsrt_init, .fini = ddsrt_fini)
{
dds_entity_t participant;
config__check_env(URI_VARIABLE, CONFIG_ENV_SIMPLE_UDP);
config__check_env(MAX_PARTICIPANTS_VARIABLE, CONFIG_ENV_MAX_PARTICIPANTS);
participant = dds_create_participant(DDS_DOMAIN_DEFAULT, NULL, NULL);
CU_ASSERT_FATAL(participant> 0);
dds_delete(participant);
config__check_env (URI_VARIABLE, CONFIG_ENV_SIMPLE_UDP);
config__check_env (MAX_PARTICIPANTS_VARIABLE, CONFIG_ENV_MAX_PARTICIPANTS);
participant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
CU_ASSERT_FATAL (participant> 0);
dds_delete (participant);
}
CU_Test(ddsc_config, user_config, .init = ddsrt_init, .fini = ddsrt_fini) {
CU_ASSERT_FATAL(dds_create_domain(1,
CU_Test (ddsc_config, user_config, .init = ddsrt_init, .fini = ddsrt_fini)
{
dds_entity_t domain;
domain = dds_create_domain (1,
"<"DDS_PROJECT_NAME"><Domain><Id>any</Id></Domain>"
"<DDSI2E><Internal><MaxParticipants>2</MaxParticipants></Internal></DDSI2E>"
"</"DDS_PROJECT_NAME">") == DDS_RETCODE_OK);
dds_entity_t participant_1;
dds_entity_t participant_2;
dds_entity_t participant_3;
participant_1 = dds_create_participant(1, NULL, NULL);
"</"DDS_PROJECT_NAME">");
CU_ASSERT_FATAL (domain > 0);
dds_entity_t participant_1 = dds_create_participant (1, NULL, NULL);
CU_ASSERT_FATAL(participant_1 > 0);
participant_2 = dds_create_participant(1, NULL, NULL);
dds_entity_t participant_2 = dds_create_participant (1, NULL, NULL);
CU_ASSERT_FATAL(participant_2 > 0);
participant_3 = dds_create_participant(1, NULL, NULL);
dds_entity_t participant_3 = dds_create_participant (1, NULL, NULL);
CU_ASSERT(participant_3 < 0);
CU_ASSERT(participant_3 <= 0);
dds_delete(participant_3);
dds_delete(participant_2);
dds_delete(participant_1);
dds_delete (domain);
}
CU_Test(ddsc_config, incorrect_config, .init = ddsrt_init, .fini = ddsrt_fini) {
CU_Test (ddsc_config, incorrect_config, .init = ddsrt_init, .fini = ddsrt_fini)
{
dds_entity_t dom;
CU_ASSERT_FATAL(dds_create_domain(1, NULL) == DDS_RETCODE_BAD_PARAMETER);
CU_ASSERT_FATAL(dds_create_domain(1, "<CycloneDDS incorrect XML") != DDS_RETCODE_OK);
CU_ASSERT_FATAL(dds_create_domain(DDS_DOMAIN_DEFAULT,
dom = dds_create_domain (1, NULL);
CU_ASSERT_FATAL (dom == DDS_RETCODE_BAD_PARAMETER);
dom = dds_create_domain (1, "<CycloneDDS incorrect XML");
CU_ASSERT_FATAL (dom == DDS_RETCODE_ERROR); /* FIXME: "error" is rather unspecific for a bad configuration */
dom = dds_create_domain (DDS_DOMAIN_DEFAULT,
"<"DDS_PROJECT_NAME"><Domain><Id>any</Id></Domain>"
"<DDSI2E><Internal><MaxParticipants>2</MaxParticipants></Internal></DDSI2E>"
"</"DDS_PROJECT_NAME">") == DDS_RETCODE_BAD_PARAMETER);
CU_ASSERT_FATAL(dds_create_domain(2,
"</"DDS_PROJECT_NAME">");
CU_ASSERT_FATAL (dom == DDS_RETCODE_BAD_PARAMETER);
dom = dds_create_domain (2,
"<"DDS_PROJECT_NAME"><Domain><Id>any</Id></Domain>"
"<DDSI2E><Internal><MaxParticipants>2</MaxParticipants></Internal></DDSI2E>"
"</"DDS_PROJECT_NAME">") == DDS_RETCODE_OK);
CU_ASSERT_FATAL(dds_create_domain(2, "") == DDS_RETCODE_PRECONDITION_NOT_MET);
"</"DDS_PROJECT_NAME">");
CU_ASSERT_FATAL (dom > 0);
/* 2nd attempt at creating the same domain should fail */
CU_ASSERT_FATAL (dds_create_domain (2, "") == DDS_RETCODE_PRECONDITION_NOT_MET);
dds_delete (dom);
}

View file

@ -192,6 +192,10 @@ CU_Test(ddsc_entity_delete, recursive_with_deleted_topic)
ret = dds_delete(g_topic);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_OK);
/* Second call to delete a topic must fail */
ret = dds_delete(g_topic);
CU_ASSERT_EQUAL_FATAL(ret, DDS_RETCODE_ALREADY_DELETED);
/* Third, deleting the participant should delete all children of which
* the writer with the last topic reference is one. */
ret = dds_delete(g_participant);