From ba46cb1140ea86c7e5321e27bc6a20dedb745ff4 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 15 Jan 2019 10:04:30 +0100 Subject: [PATCH] rework listener invocation and entity status flags Listener/status management invocation was rather expensive, and especially the cost of checking listeners, then setting status flags and triggering waitsets ran into severe lock contention. A major cost was the repeated use of dds_entity_lock and dds_entity_unlock, these have been eliminated. Another cost was that each time an event occurred (with DATA_AVAILABLE the most problematic one) it would walk the chain of ancestors to see if any had a relevant listener, and only if none of them had any, it would set the status flags. The locking/unlocking of the entity has been eliminated by moving the listener/status flag manipulation from the general entity lock to its m_observers_lock. That lock has a much smaller scope, and consequently contention has been significantly reduced. Instead of walking the entity hierarchy looking for listeners, an entity now inherits the ancestors' listeners. The set_listener operation has been made a little more complicated by the need to not only set the listeners for the specified entity, but to also update any inherited listeners its descendants. The commit is a bit larger than strictly needed ... I've started reformatting the code to reduce the variety of styles ... as there I haven't been able to find a single tool that does what I want, it may well end up as manual work. Signed-off-by: Erik Boasson --- .../ddsc/include/ddsc/dds_public_listener.h | 14 +- src/core/ddsc/src/dds__entity.h | 20 +- src/core/ddsc/src/dds__listener.h | 7 +- src/core/ddsc/src/dds__reader.h | 2 + src/core/ddsc/src/dds__types.h | 57 +- src/core/ddsc/src/dds_entity.c | 2044 +++++++---------- src/core/ddsc/src/dds_guardcond.c | 160 +- src/core/ddsc/src/dds_listener.c | 295 +-- src/core/ddsc/src/dds_reader.c | 342 +-- src/core/ddsc/src/dds_rhc.c | 4 +- src/core/ddsc/src/dds_subscriber.c | 21 - src/core/ddsc/src/dds_topic.c | 64 +- src/core/ddsc/src/dds_waitset.c | 56 +- src/core/ddsc/src/dds_writer.c | 110 +- 14 files changed, 1401 insertions(+), 1795 deletions(-) diff --git a/src/core/ddsc/include/ddsc/dds_public_listener.h b/src/core/ddsc/include/ddsc/dds_public_listener.h index e3aa792..bec465b 100644 --- a/src/core/ddsc/include/ddsc/dds_public_listener.h +++ b/src/core/ddsc/include/ddsc/dds_public_listener.h @@ -44,16 +44,9 @@ typedef void (*dds_on_requested_incompatible_qos_fn) (dds_entity_t reader, const typedef void (*dds_on_publication_matched_fn) (dds_entity_t writer, const dds_publication_matched_status_t status, void* arg); typedef void (*dds_on_subscription_matched_fn) (dds_entity_t reader, const dds_subscription_matched_status_t status, void* arg); -#if 0 -/* TODO: Why use (*dds_on_any_fn) (); and DDS_LUNSET? Why not just set the callbacks to NULL? */ -typedef void (*dds_on_any_fn) (); /**< Empty parameter list on purpose; should be assignable without cast to all of the above. @todo check with an actual compiler; I'm a sloppy compiler */ -#define DDS_LUNSET ((dds_on_any_fn)1) /**< Callback indicating a callback isn't set */ -#else -#define DDS_LUNSET (NULL) -#endif - -struct c_listener; -typedef struct c_listener dds_listener_t; +#define DDS_LUNSET 0 +struct dds_listener; +typedef struct dds_listener dds_listener_t; /** * @brief Allocate memory and initializes to default values (::DDS_LUNSET) of a listener @@ -104,7 +97,6 @@ DDS_DEPRECATED_EXPORT void dds_listener_copy (_Out_ dds_listener_t * __restrict DDS_EXPORT void dds_merge_listener (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src); DDS_DEPRECATED_EXPORT void dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src); - /************************************************************************************************ * Setters ************************************************************************************************/ diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 62285c9..ac92fee 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -37,14 +37,6 @@ void dds_entity_add_ref_nolock( _In_ dds_entity *e); -_Check_return_ dds__retcode_t -dds_entity_listener_propagation( - _Inout_opt_ dds_entity *e, - _In_ dds_entity *src, - _In_ uint32_t status, - _In_opt_ void *metrics, - _In_ bool propagate); - #define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \ qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x) \ { \ @@ -68,9 +60,7 @@ inline bool dds_entity_is_enabled (const dds_entity *e) { return (e->m_flags & DDS_ENTITY_ENABLED) != 0; } -inline void dds_entity_status_set (dds_entity *e, uint32_t t) { - e->m_trigger |= e->m_status_enable & t; -} +void dds_entity_status_set (dds_entity *e, uint32_t t); inline void dds_entity_status_reset (dds_entity *e, uint32_t t) { e->m_trigger &= ~t; @@ -88,11 +78,9 @@ inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl) { return (hdl > 0) ? (dds_entity_kind_t) (hdl & DDS_ENTITY_KIND_MASK) : DDS_KIND_DONTCARE; } -/* The mutex needs to be unlocked when calling this because the entity can be called - * within the signal callback from other contexts. That shouldn't deadlock. */ -void -dds_entity_status_signal( - _In_ dds_entity *e); +void dds_entity_status_signal (dds_entity *e); + +void dds_entity_invoke_listener (const dds_entity *entity, uint32_t status, const void *vst); _Check_return_ dds__retcode_t dds_valid_hdl( diff --git a/src/core/ddsc/src/dds__listener.h b/src/core/ddsc/src/dds__listener.h index b25453b..2d3ee90 100644 --- a/src/core/ddsc/src/dds__listener.h +++ b/src/core/ddsc/src/dds__listener.h @@ -19,11 +19,8 @@ extern "C" { #endif -/* - * Listener API (internal & external) are present in - * dds__types.h - * ddsc/dds_public_listener.h - */ +void dds_override_inherited_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src); +void dds_inherit_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__reader.h b/src/core/ddsc/src/dds__reader.h index 48b1168..5c9684b 100644 --- a/src/core/ddsc/src/dds__reader.h +++ b/src/core/ddsc/src/dds__reader.h @@ -23,6 +23,8 @@ struct status_cb_data; void dds_reader_status_cb (void *entity, const struct status_cb_data * data); +void dds_reader_data_available_cb (struct dds_reader *entity); + /* dds_reader_lock_samples: Returns number of samples in read cache and locks the reader cache to make sure that the samples content doesn't change. diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 78b4665..4b4cb03 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -59,22 +59,35 @@ typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, cons /* The listener struct. */ -typedef struct c_listener { - dds_on_inconsistent_topic_fn on_inconsistent_topic; - dds_on_liveliness_lost_fn on_liveliness_lost; - dds_on_offered_deadline_missed_fn on_offered_deadline_missed; - dds_on_offered_incompatible_qos_fn on_offered_incompatible_qos; - dds_on_data_on_readers_fn on_data_on_readers; - dds_on_sample_lost_fn on_sample_lost; - dds_on_data_available_fn on_data_available; - dds_on_sample_rejected_fn on_sample_rejected; - dds_on_liveliness_changed_fn on_liveliness_changed; - dds_on_requested_deadline_missed_fn on_requested_deadline_missed; - dds_on_requested_incompatible_qos_fn on_requested_incompatible_qos; - dds_on_publication_matched_fn on_publication_matched; - dds_on_subscription_matched_fn on_subscription_matched; - void *arg; -} c_listener_t; +struct dds_listener { + uint32_t inherited; + dds_on_inconsistent_topic_fn on_inconsistent_topic; + void *on_inconsistent_topic_arg; + dds_on_liveliness_lost_fn on_liveliness_lost; + void *on_liveliness_lost_arg; + dds_on_offered_deadline_missed_fn on_offered_deadline_missed; + void *on_offered_deadline_missed_arg; + dds_on_offered_incompatible_qos_fn on_offered_incompatible_qos; + void *on_offered_incompatible_qos_arg; + dds_on_data_on_readers_fn on_data_on_readers; + void *on_data_on_readers_arg; + dds_on_sample_lost_fn on_sample_lost; + void *on_sample_lost_arg; + dds_on_data_available_fn on_data_available; + void *on_data_available_arg; + dds_on_sample_rejected_fn on_sample_rejected; + void *on_sample_rejected_arg; + dds_on_liveliness_changed_fn on_liveliness_changed; + void *on_liveliness_changed_arg; + dds_on_requested_deadline_missed_fn on_requested_deadline_missed; + void *on_requested_deadline_missed_arg; + dds_on_requested_incompatible_qos_fn on_requested_incompatible_qos; + void *on_requested_incompatible_qos_arg; + dds_on_publication_matched_fn on_publication_matched; + void *on_publication_matched_arg; + dds_on_subscription_matched_fn on_subscription_matched; + void *on_subscription_matched_arg; +}; /* Entity flag values */ @@ -98,7 +111,6 @@ typedef struct dds_entity_deriver { dds_return_t (*delete)(struct dds_entity *e); dds_return_t (*set_qos)(struct dds_entity *e, const dds_qos_t *qos, bool enabled); dds_return_t (*validate_status)(uint32_t mask); - dds_return_t (*propagate_status)(struct dds_entity *e, uint32_t mask, bool set); dds_return_t (*get_instance_hdl)(struct dds_entity *e, dds_instance_handle_t *i); } dds_entity_deriver; @@ -126,15 +138,18 @@ typedef struct dds_entity dds_qos_t * m_qos; dds_domainid_t m_domainid; nn_guid_t m_guid; - uint32_t m_status_enable; uint32_t m_flags; - uint32_t m_cb_count; os_mutex m_mutex; os_cond m_cond; - c_listener_t m_listener; - uint32_t m_trigger; + os_mutex m_observers_lock; + os_cond m_observers_cond; + dds_listener_t m_listener; + uint32_t m_trigger; + uint32_t m_status_enable; + uint32_t m_cb_count; dds_entity_observer *m_observers; + struct ut_handlelink *m_hdllink; } dds_entity; diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 843405e..71fc819 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -24,1301 +24,1011 @@ #error "DDS_ENTITY_KIND_MASK != UT_HANDLE_KIND_MASK" #endif - extern inline bool dds_entity_is_enabled (const dds_entity *e); -extern inline void dds_entity_status_set (dds_entity *e, uint32_t t); extern inline void dds_entity_status_reset (dds_entity *e, uint32_t t); extern inline bool dds_entity_status_match (const dds_entity *e, uint32_t t); extern inline dds_entity_kind_t dds_entity_kind (const dds_entity *e); extern inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl); -static void -dds_entity_observers_delete( - _In_ dds_entity *observed); +static void dds_entity_observers_signal (dds_entity *observed, uint32_t status); +static void dds_entity_observers_delete (dds_entity *observed); -static void -dds_entity_observers_signal( - _In_ dds_entity *observed, - _In_ uint32_t status); - -void -dds_entity_add_ref(_In_ dds_entity * e) +void dds_entity_add_ref_nolock (dds_entity *e) { - assert (e); - os_mutexLock (&e->m_mutex); - e->m_refc++; - os_mutexUnlock (&e->m_mutex); + e->m_refc++; } -dds_domain * -dds__entity_domain(_In_ dds_entity* e) +void dds_entity_add_ref (dds_entity *e) { - return e->m_domain; + os_mutexLock (&e->m_mutex); + dds_entity_add_ref_nolock (e); + os_mutexUnlock (&e->m_mutex); } -static void -dds_set_explicit( - _In_ dds_entity_t entity); - -/*This function returns the parent entity of e. If e is a participant it returns NULL*/ -_Ret_maybenull_ -static dds_entity * -dds__nonself_parent( - _In_ dds_entity *e){ - return e->m_parent == e ? NULL : e->m_parent; +dds_domain *dds__entity_domain (dds_entity *e) +{ + return e->m_domain; } -void -dds_entity_add_ref_nolock(_In_ dds_entity *e) +static void dds_set_explicit (dds_entity_t entity) { - assert(e); - e->m_refc++; + dds_entity *e; + dds__retcode_t rc; + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) == DDS_RETCODE_OK) + { + e->m_flags &= ~DDS_ENTITY_IMPLICIT; + dds_entity_unlock (e); + } } -_Check_return_ dds__retcode_t -dds_entity_listener_propagation( - _Inout_opt_ dds_entity *e, - _In_ dds_entity *src, - _In_ uint32_t status, - _In_opt_ void *metrics, - _In_ bool propagate) +static dds_entity *dds__nonself_parent (dds_entity *e) { - dds__retcode_t rc = DDS_RETCODE_NO_DATA; /* Mis-use NO_DATA as NO_CALL. */ - dds_entity *dummy; - /* e will be NULL when reaching the top of the entity hierarchy. */ - if (e) { - rc = dds_entity_lock(e->m_hdl, DDS_KIND_DONTCARE, &dummy); - if (rc == DDS_RETCODE_OK) { - dds_listener_t *l = &e->m_listener; - - assert(e == dummy); - - /* Indicate that a callback will be in progress, so that a parallel - * delete/set_listener will wait. */ - e->m_cb_count++; - - /* Calling the actual listener should be done unlocked. */ - dds_entity_unlock(e); - - /* Now, perform the callback when available. */ - rc = DDS_RETCODE_NO_DATA; - switch (status) { - case DDS_INCONSISTENT_TOPIC_STATUS: - if (l->on_inconsistent_topic != DDS_LUNSET) { - assert(metrics); - l->on_inconsistent_topic(src->m_hdl, *((dds_inconsistent_topic_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_OFFERED_DEADLINE_MISSED_STATUS: - if (l->on_offered_deadline_missed != DDS_LUNSET) { - assert(metrics); - l->on_offered_deadline_missed(src->m_hdl, *((dds_offered_deadline_missed_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_REQUESTED_DEADLINE_MISSED_STATUS: - if (l->on_requested_deadline_missed != DDS_LUNSET) { - assert(metrics); - l->on_requested_deadline_missed(src->m_hdl, *((dds_requested_deadline_missed_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: - if (l->on_offered_incompatible_qos != DDS_LUNSET) { - assert(metrics); - l->on_offered_incompatible_qos(src->m_hdl, *((dds_offered_incompatible_qos_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: - if (l->on_requested_incompatible_qos != DDS_LUNSET) { - assert(metrics); - l->on_requested_incompatible_qos(src->m_hdl, *((dds_requested_incompatible_qos_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_SAMPLE_LOST_STATUS: - if (l->on_sample_lost != DDS_LUNSET) { - assert(metrics); - l->on_sample_lost(src->m_hdl, *((dds_sample_lost_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_SAMPLE_REJECTED_STATUS: - if (l->on_sample_rejected != DDS_LUNSET) { - assert(metrics); - l->on_sample_rejected(src->m_hdl, *((dds_sample_rejected_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_DATA_ON_READERS_STATUS: - if (l->on_data_on_readers != DDS_LUNSET) { - l->on_data_on_readers(src->m_hdl, l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_DATA_AVAILABLE_STATUS: - if (l->on_data_available != DDS_LUNSET) { - l->on_data_available(src->m_hdl, l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_LIVELINESS_LOST_STATUS: - if (l->on_liveliness_lost != DDS_LUNSET) { - assert(metrics); - l->on_liveliness_lost(src->m_hdl, *((dds_liveliness_lost_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_LIVELINESS_CHANGED_STATUS: - if (l->on_liveliness_changed != DDS_LUNSET) { - assert(metrics); - l->on_liveliness_changed(src->m_hdl, *((dds_liveliness_changed_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_PUBLICATION_MATCHED_STATUS: - if (l->on_publication_matched != DDS_LUNSET) { - assert(metrics); - l->on_publication_matched(src->m_hdl, *((dds_publication_matched_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - case DDS_SUBSCRIPTION_MATCHED_STATUS: - if (l->on_subscription_matched != DDS_LUNSET) { - assert(metrics); - l->on_subscription_matched(src->m_hdl, *((dds_subscription_matched_status_t*)metrics), l->arg); - rc = DDS_RETCODE_OK; - } - break; - default: assert (0); - } - if ((rc == DDS_RETCODE_NO_DATA) && propagate) { - /* See if the parent is interested. */ - rc = dds_entity_listener_propagation(dds__nonself_parent(e), src, status, metrics, propagate); - } - - os_mutexLock(&(e->m_mutex)); - /* We are done with our callback. */ - e->m_cb_count--; - /* Wake up possible waiting threads. */ - os_condBroadcast(&e->m_cond); - os_mutexUnlock(&(e->m_mutex)); - } - } - return rc; + return e->m_parent == e ? NULL : e->m_parent; } - -static void -dds_entity_cb_wait (_In_ dds_entity *e) +dds_entity_t dds_entity_init (dds_entity *e, dds_entity *parent, dds_entity_kind_t kind, dds_qos_t *qos, const dds_listener_t *listener, uint32_t mask) { - while (e->m_cb_count > 0) { - os_condWait (&e->m_cond, &e->m_mutex); - } -} + assert ((kind == DDS_KIND_PARTICIPANT) == (parent == NULL)); + assert (e); + e->m_refc = 1; + e->m_qos = qos; + e->m_cb_count = 0; + e->m_observers = NULL; + e->m_trigger = 0; + /* TODO: CHAM-96: Implement dynamic enabling of entity. */ + e->m_flags |= DDS_ENTITY_ENABLED; -_Check_return_ dds_entity_t -dds_entity_init( - _In_ dds_entity * e, - _When_(kind != DDS_KIND_PARTICIPANT, _Notnull_) - _When_(kind == DDS_KIND_PARTICIPANT, _Null_) - _In_opt_ dds_entity * parent, - _In_ dds_entity_kind_t kind, - _In_opt_ dds_qos_t * qos, - _In_opt_ const dds_listener_t *listener, - _In_ uint32_t mask) -{ - assert (e); + /* set the status enable based on kind */ + e->m_status_enable = mask | DDS_INTERNAL_STATUS_MASK; - e->m_refc = 1; - e->m_qos = qos; - e->m_cb_count = 0; - e->m_observers = NULL; - e->m_trigger = 0; + os_mutexInit (&e->m_mutex); + os_mutexInit (&e->m_observers_lock); + os_condInit (&e->m_cond, &e->m_mutex); + os_condInit (&e->m_observers_cond, &e->m_observers_lock); - /* TODO: CHAM-96: Implement dynamic enabling of entity. */ - e->m_flags |= DDS_ENTITY_ENABLED; + if (parent) + { + e->m_parent = parent; + e->m_domain = parent->m_domain; + e->m_domainid = parent->m_domainid; + e->m_participant = parent->m_participant; + e->m_next = parent->m_children; + parent->m_children = e; + } + else + { + e->m_participant = e; + e->m_parent = e; + } - /* set the status enable based on kind */ - e->m_status_enable = mask | DDS_INTERNAL_STATUS_MASK; + dds_reset_listener (&e->m_listener); + if (listener) + dds_merge_listener (&e->m_listener, listener); + if (parent) + { + os_mutexLock (&e->m_observers_lock); + dds_inherit_listener (&e->m_listener, &parent->m_listener); + os_mutexUnlock (&e->m_observers_lock); + } - os_mutexInit (&e->m_mutex); - os_mutexInit (&e->m_observers_lock); - os_condInit (&e->m_cond, &e->m_mutex); - - if (parent) { - e->m_parent = parent; - e->m_domain = parent->m_domain; - e->m_domainid = parent->m_domainid; - e->m_participant = parent->m_participant; - e->m_next = parent->m_children; - parent->m_children = e; + e->m_hdllink = NULL; + e->m_hdl = ut_handle_create ((int32_t) kind, e); + if (e->m_hdl > 0) + { + e->m_hdllink = ut_handle_get_link (e->m_hdl); + assert(e->m_hdllink); + } + else + { + if (e->m_hdl == UT_HANDLE_OUT_OF_RESOURCES) { + DDS_ERROR ("Can not create new entity; too many where created previously\n"); + e->m_hdl = DDS_ERRNO (DDS_RETCODE_OUT_OF_RESOURCES); + } else if (e->m_hdl == UT_HANDLE_NOT_INITALIZED) { + DDS_ERROR (DDSC_PROJECT_NAME" is not yet initialized. Please create a participant before executing an other method\n"); + e->m_hdl = DDS_ERRNO (DDS_RETCODE_PRECONDITION_NOT_MET); } else { - assert (kind == DDS_KIND_PARTICIPANT); - e->m_participant = e; - e->m_parent = e; + DDS_ERROR ("An internal error has occurred\n"); + e->m_hdl = DDS_ERRNO (DDS_RETCODE_ERROR); } + } - if (listener) { - dds_copy_listener(&e->m_listener, listener); - } else { - dds_reset_listener(&e->m_listener); - } - - e->m_hdllink = NULL; - e->m_hdl = ut_handle_create((int32_t)kind, e); - if (e->m_hdl > 0) { - e->m_hdllink = ut_handle_get_link(e->m_hdl); - assert(e->m_hdllink); - } else{ - if (e->m_hdl == UT_HANDLE_OUT_OF_RESOURCES) { - DDS_ERROR("Can not create new entity; too many where created previously\n"); - e->m_hdl = DDS_ERRNO(DDS_RETCODE_OUT_OF_RESOURCES); - } else if (e->m_hdl == UT_HANDLE_NOT_INITALIZED) { - DDS_ERROR(DDSC_PROJECT_NAME" is not yet initialized. Please create a participant before executing an other method\n"); - e->m_hdl = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET); - } else { - DDS_ERROR("An internal error has occurred\n"); - e->m_hdl = DDS_ERRNO(DDS_RETCODE_ERROR); - } - } - - /* An ut_handle_t is directly used as dds_entity_t. */ - return (dds_entity_t)(e->m_hdl); + /* An ut_handle_t is directly used as dds_entity_t. */ + return (dds_entity_t) e->m_hdl; } - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -dds_return_t -dds_delete( - _In_ dds_entity_t entity) +dds_return_t dds_delete (dds_entity_t entity) { - dds_return_t ret; - - ret = dds_delete_impl(entity, false); - - return ret; + return dds_delete_impl (entity, false); } - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -dds_return_t -dds_delete_impl( - _In_ dds_entity_t entity, - _In_ bool keep_if_explicit) +static dds_entity *next_non_topic_child (dds_entity *remaining_children) { - os_time timeout = { 10, 0 }; - dds_entity *e; - dds_entity *child; - dds_entity *parent; - dds_entity *prev = NULL; - dds_entity *next = NULL; - dds_return_t ret = DDS_RETCODE_OK; - dds__retcode_t rc; + while (remaining_children != NULL && dds_entity_kind (remaining_children) == DDS_KIND_TOPIC) + remaining_children = remaining_children->m_next; + return remaining_children; +} - rc = dds_entity_lock(entity, UT_HANDLE_DONTCARE_KIND, &e); - if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Error on locking entity\n"); - return DDS_ERRNO(rc); - } +dds_return_t dds_delete_impl (dds_entity_t entity, bool keep_if_explicit) +{ + os_time timeout = { 10, 0 }; + dds_entity *e; + dds_entity *child; + dds_entity *parent; + dds_entity *prev = NULL; + dds_entity *next = NULL; + dds_return_t ret; + dds__retcode_t rc; - if(keep_if_explicit == true && ((e->m_flags & DDS_ENTITY_IMPLICIT) == 0)){ - dds_entity_unlock(e); - return DDS_RETCODE_OK; - } - - if (--e->m_refc != 0) { - dds_entity_unlock(e); - return DDS_RETCODE_OK; - } - - dds_entity_cb_wait(e); - - ut_handle_close(e->m_hdl, e->m_hdllink); - e->m_status_enable = 0; - dds_reset_listener(&e->m_listener); - e->m_trigger |= DDS_DELETING_STATUS; + rc = dds_entity_lock (entity, UT_HANDLE_DONTCARE_KIND, &e); + if (rc != DDS_RETCODE_OK) + { + DDS_ERROR ("Error on locking entity\n"); + return DDS_ERRNO (rc); + } + if (keep_if_explicit == true && (e->m_flags & DDS_ENTITY_IMPLICIT) == 0) + { dds_entity_unlock(e); + return DDS_RETCODE_OK; + } - /* Signal observers that this entity will be deleted. */ - dds_entity_status_signal(e); + if (--e->m_refc != 0) + { + dds_entity_unlock (e); + return DDS_RETCODE_OK; + } - /* - * Recursively delete children. - * - * It is possible that a writer/reader has the last reference - * to a topic. This will mean that when deleting a writer could - * cause a topic to be deleted. - * This can cause issues when deleting the children of a participant: - * when a topic is the next child in line to be deleted, while at the - * same time it is already being deleted due to the recursive deletion - * of a publisher->writer. - * - * Another problem is that when the topic was already deleted, and - * we'd delete it here for the second time before the writer/reader - * is deleted, they will have dangling pointers. - * - * To circumvent the problem. We ignore topics in the first loop. - */ - child = e->m_children; - while ((child != NULL) && (dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC)) { + ut_handle_close (e->m_hdl, e->m_hdllink); + os_mutexLock (&e->m_observers_lock); + while (e->m_cb_count > 0) + os_condWait (&e->m_observers_cond, &e->m_observers_lock); + e->m_status_enable = 0; + dds_reset_listener (&e->m_listener); + e->m_trigger |= DDS_DELETING_STATUS; + os_mutexUnlock (&e->m_observers_lock); + dds_entity_unlock(e); + + /* Signal observers that this entity will be deleted. */ + dds_entity_status_signal (e); + + /* + * Recursively delete children. + * + * It is possible that a writer/reader has the last reference + * to a topic. This will mean that when deleting a writer could + * cause a topic to be deleted. + * This can cause issues when deleting the children of a participant: + * when a topic is the next child in line to be deleted, while at the + * same time it is already being deleted due to the recursive deletion + * of a publisher->writer. + * + * Another problem is that when the topic was already deleted, and + * we'd delete it here for the second time before the writer/reader + * is deleted, they will have dangling pointers. + * + * To circumvent the problem. We ignore topics in the first loop. + */ + ret = DDS_RETCODE_OK; + child = next_non_topic_child (e->m_children); + while (child != NULL && ret == DDS_RETCODE_OK) + { + next = next_non_topic_child (child->m_next); + /* This will probably delete the child entry from the current children's list */ + ret = dds_delete (child->m_hdl); + child = next; + } + child = e->m_children; + while (child != NULL && ret == DDS_RETCODE_OK) + { + next = child->m_next; + assert (dds_entity_kind (child) == DDS_KIND_TOPIC); + ret = dds_delete (child->m_hdl); + child = next; + } + if (ret == DDS_RETCODE_OK && e->m_deriver.close) + { + /* Close the entity. This can terminate threads or kick of + * other destroy stuff that takes a while. */ + ret = e->m_deriver.close (e); + } + + if (ret == DDS_RETCODE_OK) + { + /* The ut_handle_delete will wait until the last active claim on that handle + * is released. It is possible that this last release will be done by a thread + * that was kicked during the close(). */ + if (ut_handle_delete (e->m_hdl, e->m_hdllink, timeout) != UT_HANDLE_OK) + return DDS_ERRNO (DDS_RETCODE_TIMEOUT); + } + + if (ret == DDS_RETCODE_OK) + { + /* Remove all possible observers. */ + dds_entity_observers_delete (e); + + /* Remove from parent */ + if ((parent = dds__nonself_parent(e)) != NULL) + { + os_mutexLock (&parent->m_mutex); + child = parent->m_children; + while (child && child != e) + { + prev = child; child = child->m_next; - } - while ((child != NULL) && (ret == DDS_RETCODE_OK)) { - next = child->m_next; - while ((next != NULL) && (dds_entity_kind_from_handle(next->m_hdl) == DDS_KIND_TOPIC)) { - next = next->m_next; - } - /* This will probably delete the child entry from - * the current childrens list */ - ret = dds_delete(child->m_hdl); - /* Next child. */ - child = next; - } - child = e->m_children; - while ((child != NULL) && (ret == DDS_RETCODE_OK)) { - next = child->m_next; - assert(dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC); - /* This will probably delete the child entry from - * the current childrens list */ - ret = dds_delete(child->m_hdl); - /* Next child. */ - child = next; + } + assert (child != NULL); + if (prev) + prev->m_next = e->m_next; + else + parent->m_children = e->m_next; + os_mutexUnlock (&parent->m_mutex); } + /* Do some specific deletion when needed. */ + if (e->m_deriver.delete) + ret = e->m_deriver.delete(e); + } - if (ret == DDS_RETCODE_OK) { - /* Close the entity. This can terminate threads or kick of - * other destroy stuff that takes a while. */ - if (e->m_deriver.close) { - ret = e->m_deriver.close(e); - } - } - - if (ret == DDS_RETCODE_OK) { - /* The ut_handle_delete will wait until the last active claim on that handle - * is released. It is possible that this last release will be done by a thread - * that was kicked during the close(). */ - if (ut_handle_delete(e->m_hdl, e->m_hdllink, timeout) != UT_HANDLE_OK) { - DDS_ERROR("Entity deletion did not release resources\n"); - return DDS_ERRNO(DDS_RETCODE_TIMEOUT); - } - } - - if (ret == DDS_RETCODE_OK) { - /* Remove all possible observers. */ - dds_entity_observers_delete(e); - - /* Remove from parent */ - if ((parent = dds__nonself_parent(e)) != NULL) { - os_mutexLock (&parent->m_mutex); - child = parent->m_children; - while (child) { - if (child == e) { - if (prev) { - prev->m_next = e->m_next; - } else { - parent->m_children = e->m_next; - } - break; - } - prev = child; - child = child->m_next; - } - os_mutexUnlock (&parent->m_mutex); - } - - /* Do some specific deletion when needed. */ - if (e->m_deriver.delete) { - ret = e->m_deriver.delete(e); - } - } - - if (ret == DDS_RETCODE_OK) { - /* Destroy last few things. */ - dds_delete_qos (e->m_qos); - os_condDestroy (&e->m_cond); - os_mutexDestroy (&e->m_mutex); - os_mutexDestroy (&e->m_observers_lock); - dds_free (e); - } + if (ret == DDS_RETCODE_OK) + { + dds_delete_qos (e->m_qos); + os_condDestroy (&e->m_cond); + os_condDestroy (&e->m_observers_cond); + os_mutexDestroy (&e->m_mutex); + os_mutexDestroy (&e->m_observers_lock); + dds_free (e); + } return ret; + } + +dds_entity_t dds_get_parent (dds_entity_t entity) +{ + dds_entity *e; + dds__retcode_t rc; + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + else + { + dds_entity *parent; + dds_entity_t hdl; + if ((parent = dds__nonself_parent(e)) == NULL) + hdl = DDS_ENTITY_NIL; + else + { + hdl = parent->m_hdl; + dds_set_explicit (hdl); + } + dds_entity_unlock (e); + return hdl; + } } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_entity_t -dds_get_parent( - _In_ dds_entity_t entity) +dds_entity_t dds_get_participant (dds_entity_t entity) { - dds_entity *e; - dds__retcode_t rc; - dds_entity_t hdl; - dds_entity *parent; - - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if ((parent = dds__nonself_parent(e)) != NULL) { - hdl = parent->m_hdl; - dds_set_explicit(hdl); - } else { - hdl = DDS_ENTITY_NIL; - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error on locking handle entity\n"); - hdl = DDS_ERRNO(rc); - } - + dds_entity *e; + dds__retcode_t rc; + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + else + { + dds_entity_t hdl = e->m_participant->m_hdl; + dds_entity_unlock (e); return hdl; + } +} + +dds_return_t dds_get_children (dds_entity_t entity, dds_entity_t *children, size_t size) +{ + dds_entity *e; + dds__retcode_t rc; + + if (children != NULL && (size <= 0 || size >= INT32_MAX)) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + if (children == NULL && size != 0) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + else + { + dds_return_t n = 0; + dds_entity *iter = e->m_children; + while (iter) + { + if ((size_t) n < size) + { + children[n] = iter->m_hdl; + dds_set_explicit (iter->m_hdl); + } + n++; + iter = iter->m_next; + } + dds_entity_unlock(e); + return n; + } +} + +dds_return_t dds_get_qos (dds_entity_t entity, dds_qos_t *qos) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (qos == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.set_qos) + ret = dds_copy_qos (qos, e->m_qos); + else + ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); + dds_entity_unlock(e); + return ret; +} + +dds_return_t dds_set_qos (dds_entity_t entity, const dds_qos_t *qos) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (qos == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.set_qos == 0) + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + else + { + if ((ret = e->m_deriver.set_qos (e, qos, e->m_flags & DDS_ENTITY_ENABLED)) == DDS_RETCODE_OK) + { + if (e->m_qos == NULL) + e->m_qos = dds_create_qos (); + rc = dds_copy_qos (e->m_qos, qos); + ret = DDS_ERRNO (rc); + } + } + dds_entity_unlock (e); + return ret; } dds_return_t dds_get_listener (dds_entity_t entity, dds_listener_t *listener) { - dds_entity *e; - dds__retcode_t rc; - dds_entity_t hdl; + dds_entity *e; + dds_return_t ret = DDS_RETCODE_OK; + dds__retcode_t rc; + if (listener != NULL) { rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); if (rc == DDS_RETCODE_OK) { - assert(e->m_participant); - hdl = e->m_participant->m_hdl; - dds_entity_unlock(e); + os_mutexLock (&e->m_observers_lock); + dds_copy_listener (listener, &e->m_listener); + os_mutexUnlock (&e->m_observers_lock); + dds_entity_unlock(e); } else { - DDS_ERROR("Error on locking handle entity\n"); - hdl = DDS_ERRNO(rc); + DDS_ERROR("Error occurred on locking entity\n"); + ret = DDS_ERRNO(rc); } + } else { + DDS_ERROR("Argument listener is NULL\n"); + ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + } - return hdl; + return ret; } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_children( - _In_ dds_entity_t entity, - _Out_opt_ dds_entity_t *children, - _In_ size_t size) +void dds_entity_invoke_listener (const dds_entity *entity, uint32_t status, const void *vst) { - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - dds_entity* iter; - - if ((children != NULL) && ((size <= 0) || (size >= INT32_MAX))) { - DDS_ERROR("Array is given, but with invalid size\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; + struct dds_listener const * const lst = &entity->m_listener; + switch (status) + { + case DDS_INCONSISTENT_TOPIC_STATUS: { + struct dds_inconsistent_topic_status const * const st = vst; + lst->on_inconsistent_topic (entity->m_hdl, *st, lst->on_inconsistent_topic_arg); + break; } - - if ((children == NULL) && (size != 0)) { - DDS_ERROR("Size is given, but no array\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto err; + case DDS_REQUESTED_DEADLINE_MISSED_STATUS: { + struct dds_requested_deadline_missed_status const * const st = vst; + lst->on_requested_deadline_missed (entity->m_hdl, *st, lst->on_requested_deadline_missed_arg); + break; } - - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - goto err; + case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: { + struct dds_requested_incompatible_qos_status const * const st = vst; + lst->on_requested_incompatible_qos (entity->m_hdl, *st, lst->on_requested_incompatible_qos_arg); + break; } - /* Initialize first child to satisfy SAL. */ - if (children) { - children[0] = 0; + case DDS_SAMPLE_LOST_STATUS: { + struct dds_sample_lost_status const * const st = vst; + lst->on_sample_lost (entity->m_hdl, *st, lst->on_sample_lost_arg); + break; } - ret = 0; - iter = e->m_children; - while (iter) { - if ((size_t)ret < size) { /*To fix the warning of signed/unsigned mismatch, type casting is done for the variable 'ret'*/ - children[ret] = iter->m_hdl; - dds_set_explicit(iter->m_hdl); - } - ret++; - iter = iter->m_next; + case DDS_SAMPLE_REJECTED_STATUS: { + struct dds_sample_rejected_status const * const st = vst; + lst->on_sample_rejected (entity->m_hdl, *st, lst->on_sample_rejected_arg); + break; } - dds_entity_unlock(e); - -err: - return ret; + case DDS_LIVELINESS_CHANGED_STATUS: { + struct dds_liveliness_changed_status const * const st = vst; + lst->on_liveliness_changed (entity->m_hdl, *st, lst->on_liveliness_changed_arg); + break; + } + case DDS_SUBSCRIPTION_MATCHED_STATUS: { + struct dds_subscription_matched_status const * const st = vst; + lst->on_subscription_matched (entity->m_hdl, *st, lst->on_subscription_matched_arg); + break; + } + case DDS_OFFERED_DEADLINE_MISSED_STATUS: { + struct dds_offered_deadline_missed_status const * const st = vst; + lst->on_offered_deadline_missed (entity->m_hdl, *st, lst->on_offered_deadline_missed_arg); + break; + } + case DDS_LIVELINESS_LOST_STATUS: { + struct dds_liveliness_lost_status const * const st = vst; + lst->on_liveliness_lost (entity->m_hdl, *st, lst->on_liveliness_lost_arg); + break; + } + case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { + struct dds_offered_incompatible_qos_status const * const st = vst; + lst->on_offered_incompatible_qos (entity->m_hdl, *st, lst->on_offered_incompatible_qos_arg); + break; + } + case DDS_PUBLICATION_MATCHED_STATUS: { + struct dds_publication_matched_status const * const st = vst; + lst->on_publication_matched (entity->m_hdl, *st, lst->on_publication_matched_arg); + break; + } + case DDS_DATA_AVAILABLE_STATUS: { + lst->on_data_available (entity->m_hdl, lst->on_data_available_arg); + break; + } + case DDS_DATA_ON_READERS_STATUS: { + lst->on_data_on_readers (entity->m_hdl, lst->on_data_on_readers_arg); + break; + } + } } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_qos( - _In_ dds_entity_t entity, - _Out_ dds_qos_t *qos) +static void clear_status_with_listener (struct dds_entity *e) { - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (qos == NULL) { - DDS_ERROR("Argument qos is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - goto fail; - } - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - goto fail; - } - if (e->m_deriver.set_qos) { - ret = dds_copy_qos(qos, e->m_qos); - } else { - DDS_ERROR("QoS cannot be set on this entity\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); -fail: - return ret; + const struct dds_listener *lst = &e->m_listener; + if (lst->on_inconsistent_topic) + e->m_trigger &= ~DDS_INCONSISTENT_TOPIC_STATUS; + if (lst->on_liveliness_lost) + e->m_trigger &= ~DDS_LIVELINESS_LOST_STATUS; + if (lst->on_offered_deadline_missed) + e->m_trigger &= ~DDS_OFFERED_DEADLINE_MISSED_STATUS; + if (lst->on_offered_deadline_missed_arg) + e->m_trigger &= ~DDS_OFFERED_DEADLINE_MISSED_STATUS; + if (lst->on_offered_incompatible_qos) + e->m_trigger &= ~DDS_OFFERED_INCOMPATIBLE_QOS_STATUS; + if (lst->on_data_on_readers) + e->m_trigger &= ~DDS_DATA_ON_READERS_STATUS; + if (lst->on_sample_lost) + e->m_trigger &= ~DDS_SAMPLE_LOST_STATUS; + if (lst->on_data_available) + e->m_trigger &= ~DDS_DATA_AVAILABLE_STATUS; + if (lst->on_sample_rejected) + e->m_trigger &= ~DDS_SAMPLE_REJECTED_STATUS; + if (lst->on_liveliness_changed) + e->m_trigger &= ~DDS_LIVELINESS_CHANGED_STATUS; + if (lst->on_requested_deadline_missed) + e->m_trigger &= ~DDS_REQUESTED_DEADLINE_MISSED_STATUS; + if (lst->on_requested_incompatible_qos) + e->m_trigger &= ~DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS; + if (lst->on_publication_matched) + e->m_trigger &= ~DDS_PUBLICATION_MATCHED_STATUS; + if (lst->on_subscription_matched) + e->m_trigger &= ~DDS_SUBSCRIPTION_MATCHED_STATUS; } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_set_qos( - _In_ dds_entity_t entity, - _In_ const dds_qos_t *qos) +static void pushdown_listener (dds_entity_t entity) { + dds_entity_t *cs = NULL; + int ncs, size = 0; + while ((ncs = dds_get_children (entity, cs, (size_t) size)) > size) + { + size = ncs; + cs = os_realloc (cs, (size_t) size * sizeof (*cs)); + } + for (int i = 0; i < ncs; i++) + { dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (qos != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.set_qos) { - ret = e->m_deriver.set_qos(e, qos, e->m_flags & DDS_ENTITY_ENABLED); - } else { - DDS_ERROR("QoS cannot be set on this entity\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - if (ret == DDS_RETCODE_OK) { - /* Remember this QoS. */ - if (e->m_qos == NULL) { - e->m_qos = dds_create_qos(); - } - rc = dds_copy_qos(e->m_qos, qos); - DDS_ERROR("QoS cannot be set on this entity\n"); - ret = DDS_ERRNO(rc); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument QoS is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); + if (dds_entity_lock (cs[i], DDS_KIND_DONTCARE, &e) == DDS_RETCODE_OK) + { + dds_listener_t tmp; + os_mutexLock (&e->m_observers_lock); + while (e->m_cb_count > 0) + os_condWait (&e->m_observers_cond, &e->m_observers_lock); + dds_get_listener (entity, &tmp); + dds_override_inherited_listener (&e->m_listener, &tmp); + clear_status_with_listener (e); + os_mutexUnlock (&e->m_observers_lock); + dds_entity_unlock (e); } - - return ret; -} - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_listener( - _In_ dds_entity_t entity, - _Out_ dds_listener_t *listener) -{ - dds_entity *e; - dds_return_t ret = DDS_RETCODE_OK; - dds__retcode_t rc; - - if (listener != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - dds_entity_cb_wait(e); - dds_copy_listener (listener, &e->m_listener); - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument listener is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; + } + os_free (cs); } dds_return_t dds_set_listener (dds_entity_t entity, const dds_listener_t *listener) { + dds_entity *e, *x; + dds__retcode_t rc; + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + os_mutexLock (&e->m_observers_lock); + while (e->m_cb_count > 0) + os_condWait (&e->m_observers_cond, &e->m_observers_lock); + + /* new listener is constructed by combining "listener" with the ancestral listeners; + the new set of listeners is then pushed down into the descendant entities, overriding + the ones they originally inherited from */ + dds_reset_listener (&e->m_listener); + if (listener) + dds_merge_listener (&e->m_listener, listener); + x = e; + while (dds_entity_kind (x) != DDS_KIND_PARTICIPANT) + { + x = x->m_parent; + dds_inherit_listener (&e->m_listener, &x->m_listener); + } + clear_status_with_listener (e); + os_mutexUnlock (&e->m_observers_lock); + dds_entity_unlock (e); + pushdown_listener (entity); + return DDS_RETCODE_OK; +} + +dds_return_t dds_enable (dds_entity_t entity) +{ + dds_entity *e; + dds__retcode_t rc; + + if ((rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if ((e->m_flags & DDS_ENTITY_ENABLED) == 0) + { + /* TODO: Really enable. */ + e->m_flags |= DDS_ENTITY_ENABLED; + DDS_ERROR ("Delayed entity enabling is not supported\n"); + } + dds_entity_unlock(e); + return DDS_RETCODE_OK; +} + +dds_return_t dds_get_status_changes (dds_entity_t entity, uint32_t *status) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (status == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.validate_status == 0) + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + else + { + os_mutexLock (&e->m_observers_lock); + *status = e->m_trigger; + os_mutexUnlock (&e->m_observers_lock); + ret = DDS_RETCODE_OK; + } + dds_entity_unlock(e); + return ret; +} + +dds_return_t dds_get_status_mask (dds_entity_t entity, uint32_t *mask) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (mask == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.validate_status == 0) + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + else + { + os_mutexLock (&e->m_observers_lock); + *mask = (e->m_status_enable & ~DDS_INTERNAL_STATUS_MASK); + os_mutexUnlock (&e->m_observers_lock); + ret = DDS_RETCODE_OK; + } + dds_entity_unlock(e); + return ret; +} + +dds_return_t dds_get_enabled_status (dds_entity_t entity, uint32_t *status) +{ + return dds_get_status_mask(entity, status); +} + +dds_return_t dds_set_status_mask (dds_entity_t entity, uint32_t mask) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.validate_status == 0) + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + else if ((ret = e->m_deriver.validate_status (mask)) == DDS_RETCODE_OK) + { + os_mutexLock (&e->m_observers_lock); + /* Don't block internal status triggers. */ + mask |= DDS_INTERNAL_STATUS_MASK; + e->m_status_enable = mask; + e->m_trigger &= mask; + os_mutexUnlock (&e->m_observers_lock); + } + dds_entity_unlock(e); + return ret; +} + +dds_return_t dds_set_enabled_status(dds_entity_t entity, uint32_t mask) +{ + return dds_set_status_mask( entity, mask); +} + +static dds_return_t dds_readtake_status (dds_entity_t entity, uint32_t *status, uint32_t mask, bool reset) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (status == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.validate_status == 0) + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + else if ((ret = e->m_deriver.validate_status (mask)) == DDS_RETCODE_OK) + { + os_mutexLock (&e->m_observers_lock); + *status = e->m_trigger & mask; + if (reset) + e->m_trigger &= ~mask; + os_mutexUnlock (&e->m_observers_lock); + } + dds_entity_unlock (e); + return ret; +} + + +dds_return_t dds_read_status (dds_entity_t entity, uint32_t *status, uint32_t mask) +{ + return dds_readtake_status (entity, status, mask, false); +} + +dds_return_t dds_take_status (dds_entity_t entity, uint32_t *status, uint32_t mask) +{ + return dds_readtake_status (entity, status, mask, true); +} + +dds_return_t dds_get_domainid (dds_entity_t entity, dds_domainid_t *id) +{ + dds_entity *e; + dds__retcode_t rc; + + if (id == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + *id = e->m_domainid; + dds_entity_unlock(e); + return DDS_RETCODE_OK; +} + +dds_return_t dds_get_instance_handle (dds_entity_t entity, dds_instance_handle_t *ihdl) +{ + dds_entity *e; + dds__retcode_t rc; + dds_return_t ret; + + if (ihdl == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); + + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + if (e->m_deriver.get_instance_hdl) + ret = e->m_deriver.get_instance_hdl (e, ihdl); + else + ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + dds_entity_unlock(e); + return ret; +} + + +dds__retcode_t dds_valid_hdl (dds_entity_t hdl, dds_entity_kind_t kind) +{ + ut_handle_t utr; + if ((utr = ut_handle_status (hdl, NULL, (int32_t) kind)) == UT_HANDLE_OK) + return DDS_RETCODE_OK; + else if (hdl < 0) + return DDS_RETCODE_BAD_PARAMETER; + else + { + switch (utr) + { + case UT_HANDLE_OK: + assert (0); + /* FALLS THROUGH */ + case UT_HANDLE_UNEQUAL_KIND: + return DDS_RETCODE_ILLEGAL_OPERATION; + case UT_HANDLE_INVALID: + return DDS_RETCODE_BAD_PARAMETER; + case UT_HANDLE_DELETED: + case UT_HANDLE_CLOSED: + return DDS_RETCODE_ALREADY_DELETED; + default: + return DDS_RETCODE_ERROR; + } + } +} + +dds__retcode_t dds_entity_lock (dds_entity_t hdl, dds_entity_kind_t kind, dds_entity **eptr) +{ + ut_handle_t utr; + void *raw; + + /* When the given handle already contains an error, then return that + * same error to retain the original information. */ + if ((utr = ut_handle_claim (hdl, NULL, (int32_t) kind, &raw)) == UT_HANDLE_OK) + { dds_entity *e; - dds__retcode_t rc; - dds_return_t ret = DDS_RETCODE_OK; + *eptr = e = raw; + os_mutexLock (&e->m_mutex); + /* FIXME: The handle could have been closed while we were waiting for the mutex -- that should be handled differently! - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if(rc != DDS_RETCODE_OK){ - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - goto fail; + For now, however, it is really important at two points in the logic: + (1) preventing creating new entities as children of a one that is currently being deleted, and + (2) preventing dds_delete_impl from doing anything once the entity is being deleted. + + Without (1), it would be possible to add children while trying to delete them, without (2) you're looking at crashes. */ + if (ut_handle_is_closed (hdl, e->m_hdllink)) + { + dds_entity_unlock (e); + return DDS_RETCODE_ALREADY_DELETED; } - dds_entity_cb_wait(e); - if (listener) { - dds_copy_listener(&e->m_listener, listener); - } else { - dds_reset_listener(&e->m_listener); + return DDS_RETCODE_OK; + } + else if (hdl < 0) + { + return DDS_RETCODE_BAD_PARAMETER; + } + else + { + switch (utr) + { + case UT_HANDLE_OK: + assert (0); + /* FALLS THROUGH */ + case UT_HANDLE_UNEQUAL_KIND: + return DDS_RETCODE_ILLEGAL_OPERATION; + case UT_HANDLE_INVALID: + return DDS_RETCODE_BAD_PARAMETER; + case UT_HANDLE_DELETED: + case UT_HANDLE_CLOSED: + return DDS_RETCODE_ALREADY_DELETED; + default: + return DDS_RETCODE_ERROR; } - dds_entity_unlock(e); -fail: - return ret; + } } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_enable( - _In_ dds_entity_t entity) +void dds_entity_unlock (dds_entity *e) { - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret = DDS_RETCODE_OK; - - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if ((e->m_flags & DDS_ENTITY_ENABLED) == 0) { - /* TODO: CHAM-96: Really enable. */ - e->m_flags |= DDS_ENTITY_ENABLED; - DDS_ERROR("Delayed entity enabling is not supported\n"); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - - return ret; + os_mutexUnlock (&e->m_mutex); + ut_handle_release (e->m_hdl, e->m_hdllink); } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Must_inspect_result_ dds_return_t -dds_get_status_changes( - _In_ dds_entity_t entity, - _Out_ uint32_t *status) +dds_return_t dds_triggered (dds_entity_t entity) { - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; + dds_entity *e; + dds_return_t ret; + dds__retcode_t rc; - if (status != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.validate_status) { - *status = e->m_trigger; - ret = DDS_RETCODE_OK; - } else { - DDS_ERROR("This entity does not maintain a status\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument status is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; + if ((rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + os_mutexLock (&e->m_observers_lock); + ret = (e->m_trigger != 0); + os_mutexUnlock (&e->m_observers_lock); + dds_entity_unlock (e); + return ret; } - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_status_mask( - _In_ dds_entity_t entity, - _Out_ uint32_t *mask) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (mask != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.validate_status) { - *mask = (e->m_status_enable & ~DDS_INTERNAL_STATUS_MASK); - ret = DDS_RETCODE_OK; - } else { - DDS_ERROR("This entity does not maintain a status mask\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else{ - DDS_ERROR("Argument mask is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; -} - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_enabled_status( - _In_ dds_entity_t entity, - _Out_ uint32_t *status) -{ - return dds_get_status_mask(entity, status); -} - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -DDS_EXPORT dds_return_t -dds_set_status_mask( - _In_ dds_entity_t entity, - _In_ uint32_t mask) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.validate_status) { - ret = e->m_deriver.validate_status(mask); - if (ret == DDS_RETCODE_OK) { - /* Don't block internal status triggers. */ - mask |= DDS_INTERNAL_STATUS_MASK; - e->m_status_enable = mask; - e->m_trigger &= mask; - if (e->m_deriver.propagate_status) { - ret = e->m_deriver.propagate_status(e, mask, true); - } - } - } else { - DDS_ERROR("This entity does not maintain a status\n"); - ret = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - - return ret; -} - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -DDS_EXPORT dds_return_t -dds_set_enabled_status( - _In_ dds_entity_t entity, - _In_ uint32_t mask) -{ - return dds_set_status_mask(entity, mask); -} - - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_read_status( - _In_ dds_entity_t entity, - _Out_ uint32_t *status, - _In_ uint32_t mask) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (status != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.validate_status) { - ret = e->m_deriver.validate_status(mask); - assert(ret <= DDS_RETCODE_OK); - if (ret == DDS_RETCODE_OK) { - *status = e->m_trigger & mask; - } - } else { - DDS_ERROR("This entity does not maintain a status\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument status is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; -} - - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_take_status( - _In_ dds_entity_t entity, - _Out_ uint32_t *status, - _In_ uint32_t mask) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (status != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.validate_status) { - ret = e->m_deriver.validate_status(mask); - assert(ret <= DDS_RETCODE_OK); - if (ret == DDS_RETCODE_OK) { - *status = e->m_trigger & mask; - if (e->m_deriver.propagate_status) { - ret = e->m_deriver.propagate_status(e, *status, false); - } - e->m_trigger &= ~mask; - } - } else { - DDS_ERROR("This entity does not maintain a status\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument status is NUL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; -} - - - -void -dds_entity_status_signal( - _In_ dds_entity *e) -{ - assert(e); - /* Signal the observers in an unlocked state. - * This is safe because we use handles and the current entity - * will not be deleted while we're in this signaling state - * due to a claimed handle. */ - dds_entity_observers_signal(e, e->m_trigger); -} - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_domainid( - _In_ dds_entity_t entity, - _Out_ dds_domainid_t *id) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret = DDS_RETCODE_OK; - - if (id != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - *id = e->m_domainid; - dds_entity_unlock(e); - } else{ - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else{ - DDS_ERROR("Argument domain id is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; -} - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -_Check_return_ dds_return_t -dds_get_instance_handle( - _In_ dds_entity_t entity, - _Out_ dds_instance_handle_t *ihdl) -{ - dds_entity *e; - dds__retcode_t rc; - dds_return_t ret; - - if (ihdl != NULL) { - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - if (e->m_deriver.get_instance_hdl) { - ret = e->m_deriver.get_instance_hdl(e, ihdl); - } else { - DDS_ERROR("Instance handle is not valid\n"); - ret = DDS_ERRNO(DDS_RETCODE_ILLEGAL_OPERATION); - } - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument instance handle is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } - - return ret; -} - - -_Check_return_ dds__retcode_t -dds_valid_hdl( - _In_ dds_entity_t hdl, - _In_ dds_entity_kind_t kind) -{ - dds__retcode_t rc = hdl; - ut_handle_t utr; - - /* When the given handle already contains an error, then return that - * same error to retain the original information. */ - if (hdl >= 0) { - utr = ut_handle_status(hdl, NULL, (int32_t)kind); - if(utr == UT_HANDLE_OK){ - rc = DDS_RETCODE_OK; - } else if(utr == UT_HANDLE_UNEQUAL_KIND){ - rc = DDS_RETCODE_ILLEGAL_OPERATION; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity type can not perform this operation\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_INVALID){ - rc = DDS_RETCODE_BAD_PARAMETER; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is invalid\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_DELETED){ - rc = DDS_RETCODE_ALREADY_DELETED; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is already deleted\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_CLOSED){ - rc = DDS_RETCODE_ALREADY_DELETED; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is already deleted\n", dds__entity_kind_str(hdl), hdl); - } else { - rc = DDS_RETCODE_ERROR; - DDS_WARNING("An internal error occurred\n"); - } - } else { - rc = DDS_RETCODE_BAD_PARAMETER; - DDS_WARNING("Given entity (0x%08"PRIx32") was not properly created\n", hdl); - } - return rc; -} - -_Acquires_exclusive_lock_(*e) -_Check_return_ dds__retcode_t -dds_entity_lock( - _In_ dds_entity_t hdl, - _In_ dds_entity_kind_t kind, - _Out_ dds_entity **e) -{ - dds__retcode_t rc = hdl; - ut_handle_t utr; - assert(e); - /* When the given handle already contains an error, then return that - * same error to retain the original information. */ - if (hdl >= 0) { - utr = ut_handle_claim(hdl, NULL, (int32_t)kind, (void**)e); - if (utr == UT_HANDLE_OK) { - os_mutexLock(&((*e)->m_mutex)); - /* The handle could have been closed while we were waiting for the mutex. */ - if (ut_handle_is_closed(hdl, (*e)->m_hdllink)) { - dds_entity_unlock(*e); - utr = UT_HANDLE_CLOSED; - } - } - if(utr == UT_HANDLE_OK){ - rc = DDS_RETCODE_OK; - } else if(utr == UT_HANDLE_UNEQUAL_KIND){ - rc = DDS_RETCODE_ILLEGAL_OPERATION; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity type can not perform this operation\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_INVALID){ - rc = DDS_RETCODE_BAD_PARAMETER; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is invalid\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_DELETED){ - rc = DDS_RETCODE_ALREADY_DELETED; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is already deleted\n", dds__entity_kind_str(hdl), hdl); - } else if(utr == UT_HANDLE_CLOSED){ - rc = DDS_RETCODE_ALREADY_DELETED; - DDS_WARNING("Given (%s) [0x%08"PRIx32"] entity is already deleted\n", dds__entity_kind_str(hdl), hdl); - } else { - rc = DDS_RETCODE_ERROR; - DDS_WARNING("An internal error occurred\n"); - } - } else { - rc = DDS_RETCODE_BAD_PARAMETER; - DDS_WARNING("Given entity (0x%08"PRIx32") was not properly created\n", hdl); - } - return rc; -} - - -_Releases_exclusive_lock_(e) -void -dds_entity_unlock( - _Inout_ dds_entity *e) -{ - assert(e); - os_mutexUnlock(&e->m_mutex); - ut_handle_release(e->m_hdl, e->m_hdllink); -} - - - -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -dds_return_t -dds_triggered( - _In_ dds_entity_t entity) -{ - dds_entity *e; - dds_return_t ret; - dds__retcode_t rc; - - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - ret = (e->m_trigger != 0); - dds_entity_unlock(e); - } else { - DDS_ERROR("Error occurred on locking entity\n"); - ret = DDS_ERRNO(rc); - } - - return ret; -} - - static bool in_observer_list_p (const struct dds_entity *observed, const dds_entity_t observer) { - dds_entity_observer *cur; - for (cur = observed->m_observers; cur != NULL; cur = cur->m_next) { - if (cur->m_observer == observer) { - return true; - } - } - return false; + dds_entity_observer *cur; + for (cur = observed->m_observers; cur != NULL; cur = cur->m_next) + if (cur->m_observer == observer) + return true; + return false; } -_Check_return_ dds__retcode_t -dds_entity_observer_register_nl( - _In_ dds_entity* observed, - _In_ dds_entity_t observer, - _In_ dds_entity_callback cb) +dds__retcode_t dds_entity_observer_register_nl (dds_entity *observed, dds_entity_t observer, dds_entity_callback cb) { - dds__retcode_t rc; - dds_entity_observer *o = os_malloc(sizeof(dds_entity_observer)); - assert(observed); + dds__retcode_t rc; + assert (observed); + os_mutexLock (&observed->m_observers_lock); + if (in_observer_list_p (observed, observer)) + rc = DDS_RETCODE_PRECONDITION_NOT_MET; + else + { + dds_entity_observer *o = os_malloc (sizeof (dds_entity_observer)); o->m_cb = cb; o->m_observer = observer; - os_mutexLock(&observed->m_observers_lock); - if (in_observer_list_p (observed, observer)) { - rc = DDS_RETCODE_PRECONDITION_NOT_MET; - os_free (o); - } else { - rc = DDS_RETCODE_OK; - o->m_next = observed->m_observers; - observed->m_observers = o; - } - os_mutexUnlock(&observed->m_observers_lock); + o->m_next = observed->m_observers; + observed->m_observers = o; + rc = DDS_RETCODE_OK; + } + os_mutexUnlock (&observed->m_observers_lock); + return rc; +} + +dds__retcode_t dds_entity_observer_register (dds_entity_t observed, dds_entity_t observer, dds_entity_callback cb) +{ + dds__retcode_t rc; + dds_entity *e; + assert (cb); + if ((rc = dds_entity_lock (observed, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) return rc; + rc = dds_entity_observer_register_nl (e, observer, cb); + dds_entity_unlock (e); + return rc; } - - -_Check_return_ dds__retcode_t -dds_entity_observer_register( - _In_ dds_entity_t observed, - _In_ dds_entity_t observer, - _In_ dds_entity_callback cb) +dds__retcode_t dds_entity_observer_unregister_nl (dds_entity *observed, dds_entity_t observer) { - dds__retcode_t rc; - dds_entity *e; - assert(cb); - rc = dds_entity_lock(observed, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - rc = dds_entity_observer_register_nl(e, observer, cb); - dds_entity_unlock(e); - } else{ - DDS_ERROR("Error occurred on locking observer\n"); - } + dds__retcode_t rc; + dds_entity_observer *prev, *idx; + + os_mutexLock (&observed->m_observers_lock); + prev = NULL; + idx = observed->m_observers; + while (idx != NULL && idx->m_observer != observer) + { + prev = idx; + idx = idx->m_next; + } + if (idx == NULL) + rc = DDS_RETCODE_PRECONDITION_NOT_MET; + else + { + if (prev == NULL) + observed->m_observers = idx->m_next; + else + prev->m_next = idx->m_next; + os_free (idx); + rc = DDS_RETCODE_OK; + } + os_mutexUnlock (&observed->m_observers_lock); + return rc; +} + +dds__retcode_t dds_entity_observer_unregister (dds_entity_t observed, dds_entity_t observer) +{ + dds__retcode_t rc; + dds_entity *e; + if ((rc = dds_entity_lock (observed, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) return rc; + rc = dds_entity_observer_unregister_nl (e, observer); + dds_entity_unlock (e); + return rc; } - - -dds__retcode_t -dds_entity_observer_unregister_nl( - _In_ dds_entity* observed, - _In_ dds_entity_t observer) +static void dds_entity_observers_delete (dds_entity *observed) { - dds__retcode_t rc = DDS_RETCODE_PRECONDITION_NOT_MET; - dds_entity_observer *prev = NULL; - dds_entity_observer *idx; - os_mutexLock(&observed->m_observers_lock); - idx = observed->m_observers; - while (idx != NULL) { - if (idx->m_observer == observer) { - if (prev == NULL) { - observed->m_observers = idx->m_next; - } else { - prev->m_next = idx->m_next; - } - os_free(idx); - idx = NULL; - rc = DDS_RETCODE_OK; - } else { - prev = idx; - idx = idx->m_next; - } - } - os_mutexUnlock(&observed->m_observers_lock); - return rc; + dds_entity_observer *idx; + os_mutexLock (&observed->m_observers_lock); + idx = observed->m_observers; + while (idx != NULL) + { + dds_entity_observer *next = idx->m_next; + os_free (idx); + idx = next; + } + observed->m_observers = NULL; + os_mutexUnlock (&observed->m_observers_lock); } - - -dds__retcode_t -dds_entity_observer_unregister( - _In_ dds_entity_t observed, - _In_ dds_entity_t observer) +static void dds_entity_observers_signal (dds_entity *observed, uint32_t status) { - dds__retcode_t rc; - dds_entity *e; - rc = dds_entity_lock(observed, DDS_KIND_DONTCARE, &e); - if (rc == DDS_RETCODE_OK) { - rc = dds_entity_observer_unregister_nl(e, observer); - dds_entity_unlock(e); - } else{ - DDS_ERROR("Error occurred on locking entity\n"); - (void)DDS_ERRNO(rc); - } - return rc; + for (dds_entity_observer *idx = observed->m_observers; idx; idx = idx->m_next) + idx->m_cb (idx->m_observer, observed->m_hdl, status); } - - -static void -dds_entity_observers_delete( - _In_ dds_entity *observed) +void dds_entity_status_signal (dds_entity *e) { - dds_entity_observer *next; - dds_entity_observer *idx; - os_mutexLock(&observed->m_observers_lock); - idx = observed->m_observers; - while (idx != NULL) { - next = idx->m_next; - os_free(idx); - idx = next; - } - observed->m_observers = NULL; - os_mutexUnlock(&observed->m_observers_lock); + os_mutexLock (&e->m_observers_lock); + dds_entity_observers_signal (e, e->m_trigger); + os_mutexUnlock (&e->m_observers_lock); } - - -static void -dds_entity_observers_signal( - _In_ dds_entity *observed, - _In_ uint32_t status) +void dds_entity_status_set (dds_entity *e, uint32_t t) { - dds_entity_observer *idx; - os_mutexLock(&observed->m_observers_lock); - idx = observed->m_observers; - while (idx != NULL) { - idx->m_cb(idx->m_observer, observed->m_hdl, status); - idx = idx->m_next; - } - os_mutexUnlock(&observed->m_observers_lock); + if (!(e->m_trigger & t)) + { + e->m_trigger |= e->m_status_enable & t; + dds_entity_observers_signal (e, e->m_trigger); + } } -_Pre_satisfies_(entity & DDS_ENTITY_KIND_MASK) -dds_entity_t -dds_get_topic( - _In_ dds_entity_t entity) +dds_entity_t dds_get_topic (dds_entity_t entity) { - dds__retcode_t rc; - dds_entity_t hdl = entity; - dds_reader *rd; - dds_writer *wr; + dds__retcode_t rc; + dds_entity_t hdl; + dds_entity *e; - rc = dds_reader_lock(entity, &rd); - if(rc == DDS_RETCODE_OK) { - hdl = rd->m_topic->m_entity.m_hdl; - dds_reader_unlock(rd); - } else if (rc == DDS_RETCODE_ILLEGAL_OPERATION) { - rc = dds_writer_lock(entity, &wr); - if (rc == DDS_RETCODE_OK) { - hdl = wr->m_topic->m_entity.m_hdl; - dds_writer_unlock(wr); - } else if (dds_entity_kind_from_handle(entity) == DDS_KIND_COND_READ || dds_entity_kind_from_handle(entity) == DDS_KIND_COND_QUERY) { - hdl = dds_get_topic(dds_get_parent(entity)); - rc = DDS_RETCODE_OK; - } + if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + switch (dds_entity_kind (e)) + { + case DDS_KIND_READER: { + dds_reader *rd = (dds_reader *) e; + hdl = rd->m_topic->m_entity.m_hdl; + break; } - if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Error occurred on locking entity\n"); - hdl = DDS_ERRNO(rc); + case DDS_KIND_WRITER: { + dds_writer *wr = (dds_writer *) e; + hdl = wr->m_topic->m_entity.m_hdl; + break; } - - return hdl; + case DDS_KIND_COND_READ: + case DDS_KIND_COND_QUERY: { + assert (dds_entity_kind (e->m_parent) == DDS_KIND_READER); + dds_reader *rd = (dds_reader *) e->m_parent; + hdl = rd->m_topic->m_entity.m_hdl; + break; + } + default: { + hdl = DDS_ERRNO (DDS_RETCODE_ILLEGAL_OPERATION); + break; + } + } + dds_entity_unlock (e); + return hdl; } -static void -dds_set_explicit( - _In_ dds_entity_t entity) +const char *dds__entity_kind_str (dds_entity_t e) { - dds_entity *e; - dds__retcode_t rc; - rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e); - if( rc == DDS_RETCODE_OK){ - e->m_flags &= ~DDS_ENTITY_IMPLICIT; - dds_entity_unlock(e); - } -} - -const char * -dds__entity_kind_str(_In_ dds_entity_t e) -{ - if(e <= 0) { - return "(ERROR)"; - } - switch(e & DDS_ENTITY_KIND_MASK) { - case DDS_KIND_TOPIC: return "Topic"; - case DDS_KIND_PARTICIPANT: return "Participant"; - case DDS_KIND_READER: return "Reader"; - case DDS_KIND_WRITER: return "Writer"; - case DDS_KIND_SUBSCRIBER: return "Subscriber"; - case DDS_KIND_PUBLISHER: return "Publisher"; - case DDS_KIND_COND_READ: return "ReadCondition"; - case DDS_KIND_COND_QUERY: return "QueryCondition"; - case DDS_KIND_WAITSET: return "WaitSet"; - default: return "(INVALID_ENTITY)"; - } + if (e <= 0) + return "(ERROR)"; + switch (e & DDS_ENTITY_KIND_MASK) + { + case DDS_KIND_TOPIC: return "Topic"; + case DDS_KIND_PARTICIPANT: return "Participant"; + case DDS_KIND_READER: return "Reader"; + case DDS_KIND_WRITER: return "Writer"; + case DDS_KIND_SUBSCRIBER: return "Subscriber"; + case DDS_KIND_PUBLISHER: return "Publisher"; + case DDS_KIND_COND_READ: return "ReadCondition"; + case DDS_KIND_COND_QUERY: return "QueryCondition"; + case DDS_KIND_WAITSET: return "WaitSet"; + default: return "(INVALID_ENTITY)"; + } } diff --git a/src/core/ddsc/src/dds_guardcond.c b/src/core/ddsc/src/dds_guardcond.c index a69f650..eadbadc 100644 --- a/src/core/ddsc/src/dds_guardcond.c +++ b/src/core/ddsc/src/dds_guardcond.c @@ -21,121 +21,81 @@ DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_guardcond) -_Must_inspect_result_ dds_guardcond* -dds_create_guardcond( - _In_ dds_participant *pp) +dds_entity_t dds_create_guardcondition (dds_entity_t participant) { - dds_guardcond * gcond = dds_alloc(sizeof(*gcond)); - gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0); - return gcond; -} - -_Pre_satisfies_((reader & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT) -_Must_inspect_result_ dds_entity_t -dds_create_guardcondition( - _In_ dds_entity_t participant) -{ - dds_entity_t hdl; - dds_participant * pp; - dds__retcode_t rc; - - rc = dds_participant_lock(participant, &pp); - if (rc == DDS_RETCODE_OK) { - dds_guardcond *cond = dds_create_guardcond(pp); - assert(cond); - hdl = cond->m_entity.m_hdl; - dds_participant_unlock(pp); - } else { - DDS_ERROR("Error occurred on locking reader\n"); - hdl = DDS_ERRNO(rc); - } + dds_participant *pp; + dds__retcode_t rc; + if ((rc = dds_participant_lock (participant, &pp)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + else + { + dds_guardcond * gcond = dds_alloc (sizeof (*gcond)); + dds_entity_t hdl = dds_entity_init (&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0); + dds_participant_unlock (pp); return hdl; + } } -_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) -dds_return_t -dds_set_guardcondition( - _In_ dds_entity_t condition, - _In_ bool triggered) +dds_return_t dds_set_guardcondition (dds_entity_t condition, bool triggered) { - dds_return_t ret; - dds_guardcond *gcond; - dds__retcode_t rc; + dds_guardcond *gcond; + dds__retcode_t rc; - rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond); - if (rc == DDS_RETCODE_OK) { - if (triggered) { - dds_entity_status_set(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); - dds_entity_status_signal(&gcond->m_entity); - } else { - dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); - } - dds_entity_unlock(&gcond->m_entity); - ret = DDS_RETCODE_OK; - } else { - DDS_ERROR("Argument condition is not valid\n"); - ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD)); - } - - return ret; + if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK) + return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD)); + else + { + os_mutexLock (&gcond->m_entity.m_observers_lock); + if (triggered) + dds_entity_status_set (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + else + dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + os_mutexUnlock (&gcond->m_entity.m_observers_lock); + dds_guardcond_unlock (gcond); + return DDS_RETCODE_OK; + } } -_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) -dds_return_t -dds_read_guardcondition( - _In_ dds_entity_t condition, - _Out_ bool *triggered) +dds_return_t dds_read_guardcondition (dds_entity_t condition, bool *triggered) { - dds_return_t ret; - dds_guardcond *gcond; - dds__retcode_t rc; + dds_guardcond *gcond; + dds__retcode_t rc; - if (triggered != NULL) { - *triggered = false; - rc = dds_guardcond_lock(condition, &gcond); - if (rc == DDS_RETCODE_OK) { - *triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); - dds_guardcond_unlock(gcond); - ret = DDS_RETCODE_OK; - } else { - DDS_ERROR("Argument condition is not valid\n"); - ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD)); - } - } else { - DDS_ERROR("Argument triggered is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } + if (triggered == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); - return ret; + *triggered = false; + if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK) + return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD)); + else + { + os_mutexLock (&gcond->m_entity.m_observers_lock); + *triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + os_mutexUnlock (&gcond->m_entity.m_observers_lock); + dds_guardcond_unlock (gcond); + return DDS_RETCODE_OK; + } } -_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) -dds_return_t -dds_take_guardcondition( - _In_ dds_entity_t condition, - _Out_ bool *triggered) +dds_return_t dds_take_guardcondition (dds_entity_t condition, bool *triggered) { - dds_return_t ret; - dds_guardcond *gcond; - dds__retcode_t rc; + dds_guardcond *gcond; + dds__retcode_t rc; - if (triggered != NULL) { - *triggered = false; - rc = dds_guardcond_lock(condition, &gcond); - if (rc == DDS_RETCODE_OK) { - *triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); - dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); - dds_guardcond_unlock (gcond); - ret = DDS_RETCODE_OK; - } else { - DDS_ERROR("Argument condition is not valid\n"); - ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD)); - } - } else { - DDS_ERROR("Argument triggered is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } + if (triggered == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); - return ret; + *triggered = false; + if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK) + return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD)); + else + { + os_mutexLock (&gcond->m_entity.m_observers_lock); + *triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + os_mutexUnlock (&gcond->m_entity.m_observers_lock); + dds_guardcond_unlock (gcond); + return DDS_RETCODE_OK; + } } diff --git a/src/core/ddsc/src/dds_listener.c b/src/core/ddsc/src/dds_listener.c index f4e5e51..640745b 100644 --- a/src/core/ddsc/src/dds_listener.c +++ b/src/core/ddsc/src/dds_listener.c @@ -13,163 +13,200 @@ #include "ddsc/dds.h" #include "dds__listener.h" - - -_Ret_notnull_ -dds_listener_t* -dds_create_listener(_In_opt_ void* arg) +dds_listener_t *dds_create_listener (void* arg) { - c_listener_t *l = dds_alloc(sizeof(*l)); - dds_reset_listener(l); - l->arg = arg; - return l; + dds_listener_t *l = dds_alloc (sizeof (*l)); + dds_reset_listener (l); + l->on_inconsistent_topic_arg = arg; + l->on_liveliness_lost_arg = arg; + l->on_offered_deadline_missed_arg = arg; + l->on_offered_incompatible_qos_arg = arg; + l->on_data_on_readers_arg = arg; + l->on_sample_lost_arg = arg; + l->on_data_available_arg = arg; + l->on_sample_rejected_arg = arg; + l->on_liveliness_changed_arg = arg; + l->on_requested_deadline_missed_arg = arg; + l->on_requested_incompatible_qos_arg = arg; + l->on_publication_matched_arg = arg; + l->on_subscription_matched_arg = arg; + return l; } -_Ret_notnull_ -dds_listener_t* -dds_listener_create(_In_opt_ void* arg) +dds_listener_t *dds_listener_create (void* arg) { - return dds_create_listener(arg); + return dds_create_listener (arg); } -void -dds_delete_listener(_In_ _Post_invalid_ dds_listener_t * __restrict listener) +void dds_delete_listener (dds_listener_t * __restrict listener) { - if (listener) { - dds_free(listener); - } + dds_free (listener); } -void -dds_listener_delete(_In_ _Post_invalid_ dds_listener_t * __restrict listener) +void dds_listener_delete (dds_listener_t * __restrict listener) { - dds_delete_listener(listener); + dds_delete_listener (listener); } -void -dds_reset_listener(_Out_ dds_listener_t * __restrict listener) +void dds_reset_listener (dds_listener_t * __restrict listener) { - if (listener) { - c_listener_t *l = listener; - l->on_data_available = DDS_LUNSET; - l->on_data_on_readers = DDS_LUNSET; - l->on_inconsistent_topic = DDS_LUNSET; - l->on_liveliness_changed = DDS_LUNSET; - l->on_liveliness_lost = DDS_LUNSET; - l->on_offered_deadline_missed = DDS_LUNSET; - l->on_offered_incompatible_qos = DDS_LUNSET; - l->on_publication_matched = DDS_LUNSET; - l->on_requested_deadline_missed = DDS_LUNSET; - l->on_requested_incompatible_qos = DDS_LUNSET; - l->on_sample_lost = DDS_LUNSET; - l->on_sample_rejected = DDS_LUNSET; - l->on_subscription_matched = DDS_LUNSET; - } else { - DDS_ERROR("Argument listener is NULL\n"); - } + if (listener) + { + dds_listener_t * const l = listener; + l->inherited = 0; + l->on_data_available = 0; + l->on_data_on_readers = 0; + l->on_inconsistent_topic = 0; + l->on_liveliness_changed = 0; + l->on_liveliness_lost = 0; + l->on_offered_deadline_missed = 0; + l->on_offered_incompatible_qos = 0; + l->on_publication_matched = 0; + l->on_requested_deadline_missed = 0; + l->on_requested_incompatible_qos = 0; + l->on_sample_lost = 0; + l->on_sample_rejected = 0; + l->on_subscription_matched = 0; + } } -void -dds_listener_reset(_Out_ dds_listener_t * __restrict listener) +void dds_listener_reset (dds_listener_t * __restrict listener) { - dds_reset_listener(listener); + dds_reset_listener (listener); } -void -dds_copy_listener(_Out_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src) +void dds_copy_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) { - const c_listener_t *srcl = src; - c_listener_t *dstl = dst; - - if(!src){ - DDS_ERROR("Argument source(src) is NULL\n"); - return ; - } - if(!dst){ - DDS_ERROR("Argument destination(dst) is NULL\n"); - return ; - } - dstl->on_data_available = srcl->on_data_available; - dstl->on_data_on_readers = srcl->on_data_on_readers; - dstl->on_inconsistent_topic = srcl->on_inconsistent_topic; - dstl->on_liveliness_changed = srcl->on_liveliness_changed; - dstl->on_liveliness_lost = srcl->on_liveliness_lost; - dstl->on_offered_deadline_missed = srcl->on_offered_deadline_missed; - dstl->on_offered_incompatible_qos = srcl->on_offered_incompatible_qos; - dstl->on_publication_matched = srcl->on_publication_matched; - dstl->on_requested_deadline_missed = srcl->on_requested_deadline_missed; - dstl->on_requested_incompatible_qos = srcl->on_requested_incompatible_qos; - dstl->on_sample_lost = srcl->on_sample_lost; - dstl->on_sample_rejected = srcl->on_sample_rejected; - dstl->on_subscription_matched = srcl->on_subscription_matched; - dstl->arg = srcl->arg; + if (dst && src) + *dst = *src; } -void -dds_listener_copy(_Out_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src) +void dds_listener_copy(dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) { - dds_copy_listener(dst, src); + dds_copy_listener (dst, src); } -void -dds_merge_listener (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src) +static bool dds_combine_listener_merge (uint32_t inherited, void (*dst)(void), void (*src)(void)) { - const c_listener_t *srcl = src; - c_listener_t *dstl = dst; - - if(!src){ - DDS_ERROR("Argument source(src) is NULL\n"); - return ; - } - if(!dst){ - DDS_ERROR("Argument destination(dst) is NULL\n"); - return ; - } - if (dstl->on_data_available == DDS_LUNSET) { - dstl->on_data_available = srcl->on_data_available; - } - if (dstl->on_data_on_readers == DDS_LUNSET) { - dstl->on_data_on_readers = srcl->on_data_on_readers; - } - if (dstl->on_inconsistent_topic == DDS_LUNSET) { - dstl->on_inconsistent_topic = srcl->on_inconsistent_topic; - } - if (dstl->on_liveliness_changed == DDS_LUNSET) { - dstl->on_liveliness_changed = srcl->on_liveliness_changed; - } - if (dstl->on_liveliness_lost == DDS_LUNSET) { - dstl->on_liveliness_lost = srcl->on_liveliness_lost; - } - if (dstl->on_offered_deadline_missed == DDS_LUNSET) { - dstl->on_offered_deadline_missed = srcl->on_offered_deadline_missed; - } - if (dstl->on_offered_incompatible_qos == DDS_LUNSET) { - dstl->on_offered_incompatible_qos = srcl->on_offered_incompatible_qos; - } - if (dstl->on_publication_matched == DDS_LUNSET) { - dstl->on_publication_matched = srcl->on_publication_matched; - } - if (dstl->on_requested_deadline_missed == DDS_LUNSET) { - dstl->on_requested_deadline_missed = srcl->on_requested_deadline_missed; - } - if (dstl->on_requested_incompatible_qos == DDS_LUNSET) { - dstl->on_requested_incompatible_qos = srcl->on_requested_incompatible_qos; - } - if (dstl->on_sample_lost == DDS_LUNSET) { - dstl->on_sample_lost = srcl->on_sample_lost; - } - if (dstl->on_sample_rejected == DDS_LUNSET) { - dstl->on_sample_rejected = srcl->on_sample_rejected; - } - if (dstl->on_subscription_matched == DDS_LUNSET) { - dstl->on_subscription_matched = srcl->on_subscription_matched; - } + (void)inherited; + (void)src; + return dst == 0; } -void -dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src) +static bool dds_combine_listener_override_inherited (uint32_t inherited, void (*dst)(void), void (*src)(void)) { - dds_merge_listener(dst, src); + (void)dst; + (void)src; + return inherited; +} + +static void dds_combine_listener (bool (*op) (uint32_t inherited, void (*)(void), void (*)(void)), dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) +{ + if (op (dst->inherited & DDS_DATA_AVAILABLE_STATUS, (void (*)(void)) dst->on_data_available, (void (*)(void)) src->on_data_available)) + { + dst->inherited |= DDS_DATA_AVAILABLE_STATUS; + dst->on_data_available = src->on_data_available; + dst->on_data_available_arg = src->on_data_available_arg; + } + if (op (dst->inherited & DDS_DATA_ON_READERS_STATUS, (void (*)(void)) dst->on_data_on_readers, (void (*)(void)) src->on_data_on_readers)) + { + dst->inherited |= DDS_DATA_ON_READERS_STATUS; + dst->on_data_on_readers = src->on_data_on_readers; + dst->on_data_on_readers_arg = src->on_data_on_readers_arg; + } + if (op (dst->inherited & DDS_INCONSISTENT_TOPIC_STATUS, (void (*)(void)) dst->on_inconsistent_topic, (void (*)(void)) src->on_inconsistent_topic)) + { + dst->inherited |= DDS_INCONSISTENT_TOPIC_STATUS; + dst->on_inconsistent_topic = src->on_inconsistent_topic; + dst->on_inconsistent_topic_arg = src->on_inconsistent_topic_arg; + } + if (op (dst->inherited & DDS_LIVELINESS_CHANGED_STATUS, (void (*)(void)) dst->on_liveliness_changed, (void (*)(void)) src->on_liveliness_changed)) + { + dst->inherited |= DDS_LIVELINESS_CHANGED_STATUS; + dst->on_liveliness_changed = src->on_liveliness_changed; + dst->on_liveliness_changed_arg = src->on_liveliness_changed_arg; + } + if (op (dst->inherited & DDS_LIVELINESS_LOST_STATUS, (void (*)(void)) dst->on_liveliness_lost, (void (*)(void)) src->on_liveliness_lost)) + { + dst->inherited |= DDS_LIVELINESS_LOST_STATUS; + dst->on_liveliness_lost = src->on_liveliness_lost; + dst->on_liveliness_lost_arg = src->on_liveliness_lost_arg; + } + if (op (dst->inherited & DDS_OFFERED_DEADLINE_MISSED_STATUS, (void (*)(void)) dst->on_offered_deadline_missed, (void (*)(void)) src->on_offered_deadline_missed)) + { + dst->inherited |= DDS_OFFERED_DEADLINE_MISSED_STATUS; + dst->on_offered_deadline_missed = src->on_offered_deadline_missed; + dst->on_offered_deadline_missed_arg = src->on_offered_deadline_missed_arg; + } + if (op (dst->inherited & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS, (void (*)(void)) dst->on_offered_incompatible_qos, (void (*)(void)) src->on_offered_incompatible_qos)) + { + dst->inherited |= DDS_OFFERED_INCOMPATIBLE_QOS_STATUS; + dst->on_offered_incompatible_qos = src->on_offered_incompatible_qos; + dst->on_offered_incompatible_qos_arg = src->on_offered_incompatible_qos_arg; + } + if (op (dst->inherited & DDS_PUBLICATION_MATCHED_STATUS, (void (*)(void)) dst->on_publication_matched, (void (*)(void)) src->on_publication_matched)) + { + dst->inherited |= DDS_PUBLICATION_MATCHED_STATUS; + dst->on_publication_matched = src->on_publication_matched; + dst->on_publication_matched_arg = src->on_publication_matched_arg; + } + if (op (dst->inherited & DDS_REQUESTED_DEADLINE_MISSED_STATUS, (void (*)(void)) dst->on_requested_deadline_missed, (void (*)(void)) src->on_requested_deadline_missed)) + { + dst->inherited |= DDS_REQUESTED_DEADLINE_MISSED_STATUS; + dst->on_requested_deadline_missed = src->on_requested_deadline_missed; + dst->on_requested_deadline_missed_arg = src->on_requested_deadline_missed_arg; + } + if (op (dst->inherited & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS, (void (*)(void)) dst->on_requested_incompatible_qos, (void (*)(void)) src->on_requested_incompatible_qos)) + { + dst->inherited |= DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS; + dst->on_requested_incompatible_qos = src->on_requested_incompatible_qos; + dst->on_requested_incompatible_qos_arg = src->on_requested_incompatible_qos_arg; + } + if (op (dst->inherited & DDS_SAMPLE_LOST_STATUS, (void (*)(void)) dst->on_sample_lost, (void (*)(void)) src->on_sample_lost)) + { + dst->inherited |= DDS_SAMPLE_LOST_STATUS; + dst->on_sample_lost = src->on_sample_lost; + dst->on_sample_lost_arg = src->on_sample_lost_arg; + } + if (op (dst->inherited & DDS_SAMPLE_REJECTED_STATUS, (void (*)(void)) dst->on_sample_rejected, (void (*)(void)) src->on_sample_rejected)) + { + dst->inherited |= DDS_SAMPLE_REJECTED_STATUS; + dst->on_sample_rejected = src->on_sample_rejected; + dst->on_sample_rejected_arg = src->on_sample_rejected_arg; + } + if (op (dst->inherited & DDS_SUBSCRIPTION_MATCHED_STATUS, (void (*)(void)) dst->on_subscription_matched, (void (*)(void)) src->on_subscription_matched)) + { + dst->inherited |= DDS_SUBSCRIPTION_MATCHED_STATUS; + dst->on_subscription_matched = src->on_subscription_matched; + dst->on_subscription_matched_arg = src->on_subscription_matched_arg; + } +} + +void dds_override_inherited_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) +{ + if (dst && src) + dds_combine_listener (dds_combine_listener_override_inherited, dst, src); +} + +void dds_inherit_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) +{ + if (dst && src) + dds_combine_listener (dds_combine_listener_merge, dst, src); +} + +void dds_merge_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src) +{ + if (dst && src) + { + uint32_t inherited = dst->inherited; + dds_combine_listener (dds_combine_listener_merge, dst, src); + dst->inherited = inherited; + } +} + +void dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src) +{ + dds_merge_listener(dst, src); } /************************************************************************************************ diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 1526142..dc01549 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -162,181 +162,189 @@ dds_reader_status_validate( DDS_RETCODE_OK; } -void -dds_reader_status_cb( - void *ventity, - const status_cb_data_t *data) +void dds_reader_data_available_cb (struct dds_reader *rd) { - struct dds_entity * const entity = ventity; - dds_reader *rd; - dds__retcode_t rc; - void *metrics = NULL; + /* DATA_AVAILABLE is special in two ways: firstly, it should first try + DATA_ON_READERS on the line of ancestors, and if not consumed set the + status on the subscriber; secondly it is the only one for which + overhead really matters. Otherwise, it is pretty much like + dds_reader_status_cb. */ + struct dds_listener const * const lst = &rd->m_entity.m_listener; + dds_entity * const sub = rd->m_entity.m_parent; - /* When data is NULL, it means that the DDSI reader is deleted. */ - if (data == NULL) { - /* Release the initial claim that was done during the create. This - * will indicate that further API deletion is now possible. */ - ut_handle_release(entity->m_hdl, ((dds_entity*)entity)->m_hdllink); - return; + os_mutexLock (&rd->m_entity.m_observers_lock); + while (rd->m_entity.m_cb_count > 0) + os_condWait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock); + rd->m_entity.m_cb_count++; + + if (lst->on_data_on_readers) + { + os_mutexUnlock (&rd->m_entity.m_observers_lock); + + os_mutexLock (&sub->m_observers_lock); + while (sub->m_cb_count > 0) + os_condWait (&sub->m_observers_cond, &sub->m_observers_lock); + sub->m_cb_count++; + os_mutexUnlock (&sub->m_observers_lock); + + lst->on_data_on_readers (sub->m_hdl, lst->on_data_on_readers_arg); + + os_mutexLock (&rd->m_entity.m_observers_lock); + os_mutexLock (&sub->m_observers_lock); + sub->m_cb_count--; + os_condBroadcast (&sub->m_observers_cond); + os_mutexUnlock (&sub->m_observers_lock); + } + else if (rd->m_entity.m_listener.on_data_available) + { + os_mutexUnlock (&rd->m_entity.m_observers_lock); + lst->on_data_available (rd->m_entity.m_hdl, lst->on_data_available_arg); + os_mutexLock (&rd->m_entity.m_observers_lock); + } + else + { + dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); + + os_mutexLock (&sub->m_observers_lock); + dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS); + os_mutexUnlock (&sub->m_observers_lock); + } + + rd->m_entity.m_cb_count--; + os_condBroadcast (&rd->m_entity.m_observers_cond); + os_mutexUnlock (&rd->m_entity.m_observers_lock); +} + +void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) +{ + struct dds_entity * const entity = ventity; + + /* When data is NULL, it means that the DDSI reader is deleted. */ + if (data == NULL) + { + /* Release the initial claim that was done during the create. This + * will indicate that further API deletion is now possible. */ + ut_handle_release (entity->m_hdl, entity->m_hdllink); + return; + } + + struct dds_listener const * const lst = &entity->m_listener; + bool invoke = false; + void *vst = NULL; + int32_t *reset[2] = { NULL, NULL }; + + /* DATA_AVAILABLE is handled by dds_reader_data_available_cb */ + assert (data->status != DDS_DATA_AVAILABLE_STATUS); + + /* Serialize listener invocations -- it is somewhat sad to do this, + but then it may also be unreasonable to expect the application to + handle concurrent invocations of a single listener. The benefit + here is that it means the counters and "change" counters + can safely be incremented and/or reset while releasing + m_observers_lock for the duration of the listener call itself, + and that similarly the listener function and argument pointers + are stable */ + os_mutexLock (&entity->m_observers_lock); + while (entity->m_cb_count > 0) + os_condWait (&entity->m_observers_cond, &entity->m_observers_lock); + entity->m_cb_count++; + + /* Update status metrics. */ + dds_reader * const rd = (dds_reader *) entity; + switch (data->status) { + case DDS_REQUESTED_DEADLINE_MISSED_STATUS: { + struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status; + st->last_instance_handle = data->handle; + st->total_count++; + st->total_count_change++; + invoke = (lst->on_requested_deadline_missed != 0); + reset[0] = &st->total_count_change; + break; } - - if (dds_reader_lock(entity->m_hdl, &rd) != DDS_RETCODE_OK) { - return; + case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: { + struct dds_requested_incompatible_qos_status * const st = vst = &rd->m_requested_incompatible_qos_status; + st->total_count++; + st->total_count_change++; + st->last_policy_id = data->extra; + invoke = (lst->on_requested_incompatible_qos != 0); + reset[0] = &st->total_count_change; + break; } - assert(&rd->m_entity == entity); - - /* Reset the status for possible Listener call. - * When a listener is not called, the status will be set (again). */ - dds_entity_status_reset(entity, data->status); - - /* Update status metrics. */ - switch (data->status) { - case DDS_REQUESTED_DEADLINE_MISSED_STATUS: { - rd->m_requested_deadline_missed_status.total_count++; - rd->m_requested_deadline_missed_status.total_count_change++; - rd->m_requested_deadline_missed_status.last_instance_handle = data->handle; - metrics = &rd->m_requested_deadline_missed_status; - break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: { - rd->m_requested_incompatible_qos_status.total_count++; - rd->m_requested_incompatible_qos_status.total_count_change++; - rd->m_requested_incompatible_qos_status.last_policy_id = data->extra; - metrics = &rd->m_requested_incompatible_qos_status; - break; - } - case DDS_SAMPLE_LOST_STATUS: { - rd->m_sample_lost_status.total_count++; - rd->m_sample_lost_status.total_count_change++; - metrics = &rd->m_sample_lost_status; - break; - } - case DDS_SAMPLE_REJECTED_STATUS: { - rd->m_sample_rejected_status.total_count++; - rd->m_sample_rejected_status.total_count_change++; - rd->m_sample_rejected_status.last_reason = data->extra; - rd->m_sample_rejected_status.last_instance_handle = data->handle; - metrics = &rd->m_sample_rejected_status; - break; - } - case DDS_DATA_AVAILABLE_STATUS: { - metrics = NULL; - break; - } - case DDS_LIVELINESS_CHANGED_STATUS: { - if (data->add) { - rd->m_liveliness_changed_status.alive_count++; - rd->m_liveliness_changed_status.alive_count_change++; - if (rd->m_liveliness_changed_status.not_alive_count > 0) { - rd->m_liveliness_changed_status.not_alive_count--; - } - } else { - rd->m_liveliness_changed_status.alive_count--; - rd->m_liveliness_changed_status.not_alive_count++; - rd->m_liveliness_changed_status.not_alive_count_change++; - } - rd->m_liveliness_changed_status.last_publication_handle = data->handle; - metrics = &rd->m_liveliness_changed_status; - break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS: { - if (data->add) { - rd->m_subscription_matched_status.total_count++; - rd->m_subscription_matched_status.total_count_change++; - rd->m_subscription_matched_status.current_count++; - rd->m_subscription_matched_status.current_count_change++; - } else { - rd->m_subscription_matched_status.current_count--; - rd->m_subscription_matched_status.current_count_change--; - } - rd->m_subscription_matched_status.last_publication_handle = data->handle; - metrics = &rd->m_subscription_matched_status; - break; - } - default: assert (0); + case DDS_SAMPLE_LOST_STATUS: { + struct dds_sample_lost_status * const st = vst = &rd->m_sample_lost_status; + st->total_count++; + st->total_count_change++; + invoke = (lst->on_sample_lost != 0); + reset[0] = &st->total_count_change; + break; } - - /* The reader needs to be unlocked when propagating the (possible) listener - * call because the application should be able to call this reader within - * the callback function. */ - dds_reader_unlock(rd); - - /* DATA_AVAILABLE is handled differently to normal status changes. */ - if (data->status == DDS_DATA_AVAILABLE_STATUS) { - dds_entity *parent = rd->m_entity.m_parent; - /* First, try to ship it off to its parent(s) DDS_DATA_ON_READERS_STATUS. */ - rc = dds_entity_listener_propagation(parent, parent, DDS_DATA_ON_READERS_STATUS, NULL, true); - - if (rc == DDS_RETCODE_NO_DATA) { - /* No parent was interested (NO_DATA == NO_CALL). - * What about myself with DDS_DATA_AVAILABLE_STATUS? */ - rc = dds_entity_listener_propagation(entity, entity, DDS_DATA_AVAILABLE_STATUS, NULL, false); - } - - if ( rc == DDS_RETCODE_NO_DATA ) { - /* Nobody was interested (NO_DATA == NO_CALL). Set the status on the subscriber. */ - dds_entity_status_set(parent, DDS_DATA_ON_READERS_STATUS); - /* Notify possible interested observers of the subscriber. */ - dds_entity_status_signal(parent); - } - } else { - /* Is anybody interested within the entity hierarchy through listeners? */ - rc = dds_entity_listener_propagation(entity, entity, data->status, metrics, true); + case DDS_SAMPLE_REJECTED_STATUS: { + struct dds_sample_rejected_status * const st = vst = &rd->m_sample_rejected_status; + st->total_count++; + st->total_count_change++; + st->last_reason = data->extra; + st->last_instance_handle = data->handle; + invoke = (lst->on_sample_rejected != 0); + reset[0] = &st->total_count_change; + break; } - - if (rc == DDS_RETCODE_OK) { - /* Event was eaten by a listener. */ - if (dds_reader_lock(entity->m_hdl, &rd) == DDS_RETCODE_OK) { - assert(&rd->m_entity == entity); - - /* Reset the change counts of the metrics. */ - switch (data->status) { - case DDS_REQUESTED_DEADLINE_MISSED_STATUS: { - rd->m_requested_deadline_missed_status.total_count_change = 0; - break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: { - rd->m_requested_incompatible_qos_status.total_count_change = 0; - break; - } - case DDS_SAMPLE_LOST_STATUS: { - rd->m_sample_lost_status.total_count_change = 0; - break; - } - case DDS_SAMPLE_REJECTED_STATUS: { - rd->m_sample_rejected_status.total_count_change = 0; - break; - } - case DDS_DATA_AVAILABLE_STATUS: { - /* Nothing to reset. */; - break; - } - case DDS_LIVELINESS_CHANGED_STATUS: { - rd->m_liveliness_changed_status.alive_count_change = 0; - rd->m_liveliness_changed_status.not_alive_count_change = 0; - break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS: { - rd->m_subscription_matched_status.total_count_change = 0; - rd->m_subscription_matched_status.current_count_change = 0; - break; - } - default: assert (0); - } - dds_reader_unlock(rd); - } else { - /* There's a deletion or closing going on. */ + case DDS_LIVELINESS_CHANGED_STATUS: { + struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status; + if (data->add) { + st->alive_count++; + st->alive_count_change++; + if (st->not_alive_count > 0) { + st->not_alive_count--; } - } else if (rc == DDS_RETCODE_NO_DATA) { - /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status, consider successful. */ - dds_entity_status_set(entity, data->status); - /* Notify possible interested observers. */ - dds_entity_status_signal(entity); - } else if (rc == DDS_RETCODE_ALREADY_DELETED) { - /* An entity up the hierarchy is being deleted, consider successful. */ - } else { - /* Something went wrong up the hierarchy. */ + } else { + st->alive_count--; + st->not_alive_count++; + st->not_alive_count_change++; + } + st->last_publication_handle = data->handle; + invoke = (lst->on_liveliness_changed != 0); + reset[0] = &st->alive_count_change; + reset[1] = &st->not_alive_count_change; + break; } + case DDS_SUBSCRIPTION_MATCHED_STATUS: { + struct dds_subscription_matched_status * const st = vst = &rd->m_subscription_matched_status; + if (data->add) { + st->total_count++; + st->total_count_change++; + st->current_count++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + st->last_publication_handle = data->handle; + invoke = (lst->on_subscription_matched != 0); + reset[0] = &st->total_count_change; + reset[1] = &st->current_count_change; + break; + } + default: + assert (0); + } + + if (invoke) + { + os_mutexUnlock (&entity->m_observers_lock); + dds_entity_invoke_listener(entity, data->status, vst); + os_mutexLock (&entity->m_observers_lock); + *reset[0] = 0; + if (reset[1]) + *reset[1] = 0; + } + else + { + dds_entity_status_set (entity, data->status); + } + + entity->m_cb_count--; + os_condBroadcast (&entity->m_observers_cond); + os_mutexUnlock (&entity->m_observers_lock); } _Pre_satisfies_(((participant_or_subscriber & DDS_ENTITY_KIND_MASK) == DDS_KIND_SUBSCRIBER ) ||\ diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index f06754a..f931114 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -146,8 +146,6 @@ "signal_conditions" after releasing the RHC lock. */ -static const status_cb_data_t dds_rhc_data_avail_cb_data = { DDS_DATA_AVAILABLE_STATUS, 0, 0, true }; - /* FIXME: tkmap should perhaps retain data with timestamp set to invalid An invalid timestamp is (logically) unordered with respect to valid timestamps, and that would mean BY_SOURCE order could be respected @@ -1389,7 +1387,7 @@ bool dds_rhc_store if (rhc->reader && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) { os_atomic_inc32 (&rhc->n_cbs); - dds_reader_status_cb (&rhc->reader->m_entity, &dds_rhc_data_avail_cb_data); + dds_reader_data_available_cb (rhc->reader); os_atomic_dec32 (&rhc->n_cbs); } } diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index 7e6cab4..a55dd93 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -97,26 +97,6 @@ dds_subscriber_status_validate( return ret; } -/* - Set boolean on readers that indicates state of DATA_ON_READERS - status on parent subscriber -*/ -static dds_return_t dds_subscriber_status_propagate (dds_entity *sub, uint32_t mask, bool set) -{ - if (mask & DDS_DATA_ON_READERS_STATUS) - { - dds_entity *iter = sub->m_children; - while (iter) { - os_mutexLock (&iter->m_mutex); - assert (dds_entity_kind (iter) == DDS_KIND_READER); - ((dds_reader*) iter)->m_data_on_readers = set; - os_mutexUnlock (&iter->m_mutex); - iter = iter->m_next; - } - } - return DDS_RETCODE_OK; -} - _Requires_exclusive_lock_held_(participant) _Check_return_ dds_entity_t dds__create_subscriber_l( @@ -147,7 +127,6 @@ dds__create_subscriber_l( subscriber = dds_entity_init(&sub->m_entity, participant, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK); sub->m_entity.m_deriver.set_qos = dds_subscriber_qos_set; sub->m_entity.m_deriver.validate_status = dds_subscriber_status_validate; - sub->m_entity.m_deriver.propagate_status = dds_subscriber_status_propagate; sub->m_entity.m_deriver.get_instance_hdl = dds_subscriber_instance_hdl; return subscriber; diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 9364199..4027bdc 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -92,55 +92,29 @@ dds_topic_status_validate( status (only defined status on a topic). */ -static void -dds_topic_status_cb( - struct dds_topic *cb_t) +static void dds_topic_status_cb (struct dds_topic *tp) { - dds_topic *topic; - dds__retcode_t rc; + struct dds_listener const * const lst = &tp->m_entity.m_listener; - if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) != DDS_RETCODE_OK) { - return; - } - assert(topic == cb_t); + os_mutexLock (&tp->m_entity.m_observers_lock); + while (tp->m_entity.m_cb_count > 0) + os_condWait (&tp->m_entity.m_observers_cond, &tp->m_entity.m_observers_lock); + tp->m_entity.m_cb_count++; - /* Reset the status for possible Listener call. - * When a listener is not called, the status will be set (again). */ + tp->m_inconsistent_topic_status.total_count++; + tp->m_inconsistent_topic_status.total_count_change++; + if (lst->on_inconsistent_topic) + { + os_mutexUnlock (&tp->m_entity.m_observers_lock); + dds_entity_invoke_listener(&tp->m_entity, DDS_INCONSISTENT_TOPIC_STATUS, &tp->m_inconsistent_topic_status); + os_mutexLock (&tp->m_entity.m_observers_lock); + tp->m_inconsistent_topic_status.total_count_change = 0; + } - /* Update status metrics. */ - topic->m_inconsistent_topic_status.total_count++; - topic->m_inconsistent_topic_status.total_count_change++; - - - /* The topic needs to be unlocked when propagating the (possible) listener - * call because the application should be able to call this topic within - * the callback function. */ - dds_topic_unlock(topic); - - /* Is anybody interested within the entity hierarchy through listeners? */ - rc = dds_entity_listener_propagation(&topic->m_entity, - &topic->m_entity, - DDS_INCONSISTENT_TOPIC_STATUS, - &topic->m_inconsistent_topic_status, - true); - - if (rc == DDS_RETCODE_OK) { - /* Event was eaten by a listener. */ - if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) == DDS_RETCODE_OK) { - /* Reset the change counts of the metrics. */ - topic->m_inconsistent_topic_status.total_count_change = 0; - dds_topic_unlock(topic); - } - } else if (rc == DDS_RETCODE_NO_DATA) { - /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */ - dds_entity_status_set(&topic->m_entity, DDS_INCONSISTENT_TOPIC_STATUS); - /* Notify possible interested observers. */ - dds_entity_status_signal(&topic->m_entity); - } else if (rc == DDS_RETCODE_ALREADY_DELETED) { - /* An entity up the hierarchy is being deleted; consider it successful. */ - } else { - /* Something went wrong up the hierarchy. */ - } + dds_entity_status_set(&tp->m_entity, DDS_INCONSISTENT_TOPIC_STATUS); + tp->m_entity.m_cb_count--; + os_condBroadcast (&tp->m_entity.m_observers_cond); + os_mutexUnlock (&tp->m_entity.m_observers_lock); } struct ddsi_sertopic * diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index a6c2c08..1b0782a 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -38,18 +38,6 @@ dds_waitset_swap( *dst = idx; } -static void dds_waitset_signal_entity (dds_waitset *ws) -{ - dds_entity *e = &ws->m_entity; - /* When signaling any observers of us through the entity, - * we need to be unlocked. We still have claimed the related - * handle, so possible deletions will be delayed until we - * release it. */ - os_mutexUnlock (&e->m_mutex); - dds_entity_status_signal (e); - os_mutexLock (&e->m_mutex); -} - static dds_return_t dds_waitset_wait_impl( _In_ dds_entity_t waitset, @@ -472,33 +460,25 @@ dds_waitset_wait( return ret; } -_Pre_satisfies_((waitset & DDS_ENTITY_KIND_MASK) == DDS_KIND_WAITSET) -dds_return_t -dds_waitset_set_trigger( - _In_ dds_entity_t waitset, - _In_ bool trigger) +dds_return_t dds_waitset_set_trigger (dds_entity_t waitset, bool trigger) { - dds_waitset *ws; - dds__retcode_t rc; - dds_return_t ret = DDS_RETCODE_OK; + dds_waitset *ws; + dds__retcode_t rc; - /* Locking the waitset here will delay a possible deletion until it is - * unlocked. Even when the related mutex is unlocked when we want to send - * a signal. */ - rc = dds_waitset_lock(waitset, &ws); - if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Error occurred on locking waitset\n"); - ret = DDS_ERRNO(rc); - goto fail; - } - if (trigger) { - dds_entity_status_set(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); - } else { - dds_entity_status_reset(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); - } - dds_waitset_signal_entity(ws); - dds_waitset_unlock(ws); -fail: - return ret; + if ((rc = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + + os_mutexUnlock (&ws->m_entity.m_mutex); + + os_mutexLock (&ws->m_entity.m_observers_lock); + if (trigger) + dds_entity_status_set (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); + else + dds_entity_status_reset (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); + os_mutexUnlock (&ws->m_entity.m_observers_lock); + + os_mutexLock (&ws->m_entity.m_mutex); + dds_waitset_unlock (ws); + return DDS_RETCODE_OK; } diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index b9d7cd3..8ffab83 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -65,15 +65,9 @@ dds_writer_status_validate( then status conditions is not triggered. */ -static void -dds_writer_status_cb( - void *ventity, - const status_cb_data_t *data) +static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data) { - struct dds_entity * const entity = ventity; - dds_writer *wr; - dds__retcode_t rc; - void *metrics = NULL; + struct dds_entity * const entity = ventity; /* When data is NULL, it means that the writer is deleted. */ if (data == NULL) @@ -84,44 +78,52 @@ dds_writer_status_cb( return; } - if (dds_writer_lock (entity->m_hdl, &wr) != DDS_RETCODE_OK) { - /* There's a deletion or closing going on. */ - return; - } - assert (&wr->m_entity == entity); + struct dds_listener const * const lst = &entity->m_listener; + bool invoke = false; + void *vst = NULL; + int32_t *reset[2] = { NULL, NULL }; + + os_mutexLock (&entity->m_observers_lock); + while (entity->m_cb_count > 0) + os_condWait (&entity->m_observers_cond, &entity->m_observers_lock); + entity->m_cb_count++; /* Reset the status for possible Listener call. * When a listener is not called, the status will be set (again). */ dds_entity_status_reset (entity, data->status); /* Update status metrics. */ + dds_writer * const wr = (dds_writer *) entity; switch (data->status) { case DDS_OFFERED_DEADLINE_MISSED_STATUS: { - struct dds_offered_deadline_missed_status * const st = &wr->m_offered_deadline_missed_status; + struct dds_offered_deadline_missed_status * const st = vst = &wr->m_offered_deadline_missed_status; st->total_count++; st->total_count_change++; st->last_instance_handle = data->handle; - metrics = st; + invoke = (lst->on_offered_deadline_missed != 0); + reset[0] = &st->total_count_change; break; } case DDS_LIVELINESS_LOST_STATUS: { - struct dds_liveliness_lost_status * const st = &wr->m_liveliness_lost_status; + struct dds_liveliness_lost_status * const st = vst = &wr->m_liveliness_lost_status; st->total_count++; st->total_count_change++; - metrics = st; + invoke = (lst->on_liveliness_lost != 0); + reset[0] = &st->total_count_change; break; } case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { - struct dds_offered_incompatible_qos_status * const st = &wr->m_offered_incompatible_qos_status; + struct dds_offered_incompatible_qos_status * const st = vst = &wr->m_offered_incompatible_qos_status; st->total_count++; st->total_count_change++; st->last_policy_id = data->extra; - metrics = st; + invoke = (lst->on_offered_incompatible_qos != 0); + reset[0] = &st->total_count_change; break; } case DDS_PUBLICATION_MATCHED_STATUS: { - struct dds_publication_matched_status * const st = &wr->m_publication_matched_status; + struct dds_publication_matched_status * const st = vst = &wr->m_publication_matched_status; if (data->add) { st->total_count++; st->total_count_change++; @@ -131,69 +133,33 @@ dds_writer_status_cb( st->current_count--; st->current_count_change--; } - st->last_subscription_handle = data->handle; - metrics = st; + wr->m_publication_matched_status.last_subscription_handle = data->handle; + invoke = (lst->on_publication_matched != 0); + reset[0] = &st->total_count_change; + reset[1] = &st->current_count_change; break; } default: assert (0); } - /* The writer needs to be unlocked when propagating the (possible) listener - * call because the application should be able to call this writer within - * the callback function. */ - dds_writer_unlock (wr); - - /* Is anybody interested within the entity hierarchy through listeners? */ - rc = dds_entity_listener_propagation (entity, entity, data->status, metrics, true); - - if (rc == DDS_RETCODE_OK) + if (invoke) { - /* Event was eaten by a listener. */ - if (dds_writer_lock (entity->m_hdl, &wr) == DDS_RETCODE_OK) - { - assert (&wr->m_entity == entity); - - /* Reset the status. */ - dds_entity_status_reset (entity, data->status); - - /* Reset the change counts of the metrics. */ - switch (data->status) - { - case DDS_OFFERED_DEADLINE_MISSED_STATUS: - wr->m_offered_deadline_missed_status.total_count_change = 0; - break; - case DDS_LIVELINESS_LOST_STATUS: - wr->m_liveliness_lost_status.total_count_change = 0; - break; - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: - wr->m_offered_incompatible_qos_status.total_count_change = 0; - break; - case DDS_PUBLICATION_MATCHED_STATUS: - wr->m_publication_matched_status.total_count_change = 0; - wr->m_publication_matched_status.current_count_change = 0; - break; - default: - assert (0); - } - dds_writer_unlock (wr); - } - } - else if (rc == DDS_RETCODE_NO_DATA) - { - /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */ - dds_entity_status_set (entity, data->status); - /* Notify possible interested observers. */ - dds_entity_status_signal (entity); - } - else if (rc == DDS_RETCODE_ALREADY_DELETED) - { - /* An entity up the hierarchy is being deleted; consider it successful. */ + os_mutexUnlock (&entity->m_observers_lock); + dds_entity_invoke_listener(entity, data->status, vst); + os_mutexLock (&entity->m_observers_lock); + *reset[0] = 0; + if (reset[1]) + *reset[1] = 0; } else { - /* Something went wrong up the hierarchy. */ + dds_entity_status_set (entity, data->status); } + + entity->m_cb_count--; + os_condBroadcast (&entity->m_observers_cond); + os_mutexUnlock (&entity->m_observers_lock); } static uint32_t