Remove superfluous lock/unlock pairs in read

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-06-12 16:27:25 +02:00 committed by eboasson
parent 647f7466d6
commit 12d2a82823
2 changed files with 62 additions and 66 deletions

View file

@ -21,50 +21,6 @@
#include "dds/ddsi/q_globals.h" #include "dds/ddsi/q_globals.h"
#include "dds/ddsi/ddsi_sertopic.h" #include "dds/ddsi/ddsi_sertopic.h"
static dds_return_t dds_read_lock (dds_entity_t hdl, dds_reader **reader, dds_readcond **condition, bool only_reader)
{
dds_return_t rc;
dds_entity *entity, *parent_entity;
if ((rc = dds_entity_lock (hdl, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK)
{
return rc;
}
else if (dds_entity_kind (entity) == DDS_KIND_READER)
{
*reader = (dds_reader *) entity;
*condition = NULL;
return DDS_RETCODE_OK;
}
else if (only_reader)
{
dds_entity_unlock (entity);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY)
{
dds_entity_unlock (entity);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
else if ((rc = dds_entity_lock (entity->m_parent->m_hdllink.hdl, DDS_KIND_READER, &parent_entity)) != DDS_RETCODE_OK)
{
dds_entity_unlock (entity);
return rc;
}
else
{
*reader = (dds_reader *) parent_entity;
*condition = (dds_readcond *) entity;
return DDS_RETCODE_OK;
}
}
static void dds_read_unlock (dds_reader *reader, dds_readcond *condition)
{
dds_entity_unlock (&reader->m_entity);
if (condition)
dds_entity_unlock (&condition->m_entity);
}
/* /*
dds_read_impl: Core read/take function. Usually maxs is size of buf and si dds_read_impl: Core read/take function. Usually maxs is size of buf and si
into which samples/status are written, when set to zero is special case into which samples/status are written, when set to zero is special case
@ -76,6 +32,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
{ {
struct thread_state1 * const ts1 = lookup_thread_state (); struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK; dds_return_t ret = DDS_RETCODE_OK;
struct dds_entity *entity;
struct dds_reader *rd; struct dds_reader *rd;
struct dds_readcond *cond; struct dds_readcond *cond;
unsigned nodata_cleanups = 0; unsigned nodata_cleanups = 0;
@ -87,14 +44,28 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
thread_state_awake (ts1); thread_state_awake (ts1);
if ((ret = dds_read_lock (reader_or_condition, &rd, &cond, only_reader)) != DDS_RETCODE_OK)
if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
goto fail_awake; goto fail_awake;
} else if (dds_entity_kind (entity) == DDS_KIND_READER) {
rd = (dds_reader *) entity;
cond = NULL;
} else if (only_reader) {
ret = DDS_RETCODE_ILLEGAL_OPERATION;
goto fail_awake_pinned;
} else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) {
ret = DDS_RETCODE_ILLEGAL_OPERATION;
goto fail_awake_pinned;
} else {
rd = (dds_reader *) entity->m_parent;
cond = (dds_readcond *) entity;
}
if (hand != DDS_HANDLE_NIL) if (hand != DDS_HANDLE_NIL)
{ {
if (ddsi_tkmap_find_by_id (gv.m_tkmap, hand) == NULL) { if (ddsi_tkmap_find_by_id (gv.m_tkmap, hand) == NULL) {
ret = DDS_RETCODE_PRECONDITION_NOT_MET; ret = DDS_RETCODE_PRECONDITION_NOT_MET;
goto fail_awake_lock; goto fail_awake_pinned;
} }
} }
@ -102,6 +73,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
if (buf[0] == NULL) if (buf[0] == NULL)
{ {
/* Allocate, use or reallocate loan cached on reader */ /* Allocate, use or reallocate loan cached on reader */
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
if (rd->m_loan_out) if (rd->m_loan_out)
{ {
ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, NULL, 0, maxs); ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, NULL, 0, maxs);
@ -130,6 +102,7 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
rd->m_loan_out = true; rd->m_loan_out = true;
nodata_cleanups |= NC_CLEAR_LOAN_OUT; nodata_cleanups |= NC_CLEAR_LOAN_OUT;
} }
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
} }
/* read/take resets data available status -- must reset before reading because /* read/take resets data available status -- must reset before reading because
@ -150,14 +123,16 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
rd->m_loan */ rd->m_loan */
if (ret <= 0 && nodata_cleanups) if (ret <= 0 && nodata_cleanups)
{ {
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
if (nodata_cleanups & NC_CLEAR_LOAN_OUT) if (nodata_cleanups & NC_CLEAR_LOAN_OUT)
rd->m_loan_out = false; rd->m_loan_out = false;
if (nodata_cleanups & NC_FREE_BUF) if (nodata_cleanups & NC_FREE_BUF)
ddsi_sertopic_free_samples (rd->m_topic->m_stopic, buf[0], maxs, DDS_FREE_ALL); ddsi_sertopic_free_samples (rd->m_topic->m_stopic, buf[0], maxs, DDS_FREE_ALL);
if (nodata_cleanups & NC_RESET_BUF) if (nodata_cleanups & NC_RESET_BUF)
buf[0] = NULL; buf[0] = NULL;
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
} }
dds_read_unlock (rd, cond); dds_entity_unpin (entity);
thread_state_asleep (ts1); thread_state_asleep (ts1);
return ret; return ret;
@ -165,8 +140,8 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
#undef NC_FREE_BUF #undef NC_FREE_BUF
#undef NC_RESET_BUF #undef NC_RESET_BUF
fail_awake_lock: fail_awake_pinned:
dds_read_unlock (rd, cond); dds_entity_unpin (entity);
fail_awake: fail_awake:
thread_state_asleep (ts1); thread_state_asleep (ts1);
return ret; return ret;
@ -177,7 +152,7 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
struct thread_state1 * const ts1 = lookup_thread_state (); struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK; dds_return_t ret = DDS_RETCODE_OK;
struct dds_reader *rd; struct dds_reader *rd;
struct dds_readcond *cond; struct dds_entity *entity;
assert (take); assert (take);
assert (buf); assert (buf);
@ -187,19 +162,31 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
(void)take; (void)take;
thread_state_awake (ts1); thread_state_awake (ts1);
if ((ret = dds_read_lock (reader_or_condition, &rd, &cond, false)) == DDS_RETCODE_OK)
{
/* read/take resets data available status -- must reset before reading because
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
/* reset DATA_ON_READERS status on subscriber after successful read/take */ if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER); goto fail_awake;
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); } else if (dds_entity_kind (entity) == DDS_KIND_READER) {
rd = (dds_reader *) entity;
ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); } else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) {
dds_read_unlock (rd, cond); dds_entity_unpin (entity);
ret = DDS_RETCODE_ILLEGAL_OPERATION;
goto fail_awake;
} else {
rd = (dds_reader *) entity->m_parent;
} }
/* read/take resets data available status -- must reset before reading because
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
/* reset DATA_ON_READERS status on subscriber after successful read/take */
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER);
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
dds_entity_unpin (entity);
fail_awake:
thread_state_asleep (ts1); thread_state_asleep (ts1);
return ret; return ret;
} }
@ -471,27 +458,36 @@ dds_return_t dds_return_loan (dds_entity_t reader_or_condition, void **buf, int3
{ {
const struct ddsi_sertopic *st; const struct ddsi_sertopic *st;
dds_reader *rd; dds_reader *rd;
dds_readcond *cond; dds_entity *entity;
dds_return_t ret = DDS_RETCODE_OK; dds_return_t ret = DDS_RETCODE_OK;
if (buf == NULL || (*buf == NULL && bufsz > 0)) if (buf == NULL || (*buf == NULL && bufsz > 0))
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
if ((ret = dds_read_lock(reader_or_condition, &rd, &cond, false)) != DDS_RETCODE_OK) if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
return ret; return ret;
} else if (dds_entity_kind (entity) == DDS_KIND_READER) {
rd = (dds_reader *) entity;
} else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) {
dds_entity_unpin (entity);
return DDS_RETCODE_ILLEGAL_OPERATION;
} else {
rd = (dds_reader *) entity->m_parent;
}
st = rd->m_topic->m_stopic; st = rd->m_topic->m_stopic;
for (int32_t i = 0; i < bufsz; i++) for (int32_t i = 0; i < bufsz; i++)
ddsi_sertopic_free_sample (st, buf[i], DDS_FREE_CONTENTS); ddsi_sertopic_free_sample (st, buf[i], DDS_FREE_CONTENTS);
/* If possible return loan buffer to reader */ /* If possible return loan buffer to reader */
ddsrt_mutex_lock (&rd->m_entity.m_mutex);
if (rd->m_loan != 0 && (buf[0] == rd->m_loan)) if (rd->m_loan != 0 && (buf[0] == rd->m_loan))
{ {
rd->m_loan_out = false; rd->m_loan_out = false;
ddsi_sertopic_zero_samples (st, rd->m_loan, rd->m_loan_size); ddsi_sertopic_zero_samples (st, rd->m_loan, rd->m_loan_size);
buf[0] = NULL; buf[0] = NULL;
} }
ddsrt_mutex_unlock (&rd->m_entity.m_mutex);
dds_read_unlock(rd, cond); dds_entity_unpin (entity);
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
} }

View file

@ -1886,7 +1886,7 @@ int main (int argc, char *argv[])
have a reader or will never really be receiving data) */ have a reader or will never really be receiving data) */
struct subthread_arg subarg_data, subarg_ping, subarg_pong; struct subthread_arg subarg_data, subarg_ping, subarg_pong;
init_eseq_admin (&eseq_admin, nkeyvals); init_eseq_admin (&eseq_admin, nkeyvals);
subthread_arg_init (&subarg_data, rd_data, 100); subthread_arg_init (&subarg_data, rd_data, 1000);
subthread_arg_init (&subarg_ping, rd_ping, 100); subthread_arg_init (&subarg_ping, rd_ping, 100);
subthread_arg_init (&subarg_pong, rd_pong, 100); subthread_arg_init (&subarg_pong, rd_pong, 100);
uint32_t (*subthread_func) (void *arg) = 0; uint32_t (*subthread_func) (void *arg) = 0;