Invoke listeners with a copy of the status

This changes the status argument of the listener call to a local copy of
the entity's status field, fixing a race between dds_get_xxx_status and
the xxx listener invocations.  In that case there was a window during
which the "change" fields could be reset by the former prior to the
latter getting invoked.

One symptom of this particular race condition is the (very rare) failure
of the liveliness tests for 0 and 1ns lease durations, while waiting for
the writers to all become not alive.  In that particular scenario, the
liveliness changed listener observes alive_count_change and
not_alive_count_change both 0, which it (rightly) considers an error.

A regression test is added that reliably reproduces the problem.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-07-23 15:04:15 +02:00 committed by eboasson
parent fac58ab1a9
commit 79773729d6
5 changed files with 402 additions and 350 deletions

View file

@ -84,7 +84,40 @@ DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) {
DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status); DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status);
DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst); union dds_status_union {
dds_inconsistent_topic_status_t inconsistent_topic;
dds_liveliness_changed_status_t liveliness_changed;
dds_liveliness_lost_status_t liveliness_lost;
dds_offered_deadline_missed_status_t offered_deadline_missed;
dds_offered_incompatible_qos_status_t offered_incompatible_qos;
dds_publication_matched_status_t publication_matched;
dds_requested_deadline_missed_status_t requested_deadline_missed;
dds_requested_incompatible_qos_status_t requested_incompatible_qos;
dds_sample_lost_status_t sample_lost;
dds_sample_rejected_status_t sample_rejected;
dds_subscription_matched_status_t subscription_matched;
};
#define STATUS_CB_IMPL(entity_kind_, name_, NAME_) \
static void status_cb_##name_ (dds_##entity_kind_ * const e, const status_cb_data_t *data, bool enabled) \
{ \
struct dds_listener const * const listener = &e->m_entity.m_listener; \
const bool invoke = (listener->on_##name_ != 0) && enabled; \
union dds_status_union lst; \
update_##name_ (&e->m_##name_##_status, invoke ? &lst.name_ : NULL, data); \
if (invoke) { \
dds_entity_status_reset (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \
e->m_entity.m_cb_pending_count++; \
e->m_entity.m_cb_count++; \
ddsrt_mutex_unlock (&e->m_entity.m_observers_lock); \
listener->on_##name_ (e->m_entity.m_hdllink.hdl, lst.name_, listener->on_##name_##_arg); \
ddsrt_mutex_lock (&e->m_entity.m_observers_lock); \
e->m_entity.m_cb_count--; \
e->m_entity.m_cb_pending_count--; \
} else if (enabled) { \
dds_entity_status_set (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \
} \
}
DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e); DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e);
DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e); DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e);

View file

@ -940,77 +940,6 @@ dds_return_t dds_get_listener (dds_entity_t entity, dds_listener_t *listener)
} }
} }
void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst)
{
struct dds_listener const * const lst = &entity->m_listener;
switch (which)
{
case DDS_INCONSISTENT_TOPIC_STATUS_ID: {
struct dds_inconsistent_topic_status const * const st = vst;
lst->on_inconsistent_topic (entity->m_hdllink.hdl, *st, lst->on_inconsistent_topic_arg);
break;
}
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: {
struct dds_requested_deadline_missed_status const * const st = vst;
lst->on_requested_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_requested_deadline_missed_arg);
break;
}
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: {
struct dds_requested_incompatible_qos_status const * const st = vst;
lst->on_requested_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_requested_incompatible_qos_arg);
break;
}
case DDS_SAMPLE_LOST_STATUS_ID: {
struct dds_sample_lost_status const * const st = vst;
lst->on_sample_lost (entity->m_hdllink.hdl, *st, lst->on_sample_lost_arg);
break;
}
case DDS_SAMPLE_REJECTED_STATUS_ID: {
struct dds_sample_rejected_status const * const st = vst;
lst->on_sample_rejected (entity->m_hdllink.hdl, *st, lst->on_sample_rejected_arg);
break;
}
case DDS_LIVELINESS_CHANGED_STATUS_ID: {
struct dds_liveliness_changed_status const * const st = vst;
lst->on_liveliness_changed (entity->m_hdllink.hdl, *st, lst->on_liveliness_changed_arg);
break;
}
case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: {
struct dds_subscription_matched_status const * const st = vst;
lst->on_subscription_matched (entity->m_hdllink.hdl, *st, lst->on_subscription_matched_arg);
break;
}
case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: {
struct dds_offered_deadline_missed_status const * const st = vst;
lst->on_offered_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_offered_deadline_missed_arg);
break;
}
case DDS_LIVELINESS_LOST_STATUS_ID: {
struct dds_liveliness_lost_status const * const st = vst;
lst->on_liveliness_lost (entity->m_hdllink.hdl, *st, lst->on_liveliness_lost_arg);
break;
}
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: {
struct dds_offered_incompatible_qos_status const * const st = vst;
lst->on_offered_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_offered_incompatible_qos_arg);
break;
}
case DDS_PUBLICATION_MATCHED_STATUS_ID: {
struct dds_publication_matched_status const * const st = vst;
lst->on_publication_matched (entity->m_hdllink.hdl, *st, lst->on_publication_matched_arg);
break;
}
case DDS_DATA_AVAILABLE_STATUS_ID: {
lst->on_data_available (entity->m_hdllink.hdl, lst->on_data_available_arg);
break;
}
case DDS_DATA_ON_READERS_STATUS_ID: {
lst->on_data_on_readers (entity->m_hdllink.hdl, lst->on_data_on_readers_arg);
break;
}
}
}
static void clear_status_with_listener (struct dds_entity *e) static void clear_status_with_listener (struct dds_entity *e)
{ {
const struct dds_listener *lst = &e->m_listener; const struct dds_listener *lst = &e->m_listener;

View file

@ -174,84 +174,62 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
} }
void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) static void update_requested_deadline_missed (struct dds_requested_deadline_missed_status * __restrict st, struct dds_requested_deadline_missed_status * __restrict lst, const status_cb_data_t *data)
{ {
dds_reader * const rd = ventity;
/* When data is NULL, it means that the DDSI reader is deleted. */
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
rd->m_rd = NULL;
ddsrt_cond_broadcast (&rd->m_entity.m_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
return;
}
struct dds_listener const * const lst = &rd->m_entity.m_listener;
enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
bool invoke = false;
void *vst = NULL;
int32_t *reset[2] = { NULL, NULL };
/* DATA_AVAILABLE is handled by dds_reader_data_available_cb */
assert (status_id != DDS_DATA_AVAILABLE_STATUS_ID);
/* Serialize listener invocations -- it is somewhat sad to do this,
but then it may also be unreasonable to expect the application to
handle concurrent invocations of a single listener. The benefit
here is that it means the counters and "change" counters
can safely be incremented and/or reset while releasing
m_observers_lock for the duration of the listener call itself,
and that similarly the listener function and argument pointers
are stable */
/* FIXME: why do this if no listener is set? */
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
while (rd->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
/* Update status metrics. */
switch (status_id) {
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: {
struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status;
st->last_instance_handle = data->handle; st->last_instance_handle = data->handle;
st->total_count++; st->total_count++;
// always incrementing st->total_count_change, then copying into *lst is
// a bit more than minimal work, but this guarantees the correct value
// also when enabling a listeners after some events have occurred
//
// (same line of reasoning for all of them)
st->total_count_change++; st->total_count_change++;
invoke = (lst->on_requested_deadline_missed != 0); if (lst != NULL)
reset[0] = &st->total_count_change; {
break; *lst = *st;
st->total_count_change = 0;
} }
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: { }
struct dds_requested_incompatible_qos_status * const st = vst = &rd->m_requested_incompatible_qos_status;
st->total_count++; static void update_requested_incompatible_qos (struct dds_requested_incompatible_qos_status * __restrict st, struct dds_requested_incompatible_qos_status * __restrict lst, const status_cb_data_t *data)
st->total_count_change++; {
st->last_policy_id = data->extra; st->last_policy_id = data->extra;
invoke = (lst->on_requested_incompatible_qos != 0);
reset[0] = &st->total_count_change;
break;
}
case DDS_SAMPLE_LOST_STATUS_ID: {
struct dds_sample_lost_status * const st = vst = &rd->m_sample_lost_status;
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
invoke = (lst->on_sample_lost != 0); if (lst != NULL)
reset[0] = &st->total_count_change; {
break; *lst = *st;
st->total_count_change = 0;
} }
case DDS_SAMPLE_REJECTED_STATUS_ID: { }
struct dds_sample_rejected_status * const st = vst = &rd->m_sample_rejected_status;
static void update_sample_lost (struct dds_sample_lost_status * __restrict st, struct dds_sample_lost_status * __restrict lst, const status_cb_data_t *data)
{
(void) data;
st->total_count++; st->total_count++;
st->total_count_change++; st->total_count_change++;
if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
}
}
static void update_sample_rejected (struct dds_sample_rejected_status * __restrict st, struct dds_sample_rejected_status * __restrict lst, const status_cb_data_t *data)
{
st->last_reason = data->extra; st->last_reason = data->extra;
st->last_instance_handle = data->handle; st->last_instance_handle = data->handle;
invoke = (lst->on_sample_rejected != 0); st->total_count++;
reset[0] = &st->total_count_change; st->total_count_change++;
break; if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
} }
case DDS_LIVELINESS_CHANGED_STATUS_ID: { }
struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status;
static void update_liveliness_changed (struct dds_liveliness_changed_status * __restrict st, struct dds_liveliness_changed_status * __restrict lst, const status_cb_data_t *data)
{
DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 &&
LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE &&
LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE &&
@ -260,6 +238,7 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE &&
(uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX); (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX);
assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE); assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE);
st->last_publication_handle = data->handle;
switch ((enum liveliness_changed_data_extra) data->extra) switch ((enum liveliness_changed_data_extra) data->extra)
{ {
case LIVELINESS_CHANGED_ADD_ALIVE: case LIVELINESS_CHANGED_ADD_ALIVE:
@ -291,29 +270,92 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
st->alive_count_change++; st->alive_count_change++;
break; break;
} }
st->last_publication_handle = data->handle; if (lst != NULL)
invoke = (lst->on_liveliness_changed != 0); {
reset[0] = &st->alive_count_change; *lst = *st;
reset[1] = &st->not_alive_count_change; st->alive_count_change = 0;
break; st->not_alive_count_change = 0;
} }
case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { }
struct dds_subscription_matched_status * const st = vst = &rd->m_subscription_matched_status;
static void update_subscription_matched (struct dds_subscription_matched_status * __restrict st, struct dds_subscription_matched_status * __restrict lst, const status_cb_data_t *data)
{
st->last_publication_handle = data->handle;
if (data->add) { if (data->add) {
st->total_count++; st->total_count++;
st->total_count_change++;
st->current_count++; st->current_count++;
st->total_count_change++;
st->current_count_change++; st->current_count_change++;
} else { } else {
st->current_count--; st->current_count--;
st->current_count_change--; st->current_count_change--;
} }
st->last_publication_handle = data->handle; if (lst != NULL)
invoke = (lst->on_subscription_matched != 0); {
reset[0] = &st->total_count_change; *lst = *st;
reset[1] = &st->current_count_change; st->total_count_change = 0;
break; st->current_count_change = 0;
} }
}
STATUS_CB_IMPL (reader, requested_deadline_missed, REQUESTED_DEADLINE_MISSED)
STATUS_CB_IMPL (reader, requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS)
STATUS_CB_IMPL (reader, sample_lost, SAMPLE_LOST)
STATUS_CB_IMPL (reader, sample_rejected, SAMPLE_REJECTED)
STATUS_CB_IMPL (reader, liveliness_changed, LIVELINESS_CHANGED)
STATUS_CB_IMPL (reader, subscription_matched, SUBSCRIPTION_MATCHED)
void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
{
dds_reader * const rd = ventity;
/* When data is NULL, it means that the DDSI reader is deleted. */
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
rd->m_rd = NULL;
ddsrt_cond_broadcast (&rd->m_entity.m_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
return;
}
/* Serialize listener invocations -- it is somewhat sad to do this,
but then it may also be unreasonable to expect the application to
handle concurrent invocations of a single listener. The benefit
here is that it means the counters and "change" counters
can safely be incremented and/or reset while releasing
m_observers_lock for the duration of the listener call itself,
and that similarly the listener function and argument pointers
are stable */
/* FIXME: why do this if no listener is set? */
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
while (rd->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
const bool enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0;
switch (status_id)
{
case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID:
status_cb_requested_deadline_missed (rd, data, enabled);
break;
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID:
status_cb_requested_incompatible_qos (rd, data, enabled);
break;
case DDS_SAMPLE_LOST_STATUS_ID:
status_cb_sample_lost (rd, data, enabled);
break;
case DDS_SAMPLE_REJECTED_STATUS_ID:
status_cb_sample_rejected (rd, data, enabled);
break;
case DDS_LIVELINESS_CHANGED_STATUS_ID:
status_cb_liveliness_changed (rd, data, enabled);
break;
case DDS_SUBSCRIPTION_MATCHED_STATUS_ID:
status_cb_subscription_matched (rd, data, enabled);
break;
case DDS_DATA_ON_READERS_STATUS_ID: case DDS_DATA_ON_READERS_STATUS_ID:
case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_DATA_AVAILABLE_STATUS_ID:
case DDS_INCONSISTENT_TOPIC_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID:
@ -324,29 +366,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
assert (0); assert (0);
} }
const uint32_t enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT));
if (!enabled)
{
/* Don't invoke listeners or set status flag if masked */
}
else if (invoke)
{
rd->m_entity.m_cb_pending_count++;
rd->m_entity.m_cb_count++;
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
dds_entity_invoke_listener (&rd->m_entity, status_id, vst);
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
rd->m_entity.m_cb_count--;
rd->m_entity.m_cb_pending_count--;
*reset[0] = 0;
if (reset[1])
*reset[1] = 0;
}
else
{
dds_entity_status_set (&rd->m_entity, (status_mask_t) (1u << status_id));
}
ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond); ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
} }

View file

@ -46,12 +46,71 @@ static dds_return_t dds_writer_status_validate (uint32_t mask)
return (mask & ~DDS_WRITER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; return (mask & ~DDS_WRITER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK;
} }
/* static void update_offered_deadline_missed (struct dds_offered_deadline_missed_status * __restrict st, struct dds_offered_deadline_missed_status * __restrict lst, const status_cb_data_t *data)
Handler function for all write related status callbacks. May trigger status {
condition or call listener on writer. Each entity has a mask of st->last_instance_handle = data->handle;
supported status types. According to DDS specification, if listener is called st->total_count++;
then status conditions is not triggered. // always incrementing st->total_count_change, then copying into *lst is
*/ // a bit more than minimal work, but this guarantees the correct value
// also when enabling a listeners after some events have occurred
//
// (same line of reasoning for all of them)
st->total_count_change++;
if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
}
}
static void update_offered_incompatible_qos (struct dds_offered_incompatible_qos_status * __restrict st, struct dds_offered_incompatible_qos_status * __restrict lst, const status_cb_data_t *data)
{
st->last_policy_id = data->extra;
st->total_count++;
st->total_count_change++;
if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
}
}
static void update_liveliness_lost (struct dds_liveliness_lost_status * __restrict st, struct dds_liveliness_lost_status * __restrict lst, const status_cb_data_t *data)
{
(void) data;
st->total_count++;
st->total_count_change++;
if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
}
}
static void update_publication_matched (struct dds_publication_matched_status * __restrict st, struct dds_publication_matched_status * __restrict lst, const status_cb_data_t *data)
{
st->last_subscription_handle = data->handle;
if (data->add) {
st->total_count++;
st->current_count++;
st->total_count_change++;
st->current_count_change++;
} else {
st->current_count--;
st->current_count_change--;
}
if (lst != NULL)
{
*lst = *st;
st->total_count_change = 0;
st->current_count_change = 0;
}
}
STATUS_CB_IMPL (writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED)
STATUS_CB_IMPL (writer, offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS)
STATUS_CB_IMPL (writer, liveliness_lost, LIVELINESS_LOST)
STATUS_CB_IMPL (writer, publication_matched, PUBLICATION_MATCHED)
void dds_writer_status_cb (void *entity, const struct status_cb_data *data) void dds_writer_status_cb (void *entity, const struct status_cb_data *data)
{ {
@ -69,67 +128,27 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data)
return; return;
} }
struct dds_listener const * const lst = &wr->m_entity.m_listener;
enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
bool invoke = false;
void *vst = NULL;
int32_t *reset[2] = { NULL, NULL };
/* FIXME: why wait if no listener is set? */ /* FIXME: why wait if no listener is set? */
ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); ddsrt_mutex_lock (&wr->m_entity.m_observers_lock);
while (wr->m_entity.m_cb_count > 0) while (wr->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&wr->m_entity.m_observers_cond, &wr->m_entity.m_observers_lock); ddsrt_cond_wait (&wr->m_entity.m_observers_cond, &wr->m_entity.m_observers_lock);
/* Reset the status for possible Listener call. const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id;
* When a listener is not called, the status will be set (again). */ const bool enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0;
dds_entity_status_reset (&wr->m_entity, (status_mask_t) (1u << status_id));
/* Update status metrics. */
switch (status_id) switch (status_id)
{ {
case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: { case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID:
struct dds_offered_deadline_missed_status * const st = vst = &wr->m_offered_deadline_missed_status; status_cb_offered_deadline_missed (wr, data, enabled);
st->total_count++;
st->total_count_change++;
st->last_instance_handle = data->handle;
invoke = (lst->on_offered_deadline_missed != 0);
reset[0] = &st->total_count_change;
break; break;
} case DDS_LIVELINESS_LOST_STATUS_ID:
case DDS_LIVELINESS_LOST_STATUS_ID: { status_cb_liveliness_lost (wr, data, enabled);
struct dds_liveliness_lost_status * const st = vst = &wr->m_liveliness_lost_status;
st->total_count++;
st->total_count_change++;
invoke = (lst->on_liveliness_lost != 0);
reset[0] = &st->total_count_change;
break; break;
} case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID:
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: { status_cb_offered_incompatible_qos (wr, data, enabled);
struct dds_offered_incompatible_qos_status * const st = vst = &wr->m_offered_incompatible_qos_status;
st->total_count++;
st->total_count_change++;
st->last_policy_id = data->extra;
invoke = (lst->on_offered_incompatible_qos != 0);
reset[0] = &st->total_count_change;
break; break;
} case DDS_PUBLICATION_MATCHED_STATUS_ID:
case DDS_PUBLICATION_MATCHED_STATUS_ID: { status_cb_publication_matched (wr, data, enabled);
struct dds_publication_matched_status * const st = vst = &wr->m_publication_matched_status;
if (data->add) {
st->total_count++;
st->total_count_change++;
st->current_count++;
st->current_count_change++;
} else {
st->current_count--;
st->current_count_change--;
}
wr->m_publication_matched_status.last_subscription_handle = data->handle;
invoke = (lst->on_publication_matched != 0);
reset[0] = &st->total_count_change;
reset[1] = &st->current_count_change;
break; break;
}
case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_DATA_AVAILABLE_STATUS_ID:
case DDS_INCONSISTENT_TOPIC_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID:
case DDS_SAMPLE_LOST_STATUS_ID: case DDS_SAMPLE_LOST_STATUS_ID:
@ -142,29 +161,6 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data)
assert (0); assert (0);
} }
const uint32_t enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT));
if (enabled == 0)
{
/* Don't invoke listeners or set status flag if masked */
}
else if (invoke)
{
wr->m_entity.m_cb_pending_count++;
wr->m_entity.m_cb_count++;
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
dds_entity_invoke_listener (&wr->m_entity, status_id, vst);
ddsrt_mutex_lock (&wr->m_entity.m_observers_lock);
wr->m_entity.m_cb_count--;
wr->m_entity.m_cb_pending_count--;
*reset[0] = 0;
if (reset[1])
*reset[1] = 0;
}
else
{
dds_entity_status_set (&wr->m_entity, (status_mask_t) (1u << status_id));
}
ddsrt_cond_broadcast (&wr->m_entity.m_observers_cond); ddsrt_cond_broadcast (&wr->m_entity.m_observers_cond);
ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock);
} }

View file

@ -67,10 +67,6 @@ static void liveliness_init(void)
static void liveliness_fini(void) static void liveliness_fini(void)
{ {
dds_delete(g_sub_subscriber);
dds_delete(g_pub_publisher);
dds_delete(g_sub_participant);
dds_delete(g_pub_participant);
dds_delete(g_sub_domain); dds_delete(g_sub_domain);
dds_delete(g_pub_domain); dds_delete(g_pub_domain);
} }
@ -978,31 +974,17 @@ static unsigned get_and_check_status (dds_entity_t reader, dds_entity_t writer_a
return result; return result;
} }
static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader) static void setup_reader_zero_or_one (dds_entity_t *reader, dds_entity_t *writer_active, dds_entity_t *waitset, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader, struct liveliness_changed_state *listener_state)
{ {
const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5;
dds_entity_t pub_topic; dds_entity_t pub_topic;
dds_entity_t sub_topic = 0; dds_entity_t sub_topic = 0;
dds_entity_t reader;
dds_entity_t writer_active; /* writing */
dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */ dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */
dds_entity_t waitset;
dds_listener_t *listener;
dds_qos_t *qos; dds_qos_t *qos;
dds_return_t rc; dds_return_t rc;
struct dds_liveliness_changed_status lstatus;
char name[100]; char name[100];
Space_Type1 sample = {1, 0, 0};
struct liveliness_changed_state listener_state = {
.weirdness = false,
.w0_handle = 0,
.w0_alive = 0,
.w0_not_alive = 0,
};
ddsrt_mutex_init (&listener_state.lock);
waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); *waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE);
CU_ASSERT_FATAL(waitset > 0); CU_ASSERT_FATAL(*waitset > 0);
qos = dds_create_qos(); qos = dds_create_qos();
CU_ASSERT_FATAL(qos != NULL); CU_ASSERT_FATAL(qos != NULL);
@ -1020,22 +1002,22 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
/* reader liveliness is always automatic/infinity */ /* reader liveliness is always automatic/infinity */
dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY);
reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL); *reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL);
CU_ASSERT_FATAL(reader > 0); CU_ASSERT_FATAL(*reader > 0);
rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); rc = dds_set_status_mask(*reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
rc = dds_waitset_attach(waitset, reader, reader); rc = dds_waitset_attach(*waitset, *reader, *reader);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
/* writer liveliness varies */ /* writer liveliness varies */
dds_qset_liveliness(qos, lkind, ldur); dds_qset_liveliness(qos, lkind, ldur);
writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); *writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL);
CU_ASSERT_FATAL(writer_active > 0); CU_ASSERT_FATAL(*writer_active > 0);
writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL);
CU_ASSERT_FATAL(writer_inactive > 0); CU_ASSERT_FATAL(writer_inactive > 0);
rc = dds_set_status_mask(writer_active, DDS_PUBLICATION_MATCHED_STATUS); rc = dds_set_status_mask(*writer_active, DDS_PUBLICATION_MATCHED_STATUS);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
rc = dds_waitset_attach(waitset, writer_active, writer_active); rc = dds_waitset_attach(*waitset, *writer_active, *writer_active);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
dds_delete_qos(qos); dds_delete_qos(qos);
@ -1046,32 +1028,80 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
bool initial_sample_written = false, initial_sample_received = false; bool initial_sample_written = false, initial_sample_received = false;
do do
{ {
status = get_and_check_status (reader, writer_active); status = get_and_check_status (*reader, *writer_active);
if (status & STATUS_DATA) if (status & STATUS_DATA)
initial_sample_received = true; initial_sample_received = true;
if (status & STATUS_SYNCED && !initial_sample_written) if (status & STATUS_SYNCED && !initial_sample_written)
{ {
rc = dds_write(writer_active, &sample); Space_Type1 sample = {1, 0, 0};
rc = dds_write(*writer_active, &sample);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
initial_sample_written = true; initial_sample_written = true;
} }
if (status & STATUS_SYNCED && initial_sample_received) if (status & STATUS_SYNCED && initial_sample_received)
break; break;
rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5)); rc = dds_waitset_wait(*waitset, NULL, 0, DDS_SECS(5));
if (rc < 1) if (rc < 1)
{ {
get_and_check_status (reader, writer_active); get_and_check_status (*reader, *writer_active);
CU_ASSERT_FATAL(rc >= 1); CU_ASSERT_FATAL(rc >= 1);
} }
} while (1); } while (1);
/* switch to using a listener: those allow us to observe all events */ /* switch to using a listener: those allow us to observe all events */
listener = dds_create_listener (&listener_state); dds_listener_t *listener;
listener = dds_create_listener (listener_state);
dds_lset_liveliness_changed(listener, liveliness_changed_listener); dds_lset_liveliness_changed(listener, liveliness_changed_listener);
rc = dds_set_listener (reader, listener); rc = dds_set_listener (*reader, listener);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
dds_delete_listener (listener); dds_delete_listener (listener);
}
static void wait_for_notalive (dds_entity_t reader, struct liveliness_changed_state *listener_state)
{
struct dds_liveliness_changed_status lstatus;
int retries = 100;
dds_return_t rc;
rc = dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
ddsrt_mutex_lock (&listener_state->lock);
printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive);
CU_ASSERT_FATAL(!listener_state->weirdness);
CU_ASSERT_FATAL(listener_state->w0_handle != 0);
while (listener_state->w0_not_alive < listener_state->w0_alive && retries-- > 0)
{
ddsrt_mutex_unlock(&listener_state->lock);
dds_sleepfor(DDS_MSECS(10));
rc = dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
ddsrt_mutex_lock(&listener_state->lock);
}
printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive);
CU_ASSERT_FATAL(listener_state->w0_alive == listener_state->w0_not_alive);
ddsrt_mutex_unlock(&listener_state->lock);
}
static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader)
{
const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5;
dds_entity_t reader;
dds_entity_t writer_active;
dds_entity_t waitset;
dds_return_t rc;
Space_Type1 sample = {1, 0, 0};
struct liveliness_changed_state listener_state = {
.weirdness = false,
.w0_handle = 0,
.w0_alive = 0,
.w0_not_alive = 0,
};
ddsrt_mutex_init (&listener_state.lock);
setup_reader_zero_or_one (&reader, &writer_active, &waitset, lkind, ldur, remote_reader, &listener_state);
/* write as fast as possible - we don't expect this to cause the writers /* write as fast as possible - we don't expect this to cause the writers
to gain and lose liveliness once for each sample, but it should have to gain and lose liveliness once for each sample, but it should have
@ -1102,28 +1132,9 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
CU_ASSERT_FATAL(cnt == nsamples + 1); CU_ASSERT_FATAL(cnt == nsamples + 1);
/* transition to not alive is not necessarily immediate */ /* transition to not alive is not necessarily immediate */
{ wait_for_notalive (reader, &listener_state);
int retries = 100;
rc = dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
ddsrt_mutex_lock (&listener_state.lock);
printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive);
CU_ASSERT_FATAL(!listener_state.weirdness);
CU_ASSERT_FATAL(listener_state.w0_handle != 0);
while (listener_state.w0_not_alive < listener_state.w0_alive && retries-- > 0)
{ {
ddsrt_mutex_unlock(&listener_state.lock);
dds_sleepfor(DDS_MSECS(10));
rc = dds_get_liveliness_changed_status(reader, &lstatus);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
ddsrt_mutex_lock(&listener_state.lock);
}
printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count);
printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive);
CU_ASSERT_FATAL(listener_state.w0_alive == listener_state.w0_not_alive);
uint32_t exp_alive; uint32_t exp_alive;
if (sleep == 0) if (sleep == 0)
exp_alive = 1; /* if not sleeping, it's ok if the transition happens only once */ exp_alive = 1; /* if not sleeping, it's ok if the transition happens only once */
@ -1131,28 +1142,15 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines
exp_alive = nsamples / 3; /* if sleeping briefly, expect the a good number of writes to toggle liveliness */ exp_alive = nsamples / 3; /* if sleeping briefly, expect the a good number of writes to toggle liveliness */
else else
exp_alive = nsamples - nsamples / 5; /* if sleeping, expect the vast majority (80%) of the writes to toggle liveliness */ exp_alive = nsamples - nsamples / 5; /* if sleeping, expect the vast majority (80%) of the writes to toggle liveliness */
ddsrt_mutex_lock(&listener_state.lock);
printf("check w0_alive %d >= %d\n", listener_state.w0_alive, exp_alive); printf("check w0_alive %d >= %d\n", listener_state.w0_alive, exp_alive);
CU_ASSERT_FATAL(listener_state.w0_alive >= exp_alive); CU_ASSERT_FATAL(listener_state.w0_alive >= exp_alive);
ddsrt_mutex_unlock(&listener_state.lock); ddsrt_mutex_unlock(&listener_state.lock);
} }
/* cleanup */
rc = dds_delete(waitset); rc = dds_delete(waitset);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
rc = dds_delete(reader); dds_set_listener (reader, NULL); // listener must not be invoked anymore
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
rc = dds_delete(writer_active);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
rc = dds_delete(writer_inactive);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
if (remote_reader)
{
rc = dds_delete(sub_topic);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
}
rc = dds_delete(pub_topic);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
ddsrt_mutex_destroy(&listener_state.lock); ddsrt_mutex_destroy(&listener_state.lock);
} }
@ -1182,3 +1180,80 @@ CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .f
} }
} }
} }
struct getstatus_thread_arg {
dds_entity_t rd;
ddsrt_atomic_uint32_t stop;
};
static uint32_t getstatus_thread (void *varg)
{
struct getstatus_thread_arg *arg = varg;
while (!ddsrt_atomic_ld32 (&arg->stop))
{
dds_liveliness_changed_status_t s;
dds_return_t rc;
rc = dds_get_liveliness_changed_status (arg->rd, &s);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
/* change counts must be 0 because the listener gets invoked all the time */
if (s.alive_count_change != 0 || s.not_alive_count_change != 0)
{
ddsrt_atomic_st32 (&arg->stop, 1);
return 0;
}
}
return 1;
}
CU_Test(ddsc_liveliness, listener_vs_getstatus, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30)
{
dds_entity_t reader;
dds_entity_t writer_active;
dds_entity_t waitset;
dds_return_t rc;
Space_Type1 sample = {1, 0, 0};
struct liveliness_changed_state listener_state = {
.weirdness = false,
.w0_handle = 0,
.w0_alive = 0,
.w0_not_alive = 0,
};
ddsrt_mutex_init (&listener_state.lock);
setup_reader_zero_or_one (&reader, &writer_active, &waitset, DDS_LIVELINESS_MANUAL_BY_TOPIC, 1, false, &listener_state);
/* start a thread that continually calls dds_get_liveliness_changed_status: that resets
the change counters, but that activity should not be visible in the listener argument */
ddsrt_thread_t tid;
ddsrt_threadattr_t tattr;
ddsrt_threadattr_init(&tattr);
struct getstatus_thread_arg targ = { .rd = reader, .stop = DDSRT_ATOMIC_UINT32_INIT (0) };
rc = ddsrt_thread_create(&tid, "getstatus", &tattr, getstatus_thread, &targ);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
/* write as fast as possible - we don't expect this to cause the writers
to gain and lose liveliness once for each sample, but it should have
become alive at least once and fall back to not alive afterward */
dds_time_t tnow = dds_time ();
const dds_time_t tend = tnow + DDS_SECS(3);
while (tnow < tend && !ddsrt_atomic_ld32 (&targ.stop))
{
rc = dds_write(writer_active, &sample);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
tnow = dds_time ();
}
ddsrt_atomic_st32 (&targ.stop, 1);
uint32_t get_status_ok;
rc = ddsrt_thread_join (tid, &get_status_ok);
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
CU_ASSERT_FATAL (get_status_ok != 0);
/* transition to not alive is not necessarily immediate */
wait_for_notalive (reader, &listener_state);
rc = dds_delete(waitset);
CU_ASSERT_FATAL(rc == DDS_RETCODE_OK);
dds_set_listener (reader, NULL); // listener must not be invoked anymore
ddsrt_mutex_destroy(&listener_state.lock);
}