rework listener invocation and entity status flags

Listener/status management invocation was rather expensive, and
especially the cost of checking listeners, then setting status flags and
triggering waitsets ran into severe lock contention.

A major cost was the repeated use of dds_entity_lock and
dds_entity_unlock, these have been eliminated.  Another cost was that
each time an event occurred (with DATA_AVAILABLE the most problematic
one) it would walk the chain of ancestors to see if any had a relevant
listener, and only if none of them had any, it would set the status
flags.

The locking/unlocking of the entity has been eliminated by moving the
listener/status flag manipulation from the general entity lock to its
m_observers_lock.  That lock has a much smaller scope, and consequently
contention has been significantly reduced.

Instead of walking the entity hierarchy looking for listeners, an entity
now inherits the ancestors' listeners.  The set_listener operation has
been made a little more complicated by the need to not only set the
listeners for the specified entity, but to also update any inherited
listeners its descendants.

The commit is a bit larger than strictly needed ... I've started
reformatting the code to reduce the variety of styles ... as there I
haven't been able to find a single tool that does what I want, it may
well end up as manual work.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-01-15 10:04:30 +01:00
parent 2e5ecb2e76
commit ba46cb1140
14 changed files with 1401 additions and 1795 deletions

View file

@ -44,16 +44,9 @@ typedef void (*dds_on_requested_incompatible_qos_fn) (dds_entity_t reader, const
typedef void (*dds_on_publication_matched_fn) (dds_entity_t writer, const dds_publication_matched_status_t status, void* arg); typedef void (*dds_on_publication_matched_fn) (dds_entity_t writer, const dds_publication_matched_status_t status, void* arg);
typedef void (*dds_on_subscription_matched_fn) (dds_entity_t reader, const dds_subscription_matched_status_t status, void* arg); typedef void (*dds_on_subscription_matched_fn) (dds_entity_t reader, const dds_subscription_matched_status_t status, void* arg);
#if 0 #define DDS_LUNSET 0
/* TODO: Why use (*dds_on_any_fn) (); and DDS_LUNSET? Why not just set the callbacks to NULL? */ struct dds_listener;
typedef void (*dds_on_any_fn) (); /**< Empty parameter list on purpose; should be assignable without cast to all of the above. @todo check with an actual compiler; I'm a sloppy compiler */ typedef struct dds_listener dds_listener_t;
#define DDS_LUNSET ((dds_on_any_fn)1) /**< Callback indicating a callback isn't set */
#else
#define DDS_LUNSET (NULL)
#endif
struct c_listener;
typedef struct c_listener dds_listener_t;
/** /**
* @brief Allocate memory and initializes to default values (::DDS_LUNSET) of a listener * @brief Allocate memory and initializes to default values (::DDS_LUNSET) of a listener
@ -104,7 +97,6 @@ DDS_DEPRECATED_EXPORT void dds_listener_copy (_Out_ dds_listener_t * __restrict
DDS_EXPORT void dds_merge_listener (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src); DDS_EXPORT void dds_merge_listener (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src);
DDS_DEPRECATED_EXPORT void dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src); DDS_DEPRECATED_EXPORT void dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src);
/************************************************************************************************ /************************************************************************************************
* Setters * Setters
************************************************************************************************/ ************************************************************************************************/

View file

@ -37,14 +37,6 @@ void
dds_entity_add_ref_nolock( dds_entity_add_ref_nolock(
_In_ dds_entity *e); _In_ dds_entity *e);
_Check_return_ dds__retcode_t
dds_entity_listener_propagation(
_Inout_opt_ dds_entity *e,
_In_ dds_entity *src,
_In_ uint32_t status,
_In_opt_ void *metrics,
_In_ bool propagate);
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \ #define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x) \ qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x) \
{ \ { \
@ -68,9 +60,7 @@ inline bool dds_entity_is_enabled (const dds_entity *e) {
return (e->m_flags & DDS_ENTITY_ENABLED) != 0; return (e->m_flags & DDS_ENTITY_ENABLED) != 0;
} }
inline void dds_entity_status_set (dds_entity *e, uint32_t t) { void dds_entity_status_set (dds_entity *e, uint32_t t);
e->m_trigger |= e->m_status_enable & t;
}
inline void dds_entity_status_reset (dds_entity *e, uint32_t t) { inline void dds_entity_status_reset (dds_entity *e, uint32_t t) {
e->m_trigger &= ~t; e->m_trigger &= ~t;
@ -88,11 +78,9 @@ inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl) {
return (hdl > 0) ? (dds_entity_kind_t) (hdl & DDS_ENTITY_KIND_MASK) : DDS_KIND_DONTCARE; return (hdl > 0) ? (dds_entity_kind_t) (hdl & DDS_ENTITY_KIND_MASK) : DDS_KIND_DONTCARE;
} }
/* The mutex needs to be unlocked when calling this because the entity can be called void dds_entity_status_signal (dds_entity *e);
* within the signal callback from other contexts. That shouldn't deadlock. */
void void dds_entity_invoke_listener (const dds_entity *entity, uint32_t status, const void *vst);
dds_entity_status_signal(
_In_ dds_entity *e);
_Check_return_ dds__retcode_t _Check_return_ dds__retcode_t
dds_valid_hdl( dds_valid_hdl(

View file

@ -19,11 +19,8 @@
extern "C" { extern "C" {
#endif #endif
/* void dds_override_inherited_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src);
* Listener API (internal & external) are present in void dds_inherit_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src);
* dds__types.h
* ddsc/dds_public_listener.h
*/
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -23,6 +23,8 @@ struct status_cb_data;
void dds_reader_status_cb (void *entity, const struct status_cb_data * data); void dds_reader_status_cb (void *entity, const struct status_cb_data * data);
void dds_reader_data_available_cb (struct dds_reader *entity);
/* /*
dds_reader_lock_samples: Returns number of samples in read cache and locks the dds_reader_lock_samples: Returns number of samples in read cache and locks the
reader cache to make sure that the samples content doesn't change. reader cache to make sure that the samples content doesn't change.

View file

@ -59,22 +59,35 @@ typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, cons
/* The listener struct. */ /* The listener struct. */
typedef struct c_listener { struct dds_listener {
dds_on_inconsistent_topic_fn on_inconsistent_topic; uint32_t inherited;
dds_on_liveliness_lost_fn on_liveliness_lost; dds_on_inconsistent_topic_fn on_inconsistent_topic;
dds_on_offered_deadline_missed_fn on_offered_deadline_missed; void *on_inconsistent_topic_arg;
dds_on_offered_incompatible_qos_fn on_offered_incompatible_qos; dds_on_liveliness_lost_fn on_liveliness_lost;
dds_on_data_on_readers_fn on_data_on_readers; void *on_liveliness_lost_arg;
dds_on_sample_lost_fn on_sample_lost; dds_on_offered_deadline_missed_fn on_offered_deadline_missed;
dds_on_data_available_fn on_data_available; void *on_offered_deadline_missed_arg;
dds_on_sample_rejected_fn on_sample_rejected; dds_on_offered_incompatible_qos_fn on_offered_incompatible_qos;
dds_on_liveliness_changed_fn on_liveliness_changed; void *on_offered_incompatible_qos_arg;
dds_on_requested_deadline_missed_fn on_requested_deadline_missed; dds_on_data_on_readers_fn on_data_on_readers;
dds_on_requested_incompatible_qos_fn on_requested_incompatible_qos; void *on_data_on_readers_arg;
dds_on_publication_matched_fn on_publication_matched; dds_on_sample_lost_fn on_sample_lost;
dds_on_subscription_matched_fn on_subscription_matched; void *on_sample_lost_arg;
void *arg; dds_on_data_available_fn on_data_available;
} c_listener_t; void *on_data_available_arg;
dds_on_sample_rejected_fn on_sample_rejected;
void *on_sample_rejected_arg;
dds_on_liveliness_changed_fn on_liveliness_changed;
void *on_liveliness_changed_arg;
dds_on_requested_deadline_missed_fn on_requested_deadline_missed;
void *on_requested_deadline_missed_arg;
dds_on_requested_incompatible_qos_fn on_requested_incompatible_qos;
void *on_requested_incompatible_qos_arg;
dds_on_publication_matched_fn on_publication_matched;
void *on_publication_matched_arg;
dds_on_subscription_matched_fn on_subscription_matched;
void *on_subscription_matched_arg;
};
/* Entity flag values */ /* Entity flag values */
@ -98,7 +111,6 @@ typedef struct dds_entity_deriver {
dds_return_t (*delete)(struct dds_entity *e); dds_return_t (*delete)(struct dds_entity *e);
dds_return_t (*set_qos)(struct dds_entity *e, const dds_qos_t *qos, bool enabled); dds_return_t (*set_qos)(struct dds_entity *e, const dds_qos_t *qos, bool enabled);
dds_return_t (*validate_status)(uint32_t mask); dds_return_t (*validate_status)(uint32_t mask);
dds_return_t (*propagate_status)(struct dds_entity *e, uint32_t mask, bool set);
dds_return_t (*get_instance_hdl)(struct dds_entity *e, dds_instance_handle_t *i); dds_return_t (*get_instance_hdl)(struct dds_entity *e, dds_instance_handle_t *i);
} }
dds_entity_deriver; dds_entity_deriver;
@ -126,15 +138,18 @@ typedef struct dds_entity
dds_qos_t * m_qos; dds_qos_t * m_qos;
dds_domainid_t m_domainid; dds_domainid_t m_domainid;
nn_guid_t m_guid; nn_guid_t m_guid;
uint32_t m_status_enable;
uint32_t m_flags; uint32_t m_flags;
uint32_t m_cb_count;
os_mutex m_mutex; os_mutex m_mutex;
os_cond m_cond; os_cond m_cond;
c_listener_t m_listener;
uint32_t m_trigger;
os_mutex m_observers_lock; os_mutex m_observers_lock;
os_cond m_observers_cond;
dds_listener_t m_listener;
uint32_t m_trigger;
uint32_t m_status_enable;
uint32_t m_cb_count;
dds_entity_observer *m_observers; dds_entity_observer *m_observers;
struct ut_handlelink *m_hdllink; struct ut_handlelink *m_hdllink;
} }
dds_entity; dds_entity;

File diff suppressed because it is too large Load diff

View file

@ -21,121 +21,81 @@
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_guardcond) DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_guardcond)
_Must_inspect_result_ dds_guardcond* dds_entity_t dds_create_guardcondition (dds_entity_t participant)
dds_create_guardcond(
_In_ dds_participant *pp)
{ {
dds_guardcond * gcond = dds_alloc(sizeof(*gcond)); dds_participant *pp;
gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0); dds__retcode_t rc;
return gcond;
}
_Pre_satisfies_((reader & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
_Must_inspect_result_ dds_entity_t
dds_create_guardcondition(
_In_ dds_entity_t participant)
{
dds_entity_t hdl;
dds_participant * pp;
dds__retcode_t rc;
rc = dds_participant_lock(participant, &pp);
if (rc == DDS_RETCODE_OK) {
dds_guardcond *cond = dds_create_guardcond(pp);
assert(cond);
hdl = cond->m_entity.m_hdl;
dds_participant_unlock(pp);
} else {
DDS_ERROR("Error occurred on locking reader\n");
hdl = DDS_ERRNO(rc);
}
if ((rc = dds_participant_lock (participant, &pp)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
else
{
dds_guardcond * gcond = dds_alloc (sizeof (*gcond));
dds_entity_t hdl = dds_entity_init (&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0);
dds_participant_unlock (pp);
return hdl; return hdl;
}
} }
_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) dds_return_t dds_set_guardcondition (dds_entity_t condition, bool triggered)
dds_return_t
dds_set_guardcondition(
_In_ dds_entity_t condition,
_In_ bool triggered)
{ {
dds_return_t ret; dds_guardcond *gcond;
dds_guardcond *gcond; dds__retcode_t rc;
dds__retcode_t rc;
rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond); if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK)
if (rc == DDS_RETCODE_OK) { return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD));
if (triggered) { else
dds_entity_status_set(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); {
dds_entity_status_signal(&gcond->m_entity); os_mutexLock (&gcond->m_entity.m_observers_lock);
} else { if (triggered)
dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); dds_entity_status_set (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
} else
dds_entity_unlock(&gcond->m_entity); dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
ret = DDS_RETCODE_OK; os_mutexUnlock (&gcond->m_entity.m_observers_lock);
} else { dds_guardcond_unlock (gcond);
DDS_ERROR("Argument condition is not valid\n"); return DDS_RETCODE_OK;
ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD)); }
}
return ret;
} }
_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) dds_return_t dds_read_guardcondition (dds_entity_t condition, bool *triggered)
dds_return_t
dds_read_guardcondition(
_In_ dds_entity_t condition,
_Out_ bool *triggered)
{ {
dds_return_t ret; dds_guardcond *gcond;
dds_guardcond *gcond; dds__retcode_t rc;
dds__retcode_t rc;
if (triggered != NULL) { if (triggered == NULL)
*triggered = false; return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
rc = dds_guardcond_lock(condition, &gcond);
if (rc == DDS_RETCODE_OK) {
*triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_guardcond_unlock(gcond);
ret = DDS_RETCODE_OK;
} else {
DDS_ERROR("Argument condition is not valid\n");
ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD));
}
} else {
DDS_ERROR("Argument triggered is NULL\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
return ret; *triggered = false;
if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK)
return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD));
else
{
os_mutexLock (&gcond->m_entity.m_observers_lock);
*triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
os_mutexUnlock (&gcond->m_entity.m_observers_lock);
dds_guardcond_unlock (gcond);
return DDS_RETCODE_OK;
}
} }
_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_GUARD) ) dds_return_t dds_take_guardcondition (dds_entity_t condition, bool *triggered)
dds_return_t
dds_take_guardcondition(
_In_ dds_entity_t condition,
_Out_ bool *triggered)
{ {
dds_return_t ret; dds_guardcond *gcond;
dds_guardcond *gcond; dds__retcode_t rc;
dds__retcode_t rc;
if (triggered != NULL) { if (triggered == NULL)
*triggered = false; return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
rc = dds_guardcond_lock(condition, &gcond);
if (rc == DDS_RETCODE_OK) {
*triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_guardcond_unlock (gcond);
ret = DDS_RETCODE_OK;
} else {
DDS_ERROR("Argument condition is not valid\n");
ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_GUARD));
}
} else {
DDS_ERROR("Argument triggered is NULL\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
return ret; *triggered = false;
if ((rc = dds_guardcond_lock (condition, &gcond)) != DDS_RETCODE_OK)
return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_GUARD));
else
{
os_mutexLock (&gcond->m_entity.m_observers_lock);
*triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
os_mutexUnlock (&gcond->m_entity.m_observers_lock);
dds_guardcond_unlock (gcond);
return DDS_RETCODE_OK;
}
} }

View file

@ -13,163 +13,200 @@
#include "ddsc/dds.h" #include "ddsc/dds.h"
#include "dds__listener.h" #include "dds__listener.h"
dds_listener_t *dds_create_listener (void* arg)
_Ret_notnull_
dds_listener_t*
dds_create_listener(_In_opt_ void* arg)
{ {
c_listener_t *l = dds_alloc(sizeof(*l)); dds_listener_t *l = dds_alloc (sizeof (*l));
dds_reset_listener(l); dds_reset_listener (l);
l->arg = arg; l->on_inconsistent_topic_arg = arg;
return l; l->on_liveliness_lost_arg = arg;
l->on_offered_deadline_missed_arg = arg;
l->on_offered_incompatible_qos_arg = arg;
l->on_data_on_readers_arg = arg;
l->on_sample_lost_arg = arg;
l->on_data_available_arg = arg;
l->on_sample_rejected_arg = arg;
l->on_liveliness_changed_arg = arg;
l->on_requested_deadline_missed_arg = arg;
l->on_requested_incompatible_qos_arg = arg;
l->on_publication_matched_arg = arg;
l->on_subscription_matched_arg = arg;
return l;
} }
_Ret_notnull_ dds_listener_t *dds_listener_create (void* arg)
dds_listener_t*
dds_listener_create(_In_opt_ void* arg)
{ {
return dds_create_listener(arg); return dds_create_listener (arg);
} }
void void dds_delete_listener (dds_listener_t * __restrict listener)
dds_delete_listener(_In_ _Post_invalid_ dds_listener_t * __restrict listener)
{ {
if (listener) { dds_free (listener);
dds_free(listener);
}
} }
void void dds_listener_delete (dds_listener_t * __restrict listener)
dds_listener_delete(_In_ _Post_invalid_ dds_listener_t * __restrict listener)
{ {
dds_delete_listener(listener); dds_delete_listener (listener);
} }
void void dds_reset_listener (dds_listener_t * __restrict listener)
dds_reset_listener(_Out_ dds_listener_t * __restrict listener)
{ {
if (listener) { if (listener)
c_listener_t *l = listener; {
l->on_data_available = DDS_LUNSET; dds_listener_t * const l = listener;
l->on_data_on_readers = DDS_LUNSET; l->inherited = 0;
l->on_inconsistent_topic = DDS_LUNSET; l->on_data_available = 0;
l->on_liveliness_changed = DDS_LUNSET; l->on_data_on_readers = 0;
l->on_liveliness_lost = DDS_LUNSET; l->on_inconsistent_topic = 0;
l->on_offered_deadline_missed = DDS_LUNSET; l->on_liveliness_changed = 0;
l->on_offered_incompatible_qos = DDS_LUNSET; l->on_liveliness_lost = 0;
l->on_publication_matched = DDS_LUNSET; l->on_offered_deadline_missed = 0;
l->on_requested_deadline_missed = DDS_LUNSET; l->on_offered_incompatible_qos = 0;
l->on_requested_incompatible_qos = DDS_LUNSET; l->on_publication_matched = 0;
l->on_sample_lost = DDS_LUNSET; l->on_requested_deadline_missed = 0;
l->on_sample_rejected = DDS_LUNSET; l->on_requested_incompatible_qos = 0;
l->on_subscription_matched = DDS_LUNSET; l->on_sample_lost = 0;
} else { l->on_sample_rejected = 0;
DDS_ERROR("Argument listener is NULL\n"); l->on_subscription_matched = 0;
} }
} }
void void dds_listener_reset (dds_listener_t * __restrict listener)
dds_listener_reset(_Out_ dds_listener_t * __restrict listener)
{ {
dds_reset_listener(listener); dds_reset_listener (listener);
} }
void void dds_copy_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
dds_copy_listener(_Out_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src)
{ {
const c_listener_t *srcl = src; if (dst && src)
c_listener_t *dstl = dst; *dst = *src;
if(!src){
DDS_ERROR("Argument source(src) is NULL\n");
return ;
}
if(!dst){
DDS_ERROR("Argument destination(dst) is NULL\n");
return ;
}
dstl->on_data_available = srcl->on_data_available;
dstl->on_data_on_readers = srcl->on_data_on_readers;
dstl->on_inconsistent_topic = srcl->on_inconsistent_topic;
dstl->on_liveliness_changed = srcl->on_liveliness_changed;
dstl->on_liveliness_lost = srcl->on_liveliness_lost;
dstl->on_offered_deadline_missed = srcl->on_offered_deadline_missed;
dstl->on_offered_incompatible_qos = srcl->on_offered_incompatible_qos;
dstl->on_publication_matched = srcl->on_publication_matched;
dstl->on_requested_deadline_missed = srcl->on_requested_deadline_missed;
dstl->on_requested_incompatible_qos = srcl->on_requested_incompatible_qos;
dstl->on_sample_lost = srcl->on_sample_lost;
dstl->on_sample_rejected = srcl->on_sample_rejected;
dstl->on_subscription_matched = srcl->on_subscription_matched;
dstl->arg = srcl->arg;
} }
void void dds_listener_copy(dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
dds_listener_copy(_Out_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src)
{ {
dds_copy_listener(dst, src); dds_copy_listener (dst, src);
} }
void static bool dds_combine_listener_merge (uint32_t inherited, void (*dst)(void), void (*src)(void))
dds_merge_listener (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src)
{ {
const c_listener_t *srcl = src; (void)inherited;
c_listener_t *dstl = dst; (void)src;
return dst == 0;
if(!src){
DDS_ERROR("Argument source(src) is NULL\n");
return ;
}
if(!dst){
DDS_ERROR("Argument destination(dst) is NULL\n");
return ;
}
if (dstl->on_data_available == DDS_LUNSET) {
dstl->on_data_available = srcl->on_data_available;
}
if (dstl->on_data_on_readers == DDS_LUNSET) {
dstl->on_data_on_readers = srcl->on_data_on_readers;
}
if (dstl->on_inconsistent_topic == DDS_LUNSET) {
dstl->on_inconsistent_topic = srcl->on_inconsistent_topic;
}
if (dstl->on_liveliness_changed == DDS_LUNSET) {
dstl->on_liveliness_changed = srcl->on_liveliness_changed;
}
if (dstl->on_liveliness_lost == DDS_LUNSET) {
dstl->on_liveliness_lost = srcl->on_liveliness_lost;
}
if (dstl->on_offered_deadline_missed == DDS_LUNSET) {
dstl->on_offered_deadline_missed = srcl->on_offered_deadline_missed;
}
if (dstl->on_offered_incompatible_qos == DDS_LUNSET) {
dstl->on_offered_incompatible_qos = srcl->on_offered_incompatible_qos;
}
if (dstl->on_publication_matched == DDS_LUNSET) {
dstl->on_publication_matched = srcl->on_publication_matched;
}
if (dstl->on_requested_deadline_missed == DDS_LUNSET) {
dstl->on_requested_deadline_missed = srcl->on_requested_deadline_missed;
}
if (dstl->on_requested_incompatible_qos == DDS_LUNSET) {
dstl->on_requested_incompatible_qos = srcl->on_requested_incompatible_qos;
}
if (dstl->on_sample_lost == DDS_LUNSET) {
dstl->on_sample_lost = srcl->on_sample_lost;
}
if (dstl->on_sample_rejected == DDS_LUNSET) {
dstl->on_sample_rejected = srcl->on_sample_rejected;
}
if (dstl->on_subscription_matched == DDS_LUNSET) {
dstl->on_subscription_matched = srcl->on_subscription_matched;
}
} }
void static bool dds_combine_listener_override_inherited (uint32_t inherited, void (*dst)(void), void (*src)(void))
dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src)
{ {
dds_merge_listener(dst, src); (void)dst;
(void)src;
return inherited;
}
static void dds_combine_listener (bool (*op) (uint32_t inherited, void (*)(void), void (*)(void)), dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
{
if (op (dst->inherited & DDS_DATA_AVAILABLE_STATUS, (void (*)(void)) dst->on_data_available, (void (*)(void)) src->on_data_available))
{
dst->inherited |= DDS_DATA_AVAILABLE_STATUS;
dst->on_data_available = src->on_data_available;
dst->on_data_available_arg = src->on_data_available_arg;
}
if (op (dst->inherited & DDS_DATA_ON_READERS_STATUS, (void (*)(void)) dst->on_data_on_readers, (void (*)(void)) src->on_data_on_readers))
{
dst->inherited |= DDS_DATA_ON_READERS_STATUS;
dst->on_data_on_readers = src->on_data_on_readers;
dst->on_data_on_readers_arg = src->on_data_on_readers_arg;
}
if (op (dst->inherited & DDS_INCONSISTENT_TOPIC_STATUS, (void (*)(void)) dst->on_inconsistent_topic, (void (*)(void)) src->on_inconsistent_topic))
{
dst->inherited |= DDS_INCONSISTENT_TOPIC_STATUS;
dst->on_inconsistent_topic = src->on_inconsistent_topic;
dst->on_inconsistent_topic_arg = src->on_inconsistent_topic_arg;
}
if (op (dst->inherited & DDS_LIVELINESS_CHANGED_STATUS, (void (*)(void)) dst->on_liveliness_changed, (void (*)(void)) src->on_liveliness_changed))
{
dst->inherited |= DDS_LIVELINESS_CHANGED_STATUS;
dst->on_liveliness_changed = src->on_liveliness_changed;
dst->on_liveliness_changed_arg = src->on_liveliness_changed_arg;
}
if (op (dst->inherited & DDS_LIVELINESS_LOST_STATUS, (void (*)(void)) dst->on_liveliness_lost, (void (*)(void)) src->on_liveliness_lost))
{
dst->inherited |= DDS_LIVELINESS_LOST_STATUS;
dst->on_liveliness_lost = src->on_liveliness_lost;
dst->on_liveliness_lost_arg = src->on_liveliness_lost_arg;
}
if (op (dst->inherited & DDS_OFFERED_DEADLINE_MISSED_STATUS, (void (*)(void)) dst->on_offered_deadline_missed, (void (*)(void)) src->on_offered_deadline_missed))
{
dst->inherited |= DDS_OFFERED_DEADLINE_MISSED_STATUS;
dst->on_offered_deadline_missed = src->on_offered_deadline_missed;
dst->on_offered_deadline_missed_arg = src->on_offered_deadline_missed_arg;
}
if (op (dst->inherited & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS, (void (*)(void)) dst->on_offered_incompatible_qos, (void (*)(void)) src->on_offered_incompatible_qos))
{
dst->inherited |= DDS_OFFERED_INCOMPATIBLE_QOS_STATUS;
dst->on_offered_incompatible_qos = src->on_offered_incompatible_qos;
dst->on_offered_incompatible_qos_arg = src->on_offered_incompatible_qos_arg;
}
if (op (dst->inherited & DDS_PUBLICATION_MATCHED_STATUS, (void (*)(void)) dst->on_publication_matched, (void (*)(void)) src->on_publication_matched))
{
dst->inherited |= DDS_PUBLICATION_MATCHED_STATUS;
dst->on_publication_matched = src->on_publication_matched;
dst->on_publication_matched_arg = src->on_publication_matched_arg;
}
if (op (dst->inherited & DDS_REQUESTED_DEADLINE_MISSED_STATUS, (void (*)(void)) dst->on_requested_deadline_missed, (void (*)(void)) src->on_requested_deadline_missed))
{
dst->inherited |= DDS_REQUESTED_DEADLINE_MISSED_STATUS;
dst->on_requested_deadline_missed = src->on_requested_deadline_missed;
dst->on_requested_deadline_missed_arg = src->on_requested_deadline_missed_arg;
}
if (op (dst->inherited & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS, (void (*)(void)) dst->on_requested_incompatible_qos, (void (*)(void)) src->on_requested_incompatible_qos))
{
dst->inherited |= DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS;
dst->on_requested_incompatible_qos = src->on_requested_incompatible_qos;
dst->on_requested_incompatible_qos_arg = src->on_requested_incompatible_qos_arg;
}
if (op (dst->inherited & DDS_SAMPLE_LOST_STATUS, (void (*)(void)) dst->on_sample_lost, (void (*)(void)) src->on_sample_lost))
{
dst->inherited |= DDS_SAMPLE_LOST_STATUS;
dst->on_sample_lost = src->on_sample_lost;
dst->on_sample_lost_arg = src->on_sample_lost_arg;
}
if (op (dst->inherited & DDS_SAMPLE_REJECTED_STATUS, (void (*)(void)) dst->on_sample_rejected, (void (*)(void)) src->on_sample_rejected))
{
dst->inherited |= DDS_SAMPLE_REJECTED_STATUS;
dst->on_sample_rejected = src->on_sample_rejected;
dst->on_sample_rejected_arg = src->on_sample_rejected_arg;
}
if (op (dst->inherited & DDS_SUBSCRIPTION_MATCHED_STATUS, (void (*)(void)) dst->on_subscription_matched, (void (*)(void)) src->on_subscription_matched))
{
dst->inherited |= DDS_SUBSCRIPTION_MATCHED_STATUS;
dst->on_subscription_matched = src->on_subscription_matched;
dst->on_subscription_matched_arg = src->on_subscription_matched_arg;
}
}
void dds_override_inherited_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
{
if (dst && src)
dds_combine_listener (dds_combine_listener_override_inherited, dst, src);
}
void dds_inherit_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
{
if (dst && src)
dds_combine_listener (dds_combine_listener_merge, dst, src);
}
void dds_merge_listener (dds_listener_t * __restrict dst, const dds_listener_t * __restrict src)
{
if (dst && src)
{
uint32_t inherited = dst->inherited;
dds_combine_listener (dds_combine_listener_merge, dst, src);
dst->inherited = inherited;
}
}
void dds_listener_merge (_Inout_ dds_listener_t * __restrict dst, _In_ const dds_listener_t * __restrict src)
{
dds_merge_listener(dst, src);
} }
/************************************************************************************************ /************************************************************************************************

View file

@ -162,181 +162,189 @@ dds_reader_status_validate(
DDS_RETCODE_OK; DDS_RETCODE_OK;
} }
void void dds_reader_data_available_cb (struct dds_reader *rd)
dds_reader_status_cb(
void *ventity,
const status_cb_data_t *data)
{ {
struct dds_entity * const entity = ventity; /* DATA_AVAILABLE is special in two ways: firstly, it should first try
dds_reader *rd; DATA_ON_READERS on the line of ancestors, and if not consumed set the
dds__retcode_t rc; status on the subscriber; secondly it is the only one for which
void *metrics = NULL; overhead really matters. Otherwise, it is pretty much like
dds_reader_status_cb. */
struct dds_listener const * const lst = &rd->m_entity.m_listener;
dds_entity * const sub = rd->m_entity.m_parent;
/* When data is NULL, it means that the DDSI reader is deleted. */ os_mutexLock (&rd->m_entity.m_observers_lock);
if (data == NULL) { while (rd->m_entity.m_cb_count > 0)
/* Release the initial claim that was done during the create. This os_condWait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
* will indicate that further API deletion is now possible. */ rd->m_entity.m_cb_count++;
ut_handle_release(entity->m_hdl, ((dds_entity*)entity)->m_hdllink);
return; if (lst->on_data_on_readers)
{
os_mutexUnlock (&rd->m_entity.m_observers_lock);
os_mutexLock (&sub->m_observers_lock);
while (sub->m_cb_count > 0)
os_condWait (&sub->m_observers_cond, &sub->m_observers_lock);
sub->m_cb_count++;
os_mutexUnlock (&sub->m_observers_lock);
lst->on_data_on_readers (sub->m_hdl, lst->on_data_on_readers_arg);
os_mutexLock (&rd->m_entity.m_observers_lock);
os_mutexLock (&sub->m_observers_lock);
sub->m_cb_count--;
os_condBroadcast (&sub->m_observers_cond);
os_mutexUnlock (&sub->m_observers_lock);
}
else if (rd->m_entity.m_listener.on_data_available)
{
os_mutexUnlock (&rd->m_entity.m_observers_lock);
lst->on_data_available (rd->m_entity.m_hdl, lst->on_data_available_arg);
os_mutexLock (&rd->m_entity.m_observers_lock);
}
else
{
dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
os_mutexLock (&sub->m_observers_lock);
dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS);
os_mutexUnlock (&sub->m_observers_lock);
}
rd->m_entity.m_cb_count--;
os_condBroadcast (&rd->m_entity.m_observers_cond);
os_mutexUnlock (&rd->m_entity.m_observers_lock);
}
void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
{
struct dds_entity * const entity = ventity;
/* When data is NULL, it means that the DDSI reader is deleted. */
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
ut_handle_release (entity->m_hdl, entity->m_hdllink);
return;
}
struct dds_listener const * const lst = &entity->m_listener;
bool invoke = false;
void *vst = NULL;
int32_t *reset[2] = { NULL, NULL };
/* DATA_AVAILABLE is handled by dds_reader_data_available_cb */
assert (data->status != DDS_DATA_AVAILABLE_STATUS);
/* Serialize listener invocations -- it is somewhat sad to do this,
but then it may also be unreasonable to expect the application to
handle concurrent invocations of a single listener. The benefit
here is that it means the counters and "change" counters
can safely be incremented and/or reset while releasing
m_observers_lock for the duration of the listener call itself,
and that similarly the listener function and argument pointers
are stable */
os_mutexLock (&entity->m_observers_lock);
while (entity->m_cb_count > 0)
os_condWait (&entity->m_observers_cond, &entity->m_observers_lock);
entity->m_cb_count++;
/* Update status metrics. */
dds_reader * const rd = (dds_reader *) entity;
switch (data->status) {
case DDS_REQUESTED_DEADLINE_MISSED_STATUS: {
struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status;
st->last_instance_handle = data->handle;
st->total_count++;
st->total_count_change++;
invoke = (lst->on_requested_deadline_missed != 0);
reset[0] = &st->total_count_change;
break;
} }
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: {
if (dds_reader_lock(entity->m_hdl, &rd) != DDS_RETCODE_OK) { struct dds_requested_incompatible_qos_status * const st = vst = &rd->m_requested_incompatible_qos_status;
return; st->total_count++;
st->total_count_change++;
st->last_policy_id = data->extra;
invoke = (lst->on_requested_incompatible_qos != 0);
reset[0] = &st->total_count_change;
break;
} }
assert(&rd->m_entity == entity); case DDS_SAMPLE_LOST_STATUS: {
struct dds_sample_lost_status * const st = vst = &rd->m_sample_lost_status;
/* Reset the status for possible Listener call. st->total_count++;
* When a listener is not called, the status will be set (again). */ st->total_count_change++;
dds_entity_status_reset(entity, data->status); invoke = (lst->on_sample_lost != 0);
reset[0] = &st->total_count_change;
/* Update status metrics. */ break;
switch (data->status) {
case DDS_REQUESTED_DEADLINE_MISSED_STATUS: {
rd->m_requested_deadline_missed_status.total_count++;
rd->m_requested_deadline_missed_status.total_count_change++;
rd->m_requested_deadline_missed_status.last_instance_handle = data->handle;
metrics = &rd->m_requested_deadline_missed_status;
break;
}
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: {
rd->m_requested_incompatible_qos_status.total_count++;
rd->m_requested_incompatible_qos_status.total_count_change++;
rd->m_requested_incompatible_qos_status.last_policy_id = data->extra;
metrics = &rd->m_requested_incompatible_qos_status;
break;
}
case DDS_SAMPLE_LOST_STATUS: {
rd->m_sample_lost_status.total_count++;
rd->m_sample_lost_status.total_count_change++;
metrics = &rd->m_sample_lost_status;
break;
}
case DDS_SAMPLE_REJECTED_STATUS: {
rd->m_sample_rejected_status.total_count++;
rd->m_sample_rejected_status.total_count_change++;
rd->m_sample_rejected_status.last_reason = data->extra;
rd->m_sample_rejected_status.last_instance_handle = data->handle;
metrics = &rd->m_sample_rejected_status;
break;
}
case DDS_DATA_AVAILABLE_STATUS: {
metrics = NULL;
break;
}
case DDS_LIVELINESS_CHANGED_STATUS: {
if (data->add) {
rd->m_liveliness_changed_status.alive_count++;
rd->m_liveliness_changed_status.alive_count_change++;
if (rd->m_liveliness_changed_status.not_alive_count > 0) {
rd->m_liveliness_changed_status.not_alive_count--;
}
} else {
rd->m_liveliness_changed_status.alive_count--;
rd->m_liveliness_changed_status.not_alive_count++;
rd->m_liveliness_changed_status.not_alive_count_change++;
}
rd->m_liveliness_changed_status.last_publication_handle = data->handle;
metrics = &rd->m_liveliness_changed_status;
break;
}
case DDS_SUBSCRIPTION_MATCHED_STATUS: {
if (data->add) {
rd->m_subscription_matched_status.total_count++;
rd->m_subscription_matched_status.total_count_change++;
rd->m_subscription_matched_status.current_count++;
rd->m_subscription_matched_status.current_count_change++;
} else {
rd->m_subscription_matched_status.current_count--;
rd->m_subscription_matched_status.current_count_change--;
}
rd->m_subscription_matched_status.last_publication_handle = data->handle;
metrics = &rd->m_subscription_matched_status;
break;
}
default: assert (0);
} }
case DDS_SAMPLE_REJECTED_STATUS: {
/* The reader needs to be unlocked when propagating the (possible) listener struct dds_sample_rejected_status * const st = vst = &rd->m_sample_rejected_status;
* call because the application should be able to call this reader within st->total_count++;
* the callback function. */ st->total_count_change++;
dds_reader_unlock(rd); st->last_reason = data->extra;
st->last_instance_handle = data->handle;
/* DATA_AVAILABLE is handled differently to normal status changes. */ invoke = (lst->on_sample_rejected != 0);
if (data->status == DDS_DATA_AVAILABLE_STATUS) { reset[0] = &st->total_count_change;
dds_entity *parent = rd->m_entity.m_parent; break;
/* First, try to ship it off to its parent(s) DDS_DATA_ON_READERS_STATUS. */
rc = dds_entity_listener_propagation(parent, parent, DDS_DATA_ON_READERS_STATUS, NULL, true);
if (rc == DDS_RETCODE_NO_DATA) {
/* No parent was interested (NO_DATA == NO_CALL).
* What about myself with DDS_DATA_AVAILABLE_STATUS? */
rc = dds_entity_listener_propagation(entity, entity, DDS_DATA_AVAILABLE_STATUS, NULL, false);
}
if ( rc == DDS_RETCODE_NO_DATA ) {
/* Nobody was interested (NO_DATA == NO_CALL). Set the status on the subscriber. */
dds_entity_status_set(parent, DDS_DATA_ON_READERS_STATUS);
/* Notify possible interested observers of the subscriber. */
dds_entity_status_signal(parent);
}
} else {
/* Is anybody interested within the entity hierarchy through listeners? */
rc = dds_entity_listener_propagation(entity, entity, data->status, metrics, true);
} }
case DDS_LIVELINESS_CHANGED_STATUS: {
if (rc == DDS_RETCODE_OK) { struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status;
/* Event was eaten by a listener. */ if (data->add) {
if (dds_reader_lock(entity->m_hdl, &rd) == DDS_RETCODE_OK) { st->alive_count++;
assert(&rd->m_entity == entity); st->alive_count_change++;
if (st->not_alive_count > 0) {
/* Reset the change counts of the metrics. */ st->not_alive_count--;
switch (data->status) {
case DDS_REQUESTED_DEADLINE_MISSED_STATUS: {
rd->m_requested_deadline_missed_status.total_count_change = 0;
break;
}
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: {
rd->m_requested_incompatible_qos_status.total_count_change = 0;
break;
}
case DDS_SAMPLE_LOST_STATUS: {
rd->m_sample_lost_status.total_count_change = 0;
break;
}
case DDS_SAMPLE_REJECTED_STATUS: {
rd->m_sample_rejected_status.total_count_change = 0;
break;
}
case DDS_DATA_AVAILABLE_STATUS: {
/* Nothing to reset. */;
break;
}
case DDS_LIVELINESS_CHANGED_STATUS: {
rd->m_liveliness_changed_status.alive_count_change = 0;
rd->m_liveliness_changed_status.not_alive_count_change = 0;
break;
}
case DDS_SUBSCRIPTION_MATCHED_STATUS: {
rd->m_subscription_matched_status.total_count_change = 0;
rd->m_subscription_matched_status.current_count_change = 0;
break;
}
default: assert (0);
}
dds_reader_unlock(rd);
} else {
/* There's a deletion or closing going on. */
} }
} else if (rc == DDS_RETCODE_NO_DATA) { } else {
/* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status, consider successful. */ st->alive_count--;
dds_entity_status_set(entity, data->status); st->not_alive_count++;
/* Notify possible interested observers. */ st->not_alive_count_change++;
dds_entity_status_signal(entity); }
} else if (rc == DDS_RETCODE_ALREADY_DELETED) { st->last_publication_handle = data->handle;
/* An entity up the hierarchy is being deleted, consider successful. */ invoke = (lst->on_liveliness_changed != 0);
} else { reset[0] = &st->alive_count_change;
/* Something went wrong up the hierarchy. */ reset[1] = &st->not_alive_count_change;
break;
} }
case DDS_SUBSCRIPTION_MATCHED_STATUS: {
struct dds_subscription_matched_status * const st = vst = &rd->m_subscription_matched_status;
if (data->add) {
st->total_count++;
st->total_count_change++;
st->current_count++;
st->current_count_change++;
} else {
st->current_count--;
st->current_count_change--;
}
st->last_publication_handle = data->handle;
invoke = (lst->on_subscription_matched != 0);
reset[0] = &st->total_count_change;
reset[1] = &st->current_count_change;
break;
}
default:
assert (0);
}
if (invoke)
{
os_mutexUnlock (&entity->m_observers_lock);
dds_entity_invoke_listener(entity, data->status, vst);
os_mutexLock (&entity->m_observers_lock);
*reset[0] = 0;
if (reset[1])
*reset[1] = 0;
}
else
{
dds_entity_status_set (entity, data->status);
}
entity->m_cb_count--;
os_condBroadcast (&entity->m_observers_cond);
os_mutexUnlock (&entity->m_observers_lock);
} }
_Pre_satisfies_(((participant_or_subscriber & DDS_ENTITY_KIND_MASK) == DDS_KIND_SUBSCRIBER ) ||\ _Pre_satisfies_(((participant_or_subscriber & DDS_ENTITY_KIND_MASK) == DDS_KIND_SUBSCRIBER ) ||\

View file

@ -146,8 +146,6 @@
"signal_conditions" after releasing the RHC lock. "signal_conditions" after releasing the RHC lock.
*/ */
static const status_cb_data_t dds_rhc_data_avail_cb_data = { DDS_DATA_AVAILABLE_STATUS, 0, 0, true };
/* FIXME: tkmap should perhaps retain data with timestamp set to invalid /* FIXME: tkmap should perhaps retain data with timestamp set to invalid
An invalid timestamp is (logically) unordered with respect to valid An invalid timestamp is (logically) unordered with respect to valid
timestamps, and that would mean BY_SOURCE order could be respected timestamps, and that would mean BY_SOURCE order could be respected
@ -1389,7 +1387,7 @@ bool dds_rhc_store
if (rhc->reader && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) if (rhc->reader && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS))
{ {
os_atomic_inc32 (&rhc->n_cbs); os_atomic_inc32 (&rhc->n_cbs);
dds_reader_status_cb (&rhc->reader->m_entity, &dds_rhc_data_avail_cb_data); dds_reader_data_available_cb (rhc->reader);
os_atomic_dec32 (&rhc->n_cbs); os_atomic_dec32 (&rhc->n_cbs);
} }
} }

View file

@ -97,26 +97,6 @@ dds_subscriber_status_validate(
return ret; return ret;
} }
/*
Set boolean on readers that indicates state of DATA_ON_READERS
status on parent subscriber
*/
static dds_return_t dds_subscriber_status_propagate (dds_entity *sub, uint32_t mask, bool set)
{
if (mask & DDS_DATA_ON_READERS_STATUS)
{
dds_entity *iter = sub->m_children;
while (iter) {
os_mutexLock (&iter->m_mutex);
assert (dds_entity_kind (iter) == DDS_KIND_READER);
((dds_reader*) iter)->m_data_on_readers = set;
os_mutexUnlock (&iter->m_mutex);
iter = iter->m_next;
}
}
return DDS_RETCODE_OK;
}
_Requires_exclusive_lock_held_(participant) _Requires_exclusive_lock_held_(participant)
_Check_return_ dds_entity_t _Check_return_ dds_entity_t
dds__create_subscriber_l( dds__create_subscriber_l(
@ -147,7 +127,6 @@ dds__create_subscriber_l(
subscriber = dds_entity_init(&sub->m_entity, participant, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK); subscriber = dds_entity_init(&sub->m_entity, participant, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK);
sub->m_entity.m_deriver.set_qos = dds_subscriber_qos_set; sub->m_entity.m_deriver.set_qos = dds_subscriber_qos_set;
sub->m_entity.m_deriver.validate_status = dds_subscriber_status_validate; sub->m_entity.m_deriver.validate_status = dds_subscriber_status_validate;
sub->m_entity.m_deriver.propagate_status = dds_subscriber_status_propagate;
sub->m_entity.m_deriver.get_instance_hdl = dds_subscriber_instance_hdl; sub->m_entity.m_deriver.get_instance_hdl = dds_subscriber_instance_hdl;
return subscriber; return subscriber;

View file

@ -92,55 +92,29 @@ dds_topic_status_validate(
status (only defined status on a topic). status (only defined status on a topic).
*/ */
static void static void dds_topic_status_cb (struct dds_topic *tp)
dds_topic_status_cb(
struct dds_topic *cb_t)
{ {
dds_topic *topic; struct dds_listener const * const lst = &tp->m_entity.m_listener;
dds__retcode_t rc;
if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) != DDS_RETCODE_OK) { os_mutexLock (&tp->m_entity.m_observers_lock);
return; while (tp->m_entity.m_cb_count > 0)
} os_condWait (&tp->m_entity.m_observers_cond, &tp->m_entity.m_observers_lock);
assert(topic == cb_t); tp->m_entity.m_cb_count++;
/* Reset the status for possible Listener call. tp->m_inconsistent_topic_status.total_count++;
* When a listener is not called, the status will be set (again). */ tp->m_inconsistent_topic_status.total_count_change++;
if (lst->on_inconsistent_topic)
{
os_mutexUnlock (&tp->m_entity.m_observers_lock);
dds_entity_invoke_listener(&tp->m_entity, DDS_INCONSISTENT_TOPIC_STATUS, &tp->m_inconsistent_topic_status);
os_mutexLock (&tp->m_entity.m_observers_lock);
tp->m_inconsistent_topic_status.total_count_change = 0;
}
/* Update status metrics. */ dds_entity_status_set(&tp->m_entity, DDS_INCONSISTENT_TOPIC_STATUS);
topic->m_inconsistent_topic_status.total_count++; tp->m_entity.m_cb_count--;
topic->m_inconsistent_topic_status.total_count_change++; os_condBroadcast (&tp->m_entity.m_observers_cond);
os_mutexUnlock (&tp->m_entity.m_observers_lock);
/* The topic needs to be unlocked when propagating the (possible) listener
* call because the application should be able to call this topic within
* the callback function. */
dds_topic_unlock(topic);
/* Is anybody interested within the entity hierarchy through listeners? */
rc = dds_entity_listener_propagation(&topic->m_entity,
&topic->m_entity,
DDS_INCONSISTENT_TOPIC_STATUS,
&topic->m_inconsistent_topic_status,
true);
if (rc == DDS_RETCODE_OK) {
/* Event was eaten by a listener. */
if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) == DDS_RETCODE_OK) {
/* Reset the change counts of the metrics. */
topic->m_inconsistent_topic_status.total_count_change = 0;
dds_topic_unlock(topic);
}
} else if (rc == DDS_RETCODE_NO_DATA) {
/* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */
dds_entity_status_set(&topic->m_entity, DDS_INCONSISTENT_TOPIC_STATUS);
/* Notify possible interested observers. */
dds_entity_status_signal(&topic->m_entity);
} else if (rc == DDS_RETCODE_ALREADY_DELETED) {
/* An entity up the hierarchy is being deleted; consider it successful. */
} else {
/* Something went wrong up the hierarchy. */
}
} }
struct ddsi_sertopic * struct ddsi_sertopic *

View file

@ -38,18 +38,6 @@ dds_waitset_swap(
*dst = idx; *dst = idx;
} }
static void dds_waitset_signal_entity (dds_waitset *ws)
{
dds_entity *e = &ws->m_entity;
/* When signaling any observers of us through the entity,
* we need to be unlocked. We still have claimed the related
* handle, so possible deletions will be delayed until we
* release it. */
os_mutexUnlock (&e->m_mutex);
dds_entity_status_signal (e);
os_mutexLock (&e->m_mutex);
}
static dds_return_t static dds_return_t
dds_waitset_wait_impl( dds_waitset_wait_impl(
_In_ dds_entity_t waitset, _In_ dds_entity_t waitset,
@ -472,33 +460,25 @@ dds_waitset_wait(
return ret; return ret;
} }
_Pre_satisfies_((waitset & DDS_ENTITY_KIND_MASK) == DDS_KIND_WAITSET) dds_return_t dds_waitset_set_trigger (dds_entity_t waitset, bool trigger)
dds_return_t
dds_waitset_set_trigger(
_In_ dds_entity_t waitset,
_In_ bool trigger)
{ {
dds_waitset *ws; dds_waitset *ws;
dds__retcode_t rc; dds__retcode_t rc;
dds_return_t ret = DDS_RETCODE_OK;
/* Locking the waitset here will delay a possible deletion until it is if ((rc = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
* unlocked. Even when the related mutex is unlocked when we want to send return DDS_ERRNO (rc);
* a signal. */
rc = dds_waitset_lock(waitset, &ws); os_mutexUnlock (&ws->m_entity.m_mutex);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking waitset\n"); os_mutexLock (&ws->m_entity.m_observers_lock);
ret = DDS_ERRNO(rc); if (trigger)
goto fail; dds_entity_status_set (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
} else
if (trigger) { dds_entity_status_reset (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_set(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); os_mutexUnlock (&ws->m_entity.m_observers_lock);
} else {
dds_entity_status_reset(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); os_mutexLock (&ws->m_entity.m_mutex);
} dds_waitset_unlock (ws);
dds_waitset_signal_entity(ws); return DDS_RETCODE_OK;
dds_waitset_unlock(ws);
fail:
return ret;
} }

View file

@ -65,15 +65,9 @@ dds_writer_status_validate(
then status conditions is not triggered. then status conditions is not triggered.
*/ */
static void static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
dds_writer_status_cb(
void *ventity,
const status_cb_data_t *data)
{ {
struct dds_entity * const entity = ventity; struct dds_entity * const entity = ventity;
dds_writer *wr;
dds__retcode_t rc;
void *metrics = NULL;
/* When data is NULL, it means that the writer is deleted. */ /* When data is NULL, it means that the writer is deleted. */
if (data == NULL) if (data == NULL)
@ -84,44 +78,52 @@ dds_writer_status_cb(
return; return;
} }
if (dds_writer_lock (entity->m_hdl, &wr) != DDS_RETCODE_OK) { struct dds_listener const * const lst = &entity->m_listener;
/* There's a deletion or closing going on. */ bool invoke = false;
return; void *vst = NULL;
} int32_t *reset[2] = { NULL, NULL };
assert (&wr->m_entity == entity);
os_mutexLock (&entity->m_observers_lock);
while (entity->m_cb_count > 0)
os_condWait (&entity->m_observers_cond, &entity->m_observers_lock);
entity->m_cb_count++;
/* Reset the status for possible Listener call. /* Reset the status for possible Listener call.
* When a listener is not called, the status will be set (again). */ * When a listener is not called, the status will be set (again). */
dds_entity_status_reset (entity, data->status); dds_entity_status_reset (entity, data->status);
/* Update status metrics. */ /* Update status metrics. */
dds_writer * const wr = (dds_writer *) entity;
switch (data->status) switch (data->status)
{ {
case DDS_OFFERED_DEADLINE_MISSED_STATUS: { case DDS_OFFERED_DEADLINE_MISSED_STATUS: {
struct dds_offered_deadline_missed_status * const st = &wr->m_offered_deadline_missed_status; struct dds_offered_deadline_missed_status * const st = vst = &wr->m_offered_deadline_missed_status;
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
st->last_instance_handle = data->handle; st->last_instance_handle = data->handle;
metrics = st; invoke = (lst->on_offered_deadline_missed != 0);
reset[0] = &st->total_count_change;
break; break;
} }
case DDS_LIVELINESS_LOST_STATUS: { case DDS_LIVELINESS_LOST_STATUS: {
struct dds_liveliness_lost_status * const st = &wr->m_liveliness_lost_status; struct dds_liveliness_lost_status * const st = vst = &wr->m_liveliness_lost_status;
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
metrics = st; invoke = (lst->on_liveliness_lost != 0);
reset[0] = &st->total_count_change;
break; break;
} }
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: {
struct dds_offered_incompatible_qos_status * const st = &wr->m_offered_incompatible_qos_status; struct dds_offered_incompatible_qos_status * const st = vst = &wr->m_offered_incompatible_qos_status;
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
st->last_policy_id = data->extra; st->last_policy_id = data->extra;
metrics = st; invoke = (lst->on_offered_incompatible_qos != 0);
reset[0] = &st->total_count_change;
break; break;
} }
case DDS_PUBLICATION_MATCHED_STATUS: { case DDS_PUBLICATION_MATCHED_STATUS: {
struct dds_publication_matched_status * const st = &wr->m_publication_matched_status; struct dds_publication_matched_status * const st = vst = &wr->m_publication_matched_status;
if (data->add) { if (data->add) {
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
@ -131,69 +133,33 @@ dds_writer_status_cb(
st->current_count--; st->current_count--;
st->current_count_change--; st->current_count_change--;
} }
st->last_subscription_handle = data->handle; wr->m_publication_matched_status.last_subscription_handle = data->handle;
metrics = st; invoke = (lst->on_publication_matched != 0);
reset[0] = &st->total_count_change;
reset[1] = &st->current_count_change;
break; break;
} }
default: default:
assert (0); assert (0);
} }
/* The writer needs to be unlocked when propagating the (possible) listener if (invoke)
* call because the application should be able to call this writer within
* the callback function. */
dds_writer_unlock (wr);
/* Is anybody interested within the entity hierarchy through listeners? */
rc = dds_entity_listener_propagation (entity, entity, data->status, metrics, true);
if (rc == DDS_RETCODE_OK)
{ {
/* Event was eaten by a listener. */ os_mutexUnlock (&entity->m_observers_lock);
if (dds_writer_lock (entity->m_hdl, &wr) == DDS_RETCODE_OK) dds_entity_invoke_listener(entity, data->status, vst);
{ os_mutexLock (&entity->m_observers_lock);
assert (&wr->m_entity == entity); *reset[0] = 0;
if (reset[1])
/* Reset the status. */ *reset[1] = 0;
dds_entity_status_reset (entity, data->status);
/* Reset the change counts of the metrics. */
switch (data->status)
{
case DDS_OFFERED_DEADLINE_MISSED_STATUS:
wr->m_offered_deadline_missed_status.total_count_change = 0;
break;
case DDS_LIVELINESS_LOST_STATUS:
wr->m_liveliness_lost_status.total_count_change = 0;
break;
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS:
wr->m_offered_incompatible_qos_status.total_count_change = 0;
break;
case DDS_PUBLICATION_MATCHED_STATUS:
wr->m_publication_matched_status.total_count_change = 0;
wr->m_publication_matched_status.current_count_change = 0;
break;
default:
assert (0);
}
dds_writer_unlock (wr);
}
}
else if (rc == DDS_RETCODE_NO_DATA)
{
/* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */
dds_entity_status_set (entity, data->status);
/* Notify possible interested observers. */
dds_entity_status_signal (entity);
}
else if (rc == DDS_RETCODE_ALREADY_DELETED)
{
/* An entity up the hierarchy is being deleted; consider it successful. */
} }
else else
{ {
/* Something went wrong up the hierarchy. */ dds_entity_status_set (entity, data->status);
} }
entity->m_cb_count--;
os_condBroadcast (&entity->m_observers_cond);
os_mutexUnlock (&entity->m_observers_lock);
} }
static uint32_t static uint32_t