diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 23025c3..c94ff3e 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -84,7 +84,40 @@ DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) { DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status); -DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst); +union dds_status_union { + dds_inconsistent_topic_status_t inconsistent_topic; + dds_liveliness_changed_status_t liveliness_changed; + dds_liveliness_lost_status_t liveliness_lost; + dds_offered_deadline_missed_status_t offered_deadline_missed; + dds_offered_incompatible_qos_status_t offered_incompatible_qos; + dds_publication_matched_status_t publication_matched; + dds_requested_deadline_missed_status_t requested_deadline_missed; + dds_requested_incompatible_qos_status_t requested_incompatible_qos; + dds_sample_lost_status_t sample_lost; + dds_sample_rejected_status_t sample_rejected; + dds_subscription_matched_status_t subscription_matched; +}; + +#define STATUS_CB_IMPL(entity_kind_, name_, NAME_) \ + static void status_cb_##name_ (dds_##entity_kind_ * const e, const status_cb_data_t *data, bool enabled) \ + { \ + struct dds_listener const * const listener = &e->m_entity.m_listener; \ + const bool invoke = (listener->on_##name_ != 0) && enabled; \ + union dds_status_union lst; \ + update_##name_ (&e->m_##name_##_status, invoke ? &lst.name_ : NULL, data); \ + if (invoke) { \ + dds_entity_status_reset (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ + e->m_entity.m_cb_pending_count++; \ + e->m_entity.m_cb_count++; \ + ddsrt_mutex_unlock (&e->m_entity.m_observers_lock); \ + listener->on_##name_ (e->m_entity.m_hdllink.hdl, lst.name_, listener->on_##name_##_arg); \ + ddsrt_mutex_lock (&e->m_entity.m_observers_lock); \ + e->m_entity.m_cb_count--; \ + e->m_entity.m_cb_pending_count--; \ + } else if (enabled) { \ + dds_entity_status_set (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ + } \ + } DDS_EXPORT dds_participant *dds_entity_participant (const dds_entity *e); DDS_EXPORT const ddsi_guid_t *dds_entity_participant_guid (const dds_entity *e); diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 1946ec0..7b64357 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -940,77 +940,6 @@ dds_return_t dds_get_listener (dds_entity_t entity, dds_listener_t *listener) } } -void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst) -{ - struct dds_listener const * const lst = &entity->m_listener; - switch (which) - { - case DDS_INCONSISTENT_TOPIC_STATUS_ID: { - struct dds_inconsistent_topic_status const * const st = vst; - lst->on_inconsistent_topic (entity->m_hdllink.hdl, *st, lst->on_inconsistent_topic_arg); - break; - } - case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: { - struct dds_requested_deadline_missed_status const * const st = vst; - lst->on_requested_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_requested_deadline_missed_arg); - break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_requested_incompatible_qos_status const * const st = vst; - lst->on_requested_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_requested_incompatible_qos_arg); - break; - } - case DDS_SAMPLE_LOST_STATUS_ID: { - struct dds_sample_lost_status const * const st = vst; - lst->on_sample_lost (entity->m_hdllink.hdl, *st, lst->on_sample_lost_arg); - break; - } - case DDS_SAMPLE_REJECTED_STATUS_ID: { - struct dds_sample_rejected_status const * const st = vst; - lst->on_sample_rejected (entity->m_hdllink.hdl, *st, lst->on_sample_rejected_arg); - break; - } - case DDS_LIVELINESS_CHANGED_STATUS_ID: { - struct dds_liveliness_changed_status const * const st = vst; - lst->on_liveliness_changed (entity->m_hdllink.hdl, *st, lst->on_liveliness_changed_arg); - break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { - struct dds_subscription_matched_status const * const st = vst; - lst->on_subscription_matched (entity->m_hdllink.hdl, *st, lst->on_subscription_matched_arg); - break; - } - case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: { - struct dds_offered_deadline_missed_status const * const st = vst; - lst->on_offered_deadline_missed (entity->m_hdllink.hdl, *st, lst->on_offered_deadline_missed_arg); - break; - } - case DDS_LIVELINESS_LOST_STATUS_ID: { - struct dds_liveliness_lost_status const * const st = vst; - lst->on_liveliness_lost (entity->m_hdllink.hdl, *st, lst->on_liveliness_lost_arg); - break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_offered_incompatible_qos_status const * const st = vst; - lst->on_offered_incompatible_qos (entity->m_hdllink.hdl, *st, lst->on_offered_incompatible_qos_arg); - break; - } - case DDS_PUBLICATION_MATCHED_STATUS_ID: { - struct dds_publication_matched_status const * const st = vst; - lst->on_publication_matched (entity->m_hdllink.hdl, *st, lst->on_publication_matched_arg); - break; - } - case DDS_DATA_AVAILABLE_STATUS_ID: { - lst->on_data_available (entity->m_hdllink.hdl, lst->on_data_available_arg); - break; - } - case DDS_DATA_ON_READERS_STATUS_ID: { - lst->on_data_on_readers (entity->m_hdllink.hdl, lst->on_data_on_readers_arg); - break; - } - } -} - static void clear_status_with_listener (struct dds_entity *e) { const struct dds_listener *lst = &e->m_listener; diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 4c1f999..0901b0c 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -174,6 +174,137 @@ void dds_reader_data_available_cb (struct dds_reader *rd) ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } +static void update_requested_deadline_missed (struct dds_requested_deadline_missed_status * __restrict st, struct dds_requested_deadline_missed_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_instance_handle = data->handle; + st->total_count++; + // always incrementing st->total_count_change, then copying into *lst is + // a bit more than minimal work, but this guarantees the correct value + // also when enabling a listeners after some events have occurred + // + // (same line of reasoning for all of them) + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_requested_incompatible_qos (struct dds_requested_incompatible_qos_status * __restrict st, struct dds_requested_incompatible_qos_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_policy_id = data->extra; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_sample_lost (struct dds_sample_lost_status * __restrict st, struct dds_sample_lost_status * __restrict lst, const status_cb_data_t *data) +{ + (void) data; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_sample_rejected (struct dds_sample_rejected_status * __restrict st, struct dds_sample_rejected_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_reason = data->extra; + st->last_instance_handle = data->handle; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_liveliness_changed (struct dds_liveliness_changed_status * __restrict st, struct dds_liveliness_changed_status * __restrict lst, const status_cb_data_t *data) +{ + DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && + LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && + LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && + LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE && + LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE && + LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && + (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX); + assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE); + st->last_publication_handle = data->handle; + switch ((enum liveliness_changed_data_extra) data->extra) + { + case LIVELINESS_CHANGED_ADD_ALIVE: + st->alive_count++; + st->alive_count_change++; + break; + case LIVELINESS_CHANGED_ADD_NOT_ALIVE: + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE: + st->not_alive_count--; + st->not_alive_count_change--; + break; + case LIVELINESS_CHANGED_REMOVE_ALIVE: + st->alive_count--; + st->alive_count_change--; + break; + case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE: + st->alive_count--; + st->alive_count_change--; + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE: + st->not_alive_count--; + st->not_alive_count_change--; + st->alive_count++; + st->alive_count_change++; + break; + } + if (lst != NULL) + { + *lst = *st; + st->alive_count_change = 0; + st->not_alive_count_change = 0; + } +} + +static void update_subscription_matched (struct dds_subscription_matched_status * __restrict st, struct dds_subscription_matched_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_publication_handle = data->handle; + if (data->add) { + st->total_count++; + st->current_count++; + st->total_count_change++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + st->current_count_change = 0; + } +} + +STATUS_CB_IMPL (reader, requested_deadline_missed, REQUESTED_DEADLINE_MISSED) +STATUS_CB_IMPL (reader, requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS) +STATUS_CB_IMPL (reader, sample_lost, SAMPLE_LOST) +STATUS_CB_IMPL (reader, sample_rejected, SAMPLE_REJECTED) +STATUS_CB_IMPL (reader, liveliness_changed, LIVELINESS_CHANGED) +STATUS_CB_IMPL (reader, subscription_matched, SUBSCRIPTION_MATCHED) + void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) { dds_reader * const rd = ventity; @@ -190,15 +321,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) return; } - struct dds_listener const * const lst = &rd->m_entity.m_listener; - enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; - bool invoke = false; - void *vst = NULL; - int32_t *reset[2] = { NULL, NULL }; - - /* DATA_AVAILABLE is handled by dds_reader_data_available_cb */ - assert (status_id != DDS_DATA_AVAILABLE_STATUS_ID); - /* Serialize listener invocations -- it is somewhat sad to do this, but then it may also be unreasonable to expect the application to handle concurrent invocations of a single listener. The benefit @@ -212,108 +334,28 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) while (rd->m_entity.m_cb_count > 0) ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock); - /* Update status metrics. */ - switch (status_id) { - case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: { - struct dds_requested_deadline_missed_status * const st = vst = &rd->m_requested_deadline_missed_status; - st->last_instance_handle = data->handle; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_requested_deadline_missed != 0); - reset[0] = &st->total_count_change; + const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; + const bool enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0; + switch (status_id) + { + case DDS_REQUESTED_DEADLINE_MISSED_STATUS_ID: + status_cb_requested_deadline_missed (rd, data, enabled); break; - } - case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_requested_incompatible_qos_status * const st = vst = &rd->m_requested_incompatible_qos_status; - st->total_count++; - st->total_count_change++; - st->last_policy_id = data->extra; - invoke = (lst->on_requested_incompatible_qos != 0); - reset[0] = &st->total_count_change; + case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID: + status_cb_requested_incompatible_qos (rd, data, enabled); break; - } - case DDS_SAMPLE_LOST_STATUS_ID: { - struct dds_sample_lost_status * const st = vst = &rd->m_sample_lost_status; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_sample_lost != 0); - reset[0] = &st->total_count_change; + case DDS_SAMPLE_LOST_STATUS_ID: + status_cb_sample_lost (rd, data, enabled); break; - } - case DDS_SAMPLE_REJECTED_STATUS_ID: { - struct dds_sample_rejected_status * const st = vst = &rd->m_sample_rejected_status; - st->total_count++; - st->total_count_change++; - st->last_reason = data->extra; - st->last_instance_handle = data->handle; - invoke = (lst->on_sample_rejected != 0); - reset[0] = &st->total_count_change; + case DDS_SAMPLE_REJECTED_STATUS_ID: + status_cb_sample_rejected (rd, data, enabled); break; - } - case DDS_LIVELINESS_CHANGED_STATUS_ID: { - struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status; - DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && - LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && - LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && - LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE && - LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE && - LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && - (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < UINT32_MAX); - assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE); - switch ((enum liveliness_changed_data_extra) data->extra) - { - case LIVELINESS_CHANGED_ADD_ALIVE: - st->alive_count++; - st->alive_count_change++; - break; - case LIVELINESS_CHANGED_ADD_NOT_ALIVE: - st->not_alive_count++; - st->not_alive_count_change++; - break; - case LIVELINESS_CHANGED_REMOVE_NOT_ALIVE: - st->not_alive_count--; - st->not_alive_count_change--; - break; - case LIVELINESS_CHANGED_REMOVE_ALIVE: - st->alive_count--; - st->alive_count_change--; - break; - case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE: - st->alive_count--; - st->alive_count_change--; - st->not_alive_count++; - st->not_alive_count_change++; - break; - case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE: - st->not_alive_count--; - st->not_alive_count_change--; - st->alive_count++; - st->alive_count_change++; - break; - } - st->last_publication_handle = data->handle; - invoke = (lst->on_liveliness_changed != 0); - reset[0] = &st->alive_count_change; - reset[1] = &st->not_alive_count_change; + case DDS_LIVELINESS_CHANGED_STATUS_ID: + status_cb_liveliness_changed (rd, data, enabled); break; - } - case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: { - struct dds_subscription_matched_status * const st = vst = &rd->m_subscription_matched_status; - if (data->add) { - st->total_count++; - st->total_count_change++; - st->current_count++; - st->current_count_change++; - } else { - st->current_count--; - st->current_count_change--; - } - st->last_publication_handle = data->handle; - invoke = (lst->on_subscription_matched != 0); - reset[0] = &st->total_count_change; - reset[1] = &st->current_count_change; + case DDS_SUBSCRIPTION_MATCHED_STATUS_ID: + status_cb_subscription_matched (rd, data, enabled); break; - } case DDS_DATA_ON_READERS_STATUS_ID: case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID: @@ -324,29 +366,6 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) assert (0); } - const uint32_t enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)); - if (!enabled) - { - /* Don't invoke listeners or set status flag if masked */ - } - else if (invoke) - { - rd->m_entity.m_cb_pending_count++; - rd->m_entity.m_cb_count++; - ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); - dds_entity_invoke_listener (&rd->m_entity, status_id, vst); - ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); - rd->m_entity.m_cb_count--; - rd->m_entity.m_cb_pending_count--; - *reset[0] = 0; - if (reset[1]) - *reset[1] = 0; - } - else - { - dds_entity_status_set (&rd->m_entity, (status_mask_t) (1u << status_id)); - } - ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond); ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); } diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index b539e22..166b2da 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -46,12 +46,71 @@ static dds_return_t dds_writer_status_validate (uint32_t mask) return (mask & ~DDS_WRITER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK; } -/* - Handler function for all write related status callbacks. May trigger status - condition or call listener on writer. Each entity has a mask of - supported status types. According to DDS specification, if listener is called - then status conditions is not triggered. -*/ +static void update_offered_deadline_missed (struct dds_offered_deadline_missed_status * __restrict st, struct dds_offered_deadline_missed_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_instance_handle = data->handle; + st->total_count++; + // always incrementing st->total_count_change, then copying into *lst is + // a bit more than minimal work, but this guarantees the correct value + // also when enabling a listeners after some events have occurred + // + // (same line of reasoning for all of them) + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_offered_incompatible_qos (struct dds_offered_incompatible_qos_status * __restrict st, struct dds_offered_incompatible_qos_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_policy_id = data->extra; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_liveliness_lost (struct dds_liveliness_lost_status * __restrict st, struct dds_liveliness_lost_status * __restrict lst, const status_cb_data_t *data) +{ + (void) data; + st->total_count++; + st->total_count_change++; + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + } +} + +static void update_publication_matched (struct dds_publication_matched_status * __restrict st, struct dds_publication_matched_status * __restrict lst, const status_cb_data_t *data) +{ + st->last_subscription_handle = data->handle; + if (data->add) { + st->total_count++; + st->current_count++; + st->total_count_change++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + if (lst != NULL) + { + *lst = *st; + st->total_count_change = 0; + st->current_count_change = 0; + } +} + +STATUS_CB_IMPL (writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED) +STATUS_CB_IMPL (writer, offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS) +STATUS_CB_IMPL (writer, liveliness_lost, LIVELINESS_LOST) +STATUS_CB_IMPL (writer, publication_matched, PUBLICATION_MATCHED) void dds_writer_status_cb (void *entity, const struct status_cb_data *data) { @@ -69,67 +128,27 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data) return; } - struct dds_listener const * const lst = &wr->m_entity.m_listener; - enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; - bool invoke = false; - void *vst = NULL; - int32_t *reset[2] = { NULL, NULL }; - /* FIXME: why wait if no listener is set? */ ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); while (wr->m_entity.m_cb_count > 0) ddsrt_cond_wait (&wr->m_entity.m_observers_cond, &wr->m_entity.m_observers_lock); - /* Reset the status for possible Listener call. - * When a listener is not called, the status will be set (again). */ - dds_entity_status_reset (&wr->m_entity, (status_mask_t) (1u << status_id)); - - /* Update status metrics. */ + const enum dds_status_id status_id = (enum dds_status_id) data->raw_status_id; + const bool enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)) != 0; switch (status_id) { - case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: { - struct dds_offered_deadline_missed_status * const st = vst = &wr->m_offered_deadline_missed_status; - st->total_count++; - st->total_count_change++; - st->last_instance_handle = data->handle; - invoke = (lst->on_offered_deadline_missed != 0); - reset[0] = &st->total_count_change; + case DDS_OFFERED_DEADLINE_MISSED_STATUS_ID: + status_cb_offered_deadline_missed (wr, data, enabled); break; - } - case DDS_LIVELINESS_LOST_STATUS_ID: { - struct dds_liveliness_lost_status * const st = vst = &wr->m_liveliness_lost_status; - st->total_count++; - st->total_count_change++; - invoke = (lst->on_liveliness_lost != 0); - reset[0] = &st->total_count_change; + case DDS_LIVELINESS_LOST_STATUS_ID: + status_cb_liveliness_lost (wr, data, enabled); break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: { - struct dds_offered_incompatible_qos_status * const st = vst = &wr->m_offered_incompatible_qos_status; - st->total_count++; - st->total_count_change++; - st->last_policy_id = data->extra; - invoke = (lst->on_offered_incompatible_qos != 0); - reset[0] = &st->total_count_change; + case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID: + status_cb_offered_incompatible_qos (wr, data, enabled); break; - } - case DDS_PUBLICATION_MATCHED_STATUS_ID: { - struct dds_publication_matched_status * const st = vst = &wr->m_publication_matched_status; - if (data->add) { - st->total_count++; - st->total_count_change++; - st->current_count++; - st->current_count_change++; - } else { - st->current_count--; - st->current_count_change--; - } - wr->m_publication_matched_status.last_subscription_handle = data->handle; - invoke = (lst->on_publication_matched != 0); - reset[0] = &st->total_count_change; - reset[1] = &st->current_count_change; + case DDS_PUBLICATION_MATCHED_STATUS_ID: + status_cb_publication_matched (wr, data, enabled); break; - } case DDS_DATA_AVAILABLE_STATUS_ID: case DDS_INCONSISTENT_TOPIC_STATUS_ID: case DDS_SAMPLE_LOST_STATUS_ID: @@ -142,29 +161,6 @@ void dds_writer_status_cb (void *entity, const struct status_cb_data *data) assert (0); } - const uint32_t enabled = (ddsrt_atomic_ld32 (&wr->m_entity.m_status.m_status_and_mask) & ((1u << status_id) << SAM_ENABLED_SHIFT)); - if (enabled == 0) - { - /* Don't invoke listeners or set status flag if masked */ - } - else if (invoke) - { - wr->m_entity.m_cb_pending_count++; - wr->m_entity.m_cb_count++; - ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); - dds_entity_invoke_listener (&wr->m_entity, status_id, vst); - ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); - wr->m_entity.m_cb_count--; - wr->m_entity.m_cb_pending_count--; - *reset[0] = 0; - if (reset[1]) - *reset[1] = 0; - } - else - { - dds_entity_status_set (&wr->m_entity, (status_mask_t) (1u << status_id)); - } - ddsrt_cond_broadcast (&wr->m_entity.m_observers_cond); ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); } diff --git a/src/core/ddsc/tests/liveliness.c b/src/core/ddsc/tests/liveliness.c index e6ad070..b1bd6ae 100644 --- a/src/core/ddsc/tests/liveliness.c +++ b/src/core/ddsc/tests/liveliness.c @@ -67,10 +67,6 @@ static void liveliness_init(void) static void liveliness_fini(void) { - dds_delete(g_sub_subscriber); - dds_delete(g_pub_publisher); - dds_delete(g_sub_participant); - dds_delete(g_pub_participant); dds_delete(g_sub_domain); dds_delete(g_pub_domain); } @@ -978,31 +974,17 @@ static unsigned get_and_check_status (dds_entity_t reader, dds_entity_t writer_a return result; } -static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader) +static void setup_reader_zero_or_one (dds_entity_t *reader, dds_entity_t *writer_active, dds_entity_t *waitset, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader, struct liveliness_changed_state *listener_state) { - const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5; dds_entity_t pub_topic; dds_entity_t sub_topic = 0; - dds_entity_t reader; - dds_entity_t writer_active; /* writing */ dds_entity_t writer_inactive; /* not writing, liveliness should still toggle */ - dds_entity_t waitset; - dds_listener_t *listener; dds_qos_t *qos; dds_return_t rc; - struct dds_liveliness_changed_status lstatus; char name[100]; - Space_Type1 sample = {1, 0, 0}; - struct liveliness_changed_state listener_state = { - .weirdness = false, - .w0_handle = 0, - .w0_alive = 0, - .w0_not_alive = 0, - }; - ddsrt_mutex_init (&listener_state.lock); - waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); - CU_ASSERT_FATAL(waitset > 0); + *waitset = dds_create_waitset(DDS_CYCLONEDDS_HANDLE); + CU_ASSERT_FATAL(*waitset > 0); qos = dds_create_qos(); CU_ASSERT_FATAL(qos != NULL); @@ -1020,22 +1002,22 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines /* reader liveliness is always automatic/infinity */ dds_qset_liveliness(qos, DDS_LIVELINESS_AUTOMATIC, DDS_INFINITY); - reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL); - CU_ASSERT_FATAL(reader > 0); - rc = dds_set_status_mask(reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); + *reader = dds_create_reader(remote_reader ? g_sub_participant : g_pub_participant, remote_reader ? sub_topic : pub_topic, qos, NULL); + CU_ASSERT_FATAL(*reader > 0); + rc = dds_set_status_mask(*reader, DDS_LIVELINESS_CHANGED_STATUS | DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_waitset_attach(waitset, reader, reader); + rc = dds_waitset_attach(*waitset, *reader, *reader); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); /* writer liveliness varies */ dds_qset_liveliness(qos, lkind, ldur); - writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); - CU_ASSERT_FATAL(writer_active > 0); + *writer_active = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); + CU_ASSERT_FATAL(*writer_active > 0); writer_inactive = dds_create_writer(g_pub_participant, pub_topic, qos, NULL); CU_ASSERT_FATAL(writer_inactive > 0); - rc = dds_set_status_mask(writer_active, DDS_PUBLICATION_MATCHED_STATUS); + rc = dds_set_status_mask(*writer_active, DDS_PUBLICATION_MATCHED_STATUS); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_waitset_attach(waitset, writer_active, writer_active); + rc = dds_waitset_attach(*waitset, *writer_active, *writer_active); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); dds_delete_qos(qos); @@ -1046,32 +1028,80 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines bool initial_sample_written = false, initial_sample_received = false; do { - status = get_and_check_status (reader, writer_active); + status = get_and_check_status (*reader, *writer_active); if (status & STATUS_DATA) initial_sample_received = true; if (status & STATUS_SYNCED && !initial_sample_written) { - rc = dds_write(writer_active, &sample); + Space_Type1 sample = {1, 0, 0}; + rc = dds_write(*writer_active, &sample); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); initial_sample_written = true; } if (status & STATUS_SYNCED && initial_sample_received) break; - rc = dds_waitset_wait(waitset, NULL, 0, DDS_SECS(5)); + rc = dds_waitset_wait(*waitset, NULL, 0, DDS_SECS(5)); if (rc < 1) { - get_and_check_status (reader, writer_active); + get_and_check_status (*reader, *writer_active); CU_ASSERT_FATAL(rc >= 1); } } while (1); /* switch to using a listener: those allow us to observe all events */ - listener = dds_create_listener (&listener_state); + dds_listener_t *listener; + listener = dds_create_listener (listener_state); dds_lset_liveliness_changed(listener, liveliness_changed_listener); - rc = dds_set_listener (reader, listener); + rc = dds_set_listener (*reader, listener); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); dds_delete_listener (listener); +} + +static void wait_for_notalive (dds_entity_t reader, struct liveliness_changed_state *listener_state) +{ + struct dds_liveliness_changed_status lstatus; + int retries = 100; + dds_return_t rc; + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + + ddsrt_mutex_lock (&listener_state->lock); + printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive); + CU_ASSERT_FATAL(!listener_state->weirdness); + CU_ASSERT_FATAL(listener_state->w0_handle != 0); + while (listener_state->w0_not_alive < listener_state->w0_alive && retries-- > 0) + { + ddsrt_mutex_unlock(&listener_state->lock); + dds_sleepfor(DDS_MSECS(10)); + rc = dds_get_liveliness_changed_status(reader, &lstatus); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + ddsrt_mutex_lock(&listener_state->lock); + } + + printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); + printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state->w0_handle, listener_state->w0_alive, listener_state->w0_not_alive); + CU_ASSERT_FATAL(listener_state->w0_alive == listener_state->w0_not_alive); + ddsrt_mutex_unlock(&listener_state->lock); +} + +static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_liveliness_kind_t lkind, dds_duration_t ldur, bool remote_reader) +{ + const uint32_t nsamples = (sleep <= DDS_MSECS(10)) ? 50 : 5; + dds_entity_t reader; + dds_entity_t writer_active; + dds_entity_t waitset; + dds_return_t rc; + Space_Type1 sample = {1, 0, 0}; + struct liveliness_changed_state listener_state = { + .weirdness = false, + .w0_handle = 0, + .w0_alive = 0, + .w0_not_alive = 0, + }; + ddsrt_mutex_init (&listener_state.lock); + setup_reader_zero_or_one (&reader, &writer_active, &waitset, lkind, ldur, remote_reader, &listener_state); /* write as fast as possible - we don't expect this to cause the writers to gain and lose liveliness once for each sample, but it should have @@ -1102,28 +1132,9 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines CU_ASSERT_FATAL(cnt == nsamples + 1); /* transition to not alive is not necessarily immediate */ + wait_for_notalive (reader, &listener_state); + { - int retries = 100; - rc = dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - printf("early liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); - - ddsrt_mutex_lock (&listener_state.lock); - printf("early w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); - CU_ASSERT_FATAL(!listener_state.weirdness); - CU_ASSERT_FATAL(listener_state.w0_handle != 0); - while (listener_state.w0_not_alive < listener_state.w0_alive && retries-- > 0) - { - ddsrt_mutex_unlock(&listener_state.lock); - dds_sleepfor(DDS_MSECS(10)); - rc = dds_get_liveliness_changed_status(reader, &lstatus); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - ddsrt_mutex_lock(&listener_state.lock); - } - - printf("late liveliness changed status: alive %"PRId32" not-alive %"PRId32"\n", lstatus.alive_count, lstatus.not_alive_count); - printf("final w0 %"PRIx64" alive %"PRId32" not-alive %"PRId32"\n", listener_state.w0_handle, listener_state.w0_alive, listener_state.w0_not_alive); - CU_ASSERT_FATAL(listener_state.w0_alive == listener_state.w0_not_alive); uint32_t exp_alive; if (sleep == 0) exp_alive = 1; /* if not sleeping, it's ok if the transition happens only once */ @@ -1131,28 +1142,15 @@ static void lease_duration_zero_or_one_impl (dds_duration_t sleep, dds_livelines exp_alive = nsamples / 3; /* if sleeping briefly, expect the a good number of writes to toggle liveliness */ else exp_alive = nsamples - nsamples / 5; /* if sleeping, expect the vast majority (80%) of the writes to toggle liveliness */ + ddsrt_mutex_lock(&listener_state.lock); printf("check w0_alive %d >= %d\n", listener_state.w0_alive, exp_alive); CU_ASSERT_FATAL(listener_state.w0_alive >= exp_alive); ddsrt_mutex_unlock(&listener_state.lock); } - /* cleanup */ rc = dds_delete(waitset); CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(reader); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(writer_active); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - rc = dds_delete(writer_inactive); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - if (remote_reader) - { - rc = dds_delete(sub_topic); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - } - rc = dds_delete(pub_topic); - CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); - + dds_set_listener (reader, NULL); // listener must not be invoked anymore ddsrt_mutex_destroy(&listener_state.lock); } @@ -1182,3 +1180,80 @@ CU_Test(ddsc_liveliness, lease_duration_zero_or_one, .init = liveliness_init, .f } } } + +struct getstatus_thread_arg { + dds_entity_t rd; + ddsrt_atomic_uint32_t stop; +}; + +static uint32_t getstatus_thread (void *varg) +{ + struct getstatus_thread_arg *arg = varg; + while (!ddsrt_atomic_ld32 (&arg->stop)) + { + dds_liveliness_changed_status_t s; + dds_return_t rc; + rc = dds_get_liveliness_changed_status (arg->rd, &s); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + /* change counts must be 0 because the listener gets invoked all the time */ + if (s.alive_count_change != 0 || s.not_alive_count_change != 0) + { + ddsrt_atomic_st32 (&arg->stop, 1); + return 0; + } + } + return 1; +} + +CU_Test(ddsc_liveliness, listener_vs_getstatus, .init = liveliness_init, .fini = liveliness_fini, .timeout = 30) +{ + dds_entity_t reader; + dds_entity_t writer_active; + dds_entity_t waitset; + dds_return_t rc; + Space_Type1 sample = {1, 0, 0}; + struct liveliness_changed_state listener_state = { + .weirdness = false, + .w0_handle = 0, + .w0_alive = 0, + .w0_not_alive = 0, + }; + ddsrt_mutex_init (&listener_state.lock); + setup_reader_zero_or_one (&reader, &writer_active, &waitset, DDS_LIVELINESS_MANUAL_BY_TOPIC, 1, false, &listener_state); + + /* start a thread that continually calls dds_get_liveliness_changed_status: that resets + the change counters, but that activity should not be visible in the listener argument */ + ddsrt_thread_t tid; + ddsrt_threadattr_t tattr; + ddsrt_threadattr_init(&tattr); + struct getstatus_thread_arg targ = { .rd = reader, .stop = DDSRT_ATOMIC_UINT32_INIT (0) }; + rc = ddsrt_thread_create(&tid, "getstatus", &tattr, getstatus_thread, &targ); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + /* write as fast as possible - we don't expect this to cause the writers + to gain and lose liveliness once for each sample, but it should have + become alive at least once and fall back to not alive afterward */ + dds_time_t tnow = dds_time (); + const dds_time_t tend = tnow + DDS_SECS(3); + while (tnow < tend && !ddsrt_atomic_ld32 (&targ.stop)) + { + rc = dds_write(writer_active, &sample); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + tnow = dds_time (); + } + + ddsrt_atomic_st32 (&targ.stop, 1); + uint32_t get_status_ok; + rc = ddsrt_thread_join (tid, &get_status_ok); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + CU_ASSERT_FATAL (get_status_ok != 0); + + /* transition to not alive is not necessarily immediate */ + wait_for_notalive (reader, &listener_state); + + rc = dds_delete(waitset); + CU_ASSERT_FATAL(rc == DDS_RETCODE_OK); + + dds_set_listener (reader, NULL); // listener must not be invoked anymore + ddsrt_mutex_destroy(&listener_state.lock); +}