Small entity deletion wip refactoring.

Signed-off-by: Martin Bremmer <martin.bremmer@adlinktech.com>
This commit is contained in:
Martin Bremmer 2019-11-04 16:24:47 +01:00 committed by eboasson
parent 40973d8e29
commit fc8b8fef3a
3 changed files with 93 additions and 96 deletions

View file

@ -48,7 +48,7 @@ static int dds_domain_compare (const void *va, const void *vb)
static const ddsrt_avl_treedef_t dds_domaintree_def = DDSRT_AVL_TREEDEF_INITIALIZER (
offsetof (dds_domain, m_node), offsetof (dds_domain, m_id), dds_domain_compare, 0);
static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id, const char *config, bool implicit)
static dds_entity_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_id, const char *config, bool implicit)
{
dds_entity_t domh;
uint32_t len;
@ -193,27 +193,37 @@ dds_domain *dds_domain_find_locked (dds_domainid_t id)
dds_entity_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t id, bool implicit, const char *config)
{
struct dds_domain *dom;
dds_entity_t domh;
dds_entity_t domh = DDS_RETCODE_ERROR;
/* FIXME: should perhaps lock parent object just like everywhere */
ddsrt_mutex_lock (&dds_global.m_mutex);
retry:
if (id != DDS_DOMAIN_DEFAULT)
{
if ((dom = dds_domain_find_locked (id)) == NULL)
domh = DDS_RETCODE_NOT_FOUND;
dom = dds_domain_find_locked (id);
else
domh = dom->m_entity.m_hdllink.hdl;
}
else
{
if ((dom = ddsrt_avl_find_min (&dds_domaintree_def, &dds_global.m_domains)) != NULL)
domh = dom->m_entity.m_hdllink.hdl;
else
domh = DDS_RETCODE_NOT_FOUND;
}
dom = ddsrt_avl_find_min (&dds_domaintree_def, &dds_global.m_domains);
if (domh == DDS_RETCODE_NOT_FOUND)
if (dom)
{
if (!implicit)
domh = DDS_RETCODE_PRECONDITION_NOT_MET;
else
{
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
if (dds_handle_is_closed (&dom->m_entity.m_hdllink))
{
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
ddsrt_cond_wait (&dds_global.m_cond, &dds_global.m_mutex);
goto retry;
}
dds_entity_add_ref_locked (&dom->m_entity);
dds_handle_repin (&dom->m_entity.m_hdllink);
domh = dom->m_entity.m_hdllink.hdl;
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
}
else
{
dom = dds_alloc (sizeof (*dom));
if ((domh = dds_domain_init (dom, id, config, implicit)) < 0)
@ -228,35 +238,7 @@ dds_entity_t dds_domain_create_internal (dds_domain **domain_out, dds_domainid_t
dds_entity_add_ref_locked (&dom->m_entity);
dds_handle_repin (&dom->m_entity.m_hdllink);
}
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}
}
else if (domh <= DDS_RETCODE_OK)
{
assert (0);
domh = DDS_RETCODE_ERROR;
}
else if (!implicit)
{
domh = DDS_RETCODE_PRECONDITION_NOT_MET;
}
else
{
ddsrt_mutex_lock (&dom->m_entity.m_mutex);
if (dds_handle_is_closed (&dom->m_entity.m_hdllink))
{
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
ddsrt_cond_wait (&dds_global.m_cond, &dds_global.m_mutex);
goto retry;
}
else
{
if (implicit)
{
dds_entity_add_ref_locked (&dom->m_entity);
dds_handle_repin (&dom->m_entity.m_hdllink);
}
domh = dom->m_entity.m_hdllink.hdl;
ddsrt_mutex_unlock (&dom->m_entity.m_mutex);
*domain_out = dom;
}

View file

@ -92,10 +92,7 @@ void dds_entity_drop_ref (dds_entity *e)
{
if (dds_handle_drop_ref (&e->m_hdllink))
{
/* increment pin count unconditionally to satisfy the "pinned" requirement */
dds_handle_repin (&e->m_hdllink);
ddsrt_mutex_lock (&e->m_mutex);
dds_return_t ret = really_delete_pinned_closed_locked (e, DIS_EXPLICIT);
dds_return_t ret = dds_delete_impl(e->m_hdllink.hdl, DIS_EXPLICIT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
}
@ -105,10 +102,7 @@ void dds_entity_unpin_and_drop_ref (dds_entity *e)
{
if (dds_handle_unpin_and_drop_ref (&e->m_hdllink))
{
/* increment pin count unconditionally to satisfy the "pinned" requirement */
dds_handle_repin (&e->m_hdllink);
ddsrt_mutex_lock (&e->m_mutex);
dds_return_t ret = really_delete_pinned_closed_locked (e, DIS_EXPLICIT);
dds_return_t ret = dds_delete_impl(e->m_hdllink.hdl, DIS_EXPLICIT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
}
@ -223,17 +217,45 @@ void dds_entity_register_child (dds_entity *parent, dds_entity *child)
dds_entity_add_ref_locked (parent);
}
static dds_entity *next_non_topic_child (ddsrt_avl_tree_t *remaining_children)
static dds_entity *get_first_child (ddsrt_avl_tree_t *remaining_children, bool ignore_topics)
{
ddsrt_avl_iter_t it;
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, remaining_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
{
if (dds_entity_kind (e) != DDS_KIND_TOPIC)
if ((!ignore_topics) || (dds_entity_kind(e) != DDS_KIND_TOPIC))
return e;
}
return NULL;
}
static void delete_children(struct dds_entity *parent, bool ignore_topics)
{
dds_entity *child;
dds_return_t ret;
ddsrt_mutex_lock (&parent->m_mutex);
while ((child = get_first_child(&parent->m_children, ignore_topics)) != NULL)
{
dds_entity_t child_handle = child->m_hdllink.hdl;
/* The child will remove itself from the parent->m_children list. */
ddsrt_mutex_unlock (&parent->m_mutex);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
assert (ret == DDS_RETCODE_OK || ret == DDS_RETCODE_BAD_PARAMETER);
ddsrt_mutex_lock (&parent->m_mutex);
/* The dds_delete can fail if the child is being deleted in parallel,
* in which case: wait when its not deleted yet.
* The child will trigger the condition after it removed itself from
* the childrens list. */
if ((ret == DDS_RETCODE_BAD_PARAMETER) &&
(get_first_child(&parent->m_children, ignore_topics) == child))
{
ddsrt_cond_wait (&parent->m_cond, &parent->m_mutex);
}
}
ddsrt_mutex_unlock (&parent->m_mutex);
}
#define TRACE_DELETE 0 /* FIXME: use DDS_LOG for this */
#if TRACE_DELETE
static const char *entity_kindstr (dds_entity_kind_t kind)
@ -312,7 +334,6 @@ dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delst
static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, enum delete_impl_state delstate)
{
dds_entity *child;
dds_return_t ret;
/* No threads pinning it anymore, no need to worry about other threads deleting
@ -369,32 +390,8 @@ static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, en
*
* To circumvent the problem. We ignore topics in the first loop.
*/
ddsrt_mutex_lock (&e->m_mutex);
while ((child = next_non_topic_child (&e->m_children)) != NULL)
{
/* FIXME: dds_delete can fail if the child is being deleted in parallel, in which case: wait */
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
assert (ret == DDS_RETCODE_OK || ret == DDS_RETCODE_BAD_PARAMETER);
(void) ret;
ddsrt_mutex_lock (&e->m_mutex);
if (ret == DDS_RETCODE_BAD_PARAMETER && child == next_non_topic_child (&e->m_children))
{
ddsrt_cond_wait (&e->m_cond, &e->m_mutex);
}
}
while ((child = ddsrt_avl_find_min (&dds_entity_children_td, &e->m_children)) != NULL)
{
assert (dds_entity_kind (child) == DDS_KIND_TOPIC);
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT);
assert (ret == DDS_RETCODE_OK);
(void) ret;
ddsrt_mutex_lock (&e->m_mutex);
}
ddsrt_mutex_unlock (&e->m_mutex);
delete_children(e, true /* ignore topics */);
delete_children(e, false /* delete topics */);
/* The dds_handle_delete will wait until the last active claim on that handle is
released. It is possible that this last release will be done by a thread that was
@ -416,6 +413,7 @@ static dds_return_t really_delete_pinned_closed_locked (struct dds_entity *e, en
ddsrt_avl_delete (&dds_entity_children_td, &p->m_children, e);
if (dds_handle_drop_childref_and_pin (&p->m_hdllink, delstate != DIS_FROM_PARENT))
{
dds_handle_close(&p->m_hdllink);
assert (dds_handle_is_closed (&p->m_hdllink));
assert (dds_handle_is_not_refd (&p->m_hdllink));
assert (ddsrt_avl_is_empty (&p->m_children));

View file

@ -294,9 +294,17 @@ int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_h
outstanding references. This implies that there are no children, because then the
entire hierarchy would simply have been deleted. */
assert (!(cf & HDL_FLAG_ALLOW_CHILDREN));
if (cf & HDL_REFCOUNT_MASK)
{
rc = DDS_RETCODE_ALREADY_DELETED;
break;
}
else
{
/* Refcount reached zero. Pin to allow deletion. */
cf1 = (cf + 1u) | HDL_FLAG_CLOSING;
}
}
else if (explicit)
{
/* Explicit call to dds_delete (either by application or by parent deleting its children) */
@ -305,21 +313,24 @@ int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_h
/* Entity is implicit, so handle doesn't hold a reference */
cf1 = (cf + 1u) | HDL_FLAG_CLOSING;
}
else if (cf & HDL_FLAG_ALLOW_CHILDREN)
else
{
/* Entity is explicit, so handle held a reference, refc only counts children as so is not our concern */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
{
/* Last reference is closing. Pin entity and indicate that it is closing. */
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
}
else if (!(cf & HDL_FLAG_ALLOW_CHILDREN))
{
/* The refcnt does not contain children.
* Indicate that the closing of the entity is deferred. */
cf1 = (cf - HDL_REFCOUNT_UNIT) | HDL_FLAG_DELETE_DEFERRED;
}
else
{
/* Entity is explicit, so handle held a reference, refc counts non-children, refc > 1 means drop ref and error (so don't pin) */
assert ((cf & HDL_REFCOUNT_MASK) > 0);
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
/* Entity is explicit, so handle held a reference, refc only counts children as so is not our concern */
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
else
{
cf1 = (cf - HDL_REFCOUNT_UNIT) | HDL_FLAG_DELETE_DEFERRED;
}
}
}
@ -328,11 +339,21 @@ int32_t dds_handle_pin_for_delete (dds_handle_t hdl, bool explicit, struct dds_h
/* Implicit call to dds_delete (child invoking delete on its parent) */
if (cf & HDL_FLAG_IMPLICIT)
{
assert ((cf & HDL_REFCOUNT_MASK) > 0);
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
{
/* Last reference is closing. Pin entity and indicate that it is closing. */
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
}
else if (!(cf & HDL_FLAG_ALLOW_CHILDREN))
{
/* The refcnt does not contain children.
* Indicate that the closing of the entity is deferred. */
cf1 = (cf - HDL_REFCOUNT_UNIT) | HDL_FLAG_DELETE_DEFERRED;
}
else
{
assert ((cf & HDL_REFCOUNT_MASK) > 0);
/* Just reduce the children refcount by one. */
cf1 = (cf - HDL_REFCOUNT_UNIT);
}
}
@ -373,7 +394,7 @@ bool dds_handle_drop_childref_and_pin (struct dds_handle_link *link, bool may_de
/* Implicit parent: delete if last ref */
if ((cf & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT && may_delete_parent)
{
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u) | HDL_FLAG_CLOSING;
cf1 = (cf - HDL_REFCOUNT_UNIT + 1u);
del_parent = true;
}
else
@ -436,8 +457,6 @@ bool dds_handle_drop_ref (struct dds_handle_link *link)
old = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((old & HDL_REFCOUNT_MASK) > 0);
new = old - HDL_REFCOUNT_UNIT;
if ((old & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT)
new |= HDL_FLAG_CLOSING;
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
ddsrt_mutex_lock (&handles.lock);
if ((new & (HDL_FLAG_CLOSING | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSING | 1u))
@ -445,7 +464,7 @@ bool dds_handle_drop_ref (struct dds_handle_link *link)
ddsrt_cond_broadcast (&handles.cond);
}
ddsrt_mutex_unlock (&handles.lock);
return (new & (HDL_FLAG_CLOSING | HDL_REFCOUNT_MASK)) == (HDL_FLAG_CLOSING | 0);
return ((new & HDL_REFCOUNT_MASK) == 0);
}
bool dds_handle_unpin_and_drop_ref (struct dds_handle_link *link)
@ -456,8 +475,6 @@ bool dds_handle_unpin_and_drop_ref (struct dds_handle_link *link)
assert ((old & HDL_REFCOUNT_MASK) > 0);
assert ((old & HDL_PINCOUNT_MASK) > 0);
new = old - HDL_REFCOUNT_UNIT - 1u;
if ((old & HDL_REFCOUNT_MASK) == HDL_REFCOUNT_UNIT && (old & HDL_FLAG_IMPLICIT))
new |= HDL_FLAG_CLOSING;
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
ddsrt_mutex_lock (&handles.lock);
if ((new & (HDL_FLAG_CLOSING | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSING | 1u))
@ -465,7 +482,7 @@ bool dds_handle_unpin_and_drop_ref (struct dds_handle_link *link)
ddsrt_cond_broadcast (&handles.cond);
}
ddsrt_mutex_unlock (&handles.lock);
return (new & (HDL_FLAG_CLOSING | HDL_REFCOUNT_MASK)) == (HDL_FLAG_CLOSING | 0);
return ((new & HDL_REFCOUNT_MASK) == 0);
}
bool dds_handle_close (struct dds_handle_link *link)