remove dds_rhc_fini abomination

It was called strangely early in the deleting of the reader, even before
the DDSI reader was no longer being accessed by other threads.  The
immediate and obvious problem is that it resets the pointer to the
upper-layer entity even though this can still be dereferenced in
invoking a listener, resulting in a crash.

Secondly it blocks until there are no listener calls any more (and the
resetting of that pointer will prevent any further listener
invocations), but a similar piece of logic is already in generic entity
code that resets the mask and then waits for all listener invocations to
complete.  Having both is a problem.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-04-16 17:09:08 +02:00 committed by eboasson
parent d6edfada81
commit 0202039f61
6 changed files with 13 additions and 48 deletions

View file

@ -26,7 +26,6 @@ struct proxy_writer_info;
DDS_EXPORT struct rhc *dds_rhc_new (dds_reader *reader, const struct ddsi_sertopic *topic);
DDS_EXPORT void dds_rhc_free (struct rhc *rhc);
DDS_EXPORT void dds_rhc_fini (struct rhc *rhc);
DDS_EXPORT uint32_t dds_rhc_lock_samples (struct rhc *rhc);

View file

@ -273,7 +273,6 @@ void ddsi_plugin_init (void)
ddsi_plugin.builtintopic_write = dds__builtin_write;
ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini;
ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store;
ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr;
ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership;

View file

@ -160,14 +160,20 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
status on the subscriber; secondly it is the only one for which
overhead really matters. Otherwise, it is pretty much like
dds_reader_status_cb. */
struct dds_listener const * const lst = &rd->m_entity.m_listener;
dds_entity * const sub = rd->m_entity.m_parent;
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
if (!(rd->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS))
{
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
return;
}
while (rd->m_entity.m_cb_count > 0)
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
rd->m_entity.m_cb_count++;
struct dds_listener const * const lst = &rd->m_entity.m_listener;
dds_entity * const sub = rd->m_entity.m_parent;
if (lst->on_data_on_readers)
{
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);

View file

@ -277,7 +277,6 @@ struct rhc {
/* Instance/Sample maximums from resource limits QoS */
ddsrt_atomic_uint32_t n_cbs; /* # callbacks in progress */
int32_t max_instances; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
int32_t max_samples; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
int32_t max_samples_per_instance; /* FIXME: probably better as uint32_t with MAX_UINT32 for unlimited */
@ -631,20 +630,6 @@ void dds_rhc_free (struct rhc *rhc)
ddsrt_free (rhc);
}
void dds_rhc_fini (struct rhc * rhc)
{
ddsrt_mutex_lock (&rhc->lock);
rhc->reader = NULL;
ddsrt_mutex_unlock (&rhc->lock);
/* Wait for all callbacks to complete */
while (ddsrt_atomic_ld32 (&rhc->n_cbs) > 0)
{
dds_sleepfor (DDS_MSECS (1));
}
}
static void init_trigger_info_cmn_nonmatch (struct trigger_info_cmn *info)
{
info->qminst = ~0u;
@ -1462,17 +1447,10 @@ bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info
if (rhc->reader)
{
if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS))
{
ddsrt_atomic_inc32 (&rhc->n_cbs);
if (notify_data_available)
dds_reader_data_available_cb (rhc->reader);
ddsrt_atomic_dec32 (&rhc->n_cbs);
}
if (trigger_waitsets)
{
dds_entity_status_signal(&rhc->reader->m_entity);
}
dds_entity_status_signal (&rhc->reader->m_entity);
}
return delivered;
@ -1489,13 +1467,8 @@ error_or_nochange:
/* Make any reader status callback */
if (cb_data.raw_status_id >= 0 && rhc->reader && rhc->reader->m_entity.m_status_enable)
{
ddsrt_atomic_inc32 (&rhc->n_cbs);
if (cb_data.raw_status_id >= 0 && rhc->reader)
dds_reader_status_cb (&rhc->reader->m_entity, &cb_data);
ddsrt_atomic_dec32 (&rhc->n_cbs);
}
return delivered;
}
@ -1575,17 +1548,10 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ
if (rhc->reader)
{
if (notify_data_available && (rhc->reader->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS))
{
ddsrt_atomic_inc32 (&rhc->n_cbs);
if (notify_data_available)
dds_reader_data_available_cb (rhc->reader);
ddsrt_atomic_dec32 (&rhc->n_cbs);
}
if (trigger_waitsets)
{
dds_entity_status_signal(&rhc->reader->m_entity);
}
dds_entity_status_signal (&rhc->reader->m_entity);
}
}

View file

@ -30,7 +30,6 @@ struct proxy_writer_info
struct ddsi_rhc_plugin
{
void (*rhc_free_fn) (struct rhc *rhc);
void (*rhc_fini_fn) (struct rhc *rhc);
bool (*rhc_store_fn)
(struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info,
struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);

View file

@ -3386,10 +3386,6 @@ int delete_reader (const struct nn_guid *guid)
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) - unknown guid\n", PGUID (*guid));
return Q_ERR_UNKNOWN_ENTITY;
}
if (rd->rhc)
{
(ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc);
}
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid));
ddsi_plugin.builtintopic_write (&rd->e, now(), false);
ephash_remove_reader_guid (rd);