Respect locking order in waitset attach/detach
This fixes a possible deadlock when detaching an entity at the same time it is triggering: a triggering entity holds its m_observers_lock while trying to acquire waiset::m_mutex, and so attach and detach must not do the opposite. The deadlock had excellent reproducibility in a seemingly unrelated ROS2 application; this changes fixes it. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									fbc05777f3
								
							
						
					
					
						commit
						ed59c388f5
					
				
					 4 changed files with 126 additions and 104 deletions
				
			
		| 
						 | 
				
			
			@ -123,8 +123,9 @@ DDS_EXPORT dds_return_t
 | 
			
		|||
dds_entity_observer_register(
 | 
			
		||||
  dds_entity *observed,
 | 
			
		||||
  dds_entity *observer,
 | 
			
		||||
  dds_entity_callback cb,
 | 
			
		||||
  dds_entity_delete_callback delete_cb);
 | 
			
		||||
  dds_entity_callback_t cb,
 | 
			
		||||
  dds_entity_attach_callback_t attach_cb, void *attach_arg,
 | 
			
		||||
  dds_entity_delete_callback_t delete_cb);
 | 
			
		||||
 | 
			
		||||
DDS_EXPORT dds_return_t
 | 
			
		||||
dds_entity_observer_unregister(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -98,19 +98,20 @@ struct dds_entity;
 | 
			
		|||
 | 
			
		||||
typedef struct dds_entity_deriver {
 | 
			
		||||
  /* Close can be used to terminate (blocking) actions on a entity before actually deleting it. */
 | 
			
		||||
  dds_return_t (*close)(struct dds_entity *e) ddsrt_nonnull_all;
 | 
			
		||||
  dds_return_t (*close) (struct dds_entity *e) ddsrt_nonnull_all;
 | 
			
		||||
  /* Delete is used to actually free the entity. */
 | 
			
		||||
  dds_return_t (*delete)(struct dds_entity *e) ddsrt_nonnull_all;
 | 
			
		||||
  dds_return_t (*set_qos)(struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all;
 | 
			
		||||
  dds_return_t (*validate_status)(uint32_t mask);
 | 
			
		||||
  dds_return_t (*delete) (struct dds_entity *e) ddsrt_nonnull_all;
 | 
			
		||||
  dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all;
 | 
			
		||||
  dds_return_t (*validate_status) (uint32_t mask);
 | 
			
		||||
} dds_entity_deriver;
 | 
			
		||||
 | 
			
		||||
typedef void (*dds_entity_callback)(struct dds_entity *observer, dds_entity_t observed, uint32_t status);
 | 
			
		||||
typedef void (*dds_entity_delete_callback)(struct dds_entity *observer, dds_entity_t observed);
 | 
			
		||||
typedef void (*dds_entity_callback_t) (struct dds_entity *observer, dds_entity_t observed, uint32_t status);
 | 
			
		||||
typedef void (*dds_entity_attach_callback_t) (struct dds_entity *observer, struct dds_entity *observed, void *attach_arg);
 | 
			
		||||
typedef void (*dds_entity_delete_callback_t) (struct dds_entity *observer, dds_entity_t observed);
 | 
			
		||||
 | 
			
		||||
typedef struct dds_entity_observer {
 | 
			
		||||
  dds_entity_callback m_cb;
 | 
			
		||||
  dds_entity_delete_callback m_delete_cb;
 | 
			
		||||
  dds_entity_callback_t m_cb;
 | 
			
		||||
  dds_entity_delete_callback_t m_delete_cb;
 | 
			
		||||
  struct dds_entity *m_observer;
 | 
			
		||||
  struct dds_entity_observer *m_next;
 | 
			
		||||
} dds_entity_observer;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1143,7 +1143,7 @@ static bool in_observer_list_p (const struct dds_entity *observed, const dds_ent
 | 
			
		|||
  return false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *observer, dds_entity_callback cb, dds_entity_delete_callback delete_cb)
 | 
			
		||||
dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *observer, dds_entity_callback_t cb, dds_entity_attach_callback_t attach_cb, void *attach_arg, dds_entity_delete_callback_t delete_cb)
 | 
			
		||||
{
 | 
			
		||||
  dds_return_t rc;
 | 
			
		||||
  assert (observed);
 | 
			
		||||
| 
						 | 
				
			
			@ -1158,6 +1158,7 @@ dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *obs
 | 
			
		|||
    o->m_observer = observer;
 | 
			
		||||
    o->m_next = observed->m_observers;
 | 
			
		||||
    observed->m_observers = o;
 | 
			
		||||
    attach_cb (observer, observed, attach_arg);
 | 
			
		||||
    rc = DDS_RETCODE_OK;
 | 
			
		||||
  }
 | 
			
		||||
  ddsrt_mutex_unlock (&observed->m_observers_lock);
 | 
			
		||||
| 
						 | 
				
			
			@ -1185,6 +1186,7 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_entity *o
 | 
			
		|||
      observed->m_observers = idx->m_next;
 | 
			
		||||
    else
 | 
			
		||||
      prev->m_next = idx->m_next;
 | 
			
		||||
    idx->m_delete_cb (idx->m_observer, observed->m_hdllink.hdl);
 | 
			
		||||
    ddsrt_free (idx);
 | 
			
		||||
    rc = DDS_RETCODE_OK;
 | 
			
		||||
  }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,5 +1,5 @@
 | 
			
		|||
/*
 | 
			
		||||
 * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
 | 
			
		||||
 * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
 | 
			
		||||
 *
 | 
			
		||||
 * This program and the accompanying materials are made available under the
 | 
			
		||||
 * terms of the Eclipse Public License v. 2.0 which is available at
 | 
			
		||||
| 
						 | 
				
			
			@ -93,8 +93,12 @@ static dds_return_t dds_waitset_close (struct dds_entity *e)
 | 
			
		|||
{
 | 
			
		||||
  /* deep in the process of deleting the entity, so this is the only thread */
 | 
			
		||||
  dds_waitset *ws = (dds_waitset *) e;
 | 
			
		||||
  for (size_t i = 0; i < ws->nentities; i++)
 | 
			
		||||
    (void) dds_entity_observer_unregister (ws->entities[i].entity, &ws->m_entity);
 | 
			
		||||
  while (ws->nentities > 0)
 | 
			
		||||
  {
 | 
			
		||||
    dds_return_t rc = dds_entity_observer_unregister (ws->entities[0].entity, &ws->m_entity);
 | 
			
		||||
    assert (rc == DDS_RETCODE_OK);
 | 
			
		||||
    (void) rc;
 | 
			
		||||
  }
 | 
			
		||||
  return DDS_RETCODE_OK;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -175,27 +179,6 @@ dds_return_t dds_waitset_get_entities (dds_entity_t waitset, dds_entity_t *entit
 | 
			
		|||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void dds_waitset_remove (dds_waitset *ws, dds_entity_t observed)
 | 
			
		||||
{
 | 
			
		||||
  size_t i;
 | 
			
		||||
  for (i = 0; i < ws->nentities; i++)
 | 
			
		||||
    if (ws->entities[i].handle == observed)
 | 
			
		||||
      break;
 | 
			
		||||
  if (i < ws->nentities)
 | 
			
		||||
  {
 | 
			
		||||
    if (i < ws->ntriggered)
 | 
			
		||||
    {
 | 
			
		||||
      ws->entities[i] = ws->entities[--ws->ntriggered];
 | 
			
		||||
      ws->entities[ws->ntriggered] = ws->entities[--ws->nentities];
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      ws->entities[i] = ws->entities[--ws->nentities];
 | 
			
		||||
    }
 | 
			
		||||
    return;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* This is called when the observed entity signals a status change. */
 | 
			
		||||
static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32_t status)
 | 
			
		||||
{
 | 
			
		||||
| 
						 | 
				
			
			@ -220,53 +203,22 @@ static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32
 | 
			
		|||
  ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed)
 | 
			
		||||
struct dds_waitset_attach_observer_arg {
 | 
			
		||||
  dds_attach_t x;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
static void dds_waitset_attach_observer (struct dds_entity *observer, struct dds_entity *observed, void *varg)
 | 
			
		||||
{
 | 
			
		||||
  assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
 | 
			
		||||
  dds_waitset *ws = (dds_waitset *) ent;
 | 
			
		||||
  assert (dds_entity_kind (observer) == DDS_KIND_WAITSET);
 | 
			
		||||
  dds_waitset *ws = (dds_waitset *) observer;
 | 
			
		||||
  struct dds_waitset_attach_observer_arg *arg = varg;
 | 
			
		||||
  ddsrt_mutex_lock (&ws->m_entity.m_mutex);
 | 
			
		||||
  /* Remove this observed entity, which is being deleted, from the waitset. */
 | 
			
		||||
  dds_waitset_remove (ws, observed);
 | 
			
		||||
  /* Our registration to this observed entity will be removed automatically. */
 | 
			
		||||
  /* Trigger waitset to wake up. */
 | 
			
		||||
  ddsrt_cond_broadcast (&ws->m_entity.m_cond);
 | 
			
		||||
  ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x)
 | 
			
		||||
{
 | 
			
		||||
  dds_entity *e;
 | 
			
		||||
  dds_waitset *ws;
 | 
			
		||||
  dds_return_t ret;
 | 
			
		||||
 | 
			
		||||
  if ((ret = dds_waitset_lock (waitset, &ws)) < 0)
 | 
			
		||||
    return ret;
 | 
			
		||||
 | 
			
		||||
  if (waitset == entity)
 | 
			
		||||
    e = &ws->m_entity;
 | 
			
		||||
  else if ((ret = dds_entity_pin (entity, &e)) < 0)
 | 
			
		||||
    goto err_waitset;
 | 
			
		||||
 | 
			
		||||
  /* Entity must be "in scope": within the participant, domain or (self-evidently true) Cyclone DDS,
 | 
			
		||||
     depending on the parent of the waitset, so that one can't use a waitset created in participant
 | 
			
		||||
     A to wait for entities in participant B, &c.  While there is no technical obstacle (though
 | 
			
		||||
     there might be one for cross-domain use one day), it seems rather unhygienic practice. */
 | 
			
		||||
  if (!dds_entity_in_scope (e, ws->m_entity.m_parent))
 | 
			
		||||
  {
 | 
			
		||||
    ret = DDS_RETCODE_BAD_PARAMETER;
 | 
			
		||||
    goto err_entity;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* This will fail if given entity is already attached (or deleted). */
 | 
			
		||||
  if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_delete_observer)) != DDS_RETCODE_OK)
 | 
			
		||||
    goto err_entity;
 | 
			
		||||
 | 
			
		||||
  ws->entities = ddsrt_realloc (ws->entities, (ws->nentities + 1) * sizeof (*ws->entities));
 | 
			
		||||
  ws->entities[ws->nentities].arg = x;
 | 
			
		||||
  ws->entities[ws->nentities].entity = e;
 | 
			
		||||
  ws->entities[ws->nentities].handle = e->m_hdllink.hdl;
 | 
			
		||||
  ws->entities[ws->nentities].arg = arg->x;
 | 
			
		||||
  ws->entities[ws->nentities].entity = observed;
 | 
			
		||||
  ws->entities[ws->nentities].handle = observed->m_hdllink.hdl;
 | 
			
		||||
  ws->nentities++;
 | 
			
		||||
  if (is_triggered (e))
 | 
			
		||||
  if (is_triggered (observed))
 | 
			
		||||
  {
 | 
			
		||||
    const size_t i = ws->nentities - 1;
 | 
			
		||||
    dds_attachment tmp = ws->entities[i];
 | 
			
		||||
| 
						 | 
				
			
			@ -274,46 +226,112 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_
 | 
			
		|||
    ws->entities[ws->ntriggered++] = tmp;
 | 
			
		||||
  }
 | 
			
		||||
  ddsrt_cond_broadcast (&ws->m_entity.m_cond);
 | 
			
		||||
  ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
err_entity:
 | 
			
		||||
  if (e != &ws->m_entity)
 | 
			
		||||
    dds_entity_unpin (e);
 | 
			
		||||
err_waitset:
 | 
			
		||||
  dds_waitset_unlock (ws);
 | 
			
		||||
  return ret;
 | 
			
		||||
static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed)
 | 
			
		||||
{
 | 
			
		||||
  assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
 | 
			
		||||
  dds_waitset *ws = (dds_waitset *) ent;
 | 
			
		||||
  size_t i;
 | 
			
		||||
  ddsrt_mutex_lock (&ws->m_entity.m_mutex);
 | 
			
		||||
  for (i = 0; i < ws->nentities; i++)
 | 
			
		||||
    if (ws->entities[i].handle == observed)
 | 
			
		||||
      break;
 | 
			
		||||
  if (i < ws->nentities)
 | 
			
		||||
  {
 | 
			
		||||
    if (i < ws->ntriggered)
 | 
			
		||||
    {
 | 
			
		||||
      ws->entities[i] = ws->entities[--ws->ntriggered];
 | 
			
		||||
      ws->entities[ws->ntriggered] = ws->entities[--ws->nentities];
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      ws->entities[i] = ws->entities[--ws->nentities];
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  ddsrt_cond_broadcast (&ws->m_entity.m_cond);
 | 
			
		||||
  ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x)
 | 
			
		||||
{
 | 
			
		||||
  dds_entity *wsent;
 | 
			
		||||
  dds_entity *e;
 | 
			
		||||
  dds_return_t ret;
 | 
			
		||||
 | 
			
		||||
  if ((ret = dds_entity_pin (waitset, &wsent)) < 0)
 | 
			
		||||
    return ret;
 | 
			
		||||
  else if (dds_entity_kind (wsent) != DDS_KIND_WAITSET)
 | 
			
		||||
  {
 | 
			
		||||
    dds_entity_unpin (wsent);
 | 
			
		||||
    return DDS_RETCODE_ILLEGAL_OPERATION;
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    dds_waitset *ws = (dds_waitset *) wsent;
 | 
			
		||||
 | 
			
		||||
    if (waitset == entity)
 | 
			
		||||
      e = &ws->m_entity;
 | 
			
		||||
    else if ((ret = dds_entity_pin (entity, &e)) < 0)
 | 
			
		||||
      goto err_entity;
 | 
			
		||||
 | 
			
		||||
    /* Entity must be "in scope": within the participant, domain or (self-evidently true) Cyclone DDS,
 | 
			
		||||
       depending on the parent of the waitset, so that one can't use a waitset created in participant
 | 
			
		||||
       A to wait for entities in participant B, &c.  While there is no technical obstacle (though
 | 
			
		||||
       there might be one for cross-domain use one day), it seems rather unhygienic practice. */
 | 
			
		||||
    if (!dds_entity_in_scope (e, ws->m_entity.m_parent))
 | 
			
		||||
    {
 | 
			
		||||
      ret = DDS_RETCODE_BAD_PARAMETER;
 | 
			
		||||
      goto err_scope;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /* This will fail if given entity is already attached (or deleted). */
 | 
			
		||||
    struct dds_waitset_attach_observer_arg attach_arg = { .x = x };
 | 
			
		||||
    if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_attach_observer, &attach_arg, dds_waitset_delete_observer)) != DDS_RETCODE_OK)
 | 
			
		||||
      goto err_entity;
 | 
			
		||||
 | 
			
		||||
  err_scope:
 | 
			
		||||
    if (e != &ws->m_entity)
 | 
			
		||||
      dds_entity_unpin (e);
 | 
			
		||||
  err_entity:
 | 
			
		||||
    dds_entity_unpin (&ws->m_entity);
 | 
			
		||||
    return ret;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)
 | 
			
		||||
{
 | 
			
		||||
  dds_waitset *ws;
 | 
			
		||||
  dds_entity *e;
 | 
			
		||||
  dds_entity *wsent;
 | 
			
		||||
  dds_return_t ret;
 | 
			
		||||
 | 
			
		||||
  if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
 | 
			
		||||
  if ((ret = dds_entity_pin (waitset, &wsent)) != DDS_RETCODE_OK)
 | 
			
		||||
    return ret;
 | 
			
		||||
 | 
			
		||||
  /* Possibly fails when entity was not attached. */
 | 
			
		||||
  if (waitset == entity)
 | 
			
		||||
    ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity);
 | 
			
		||||
  else if ((ret = dds_entity_pin (entity, &e)) < 0)
 | 
			
		||||
    ; /* entity invalid */
 | 
			
		||||
  else
 | 
			
		||||
  else if (dds_entity_kind (wsent) != DDS_KIND_WAITSET)
 | 
			
		||||
  {
 | 
			
		||||
    ret = dds_entity_observer_unregister (e, &ws->m_entity);
 | 
			
		||||
    dds_entity_unpin (e);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (ret == DDS_RETCODE_OK)
 | 
			
		||||
  {
 | 
			
		||||
    dds_waitset_remove (ws, entity);
 | 
			
		||||
    dds_entity_unpin (wsent);
 | 
			
		||||
    return DDS_RETCODE_ILLEGAL_OPERATION;
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    if (ret != DDS_RETCODE_PRECONDITION_NOT_MET)
 | 
			
		||||
    dds_waitset *ws = (dds_waitset *) wsent;
 | 
			
		||||
    dds_entity *e;
 | 
			
		||||
    /* Possibly fails when entity was not attached. */
 | 
			
		||||
    if (waitset == entity)
 | 
			
		||||
      ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity);
 | 
			
		||||
    else if ((ret = dds_entity_pin (entity, &e)) < 0)
 | 
			
		||||
      ; /* entity invalid */
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      ret = dds_entity_observer_unregister (e, &ws->m_entity);
 | 
			
		||||
      dds_entity_unpin (e);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    dds_entity_unpin (&ws->m_entity);
 | 
			
		||||
    if (ret != DDS_RETCODE_OK && ret != DDS_RETCODE_PRECONDITION_NOT_MET)
 | 
			
		||||
      ret = DDS_RETCODE_BAD_PARAMETER;
 | 
			
		||||
    return ret;
 | 
			
		||||
  }
 | 
			
		||||
  dds_waitset_unlock (ws);
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
dds_return_t dds_waitset_wait_until (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_time_t abstimeout)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue