From 79773729d6bc3b5b21599d88c1664f8a21d4d0bf Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Thu, 23 Jul 2020 15:04:15 +0200 Subject: [PATCH] Invoke listeners with a copy of the status This changes the status argument of the listener call to a local copy of the entity's status field, fixing a race between dds_get_xxx_status and the xxx listener invocations. In that case there was a window during which the "change" fields could be reset by the former prior to the latter getting invoked. One symptom of this particular race condition is the (very rare) failure of the liveliness tests for 0 and 1ns lease durations, while waiting for the writers to all become not alive. In that particular scenario, the liveliness changed listener observes alive_count_change and not_alive_count_change both 0, which it (rightly) considers an error. A regression test is added that reliably reproduces the problem. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds__entity.h | 35 +++- src/core/ddsc/src/dds_entity.c | 71 -------- src/core/ddsc/src/dds_reader.c | 275 +++++++++++++++++-------------- src/core/ddsc/src/dds_writer.c | 154 +++++++++-------- src/core/ddsc/tests/liveliness.c | 217 ++++++++++++++++-------- 5 files changed, 402 insertions(+), 350 deletions(-) diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 23025c3..c94ff3e 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -84,7 +84,40 @@ DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) { DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status); -DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst); +union dds_status_union { + dds_inconsistent_topic_status_t inconsistent_topic; + dds_liveliness_changed_status_t liveliness_changed; + dds_liveliness_lost_status_t liveliness_lost; + dds_offered_deadline_missed_status_t offered_deadline_missed; + dds_offered_incompatible_qos_status_t offered_incompatible_qos; + dds_publication_matched_status_t publication_matched; + dds_requested_deadline_missed_status_t requested_deadline_missed; + dds_requested_incompatible_qos_status_t requested_incompatible_qos; + dds_sample_lost_status_t sample_lost; + dds_sample_rejected_status_t sample_rejected; + dds_subscription_matched_status_t subscription_matched; +}; + +#define STATUS_CB_IMPL(entity_kind_, name_, NAME_) \ + static void status_cb_##name_ (dds_##entity_kind_ * const e, const status_cb_data_t *data, bool enabled) \ + { \ + struct dds_listener const * const listener = &e->m_entity.m_listener; \ + const bool invoke = (listener->on_##name_ != 0) && enabled; \ + union dds_status_union lst; \ + update_##name_ (&e->m_##name_##_status, invoke ? &lst.name_ : NULL, data); \ + if (invoke) { \ + dds_entity_status_reset (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ + e->m_entity.m_cb_pending_count++; \ + e->m_entity.m_cb_count++; \ + ddsrt_mutex_unlock (&e->m_entity.m_observers_lock); \ + listener->on_##name_ (e->m_entity.m_hdllink.hdl, lst.name_, listener->on_##name_##_arg); \ + ddsrt_mutex_lock (&e->m_entity.m_observers_lock); \ + e->m_entity.m_cb_count--; \ + e->m_entity.m_cb_pending_count--; \ + } else if (enabled) { \ + dds_entity_status_set (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ + } \ + } DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e); DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e); diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 1946ec0..7b64357 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -940,77 +940,6 @@ dds_return_t dds_get_listener (dds_entity_t entity, dds_listener_t *listener) } } -void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst) -{ - struct dds_listener const * const lst = &entity->m_listener; - switch (which) - { - case DDS_INCONSISTENT_TOPIC_STATUS_ID: { - struct dds_inconsistent_topic_status const * const st = vst; - lst->on_inconsistent_topic (entity->m_hdllink.hdl, *st, lst->on_inconsistent_topic_arg); - break; - } - case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: { - struct dds_requested_deadline_missed_status const * const st = vst; - lst->on_requested_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_requested_deadline_missed_arg); - break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_requested_incompatible_qos_status const * const st = vst; - lst->on_requested_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_requested_incompatible_qos_arg); - break; - } - case DDS_SAMPLE_LOST_STATUS_ID: { - struct dds_sample_lost_status const * const st = vst; - lst->on_sample_lost (entity->m_hdllink.hdl, *st, lst->on_sample_lost_arg); - break; - } - case DDS_SAMPLE_REJECTED_STATUS_ID: { - struct dds_sample_rejected_status const * const st = vst; - lst->on_sample_rejected (entity->m_hdllink.hdl, *st, lst->on_sample_rejected_arg); - break; - } - case DDS_LIVELINESS_CHANGED_STATUS_ID: { - struct dds_liveliness_changed_status const * const st = vst; - lst->on_liveliness_changed (entity->m_hdllink.hdl, *st, lst->on_liveliness_changed_arg); - break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { - struct dds_subscription_matched_status const * const st = vst; - lst->on_subscription_matched (entity->m_hdllink.hdl, *st, lst->on_subscription_matched_arg); - break; - } - case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: { - struct dds_offered_deadline_missed_status const * const st = vst; - lst->on_offered_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_offered_deadline_missed_arg); - break; - } - case DDS_LIVELINESS_LOST_STATUS_ID: { - struct dds_liveliness_lost_status const * const st = vst; - lst->on_liveliness_lost (entity->m_hdllink.hdl, *st, lst->on_liveliness_lost_arg); - break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_offered_incompatible_qos_status const * const st = vst; - lst->on_offered_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_offered_incompatible_qos_arg); - break; - } - case DDS_PUBLICATION_MATCHED_STATUS_ID: { - struct dds_publication_matched_status const * const st = vst; - lst->on_publication_matched (entity->m_hdllink.hdl, *st, lst->on_publication_matched_arg); - break; - } - case DDS_DATA_AVAILABLE_STATUS_ID: { - lst->on_data_available (entity->m_hdllink.hdl, lst->on_data_available_arg); - break; - } - case DDS_DATA_ON_READERS_STATUS_ID: { - lst->on_data_on_readers (entity->m_hdllink.hdl, lst->on_data_on_readers_arg); - break; - } - } -} - static void clear_status_with_listener (struct dds_entity *e) { const struct dds_listener *lst = &e->m_listener; diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 4c1f999..0901b0c 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -174,6 +174,137 @@ void dds_reader_data_available_cb (struct dds_reader *rd) ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } +static void update_requested_deadline_missed (struct dds_requested_deadline_missed_status * __restrict st, struct dds_requested_deadline_missed_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_instance_handle = data->handle; + st->total_count++; + // always incrementing st->total_count_change, then copying into *lst is + // a bit more than minimal work, but this guarantees the correct value + // also when enabling a listeners after some events have occurred + // + // (same line of reasoning for all of them) + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_requested_incompatible_qos (struct dds_requested_incompatible_qos_status * __restrict st, struct dds_requested_incompatible_qos_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_policy_id = data->extra; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_sample_lost (struct dds_sample_lost_status * __restrict st, struct dds_sample_lost_status * __restrict lst, const status_cb_data_t *data) +{ + (void) data; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_sample_rejected (struct dds_sample_rejected_status * __restrict st, struct dds_sample_rejected_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_reason = data->extra; + st->last_instance_handle = data->handle; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_liveliness_changed (struct dds_liveliness_changed_status * __restrict st, struct dds_liveliness_changed_status * __restrict lst, const status_cb_data_t *data) +{ + DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && + LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && + LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && + LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE && + LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE && + LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && + (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX); + assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE); + st->last_publication_handle = data->handle; + switch ((enum liveliness_changed_data_extra) data->extra) + { + case LIVELINESS_CHANGED_ADD_ALIVE: + st->alive_count++; + st->alive_count_change++; + break; + case LIVELINESS_CHANGED_ADD_NOT_ALIVE: + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE: + st->not_alive_count--; + st->not_alive_count_change--; + break; + case LIVELINESS_CHANGED_REMOVE_ALIVE: + st->alive_count--; + st->alive_count_change--; + break; + case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE: + st->alive_count--; + st->alive_count_change--; + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE: + st->not_alive_count--; + st->not_alive_count_change--; + st->alive_count++; + st->alive_count_change++; + break; + } + if (lst != NULL) + { + *lst = *st; + st->alive_count_change = 0; + st->not_alive_count_change = 0; + } +} + +static void update_subscription_matched (struct dds_subscription_matched_status * __restrict st, struct dds_subscription_matched_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_publication_handle = data->handle; + if (data->add) { + st->total_count++; + st->current_count++; + st->total_count_change++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + st->current_count_change = 0; + } +} + +STATUS_CB_IMPL (reader, requested_deadline_missed, REQUESTED_DEADLINE_MISSED) +STATUS_CB_IMPL (reader, requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS) +STATUS_CB_IMPL (reader, sample_lost, SAMPLE_LOST) +STATUS_CB_IMPL (reader, sample_rejected, SAMPLE_REJECTED) +STATUS_CB_IMPL (reader, liveliness_changed, LIVELINESS_CHANGED) +STATUS_CB_IMPL (reader, subscription_matched, SUBSCRIPTION_MATCHED) + void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) { dds_reader * const rd = ventity; @@ -190,15 +321,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) return; } - struct dds_listener const * const lst = &rd->m_entity.m_listener; - enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; - bool invoke = false; - void *vst = NULL; - int32_t *reset[2] = { NULL, NULL }; - - /* DATA_AVAILABLE is handled by dds_reader_data_available_cb */ - assert (status_id != DDS_DATA_AVAILABLE_STATUS_ID); - /* Serialize listener invocations -- it is somewhat sad to do this, but then it may also be unreasonable to expect the application to handle concurrent invocations of a single listener. The benefit @@ -212,108 +334,28 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) while (rd->m_entity.m_cb_count > 0) ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock); - /* Update status metrics. */ - switch (status_id) { - case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: { - struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status; - st->last_instance_handle = data->handle; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_requested_deadline_missed != 0); - reset[0] = &st->total_count_change; + const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; + const bool enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0; + switch (status_id) + { + case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: + status_cb_requested_deadline_missed (rd, data, enabled); break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_requested_incompatible_qos_status * const st = vst = &rd->m_requested_incompatible_qos_status; - st->total_count++; - st->total_count_change++; - st->last_policy_id = data->extra; - invoke = (lst->on_requested_incompatible_qos != 0); - reset[0] = &st->total_count_change; + case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: + status_cb_requested_incompatible_qos (rd, data, enabled); break; - } - case DDS_SAMPLE_LOST_STATUS_ID: { - struct dds_sample_lost_status * const st = vst = &rd->m_sample_lost_status; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_sample_lost != 0); - reset[0] = &st->total_count_change; + case DDS_SAMPLE_LOST_STATUS_ID: + status_cb_sample_lost (rd, data, enabled); break; - } - case DDS_SAMPLE_REJECTED_STATUS_ID: { - struct dds_sample_rejected_status * const st = vst = &rd->m_sample_rejected_status; - st->total_count++; - st->total_count_change++; - st->last_reason = data->extra; - st->last_instance_handle = data->handle; - invoke = (lst->on_sample_rejected != 0); - reset[0] = &st->total_count_change; + case DDS_SAMPLE_REJECTED_STATUS_ID: + status_cb_sample_rejected (rd, data, enabled); break; - } - case DDS_LIVELINESS_CHANGED_STATUS_ID: { - struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status; - DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && - LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && - LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && - LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE && - LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE && - LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && - (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX); - assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE); - switch ((enum liveliness_changed_data_extra) data->extra) - { - case LIVELINESS_CHANGED_ADD_ALIVE: - st->alive_count++; - st->alive_count_change++; - break; - case LIVELINESS_CHANGED_ADD_NOT_ALIVE: - st->not_alive_count++; - st->not_alive_count_change++; - break; - case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE: - st->not_alive_count--; - st->not_alive_count_change--; - break; - case LIVELINESS_CHANGED_REMOVE_ALIVE: - st->alive_count--; - st->alive_count_change--; - break; - case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE: - st->alive_count--; - st->alive_count_change--; - st->not_alive_count++; - st->not_alive_count_change++; - break; - case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE: - st->not_alive_count--; - st->not_alive_count_change--; - st->alive_count++; - st->alive_count_change++; - break; - } - st->last_publication_handle = data->handle; - invoke = (lst->on_liveliness_changed != 0); - reset[0] = &st->alive_count_change; - reset[1] = &st->not_alive_count_change; + case DDS_LIVELINESS_CHANGED_STATUS_ID: + status_cb_liveliness_changed (rd, data, enabled); break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { - struct dds_subscription_matched_status * const st = vst = &rd->m_subscription_matched_status; - if (data->add) { - st->total_count++; - st->total_count_change++; - st->current_count++; - st->current_count_change++; - } else { - st->current_count--; - st->current_count_change--; - } - st->last_publication_handle = data->handle; - invoke = (lst->on_subscription_matched != 0); - reset[0] = &st->total_count_change; - reset[1] = &st->current_count_change; + case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: + status_cb_subscription_matched (rd, data, enabled); break; - } case DDS_DATA_ON_READERS_STATUS_ID: case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID: @@ -324,29 +366,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) assert (0); } - const uint32_t enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)); - if (!enabled) - { - /* Don't invoke listeners or set status flag if masked */ - } - else if (invoke) - { - rd->m_entity.m_cb_pending_count++; - rd->m_entity.m_cb_count++; - ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); - dds_entity_invoke_listener (&rd->m_entity, status_id, vst); - ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); - rd->m_entity.m_cb_count--; - rd->m_entity.m_cb_pending_count--; - *reset[0] = 0; - if (reset[1]) - *reset[1] = 0; - } - else - { - dds_entity_status_set (&rd->m_entity, (status_mask_t) (1u << status_id)); - } - ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond); ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index b539e22..166b2da 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -46,12 +46,71 @@ static dds_return_t dds_writer_status_validate (uint32_t mask) return (mask & ~DDS_WRITER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; } -/* - Handler function for all write related status callbacks. May trigger status - condition or call listener on writer. Each entity has a mask of - supported status types. According to DDS specification, if listener is called - then status conditions is not triggered. -*/ +static void update_offered_deadline_missed (struct dds_offered_deadline_missed_status * __restrict st, struct dds_offered_deadline_missed_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_instance_handle = data->handle; + st->total_count++; + // always incrementing st->total_count_change, then copying into *lst is + // a bit more than minimal work, but this guarantees the correct value + // also when enabling a listeners after some events have occurred + // + // (same line of reasoning for all of them) + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_offered_incompatible_qos (struct dds_offered_incompatible_qos_status * __restrict st, struct dds_offered_incompatible_qos_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_policy_id = data->extra; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_liveliness_lost (struct dds_liveliness_lost_status * __restrict st, struct dds_liveliness_lost_status * __restrict lst, const status_cb_data_t *data) +{ + (void) data; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_publication_matched (struct dds_publication_matched_status * __restrict st, struct dds_publication_matched_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_subscription_handle = data->handle; + if (data->add) { + st->total_count++; + st->current_count++; + st->total_count_change++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + st->current_count_change = 0; + } +} + +STATUS_CB_IMPL (writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED) +STATUS_CB_IMPL (writer, offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS) +STATUS_CB_IMPL (writer, liveliness_lost, LIVELINESS_LOST) +STATUS_CB_IMPL (writer, publication_matched, PUBLICATION_MATCHED) void dds_writer_status_cb (void *entity, const struct status_cb_data *data) { @@ -69,67 +128,27 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data) return; } - struct dds_listener const * const lst = &wr->m_entity.m_listener; - enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; - bool invoke = false; - void *vst = NULL; - int32_t *reset[2] = { NULL, NULL }; - /* FIXME: why wait if no listener is set? */ ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); while (wr->m_entity.m_cb_count > 0) ddsrt_cond_wait (&wr->m_entity.m_observers_cond, &wr->m_entity.m_observers_lock); - /* Reset the status for possible Listener call. - * When a listener is not called, the status will be set (again). */ - dds_entity_status_reset (&wr->m_entity, (status_mask_t) (1u << status_id)); - - /* Update status metrics. */ + const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; + const bool enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0; switch (status_id) { - case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: { - struct dds_offered_deadline_missed_status * const st = vst = &wr->m_offered_deadline_missed_status; - st->total_count++; - st->total_count_change++; - st->last_instance_handle = data->handle; - invoke = (lst->on_offered_deadline_missed != 0); - reset[0] = &st->total_count_change; + case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: + status_cb_offered_deadline_missed (wr, data, enabled); break; - } - case DDS_LIVELINESS_LOST_STATUS_ID: { - struct dds_liveliness_lost_status * const st = vst = &wr->m_liveliness_lost_status; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_liveliness_lost != 0); - reset[0] = &st->total_count_change; + case DDS_LIVELINESS_LOST_STATUS_ID: + status_cb_liveliness_lost (wr, data, enabled); break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_offered_incompatible_qos_status * const st = vst = &wr->m_offered_incompatible_qos_status; - st->total_count++; - st->total_count_change++; - st->last_policy_id = data->extra; - invoke = (lst->on_offered_incompatible_qos != 0); - reset[0] = &st->total_count_change; + case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: + status_cb_offered_incompatible_qos (wr, data, enabled); break; - } - case DDS_PUBLICATION_MATCHED_STATUS_ID: { - struct dds_publication_matched_status * const st = vst = &wr->m_publication_matched_status; - if (data->add) { - st->total_count++; - st->total_count_change++; - st->current_count++; - st->current_count_change++; - } else { - st->current_count--; - st->current_count_change--; - } - wr->m_publication_matched_status.last_subscription_handle = data->handle; - invoke = (lst->on_publication_matched != 0); - reset[0] = &st->total_count_change; - reset[1] = &st->current_count_change; + case DDS_PUBLICATION_MATCHED_STATUS_ID: + status_cb_publication_matched (wr, data, enabled); break; - } case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID: case DDS_SAMPLE_LOST_STATUS_ID: @@ -142,29 +161,6 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data) assert (0); } - const uint32_t enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)); - if (enabled == 0) - { - /* Don't invoke listeners or set status flag if masked */ - } - else if (invoke) - { - wr->m_entity.m_cb_pending_count++; - wr->m_entity.m_cb_count++; - ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); - dds_entity_invoke_listener (&wr->m_entity, status_id, vst); - ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); - wr->m_entity.m_cb_count--; - wr->m_entity.m_cb_pending_count--; - *reset[0] = 0; - if (reset[1]) - *reset[1] = 0; - } - else - { - dds_entity_status_set (&wr->m_entity, (status_mask_t) (1u << status_id)); - } - ddsrt_cond_broadcast (&wr->m_entity.m_observers_cond); ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); } diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index e6ad070..b1bd6ae 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -67,10 +67,6 @@ static void liveliness_init(void) static void liveliness_fini(void) { - dds_delete(g_sub_subscriber); - dds_delete(g_pub_publisher); - dds_delete(g_sub_participant); - dds_delete(g_pub_participant); dds_delete(g_sub_domain); dds_delete(g_pub_domain); } @@ -978,31 +974,17 @@ static unsigned get_and_check_status (dds_entity_t reader, dds_entity_t writer_a return result; } -static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader) +static void setup_reader_zero_or_one (dds_entity_t *reader, dds_entity_t *writer_active, dds_entity_t *waitset, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader, struct liveliness_changed_state *listener_state) { - const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5; dds_entity_t pub_topic; dds_entity_t sub_topic = 0; - dds_entity_t reader; - dds_entity_t writer_active; /* writing */ dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */ - dds_entity_t waitset; - dds_listener_t *listener; dds_qos_t *qos; dds_return_t rc; - struct dds_liveliness_changed_status lstatus; char name[100]; - Space_Type1 sample = {1, 0, 0}; - struct liveliness_changed_state listener_state = { - .weirdness = false, - .w0_handle = 0, - .w0_alive = 0, - .w0_not_alive = 0, - }; - ddsrt_mutex_init (&listener_state.lock); - waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); - CU_ASSERT_FATAL(waitset > 0); + *waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); + CU_ASSERT_FATAL(*waitset > 0); qos = dds_create_qos(); CU_ASSERT_FATAL(qos != NULL); @@ -1020,22 +1002,22 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines /* reader liveliness is always automatic/infinity */ dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL); - CU_ASSERT_FATAL(reader > 0); - rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); + *reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL); + CU_ASSERT_FATAL(*reader > 0); + rc = dds_set_status_mask(*reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_waitset_attach(waitset, reader, reader); + rc = dds_waitset_attach(*waitset, *reader, *reader); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); /* writer liveliness varies */ dds_qset_liveliness(qos, lkind, ldur); - writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); - CU_ASSERT_FATAL(writer_active > 0); + *writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); + CU_ASSERT_FATAL(*writer_active > 0); writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); CU_ASSERT_FATAL(writer_inactive > 0); - rc = dds_set_status_mask(writer_active, DDS_PUBLICATION_MATCHED_STATUS); + rc = dds_set_status_mask(*writer_active, DDS_PUBLICATION_MATCHED_STATUS); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_waitset_attach(waitset, writer_active, writer_active); + rc = dds_waitset_attach(*waitset, *writer_active, *writer_active); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); dds_delete_qos(qos); @@ -1046,32 +1028,80 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines bool initial_sample_written = false, initial_sample_received = false; do { - status = get_and_check_status (reader, writer_active); + status = get_and_check_status (*reader, *writer_active); if (status & STATUS_DATA) initial_sample_received = true; if (status & STATUS_SYNCED && !initial_sample_written) { - rc = dds_write(writer_active, &sample); + Space_Type1 sample = {1, 0, 0}; + rc = dds_write(*writer_active, &sample); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); initial_sample_written = true; } if (status & STATUS_SYNCED && initial_sample_received) break; - rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5)); + rc = dds_waitset_wait(*waitset, NULL, 0, DDS_SECS(5)); if (rc < 1) { - get_and_check_status (reader, writer_active); + get_and_check_status (*reader, *writer_active); CU_ASSERT_FATAL(rc >= 1); } } while (1); /* switch to using a listener: those allow us to observe all events */ - listener = dds_create_listener (&listener_state); + dds_listener_t *listener; + listener = dds_create_listener (listener_state); dds_lset_liveliness_changed(listener, liveliness_changed_listener); - rc = dds_set_listener (reader, listener); + rc = dds_set_listener (*reader, listener); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); dds_delete_listener (listener); +} + +static void wait_for_notalive (dds_entity_t reader, struct liveliness_changed_state *listener_state) +{ + struct dds_liveliness_changed_status lstatus; + int retries = 100; + dds_return_t rc; + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + + ddsrt_mutex_lock (&listener_state->lock); + printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive); + CU_ASSERT_FATAL(!listener_state->weirdness); + CU_ASSERT_FATAL(listener_state->w0_handle != 0); + while (listener_state->w0_not_alive < listener_state->w0_alive && retries-- > 0) + { + ddsrt_mutex_unlock(&listener_state->lock); + dds_sleepfor(DDS_MSECS(10)); + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + ddsrt_mutex_lock(&listener_state->lock); + } + + printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive); + CU_ASSERT_FATAL(listener_state->w0_alive == listener_state->w0_not_alive); + ddsrt_mutex_unlock(&listener_state->lock); +} + +static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader) +{ + const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5; + dds_entity_t reader; + dds_entity_t writer_active; + dds_entity_t waitset; + dds_return_t rc; + Space_Type1 sample = {1, 0, 0}; + struct liveliness_changed_state listener_state = { + .weirdness = false, + .w0_handle = 0, + .w0_alive = 0, + .w0_not_alive = 0, + }; + ddsrt_mutex_init (&listener_state.lock); + setup_reader_zero_or_one (&reader, &writer_active, &waitset, lkind, ldur, remote_reader, &listener_state); /* write as fast as possible - we don't expect this to cause the writers to gain and lose liveliness once for each sample, but it should have @@ -1102,28 +1132,9 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines CU_ASSERT_FATAL(cnt == nsamples + 1); /* transition to not alive is not necessarily immediate */ + wait_for_notalive (reader, &listener_state); + { - int retries = 100; - rc = dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); - - ddsrt_mutex_lock (&listener_state.lock); - printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); - CU_ASSERT_FATAL(!listener_state.weirdness); - CU_ASSERT_FATAL(listener_state.w0_handle != 0); - while (listener_state.w0_not_alive < listener_state.w0_alive && retries-- > 0) - { - ddsrt_mutex_unlock(&listener_state.lock); - dds_sleepfor(DDS_MSECS(10)); - rc = dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - ddsrt_mutex_lock(&listener_state.lock); - } - - printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); - printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); - CU_ASSERT_FATAL(listener_state.w0_alive == listener_state.w0_not_alive); uint32_t exp_alive; if (sleep == 0) exp_alive = 1; /* if not sleeping, it's ok if the transition happens only once */ @@ -1131,28 +1142,15 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines exp_alive = nsamples / 3; /* if sleeping briefly, expect the a good number of writes to toggle liveliness */ else exp_alive = nsamples - nsamples / 5; /* if sleeping, expect the vast majority (80%) of the writes to toggle liveliness */ + ddsrt_mutex_lock(&listener_state.lock); printf("check w0_alive %d >= %d\n", listener_state.w0_alive, exp_alive); CU_ASSERT_FATAL(listener_state.w0_alive >= exp_alive); ddsrt_mutex_unlock(&listener_state.lock); } - /* cleanup */ rc = dds_delete(waitset); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(reader); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(writer_active); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(writer_inactive); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - if (remote_reader) - { - rc = dds_delete(sub_topic); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - } - rc = dds_delete(pub_topic); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - + dds_set_listener (reader, NULL); // listener must not be invoked anymore ddsrt_mutex_destroy(&listener_state.lock); } @@ -1182,3 +1180,80 @@ CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .f } } } + +struct getstatus_thread_arg { + dds_entity_t rd; + ddsrt_atomic_uint32_t stop; +}; + +static uint32_t getstatus_thread (void *varg) +{ + struct getstatus_thread_arg *arg = varg; + while (!ddsrt_atomic_ld32 (&arg->stop)) + { + dds_liveliness_changed_status_t s; + dds_return_t rc; + rc = dds_get_liveliness_changed_status (arg->rd, &s); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + /* change counts must be 0 because the listener gets invoked all the time */ + if (s.alive_count_change != 0 || s.not_alive_count_change != 0) + { + ddsrt_atomic_st32 (&arg->stop, 1); + return 0; + } + } + return 1; +} + +CU_Test(ddsc_liveliness, listener_vs_getstatus, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) +{ + dds_entity_t reader; + dds_entity_t writer_active; + dds_entity_t waitset; + dds_return_t rc; + Space_Type1 sample = {1, 0, 0}; + struct liveliness_changed_state listener_state = { + .weirdness = false, + .w0_handle = 0, + .w0_alive = 0, + .w0_not_alive = 0, + }; + ddsrt_mutex_init (&listener_state.lock); + setup_reader_zero_or_one (&reader, &writer_active, &waitset, DDS_LIVELINESS_MANUAL_BY_TOPIC, 1, false, &listener_state); + + /* start a thread that continually calls dds_get_liveliness_changed_status: that resets + the change counters, but that activity should not be visible in the listener argument */ + ddsrt_thread_t tid; + ddsrt_threadattr_t tattr; + ddsrt_threadattr_init(&tattr); + struct getstatus_thread_arg targ = { .rd = reader, .stop = DDSRT_ATOMIC_UINT32_INIT (0) }; + rc = ddsrt_thread_create(&tid, "getstatus", &tattr, getstatus_thread, &targ); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + /* write as fast as possible - we don't expect this to cause the writers + to gain and lose liveliness once for each sample, but it should have + become alive at least once and fall back to not alive afterward */ + dds_time_t tnow = dds_time (); + const dds_time_t tend = tnow + DDS_SECS(3); + while (tnow < tend && !ddsrt_atomic_ld32 (&targ.stop)) + { + rc = dds_write(writer_active, &sample); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + tnow = dds_time (); + } + + ddsrt_atomic_st32 (&targ.stop, 1); + uint32_t get_status_ok; + rc = ddsrt_thread_join (tid, &get_status_ok); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + CU_ASSERT_FATAL (get_status_ok != 0); + + /* transition to not alive is not necessarily immediate */ + wait_for_notalive (reader, &listener_state); + + rc = dds_delete(waitset); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + dds_set_listener (reader, NULL); // listener must not be invoked anymore + ddsrt_mutex_destroy(&listener_state.lock); +}