diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index b4ede31..09139f5 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -21,50 +21,6 @@ #include "dds/ddsi/q_globals.h" #include "dds/ddsi/ddsi_sertopic.h" -static dds_return_t dds_read_lock (dds_entity_t hdl, dds_reader **reader, dds_readcond **condition, bool only_reader) -{ - dds_return_t rc; - dds_entity *entity, *parent_entity; - if ((rc = dds_entity_lock (hdl, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK) - { - return rc; - } - else if (dds_entity_kind (entity) == DDS_KIND_READER) - { - *reader = (dds_reader *) entity; - *condition = NULL; - return DDS_RETCODE_OK; - } - else if (only_reader) - { - dds_entity_unlock (entity); - return DDS_RETCODE_ILLEGAL_OPERATION; - } - else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) - { - dds_entity_unlock (entity); - return DDS_RETCODE_ILLEGAL_OPERATION; - } - else if ((rc = dds_entity_lock (entity->m_parent->m_hdllink.hdl, DDS_KIND_READER, &parent_entity)) != DDS_RETCODE_OK) - { - dds_entity_unlock (entity); - return rc; - } - else - { - *reader = (dds_reader *) parent_entity; - *condition = (dds_readcond *) entity; - return DDS_RETCODE_OK; - } -} - -static void dds_read_unlock (dds_reader *reader, dds_readcond *condition) -{ - dds_entity_unlock (&reader->m_entity); - if (condition) - dds_entity_unlock (&condition->m_entity); -} - /* dds_read_impl: Core read/take function. Usually maxs is size of buf and si into which samples/status are written, when set to zero is special case @@ -76,6 +32,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, { struct thread_state1 * const ts1 = lookup_thread_state (); dds_return_t ret = DDS_RETCODE_OK; + struct dds_entity *entity; struct dds_reader *rd; struct dds_readcond *cond; unsigned nodata_cleanups = 0; @@ -87,14 +44,28 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, return DDS_RETCODE_BAD_PARAMETER; thread_state_awake (ts1); - if ((ret = dds_read_lock (reader_or_condition, &rd, &cond, only_reader)) != DDS_RETCODE_OK) + + if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) { goto fail_awake; + } else if (dds_entity_kind (entity) == DDS_KIND_READER) { + rd = (dds_reader *) entity; + cond = NULL; + } else if (only_reader) { + ret = DDS_RETCODE_ILLEGAL_OPERATION; + goto fail_awake_pinned; + } else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) { + ret = DDS_RETCODE_ILLEGAL_OPERATION; + goto fail_awake_pinned; + } else { + rd = (dds_reader *) entity->m_parent; + cond = (dds_readcond *) entity; + } if (hand != DDS_HANDLE_NIL) { if (ddsi_tkmap_find_by_id (gv.m_tkmap, hand) == NULL) { ret = DDS_RETCODE_PRECONDITION_NOT_MET; - goto fail_awake_lock; + goto fail_awake_pinned; } } @@ -102,6 +73,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, if (buf[0] == NULL) { /* Allocate, use or reallocate loan cached on reader */ + ddsrt_mutex_lock (&rd->m_entity.m_mutex); if (rd->m_loan_out) { ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, NULL, 0, maxs); @@ -130,6 +102,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, rd->m_loan_out = true; nodata_cleanups |= NC_CLEAR_LOAN_OUT; } + ddsrt_mutex_unlock (&rd->m_entity.m_mutex); } /* read/take resets data available status -- must reset before reading because @@ -150,14 +123,16 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, rd->m_loan */ if (ret <= 0 && nodata_cleanups) { + ddsrt_mutex_lock (&rd->m_entity.m_mutex); if (nodata_cleanups & NC_CLEAR_LOAN_OUT) rd->m_loan_out = false; if (nodata_cleanups & NC_FREE_BUF) ddsi_sertopic_free_samples (rd->m_topic->m_stopic, buf[0], maxs, DDS_FREE_ALL); if (nodata_cleanups & NC_RESET_BUF) buf[0] = NULL; + ddsrt_mutex_unlock (&rd->m_entity.m_mutex); } - dds_read_unlock (rd, cond); + dds_entity_unpin (entity); thread_state_asleep (ts1); return ret; @@ -165,8 +140,8 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition, #undef NC_FREE_BUF #undef NC_RESET_BUF -fail_awake_lock: - dds_read_unlock (rd, cond); +fail_awake_pinned: + dds_entity_unpin (entity); fail_awake: thread_state_asleep (ts1); return ret; @@ -177,7 +152,7 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio struct thread_state1 * const ts1 = lookup_thread_state (); dds_return_t ret = DDS_RETCODE_OK; struct dds_reader *rd; - struct dds_readcond *cond; + struct dds_entity *entity; assert (take); assert (buf); @@ -187,19 +162,31 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio (void)take; thread_state_awake (ts1); - if ((ret = dds_read_lock (reader_or_condition, &rd, &cond, false)) == DDS_RETCODE_OK) - { - /* read/take resets data available status -- must reset before reading because - the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ - dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); - /* reset DATA_ON_READERS status on subscriber after successful read/take */ - assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER); - dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); - - ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); - dds_read_unlock (rd, cond); + if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) { + goto fail_awake; + } else if (dds_entity_kind (entity) == DDS_KIND_READER) { + rd = (dds_reader *) entity; + } else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) { + dds_entity_unpin (entity); + ret = DDS_RETCODE_ILLEGAL_OPERATION; + goto fail_awake; + } else { + rd = (dds_reader *) entity->m_parent; } + + /* read/take resets data available status -- must reset before reading because + the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */ + dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); + + /* reset DATA_ON_READERS status on subscriber after successful read/take */ + assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER); + dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); + + ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); + dds_entity_unpin (entity); + +fail_awake: thread_state_asleep (ts1); return ret; } @@ -471,27 +458,36 @@ dds_return_t dds_return_loan (dds_entity_t reader_or_condition, void **buf, int3 { const struct ddsi_sertopic *st; dds_reader *rd; - dds_readcond *cond; + dds_entity *entity; dds_return_t ret = DDS_RETCODE_OK; if (buf == NULL || (*buf == NULL && bufsz > 0)) return DDS_RETCODE_BAD_PARAMETER; - if ((ret = dds_read_lock(reader_or_condition, &rd, &cond, false)) != DDS_RETCODE_OK) + if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) { return ret; + } else if (dds_entity_kind (entity) == DDS_KIND_READER) { + rd = (dds_reader *) entity; + } else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) { + dds_entity_unpin (entity); + return DDS_RETCODE_ILLEGAL_OPERATION; + } else { + rd = (dds_reader *) entity->m_parent; + } st = rd->m_topic->m_stopic; for (int32_t i = 0; i < bufsz; i++) ddsi_sertopic_free_sample (st, buf[i], DDS_FREE_CONTENTS); /* If possible return loan buffer to reader */ + ddsrt_mutex_lock (&rd->m_entity.m_mutex); if (rd->m_loan != 0 && (buf[0] == rd->m_loan)) { rd->m_loan_out = false; ddsi_sertopic_zero_samples (st, rd->m_loan, rd->m_loan_size); buf[0] = NULL; } - - dds_read_unlock(rd, cond); + ddsrt_mutex_unlock (&rd->m_entity.m_mutex); + dds_entity_unpin (entity); return DDS_RETCODE_OK; } diff --git a/src/tools/ddsperf/ddsperf.c b/src/tools/ddsperf/ddsperf.c index 8d51e5b..4524751 100644 --- a/src/tools/ddsperf/ddsperf.c +++ b/src/tools/ddsperf/ddsperf.c @@ -1886,7 +1886,7 @@ int main (int argc, char *argv[]) have a reader or will never really be receiving data) */ struct subthread_arg subarg_data, subarg_ping, subarg_pong; init_eseq_admin (&eseq_admin, nkeyvals); - subthread_arg_init (&subarg_data, rd_data, 100); + subthread_arg_init (&subarg_data, rd_data, 1000); subthread_arg_init (&subarg_ping, rd_ping, 100); subthread_arg_init (&subarg_pong, rd_pong, 100); uint32_t (*subthread_func) (void *arg) = 0;