Address race conditions in deleting entities

Deleting entities concurrently with operating on them or creating child
entities should work properly, even if it is essentially abuse.  This
commit fixes most (if not all) of these races, with the exception of
some nastiness when deleting the last entity (and thus deinitialising
the library) in parallel with attempting to operate on a (by definition
invalid) handle.

* Interrupting a blocked operation at the beginning of "delete" is now a
  separate operation.  E.g., a wait call on a waitset must be interrupted,
  but the data structures can't be touched yet because other threads may
  be doing an attach/detach in parallel.

* The DDSI writer can now be switched to an intermediate state,
  "INTERRUPTED", indicating that it should unblock any waiting threads
  and refuse to transmit any further data, but without actually
  embarking on the path of deleting its data structures.

* The extra "pinning" of readers and writers is now gone, they remain
  fully functional until the no other threads can still access the
  entity.

* Future listener invocations are prevented as part of deleting the
  entity, and it is now guaranteed that the application can no longer
  re-enable them.  Deletion furthermore waits until there are no current
  or pending listener invocations, rather than merely no current ones
  (a sketch of this bookkeeping follows this list).

* The internal state of the waitset now has its own lock, because
  attaching the parent entity of the waitset would otherwise require
  locking the waitset after having locked the parent, which violates the
  locking order.

* Handles are created in a pending state, in which they are not
  included in a dds_get_children operation and refuse to be pinned; a
  minimal sketch of the resulting handle life cycle follows this list.
  This makes it possible (in a future commit) to undo deletion of
  complex entities.

* There is a test (ddsc_waitset_torture) that exercises some of these
  corner cases.
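
Below is a minimal, self-contained sketch of the handle life cycle
introduced here.  It uses plain C11 atomics and a spin loop where the
real dds_handles.c uses the ddsrt atomics, a hash table keyed on the
handle value, and a mutex/condition variable; the flag names and bit
values are the ones from the patch, everything else (type names, the
simplified main) is illustration only.

    /* Sketch of the new handle life cycle; the real dds_handles.c uses the
       ddsrt primitives and blocks on a condition variable where this spins. */
    #include <assert.h>
    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    #define HDL_FLAG_CLOSING  0x80000000u /* last ref dropped, tear-down started */
    #define HDL_FLAG_CLOSED   0x40000000u /* close_wait done, only deleter pins it */
    #define HDL_FLAG_PENDING  0x20000000u /* created but not yet visible/pinnable */
    #define HDL_REFCOUNT_UNIT 0x00001000u
    #define HDL_REFCOUNT_MASK 0x0ffff000u
    #define HDL_PINCOUNT_MASK 0x00000fffu

    struct handle { _Atomic uint32_t cnt_flags; };

    /* construction: one reference, pinned by the creator, still pending */
    void handle_create (struct handle *h) {
      atomic_store (&h->cnt_flags, HDL_FLAG_PENDING | HDL_REFCOUNT_UNIT | 1u);
    }

    /* dds_entity_init_complete: clear PENDING, drop the creator's pin */
    void handle_unpend (struct handle *h) {
      atomic_fetch_and (&h->cnt_flags, ~HDL_FLAG_PENDING);
      atomic_fetch_sub (&h->cnt_flags, 1u);
    }

    /* every operation pins the handle; pending/closing/closed ones refuse */
    bool handle_pin (struct handle *h) {
      uint32_t cf = atomic_load (&h->cnt_flags);
      do {
        if (cf & (HDL_FLAG_PENDING | HDL_FLAG_CLOSING | HDL_FLAG_CLOSED))
          return false;
      } while (!atomic_compare_exchange_weak (&h->cnt_flags, &cf, cf + 1u));
      return true;
    }

    void handle_unpin (struct handle *h) {
      atomic_fetch_sub (&h->cnt_flags, 1u);
    }

    /* dds_delete path: drop the reference; whoever drops the last one sets
       CLOSING and is the one that carries on with the tear-down */
    bool handle_drop_ref (struct handle *h) {
      uint32_t cf = atomic_load (&h->cnt_flags), nf;
      do {
        nf = cf - HDL_REFCOUNT_UNIT;
        if ((nf & HDL_REFCOUNT_MASK) == 0)
          nf |= HDL_FLAG_CLOSING;
      } while (!atomic_compare_exchange_weak (&h->cnt_flags, &cf, nf));
      return (nf & HDL_REFCOUNT_MASK) == 0;
    }

    /* wait until the deleter holds the only remaining pin, then mark CLOSED */
    void handle_close_wait (struct handle *h) {
      while ((atomic_load (&h->cnt_flags) & HDL_PINCOUNT_MASK) != 1u)
        ; /* the real code waits on handles.cond instead of spinning */
      atomic_fetch_or (&h->cnt_flags, HDL_FLAG_CLOSED);
    }

    int main (void) {
      struct handle h;
      bool ok;
      handle_create (&h);
      ok = handle_pin (&h);      /* pending: skipped by dds_get_children, unpinnable */
      assert (!ok);
      handle_unpend (&h);
      ok = handle_pin (&h);      /* another thread starts an operation */
      assert (ok);
      ok = handle_pin (&h);      /* the deleting thread pins it as well */
      assert (ok);
      ok = handle_drop_ref (&h); /* last reference gone: CLOSING is set */
      assert (ok);
      ok = handle_pin (&h);      /* new operations are refused from here on */
      assert (!ok);
      handle_unpin (&h);         /* the concurrent operation completes */
      handle_close_wait (&h);    /* deleter is now the sole pinner */
      handle_unpin (&h);         /* dds_handle_delete would now remove it */
      (void) ok;
      return 0;
    }

A similarly reduced sketch of the listener bookkeeping follows, with
pthreads standing in for the ddsrt primitives and a plain "deleting"
flag standing in for the clearing of the status mask and listener that
dds_delete actually performs; only the two counters involved are
modelled.

    /* Sketch of the m_cb_count / m_cb_pending_count pair used to fence off
       listener invocations during delete. */
    #include <assert.h>
    #include <pthread.h>
    #include <stddef.h>

    struct entity {
      pthread_mutex_t observers_lock;
      pthread_cond_t observers_cond;
      unsigned cb_count;         /* listener invocations currently running */
      unsigned cb_pending_count; /* running plus waiting to run */
      int deleting;
    };

    /* callback path (cf. dds_reader_data_available_cb in the patch) */
    void invoke_listener (struct entity *e, void (*listener) (struct entity *)) {
      pthread_mutex_lock (&e->observers_lock);
      e->cb_pending_count++;            /* visible to the deleter before blocking */
      while (e->cb_count > 0)
        pthread_cond_wait (&e->observers_cond, &e->observers_lock);
      if (!e->deleting && listener != NULL) {
        e->cb_count++;
        pthread_mutex_unlock (&e->observers_lock);
        listener (e);                   /* the callback runs without the lock */
        pthread_mutex_lock (&e->observers_lock);
        e->cb_count--;
      }
      e->cb_pending_count--;
      pthread_cond_broadcast (&e->observers_cond);
      pthread_mutex_unlock (&e->observers_lock);
    }

    /* delete path (cf. dds_delete_impl_pinned): waiting on the pending count
       also covers invocations queued behind a running one */
    void wait_for_listeners (struct entity *e) {
      pthread_mutex_lock (&e->observers_lock);
      e->deleting = 1;
      while (e->cb_pending_count > 0)
        pthread_cond_wait (&e->observers_cond, &e->observers_lock);
      pthread_mutex_unlock (&e->observers_lock);
    }

    int main (void) {
      struct entity e = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0, 0 };
      invoke_listener (&e, NULL);  /* no listener: counters go up and back down */
      wait_for_listeners (&e);     /* nothing pending, returns immediately */
      assert (e.cb_pending_count == 0);
      return 0;
    }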

Signed-off-by: Erik Boasson <eb@ilities.com>
Author: Erik Boasson, 2019-09-07 08:21:56 +02:00 (committed by eboasson)
Commit: 133014cdfa, parent 81be40ec0e
24 changed files with 1042 additions and 568 deletions


@ -28,6 +28,8 @@ dds_entity_init(
const dds_listener_t *listener,
status_mask_t mask);
DDS_EXPORT void dds_entity_init_complete (dds_entity *entity);
DDS_EXPORT void
dds_entity_register_child (
dds_entity *parent,
@ -36,7 +38,7 @@ dds_entity_register_child (
DDS_EXPORT void
dds_entity_add_ref_locked(dds_entity *e);
#define DEFINE_ENTITY_LOCK_UNLOCK_ONLY(qualifier_, type_, kind_) \
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
qualifier_ dds_return_t type_##_lock (dds_entity_t hdl, type_ **x) \
{ \
dds_return_t rc; \
@ -50,25 +52,10 @@ dds_entity_add_ref_locked(dds_entity *e);
{ \
dds_entity_unlock (&x->m_entity); \
}
#define DECL_ENTITY_LOCK_UNLOCK_ONLY(qualifier_, type_) \
#define DECL_ENTITY_LOCK_UNLOCK(qualifier_, type_) \
qualifier_ dds_return_t type_##_lock (dds_entity_t hdl, type_ **x); \
qualifier_ void type_##_unlock (type_ *x);
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
DEFINE_ENTITY_LOCK_UNLOCK_ONLY (qualifier_, type_, kind_) \
qualifier_ dds_return_t type_##_lock_for_create (dds_entity_t hdl, type_ **x) \
{ \
dds_return_t rc; \
dds_entity *e; \
if ((rc = dds_entity_lock_for_create (hdl, kind_, &e)) < 0) \
return rc; \
*x = (type_ *) e; \
return DDS_RETCODE_OK; \
}
#define DECL_ENTITY_LOCK_UNLOCK(qualifier_, type_) \
DECL_ENTITY_LOCK_UNLOCK_ONLY (qualifier_, type_) \
qualifier_ dds_return_t type_##_lock_for_create (dds_entity_t hdl, type_ **x);
DDS_EXPORT inline dds_entity *dds_entity_from_handle_link (struct dds_handle_link *hdllink) {
return (dds_entity *) ((char *) hdllink - offsetof (struct dds_entity, m_hdllink));
}
@ -96,6 +83,14 @@ DDS_EXPORT dds_participant *dds_entity_participant (dds_entity *e);
DDS_EXPORT void dds_entity_final_deinit_before_free (dds_entity *e);
DDS_EXPORT bool dds_entity_in_scope (const dds_entity *e, const dds_entity *root);
enum delete_impl_state {
DIS_EXPLICIT, /* explicit delete on this entity */
DIS_FROM_PARENT, /* called because the parent is being deleted */
DIS_IMPLICIT /* called from child; delete if implicit w/o children */
};
DDS_EXPORT dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delstate);
DDS_EXPORT dds_return_t
dds_entity_pin (
dds_entity_t hdl,
@ -110,19 +105,13 @@ dds_entity_lock(
dds_entity_kind_t kind,
dds_entity **e);
DDS_EXPORT dds_return_t
dds_entity_lock_for_create(
dds_entity_t hdl,
dds_entity_kind_t kind,
dds_entity **e);
DDS_EXPORT void
dds_entity_unlock(dds_entity *e);
DDS_EXPORT dds_return_t
dds_entity_observer_register(
dds_entity *observed,
dds_entity *observer,
dds_waitset *observer,
dds_entity_callback_t cb,
dds_entity_attach_callback_t attach_cb, void *attach_arg,
dds_entity_delete_callback_t delete_cb);
@ -130,7 +119,8 @@ dds_entity_observer_register(
DDS_EXPORT dds_return_t
dds_entity_observer_unregister(
dds_entity *observed,
dds_entity *observer);
dds_waitset *observer,
bool invoke_delete_cb);
DDS_EXPORT dds_return_t
dds_generic_unimplemented_operation_manykinds(


@ -69,6 +69,13 @@ typedef int32_t dds_handle_t;
* This handlelink is invalid after the related handle is deleted and should
* never be used afterwards.
*/
/* Closing & closed can be combined, but having two gives a means for enforcing
that close() be called first, then close_wait(), and then delete(). */
#define HDL_FLAG_CLOSING (0x80000000u)
#define HDL_FLAG_CLOSED (0x40000000u)
#define HDL_FLAG_PENDING (0x20000000u)
struct dds_handle_link {
dds_handle_t hdl;
ddsrt_atomic_uint32_t cnt_flags;
@ -119,6 +126,7 @@ DDS_EXPORT dds_return_t
dds_handle_register_special (
struct dds_handle_link *link, dds_handle_t handle);
DDS_EXPORT void dds_handle_unpend (struct dds_handle_link *link);
/*
* This will close the handle. All information remains, only new claims will
@ -127,10 +135,9 @@ dds_handle_register_special (
* This is a noop on an already closed handle.
*/
DDS_EXPORT void
dds_handle_close(
dds_handle_close_wait (
struct dds_handle_link *link);
/*
* This will remove the handle related information from the server administration
* to free up space.
@ -142,8 +149,7 @@ dds_handle_close(
*/
DDS_EXPORT int32_t
dds_handle_delete(
struct dds_handle_link *link,
dds_time_t timeout);
struct dds_handle_link *link);
/*
@ -157,6 +163,11 @@ dds_handle_pin(
dds_handle_t hdl,
struct dds_handle_link **entity);
DDS_EXPORT int32_t
dds_handle_pin_and_ref(
dds_handle_t hdl,
struct dds_handle_link **entity);
DDS_EXPORT void
dds_handle_repin(
@ -180,14 +191,15 @@ dds_handle_unpin(
* break of your process and release the handle, making the deletion
* possible.
*/
DDS_EXPORT bool
dds_handle_is_closed(
struct dds_handle_link *link);
DDS_EXPORT void dds_handle_add_ref (struct dds_handle_link *link);
DDS_EXPORT bool dds_handle_drop_ref (struct dds_handle_link *link);
DDS_EXPORT inline bool dds_handle_is_closed (struct dds_handle_link *link) {
return (ddsrt_atomic_ld32 (&link->cnt_flags) & (HDL_FLAG_CLOSED | HDL_FLAG_CLOSING)) != 0;
}
#if defined (__cplusplus)
}
#endif


@ -90,29 +90,32 @@ struct dds_listener {
/* Entity flag values */
#define DDS_ENTITY_ENABLED 0x0001u
#define DDS_ENTITY_IMPLICIT 0x0002u
#define DDS_ENTITY_ENABLED ((uint32_t) 0x1) /* DDS "enabled" state */
#define DDS_ENTITY_IMPLICIT ((uint32_t) 0x2) /* implicit ones get deleted when the last child is deleted */
struct dds_domain;
struct dds_entity;
typedef struct dds_entity_deriver {
/* Close can be used to terminate (blocking) actions on a entity before actually deleting it. */
dds_return_t (*close) (struct dds_entity *e) ddsrt_nonnull_all;
/* Pending close can be used to terminate (blocking) actions on a entity before actually deleting it. */
void (*interrupt) (struct dds_entity *e) ddsrt_nonnull_all;
/* Close can be used to do ... */
void (*close) (struct dds_entity *e) ddsrt_nonnull_all;
/* Delete is used to actually free the entity. */
dds_return_t (*delete) (struct dds_entity *e) ddsrt_nonnull_all;
dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all;
dds_return_t (*validate_status) (uint32_t mask);
} dds_entity_deriver;
typedef void (*dds_entity_callback_t) (struct dds_entity *observer, dds_entity_t observed, uint32_t status);
typedef void (*dds_entity_attach_callback_t) (struct dds_entity *observer, struct dds_entity *observed, void *attach_arg);
typedef void (*dds_entity_delete_callback_t) (struct dds_entity *observer, dds_entity_t observed);
struct dds_waitset;
typedef void (*dds_entity_callback_t) (struct dds_waitset *observer, dds_entity_t observed, uint32_t status);
typedef bool (*dds_entity_attach_callback_t) (struct dds_waitset *observer, struct dds_entity *observed, void *attach_arg);
typedef void (*dds_entity_delete_callback_t) (struct dds_waitset *observer, dds_entity_t observed);
typedef struct dds_entity_observer {
dds_entity_callback_t m_cb;
dds_entity_delete_callback_t m_delete_cb;
struct dds_entity *m_observer;
struct dds_waitset *m_observer;
struct dds_entity_observer *m_next;
} dds_entity_observer;
@ -135,6 +138,7 @@ typedef struct dds_entity {
(no hierarchical relationship there)
- locking topic::m_mutex while holding {reader,writer}::m_mutex
- locking observers_lock while holding m_mutex
- locking waitset::wait_lock
*/
ddsrt_mutex_t m_mutex;
ddsrt_cond_t m_cond;
@ -148,6 +152,7 @@ typedef struct dds_entity {
ddsrt_cond_t m_observers_cond;
dds_listener_t m_listener; /* [m_observers_lock] */
uint32_t m_cb_count; /* [m_observers_lock] */
uint32_t m_cb_pending_count; /* [m_observers_lock] */
dds_entity_observer *m_observers; /* [m_observers_lock] */
} dds_entity;
@ -167,13 +172,17 @@ extern const struct dds_entity_deriver dds_entity_deriver_domain;
extern const struct dds_entity_deriver dds_entity_deriver_cyclonedds;
extern const struct dds_entity_deriver *dds_entity_deriver_table[];
dds_return_t dds_entity_deriver_dummy_close (struct dds_entity *e);
void dds_entity_deriver_dummy_interrupt (struct dds_entity *e);
void dds_entity_deriver_dummy_close (struct dds_entity *e);
dds_return_t dds_entity_deriver_dummy_delete (struct dds_entity *e);
dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled);
dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask);
inline dds_return_t dds_entity_deriver_close (struct dds_entity *e) {
return (dds_entity_deriver_table[e->m_kind]->close) (e);
inline void dds_entity_deriver_interrupt (struct dds_entity *e) {
(dds_entity_deriver_table[e->m_kind]->interrupt) (e);
}
inline void dds_entity_deriver_close (struct dds_entity *e) {
(dds_entity_deriver_table[e->m_kind]->close) (e);
}
inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e) {
return dds_entity_deriver_table[e->m_kind]->delete (e);
@ -314,9 +323,15 @@ typedef struct dds_attachment {
typedef struct dds_waitset {
dds_entity m_entity;
size_t nentities; /* [m_entity.m_mutex] */
size_t ntriggered; /* [m_entity.m_mutex] */
dds_attachment *entities; /* [m_entity.m_mutex] 0 .. ntriggered are triggred, ntriggred .. nentities are not */
/* Need a lock other than m_entity.m_mutex because the locking order an entity lock may not be
acquired while holding an ancestor's lock, but a waitset must be capable of triggering on
events on its parent */
ddsrt_mutex_t wait_lock;
ddsrt_cond_t wait_cond;
size_t nentities; /* [wait_lock] */
size_t ntriggered; /* [wait_lock] */
dds_attachment *entities; /* [wait_lock] 0 .. ntriggered are triggred, ntriggred .. nentities are not */
} dds_waitset;
DDS_EXPORT extern dds_cyclonedds_entity dds_global;


@ -33,6 +33,7 @@
static dds_return_t dds_domain_free (dds_entity *vdomain);
const struct dds_entity_deriver dds_entity_deriver_domain = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_domain_free,
.set_qos = dds_entity_deriver_dummy_set_qos,
@ -168,6 +169,7 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_register_domain (dds_global.threadmon, &domain->gv);
dds_entity_init_complete (&domain->m_entity);
return DDS_RETCODE_OK;
rtps_stop (&domain->gv);
@ -186,8 +188,7 @@ fail_rtps_init:
fail_rtps_config:
config_fini (domain->cfgst);
fail_config:
dds_handle_close (&domain->m_entity.m_hdllink);
dds_handle_delete (&domain->m_entity.m_hdllink, DDS_INFINITY);
dds_handle_delete (&domain->m_entity.m_hdllink);
return ret;
}
@ -317,7 +318,8 @@ void dds_write_set_batch (bool enable)
/* FIXME: get channels + latency budget working and get rid of this; in the mean time, any ugly hack will do. */
struct dds_domain *dom;
dds_domainid_t next_id = 0;
dds_init ();
if (dds_init () < 0)
return;
ddsrt_mutex_lock (&dds_global.m_mutex);
while ((dom = ddsrt_avl_lookup_succ_eq (&dds_domaintree_def, &dds_global.m_domains, &next_id)) != NULL)
{
@ -343,6 +345,5 @@ void dds_write_set_batch (bool enable)
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
/* FIXME */
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
}


@ -44,8 +44,11 @@ const struct dds_entity_deriver *dds_entity_deriver_table[] = {
[DDS_KIND_CYCLONEDDS] = &dds_entity_deriver_cyclonedds
};
dds_return_t dds_entity_deriver_dummy_close (struct dds_entity *e) {
(void) e; return DDS_RETCODE_OK;
void dds_entity_deriver_dummy_interrupt (struct dds_entity *e) {
(void) e;
}
void dds_entity_deriver_dummy_close (struct dds_entity *e) {
(void) e;
}
dds_return_t dds_entity_deriver_dummy_delete (struct dds_entity *e) {
(void) e; return DDS_RETCODE_OK;
@ -57,7 +60,8 @@ dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask) {
(void) mask; return DDS_RETCODE_ILLEGAL_OPERATION;
}
extern inline dds_return_t dds_entity_deriver_close (struct dds_entity *e);
extern inline void dds_entity_deriver_interrupt (struct dds_entity *e);
extern inline void dds_entity_deriver_close (struct dds_entity *e);
extern inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e);
extern inline dds_return_t dds_entity_deriver_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled);
extern inline dds_return_t dds_entity_deriver_validate_status (struct dds_entity *e, uint32_t mask);
@ -75,7 +79,6 @@ const ddsrt_avl_treedef_t dds_entity_children_td = DDSRT_AVL_TREEDEF_INITIALIZER
static void dds_entity_observers_signal (dds_entity *observed, uint32_t status);
static void dds_entity_observers_signal_delete (dds_entity *observed);
static void dds_entity_observers_delete (dds_entity *observed);
void dds_entity_add_ref_locked (dds_entity *e)
{
@ -117,6 +120,7 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
e->m_kind = kind;
e->m_qos = qos;
e->m_cb_count = 0;
e->m_cb_pending_count = 0;
e->m_observers = NULL;
/* TODO: CHAM-96: Implement dynamic enabling of entity. */
@ -171,6 +175,11 @@ dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind
return (dds_entity_t) handle;
}
void dds_entity_init_complete (dds_entity *entity)
{
dds_handle_unpend (&entity->m_hdllink);
}
void dds_entity_register_child (dds_entity *parent, dds_entity *child)
{
assert (child->m_iid != 0);
@ -189,12 +198,6 @@ static dds_entity *next_non_topic_child (ddsrt_avl_tree_t *remaining_children)
return NULL;
}
enum delete_impl_state {
DIS_EXPLICIT, /* explicit delete on this entity */
DIS_FROM_PARENT, /* called because the parent is being deleted */
DIS_IMPLICIT /* called from child; delete if implicit w/o children */
};
#define TRACE_DELETE 0 /* FIXME: use DDS_LOG for this */
#if TRACE_DELETE
static const char *entity_kindstr (dds_entity_kind_t kind)
@ -229,11 +232,11 @@ static void print_delete (const dds_entity *e, enum delete_impl_state delstate ,
}
#endif
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate, dds_instance_handle_t iid);
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate);
dds_return_t dds_delete (dds_entity_t entity)
{
return dds_delete_impl (entity, DIS_EXPLICIT, 0);
return dds_delete_impl (entity, DIS_EXPLICIT);
}
void dds_entity_final_deinit_before_free (dds_entity *e)
@ -245,65 +248,95 @@ void dds_entity_final_deinit_before_free (dds_entity *e)
ddsrt_mutex_destroy (&e->m_observers_lock);
}
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate, dds_instance_handle_t iid)
static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate)
{
dds_entity *e;
dds_return_t ret;
if ((ret = dds_entity_pin (entity, &e)) < 0)
return ret;
else
return dds_delete_impl_pinned (e, delstate);
}
dds_return_t dds_delete_impl_pinned (dds_entity *e, enum delete_impl_state delstate)
{
dds_time_t timeout = DDS_SECS (10);
dds_entity *child;
dds_return_t ret;
dds_entity *e;
/* iid is used to guarantee that attempts at deleting implicit parents never touch the wrong
entity: there is a tiny chance that the parent got deleted in parallel and the handle has been
reused, but there is no risk of the iid getting reused. There is no such risk for the other
cases, so there require iid = 0. */
assert ((delstate == DIS_IMPLICIT) == (iid != 0));
if ((ret = dds_entity_pin (entity, &e)) < 0)
{
#if TRACE_DELETE
printf ("delete %"PRId32" - pin failed: %s\n", entity, dds_strretcode (ret));
#endif
return ret;
}
/* Any number of threads pinning it, possibly in delete, or having pinned it and
trying to acquire m_mutex */
ddsrt_mutex_lock (&e->m_mutex);
#if TRACE_DELETE
print_delete (e, delstate, iid);
#endif
/* If another thread was racing us in delete, it will have set the CLOSING flag
while holding m_mutex and we had better bail out. */
if (dds_handle_is_closed (&e->m_hdllink))
{
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
/* Ignore children calling up to delete an implicit parent if there are still
(or again) children */
if (delstate == DIS_IMPLICIT)
{
/* Called after deleting a child; only delete if implicit & no remaining children. There is a
tiny chance that the parent got deleted in parallel and the handle has been reused, but there
is no risk of the iid getting reused. */
if (!(e->m_iid == iid && (e->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&e->m_children)))
if (!((e->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&e->m_children)))
{
ddsrt_mutex_unlock (&e->m_mutex);
dds_entity_unpin (e);
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
}
/* Drop reference, atomically setting CLOSING if no other references remain.
FIXME: that's not quite right: this is really only for topics. After a call
to delete, the handle ought to become invalid even if the topic stays (and
should perhaps even be revivable via find_topic). */
if (! dds_handle_drop_ref (&e->m_hdllink))
{
ddsrt_mutex_unlock (&e->m_mutex);
dds_entity_unpin (e);
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}
/* No threads pinning it anymore, no need to worry about other threads deleting
it, but there can still be plenty of threads that have it pinned and are
trying to acquire m_mutex to do their thing (including creating children,
attaching to waitsets, &c.) */
assert (dds_handle_is_closed (&e->m_hdllink));
/* Trigger blocked threads (and, still, delete DDSI reader/writer to trigger
continued cleanup -- while that's quite safe given that GUIDs don't get
reused quickly, it needs an update) */
dds_entity_deriver_interrupt (e);
/* Prevent further listener invocations; dds_set_status_mask locks m_mutex and
checks for a pending delete to guarantee that once we clear the mask here,
no new listener invocations will occur beyond those already in progress.
(FIXME: or committed to? I think in-progress only, better check.) */
ddsrt_mutex_lock (&e->m_observers_lock);
if (entity_has_status (e))
ddsrt_atomic_and32 (&e->m_status.m_status_and_mask, SAM_STATUS_MASK);
dds_reset_listener (&e->m_listener);
/* Signal observers that this entity will be deleted and wait for
all listeners to complete. */
ddsrt_mutex_unlock (&e->m_mutex);
dds_entity_observers_signal_delete (e);
while (e->m_cb_count > 0)
/* wait for all listeners to complete - FIXME: rely on pincount instead?
that would require all listeners to pin the entity instead, but it
would prevent them from doing much. */
while (e->m_cb_pending_count > 0)
ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock);
ddsrt_mutex_unlock (&e->m_observers_lock);
/* Wait for all other threads to unpin the entity */
dds_handle_close_wait (&e->m_hdllink);
/* Pin count dropped to one with CLOSING flag set: no other threads still
in operations involving this entity */
dds_entity_observers_signal_delete (e);
dds_entity_deriver_close (e);
/*
* Recursively delete children.
*
@ -321,47 +354,36 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state
*
* To circumvent the problem. We ignore topics in the first loop.
*/
ret = DDS_RETCODE_OK;
ddsrt_mutex_lock (&e->m_mutex);
while ((child = next_non_topic_child (&e->m_children)) && ret == DDS_RETCODE_OK)
while ((child = next_non_topic_child (&e->m_children)) != NULL)
{
/* FIXME: dds_delete can fail if the child is being deleted in parallel, in which case: wait */
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT, 0);
(void) dds_delete_impl (child_handle, DIS_FROM_PARENT);
ddsrt_mutex_lock (&e->m_mutex);
}
while ((child = ddsrt_avl_find_min (&dds_entity_children_td, &e->m_children)) != NULL && ret == DDS_RETCODE_OK)
while ((child = ddsrt_avl_find_min (&dds_entity_children_td, &e->m_children)) != NULL)
{
assert (dds_entity_kind (child) == DDS_KIND_TOPIC);
dds_entity_t child_handle = child->m_hdllink.hdl;
ddsrt_mutex_unlock (&e->m_mutex);
ret = dds_delete_impl (child_handle, DIS_FROM_PARENT, 0);
(void) dds_delete_impl (child_handle, DIS_FROM_PARENT);
ddsrt_mutex_lock (&e->m_mutex);
}
ddsrt_mutex_unlock (&e->m_mutex);
if (ret == DDS_RETCODE_OK)
ret = dds_entity_deriver_close (e);
dds_entity_unpin (e);
/* FIXME: deleting shouldn't fail, and bailing out halfway through deleting is also
bad */
if (ret != DDS_RETCODE_OK)
return ret;
/* The dds_handle_delete will wait until the last active claim on that handle is
released. It is possible that this last release will be done by a thread that was
kicked during the close(). */
if ((ret = dds_handle_delete (&e->m_hdllink, timeout)) != DDS_RETCODE_OK)
return ret;
/* Remove all possible observers. */
dds_entity_observers_delete (e);
ret = dds_handle_delete (&e->m_hdllink);
assert (ret == DDS_RETCODE_OK);
(void) ret;
/* Remove from parent; schedule deletion of parent if it was created implicitly, no
longer has any remaining children, and we didn't arrive here as a consequence of
deleting the parent. */
dds_entity_t parent_handle = 0;
dds_instance_handle_t parent_iid = 0;
dds_entity *parent_to_delete = NULL;
if (e->m_parent != NULL)
{
struct dds_entity * const p = e->m_parent;
@ -369,11 +391,13 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state
ddsrt_mutex_lock (&p->m_mutex);
assert (ddsrt_avl_lookup (&dds_entity_children_td, &p->m_children, &e->m_iid) != NULL);
ddsrt_avl_delete (&dds_entity_children_td, &p->m_children, e);
/* trigger parent in case it is waiting in delete */
ddsrt_cond_broadcast (&p->m_cond);
if (delstate != DIS_FROM_PARENT && (p->m_flags & DDS_ENTITY_IMPLICIT) && ddsrt_avl_is_empty (&p->m_children))
{
parent_handle = p->m_hdllink.hdl;
parent_iid = p->m_iid;
if ((ret = dds_entity_pin (p->m_hdllink.hdl, &parent_to_delete)) < 0)
parent_to_delete = NULL;
}
ddsrt_mutex_unlock (&p->m_mutex);
@ -388,14 +412,18 @@ static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state
if (ret == DDS_RETCODE_NO_DATA)
ret = DDS_RETCODE_OK;
else if (ret != DDS_RETCODE_OK)
{
if (parent_to_delete)
dds_entity_unpin (parent_to_delete);
return ret;
}
else
{
dds_entity_final_deinit_before_free (e);
dds_free (e);
}
return (parent_handle != 0) ? dds_delete_impl (parent_handle, DIS_IMPLICIT, parent_iid) : DDS_RETCODE_OK;
return (parent_to_delete != NULL) ? dds_delete_impl_pinned (parent_to_delete, DIS_IMPLICIT) : DDS_RETCODE_OK;
}
bool dds_entity_in_scope (const dds_entity *e, const dds_entity *root)
@ -459,10 +487,16 @@ dds_return_t dds_get_children (dds_entity_t entity, dds_entity_t *children, size
struct dds_entity *c;
ddsrt_mutex_lock (&e->m_mutex);
for (c = ddsrt_avl_iter_first (&dds_entity_children_td, &e->m_children, &it); c != NULL; c = ddsrt_avl_iter_next (&it))
{
struct dds_entity *tmp;
/* Attempt at pinning the entity will fail if it is still pending */
if (dds_entity_pin (c->m_hdllink.hdl, &tmp) == DDS_RETCODE_OK)
{
if (n < size)
children[n] = c->m_hdllink.hdl;
n++;
dds_entity_unpin (tmp);
}
}
ddsrt_mutex_unlock (&e->m_mutex);
dds_entity_unpin (e);
@ -836,7 +870,7 @@ static void pushdown_listener (dds_entity *e)
ddsrt_mutex_unlock (&e->m_mutex);
ddsrt_mutex_lock (&c->m_observers_lock);
while (c->m_cb_count > 0)
while (c->m_cb_pending_count > 0)
ddsrt_cond_wait (&c->m_observers_cond, &c->m_observers_lock);
ddsrt_mutex_lock (&e->m_observers_lock);
@ -864,7 +898,7 @@ dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listen
return rc;
ddsrt_mutex_lock (&e->m_observers_lock);
while (e->m_cb_count > 0)
while (e->m_cb_pending_count > 0)
ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock);
/* new listener is constructed by combining "listener" with the ancestral listeners;
@ -965,17 +999,26 @@ dds_return_t dds_set_status_mask (dds_entity_t entity, uint32_t mask)
if ((mask & ~SAM_STATUS_MASK) != 0)
return DDS_RETCODE_BAD_PARAMETER;
if ((ret = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
if ((ret = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
return ret;
if (dds_handle_is_closed (&e->m_hdllink))
{
/* The sole reason for locking the mutex was so we can do this atomically with
respect to dds_delete (which in turn requires checking whether the handle
is still open), because delete relies on the mask to shut down all listener
invocations. */
dds_entity_unlock (e);
return DDS_RETCODE_PRECONDITION_NOT_MET;
}
if ((ret = dds_entity_deriver_validate_status (e, mask)) == DDS_RETCODE_OK)
{
assert (entity_has_status (e));
ddsrt_mutex_lock (&e->m_observers_lock);
while (e->m_cb_count > 0)
while (e->m_cb_pending_count > 0)
ddsrt_cond_wait (&e->m_observers_cond, &e->m_observers_lock);
/* Don't block internal status triggers. */
uint32_t old, new;
do {
old = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask);
@ -983,7 +1026,7 @@ dds_return_t dds_set_status_mask (dds_entity_t entity, uint32_t mask)
} while (!ddsrt_atomic_cas32 (&e->m_status.m_status_and_mask, old, new));
ddsrt_mutex_unlock (&e->m_observers_lock);
}
dds_entity_unpin (e);
dds_entity_unlock (e);
return ret;
}
@ -1019,7 +1062,6 @@ static dds_return_t dds_readtake_status (dds_entity_t entity, uint32_t *status,
return ret;
}
dds_return_t dds_read_status (dds_entity_t entity, uint32_t *status, uint32_t mask)
{
return dds_readtake_status (entity, status, mask, false);
@ -1099,24 +1141,10 @@ dds_return_t dds_entity_lock (dds_entity_t hdl, dds_entity_kind_t kind, dds_enti
}
}
dds_return_t dds_entity_lock_for_create (dds_entity_t hdl, dds_entity_kind_t kind, dds_entity **eptr)
{
dds_return_t res;
if ((res = dds_entity_lock (hdl, kind, eptr)) < 0)
return res;
else if (!dds_handle_is_closed (&(*eptr)->m_hdllink))
return DDS_RETCODE_OK;
else
{
ddsrt_mutex_unlock (&(*eptr)->m_mutex);
return DDS_RETCODE_BAD_PARAMETER;
}
}
void dds_entity_unlock (dds_entity *e)
{
ddsrt_mutex_unlock (&e->m_mutex);
dds_handle_unpin (&e->m_hdllink);
dds_entity_unpin (e);
}
dds_return_t dds_triggered (dds_entity_t entity)
@ -1134,7 +1162,7 @@ dds_return_t dds_triggered (dds_entity_t entity)
return ret;
}
static bool in_observer_list_p (const struct dds_entity *observed, const dds_entity *observer)
static bool in_observer_list_p (const struct dds_entity *observed, const dds_waitset *observer)
{
dds_entity_observer *cur;
for (cur = observed->m_observers; cur != NULL; cur = cur->m_next)
@ -1143,13 +1171,15 @@ static bool in_observer_list_p (const struct dds_entity *observed, const dds_ent
return false;
}
dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *observer, dds_entity_callback_t cb, dds_entity_attach_callback_t attach_cb, void *attach_arg, dds_entity_delete_callback_t delete_cb)
dds_return_t dds_entity_observer_register (dds_entity *observed, dds_waitset *observer, dds_entity_callback_t cb, dds_entity_attach_callback_t attach_cb, void *attach_arg, dds_entity_delete_callback_t delete_cb)
{
dds_return_t rc;
assert (observed);
ddsrt_mutex_lock (&observed->m_observers_lock);
if (in_observer_list_p (observed, observer))
rc = DDS_RETCODE_PRECONDITION_NOT_MET;
else if (!attach_cb (observer, observed, attach_arg))
rc = DDS_RETCODE_BAD_PARAMETER;
else
{
dds_entity_observer *o = ddsrt_malloc (sizeof (dds_entity_observer));
@ -1158,14 +1188,13 @@ dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *obs
o->m_observer = observer;
o->m_next = observed->m_observers;
observed->m_observers = o;
attach_cb (observer, observed, attach_arg);
rc = DDS_RETCODE_OK;
}
ddsrt_mutex_unlock (&observed->m_observers_lock);
return rc;
}
dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_entity *observer)
dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_waitset *observer, bool invoke_delete_cb)
{
dds_return_t rc;
dds_entity_observer *prev, *idx;
@ -1186,6 +1215,7 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_entity *o
observed->m_observers = idx->m_next;
else
prev->m_next = idx->m_next;
if (invoke_delete_cb)
idx->m_delete_cb (idx->m_observer, observed->m_hdllink.hdl);
ddsrt_free (idx);
rc = DDS_RETCODE_OK;
@ -1194,21 +1224,6 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_entity *o
return rc;
}
static void dds_entity_observers_delete (dds_entity *observed)
{
dds_entity_observer *idx;
ddsrt_mutex_lock (&observed->m_observers_lock);
idx = observed->m_observers;
while (idx != NULL)
{
dds_entity_observer *next = idx->m_next;
ddsrt_free (idx);
idx = next;
}
observed->m_observers = NULL;
ddsrt_mutex_unlock (&observed->m_observers_lock);
}
static void dds_entity_observers_signal (dds_entity *observed, uint32_t status)
{
for (dds_entity_observer *idx = observed->m_observers; idx; idx = idx->m_next)
@ -1217,8 +1232,16 @@ static void dds_entity_observers_signal (dds_entity *observed, uint32_t status)
static void dds_entity_observers_signal_delete (dds_entity *observed)
{
for (dds_entity_observer *idx = observed->m_observers; idx; idx = idx->m_next)
dds_entity_observer *idx;
idx = observed->m_observers;
while (idx != NULL)
{
dds_entity_observer *next = idx->m_next;
idx->m_delete_cb (idx->m_observer, observed->m_hdllink.hdl);
ddsrt_free (idx);
idx = next;
}
observed->m_observers = NULL;
}
void dds_entity_status_signal (dds_entity *e, uint32_t status)
@ -1247,11 +1270,13 @@ void dds_entity_trigger_set (dds_entity *e, uint32_t t)
{
assert (! entity_has_status (e));
uint32_t oldst;
ddsrt_mutex_lock (&e->m_observers_lock);
do {
oldst = ddsrt_atomic_ld32 (&e->m_status.m_trigger);
} while (!ddsrt_atomic_cas32 (&e->m_status.m_trigger, oldst, t));
if (oldst == 0 && t != 0)
dds_entity_observers_signal (e, t);
ddsrt_mutex_unlock (&e->m_observers_lock);
}
dds_entity_t dds_get_topic (dds_entity_t entity)


@ -23,6 +23,7 @@
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_guardcond)
const struct dds_entity_deriver dds_entity_deriver_guardcondition = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
@ -41,7 +42,7 @@ dds_entity_t dds_create_guardcondition (dds_entity_t owner)
if ((rc = dds_init ()) < 0)
return rc;
if ((rc = dds_entity_lock_for_create (owner, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_lock (owner, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
goto err_entity_lock;
switch (dds_entity_kind (e))
@ -59,14 +60,15 @@ dds_entity_t dds_create_guardcondition (dds_entity_t owner)
dds_entity_t hdl = dds_entity_init (&gcond->m_entity, e, DDS_KIND_COND_GUARD, NULL, NULL, 0);
gcond->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (e, &gcond->m_entity);
dds_entity_init_complete (&gcond->m_entity);
dds_entity_unlock (e);
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return hdl;
err_entity_kind:
dds_entity_unlock (e);
err_entity_lock:
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return rc;
}


@ -20,27 +20,10 @@
#include "dds__handles.h"
#include "dds__types.h"
/* FIXME: this code isn't really correct when USE_CHH is set:
- the DDS entity code doesn't really play by the awake/asleep mechanism
- there is no provision in the code for a handle being deleted concurrent to a lookup,
that is, deleting handle links should also go through the GC
entity framework needs a fair bit of rewriting anyway ... */
#define USE_CHH 0
#define HDL_FLAG_CLOSED (0x80000000u)
/* ref count: # outstanding references to this handle/object (not so sure it is
ideal to have a one-to-one mapping between the two, but that is what the rest
of the code assumes at the moment); so this limits one to having, e.g., no
more than 64k endpoints referencing the same topic */
#define HDL_REFCOUNT_MASK (0x0ffff000u)
#define HDL_REFCOUNT_UNIT (0x00001000u)
#define HDL_REFCOUNT_SHIFT 12
/* pin count: # concurrent operations, so allowing up to 4096 threads had better
be enough ... */
#define HDL_PINCOUNT_MASK (0x00000fffu)
#define HDL_PINCOUNT_UNIT (0x00000001u)
/* Maximum number of handles is INT32_MAX - 1, but as the allocator relies on a
random generator for finding a free one, the time spent in the dds_handle_create
@ -50,11 +33,7 @@
#define MAX_HANDLES (INT32_MAX / 128)
struct dds_handle_server {
#if USE_CHH
struct ddsrt_chh *ht;
#else
struct ddsrt_hh *ht;
#endif
size_t count;
ddsrt_mutex_t lock;
ddsrt_cond_t cond;
@ -78,52 +57,46 @@ static int handle_equal (const void *va, const void *vb)
dds_return_t dds_handle_server_init (void)
{
#if USE_CHH
handles.ht = ddsrt_chh_new (128, handle_hash, handle_equal, free_via_gc);
#else
/* called with ddsrt's singleton mutex held (see dds_init/fini) */
if (handles.ht == NULL)
{
handles.ht = ddsrt_hh_new (128, handle_hash, handle_equal);
#endif
handles.count = 0;
ddsrt_mutex_init (&handles.lock);
ddsrt_cond_init (&handles.cond);
}
return DDS_RETCODE_OK;
}
void dds_handle_server_fini (void)
{
#if USE_CHH
#ifndef NDEBUG
struct ddsrt_chh_iter it;
assert (ddsrt_chh_iter_first (handles.ht, &it) == NULL);
#endif
ddsrt_chh_free (handles.ht);
#else /* USE_CHH */
/* called with ddsrt's singleton mutex held (see dds_init/fini) */
if (handles.ht != NULL)
{
#ifndef NDEBUG
struct ddsrt_hh_iter it;
for (struct dds_handle_link *link = ddsrt_hh_iter_first (handles.ht, &it); link != NULL; link = ddsrt_hh_iter_next (&it))
{
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
DDS_ERROR ("handle %"PRId32" pin %"PRIu32" ref %"PRIu32" %s\n", link->hdl,
DDS_ERROR ("handle %"PRId32" pin %"PRIu32" refc %"PRIu32"%s%s%s\n", link->hdl,
cf & HDL_PINCOUNT_MASK, (cf & HDL_REFCOUNT_MASK) >> HDL_REFCOUNT_SHIFT,
cf & HDL_FLAG_CLOSED ? "closed" : "open");
cf & HDL_FLAG_PENDING ? " pending" : "",
cf & HDL_FLAG_CLOSING ? " closing" : "",
cf & HDL_FLAG_CLOSED ? " closed" : "");
}
assert (ddsrt_hh_iter_first (handles.ht, &it) == NULL);
#endif
ddsrt_hh_free (handles.ht);
#endif /* USE_CHH */
ddsrt_cond_destroy (&handles.cond);
ddsrt_mutex_destroy (&handles.lock);
handles.ht = NULL;
}
}
#if USE_CHH
static bool hhadd (struct ddsrt_chh *ht, void *elem) { return ddsrt_chh_add (ht, elem); }
#else
static bool hhadd (struct ddsrt_hh *ht, void *elem) { return ddsrt_hh_add (ht, elem); }
#endif
static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
{
ddsrt_atomic_st32 (&link->cnt_flags, HDL_REFCOUNT_UNIT);
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | HDL_REFCOUNT_UNIT | 1u);
do {
do {
link->hdl = (int32_t) (ddsrt_random () & INT32_MAX);
@ -134,13 +107,7 @@ static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
dds_handle_t dds_handle_create (struct dds_handle_link *link)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
dds_handle_t ret;
#if USE_CHH
thread_state_awake (ts1);
#endif
ddsrt_mutex_lock (&handles.lock);
if (handles.count == MAX_HANDLES)
{
@ -150,32 +117,18 @@ dds_handle_t dds_handle_create (struct dds_handle_link *link)
else
{
handles.count++;
#if USE_CHH
ddsrt_mutex_unlock (&handles.lock);
ret = dds_handle_create_int (link);
#else
ret = dds_handle_create_int (link);
ddsrt_mutex_unlock (&handles.lock);
#endif
assert (ret > 0);
}
#if USE_CHH
thread_state_asleep (ts1);
#endif
return ret;
}
dds_return_t dds_handle_register_special (struct dds_handle_link *link, dds_handle_t handle)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
dds_return_t ret;
if (handle <= 0)
return DDS_RETCODE_BAD_PARAMETER;
#if USE_CHH
thread_state_awake (ts1);
#endif
ddsrt_mutex_lock (&handles.lock);
if (handles.count == MAX_HANDLES)
{
@ -185,64 +138,46 @@ dds_return_t dds_handle_register_special (struct dds_handle_link *link, dds_hand
else
{
handles.count++;
ddsrt_atomic_st32 (&link->cnt_flags, HDL_REFCOUNT_UNIT);
ddsrt_atomic_st32 (&link->cnt_flags, HDL_FLAG_PENDING | HDL_REFCOUNT_UNIT | 1u);
link->hdl = handle;
#if USE_CHH
ddsrt_mutex_unlock (&handles.lock);
if (hhadd (handles.ht, link))
ret = handle;
else
ret = DDS_RETCODE_BAD_PARAMETER;
return link->hdl;
#else
if (hhadd (handles.ht, link))
ret = handle;
else
ret = DDS_RETCODE_BAD_PARAMETER;
ddsrt_mutex_unlock (&handles.lock);
#endif
assert (ret > 0);
}
#if USE_CHH
thread_state_asleep (ts1);
#endif
return ret;
}
void dds_handle_close (struct dds_handle_link *link)
void dds_handle_unpend (struct dds_handle_link *link)
{
ddsrt_atomic_or32 (&link->cnt_flags, HDL_FLAG_CLOSED);
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((cf & HDL_FLAG_PENDING));
assert (!(cf & HDL_FLAG_CLOSED));
assert (!(cf & HDL_FLAG_CLOSING));
assert ((cf & HDL_REFCOUNT_MASK) >= HDL_REFCOUNT_UNIT);
assert ((cf & HDL_PINCOUNT_MASK) >= 1u);
#endif
ddsrt_atomic_and32 (&link->cnt_flags, ~HDL_FLAG_PENDING);
dds_handle_unpin (link);
}
int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
int32_t dds_handle_delete (struct dds_handle_link *link)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
if (!(cf & HDL_FLAG_PENDING))
{
assert (cf & HDL_FLAG_CLOSING);
assert (cf & HDL_FLAG_CLOSED);
}
assert ((cf & HDL_REFCOUNT_MASK) == 0u);
assert ((cf & HDL_PINCOUNT_MASK) == 1u);
#endif
assert (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED);
ddsrt_mutex_lock (&handles.lock);
if ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 0)
{
/* FIXME: there is no sensible solution when this times out, so it must
never do that ... */
const dds_time_t abstimeout = dds_time () + timeout;
while ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 0)
{
if (!ddsrt_cond_waituntil (&handles.cond, &handles.lock, abstimeout))
{
ddsrt_mutex_unlock (&handles.lock);
fprintf (stderr, "** timeout in handle_delete **\n");
return DDS_RETCODE_TIMEOUT;
}
}
}
#if USE_CHH
thread_state_awake (ts1);
int x = ddsrt_chh_remove (handles.ht, link);
thread_state_asleep (ts1);
#else
int x = ddsrt_hh_remove (handles.ht, link);
#endif
assert(x);
(void)x;
assert (handles.count > 0);
@ -251,11 +186,8 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
return DDS_RETCODE_OK;
}
int32_t dds_handle_pin (dds_handle_t hdl, struct dds_handle_link **link)
static int32_t dds_handle_pin_int (dds_handle_t hdl, uint32_t delta, struct dds_handle_link **link)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
struct dds_handle_link dummy = { .hdl = hdl };
int32_t rc;
/* it makes sense to check here for initialization: the first thing any operation
@ -269,41 +201,41 @@ int32_t dds_handle_pin (dds_handle_t hdl, struct dds_handle_link **link)
if (handles.ht == NULL)
return DDS_RETCODE_PRECONDITION_NOT_MET;
#if USE_CHH
thread_state_awake (ts1);
*link = ddsrt_chh_lookup (handles.ht, &dummy);
#else
ddsrt_mutex_lock (&handles.lock);
*link = ddsrt_hh_lookup (handles.ht, &dummy);
#endif
if (*link == NULL)
rc = DDS_RETCODE_BAD_PARAMETER;
else
{
uint32_t cnt_flags;
uint32_t cf;
/* Assume success; bail out if the object turns out to be in the process of
being deleted */
rc = DDS_RETCODE_OK;
do {
cnt_flags = ddsrt_atomic_ld32 (&(*link)->cnt_flags);
if (cnt_flags & HDL_FLAG_CLOSED)
cf = ddsrt_atomic_ld32 (&(*link)->cnt_flags);
if (cf & (HDL_FLAG_CLOSED | HDL_FLAG_CLOSING | HDL_FLAG_PENDING))
{
rc = DDS_RETCODE_BAD_PARAMETER;
break;
}
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cnt_flags, cnt_flags + HDL_PINCOUNT_UNIT));
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cf, cf + delta));
}
#if USE_CHH
thread_state_asleep (ts1);
#else
ddsrt_mutex_unlock (&handles.lock);
#endif
return rc;
}
int32_t dds_handle_pin (dds_handle_t hdl, struct dds_handle_link **link)
{
return dds_handle_pin_int (hdl, 1u, link);
}
int32_t dds_handle_pin_and_ref (dds_handle_t hdl, struct dds_handle_link **link)
{
return dds_handle_pin_int (hdl, HDL_REFCOUNT_UNIT + 1u, link);
}
void dds_handle_repin (struct dds_handle_link *link)
{
DDSRT_STATIC_ASSERT (HDL_PINCOUNT_UNIT == 1);
uint32_t x = ddsrt_atomic_inc32_nv (&link->cnt_flags);
assert (!(x & HDL_FLAG_CLOSED));
(void) x;
@ -311,13 +243,20 @@ void dds_handle_repin (struct dds_handle_link *link)
void dds_handle_unpin (struct dds_handle_link *link)
{
DDSRT_STATIC_ASSERT (HDL_PINCOUNT_UNIT == 1);
if ((ddsrt_atomic_dec32_ov (&link->cnt_flags) & (HDL_FLAG_CLOSED | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSED | HDL_PINCOUNT_UNIT))
{
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert (!(cf & HDL_FLAG_CLOSED));
if (cf & HDL_FLAG_CLOSING)
assert ((cf & HDL_PINCOUNT_MASK) > 1u);
else
assert ((cf & HDL_PINCOUNT_MASK) >= 1u);
#endif
ddsrt_mutex_lock (&handles.lock);
if ((ddsrt_atomic_dec32_nv (&link->cnt_flags) & (HDL_FLAG_CLOSING | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSING | 1u))
{
ddsrt_cond_broadcast (&handles.cond);
ddsrt_mutex_unlock (&handles.lock);
}
ddsrt_mutex_unlock (&handles.lock);
}
void dds_handle_add_ref (struct dds_handle_link *link)
@ -334,12 +273,27 @@ bool dds_handle_drop_ref (struct dds_handle_link *link)
if ((old & HDL_REFCOUNT_MASK) != HDL_REFCOUNT_UNIT)
new = old - HDL_REFCOUNT_UNIT;
else
new = (old - HDL_REFCOUNT_UNIT) | HDL_FLAG_CLOSED;
new = (old - HDL_REFCOUNT_UNIT) | HDL_FLAG_CLOSING;
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
return (new & HDL_REFCOUNT_MASK) == 0;
}
bool dds_handle_is_closed (struct dds_handle_link *link)
void dds_handle_close_wait (struct dds_handle_link *link)
{
return (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED) != 0;
#ifndef NDEBUG
uint32_t cf = ddsrt_atomic_ld32 (&link->cnt_flags);
assert ((cf & HDL_FLAG_CLOSING));
assert (!(cf & HDL_FLAG_CLOSED));
assert ((cf & HDL_REFCOUNT_MASK) == 0u);
assert ((cf & HDL_PINCOUNT_MASK) >= 1u);
#endif
ddsrt_mutex_lock (&handles.lock);
while ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 1u)
ddsrt_cond_wait (&handles.cond, &handles.lock);
/* only one thread may call close_wait on a given handle */
assert (!(ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED));
ddsrt_atomic_or32 (&link->cnt_flags, HDL_FLAG_CLOSED);
ddsrt_mutex_unlock (&handles.lock);
}
extern inline bool dds_handle_is_closed (struct dds_handle_link *link);


@ -33,10 +33,11 @@
#include "dds/ddsi/q_globals.h"
#include "dds/version.h"
static dds_return_t dds_close (struct dds_entity *e);
static void dds_close (struct dds_entity *e);
static dds_return_t dds_fini (struct dds_entity *e);
const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_close,
.delete = dds_fini,
.set_qos = dds_entity_deriver_dummy_set_qos,
@ -45,47 +46,34 @@ const struct dds_entity_deriver dds_entity_deriver_cyclonedds = {
dds_cyclonedds_entity dds_global;
enum dds_cyclonedds_state {
CDDS_STATE_ZERO,
CDDS_STATE_STARTING,
CDDS_STATE_READY,
CDDS_STATE_STOPPING
};
static enum dds_cyclonedds_state dds_state;
#define CDDS_STATE_ZERO 0u
#define CDDS_STATE_STARTING 1u
#define CDDS_STATE_READY 2u
#define CDDS_STATE_STOPPING 3u
static ddsrt_atomic_uint32_t dds_state = DDSRT_ATOMIC_UINT32_INIT (CDDS_STATE_ZERO);
static void common_cleanup (void)
{
downgrade_main_thread ();
thread_states_fini ();
if (thread_states_fini ())
dds_handle_server_fini ();
ddsi_iid_fini ();
ddsrt_cond_destroy (&dds_global.m_cond);
ddsrt_mutex_destroy (&dds_global.m_mutex);
dds_state = CDDS_STATE_ZERO;
ddsrt_atomic_st32 (&dds_state, CDDS_STATE_ZERO);
ddsrt_cond_broadcast (ddsrt_get_singleton_cond ());
}
static bool cyclonedds_entity_ready (void)
static bool cyclonedds_entity_ready (uint32_t s)
{
assert (dds_state != CDDS_STATE_ZERO);
if (dds_state == CDDS_STATE_STARTING || dds_state == CDDS_STATE_STOPPING)
assert (s != CDDS_STATE_ZERO);
if (s == CDDS_STATE_STARTING || s == CDDS_STATE_STOPPING)
return false;
else
{
struct dds_handle_link *x;
bool ready;
if (dds_handle_pin (DDS_CYCLONEDDS_HANDLE, &x) < 0)
ready = false;
else
{
ddsrt_mutex_lock (&dds_global.m_entity.m_mutex);
ready = !dds_handle_is_closed (x);
if (ready)
dds_entity_add_ref_locked (&dds_global.m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
dds_handle_unpin (x);
}
return ready;
return dds_handle_pin_and_ref (DDS_CYCLONEDDS_HANDLE, &x) == DDS_RETCODE_OK;
}
}
@ -98,16 +86,20 @@ dds_return_t dds_init (void)
ddsrt_cond_t * const init_cond = ddsrt_get_singleton_cond ();
ddsrt_mutex_lock (init_mutex);
while (dds_state != CDDS_STATE_ZERO && !cyclonedds_entity_ready ())
uint32_t s = ddsrt_atomic_ld32 (&dds_state);
while (s != CDDS_STATE_ZERO && !cyclonedds_entity_ready (s))
{
ddsrt_cond_wait (init_cond, init_mutex);
switch (dds_state)
s = ddsrt_atomic_ld32 (&dds_state);
}
switch (s)
{
case CDDS_STATE_READY:
assert (dds_global.m_entity.m_hdllink.hdl == DDS_CYCLONEDDS_HANDLE);
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
case CDDS_STATE_ZERO:
dds_state = CDDS_STATE_STARTING;
ddsrt_atomic_st32 (&dds_state, CDDS_STATE_STARTING);
break;
default:
ddsrt_mutex_unlock (init_mutex);
@ -118,9 +110,7 @@ dds_return_t dds_init (void)
ddsrt_mutex_init (&dds_global.m_mutex);
ddsrt_cond_init (&dds_global.m_cond);
ddsi_iid_init ();
thread_states_init_static ();
thread_states_init (64);
upgrade_main_thread ();
if (dds_handle_server_init () != DDS_RETCODE_OK)
{
@ -132,39 +122,31 @@ dds_return_t dds_init (void)
dds_entity_init (&dds_global.m_entity, NULL, DDS_KIND_CYCLONEDDS, NULL, NULL, 0);
dds_global.m_entity.m_iid = ddsi_iid_gen ();
dds_global.m_entity.m_flags = DDS_ENTITY_IMPLICIT;
ddsrt_mutex_lock (&dds_global.m_entity.m_mutex);
dds_handle_repin (&dds_global.m_entity.m_hdllink);
dds_entity_add_ref_locked (&dds_global.m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
dds_state = CDDS_STATE_READY;
dds_entity_init_complete (&dds_global.m_entity);
ddsrt_atomic_st32 (&dds_state, CDDS_STATE_READY);
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
fail_handleserver:
assert (dds_state == CDDS_STATE_STARTING);
common_cleanup ();
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();
return ret;
}
static dds_return_t dds_close (struct dds_entity *e)
static void dds_close (struct dds_entity *e)
{
(void) e;
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
ddsrt_cond_t * const init_cond = ddsrt_get_singleton_cond ();
ddsrt_mutex_lock (init_mutex);
assert (dds_state == CDDS_STATE_READY);
dds_state = CDDS_STATE_STOPPING;
ddsrt_cond_broadcast (init_cond);
ddsrt_mutex_unlock (init_mutex);
return DDS_RETCODE_OK;
assert (ddsrt_atomic_ld32 (&dds_state) == CDDS_STATE_READY);
ddsrt_atomic_st32 (&dds_state, CDDS_STATE_STOPPING);
}
static dds_return_t dds_fini (struct dds_entity *e)
{
(void) e;
ddsrt_mutex_t * const init_mutex = ddsrt_get_singleton_mutex ();
/* If there are multiple domains shutting down simultaneously, the one "deleting" the top-level
entity (and thus arriving here) may have overtaken another thread that is still in the process
of deleting its domain object. For most entities such races are not an issue, but here we tear
@ -175,9 +157,8 @@ static dds_return_t dds_fini (struct dds_entity *e)
ddsrt_mutex_unlock (&dds_global.m_mutex);
ddsrt_mutex_lock (init_mutex);
assert (dds_state == CDDS_STATE_STOPPING);
assert (ddsrt_atomic_ld32 (&dds_state) == CDDS_STATE_STOPPING);
dds_entity_final_deinit_before_free (e);
dds_handle_server_fini ();
common_cleanup ();
ddsrt_mutex_unlock (init_mutex);
ddsrt_fini ();


@ -67,6 +67,7 @@ static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos
}
const struct dds_entity_deriver dds_entity_deriver_participant = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_participant_delete,
.set_qos = dds_participant_qos_set,
@ -124,9 +125,10 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
dds_entity_register_child (&dom->m_entity, &pp->m_entity);
ddsrt_mutex_unlock (&dds_global.m_entity.m_mutex);
dds_entity_init_complete (&pp->m_entity);
/* drop temporary extra ref to domain, dds_init */
dds_delete (dom->m_entity.m_hdllink.hdl);
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return ret;
err_entity_init:
@ -136,7 +138,7 @@ err_qos_validation:
dds_delete_qos (new_qos);
dds_delete (dom->m_entity.m_hdllink.hdl);
err_domain_create:
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
err_dds_init:
return ret;
}
@ -168,6 +170,6 @@ dds_return_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return ret;
}


@ -38,6 +38,7 @@ static dds_return_t dds_publisher_status_validate (uint32_t mask)
}
const struct dds_entity_deriver dds_entity_deriver_publisher = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_publisher_qos_set,
@ -52,7 +53,7 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
dds_qos_t *new_qos;
dds_return_t ret;
if ((ret = dds_participant_lock_for_create (participant, &par)) != DDS_RETCODE_OK)
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
return ret;
new_qos = dds_create_qos ();
@ -69,6 +70,7 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
hdl = dds_entity_init (&pub->m_entity, &par->m_entity, DDS_KIND_PUBLISHER, new_qos, listener, DDS_PUBLISHER_STATUS_MASK);
pub->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&par->m_entity, &pub->m_entity);
dds_entity_init_complete (&pub->m_entity);
dds_participant_unlock (par);
return hdl;
}


@ -26,15 +26,16 @@ dds_entity_t dds_create_querycondition (dds_entity_t reader, uint32_t mask, dds_
dds_return_t rc;
dds_reader *r;
if ((rc = dds_reader_lock_for_create (reader, &r)) != DDS_RETCODE_OK)
if ((rc = dds_reader_lock (reader, &r)) != DDS_RETCODE_OK)
return rc;
else
{
dds_entity_t hdl;
dds_readcond *cond = dds_create_readcond (r, DDS_KIND_COND_QUERY, mask, filter);
assert (cond);
dds_reader_unlock (r);
hdl = cond->m_entity.m_hdllink.hdl;
dds_entity_init_complete (&cond->m_entity);
dds_reader_unlock (r);
return hdl;
}
}


@ -30,6 +30,7 @@ static dds_return_t dds_readcond_delete (dds_entity *e)
}
const struct dds_entity_deriver dds_entity_deriver_readcondition = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_readcond_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
@ -64,7 +65,7 @@ dds_entity_t dds_create_readcondition (dds_entity_t reader, uint32_t mask)
{
dds_reader *rd;
dds_return_t rc;
if ((rc = dds_reader_lock_for_create (reader, &rd)) != DDS_RETCODE_OK)
if ((rc = dds_reader_lock (reader, &rd)) != DDS_RETCODE_OK)
return rc;
else
{
@ -72,6 +73,7 @@ dds_entity_t dds_create_readcondition (dds_entity_t reader, uint32_t mask)
dds_readcond *cond = dds_create_readcond(rd, DDS_KIND_COND_READ, mask, 0);
assert (cond);
hdl = cond->m_entity.m_hdllink.hdl;
dds_entity_init_complete (&cond->m_entity);
dds_reader_unlock (rd);
return hdl;
}


@ -40,16 +40,21 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_reader)
DDS_SAMPLE_LOST_STATUS |\
DDS_SUBSCRIPTION_MATCHED_STATUS)
static dds_return_t dds_reader_close (dds_entity *e) ddsrt_nonnull_all;
static void dds_reader_close (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_reader_close (dds_entity *e)
static void dds_reader_close (dds_entity *e)
{
dds_return_t ret = DDS_RETCODE_OK;
struct dds_reader * const rd = (struct dds_reader *) e;
assert (rd->m_rd != NULL);
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if (delete_reader (&e->m_domain->gv, &e->m_guid) != 0)
ret = DDS_RETCODE_ERROR;
(void) delete_reader (&e->m_domain->gv, &e->m_guid);
thread_state_asleep (lookup_thread_state ());
return ret;
ddsrt_mutex_lock (&e->m_mutex);
while (rd->m_rd != NULL)
ddsrt_cond_wait (&e->m_cond, &e->m_mutex);
ddsrt_mutex_unlock (&e->m_mutex);
}
static dds_return_t dds_reader_delete (dds_entity *e) ddsrt_nonnull_all;
@ -57,10 +62,12 @@ static dds_return_t dds_reader_delete (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_reader_delete (dds_entity *e)
{
dds_reader * const rd = (dds_reader *) e;
dds_return_t ret;
ret = dds_delete (rd->m_topic->m_entity.m_hdllink.hdl);
(void) dds_delete (rd->m_topic->m_entity.m_hdllink.hdl);
dds_free (rd->m_loan);
return ret;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
dds_rhc_free (rd->m_rhc);
thread_state_asleep (lookup_thread_state ());
return DDS_RETCODE_OK;
}
static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, bool enabled)
@ -90,11 +97,14 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
overhead really matters. Otherwise, it is pretty much like
dds_reader_status_cb. */
const bool data_av_enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & (DDS_DATA_AVAILABLE_STATUS << SAM_ENABLED_SHIFT));
if (!data_av_enabled)
const uint32_t data_av_enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & (DDS_DATA_AVAILABLE_STATUS << SAM_ENABLED_SHIFT));
if (data_av_enabled == 0)
return;
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
rd->m_entity.m_cb_pending_count++;
/* FIXME: why wait if no listener is set? */
while (rd->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
rd->m_entity.m_cb_count++;
@ -106,6 +116,10 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
ddsrt_mutex_lock (&sub->m_observers_lock);
const uint32_t data_on_rds_enabled = (ddsrt_atomic_ld32 (&sub->m_status.m_status_and_mask) & (DDS_DATA_ON_READERS_STATUS << SAM_ENABLED_SHIFT));
if (data_on_rds_enabled)
{
sub->m_cb_pending_count++;
while (sub->m_cb_count > 0)
ddsrt_cond_wait (&sub->m_observers_cond, &sub->m_observers_lock);
sub->m_cb_count++;
@ -116,7 +130,9 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
ddsrt_mutex_lock (&sub->m_observers_lock);
sub->m_cb_count--;
sub->m_cb_pending_count--;
ddsrt_cond_broadcast (&sub->m_observers_cond);
}
ddsrt_mutex_unlock (&sub->m_observers_lock);
}
else if (rd->m_entity.m_listener.on_data_available)
@ -134,24 +150,28 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
}
rd->m_entity.m_cb_count--;
rd->m_entity.m_cb_pending_count--;
ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}
void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
{
struct dds_entity * const entity = ventity;
dds_reader * const rd = ventity;
/* When data is NULL, it means that the DDSI reader is deleted. */
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
dds_handle_unpin (&entity->m_hdllink);
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
rd->m_rd = NULL;
ddsrt_cond_broadcast (&rd->m_entity.m_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
return;
}
struct dds_listener const * const lst = &entity->m_listener;
struct dds_listener const * const lst = &rd->m_entity.m_listener;
enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
bool invoke = false;
void *vst = NULL;
@ -168,13 +188,12 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
m_observers_lock for the duration of the listener call itself,
and that similarly the listener function and argument pointers
are stable */
ddsrt_mutex_lock (&entity->m_observers_lock);
while (entity->m_cb_count > 0)
ddsrt_cond_wait (&entity->m_observers_cond, &entity->m_observers_lock);
entity->m_cb_count++;
/* FIXME: why do this if no listener is set? */
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
while (rd->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
/* Update status metrics. */
dds_reader * const rd = (dds_reader *) entity;
switch (status_id) {
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: {
struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status;
@ -258,26 +277,35 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
assert (0);
}
if (invoke)
const uint32_t enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT));
if (!enabled)
{
ddsrt_mutex_unlock (&entity->m_observers_lock);
dds_entity_invoke_listener (entity, status_id, vst);
ddsrt_mutex_lock (&entity->m_observers_lock);
/* Don't invoke listeners or set status flag if masked */
}
else if (invoke)
{
rd->m_entity.m_cb_pending_count++;
rd->m_entity.m_cb_count++;
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
dds_entity_invoke_listener (&rd->m_entity, status_id, vst);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
rd->m_entity.m_cb_count--;
rd->m_entity.m_cb_pending_count--;
*reset[0] = 0;
if (reset[1])
*reset[1] = 0;
}
else
{
dds_entity_status_set (entity, (status_mask_t) (1u << status_id));
dds_entity_status_set (&rd->m_entity, (status_mask_t) (1u << status_id));
}
entity->m_cb_count--;
ddsrt_cond_broadcast (&entity->m_observers_cond);
ddsrt_mutex_unlock (&entity->m_observers_lock);
ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
}
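
The m_status_and_mask tests above rely on a single 32-bit word that packs the raised status flags in the low half and the enabled mask in the high half, shifted by SAM_ENABLED_SHIFT. A standalone sketch, assuming a 16-bit shift purely for illustration:

#include <stdbool.h>
#include <stdint.h>

#define ENABLED_SHIFT 16u     /* stand-in for SAM_ENABLED_SHIFT */

static bool status_enabled (uint32_t status_and_mask, uint32_t status_id)
{
  /* same shape as the checks above: is this status in the enabled mask? */
  return (status_and_mask & ((1u << status_id) << ENABLED_SHIFT)) != 0;
}

static uint32_t status_raise (uint32_t status_and_mask, uint32_t status_id)
{
  /* raised flags live in the low half of the word */
  return status_and_mask | (1u << status_id);
}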
const struct dds_entity_deriver dds_entity_deriver_reader = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_reader_close,
.delete = dds_reader_delete,
.set_qos = dds_reader_qos_set,
@ -323,7 +351,7 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
}
}
if ((ret = dds_subscriber_lock_for_create (subscriber, &sub)) != DDS_RETCODE_OK)
if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
{
reader = ret;
goto err_sub_lock;
@ -335,7 +363,7 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
sub->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
}
if ((ret = dds_topic_lock_for_create (t, &tp)) != DDS_RETCODE_OK)
if ((ret = dds_topic_lock (t, &tp)) != DDS_RETCODE_OK)
{
reader = ret;
goto err_tp_lock;
@ -389,17 +417,13 @@ static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscribe
}
dds_entity_add_ref_locked (&tp->m_entity);
/* Extra claim of this reader to make sure that the delete waits until DDSI
has deleted its reader as well. This can be known through the callback. */
dds_handle_repin (&rd->m_entity.m_hdllink);
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
ddsrt_mutex_unlock (&sub->m_entity.m_mutex);
/* FIXME: listeners can come too soon ... should set mask based on listeners
then atomically set the listeners, save the mask to a pending set and clear
it; and then invoke those listeners that are in the pending set */
dds_entity_init_complete (&rd->m_entity);
thread_state_awake (lookup_thread_state (), &sub->m_entity.m_domain->gv);
ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
ddsrt_mutex_lock (&sub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
thread_state_asleep (lookup_thread_state ());

View file

@ -38,6 +38,7 @@ static dds_return_t dds_subscriber_status_validate (uint32_t mask)
}
const struct dds_entity_deriver dds_entity_deriver_subscriber = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_entity_deriver_dummy_delete,
.set_qos = dds_subscriber_qos_set,
@ -66,6 +67,7 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q
subscriber = dds_entity_init (&sub->m_entity, &participant->m_entity, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK);
sub->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&participant->m_entity, &sub->m_entity);
dds_entity_init_complete (&sub->m_entity);
return subscriber;
}
@ -74,7 +76,7 @@ dds_entity_t dds_create_subscriber (dds_entity_t participant, const dds_qos_t *q
dds_participant *par;
dds_entity_t hdl;
dds_return_t ret;
if ((ret = dds_participant_lock_for_create (participant, &par)) != DDS_RETCODE_OK)
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
return ret;
hdl = dds__create_subscriber_l (par, qos, listener);
dds_participant_unlock (par);

View file

@ -243,7 +243,7 @@ static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct dds
return true;
}
static dds_return_t create_topic_topic_arbirary_check_sertopic (dds_entity_t participant, dds_entity_t topic, struct ddsi_sertopic *sertopic, const dds_qos_t *qos)
static dds_return_t create_topic_topic_arbitrary_check_sertopic (dds_entity_t participant, dds_entity_t topic, struct ddsi_sertopic *sertopic, const dds_qos_t *qos)
{
dds_topic *tp;
dds_return_t ret;
@ -272,6 +272,7 @@ static dds_return_t create_topic_topic_arbirary_check_sertopic (dds_entity_t par
}
const struct dds_entity_deriver dds_entity_deriver_topic = {
.interrupt = dds_entity_deriver_dummy_interrupt,
.close = dds_entity_deriver_dummy_close,
.delete = dds_topic_delete,
.set_qos = dds_topic_qos_set,
@ -324,7 +325,7 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
/* FIXME: just mutex_lock ought to be good enough, but there is the
pesky "closed" check still ... */
if ((rc = dds_participant_lock_for_create (participant, &par)) != DDS_RETCODE_OK)
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
goto err_lock_participant;
bool retry_lookup;
@ -359,7 +360,7 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
for the various scary cases. */
dds_participant_unlock (par);
rc = create_topic_topic_arbirary_check_sertopic (participant, topic, sertopic, new_qos);
rc = create_topic_topic_arbitrary_check_sertopic (participant, topic, sertopic, new_qos);
switch (rc)
{
case DDS_RETCODE_OK: /* duplicate definition */
@ -384,7 +385,7 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
abort ();
}
if ((rc = dds_participant_lock_for_create (participant, &par)) != DDS_RETCODE_OK)
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
goto err_lock_participant;
}
} while (retry_lookup);
@ -447,6 +448,8 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
nn_plist_fini (&plist);
}
thread_state_asleep (lookup_thread_state ());
dds_entity_init_complete (&top->m_entity);
dds_participant_unlock (par);
dds_entity_unpin (par_ent);
return hdl;

View file

@ -21,8 +21,6 @@
#include "dds/ddsc/dds_rhc.h"
#include "dds/ddsi/ddsi_iid.h"
DEFINE_ENTITY_LOCK_UNLOCK_ONLY (static, dds_waitset, DDS_KIND_WAITSET)
static bool is_triggered (struct dds_entity *e)
{
bool t;
@ -64,7 +62,7 @@ static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *x
}
/* Move any previously but no longer triggering entities back to the observed list */
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
ddsrt_mutex_lock (&ws->wait_lock);
ws->ntriggered = 0;
for (size_t i = 0; i < ws->nentities; i++)
{
@ -77,40 +75,64 @@ static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *x
}
/* Only wait/keep waiting when we have something to observe and there aren't any triggers yet. */
while (ws->nentities > 0 && ws->ntriggered == 0)
if (!ddsrt_cond_waituntil (&ws->m_entity.m_cond, &ws->m_entity.m_mutex, abstimeout))
while (ws->nentities > 0 && ws->ntriggered == 0 && !dds_handle_is_closed (&ws->m_entity.m_hdllink))
if (!ddsrt_cond_waituntil (&ws->wait_cond, &ws->wait_lock, abstimeout))
break;
ret = (int32_t) ws->ntriggered;
for (size_t i = 0; i < ws->ntriggered && i < nxs; i++)
xs[i] = ws->entities[i].arg;
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
ddsrt_mutex_unlock (&ws->wait_lock);
dds_entity_unpin (&ws->m_entity);
return ret;
}
static dds_return_t dds_waitset_close (struct dds_entity *e)
static void dds_waitset_interrupt (struct dds_entity *e)
{
/* the entity is being deleted: wake any thread blocked in a wait on this waitset, but
   don't touch the waitset's administration yet -- other threads may still be attaching/detaching */
dds_waitset *ws = (dds_waitset *) e;
ddsrt_mutex_lock (&ws->wait_lock);
assert (dds_handle_is_closed (&ws->m_entity.m_hdllink));
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}
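
Interrupting only needs wait_lock and a broadcast because the wait loop's predicate (see dds_waitset_wait_impl above) also checks whether the handle has been closed. A standalone sketch of that shape, using pthreads and an untimed wait for brevity:

#include <pthread.h>
#include <stdbool.h>

struct wsx {
  pthread_mutex_t lock;
  pthread_cond_t cond;
  bool closed;                /* stands in for dds_handle_is_closed() */
  unsigned nentities, ntriggered;
};

static unsigned wsx_wait (struct wsx *ws)
{
  pthread_mutex_lock (&ws->lock);
  while (ws->nentities > 0 && ws->ntriggered == 0 && !ws->closed)
    pthread_cond_wait (&ws->cond, &ws->lock);
  unsigned n = ws->ntriggered;
  pthread_mutex_unlock (&ws->lock);
  return n;
}

static void wsx_interrupt (struct wsx *ws)
{
  pthread_mutex_lock (&ws->lock);
  ws->closed = true;          /* in the real code the handle is closed first */
  pthread_cond_broadcast (&ws->cond);
  pthread_mutex_unlock (&ws->lock);
}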
static void dds_waitset_close (struct dds_entity *e)
{
dds_waitset *ws = (dds_waitset *) e;
ddsrt_mutex_lock (&ws->wait_lock);
while (ws->nentities > 0)
{
dds_return_t rc = dds_entity_observer_unregister (ws->entities[0].entity, &ws->m_entity);
assert (rc == DDS_RETCODE_OK);
(void) rc;
dds_entity *observed;
if (dds_entity_pin (ws->entities[0].handle, &observed) < 0)
{
/* can't be pinned => being deleted => will be removed from wait set soon enough
and go through delete_observer (which will trigger the condition variable) */
ddsrt_cond_wait (&ws->wait_cond, &ws->wait_lock);
}
return DDS_RETCODE_OK;
else
{
/* entity will remain in existence */
ddsrt_mutex_unlock (&ws->wait_lock);
(void) dds_entity_observer_unregister (observed, ws, true);
ddsrt_mutex_lock (&ws->wait_lock);
assert (ws->nentities == 0 || ws->entities[0].entity != observed);
dds_entity_unpin (observed);
}
}
ddsrt_mutex_unlock (&ws->wait_lock);
}
static dds_return_t dds_waitset_delete (struct dds_entity *e)
{
/* deep in the process of deleting the entity, so this is the only thread */
dds_waitset *ws = (dds_waitset *) e;
ddsrt_mutex_destroy (&ws->wait_lock);
ddsrt_cond_destroy (&ws->wait_cond);
ddsrt_free (ws->entities);
return DDS_RETCODE_OK;
}
const struct dds_entity_deriver dds_entity_deriver_waitset = {
.interrupt = dds_waitset_interrupt,
.close = dds_waitset_close,
.delete = dds_waitset_delete,
.set_qos = dds_entity_deriver_dummy_set_qos,
@ -129,7 +151,7 @@ dds_entity_t dds_create_waitset (dds_entity_t owner)
if ((rc = dds_init ()) < 0)
return rc;
if ((rc = dds_entity_lock_for_create (owner, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_lock (owner, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
goto err_entity_lock;
switch (dds_entity_kind (e))
@ -145,48 +167,58 @@ dds_entity_t dds_create_waitset (dds_entity_t owner)
dds_waitset *waitset = dds_alloc (sizeof (*waitset));
dds_entity_t hdl = dds_entity_init (&waitset->m_entity, e, DDS_KIND_WAITSET, NULL, NULL, 0);
ddsrt_mutex_init (&waitset->wait_lock);
ddsrt_cond_init (&waitset->wait_cond);
waitset->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (e, &waitset->m_entity);
waitset->nentities = 0;
waitset->ntriggered = 0;
waitset->entities = NULL;
dds_entity_init_complete (&waitset->m_entity);
dds_entity_unlock (e);
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return hdl;
err_entity_kind:
dds_entity_unlock (e);
err_entity_lock:
dds_delete (DDS_CYCLONEDDS_HANDLE);
dds_delete_impl_pinned (&dds_global.m_entity, DIS_EXPLICIT);
return rc;
}
dds_return_t dds_waitset_get_entities (dds_entity_t waitset, dds_entity_t *entities, size_t size)
{
dds_return_t ret;
dds_waitset *ws;
if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
dds_entity *wsent;
if ((ret = dds_entity_pin (waitset, &wsent)) < 0)
return ret;
else if (dds_entity_kind (wsent) != DDS_KIND_WAITSET)
{
dds_entity_unpin (wsent);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
else
{
dds_waitset *ws = (dds_waitset *) wsent;
ddsrt_mutex_lock (&ws->wait_lock);
if (entities != NULL)
{
for (size_t i = 0; i < ws->nentities && i < size; i++)
entities[i] = ws->entities[i].handle;
}
ret = (int32_t) ws->nentities;
dds_waitset_unlock (ws);
ddsrt_mutex_unlock (&ws->wait_lock);
dds_entity_unpin (&ws->m_entity);
return ret;
}
}
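
A possible application-side use of dds_waitset_get_entities (a sketch, not taken from the diff): query the count first, then fetch the handles; since entities can be attached or detached concurrently, the second call may return a different count.

#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include "dds/dds.h"

static void print_attached (dds_entity_t ws)
{
  dds_return_t n = dds_waitset_get_entities (ws, NULL, 0);
  if (n <= 0)
    return;
  dds_entity_t *es = malloc ((size_t) n * sizeof (*es));
  if (es == NULL)
    return;
  dds_return_t m = dds_waitset_get_entities (ws, es, (size_t) n);
  for (dds_return_t i = 0; i < m && i < n; i++)
    printf ("attached: %"PRId32"\n", es[i]);
  free (es);
}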
/* This is called when the observed entity signals a status change. */
static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32_t status)
static void dds_waitset_observer (struct dds_waitset *ws, dds_entity_t observed, uint32_t status)
{
assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
dds_waitset *ws = (dds_waitset *) ent;
(void) status;
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
ddsrt_mutex_lock (&ws->wait_lock);
/* Move observed entity to triggered list. */
size_t i;
for (i = 0; i < ws->nentities; i++)
@ -199,20 +231,18 @@ static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32
ws->entities[ws->ntriggered++] = tmp;
}
/* Trigger waitset to wake up. */
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}
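
The observer callback maintains the invariant that entities[0..ntriggered) holds the triggered entries, so wait() can report results by copying a prefix. A standalone sketch of the O(1) swap that moves an entry into that prefix:

#include <stddef.h>

struct wsentry { int handle; void *arg; };
struct wslist { struct wsentry *entities; size_t nentities, ntriggered; };

static void wslist_trigger (struct wslist *ws, int handle)
{
  /* entries before ntriggered are already triggered; search the rest */
  size_t i;
  for (i = ws->ntriggered; i < ws->nentities; i++)
    if (ws->entities[i].handle == handle)
      break;
  if (i < ws->nentities)
  {
    struct wsentry tmp = ws->entities[i];
    ws->entities[i] = ws->entities[ws->ntriggered];
    ws->entities[ws->ntriggered++] = tmp;
  }
}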
struct dds_waitset_attach_observer_arg {
dds_attach_t x;
};
static void dds_waitset_attach_observer (struct dds_entity *observer, struct dds_entity *observed, void *varg)
static bool dds_waitset_attach_observer (struct dds_waitset *ws, struct dds_entity *observed, void *varg)
{
assert (dds_entity_kind (observer) == DDS_KIND_WAITSET);
dds_waitset *ws = (dds_waitset *) observer;
struct dds_waitset_attach_observer_arg *arg = varg;
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
ddsrt_mutex_lock (&ws->wait_lock);
ws->entities = ddsrt_realloc (ws->entities, (ws->nentities + 1) * sizeof (*ws->entities));
ws->entities[ws->nentities].arg = arg->x;
ws->entities[ws->nentities].entity = observed;
@ -225,16 +255,15 @@ static void dds_waitset_attach_observer (struct dds_entity *observer, struct dds
ws->entities[i] = ws->entities[ws->ntriggered];
ws->entities[ws->ntriggered++] = tmp;
}
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
return true;
}
static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed)
static void dds_waitset_delete_observer (struct dds_waitset *ws, dds_entity_t observed)
{
assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
dds_waitset *ws = (dds_waitset *) ent;
size_t i;
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
ddsrt_mutex_lock (&ws->wait_lock);
for (i = 0; i < ws->nentities; i++)
if (ws->entities[i].handle == observed)
break;
@ -250,8 +279,8 @@ static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed)
ws->entities[i] = ws->entities[--ws->nentities];
}
}
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
ddsrt_cond_broadcast (&ws->wait_cond);
ddsrt_mutex_unlock (&ws->wait_lock);
}
dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x)
@ -271,9 +300,7 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_
{
dds_waitset *ws = (dds_waitset *) wsent;
if (waitset == entity)
e = &ws->m_entity;
else if ((ret = dds_entity_pin (entity, &e)) < 0)
if ((ret = dds_entity_pin (entity, &e)) < 0)
goto err_entity;
/* Entity must be "in scope": within the participant, domain or (self-evidently true) Cyclone DDS,
@ -288,11 +315,9 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_
/* This will fail if given entity is already attached (or deleted). */
struct dds_waitset_attach_observer_arg attach_arg = { .x = x };
if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_attach_observer, &attach_arg, dds_waitset_delete_observer)) != DDS_RETCODE_OK)
goto err_entity;
ret = dds_entity_observer_register (e, ws, dds_waitset_observer, dds_waitset_attach_observer, &attach_arg, dds_waitset_delete_observer);
err_scope:
if (e != &ws->m_entity)
dds_entity_unpin (e);
err_entity:
dds_entity_unpin (&ws->m_entity);
@ -318,12 +343,12 @@ dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)
dds_entity *e;
/* Possibly fails when entity was not attached. */
if (waitset == entity)
ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity);
ret = dds_entity_observer_unregister (&ws->m_entity, ws, true);
else if ((ret = dds_entity_pin (entity, &e)) < 0)
; /* entity invalid */
else
{
ret = dds_entity_observer_unregister (e, &ws->m_entity);
ret = dds_entity_observer_unregister (e, ws, true);
dds_entity_unpin (e);
}
@ -352,7 +377,6 @@ dds_return_t dds_waitset_set_trigger (dds_entity_t waitset, bool trigger)
{
dds_entity *ent;
dds_return_t rc;
if ((rc = dds_entity_pin (waitset, &ent)) != DDS_RETCODE_OK)
return rc;
else if (dds_entity_kind (ent) != DDS_KIND_WAITSET)
@ -360,11 +384,10 @@ dds_return_t dds_waitset_set_trigger (dds_entity_t waitset, bool trigger)
dds_entity_unpin (ent);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
ddsrt_mutex_lock (&ent->m_observers_lock);
else
{
dds_entity_trigger_set (ent, trigger);
ddsrt_mutex_unlock (&ent->m_observers_lock);
dds_entity_unpin (ent);
return DDS_RETCODE_OK;
}
}
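
With the special-casing of waitset == entity removed, attaching a waitset to itself now goes through the normal pin path like any other entity. A possible usage sketch: attach the waitset to itself so another thread can wake a blocked dds_waitset_wait via dds_waitset_set_trigger.

#include "dds/dds.h"

/* run once after creating the waitset; 0 is an arbitrary attachment token */
static dds_return_t enable_external_wakeup (dds_entity_t ws)
{
  return dds_waitset_attach (ws, ws, 0);
}

/* call from another thread to wake a blocked dds_waitset_wait */
static void wake_waitset (dds_entity_t ws)
{
  (void) dds_waitset_set_trigger (ws, true);
}

/* once woken, reset the trigger so later waits block again */
static void rearm_waitset (dds_entity_t ws)
{
  (void) dds_waitset_set_trigger (ws, false);
}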

View file

@ -51,34 +51,36 @@ static dds_return_t dds_writer_status_validate (uint32_t mask)
static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
{
struct dds_entity * const entity = ventity;
dds_writer * const wr = ventity;
/* When data is NULL, it means that the writer is deleted. */
/* When data is NULL, it means that the DDSI writer is deleted. */
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
dds_handle_unpin (&entity->m_hdllink);
ddsrt_mutex_lock (&wr->m_entity.m_mutex);
wr->m_wr = NULL;
ddsrt_cond_broadcast (&wr->m_entity.m_cond);
ddsrt_mutex_unlock (&wr->m_entity.m_mutex);
return;
}
struct dds_listener const * const lst = &entity->m_listener;
struct dds_listener const * const lst = &wr->m_entity.m_listener;
enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
bool invoke = false;
void *vst = NULL;
int32_t *reset[2] = { NULL, NULL };
ddsrt_mutex_lock (&entity->m_observers_lock);
while (entity->m_cb_count > 0)
ddsrt_cond_wait (&entity->m_observers_cond, &entity->m_observers_lock);
entity->m_cb_count++;
/* FIXME: why wait if no listener is set? */
ddsrt_mutex_lock (&wr->m_entity.m_observers_lock);
while (wr->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&wr->m_entity.m_observers_cond, &wr->m_entity.m_observers_lock);
/* Reset the status for possible Listener call.
* When a listener is not called, the status will be set (again). */
dds_entity_status_reset (entity, (status_mask_t) (1u << status_id));
dds_entity_status_reset (&wr->m_entity, (status_mask_t) (1u << status_id));
/* Update status metrics. */
dds_writer * const wr = (dds_writer *) entity;
switch (status_id)
{
case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: {
@ -136,23 +138,31 @@ static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
assert (0);
}
if (invoke)
const uint32_t enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT));
if (enabled == 0)
{
ddsrt_mutex_unlock (&entity->m_observers_lock);
dds_entity_invoke_listener(entity, status_id, vst);
ddsrt_mutex_lock (&entity->m_observers_lock);
/* Don't invoke listeners or set status flag if masked */
}
else if (invoke)
{
wr->m_entity.m_cb_pending_count++;
wr->m_entity.m_cb_count++;
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
dds_entity_invoke_listener (&wr->m_entity, status_id, vst);
ddsrt_mutex_lock (&wr->m_entity.m_observers_lock);
wr->m_entity.m_cb_count--;
wr->m_entity.m_cb_pending_count--;
*reset[0] = 0;
if (reset[1])
*reset[1] = 0;
}
else
{
dds_entity_status_set (entity, (status_mask_t) (1u << status_id));
dds_entity_status_set (&wr->m_entity, (status_mask_t) (1u << status_id));
}
entity->m_cb_count--;
ddsrt_cond_broadcast (&entity->m_observers_cond);
ddsrt_mutex_unlock (&entity->m_observers_lock);
ddsrt_cond_broadcast (&wr->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
}
static uint32_t get_bandwidth_limit (dds_transport_priority_qospolicy_t transport_priority)
@ -166,18 +176,32 @@ static uint32_t get_bandwidth_limit (dds_transport_priority_qospolicy_t transpor
#endif
}
static dds_return_t dds_writer_close (dds_entity *e) ddsrt_nonnull_all;
static void dds_writer_interrupt (dds_entity *e) ddsrt_nonnull_all;
static dds_return_t dds_writer_close (dds_entity *e)
static void dds_writer_interrupt (dds_entity *e)
{
dds_writer * const wr = (dds_writer *) e;
dds_return_t ret;
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
nn_xpack_send (wr->m_xp, false);
if ((ret = delete_writer (&e->m_domain->gv, &e->m_guid)) < 0)
ret = DDS_RETCODE_ERROR;
struct q_globals * const gv = &e->m_domain->gv;
thread_state_awake (lookup_thread_state (), gv);
unblock_throttled_writer (gv, &e->m_guid);
thread_state_asleep (lookup_thread_state ());
return ret;
}
static void dds_writer_close (dds_entity *e) ddsrt_nonnull_all;
static void dds_writer_close (dds_entity *e)
{
struct dds_writer * const wr = (struct dds_writer *) e;
struct q_globals * const gv = &e->m_domain->gv;
struct thread_state1 * const ts1 = lookup_thread_state ();
thread_state_awake (ts1, gv);
nn_xpack_send (wr->m_xp, false);
(void) delete_writer (gv, &e->m_guid);
thread_state_asleep (ts1);
ddsrt_mutex_lock (&e->m_mutex);
while (wr->m_wr != NULL)
ddsrt_cond_wait (&e->m_cond, &e->m_mutex);
ddsrt_mutex_unlock (&e->m_mutex);
}
static dds_return_t dds_writer_delete (dds_entity *e) ddsrt_nonnull_all;
@ -233,6 +257,7 @@ static struct whc *make_whc (struct dds_domain *dom, const dds_qos_t *qos)
}
const struct dds_entity_deriver dds_entity_deriver_writer = {
.interrupt = dds_writer_interrupt,
.close = dds_writer_close,
.delete = dds_writer_delete,
.set_qos = dds_writer_qos_set,
@ -261,14 +286,14 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
dds_entity_unpin (p_or_p);
}
if ((rc = dds_publisher_lock_for_create (publisher, &pub)) != DDS_RETCODE_OK)
if ((rc = dds_publisher_lock (publisher, &pub)) != DDS_RETCODE_OK)
return rc;
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
if (publisher != participant_or_publisher)
pub->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
if ((rc = dds_topic_lock_for_create (topic, &tp)) != DDS_RETCODE_OK)
if ((rc = dds_topic_lock (topic, &tp)) != DDS_RETCODE_OK)
goto err_tp_lock;
assert (tp->m_stopic);
@ -305,23 +330,15 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
wr->m_whc = make_whc (pub->m_entity.m_domain, wqos);
wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch;
/* Extra claim of this writer to make sure that the delete waits until DDSI
* has deleted its writer as well. This can be known through the callback. */
dds_handle_repin (&wr->m_entity.m_hdllink);
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv);
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pp->m_entity.m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert (rc == DDS_RETCODE_OK);
thread_state_asleep (lookup_thread_state ());
wr->m_entity.m_iid = get_entity_instance_id (&wr->m_entity.m_domain->gv, &wr->m_entity.m_guid);
dds_entity_register_child (&pub->m_entity, &wr->m_entity);
dds_entity_init_complete (&wr->m_entity);
dds_topic_unlock (tp);
dds_publisher_unlock (pub);
return writer;

View file

@ -47,6 +47,7 @@ set(ddsc_test_sources
"unregister.c"
"unsupported.c"
"waitset.c"
"waitset_torture.c"
"write.c"
"writer.c")

View file

@ -0,0 +1,398 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <limits.h>
#include "dds/dds.h"
#include "CUnit/Theory.h"
#include "RoundTrip.h"
#include "dds/ddsrt/cdtors.h"
#include "dds/ddsrt/misc.h"
#include "dds/ddsrt/process.h"
#include "dds/ddsrt/threads.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/time.h"
#include "dds/ddsrt/random.h"
#define N_WAITSETS 20
#define N_ENTITIES 32
#define N_GUARDCONDS 20
#define N_SUBSCRIBERS 4
#define N_READERS 4
#define N_READCONDS 4
static dds_entity_t ppant;
static dds_entity_t topic;
static ddsrt_atomic_uint32_t terminate;
static ddsrt_atomic_uint32_t waitsets[N_WAITSETS];
static ddsrt_atomic_uint32_t entities[N_ENTITIES], signalled;
static ddsrt_atomic_uint32_t attach_ok, detach_ok, settrig_ok;
static ddsrt_atomic_uint32_t create_ent_ok[5], delete_ent_ok[5];
static ddsrt_atomic_uint32_t create_ws_ok, delete_ws_ok;
#define RESERVED ((uint32_t) 0xffffffffu)
static void init_prng (ddsrt_prng_t *prng)
{
ddsrt_prng_seed_t prng_seed;
for (size_t i = 0; i < sizeof (prng_seed.key) / sizeof (prng_seed.key[0]); i++)
prng_seed.key[i] = ddsrt_random ();
ddsrt_prng_init (prng, &prng_seed);
}
static void choose_index (uint32_t *p_idx, uint32_t *p_handle, ddsrt_prng_t *prng, ddsrt_atomic_uint32_t elems[], size_t nelems)
{
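/* Each slot in elems[] holds 0 (free), a positive entity handle (live), or
   RESERVED (claimed by another thread).  Pick a random slot and claim it
   atomically: a free slot becomes RESERVED so the caller may create an
   entity for it, a live slot becomes 0 so the caller may delete the handle
   it is given; slots that are currently RESERVED are skipped. */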
uint32_t idx, h, h1;
retry_idx:
idx = ddsrt_prng_random (prng) % (uint32_t) nelems;
retry_cas:
h = ddsrt_atomic_ld32 (&elems[idx]);
if (h == 0)
h1 = RESERVED;
else if ((int32_t) h > 0)
h1 = 0;
else
goto retry_idx;
if (!ddsrt_atomic_cas32 (&elems[idx], h, h1))
goto retry_cas;
*p_idx = idx;
*p_handle = h;
}
static dds_entity_t pick_a_subscriber (void)
{
uint32_t idx = ddsrt_random () % N_SUBSCRIBERS;
uint32_t x = ddsrt_atomic_ld32 (&entities[N_GUARDCONDS + idx]);
return (dds_entity_t) x;
}
static dds_entity_t pick_a_reader (void)
{
uint32_t idx = ddsrt_random () % N_READERS;
uint32_t x = ddsrt_atomic_ld32 (&entities[N_GUARDCONDS + N_SUBSCRIBERS + idx]);
return (dds_entity_t) x;
}
static int index_to_counter_index (uint32_t idx)
{
if (idx < N_GUARDCONDS)
return 0;
else if (idx < N_GUARDCONDS + N_SUBSCRIBERS)
return 1;
else if (idx < N_GUARDCONDS + N_SUBSCRIBERS + N_READERS)
return 2;
else
return 4;
}
static uint32_t guardcond_create_delete_thread (void *varg)
{
(void) varg;
ddsrt_prng_t prng;
init_prng (&prng);
while (!ddsrt_atomic_ld32 (&terminate))
{
uint32_t idx, handle;
choose_index (&idx, &handle, &prng, entities, N_ENTITIES);
if (handle == 0)
{
dds_entity_t ent = 0, parent = 0;
if (idx < N_GUARDCONDS)
ent = dds_create_guardcondition (DDS_CYCLONEDDS_HANDLE);
else if (idx < N_GUARDCONDS + N_SUBSCRIBERS)
ent = dds_create_subscriber (ppant, NULL, NULL);
else if (idx < N_GUARDCONDS + N_SUBSCRIBERS + N_READERS)
{
if ((parent = pick_a_subscriber ()) == 0)
parent = ppant;
ent = dds_create_reader (parent, topic, NULL, NULL);
}
else if ((parent = pick_a_reader ()) != 0)
{
ent = dds_create_readcondition (parent, DDS_ANY_STATE);
}
if (ent > 0)
{
ddsrt_atomic_inc32 (&create_ent_ok[index_to_counter_index (idx) + (parent == ppant)]);
ddsrt_atomic_st32 (&entities[idx], (uint32_t) ent);
}
else if (ent < 0 && idx < N_GUARDCONDS)
{
fprintf (stderr, "dds_create_guardcondition failed: %s\n", dds_strretcode (ent));
ddsrt_atomic_st32 (&terminate, 1);
return 1;
}
}
else
{
dds_return_t rc = dds_delete ((dds_entity_t) handle);
if (rc == 0)
ddsrt_atomic_inc32 (&delete_ent_ok[index_to_counter_index (idx)]);
}
}
return 0;
}
static uint32_t waitset_create_delete_thread (void *varg)
{
(void) varg;
ddsrt_prng_t prng;
init_prng (&prng);
while (!ddsrt_atomic_ld32 (&terminate))
{
uint32_t idx, handle;
choose_index (&idx, &handle, &prng, waitsets, N_WAITSETS);
if (handle == 0)
{
dds_entity_t ws = dds_create_waitset (DDS_CYCLONEDDS_HANDLE);
if (ws < 0)
{
fprintf (stderr, "dds_create_waitset failed: %s\n", dds_strretcode (ws));
ddsrt_atomic_st32 (&terminate, 1);
return 1;
}
ddsrt_atomic_inc32 (&create_ws_ok);
ddsrt_atomic_st32 (&waitsets[idx], (uint32_t) ws);
}
else
{
dds_return_t rc = dds_delete ((dds_entity_t) handle);
if (rc == 0)
ddsrt_atomic_inc32 (&delete_ws_ok);
}
}
return 0;
}
static uint32_t guardcond_trigger_thread (void *varg)
{
(void) varg;
ddsrt_prng_t prng;
init_prng (&prng);
while (!ddsrt_atomic_ld32 (&terminate))
{
uint32_t idx = ddsrt_prng_random (&prng) % N_ENTITIES;
uint32_t h = ddsrt_atomic_ld32 (&entities[idx]);
if ((int32_t) h <= 0)
continue;
else
{
uint32_t s, s1;
do {
s = ddsrt_atomic_ld32 (&signalled);
s1 = s ^ (1u << idx);
} while (!ddsrt_atomic_cas32 (&signalled, s, s1));
dds_return_t rc = dds_set_guardcondition ((dds_entity_t) h, (s & (1u << idx)) ? false : true);
if (rc == 0)
ddsrt_atomic_inc32 (&settrig_ok);
}
}
return 0;
}
static uint32_t waitset_attach_detach_thread (void *varg)
{
(void) varg;
ddsrt_prng_t prng;
init_prng (&prng);
while (!ddsrt_atomic_ld32 (&terminate))
{
uint32_t wsidx = ddsrt_prng_random (&prng) % N_WAITSETS;
uint32_t wsh = ddsrt_atomic_ld32 (&waitsets[wsidx]);
if ((int32_t) wsh <= 0)
continue;
uint32_t gcidx = ddsrt_prng_random (&prng) % N_ENTITIES;
uint32_t gch = ddsrt_atomic_ld32 (&entities[gcidx]);
if ((int32_t) gch <= 0)
continue;
dds_return_t rc;
rc = dds_waitset_detach ((dds_entity_t) wsh, (dds_entity_t) gch);
if (rc == 0)
{
ddsrt_atomic_inc32 (&detach_ok);
}
else if (rc != DDS_RETCODE_PRECONDITION_NOT_MET && rc != DDS_RETCODE_BAD_PARAMETER)
{
/* attempts at attaching a guard condition twice or detaching an unattached
one are expected, and those result in a PRECONDITION_NOT_MET */
fprintf (stderr, "dds_waitset_detach 0x%"PRIx32" 0x%"PRIx32" failed: %s\n", (dds_entity_t) wsh, (dds_entity_t) gch, dds_strretcode (rc));
ddsrt_atomic_st32 (&terminate, 1);
return 1;
}
else
{
/* should imply it is not attached, so try attaching */
rc = dds_waitset_attach ((dds_entity_t) wsh, (dds_entity_t) gch, 0);
if (rc == 0)
{
ddsrt_atomic_inc32 (&attach_ok);
}
else if (rc != DDS_RETCODE_PRECONDITION_NOT_MET && rc != DDS_RETCODE_BAD_PARAMETER)
{
fprintf (stderr, "dds_waitset_attach 0x%"PRIx32" 0x%"PRIx32" failed: %s\n", (dds_entity_t) wsh, (dds_entity_t) gch, dds_strretcode (rc));
ddsrt_atomic_st32 (&terminate, 1);
return 1;
}
}
}
return 0;
}
CU_Test (ddsc_waitset, torture)
{
dds_return_t rc;
ddsrt_thread_t tids[8];
ddsrt_threadattr_t tattr;
ddsrt_threadattr_init (&tattr);
/* This keeps the library initialised -- it shouldn't be necessary */
ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL);
CU_ASSERT_FATAL (ppant > 0);
topic = dds_create_topic (ppant, &RoundTripModule_DataType_desc, "waitset_torture_topic", NULL, NULL);
CU_ASSERT_FATAL (topic > 0);
rc = ddsrt_thread_create (&tids[0], "gc_cd", &tattr, guardcond_create_delete_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[1], "gc_cd", &tattr, guardcond_create_delete_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[2], "ws_cd", &tattr, waitset_create_delete_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[3], "ws_cd", &tattr, waitset_create_delete_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[4], "gc_t", &tattr, guardcond_trigger_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[5], "gc_t", &tattr, guardcond_trigger_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[6], "ws_ad", &tattr, waitset_attach_detach_thread, 0);
CU_ASSERT_FATAL (rc == 0);
rc = ddsrt_thread_create (&tids[7], "ws_ad", &tattr, waitset_attach_detach_thread, 0);
CU_ASSERT_FATAL (rc == 0);
uint32_t wait_err = 0, wait_ok[N_ENTITIES + 1] = { 0 };
dds_time_t tstop = dds_time () + DDS_SECS (5);
while (dds_time () < tstop && !ddsrt_atomic_ld32 (&terminate))
{
/* Try waiting on the waitset in slot 0 if it exists (it shouldn't make much
difference which waitset we use; this is easy). There are never more than
N_ENTITIES guard conditions, so there are also never more than that many
triggering entities, and so we can easily do a small histogram. (The longer
you run it, the longer the tail of triggering entities one expects.)
Error handling: the waitset may be deleted in between loading the handle
and pinning it in wait(), so BAD_PARAMETER is to be expected. If the "extragc"
isn't there to ensure the library stays initialised, it is even possible
that we get PRECONDITION_NOT_MET if it just so happened that with the
deleting of that waitset, no entities remain at all. */
dds_entity_t ws = (dds_entity_t) ddsrt_atomic_ld32 (&waitsets[0]);
if (ws > 0)
{
int32_t n = dds_waitset_wait (ws, NULL, 0, DDS_MSECS (10));
if (!((n >= 0 && n <= N_ENTITIES) || n == DDS_RETCODE_BAD_PARAMETER))
{
fprintf (stderr, "dds_waitset_wait failed: %s\n", dds_strretcode (n));
ddsrt_atomic_st32 (&terminate, 1);
rc = DDS_RETCODE_ERROR;
}
else
{
if (n >= 0)
wait_ok[n]++;
else
wait_err++;
}
}
}
ddsrt_atomic_st32 (&terminate, 1);
CU_ASSERT (rc != DDS_RETCODE_ERROR);
for (size_t i = 0; i < sizeof (tids) / sizeof (tids[0]); i++)
{
uint32_t retval;
rc = ddsrt_thread_join (tids[i], &retval);
CU_ASSERT_FATAL (rc == 0);
CU_ASSERT (retval == 0);
}
/* The threads don't bother to clean up, so delete whatever entities and
waitsets happen to still exist. Passing garbage into dds_delete is supposed
to work, so don't bother with any validation or error checking. */
for (uint32_t i = 0; i < N_ENTITIES; i++)
{
if (dds_delete ((dds_entity_t) ddsrt_atomic_ld32 (&entities[i])) == DDS_RETCODE_OK)
ddsrt_atomic_inc32 (&delete_ent_ok[index_to_counter_index (i)]);
}
for (uint32_t i = 0; i < N_WAITSETS; i++)
{
if (dds_delete ((dds_entity_t) ddsrt_atomic_ld32 (&waitsets[i])) == DDS_RETCODE_OK)
ddsrt_atomic_inc32 (&delete_ws_ok);
}
/* All we should be left with in the participant is the topic */
rc = dds_delete (topic);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
rc = dds_get_children (ppant, NULL, 0);
CU_ASSERT_FATAL (rc == 0);
rc = dds_delete (ppant);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
printf ("attach %"PRIu32" detach %"PRIu32" settrig %"PRIu32"\n", ddsrt_atomic_ld32 (&attach_ok), ddsrt_atomic_ld32 (&detach_ok), ddsrt_atomic_ld32 (&settrig_ok));
printf ("create/delete ent");
uint32_t create_ent_ok_sum = 0;
for (size_t i = 0; i < sizeof (create_ent_ok) / sizeof (create_ent_ok[0]); i++)
{
uint32_t c = ddsrt_atomic_ld32 (&create_ent_ok[i]);
create_ent_ok_sum += c;
printf (" %"PRIu32"/%"PRIu32, c, ddsrt_atomic_ld32 (&delete_ent_ok[i]));
}
printf ("\n");
{
uint32_t rd_cr_sub = ddsrt_atomic_ld32 (&create_ent_ok[2]);
uint32_t rd_cr_ppant = ddsrt_atomic_ld32 (&create_ent_ok[3]);
uint32_t rd_del = ddsrt_atomic_ld32 (&delete_ent_ok[2]);
uint32_t sub_del = ddsrt_atomic_ld32 (&delete_ent_ok[1]);
CU_ASSERT (rd_del <= rd_cr_sub + rd_cr_ppant); /* can't have deleted more readers than were created */
CU_ASSERT (rd_del >= rd_cr_ppant); /* readers created with ppant as owner must have been deleted explicitly */
CU_ASSERT (rd_del - rd_cr_ppant <= sub_del); /* other readers may have been deleted by deleting a sub */
}
printf ("create/delete ws %"PRIu32"/%"PRIu32"\n", ddsrt_atomic_ld32 (&create_ws_ok), ddsrt_atomic_ld32 (&delete_ws_ok));
printf ("wait {err %"PRIu32"}", wait_err);
uint32_t wait_ok_sum = 0;
for (size_t i = 0; i < sizeof (wait_ok) / sizeof (wait_ok[0]); i++)
{
wait_ok_sum += wait_ok[i];
printf (" %"PRIu32, wait_ok[i]);
}
printf ("\n");
/* Running on Windows on the CI infrastructure has very little concurrency, but Linux
and macOS seem ok. The thresholds here appear to be sufficiently low to not give
many spurious failures, while still being sanity check that at least something
happened. */
CU_ASSERT (ddsrt_atomic_ld32 (&attach_ok) +
ddsrt_atomic_ld32 (&settrig_ok) +
ddsrt_atomic_ld32 (&create_ws_ok) +
create_ent_ok_sum +
wait_ok_sum > 1000);
rc = dds_get_parent (DDS_CYCLONEDDS_HANDLE);
CU_ASSERT_FATAL (rc == DDS_RETCODE_PRECONDITION_NOT_MET);
}

View file

@ -198,6 +198,7 @@ struct generic_endpoint { /* FIXME: currently only local endpoints; proxies use
enum writer_state {
WRST_OPERATIONAL, /* normal situation */
WRST_INTERRUPT, /* will be deleted, unblock throttle_writer but do not do anything further */
WRST_LINGERING, /* writer deletion has been requested but still has unack'd data */
WRST_DELETING /* writer is actually being deleted (removed from hash table) */
};
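/* One plausible progression for an application-initiated delete, pieced
   together from this commit (an editorial reading, not stated in the header):
     WRST_OPERATIONAL -> WRST_INTERRUPT   (interrupt step of dds_delete:
                                           unblock_throttled_writer)
     WRST_INTERRUPT   -> WRST_LINGERING   (deletion requested while unack'd
                                           data remains)
     ...              -> WRST_DELETING    (writer removed from the hash table) */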
@ -403,6 +404,7 @@ struct deleted_participants_admin *deleted_participants_admin_new (int64_t delay
void deleted_participants_admin_free (struct deleted_participants_admin *admin);
int is_deleted_participant_guid (struct deleted_participants_admin *admin, const struct ddsi_guid *guid, unsigned for_what);
bool is_null_guid (const ddsi_guid_t *guid);
ddsi_entityid_t to_entityid (unsigned u);
int is_builtin_entityid (ddsi_entityid_t id, nn_vendorid_t vendorid);
int is_builtin_endpoint (ddsi_entityid_t id, nn_vendorid_t vendorid);
@ -568,6 +570,7 @@ int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_sta
void writer_set_retransmitting (struct writer *wr);
void writer_clear_retransmitting (struct writer *wr);
dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_guid *guid);
dds_return_t delete_writer (struct q_globals *gv, const struct ddsi_guid *guid);
dds_return_t delete_writer_nolinger (struct q_globals *gv, const struct ddsi_guid *guid);
dds_return_t delete_writer_nolinger_locked (struct writer *wr);

View file

@ -98,12 +98,9 @@ struct thread_states {
extern DDS_EXPORT struct thread_states thread_states;
extern ddsrt_thread_local struct thread_state1 *tsd_thread_state;
DDS_EXPORT void thread_states_init_static (void);
DDS_EXPORT void thread_states_init (unsigned maxthreads);
DDS_EXPORT void thread_states_fini (void);
DDS_EXPORT bool thread_states_fini (void);
DDS_EXPORT void upgrade_main_thread (void);
DDS_EXPORT void downgrade_main_thread (void);
DDS_EXPORT const struct config_thread_properties_listelem *lookup_thread_properties (const struct config *config, const char *name);
DDS_EXPORT dds_return_t create_thread_with_properties (struct thread_state1 **ts1, struct config_thread_properties_listelem const * const tprops, const char *name, uint32_t (*f) (void *arg), void *arg);
DDS_EXPORT dds_return_t create_thread (struct thread_state1 **ts, const struct q_globals *gv, const char *name, uint32_t (*f) (void *arg), void *arg);

View file

@ -114,6 +114,11 @@ static int compare_guid (const void *va, const void *vb)
return memcmp (va, vb, sizeof (ddsi_guid_t));
}
bool is_null_guid (const ddsi_guid_t *guid)
{
return guid->prefix.u[0] == 0 && guid->prefix.u[1] == 0 && guid->prefix.u[2] == 0 && guid->entityid.u == 0;
}
ddsi_entityid_t to_entityid (unsigned u)
{
ddsi_entityid_t e;
@ -3069,6 +3074,22 @@ static void writer_set_state (struct writer *wr, enum writer_state newstate)
wr->state = newstate;
}
dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_guid *guid)
{
struct writer *wr;
assert (is_writer_entityid (guid->entityid));
if ((wr = ephash_lookup_writer_guid (gv->guid_hash, guid)) == NULL)
{
GVLOGDISC ("unblock_throttled_writer(guid "PGUIDFMT") - unknown guid\n", PGUID (*guid));
return DDS_RETCODE_BAD_PARAMETER;
}
GVLOGDISC ("unblock_throttled_writer(guid "PGUIDFMT") ...\n", PGUID (*guid));
ddsrt_mutex_lock (&wr->e.lock);
writer_set_state (wr, WRST_INTERRUPT);
ddsrt_mutex_unlock (&wr->e.lock);
return 0;
}
dds_return_t delete_writer_nolinger_locked (struct writer *wr)
{
ELOGDISC (wr, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid));
@ -3105,6 +3126,7 @@ dds_return_t delete_writer_nolinger (struct q_globals *gv, const struct ddsi_gui
void delete_local_orphan_writer (struct local_orphan_writer *lowr)
{
assert (thread_is_awake ());
ddsrt_mutex_lock (&lowr->wr.e.lock);
delete_writer_nolinger_locked (&lowr->wr);
ddsrt_mutex_unlock (&lowr->wr.e.lock);
@ -3458,7 +3480,7 @@ static void gc_delete_reader (struct gcreq *gcreq)
addrset_forall (rd->as, leave_mcast_helper, &arg);
}
#endif
if (rd->rhc)
if (rd->rhc && is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE))
{
ddsi_rhc_free (rd->rhc);
}

View file

@ -28,8 +28,6 @@
#include "dds/ddsi/q_globals.h"
#include "dds/ddsi/sysdeps.h"
static char main_thread_name[] = "main";
struct thread_states thread_states;
ddsrt_thread_local struct thread_state1 *tsd_thread_state;
@ -47,6 +45,7 @@ extern inline void thread_state_awake_fixed_domain (struct thread_state1 *ts1);
extern inline void thread_state_awake_to_awake_no_nest (struct thread_state1 *ts1);
static struct thread_state1 *init_thread_state (const char *tname, const struct q_globals *gv, enum thread_state state);
static void reap_thread_state (struct thread_state1 *ts1);
static void *ddsrt_malloc_aligned_cacheline (size_t size)
{
@ -74,20 +73,16 @@ static void ddsrt_free_aligned (void *ptr)
}
}
void thread_states_init_static (void)
{
static struct thread_state1 ts = {
.state = THREAD_STATE_ALIVE, .vtime = DDSRT_ATOMIC_UINT32_INIT (0), .name = "(anon)"
};
tsd_thread_state = &ts;
}
void thread_states_init (unsigned maxthreads)
{
/* Called with ddsrt's singleton mutex held (see dds_init/fini). Application threads
remaining alive can result in thread_states remaining alive, and as those threads
cache the address, we must then re-use the old array. */
if (thread_states.ts == NULL)
{
ddsrt_mutex_init (&thread_states.lock);
thread_states.nthreads = maxthreads;
thread_states.ts =
ddsrt_malloc_aligned_cacheline (maxthreads * sizeof (*thread_states.ts));
thread_states.ts = ddsrt_malloc_aligned_cacheline (maxthreads * sizeof (*thread_states.ts));
memset (thread_states.ts, 0, maxthreads * sizeof (*thread_states.ts));
/* The compiler doesn't realize that ts is large enough. */
DDSRT_WARNING_MSVC_OFF(6386);
@ -100,8 +95,28 @@ void thread_states_init (unsigned maxthreads)
DDSRT_WARNING_MSVC_ON(6386);
}
void thread_states_fini (void)
/* This thread should be at the same address as before, or never have had a slot
in the past. Also, allocate a slot for this thread if it didn't have one yet
(not strictly required, but it'll get one eventually anyway, and this makes
it rather more clear). */
#ifndef NDEBUG
struct thread_state1 * const ts0 = tsd_thread_state;
#endif
struct thread_state1 * const ts1 = lookup_thread_state_real ();
assert (ts0 == NULL || ts0 == ts1);
(void) ts1;
}
bool thread_states_fini (void)
{
/* Calling thread is the one shutting everything down, so it certainly won't (well, shouldn't)
need its slot anymore. Clean it up so that if all other threads happen to have been stopped
already, we can release all resources. */
struct thread_state1 *ts1 = lookup_thread_state ();
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
reap_thread_state (ts1);
tsd_thread_state = NULL;
/* Some application threads that, at some point, required a thread state may still be around.
For those, the cleanup routine is invoked when the thread terminates. This should be rewritten
to not rely on this global table, with each thread owning its own bit of state, e.g., linked
@ -120,6 +135,11 @@ void thread_states_fini (void)
ddsrt_mutex_destroy (&thread_states.lock);
ddsrt_free_aligned (thread_states.ts);
thread_states.ts = NULL;
return true;
}
else
{
return false;
}
}
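
A hypothetical caller-side sketch (not from the diff) of how the new boolean result of thread_states_fini might be used during library deinitialisation; the header name is assumed from the surrounding context.

#include <stdbool.h>
#include "dds/ddsi/q_thread.h"   /* assumed location of the declaration */

static void deinit_sketch (void)
{
  if (thread_states_fini ())
  {
    /* no lazily-created application threads remain: the thread-state table
       has been freed and whatever depends on it may be torn down as well */
  }
  else
  {
    /* some application threads still hold a slot; their thread-exit cleanup
       is expected to reap them later */
  }
}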
@ -211,26 +231,6 @@ static int find_free_slot (const char *name)
return -1;
}
void upgrade_main_thread (void)
{
int cand;
struct thread_state1 *ts1;
ddsrt_mutex_lock (&thread_states.lock);
if ((cand = find_free_slot ("name")) < 0)
abort ();
ts1 = &thread_states.ts[cand];
if (ts1->state == THREAD_STATE_ZERO)
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
ts1->state = THREAD_STATE_LAZILY_CREATED;
ts1->tid = ddsrt_thread_self ();
DDSRT_WARNING_MSVC_OFF(4996);
strncpy (ts1->name, main_thread_name, sizeof (ts1->name));
DDSRT_WARNING_MSVC_ON(4996);
ts1->name[sizeof (ts1->name) - 1] = 0;
ddsrt_mutex_unlock (&thread_states.lock);
tsd_thread_state = ts1;
}
const struct config_thread_properties_listelem *lookup_thread_properties (const struct config *config, const char *name)
{
const struct config_thread_properties_listelem *e;
@ -340,15 +340,6 @@ void reset_thread_state (struct thread_state1 *ts1)
reap_thread_state (ts1);
}
void downgrade_main_thread (void)
{
struct thread_state1 *ts1 = lookup_thread_state ();
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
/* no need to sync with service lease: already stopped */
reap_thread_state (ts1);
thread_states_init_static ();
}
void log_stack_traces (const struct ddsrt_log_cfg *logcfg, const struct q_globals *gv)
{
for (uint32_t i = 0; i < thread_states.nthreads; i++)

View file

@ -900,7 +900,6 @@ static int writer_may_continue (const struct writer *wr, const struct whc_state
return (whcst->unacked_bytes <= wr->whc_low && !wr->retransmitting) || (wr->state != WRST_OPERATIONAL);
}
static dds_return_t throttle_writer (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr)
{
/* Sleep (cond_wait) without updating the thread's vtime: the
@ -1084,6 +1083,13 @@ static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *
}
}
if (wr->state != WRST_OPERATIONAL)
{
r = DDS_RETCODE_PRECONDITION_NOT_MET;
ddsrt_mutex_unlock (&wr->e.lock);
goto drop;
}
/* Always use the current monotonic time */
tnow = now_mt ();
serdata->twrite = tnow;
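
A closing application-side sketch (an assumption about how the new check can surface, not something stated by the diff): a dds_write racing with deletion of the writer may now fail with DDS_RETCODE_PRECONDITION_NOT_MET, or with DDS_RETCODE_BAD_PARAMETER once the handle itself is gone, rather than blocking indefinitely; a caller can treat both as "the writer is going away".

#include <stdbool.h>
#include "dds/dds.h"

static bool try_write (dds_entity_t wr, const void *sample)
{
  dds_return_t rc = dds_write (wr, sample);
  if (rc == DDS_RETCODE_OK)
    return true;
  if (rc == DDS_RETCODE_PRECONDITION_NOT_MET || rc == DDS_RETCODE_BAD_PARAMETER)
    return false;             /* writer deleted (or being deleted) concurrently */
  return false;               /* other errors: report or handle as appropriate */
}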