diff --git a/src/core/ddsc/src/dds__rhc.h b/src/core/ddsc/src/dds__rhc.h index e01f7c4..fc08a7f 100644 --- a/src/core/ddsc/src/dds__rhc.h +++ b/src/core/ddsc/src/dds__rhc.h @@ -26,7 +26,6 @@ struct proxy_writer_info; DDS_EXPORT struct rhc *dds_rhc_new (dds_reader *reader, const struct ddsi_sertopic *topic); DDS_EXPORT void dds_rhc_free (struct rhc *rhc); -DDS_EXPORT void dds_rhc_fini (struct rhc *rhc); DDS_EXPORT uint32_t dds_rhc_lock_samples (struct rhc *rhc); diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 1efd81d..e6558d5 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -273,7 +273,6 @@ void ddsi_plugin_init (void) ddsi_plugin.builtintopic_write = dds__builtin_write; ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free; - ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini; ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store; ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr; ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership; diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index dcc3289..b3f241b 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -160,14 +160,20 @@ void dds_reader_data_available_cb (struct dds_reader *rd) status on the subscriber; secondly it is the only one for which overhead really matters. Otherwise, it is pretty much like dds_reader_status_cb. */ - struct dds_listener const * const lst = &rd->m_entity.m_listener; - dds_entity * const sub = rd->m_entity.m_parent; ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); + if (!(rd->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) + { + ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); + return; + } + while (rd->m_entity.m_cb_count > 0) ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock); rd->m_entity.m_cb_count++; + struct dds_listener const * const lst = &rd->m_entity.m_listener; + dds_entity * const sub = rd->m_entity.m_parent; if (lst->on_data_on_readers) { ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index 7f3029b..4eb64dc 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -277,7 +277,6 @@ struct rhc { /* Instance/Sample maximums from resource limits QoS */ - ddsrt_atomic_uint32_t n_cbs; /* # callbacks in progress */ int32_t max_instances; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */ int32_t max_samples; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */ int32_t max_samples_per_instance; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */ @@ -631,20 +630,6 @@ void dds_rhc_free (struct rhc *rhc) ddsrt_free (rhc); } -void dds_rhc_fini (struct rhc * rhc) -{ - ddsrt_mutex_lock (&rhc->lock); - rhc->reader = NULL; - ddsrt_mutex_unlock (&rhc->lock); - - /* Wait for all callbacks to complete */ - - while (ddsrt_atomic_ld32 (&rhc->n_cbs) > 0) - { - dds_sleepfor (DDS_MSECS (1)); - } -} - static void init_trigger_info_cmn_nonmatch (struct trigger_info_cmn *info) { info->qminst = ~0u; @@ -1462,17 +1447,10 @@ bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info if (rhc->reader) { - if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) - { - ddsrt_atomic_inc32 (&rhc->n_cbs); + if (notify_data_available) dds_reader_data_available_cb (rhc->reader); - ddsrt_atomic_dec32 (&rhc->n_cbs); - } - if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal (&rhc->reader->m_entity); } return delivered; @@ -1489,13 +1467,8 @@ error_or_nochange: /* Make any reader status callback */ - if (cb_data.raw_status_id >= 0 && rhc->reader && rhc->reader->m_entity.m_status_enable) - { - ddsrt_atomic_inc32 (&rhc->n_cbs); + if (cb_data.raw_status_id >= 0 && rhc->reader) dds_reader_status_cb (&rhc->reader->m_entity, &cb_data); - ddsrt_atomic_dec32 (&rhc->n_cbs); - } - return delivered; } @@ -1575,17 +1548,10 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ if (rhc->reader) { - if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS)) - { - ddsrt_atomic_inc32 (&rhc->n_cbs); + if (notify_data_available) dds_reader_data_available_cb (rhc->reader); - ddsrt_atomic_dec32 (&rhc->n_cbs); - } - if (trigger_waitsets) - { - dds_entity_status_signal(&rhc->reader->m_entity); - } + dds_entity_status_signal (&rhc->reader->m_entity); } } diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_rhc_plugin.h b/src/core/ddsi/include/dds/ddsi/ddsi_rhc_plugin.h index 7f884fa..57c8813 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_rhc_plugin.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_rhc_plugin.h @@ -30,7 +30,6 @@ struct proxy_writer_info struct ddsi_rhc_plugin { void (*rhc_free_fn) (struct rhc *rhc); - void (*rhc_fini_fn) (struct rhc *rhc); bool (*rhc_store_fn) (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 685543b..3dffa18 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3386,10 +3386,6 @@ int delete_reader (const struct nn_guid *guid) DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) - unknown guid\n", PGUID (*guid)); return Q_ERR_UNKNOWN_ENTITY; } - if (rd->rhc) - { - (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc); - } DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); ddsi_plugin.builtintopic_write (&rd->e, now(), false); ephash_remove_reader_guid (rd);