diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index 6a9dc73..114e6ff 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -159,11 +159,14 @@ dds_read_impl( /* read/take resets data available status -- must reset before reading because the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); /* reset DATA_ON_READERS status on subscriber after successful read/take */ if (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER) { dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); + if (take) { ret = (dds_return_t)dds_rhc_take(rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond); } else { @@ -206,11 +209,14 @@ dds_readcdr_impl( if (rc == DDS_RETCODE_OK) { /* read/take resets data available status -- must reset before reading because the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); /* reset DATA_ON_READERS status on subscriber after successful read/take */ if (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER) { dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); + ret = dds_rhc_takecdr ( rd->m_rd->rhc, lock, buf, si, maxs, diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 10951e2..6ce57b9 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -643,11 +643,13 @@ dds_get_subscription_matched_status ( if (status) { *status = rd->m_subscription_matched_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_SUBSCRIPTION_MATCHED_STATUS) { rd->m_subscription_matched_status.total_count_change = 0; rd->m_subscription_matched_status.current_count_change = 0; dds_entity_status_reset(&rd->m_entity, DDS_SUBSCRIPTION_MATCHED_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; @@ -672,11 +674,13 @@ dds_get_liveliness_changed_status ( if (status) { *status = rd->m_liveliness_changed_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_LIVELINESS_CHANGED_STATUS) { rd->m_liveliness_changed_status.alive_count_change = 0; rd->m_liveliness_changed_status.not_alive_count_change = 0; dds_entity_status_reset(&rd->m_entity, DDS_LIVELINESS_CHANGED_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; @@ -700,11 +704,13 @@ dds_return_t dds_get_sample_rejected_status ( if (status) { *status = rd->m_sample_rejected_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_SAMPLE_REJECTED_STATUS) { rd->m_sample_rejected_status.total_count_change = 0; rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_REJECTED_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; @@ -728,10 +734,12 @@ dds_return_t dds_get_sample_lost_status ( if (status) { *status = rd->m_sample_lost_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_SAMPLE_LOST_STATUS) { rd->m_sample_lost_status.total_count_change = 0; dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_LOST_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; @@ -755,10 +763,12 @@ dds_return_t dds_get_requested_deadline_missed_status ( if (status) { *status = rd->m_requested_deadline_missed_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_REQUESTED_DEADLINE_MISSED_STATUS) { rd->m_requested_deadline_missed_status.total_count_change = 0; dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_DEADLINE_MISSED_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; @@ -782,10 +792,12 @@ dds_return_t dds_get_requested_incompatible_qos_status ( if (status) { *status = rd->m_requested_incompatible_qos_status; } + ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); if (rd->m_entity.m_status_enable & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) { rd->m_requested_incompatible_qos_status.total_count_change = 0; dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); } + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); dds_reader_unlock(rd); fail: return ret; diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 5d62fdf..f834f00 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -669,10 +669,12 @@ dds_get_inconsistent_topic_status( if (status) { *status = t->m_inconsistent_topic_status; } + ddsrt_mutex_lock (&t->m_entity.m_observers_lock); if (t->m_entity.m_status_enable & DDS_INCONSISTENT_TOPIC_STATUS) { t->m_inconsistent_topic_status.total_count_change = 0; dds_entity_status_reset(&t->m_entity, DDS_INCONSISTENT_TOPIC_STATUS); } + ddsrt_mutex_unlock (&t->m_entity.m_observers_lock); dds_topic_unlock(t); fail: return ret; diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index cd83d3e..f98d70c 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -518,11 +518,13 @@ dds_get_publication_matched_status ( if (status) { *status = wr->m_publication_matched_status; } + ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); if (wr->m_entity.m_status_enable & DDS_PUBLICATION_MATCHED_STATUS) { wr->m_publication_matched_status.total_count_change = 0; wr->m_publication_matched_status.current_count_change = 0; dds_entity_status_reset(&wr->m_entity, DDS_PUBLICATION_MATCHED_STATUS); } + ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); dds_writer_unlock(wr); fail: return ret; @@ -547,10 +549,12 @@ dds_get_liveliness_lost_status ( if (status) { *status = wr->m_liveliness_lost_status; } + ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); if (wr->m_entity.m_status_enable & DDS_LIVELINESS_LOST_STATUS) { wr->m_liveliness_lost_status.total_count_change = 0; dds_entity_status_reset(&wr->m_entity, DDS_LIVELINESS_LOST_STATUS); } + ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); dds_writer_unlock(wr); fail: return ret; @@ -575,10 +579,12 @@ dds_get_offered_deadline_missed_status( if (status) { *status = wr->m_offered_deadline_missed_status; } + ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); if (wr->m_entity.m_status_enable & DDS_OFFERED_DEADLINE_MISSED_STATUS) { wr->m_offered_deadline_missed_status.total_count_change = 0; dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_DEADLINE_MISSED_STATUS); } + ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); dds_writer_unlock(wr); fail: return ret; @@ -603,10 +609,12 @@ dds_get_offered_incompatible_qos_status ( if (status) { *status = wr->m_offered_incompatible_qos_status; } + ddsrt_mutex_lock (&wr->m_entity.m_observers_lock); if (wr->m_entity.m_status_enable & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) { wr->m_offered_incompatible_qos_status.total_count_change = 0; dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS); } + ddsrt_mutex_unlock (&wr->m_entity.m_observers_lock); dds_writer_unlock(wr); fail: return ret;