nestable calls to thread_[state_]awake

Remove all the "if asleep then awake ..." stuff from the code by making
awake/asleep calls nestable, whereas before it "awake ; awake" really
meant a transition through "asleep".  This self-evidently necessitates
fixing those places where the old behaviour was relied on upon, but
fortunately those are few.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-04-07 18:47:27 +02:00 committed by eboasson
parent 7d281df24a
commit bb7373b90d
36 changed files with 450 additions and 663 deletions

View file

@ -65,12 +65,11 @@ void dds__builtin_init (void)
void dds__builtin_fini (void)
{
/* No more sources for builtin topic samples */
struct thread_state1 * const self = lookup_thread_state ();
thread_state_awake (self);
thread_state_awake (lookup_thread_state ());
delete_local_orphan_writer (builtintopic_writer_participant);
delete_local_orphan_writer (builtintopic_writer_publications);
delete_local_orphan_writer (builtintopic_writer_subscriptions);
thread_state_asleep (self);
thread_state_asleep (lookup_thread_state ());
ddsi_sertopic_unref (builtin_participant_topic);
ddsi_sertopic_unref (builtin_reader_topic);

View file

@ -117,12 +117,12 @@ static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
dds_handle_t dds_handle_create (struct dds_handle_link *link)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
dds_handle_t ret;
#if USE_CHH
struct thread_state1 * const self = lookup_thread_state ();
const bool asleep = vtime_asleep_p (self->vtime);
if (asleep)
thread_state_awake (self);
thread_state_awake (ts1);
#endif
ddsrt_mutex_lock (&handles.lock);
if (handles.count == MAX_HANDLES)
@ -143,8 +143,7 @@ dds_handle_t dds_handle_create (struct dds_handle_link *link)
assert (ret > 0);
}
#if USE_CHH
if (asleep)
thread_state_asleep (self);
thread_state_asleep (ts1);
#endif
return ret;
}
@ -156,8 +155,10 @@ void dds_handle_close (struct dds_handle_link *link)
int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
assert (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED);
ddsrt_mutex_lock (&handles.lock);
if ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_COUNT_MASK) != 0)
{
@ -174,13 +175,9 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
}
}
#if USE_CHH
struct thread_state1 * const self = lookup_thread_state ();
const bool asleep = vtime_asleep_p (self->vtime);
if (asleep)
thread_state_awake (self);
thread_state_awake (ts1);
int x = ut_chhRemove (handles.ht, link);
if (asleep)
thread_state_asleep (self);
thread_state_asleep (ts1);
#else
int x = ut_hhRemove (handles.ht, link);
#endif
@ -194,6 +191,9 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
{
#if USE_CHH
struct thread_state1 * const ts1 = lookup_thread_state ();
#endif
struct dds_handle_link dummy = { .hdl = hdl };
int32_t rc;
/* it makes sense to check here for initialization: the first thing any operation
@ -208,10 +208,7 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
return DDS_RETCODE_PRECONDITION_NOT_MET;
#if USE_CHH
struct thread_state1 * const self = lookup_thread_state ();
const bool asleep = vtime_asleep_p (self->vtime);
if (asleep)
thread_state_awake (self);
thread_state_awake (ts1);
*link = ut_chhLookup (handles.ht, &dummy);
#else
ddsrt_mutex_lock (&handles.lock);
@ -234,10 +231,8 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
}
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cnt_flags, cnt_flags + 1));
}
#if USE_CHH
if (asleep)
thread_state_asleep (self);
thread_state_asleep (ts1);
#else
ddsrt_mutex_unlock (&handles.lock);
#endif

View file

@ -26,7 +26,7 @@
#include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds/ddsi/q_servicelease.h"
#include "dds/ddsi/ddsi_threadmon.h"
#include "dds/ddsi/q_entity.h"
#include "dds/ddsi/q_config.h"
#include "dds/ddsi/q_gc.h"
@ -127,15 +127,15 @@ dds_init(dds_domainid_t domain)
/* Start monitoring the liveliness of all threads. */
if (!config.liveliness_monitoring)
gv.servicelease = NULL;
gv.threadmon = NULL;
else
{
gv.servicelease = nn_servicelease_new(0, 0);
if (gv.servicelease == NULL)
gv.threadmon = ddsi_threadmon_new ();
if (gv.threadmon == NULL)
{
DDS_ERROR("Failed to create a servicelease\n");
DDS_ERROR("Failed to create a thread monitor\n");
ret = DDS_ERRNO(DDS_RETCODE_OUT_OF_RESOURCES);
goto fail_servicelease_new;
goto fail_threadmon_new;
}
}
@ -162,11 +162,11 @@ dds_init(dds_domainid_t domain)
goto fail_rtps_start;
}
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
if (gv.threadmon && ddsi_threadmon_start(gv.threadmon) < 0)
{
DDS_ERROR("Failed to start the servicelease\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
goto fail_servicelease_start;
goto fail_threadmon_start;
}
/* Set additional default participant properties */
@ -191,9 +191,9 @@ skip:
ddsrt_mutex_unlock(init_mutex);
return DDS_RETCODE_OK;
fail_servicelease_start:
if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease);
fail_threadmon_start:
if (gv.threadmon)
ddsi_threadmon_stop (gv.threadmon);
dds_handle_server_fini();
fail_handleserver:
rtps_stop ();
@ -201,12 +201,12 @@ fail_rtps_start:
dds__builtin_fini ();
rtps_fini ();
fail_rtps_init:
if (gv.servicelease)
if (gv.threadmon)
{
nn_servicelease_free (gv.servicelease);
gv.servicelease = NULL;
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
}
fail_servicelease_new:
fail_threadmon_new:
downgrade_main_thread ();
thread_states_fini();
fail_rtps_config:
@ -231,15 +231,15 @@ extern void dds_fini (void)
dds_global.m_init_count--;
if (dds_global.m_init_count == 0)
{
if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease);
if (gv.threadmon)
ddsi_threadmon_stop (gv.threadmon);
dds_handle_server_fini();
rtps_stop ();
dds__builtin_fini ();
rtps_fini ();
if (gv.servicelease)
nn_servicelease_free (gv.servicelease);
gv.servicelease = NULL;
if (gv.threadmon)
ddsi_threadmon_free (gv.threadmon);
gv.threadmon = NULL;
downgrade_main_thread ();
thread_states_fini ();

View file

@ -127,8 +127,7 @@ dds_register_instance(
dds_instance_handle_t *handle,
const void *data)
{
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
struct ddsi_tkmap_instance * inst;
dds_writer *wr;
dds_return_t ret;
@ -150,9 +149,7 @@ dds_register_instance(
ret = DDS_ERRNO(rc);
goto err;
}
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
inst = dds_instance_find (wr->m_topic, data, true);
if(inst != NULL){
*handle = inst->m_iid;
@ -161,9 +158,7 @@ dds_register_instance(
DDS_ERROR("Unable to create instance\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
err:
return ret;
@ -191,8 +186,7 @@ dds_unregister_instance_ts(
const void *data,
dds_time_t timestamp)
{
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK;
dds_retcode_t rc;
bool autodispose = true;
@ -219,17 +213,13 @@ dds_unregister_instance_ts(
if (wr->m_entity.m_qos) {
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
}
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
if (autodispose) {
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
action |= DDS_WR_DISPOSE_BIT;
}
ret = dds_write_impl (wr, data, timestamp, action);
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
err:
return ret;
@ -241,8 +231,7 @@ dds_unregister_instance_ih_ts(
dds_instance_handle_t handle,
dds_time_t timestamp)
{
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK;
dds_retcode_t rc;
bool autodispose = true;
@ -265,9 +254,7 @@ dds_unregister_instance_ih_ts(
action |= DDS_WR_DISPOSE_BIT;
}
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle);
if (tk) {
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
@ -280,9 +267,7 @@ dds_unregister_instance_ih_ts(
DDS_ERROR("No instance related with the provided handle is found\n");
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
err:
return ret;
@ -294,24 +279,19 @@ dds_writedispose_ts(
const void *data,
dds_time_t timestamp)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret;
dds_retcode_t rc;
dds_writer *wr;
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
ret = dds_write_impl (wr, data, timestamp, DDS_WR_ACTION_WRITE_DISPOSE);
if (ret == DDS_RETCODE_OK) {
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
@ -329,7 +309,7 @@ dds_dispose_impl(
dds_time_t timestamp)
{
dds_return_t ret;
assert(vtime_awake_p(lookup_thread_state()->vtime));
assert(thread_is_awake ());
assert(wr);
ret = dds_write_impl(wr, data, timestamp, DDS_WR_ACTION_DISPOSE);
if (ret == DDS_RETCODE_OK) {
@ -344,21 +324,16 @@ dds_dispose_ts(
const void *data,
dds_time_t timestamp)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret;
dds_retcode_t rc;
dds_writer *wr;
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
ret = dds_dispose_impl(wr, data, DDS_HANDLE_NIL, timestamp);
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
@ -374,18 +349,15 @@ dds_dispose_ih_ts(
dds_instance_handle_t handle,
dds_time_t timestamp)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret;
dds_retcode_t rc;
dds_writer *wr;
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct ddsi_tkmap_instance *tk;
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) != NULL) {
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
void *sample = ddsi_sertopic_alloc_sample (tp);
@ -397,9 +369,7 @@ dds_dispose_ih_ts(
DDS_ERROR("No instance related with the provided handle is found\n");
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
@ -414,6 +384,7 @@ dds_lookup_instance(
dds_entity_t entity,
const void *data)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_instance_handle_t ih = DDS_HANDLE_NIL;
const dds_topic * topic;
struct ddsi_tkmap * map = gv.m_tkmap;
@ -426,17 +397,11 @@ dds_lookup_instance(
topic = dds_instance_info_by_hdl (entity);
if (topic) {
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
ih = ddsi_tkmap_lookup (map, sd);
ddsi_serdata_unref (sd);
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
} else {
DDS_ERROR("Acquired topic is NULL\n");
}
@ -458,8 +423,7 @@ dds_instance_get_key(
dds_instance_handle_t ih,
void *data)
{
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret;
const dds_topic * topic;
struct ddsi_tkmap_instance * tk;
@ -476,9 +440,7 @@ dds_instance_get_key(
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (ts1);
if ((tk = ddsi_tkmap_find_by_id(gv.m_tkmap, ih)) != NULL) {
ddsi_sertopic_zero_sample (topic->m_stopic, data);
ddsi_serdata_topicless_to_sample (topic->m_stopic, tk->m_sample, data, NULL, NULL);
@ -488,9 +450,7 @@ dds_instance_get_key(
DDS_ERROR("No instance related with the provided entity is found\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (ts1);
err:
return ret;
}

View file

@ -48,18 +48,13 @@ static dds_return_t
dds_participant_delete(
dds_entity *e)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_entity *prev = NULL;
dds_entity *iter;
assert(e);
assert(thr);
assert(dds_entity_kind(e) == DDS_KIND_PARTICIPANT);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (lookup_thread_state ());
dds_domain_free (e->m_domain);
@ -81,9 +76,7 @@ dds_participant_delete(
assert (iter);
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (lookup_thread_state ());
/* Every dds_init needs a dds_fini. */
dds_fini();
@ -155,8 +148,6 @@ dds_create_participant(
dds_participant * pp;
nn_plist_t plist;
dds_qos_t * new_qos = NULL;
struct thread_state1 * thr;
bool asleep;
/* Make sure DDS instance is initialized. */
ret = dds_init(domain);
@ -192,15 +183,9 @@ dds_create_participant(
nn_plist_init_empty(&plist);
dds_merge_qos (&plist.qos, new_qos);
thr = lookup_thread_state ();
asleep = !vtime_awake_p (thr->vtime);
if (asleep) {
thread_state_awake (thr);
}
thread_state_awake (lookup_thread_state ());
q_rc = new_participant (&guid, 0, &plist);
if (asleep) {
thread_state_asleep (thr);
}
thread_state_asleep (lookup_thread_state ());
nn_plist_fini (&plist);
if (q_rc != 0) {
DDS_ERROR("Internal error");

View file

@ -88,16 +88,12 @@ dds_read_impl(
bool lock,
bool only_reader)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK;
dds_retcode_t rc;
struct dds_reader * rd;
struct dds_readcond * cond;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
if (asleep) {
thread_state_awake (thr);
}
if (buf == NULL) {
DDS_ERROR("The provided buffer is NULL\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
@ -124,18 +120,19 @@ dds_read_impl(
goto fail;
}
thread_state_awake (ts1);
rc = dds_read_lock(reader_or_condition, &rd, &cond, only_reader);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking entity\n");
ret = DDS_ERRNO(rc);
goto fail;
goto fail_awake;
}
if (hand != DDS_HANDLE_NIL) {
if (ddsi_tkmap_find_by_id(gv.m_tkmap, hand) == NULL) {
DDS_ERROR("Could not find instance\n");
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
dds_read_unlock(rd, cond);
goto fail;
goto fail_awake;
}
}
/* Allocate samples if not provided (assuming all or none provided) */
@ -173,10 +170,9 @@ dds_read_impl(
}
dds_read_unlock(rd, cond);
fail_awake:
thread_state_asleep (ts1);
fail:
if (asleep) {
thread_state_asleep (thr);
}
return ret;
}
@ -191,12 +187,11 @@ dds_readcdr_impl(
dds_instance_handle_t hand,
bool lock)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_return_t ret = DDS_RETCODE_OK;
dds_retcode_t rc;
struct dds_reader * rd;
struct dds_readcond * cond;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
assert (take);
assert (buf);
@ -205,10 +200,7 @@ dds_readcdr_impl(
assert (maxs > 0);
(void)take;
if (asleep)
{
thread_state_awake (thr);
}
thread_state_awake (ts1);
rc = dds_read_lock(reader_or_condition, &rd, &cond, false);
if (rc >= DDS_RETCODE_OK) {
ret = dds_rhc_takecdr
@ -235,11 +227,7 @@ dds_readcdr_impl(
ret = DDS_ERRNO(rc);
}
if (asleep)
{
thread_state_asleep (thr);
}
thread_state_asleep (ts1);
return ret;
}

View file

@ -54,23 +54,16 @@ dds_reader_close(
{
dds_retcode_t rc;
dds_return_t ret = DDS_RETCODE_OK;
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
assert(e);
assert(thr);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (lookup_thread_state ());
if (delete_reader(&e->m_guid) != 0) {
DDS_ERROR("Internal error");
rc = DDS_RETCODE_ERROR;
ret = DDS_ERRNO(rc);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (lookup_thread_state ());
return ret;
}
@ -368,8 +361,6 @@ dds_create_reader(
dds_topic * tp;
dds_entity_t reader;
dds_entity_t t;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_return_t ret = DDS_RETCODE_OK;
bool internal_topic;
@ -474,17 +465,13 @@ dds_create_reader(
ddsrt_mutex_unlock(&tp->m_entity.m_mutex);
ddsrt_mutex_unlock(&sub->m_entity.m_mutex);
if (asleep) {
thread_state_awake (thr);
}
thread_state_awake (lookup_thread_state ());
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
rqos, rhc, dds_reader_status_cb, rd);
ddsrt_mutex_lock(&sub->m_entity.m_mutex);
ddsrt_mutex_lock(&tp->m_entity.m_mutex);
assert (rd->m_rd);
if (asleep) {
thread_state_asleep (thr);
}
thread_state_asleep (lookup_thread_state ());
/* For persistent data register reader with durability */
if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS)) {

View file

@ -313,8 +313,6 @@ dds_create_topic_arbitrary (
dds_qos_t *new_qos = NULL;
dds_entity_t hdl;
struct participant *ddsi_pp;
struct thread_state1 *const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
if (sertopic == NULL){
DDS_ERROR("Topic description is NULL\n");
@ -389,17 +387,13 @@ dds_create_topic_arbitrary (
ddsrt_mutex_unlock (&dds_global.m_mutex);
/* Publish Topic */
if (asleep) {
thread_state_awake (thr);
}
thread_state_awake (lookup_thread_state ());
ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
assert (ddsi_pp);
if (sedp_plist) {
sedp_write_topic (ddsi_pp, sedp_plist);
}
if (asleep) {
thread_state_asleep (thr);
}
thread_state_asleep (lookup_thread_state ());
}
qos_err:

View file

@ -148,8 +148,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
const bool writekey = action & DDS_WR_KEY_BIT;
struct writer *ddsi_wr = wr->m_wr;
struct ddsi_tkmap_instance *tk;
@ -168,8 +167,7 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx))
return DDS_RETCODE_OK;
if (asleep)
thread_state_awake (thr);
thread_state_awake (ts1);
/* Serialize and write data or key */
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
@ -177,7 +175,7 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
d->timestamp.v = tstamp;
ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
w_rc = write_sample_gc (ts1, wr->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0)
{
@ -199,26 +197,21 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
ret = deliver_locally (ddsi_wr, d, tk);
ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk);
if (asleep)
thread_state_asleep (thr);
thread_state_asleep (ts1);
return ret;
}
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
struct ddsi_tkmap_instance * tk;
int ret = DDS_RETCODE_OK;
int w_rc;
if (asleep)
thread_state_awake (thr);
thread_state_awake (ts1);
ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (xp, ddsi_wr, d, tk);
w_rc = write_sample_gc (ts1, xp, ddsi_wr, d, tk);
if (w_rc >= 0) {
/* Flush out write unless configured to batch */
if (!config.whc_batch && xp != NULL)
@ -239,10 +232,7 @@ dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack
ret = deliver_locally (ddsi_wr, d, tk);
ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk);
if (asleep)
thread_state_asleep (thr);
thread_state_asleep (ts1);
return ret;
}
@ -263,13 +253,10 @@ void dds_write_set_batch (bool enable)
void dds_write_flush (dds_entity_t writer)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
dds_writer *wr;
dds_retcode_t rc;
if (asleep)
thread_state_awake (thr);
thread_state_awake (ts1);
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
DDS_ERROR ("Error occurred on locking writer\n");
else
@ -277,8 +264,5 @@ void dds_write_flush (dds_entity_t writer)
nn_xpack_send (wr->m_xp, true);
dds_writer_unlock (wr);
}
if (asleep)
thread_state_asleep (thr);
return;
thread_state_asleep (ts1);
}

View file

@ -192,24 +192,16 @@ dds_writer_close(
{
dds_return_t ret = DDS_RETCODE_OK;
dds_writer *wr = (dds_writer*)e;
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = thr ? !vtime_awake_p(thr->vtime) : false;
assert(e);
if (asleep) {
thread_state_awake(thr);
}
if (thr) {
nn_xpack_send (wr->m_xp, false);
}
thread_state_awake (lookup_thread_state ());
nn_xpack_send (wr->m_xp, false);
if (delete_writer (&e->m_guid) != 0) {
DDS_ERROR("Internal error");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (lookup_thread_state ());
return ret;
}
@ -218,24 +210,11 @@ dds_writer_delete(
dds_entity *e)
{
dds_writer *wr = (dds_writer*)e;
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = thr ? !vtime_awake_p(thr->vtime) : false;
dds_return_t ret;
assert(e);
assert(thr);
/* FIXME: not freeing WHC here because it is owned by the DDSI entity */
if (asleep) {
thread_state_awake(thr);
}
if (thr) {
nn_xpack_free(wr->m_xp);
}
if (asleep) {
thread_state_asleep(thr);
}
thread_state_awake (lookup_thread_state ());
nn_xpack_free(wr->m_xp);
thread_state_asleep (lookup_thread_state ());
ret = dds_delete(wr->m_topic->m_entity.m_hdllink.hdl);
if(ret == DDS_RETCODE_OK){
ret = dds_delete_impl(e->m_parent->m_hdllink.hdl, true);
@ -304,15 +283,11 @@ dds_writer_qos_set(
dds_qget_ownership (e->m_qos, &kind);
if (kind == DDS_OWNERSHIP_EXCLUSIVE) {
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct writer * ddsi_wr = ((dds_writer*)e)->m_wr;
dds_qset_ownership_strength (e->m_qos, qos->ownership_strength.value);
if (asleep) {
thread_state_awake (thr);
}
thread_state_awake (lookup_thread_state ());
/* FIXME: with QoS changes being unsupported by the underlying stack I wonder what will happen; locking the underlying DDSI writer is of doubtful value as well */
ddsrt_mutex_lock (&ddsi_wr->e.lock);
@ -320,10 +295,7 @@ dds_writer_qos_set(
ddsi_wr->xqos->ownership_strength.value = qos->ownership_strength.value;
}
ddsrt_mutex_unlock (&ddsi_wr->e.lock);
if (asleep) {
thread_state_asleep (thr);
}
thread_state_asleep (lookup_thread_state ());
} else {
DDS_ERROR("Setting ownership strength doesn't make sense when the ownership is shared\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
@ -403,8 +375,6 @@ dds_create_writer(
dds_publisher * pub = NULL;
dds_topic * tp;
dds_entity_t publisher;
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
ddsi_tran_conn_t conn = gv.data_conn_uc;
dds_return_t ret;
@ -485,16 +455,12 @@ dds_create_writer(
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
if (asleep) {
thread_state_awake(thr);
}
thread_state_awake (lookup_thread_state ());
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert(wr->m_wr);
if (asleep) {
thread_state_asleep(thr);
}
thread_state_asleep (lookup_thread_state ());
dds_topic_unlock(tp);
dds_publisher_unlock(pub);
return writer;