nestable calls to thread_[state_]awake

Remove all the "if asleep then awake ..." boilerplate from the code by making the awake/asleep calls nestable, whereas before "awake ; awake" really meant a transition through "asleep". This obviously requires fixing the few places that relied upon the old behaviour, but fortunately those are rare.

Signed-off-by: Erik Boasson <eb@ilities.com>
parent 9b3a71e1ab
commit c3dca32a2f
36 changed files with 450 additions and 663 deletions
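For reference, a minimal sketch of the nesting scheme this commit introduces, reusing the VTIME_NEST_MASK/VTIME_TIME_SHIFT encoding added to q_thread.h further down; this is an illustration of the idea only, not the actual thread-state code.

/* Illustration only: the low bits of the virtual clock hold the nesting depth,
   and the "time" part advances only when the outermost asleep is reached. */
#include <assert.h>
#include <stdint.h>

typedef uint32_t vtime_t;
#define VTIME_NEST_MASK  0xfu
#define VTIME_TIME_SHIFT 4

static vtime_t vt;                        /* stand-in for ts1->vtime */

static void sketch_awake (void)
{
  assert ((vt & VTIME_NEST_MASK) < VTIME_NEST_MASK);
  vt += 1u;                               /* just bump the nesting count */
}

static void sketch_asleep (void)
{
  assert ((vt & VTIME_NEST_MASK) != 0);
  if ((vt & VTIME_NEST_MASK) == 1)
    vt += (1u << VTIME_TIME_SHIFT) - 1u;  /* outermost: advance time, clear nesting */
  else
    vt -= 1u;                             /* nested: only drop the count */
}

int main (void)
{
  sketch_awake ();
  sketch_awake ();                        /* "awake ; awake" no longer implies a sleep in between */
  sketch_asleep ();
  sketch_asleep ();
  assert ((vt & VTIME_NEST_MASK) == 0);   /* asleep again, time advanced exactly once */
  return 0;
}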
@@ -65,12 +65,11 @@ void dds__builtin_init (void)
void dds__builtin_fini (void)
{
/* No more sources for builtin topic samples */
struct thread_state1 * const self = lookup_thread_state ();
thread_state_awake (self);
thread_state_awake (lookup_thread_state ());
delete_local_orphan_writer (builtintopic_writer_participant);
delete_local_orphan_writer (builtintopic_writer_publications);
delete_local_orphan_writer (builtintopic_writer_subscriptions);
thread_state_asleep (self);
thread_state_asleep (lookup_thread_state ());

ddsi_sertopic_unref (builtin_participant_topic);
ddsi_sertopic_unref (builtin_reader_topic);

@ -117,12 +117,12 @@ static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
|
|||
|
||||
dds_handle_t dds_handle_create (struct dds_handle_link *link)
|
||||
{
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
#endif
|
||||
dds_handle_t ret;
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const self = lookup_thread_state ();
|
||||
const bool asleep = vtime_asleep_p (self->vtime);
|
||||
if (asleep)
|
||||
thread_state_awake (self);
|
||||
thread_state_awake (ts1);
|
||||
#endif
|
||||
ddsrt_mutex_lock (&handles.lock);
|
||||
if (handles.count == MAX_HANDLES)
|
||||
|
@ -143,8 +143,7 @@ dds_handle_t dds_handle_create (struct dds_handle_link *link)
|
|||
assert (ret > 0);
|
||||
}
|
||||
#if USE_CHH
|
||||
if (asleep)
|
||||
thread_state_asleep (self);
|
||||
thread_state_asleep (ts1);
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
@ -156,8 +155,10 @@ void dds_handle_close (struct dds_handle_link *link)
|
|||
|
||||
int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
|
||||
{
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
#endif
|
||||
assert (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED);
|
||||
|
||||
ddsrt_mutex_lock (&handles.lock);
|
||||
if ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_COUNT_MASK) != 0)
|
||||
{
|
||||
|
@ -174,13 +175,9 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
|
|||
}
|
||||
}
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const self = lookup_thread_state ();
|
||||
const bool asleep = vtime_asleep_p (self->vtime);
|
||||
if (asleep)
|
||||
thread_state_awake (self);
|
||||
thread_state_awake (ts1);
|
||||
int x = ut_chhRemove (handles.ht, link);
|
||||
if (asleep)
|
||||
thread_state_asleep (self);
|
||||
thread_state_asleep (ts1);
|
||||
#else
|
||||
int x = ut_hhRemove (handles.ht, link);
|
||||
#endif
|
||||
|
@ -194,6 +191,9 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
|
|||
|
||||
int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
||||
{
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
#endif
|
||||
struct dds_handle_link dummy = { .hdl = hdl };
|
||||
int32_t rc;
|
||||
/* it makes sense to check here for initialization: the first thing any operation
|
||||
|
@ -208,10 +208,7 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
|||
return DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const self = lookup_thread_state ();
|
||||
const bool asleep = vtime_asleep_p (self->vtime);
|
||||
if (asleep)
|
||||
thread_state_awake (self);
|
||||
thread_state_awake (ts1);
|
||||
*link = ut_chhLookup (handles.ht, &dummy);
|
||||
#else
|
||||
ddsrt_mutex_lock (&handles.lock);
|
||||
|
@ -234,10 +231,8 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
|||
}
|
||||
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cnt_flags, cnt_flags + 1));
|
||||
}
|
||||
|
||||
#if USE_CHH
|
||||
if (asleep)
|
||||
thread_state_asleep (self);
|
||||
thread_state_asleep (ts1);
|
||||
#else
|
||||
ddsrt_mutex_unlock (&handles.lock);
|
||||
#endif
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#include "dds/ddsi/ddsi_iid.h"
|
||||
#include "dds/ddsi/ddsi_tkmap.h"
|
||||
#include "dds/ddsi/ddsi_serdata.h"
|
||||
#include "dds/ddsi/q_servicelease.h"
|
||||
#include "dds/ddsi/ddsi_threadmon.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_gc.h"
|
||||
|
@ -127,15 +127,15 @@ dds_init(dds_domainid_t domain)
|
|||
|
||||
/* Start monitoring the liveliness of all threads. */
|
||||
if (!config.liveliness_monitoring)
|
||||
gv.servicelease = NULL;
|
||||
gv.threadmon = NULL;
|
||||
else
|
||||
{
|
||||
gv.servicelease = nn_servicelease_new(0, 0);
|
||||
if (gv.servicelease == NULL)
|
||||
gv.threadmon = ddsi_threadmon_new ();
|
||||
if (gv.threadmon == NULL)
|
||||
{
|
||||
DDS_ERROR("Failed to create a servicelease\n");
|
||||
DDS_ERROR("Failed to create a thread monitor\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_OUT_OF_RESOURCES);
|
||||
goto fail_servicelease_new;
|
||||
goto fail_threadmon_new;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,11 +162,11 @@ dds_init(dds_domainid_t domain)
|
|||
goto fail_rtps_start;
|
||||
}
|
||||
|
||||
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
|
||||
if (gv.threadmon && ddsi_threadmon_start(gv.threadmon) < 0)
|
||||
{
|
||||
DDS_ERROR("Failed to start the servicelease\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
|
||||
goto fail_servicelease_start;
|
||||
goto fail_threadmon_start;
|
||||
}
|
||||
|
||||
/* Set additional default participant properties */
|
||||
|
@ -191,9 +191,9 @@ skip:
|
|||
ddsrt_mutex_unlock(init_mutex);
|
||||
return DDS_RETCODE_OK;
|
||||
|
||||
fail_servicelease_start:
|
||||
if (gv.servicelease)
|
||||
nn_servicelease_stop_renewing (gv.servicelease);
|
||||
fail_threadmon_start:
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_stop (gv.threadmon);
|
||||
dds_handle_server_fini();
|
||||
fail_handleserver:
|
||||
rtps_stop ();
|
||||
|
@ -201,12 +201,12 @@ fail_rtps_start:
|
|||
dds__builtin_fini ();
|
||||
rtps_fini ();
|
||||
fail_rtps_init:
|
||||
if (gv.servicelease)
|
||||
if (gv.threadmon)
|
||||
{
|
||||
nn_servicelease_free (gv.servicelease);
|
||||
gv.servicelease = NULL;
|
||||
ddsi_threadmon_free (gv.threadmon);
|
||||
gv.threadmon = NULL;
|
||||
}
|
||||
fail_servicelease_new:
|
||||
fail_threadmon_new:
|
||||
downgrade_main_thread ();
|
||||
thread_states_fini();
|
||||
fail_rtps_config:
|
||||
|
@ -231,15 +231,15 @@ extern void dds_fini (void)
|
|||
dds_global.m_init_count--;
|
||||
if (dds_global.m_init_count == 0)
|
||||
{
|
||||
if (gv.servicelease)
|
||||
nn_servicelease_stop_renewing (gv.servicelease);
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_stop (gv.threadmon);
|
||||
dds_handle_server_fini();
|
||||
rtps_stop ();
|
||||
dds__builtin_fini ();
|
||||
rtps_fini ();
|
||||
if (gv.servicelease)
|
||||
nn_servicelease_free (gv.servicelease);
|
||||
gv.servicelease = NULL;
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_free (gv.threadmon);
|
||||
gv.threadmon = NULL;
|
||||
downgrade_main_thread ();
|
||||
thread_states_fini ();
|
||||
|
||||
|
|
|
@ -127,8 +127,7 @@ dds_register_instance(
|
|||
dds_instance_handle_t *handle,
|
||||
const void *data)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
struct ddsi_tkmap_instance * inst;
|
||||
dds_writer *wr;
|
||||
dds_return_t ret;
|
||||
|
@ -150,9 +149,7 @@ dds_register_instance(
|
|||
ret = DDS_ERRNO(rc);
|
||||
goto err;
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
inst = dds_instance_find (wr->m_topic, data, true);
|
||||
if(inst != NULL){
|
||||
*handle = inst->m_iid;
|
||||
|
@ -161,9 +158,7 @@ dds_register_instance(
|
|||
DDS_ERROR("Unable to create instance\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
err:
|
||||
return ret;
|
||||
|
@ -191,8 +186,7 @@ dds_unregister_instance_ts(
|
|||
const void *data,
|
||||
dds_time_t timestamp)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
dds_retcode_t rc;
|
||||
bool autodispose = true;
|
||||
|
@ -219,17 +213,13 @@ dds_unregister_instance_ts(
|
|||
if (wr->m_entity.m_qos) {
|
||||
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
if (autodispose) {
|
||||
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
action |= DDS_WR_DISPOSE_BIT;
|
||||
}
|
||||
ret = dds_write_impl (wr, data, timestamp, action);
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
err:
|
||||
return ret;
|
||||
|
@ -241,8 +231,7 @@ dds_unregister_instance_ih_ts(
|
|||
dds_instance_handle_t handle,
|
||||
dds_time_t timestamp)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
dds_retcode_t rc;
|
||||
bool autodispose = true;
|
||||
|
@ -265,9 +254,7 @@ dds_unregister_instance_ih_ts(
|
|||
action |= DDS_WR_DISPOSE_BIT;
|
||||
}
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle);
|
||||
if (tk) {
|
||||
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
|
||||
|
@ -280,9 +267,7 @@ dds_unregister_instance_ih_ts(
|
|||
DDS_ERROR("No instance related with the provided handle is found\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
err:
|
||||
return ret;
|
||||
|
@ -294,24 +279,19 @@ dds_writedispose_ts(
|
|||
const void *data,
|
||||
dds_time_t timestamp)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret;
|
||||
dds_retcode_t rc;
|
||||
dds_writer *wr;
|
||||
|
||||
rc = dds_writer_lock(writer, &wr);
|
||||
if (rc == DDS_RETCODE_OK) {
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
ret = dds_write_impl (wr, data, timestamp, DDS_WR_ACTION_WRITE_DISPOSE);
|
||||
if (ret == DDS_RETCODE_OK) {
|
||||
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
} else {
|
||||
DDS_ERROR("Error occurred on locking writer\n");
|
||||
|
@ -329,7 +309,7 @@ dds_dispose_impl(
|
|||
dds_time_t timestamp)
|
||||
{
|
||||
dds_return_t ret;
|
||||
assert(vtime_awake_p(lookup_thread_state()->vtime));
|
||||
assert(thread_is_awake ());
|
||||
assert(wr);
|
||||
ret = dds_write_impl(wr, data, timestamp, DDS_WR_ACTION_DISPOSE);
|
||||
if (ret == DDS_RETCODE_OK) {
|
||||
|
@ -344,21 +324,16 @@ dds_dispose_ts(
|
|||
const void *data,
|
||||
dds_time_t timestamp)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret;
|
||||
dds_retcode_t rc;
|
||||
dds_writer *wr;
|
||||
|
||||
rc = dds_writer_lock(writer, &wr);
|
||||
if (rc == DDS_RETCODE_OK) {
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
ret = dds_dispose_impl(wr, data, DDS_HANDLE_NIL, timestamp);
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
} else {
|
||||
DDS_ERROR("Error occurred on locking writer\n");
|
||||
|
@ -374,18 +349,15 @@ dds_dispose_ih_ts(
|
|||
dds_instance_handle_t handle,
|
||||
dds_time_t timestamp)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret;
|
||||
dds_retcode_t rc;
|
||||
dds_writer *wr;
|
||||
|
||||
rc = dds_writer_lock(writer, &wr);
|
||||
if (rc == DDS_RETCODE_OK) {
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) != NULL) {
|
||||
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
|
||||
void *sample = ddsi_sertopic_alloc_sample (tp);
|
||||
|
@ -397,9 +369,7 @@ dds_dispose_ih_ts(
|
|||
DDS_ERROR("No instance related with the provided handle is found\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
} else {
|
||||
DDS_ERROR("Error occurred on locking writer\n");
|
||||
|
@ -414,6 +384,7 @@ dds_lookup_instance(
|
|||
dds_entity_t entity,
|
||||
const void *data)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_instance_handle_t ih = DDS_HANDLE_NIL;
|
||||
const dds_topic * topic;
|
||||
struct ddsi_tkmap * map = gv.m_tkmap;
|
||||
|
@ -426,17 +397,11 @@ dds_lookup_instance(
|
|||
|
||||
topic = dds_instance_info_by_hdl (entity);
|
||||
if (topic) {
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
|
||||
ih = ddsi_tkmap_lookup (map, sd);
|
||||
ddsi_serdata_unref (sd);
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
} else {
|
||||
DDS_ERROR("Acquired topic is NULL\n");
|
||||
}
|
||||
|
@ -458,8 +423,7 @@ dds_instance_get_key(
|
|||
dds_instance_handle_t ih,
|
||||
void *data)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret;
|
||||
const dds_topic * topic;
|
||||
struct ddsi_tkmap_instance * tk;
|
||||
|
@ -476,9 +440,7 @@ dds_instance_get_key(
|
|||
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||
goto err;
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
if ((tk = ddsi_tkmap_find_by_id(gv.m_tkmap, ih)) != NULL) {
|
||||
ddsi_sertopic_zero_sample (topic->m_stopic, data);
|
||||
ddsi_serdata_topicless_to_sample (topic->m_stopic, tk->m_sample, data, NULL, NULL);
|
||||
|
@ -488,9 +450,7 @@ dds_instance_get_key(
|
|||
DDS_ERROR("No instance related with the provided entity is found\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
err:
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -48,18 +48,13 @@ static dds_return_t
|
|||
dds_participant_delete(
|
||||
dds_entity *e)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
dds_entity *prev = NULL;
|
||||
dds_entity *iter;
|
||||
|
||||
assert(e);
|
||||
assert(thr);
|
||||
assert(dds_entity_kind(e) == DDS_KIND_PARTICIPANT);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
|
||||
dds_domain_free (e->m_domain);
|
||||
|
||||
|
@ -81,9 +76,7 @@ dds_participant_delete(
|
|||
|
||||
assert (iter);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
/* Every dds_init needs a dds_fini. */
|
||||
dds_fini();
|
||||
|
@ -155,8 +148,6 @@ dds_create_participant(
|
|||
dds_participant * pp;
|
||||
nn_plist_t plist;
|
||||
dds_qos_t * new_qos = NULL;
|
||||
struct thread_state1 * thr;
|
||||
bool asleep;
|
||||
|
||||
/* Make sure DDS instance is initialized. */
|
||||
ret = dds_init(domain);
|
||||
|
@ -192,15 +183,9 @@ dds_create_participant(
|
|||
nn_plist_init_empty(&plist);
|
||||
dds_merge_qos (&plist.qos, new_qos);
|
||||
|
||||
thr = lookup_thread_state ();
|
||||
asleep = !vtime_awake_p (thr->vtime);
|
||||
if (asleep) {
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
q_rc = new_participant (&guid, 0, &plist);
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
nn_plist_fini (&plist);
|
||||
if (q_rc != 0) {
|
||||
DDS_ERROR("Internal error");
|
||||
|
|
|
@ -88,16 +88,12 @@ dds_read_impl(
|
|||
bool lock,
|
||||
bool only_reader)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
dds_retcode_t rc;
|
||||
struct dds_reader * rd;
|
||||
struct dds_readcond * cond;
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
if (buf == NULL) {
|
||||
DDS_ERROR("The provided buffer is NULL\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||
|
@ -124,18 +120,19 @@ dds_read_impl(
|
|||
goto fail;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1);
|
||||
rc = dds_read_lock(reader_or_condition, &rd, &cond, only_reader);
|
||||
if (rc != DDS_RETCODE_OK) {
|
||||
DDS_ERROR("Error occurred on locking entity\n");
|
||||
ret = DDS_ERRNO(rc);
|
||||
goto fail;
|
||||
goto fail_awake;
|
||||
}
|
||||
if (hand != DDS_HANDLE_NIL) {
|
||||
if (ddsi_tkmap_find_by_id(gv.m_tkmap, hand) == NULL) {
|
||||
DDS_ERROR("Could not find instance\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||
dds_read_unlock(rd, cond);
|
||||
goto fail;
|
||||
goto fail_awake;
|
||||
}
|
||||
}
|
||||
/* Allocate samples if not provided (assuming all or none provided) */
|
||||
|
@ -173,10 +170,9 @@ dds_read_impl(
|
|||
}
|
||||
dds_read_unlock(rd, cond);
|
||||
|
||||
fail_awake:
|
||||
thread_state_asleep (ts1);
|
||||
fail:
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -191,12 +187,11 @@ dds_readcdr_impl(
|
|||
dds_instance_handle_t hand,
|
||||
bool lock)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
dds_retcode_t rc;
|
||||
struct dds_reader * rd;
|
||||
struct dds_readcond * cond;
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
|
||||
assert (take);
|
||||
assert (buf);
|
||||
|
@ -205,10 +200,7 @@ dds_readcdr_impl(
|
|||
assert (maxs > 0);
|
||||
(void)take;
|
||||
|
||||
if (asleep)
|
||||
{
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
thread_state_awake (ts1);
|
||||
rc = dds_read_lock(reader_or_condition, &rd, &cond, false);
|
||||
if (rc >= DDS_RETCODE_OK) {
|
||||
ret = dds_rhc_takecdr
|
||||
|
@ -235,11 +227,7 @@ dds_readcdr_impl(
|
|||
ret = DDS_ERRNO(rc);
|
||||
}
|
||||
|
||||
if (asleep)
|
||||
{
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -54,23 +54,16 @@ dds_reader_close(
|
|||
{
|
||||
dds_retcode_t rc;
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
|
||||
assert(e);
|
||||
assert(thr);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if (delete_reader(&e->m_guid) != 0) {
|
||||
DDS_ERROR("Internal error");
|
||||
rc = DDS_RETCODE_ERROR;
|
||||
ret = DDS_ERRNO(rc);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -368,8 +361,6 @@ dds_create_reader(
|
|||
dds_topic * tp;
|
||||
dds_entity_t reader;
|
||||
dds_entity_t t;
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
bool internal_topic;
|
||||
|
||||
|
@ -474,17 +465,13 @@ dds_create_reader(
|
|||
ddsrt_mutex_unlock(&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock(&sub->m_entity.m_mutex);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
|
||||
rqos, rhc, dds_reader_status_cb, rd);
|
||||
ddsrt_mutex_lock(&sub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock(&tp->m_entity.m_mutex);
|
||||
assert (rd->m_rd);
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
/* For persistent data register reader with durability */
|
||||
if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS)) {
|
||||
|
|
|
@ -313,8 +313,6 @@ dds_create_topic_arbitrary (
|
|||
dds_qos_t *new_qos = NULL;
|
||||
dds_entity_t hdl;
|
||||
struct participant *ddsi_pp;
|
||||
struct thread_state1 *const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
|
||||
if (sertopic == NULL){
|
||||
DDS_ERROR("Topic description is NULL\n");
|
||||
|
@ -389,17 +387,13 @@ dds_create_topic_arbitrary (
|
|||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
|
||||
/* Publish Topic */
|
||||
if (asleep) {
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
|
||||
assert (ddsi_pp);
|
||||
if (sedp_plist) {
|
||||
sedp_write_topic (ddsi_pp, sedp_plist);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
}
|
||||
|
||||
qos_err:
|
||||
|
|
|
@ -148,8 +148,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
|
||||
dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
const bool writekey = action & DDS_WR_KEY_BIT;
|
||||
struct writer *ddsi_wr = wr->m_wr;
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
|
@ -168,8 +167,7 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx))
|
||||
return DDS_RETCODE_OK;
|
||||
|
||||
if (asleep)
|
||||
thread_state_awake (thr);
|
||||
thread_state_awake (ts1);
|
||||
|
||||
/* Serialize and write data or key */
|
||||
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
|
||||
|
@ -177,7 +175,7 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
d->timestamp.v = tstamp;
|
||||
ddsi_serdata_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (d);
|
||||
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
|
||||
w_rc = write_sample_gc (ts1, wr->m_xp, ddsi_wr, d, tk);
|
||||
|
||||
if (w_rc >= 0)
|
||||
{
|
||||
|
@ -199,26 +197,21 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
ddsi_serdata_unref (d);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
|
||||
if (asleep)
|
||||
thread_state_asleep (thr);
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
struct ddsi_tkmap_instance * tk;
|
||||
int ret = DDS_RETCODE_OK;
|
||||
int w_rc;
|
||||
|
||||
if (asleep)
|
||||
thread_state_awake (thr);
|
||||
|
||||
thread_state_awake (ts1);
|
||||
ddsi_serdata_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (d);
|
||||
w_rc = write_sample_gc (xp, ddsi_wr, d, tk);
|
||||
w_rc = write_sample_gc (ts1, xp, ddsi_wr, d, tk);
|
||||
if (w_rc >= 0) {
|
||||
/* Flush out write unless configured to batch */
|
||||
if (!config.whc_batch && xp != NULL)
|
||||
|
@ -239,10 +232,7 @@ dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack
|
|||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
ddsi_serdata_unref (d);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
|
||||
if (asleep)
|
||||
thread_state_asleep (thr);
|
||||
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -263,13 +253,10 @@ void dds_write_set_batch (bool enable)
|
|||
|
||||
void dds_write_flush (dds_entity_t writer)
|
||||
{
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_writer *wr;
|
||||
dds_retcode_t rc;
|
||||
|
||||
if (asleep)
|
||||
thread_state_awake (thr);
|
||||
thread_state_awake (ts1);
|
||||
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
|
||||
DDS_ERROR ("Error occurred on locking writer\n");
|
||||
else
|
||||
|
@ -277,8 +264,5 @@ void dds_write_flush (dds_entity_t writer)
|
|||
nn_xpack_send (wr->m_xp, true);
|
||||
dds_writer_unlock (wr);
|
||||
}
|
||||
|
||||
if (asleep)
|
||||
thread_state_asleep (thr);
|
||||
return;
|
||||
thread_state_asleep (ts1);
|
||||
}
|
||||
|
|
|
@ -192,24 +192,16 @@ dds_writer_close(
|
|||
{
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
dds_writer *wr = (dds_writer*)e;
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = thr ? !vtime_awake_p(thr->vtime) : false;
|
||||
|
||||
assert(e);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
if (thr) {
|
||||
nn_xpack_send (wr->m_xp, false);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
nn_xpack_send (wr->m_xp, false);
|
||||
if (delete_writer (&e->m_guid) != 0) {
|
||||
DDS_ERROR("Internal error");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -218,24 +210,11 @@ dds_writer_delete(
|
|||
dds_entity *e)
|
||||
{
|
||||
dds_writer *wr = (dds_writer*)e;
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = thr ? !vtime_awake_p(thr->vtime) : false;
|
||||
dds_return_t ret;
|
||||
|
||||
assert(e);
|
||||
assert(thr);
|
||||
|
||||
/* FIXME: not freeing WHC here because it is owned by the DDSI entity */
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
if (thr) {
|
||||
nn_xpack_free(wr->m_xp);
|
||||
}
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
nn_xpack_free(wr->m_xp);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
ret = dds_delete(wr->m_topic->m_entity.m_hdllink.hdl);
|
||||
if(ret == DDS_RETCODE_OK){
|
||||
ret = dds_delete_impl(e->m_parent->m_hdllink.hdl, true);
|
||||
|
@ -304,15 +283,11 @@ dds_writer_qos_set(
|
|||
dds_qget_ownership (e->m_qos, &kind);
|
||||
|
||||
if (kind == DDS_OWNERSHIP_EXCLUSIVE) {
|
||||
struct thread_state1 * const thr = lookup_thread_state ();
|
||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||
struct writer * ddsi_wr = ((dds_writer*)e)->m_wr;
|
||||
|
||||
dds_qset_ownership_strength (e->m_qos, qos->ownership_strength.value);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake (thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
|
||||
/* FIXME: with QoS changes being unsupported by the underlying stack I wonder what will happen; locking the underlying DDSI writer is of doubtful value as well */
|
||||
ddsrt_mutex_lock (&ddsi_wr->e.lock);
|
||||
|
@ -320,10 +295,7 @@ dds_writer_qos_set(
|
|||
ddsi_wr->xqos->ownership_strength.value = qos->ownership_strength.value;
|
||||
}
|
||||
ddsrt_mutex_unlock (&ddsi_wr->e.lock);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_asleep (thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
} else {
|
||||
DDS_ERROR("Setting ownership strength doesn't make sense when the ownership is shared\n");
|
||||
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
|
||||
|
@ -403,8 +375,6 @@ dds_create_writer(
|
|||
dds_publisher * pub = NULL;
|
||||
dds_topic * tp;
|
||||
dds_entity_t publisher;
|
||||
struct thread_state1 * const thr = lookup_thread_state();
|
||||
const bool asleep = !vtime_awake_p(thr->vtime);
|
||||
ddsi_tran_conn_t conn = gv.data_conn_uc;
|
||||
dds_return_t ret;
|
||||
|
||||
|
@ -485,16 +455,12 @@ dds_create_writer(
|
|||
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
|
||||
|
||||
if (asleep) {
|
||||
thread_state_awake(thr);
|
||||
}
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert(wr->m_wr);
|
||||
if (asleep) {
|
||||
thread_state_asleep(thr);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
dds_topic_unlock(tp);
|
||||
dds_publisher_unlock(pub);
|
||||
return writer;
|
||||
|
|
|
@ -26,6 +26,7 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
|
|||
ddsi_iid.c
|
||||
ddsi_tkmap.c
|
||||
ddsi_vendor.c
|
||||
ddsi_threadmon.c
|
||||
q_addrset.c
|
||||
q_bitset_inlines.c
|
||||
q_bswap.c
|
||||
|
@ -47,10 +48,8 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
|
|||
q_radmin.c
|
||||
q_receive.c
|
||||
q_security.c
|
||||
q_servicelease.c
|
||||
q_sockwaitset.c
|
||||
q_thread.c
|
||||
q_thread_inlines.c
|
||||
q_time.c
|
||||
q_transmit.c
|
||||
q_inverse_uint32_set.c
|
||||
|
@ -78,6 +77,7 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
|
|||
ddsi_iid.h
|
||||
ddsi_tkmap.h
|
||||
ddsi_vendor.h
|
||||
ddsi_threadmon.h
|
||||
q_addrset.h
|
||||
q_bitset.h
|
||||
q_bswap.h
|
||||
|
@ -105,7 +105,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/dds/ddsi"
|
|||
q_receive.h
|
||||
q_rtps.h
|
||||
q_security.h
|
||||
q_servicelease.h
|
||||
q_sockwaitset.h
|
||||
q_static_assert.h
|
||||
q_thread.h
|
||||
|
|
src/core/ddsi/include/dds/ddsi/ddsi_threadmon.h (new file, 31 lines)
@@ -0,0 +1,31 @@
/*
 * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 */
#ifndef DDSI_THREADMON_H
#define DDSI_THREADMON_H

#if defined (__cplusplus)
extern "C" {
#endif

struct ddsi_threadmon;

struct ddsi_threadmon *ddsi_threadmon_new (void);
int ddsi_threadmon_start (struct ddsi_threadmon *sl);
void ddsi_threadmon_stop (struct ddsi_threadmon *sl);
void ddsi_threadmon_free (struct ddsi_threadmon *sl);
void ddsi_threadmon_statechange_barrier (struct ddsi_threadmon *sl);

#if defined (__cplusplus)
}
#endif

#endif /* DDSI_THREADMON_H */

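A hedged sketch of how this new interface is meant to be driven, mirroring the dds_init/dds_fini changes earlier in this commit; the wrapper function names here are hypothetical and error handling is simplified for illustration.

/* Illustration only: create/start the monitor when liveliness monitoring is
   enabled, and stop/free it again on shutdown. */
#include "dds/ddsi/ddsi_threadmon.h"

static struct ddsi_threadmon *threadmon;

static int liveliness_monitoring_start (void)
{
  if ((threadmon = ddsi_threadmon_new ()) == NULL)
    return -1;                       /* out of resources */
  if (ddsi_threadmon_start (threadmon) < 0)
  {
    ddsi_threadmon_free (threadmon);
    threadmon = NULL;
    return -1;
  }
  return 0;
}

static void liveliness_monitoring_stop (void)
{
  if (threadmon)
  {
    ddsi_threadmon_stop (threadmon);
    ddsi_threadmon_free (threadmon);
    threadmon = NULL;
  }
}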
@@ -267,6 +267,7 @@ struct config
int forward_all_messages;
int liveliness_monitoring;
int noprogress_log_stacktraces;
int64_t liveliness_monitoring_interval;
int prioritize_retransmit;
int xpack_send_async;
int multiple_recv_threads;
@@ -277,9 +278,6 @@

unsigned delivery_queue_maxsamples;

float servicelease_expiry_time;
float servicelease_update_factor;

int enableLoopback;
enum durability_cdr durability_cdr;

@@ -105,7 +105,7 @@ struct q_globals {

/* Queue for garbage collection requests */
struct gcreq_queue *gcreq_queue;
struct nn_servicelease *servicelease;
struct ddsi_threadmon *threadmon;

/* Lease junk */
ddsrt_mutex_t leaseheap_lock;

@@ -22,7 +22,6 @@ struct receiver_state;
struct participant;
struct lease;
struct entity_common;
struct thread_state1;

void lease_management_init (void);
void lease_management_term (void);
@@ -31,7 +30,7 @@ void lease_register (struct lease *l);
void lease_free (struct lease *l);
void lease_renew (struct lease *l, nn_etime_t tnow);
void lease_set_expiry (struct lease *l, nn_etime_t when);
int64_t check_and_handle_lease_expiration (struct thread_state1 *self, nn_etime_t tnow);
int64_t check_and_handle_lease_expiration (nn_etime_t tnow);

void handle_PMD (const struct receiver_state *rst, nn_wctime_t timestamp, unsigned statusinfo, const void *vdata, unsigned len);

@@ -1,31 +0,0 @@
/*
 * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
 * v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 */
#ifndef NN_SERVICELEASE_H
#define NN_SERVICELEASE_H

#if defined (__cplusplus)
extern "C" {
#endif

struct nn_servicelease;

struct nn_servicelease *nn_servicelease_new (void (*renew_cb) (void *arg), void *renew_arg);
int nn_servicelease_start_renewing (struct nn_servicelease *sl);
void nn_servicelease_stop_renewing (struct nn_servicelease *sl);
void nn_servicelease_free (struct nn_servicelease *sl);
void nn_servicelease_statechange_barrier (struct nn_servicelease *sl);

#if defined (__cplusplus)
}
#endif

#endif /* NN_SERVICELEASE_H */

@@ -12,6 +12,7 @@
#ifndef Q_THREAD_H
#define Q_THREAD_H

#include <assert.h>
#include "dds/export.h"
#include "dds/ddsrt/atomics.h"
#include "dds/ddsrt/sync.h"
@@ -30,6 +31,9 @@ extern "C" {

typedef uint32_t vtime_t;
typedef int32_t svtime_t; /* signed version */
#define VTIME_NEST_MASK 0xfu
#define VTIME_TIME_MASK 0xfffffff0u
#define VTIME_TIME_SHIFT 4

/* GCC has a nifty feature allowing the specification of the required
   alignment: __attribute__ ((aligned (CACHE_LINE_SIZE))) in this
@@ -59,7 +63,6 @@ struct logbuf;
*/
#define THREAD_BASE \
volatile vtime_t vtime; \
volatile vtime_t watchdog; \
ddsrt_thread_t tid; \
ddsrt_thread_t extTid; \
enum thread_state state; \
@@ -94,8 +97,7 @@ DDS_EXPORT void thread_states_fini (void);
DDS_EXPORT void upgrade_main_thread (void);
DDS_EXPORT void downgrade_main_thread (void);
DDS_EXPORT const struct config_thread_properties_listelem *lookup_thread_properties (const char *name);
DDS_EXPORT struct thread_state1 *create_thread (const char *name, uint32_t (*f) (void *arg), void *arg);
DDS_EXPORT struct thread_state1 *lookup_thread_state (void);
DDS_EXPORT dds_retcode_t create_thread (struct thread_state1 **ts, const char *name, uint32_t (*f) (void *arg), void *arg);
DDS_EXPORT struct thread_state1 *lookup_thread_state_real (void);
DDS_EXPORT int join_thread (struct thread_state1 *ts1);
DDS_EXPORT void log_stack_traces (void);
@@ -104,83 +106,69 @@ DDS_EXPORT struct thread_state1 * init_thread_state (const char *tname);
DDS_EXPORT void reset_thread_state (struct thread_state1 *ts1);
DDS_EXPORT int thread_exists (const char *name);

DDS_EXPORT inline int vtime_awake_p (vtime_t vtime)
{
return (vtime % 2) == 0;
DDS_EXPORT inline struct thread_state1 *lookup_thread_state (void) {
struct thread_state1 *ts1 = tsd_thread_state;
if (ts1)
return ts1;
else
return lookup_thread_state_real ();
}

DDS_EXPORT inline int vtime_asleep_p (vtime_t vtime)
DDS_EXPORT inline bool vtime_awake_p (vtime_t vtime)
{
return (vtime % 2) == 1;
return (vtime & VTIME_NEST_MASK) != 0;
}

DDS_EXPORT inline int vtime_gt (vtime_t vtime1, vtime_t vtime0)
DDS_EXPORT inline bool vtime_asleep_p (vtime_t vtime)
{
return (vtime & VTIME_NEST_MASK) == 0;
}

DDS_EXPORT inline bool vtime_gt (vtime_t vtime1, vtime_t vtime0)
{
Q_STATIC_ASSERT_CODE (sizeof (vtime_t) == sizeof (svtime_t));
return (svtime_t) (vtime1 - vtime0) > 0;
return (svtime_t) ((vtime1 & VTIME_TIME_MASK) - (vtime0 & VTIME_TIME_MASK)) > 0;
}

DDS_EXPORT inline bool thread_is_awake (void)
{
return vtime_awake_p (lookup_thread_state ()->vtime);
}

DDS_EXPORT inline bool thread_is_asleep (void)
{
return vtime_asleep_p (lookup_thread_state ()->vtime);
}

DDS_EXPORT inline void thread_state_asleep (struct thread_state1 *ts1)
{
vtime_t vt = ts1->vtime;
vtime_t wd = ts1->watchdog;
if (vtime_awake_p (vt))
{
ddsrt_atomic_fence_rel ();
ts1->vtime = vt + 1;
}
assert (vtime_awake_p (vt));
/* nested calls a rare and an extra fence doesn't break things */
ddsrt_atomic_fence_rel ();
if ((vt & VTIME_NEST_MASK) == 1)
vt += (1u << VTIME_TIME_SHIFT) - 1u;
else
{
ddsrt_atomic_fence_rel ();
ts1->vtime = vt + 2;
ddsrt_atomic_fence_acq ();
}

if ( wd % 2 ){
ts1->watchdog = wd + 2;
} else {
ts1->watchdog = wd + 1;
}
vt -= 1u;
ts1->vtime = vt;
}

DDS_EXPORT inline void thread_state_awake (struct thread_state1 *ts1)
{
vtime_t vt = ts1->vtime;
vtime_t wd = ts1->watchdog;
if (vtime_asleep_p (vt))
ts1->vtime = vt + 1;
else
{
ddsrt_atomic_fence_rel ();
ts1->vtime = vt + 2;
}
assert ((vt & VTIME_NEST_MASK) < VTIME_NEST_MASK);
ts1->vtime = vt + 1u;
/* nested calls a rare and an extra fence doesn't break things */
ddsrt_atomic_fence_acq ();

if ( wd % 2 ){
ts1->watchdog = wd + 1;
} else {
ts1->watchdog = wd + 2;
}
}

DDS_EXPORT inline void thread_state_blocked (struct thread_state1 *ts1)
DDS_EXPORT inline void thread_state_awake_to_awake_no_nest (struct thread_state1 *ts1)
{
vtime_t wd = ts1->watchdog;
if ( wd % 2 ){
ts1->watchdog = wd + 2;
} else {
ts1->watchdog = wd + 1;
}
}

DDS_EXPORT inline void thread_state_unblocked (struct thread_state1 *ts1)
{
vtime_t wd = ts1->watchdog;
if ( wd % 2 ){
ts1->watchdog = wd + 1;
} else {
ts1->watchdog = wd + 2;
}
vtime_t vt = ts1->vtime;
assert ((vt & VTIME_NEST_MASK) == 1);
ddsrt_atomic_fence_rel ();
ts1->vtime = vt + (1u << VTIME_TIME_SHIFT);
ddsrt_atomic_fence_acq ();
}

#if defined (__cplusplus)

@@ -25,6 +25,7 @@ struct whc_state;
struct proxy_reader;
struct ddsi_serdata;
struct ddsi_tkmap_instance;
struct thread_state1;

/* Writing new data; serdata_twrite (serdata) is assumed to be really
   recentish; serdata is unref'd. If xp == NULL, data is queued, else
@@ -33,10 +34,10 @@ struct ddsi_tkmap_instance;
   "nogc": no GC may occur, so it may not block to throttle the writer if the high water mark of the WHC is reached, which implies true KEEP_LAST behaviour. This is true for all the DDSI built-in writers.
   "gc": GC may occur, which means the writer history and watermarks can be anything. This must be used for all application data.
*/
int write_sample_gc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
int write_sample_gc (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
int write_sample_nogc (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk);
int write_sample_gc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);
int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata);

/* When calling the following functions, wr->lock must be held */
int create_fragment_message (struct writer *wr, seqno_t seq, const struct nn_plist *plist, struct ddsi_serdata *serdata, unsigned fragnum, struct proxy_reader *prd,struct nn_xmsg **msg, int isnew);

@ -15,7 +15,7 @@
|
|||
#include "dds/ddsrt/sync.h"
|
||||
#include "dds/ddsrt/threads.h"
|
||||
|
||||
#include "dds/ddsi/q_servicelease.h"
|
||||
#include "dds/ddsi/ddsi_threadmon.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_log.h"
|
||||
#include "dds/ddsi/q_thread.h"
|
||||
|
@ -25,32 +25,14 @@
|
|||
#include "dds/ddsi/q_globals.h" /* for mattr, cattr */
|
||||
#include "dds/ddsi/q_receive.h"
|
||||
|
||||
static dds_time_t nn_retrieve_lease_settings (void)
|
||||
{
|
||||
const double leaseSec = config.servicelease_expiry_time;
|
||||
double sleepSec = leaseSec * config.servicelease_update_factor;
|
||||
|
||||
/* Run at no less than 1Hz: internal liveliness monitoring is slaved
|
||||
to this interval as well. 1Hz lease renewals and liveliness
|
||||
checks is no large burden, and performing liveliness checks once
|
||||
a second is a lot more useful than doing it once every few
|
||||
seconds. Besides -- we're now also gathering CPU statistics. */
|
||||
if (sleepSec > 1.0f)
|
||||
return DDS_NSECS_IN_SEC;
|
||||
|
||||
return ((dds_time_t)(sleepSec * DDS_NSECS_IN_SEC)) +
|
||||
((dds_time_t)(sleepSec * (double)DDS_NSECS_IN_SEC) % DDS_NSECS_IN_SEC);
|
||||
}
|
||||
|
||||
struct alive_wd {
|
||||
char alive;
|
||||
vtime_t wd;
|
||||
struct alive_vt {
|
||||
bool alive;
|
||||
vtime_t vt;
|
||||
};
|
||||
|
||||
struct nn_servicelease {
|
||||
dds_time_t sleepTime;
|
||||
struct ddsi_threadmon {
|
||||
int keepgoing;
|
||||
struct alive_wd *av_ary;
|
||||
struct alive_vt *av_ary;
|
||||
void (*renew_cb) (void *arg);
|
||||
void *renew_arg;
|
||||
|
||||
|
@ -59,37 +41,38 @@ struct nn_servicelease {
|
|||
struct thread_state1 *ts;
|
||||
};
|
||||
|
||||
static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
|
||||
static uint32_t threadmon_thread (struct ddsi_threadmon *sl)
|
||||
{
|
||||
/* Do not check more often than once every 100ms (no particular
|
||||
reason why it has to be 100ms), regardless of the lease settings.
|
||||
Note: can't trust sl->self, may have been scheduled before the
|
||||
assignment. */
|
||||
const int64_t min_progress_check_intv = 100 * T_MILLISECOND;
|
||||
struct thread_state1 *self = lookup_thread_state ();
|
||||
nn_mtime_t next_thread_cputime = { 0 };
|
||||
nn_mtime_t tlast = { 0 };
|
||||
int was_alive = 1;
|
||||
bool was_alive = true;
|
||||
unsigned i;
|
||||
for (i = 0; i < thread_states.nthreads; i++)
|
||||
{
|
||||
sl->av_ary[i].alive = 1;
|
||||
sl->av_ary[i].wd = thread_states.ts[i].watchdog - 1;
|
||||
sl->av_ary[i].alive = true;
|
||||
}
|
||||
ddsrt_mutex_lock (&sl->lock);
|
||||
while (sl->keepgoing)
|
||||
{
|
||||
/* Guard against spurious wakeups by checking only when cond_waitfor signals a timeout */
|
||||
if (ddsrt_cond_waitfor (&sl->cond, &sl->lock, config.liveliness_monitoring_interval))
|
||||
continue;
|
||||
|
||||
unsigned n_alive = 0;
|
||||
nn_mtime_t tnow = now_mt ();
|
||||
|
||||
LOG_THREAD_CPUTIME (next_thread_cputime);
|
||||
|
||||
DDS_TRACE("servicelease: tnow %"PRId64":", tnow.v);
|
||||
DDS_TRACE("threadmon: tnow %"PRId64":", tnow.v);
|
||||
|
||||
/* Check progress only if enough time has passed: there is no
|
||||
guarantee that os_cond_timedwait wont ever return early, and we
|
||||
do want to avoid spurious warnings. */
|
||||
if (tnow.v < tlast.v + min_progress_check_intv)
|
||||
if (tnow.v < tlast.v)
|
||||
{
|
||||
n_alive = thread_states.nthreads;
|
||||
}
|
||||
|
@ -103,11 +86,10 @@ static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
|
|||
else
|
||||
{
|
||||
vtime_t vt = thread_states.ts[i].vtime;
|
||||
vtime_t wd = thread_states.ts[i].watchdog;
|
||||
int alive = vtime_asleep_p (vt) || vtime_asleep_p (wd) || vtime_gt (wd, sl->av_ary[i].wd);
|
||||
bool alive = vtime_asleep_p (vt) || vtime_asleep_p (sl->av_ary[i].vt) || vtime_gt (vt, sl->av_ary[i].vt);
|
||||
n_alive += (unsigned) alive;
|
||||
DDS_TRACE(" %u(%s):%c:%u:%u->%u:", i, thread_states.ts[i].name, alive ? 'a' : 'd', vt, sl->av_ary[i].wd, wd);
|
||||
sl->av_ary[i].wd = wd;
|
||||
DDS_TRACE(" %u(%s):%c:%x->%x", i, thread_states.ts[i].name, alive ? 'a' : 'd', sl->av_ary[i].vt, vt);
|
||||
sl->av_ary[i].vt = vt;
|
||||
if (sl->av_ary[i].alive != alive)
|
||||
{
|
||||
const char *name = thread_states.ts[i].name;
|
||||
|
@ -117,31 +99,23 @@ static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
|
|||
else
|
||||
msg = "once again made progress";
|
||||
DDS_INFO("thread %s %s\n", name ? name : "(anon)", msg);
|
||||
sl->av_ary[i].alive = (char) alive;
|
||||
sl->av_ary[i].alive = alive;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Only renew the lease if all threads are alive, so that one
|
||||
thread blocking for a while but not too extremely long will
|
||||
cause warnings for that thread in the log file, but won't cause
|
||||
the DDSI2 service to be marked as dead. */
|
||||
if (n_alive == thread_states.nthreads)
|
||||
{
|
||||
DDS_TRACE(": [%u] renewing\n", n_alive);
|
||||
/* FIXME: perhaps it would be nice to control automatic
|
||||
liveliness updates from here.
|
||||
FIXME: should terminate failure of renew_cb() */
|
||||
sl->renew_cb (sl->renew_arg);
|
||||
was_alive = 1;
|
||||
DDS_TRACE(": [%u] OK\n", n_alive);
|
||||
was_alive = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
DDS_TRACE(": [%u] NOT renewing\n", n_alive);
|
||||
DDS_TRACE(": [%u] FAIL\n", n_alive);
|
||||
if (was_alive)
|
||||
log_stack_traces ();
|
||||
was_alive = 0;
|
||||
was_alive = false;
|
||||
}
|
||||
|
||||
if (dds_get_log_mask() & DDS_LC_TIMING)
|
||||
|
@ -151,20 +125,14 @@ static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
|
|||
{
|
||||
DDS_LOG(DDS_LC_TIMING,
|
||||
"rusage: utime %d.%09d stime %d.%09d maxrss %ld data %ld vcsw %ld ivcsw %ld\n",
|
||||
(int) u.utime / DDS_NSECS_IN_SEC,
|
||||
(int) u.utime % DDS_NSECS_IN_SEC,
|
||||
(int) u.stime / DDS_NSECS_IN_SEC,
|
||||
(int) u.stime % DDS_NSECS_IN_SEC,
|
||||
(int) (u.utime / DDS_NSECS_IN_SEC),
|
||||
(int) (u.utime % DDS_NSECS_IN_SEC),
|
||||
(int) (u.stime / DDS_NSECS_IN_SEC),
|
||||
(int) (u.stime % DDS_NSECS_IN_SEC),
|
||||
u.maxrss, u.idrss, u.nvcsw, u.nivcsw);
|
||||
}
|
||||
}
|
||||
|
||||
ddsrt_cond_waitfor (&sl->cond, &sl->lock, sl->sleepTime);
|
||||
|
||||
/* We are never active in a way that matters for the garbage
|
||||
collection of old writers, &c. */
|
||||
thread_state_asleep (self);
|
||||
|
||||
/* While deaf, we need to make sure the receive thread wakes up
|
||||
every now and then to try recreating sockets & rejoining multicast
|
||||
groups */
|
||||
|
@ -175,19 +143,12 @@ static uint32_t lease_renewal_thread (struct nn_servicelease *sl)
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dummy_renew_cb (UNUSED_ARG (void *arg))
|
||||
struct ddsi_threadmon *ddsi_threadmon_new (void)
|
||||
{
|
||||
}
|
||||
|
||||
struct nn_servicelease *nn_servicelease_new (void (*renew_cb) (void *arg), void *renew_arg)
|
||||
{
|
||||
struct nn_servicelease *sl;
|
||||
struct ddsi_threadmon *sl;
|
||||
|
||||
sl = ddsrt_malloc (sizeof (*sl));
|
||||
sl->sleepTime = nn_retrieve_lease_settings ();
|
||||
sl->keepgoing = -1;
|
||||
sl->renew_cb = renew_cb ? renew_cb : dummy_renew_cb;
|
||||
sl->renew_arg = renew_arg;
|
||||
sl->ts = NULL;
|
||||
|
||||
if ((sl->av_ary = ddsrt_malloc (thread_states.nthreads * sizeof (*sl->av_ary))) == NULL)
|
||||
|
@ -203,15 +164,14 @@ struct nn_servicelease *nn_servicelease_new (void (*renew_cb) (void *arg), void
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int nn_servicelease_start_renewing (struct nn_servicelease *sl)
|
||||
int ddsi_threadmon_start (struct ddsi_threadmon *sl)
|
||||
{
|
||||
ddsrt_mutex_lock (&sl->lock);
|
||||
assert (sl->keepgoing == -1);
|
||||
sl->keepgoing = 1;
|
||||
ddsrt_mutex_unlock (&sl->lock);
|
||||
|
||||
sl->ts = create_thread ("lease", (uint32_t (*) (void *)) lease_renewal_thread, sl);
|
||||
if (sl->ts == NULL)
|
||||
if (create_thread (&sl->ts, "lease", (uint32_t (*) (void *)) threadmon_thread, sl) != DDS_RETCODE_OK)
|
||||
goto fail_thread;
|
||||
return 0;
|
||||
|
||||
|
@ -220,13 +180,13 @@ int nn_servicelease_start_renewing (struct nn_servicelease *sl)
|
|||
return Q_ERR_UNSPECIFIED;
|
||||
}
|
||||
|
||||
void nn_servicelease_statechange_barrier (struct nn_servicelease *sl)
|
||||
void ddsi_threadmon_statechange_barrier (struct ddsi_threadmon *sl)
|
||||
{
|
||||
ddsrt_mutex_lock (&sl->lock);
|
||||
ddsrt_mutex_unlock (&sl->lock);
|
||||
}
|
||||
|
||||
void nn_servicelease_stop_renewing (struct nn_servicelease *sl)
|
||||
void ddsi_threadmon_stop (struct ddsi_threadmon *sl)
|
||||
{
|
||||
if (sl->keepgoing != -1)
|
||||
{
|
||||
|
@ -238,7 +198,7 @@ void nn_servicelease_stop_renewing (struct nn_servicelease *sl)
|
|||
}
|
||||
}
|
||||
|
||||
void nn_servicelease_free (struct nn_servicelease *sl)
|
||||
void ddsi_threadmon_free (struct ddsi_threadmon *sl)
|
||||
{
|
||||
ddsrt_cond_destroy (&sl->cond);
|
||||
ddsrt_mutex_destroy (&sl->lock);
|
|
@ -113,7 +113,7 @@ uint64_t ddsi_tkmap_lookup (struct ddsi_tkmap * map, const struct ddsi_serdata *
|
|||
{
|
||||
struct ddsi_tkmap_instance dummy;
|
||||
struct ddsi_tkmap_instance * tk;
|
||||
assert (vtime_awake_p(lookup_thread_state()->vtime));
|
||||
assert (thread_is_awake ());
|
||||
dummy.m_sample = (struct ddsi_serdata *) sd;
|
||||
tk = ut_chhLookup (map->m_hh, &dummy);
|
||||
return (tk) ? tk->m_iid : DDS_HANDLE_NIL;
|
||||
|
@ -125,7 +125,7 @@ struct ddsi_tkmap_instance *ddsi_tkmap_find_by_id (struct ddsi_tkmap *map, uint6
|
|||
struct ut_chhIter it;
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
uint32_t refc;
|
||||
assert (vtime_awake_p(lookup_thread_state()->vtime));
|
||||
assert (thread_is_awake ());
|
||||
for (tk = ut_chhIterFirst (map->m_hh, &it); tk; tk = ut_chhIterNext (&it))
|
||||
if (tk->m_iid == iid)
|
||||
break;
|
||||
|
@ -162,7 +162,7 @@ ddsi_tkmap_find(
|
|||
struct ddsi_tkmap_instance * tk;
|
||||
struct ddsi_tkmap * map = gv.m_tkmap;
|
||||
|
||||
assert (vtime_awake_p(lookup_thread_state()->vtime));
|
||||
assert (thread_is_awake ());
|
||||
dummy.m_sample = sd;
|
||||
retry:
|
||||
if ((tk = ut_chhLookup(map->m_hh, &dummy)) != NULL)
|
||||
|
@ -221,7 +221,7 @@ void ddsi_tkmap_instance_ref (struct ddsi_tkmap_instance *tk)
|
|||
void ddsi_tkmap_instance_unref (struct ddsi_tkmap_instance * tk)
|
||||
{
|
||||
uint32_t old, new;
|
||||
assert (vtime_awake_p(lookup_thread_state()->vtime));
|
||||
assert (thread_is_awake ());
|
||||
do {
|
||||
old = ddsrt_atomic_ld32(&tk->m_refc);
|
||||
if (old == 1)
|
||||
|
|
|

@ -141,7 +141,6 @@ DU(tracingOutputFileName);
DU(verbosity);
DUPF(logcat);
DUPF(xcheck);
DUPF(float);
DUPF(int);
DUPF(uint);
DUPF(int32);

@ -158,6 +157,7 @@ DU(duration_inf);
DU(duration_ms_1hr);
DU(duration_ms_1s);
DU(duration_us_1s);
DU(duration_100ms_1hr);
PF(duration);
DUPF(standards_conformance);
DUPF(besmode);

@ -516,6 +516,8 @@ static const struct cfgelem heartbeat_interval_attrs[] = {
static const struct cfgelem liveliness_monitoring_attrs[] = {
{ ATTR("StackTraces"), 1, "true", ABSOFF(noprogress_log_stacktraces), 0, uf_boolean, 0, pf_boolean,
"<p>This element controls whether or not to write stack traces to the DDSI2 trace when a thread fails to make progress (on select platforms only).</p>" },
{ ATTR("Interval"), 1, "1s", ABSOFF(liveliness_monitoring_interval), 0, uf_duration_100ms_1hr, 0, pf_duration,
"<p>This element controls the interval at which to check whether threads have been making progress.</p>" },
END_MARKER
};

@ -878,23 +880,14 @@ static const struct cfgelem ddsi2_cfgelems[] = {
END_MARKER
};

/* Note: using 2e-1 instead of 0.2 to avoid use of the decimal
separator, which is locale dependent. */
static const struct cfgelem lease_expiry_time_cfgattrs[] = {
{ ATTR("update_factor"), 1, "2e-1", ABSOFF(servicelease_update_factor), 0, uf_float, 0, pf_float, NULL },
END_MARKER
static const struct cfgelem deprecated_lease_cfgelems[] = {
WILDCARD,
END_MARKER
};

static const struct cfgelem lease_cfgelems[] = {
{ LEAF_W_ATTRS("ExpiryTime", lease_expiry_time_cfgattrs), 1, "10", ABSOFF(servicelease_expiry_time), 0, uf_float, 0, pf_float, NULL },
END_MARKER
};


static const struct cfgelem domain_cfgelems[] = {
{ GROUP("Lease", lease_cfgelems), NULL },
{ LEAF("Id"), 1, "any", ABSOFF(domainId), 0, uf_domainId, 0, pf_domainId, NULL },
WILDCARD,
{ GROUP("|Lease", deprecated_lease_cfgelems), NULL },
END_MARKER
};

@ -908,7 +901,7 @@ static const struct cfgelem root_cfgelems[] = {
{ "DDSI2E|DDSI2", ddsi2_cfgelems, NULL, NODATA,
"<p>DDSI2 settings ...</p>" },
{ "Durability", durability_cfgelems, NULL, NODATA, NULL },
{ "Lease", lease_cfgelems, NULL, NODATA, NULL },
{ GROUP("|Lease", deprecated_lease_cfgelems), NULL },
END_MARKER
};

@ -1529,7 +1522,7 @@ static int uf_string(struct cfgst *cfgst, void *parent, struct cfgelem const * c
}

DDSRT_WARNING_MSVC_OFF(4996);
static int uf_natint64_unit(struct cfgst *cfgst, int64_t *elem, const char *value, const struct unit *unittab, int64_t def_mult, int64_t max)
static int uf_natint64_unit(struct cfgst *cfgst, int64_t *elem, const char *value, const struct unit *unittab, int64_t def_mult, int64_t min, int64_t max)
{
int pos;
double v_dbl;

@ -1542,14 +1535,14 @@ static int uf_natint64_unit(struct cfgst *cfgst, int64_t *elem, const char *valu
return cfg_error(cfgst, "%s: empty string is not a valid value", value);
} else if ( sscanf(value, "%lld%n", (long long int *) &v_int, &pos) == 1 && (mult = lookup_multiplier(cfgst, unittab, value, pos, v_int == 0, def_mult, 0)) != 0 ) {
assert(mult > 0);
if ( v_int < 0 || v_int > max / mult )
if ( v_int < 0 || v_int > max / mult || mult * v_int < min)
return cfg_error(cfgst, "%s: value out of range", value);
*elem = mult * v_int;
return 1;
} else if ( sscanf(value, "%lf%n", &v_dbl, &pos) == 1 && (mult = lookup_multiplier(cfgst, unittab, value, pos, v_dbl == 0, def_mult, 1)) != 0 ) {
double dmult = (double) mult;
assert(dmult > 0);
if ( v_dbl < 0 || (int64_t) (v_dbl * dmult + 0.5) > max )
if ( (int64_t) (v_dbl * dmult + 0.5) < min || (int64_t) (v_dbl * dmult + 0.5) > max )
return cfg_error(cfgst, "%s: value out of range", value);
*elem = (int64_t) (v_dbl * dmult + 0.5);
return 1;

@ -1587,7 +1580,7 @@ static int uf_bandwidth(struct cfgst *cfgst, void *parent, struct cfgelem const
static int uf_memsize(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
int64_t size = 0;
if ( !uf_natint64_unit(cfgst, &size, value, unittab_memsize, 1, INT32_MAX) )
if ( !uf_natint64_unit(cfgst, &size, value, unittab_memsize, 1, 0, INT32_MAX) )
return 0;
else {
uint32_t *elem = cfg_address(cfgst, parent, cfgelem);

@ -1844,7 +1837,7 @@ static int uf_maybe_memsize(struct cfgst *cfgst, void *parent, struct cfgelem co
elem->isdefault = 1;
elem->value = 0;
return 1;
} else if ( !uf_natint64_unit(cfgst, &size, value, unittab_memsize, 1, INT32_MAX) ) {
} else if ( !uf_natint64_unit(cfgst, &size, value, unittab_memsize, 1, 0, INT32_MAX) ) {
return 0;
} else {
elem->isdefault = 0;

@ -1853,19 +1846,6 @@ static int uf_maybe_memsize(struct cfgst *cfgst, void *parent, struct cfgelem co
}
}


static int uf_float(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
float *elem = cfg_address(cfgst, parent, cfgelem);
char *endptr;
float f;
dds_retcode_t rc = ddsrt_strtof(value, &endptr, &f);
if (rc != DDS_RETCODE_OK || *value == 0 || *endptr != 0 )
return cfg_error(cfgst, "%s: not a floating point number", value);
*elem = f;
return 1;
}

static int uf_int(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
int *elem = cfg_address(cfgst, parent, cfgelem);

@ -1879,9 +1859,9 @@ static int uf_int(struct cfgst *cfgst, void *parent, struct cfgelem const * cons
return 1;
}

static int uf_duration_gen(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, const char *value, int64_t def_mult, int64_t max_ns)
static int uf_duration_gen(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, const char *value, int64_t def_mult, int64_t min_ns, int64_t max_ns)
{
return uf_natint64_unit(cfgst, cfg_address(cfgst, parent, cfgelem), value, unittab_duration, def_mult, max_ns);
return uf_natint64_unit(cfgst, cfg_address(cfgst, parent, cfgelem), value, unittab_duration, def_mult, min_ns, max_ns);
}

static int uf_duration_inf(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)

@ -1891,23 +1871,28 @@ static int uf_duration_inf(struct cfgst *cfgst, void *parent, struct cfgelem con
*elem = T_NEVER;
return 1;
} else {
return uf_duration_gen(cfgst, parent, cfgelem, value, 0, T_NEVER - 1);
return uf_duration_gen(cfgst, parent, cfgelem, value, 0, 0, T_NEVER - 1);
}
}

static int uf_duration_ms_1hr(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
return uf_duration_gen(cfgst, parent, cfgelem, value, T_MILLISECOND, 3600 * T_SECOND);
return uf_duration_gen(cfgst, parent, cfgelem, value, T_MILLISECOND, 0, 3600 * T_SECOND);
}

static int uf_duration_ms_1s(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
return uf_duration_gen(cfgst, parent, cfgelem, value, T_MILLISECOND, T_SECOND);
return uf_duration_gen(cfgst, parent, cfgelem, value, T_MILLISECOND, 0, T_SECOND);
}

static int uf_duration_us_1s(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
return uf_duration_gen(cfgst, parent, cfgelem, value, 1000, T_SECOND);
return uf_duration_gen(cfgst, parent, cfgelem, value, 1000, 0, T_SECOND);
}

static int uf_duration_100ms_1hr(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
{
return uf_duration_gen(cfgst, parent, cfgelem, value, 0, 100 * T_MILLISECOND, 3600 * T_SECOND);
}

static int uf_int32(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, UNUSED_ARG(int first), const char *value)
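
The extra min argument turns the old one-sided range check into a two-sided one, which is what allows the new 100ms..1hr duration parser above. A small standalone sketch of the same arithmetic (in_range is a hypothetical helper, not part of the library; constants chosen to match DDSI's nanosecond time base):

#include <stdint.h>
#include <stdio.h>

/* Hypothetical standalone version of the two-sided check: reject mult*v when it
   would overflow, fall below min or exceed max (mult is assumed positive). */
static int in_range (int64_t v, int64_t mult, int64_t min, int64_t max)
{
  if (v < 0 || v > max / mult)   /* overflow guard plus upper bound, as in the integer branch */
    return 0;
  if (mult * v < min)            /* the newly added lower bound */
    return 0;
  return 1;
}

int main (void)
{
  const int64_t T_MILLISECOND = 1000000, T_SECOND = 1000000000; /* ns */
  const int64_t min = 100 * T_MILLISECOND, max = 3600 * T_SECOND; /* uf_duration_100ms_1hr range */
  printf ("%d\n", in_range (50, T_MILLISECOND, min, max));        /* 0: 50 ms is below the minimum */
  printf ("%d\n", in_range (250, T_MILLISECOND, min, max));       /* 1: 250 ms is accepted */
  printf ("%d\n", in_range (2, 3600 * T_SECOND, min, max));       /* 0: 2 hours exceeds the maximum */
  return 0;
}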

@ -2309,12 +2294,6 @@ static void pf_domainId(struct cfgst *cfgst, void *parent, struct cfgelem const
cfg_log(cfgst, "%d%s", p->value, is_default ? " [def]" : "");
}

static void pf_float(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int is_default)
{
float *p = cfg_address(cfgst, parent, cfgelem);
cfg_log(cfgst, "%f%s", *p, is_default ? " [def]" : "");
}

static void pf_boolean(struct cfgst *cfgst, void *parent, struct cfgelem const * const cfgelem, int is_default)
{
int *p = cfg_address(cfgst, parent, cfgelem);

@ -172,13 +172,14 @@ static void maybe_add_pp_as_meta_to_as_disc (const struct addrset *as_meta)

static int write_mpayload (struct writer *wr, int alive, nn_parameterid_t keyparam, struct nn_xmsg *mpayload)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
struct ddsi_plist_sample plist_sample;
struct ddsi_serdata *serdata;
nn_xmsg_payload_to_plistsample (&plist_sample, keyparam, mpayload);
serdata = ddsi_serdata_from_sample (gv.plist_topic, alive ? SDK_DATA : SDK_KEY, &plist_sample);
serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
serdata->timestamp = now ();
return write_sample_nogc_notk (NULL, wr, serdata);
return write_sample_nogc_notk (ts1, NULL, wr, serdata);
}

int spdp_write (struct participant *pp)

@ -142,12 +142,12 @@ static int print_proxy_endpoint_common (ddsi_tran_conn_t conn, const char *label
}


static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn)
static int print_participants (struct thread_state1 * const ts1, ddsi_tran_conn_t conn)
{
struct ephash_enum_participant e;
struct participant *p;
int x = 0;
thread_state_awake (self);
thread_state_awake (ts1);
ephash_enum_participant_init (&e);
while ((p = ephash_enum_participant_next (&e)) != NULL)
{

@ -223,16 +223,16 @@ static int print_participants (struct thread_state1 *self, ddsi_tran_conn_t conn
}
}
ephash_enum_participant_fini (&e);
thread_state_asleep (self);
thread_state_asleep (ts1);
return x;
}

static int print_proxy_participants (struct thread_state1 *self, ddsi_tran_conn_t conn)
static int print_proxy_participants (struct thread_state1 * const ts1, ddsi_tran_conn_t conn)
{
struct ephash_enum_proxy_participant e;
struct proxy_participant *p;
int x = 0;
thread_state_awake (self);
thread_state_awake (ts1);
ephash_enum_proxy_participant_init (&e);
while ((p = ephash_enum_proxy_participant_next (&e)) != NULL)
{

@ -296,17 +296,18 @@ static int print_proxy_participants (struct thread_state1 *self, ddsi_tran_conn_
}
}
ephash_enum_proxy_participant_fini (&e);
thread_state_asleep (self);
thread_state_asleep (ts1);
return x;
}

static void debmon_handle_connection (struct debug_monitor *dm, ddsi_tran_conn_t conn)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
struct plugin *p;
int r = 0;
r += print_participants (dm->servts, conn);
r += print_participants (ts1, conn);
if (r == 0)
r += print_proxy_participants (dm->servts, conn);
r += print_proxy_participants (ts1, conn);

/* Note: can only add plugins (at the tail) */
ddsrt_mutex_lock (&dm->lock);

@ -379,7 +380,7 @@ struct debug_monitor *new_debug_monitor (int port)
if (ddsi_listener_listen (dm->servsock) < 0)
goto err_listen;
dm->stop = 0;
dm->servts = create_thread("debmon", debmon_main, dm);
create_thread(&dm->servts, "debmon", debmon_main, dm);
return dm;

err_listen:

@ -3862,14 +3862,11 @@ void purge_proxy_participants (const nn_locator_t *loc, bool delete_from_as_disc
{
/* FIXME: check whether addr:port can't be reused for a new connection by the time we get here. */
/* NOTE: This function exists for the sole purpose of cleaning up after closing a TCP connection in ddsi_tcp_close_conn and the state of the calling thread could be anything at this point. Because of that we do the unspeakable and toggle the thread state conditionally. We can't afford to have it in "asleep", as that causes a race with the garbage collector. */
struct thread_state1 * const self = lookup_thread_state();
const int self_is_awake = vtime_awake_p (self->vtime);
struct thread_state1 * const ts1 = lookup_thread_state ();
struct ephash_enum_proxy_participant est;
struct proxy_purge_data data;

if (!self_is_awake)
thread_state_awake(self);

thread_state_awake (ts1);
data.loc = loc;
data.timestamp = now();
ephash_enum_proxy_participant_init (&est);

@ -3881,8 +3878,7 @@ void purge_proxy_participants (const nn_locator_t *loc, bool delete_from_as_disc
if (delete_from_as_disc)
remove_from_addrset (gv.as_disc, loc);

if (!self_is_awake)
thread_state_asleep(self);
thread_state_asleep (ts1);
}

int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t timestamp, int isimplicit)
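
The conditional toggling removed above is exactly the pattern this commit eliminates: with nestable calls, a callee can bracket its critical region with awake/asleep regardless of the caller's state. A minimal usage sketch (do_cleanup and its caller are hypothetical; only the thread-state API names come from the diff):

#include "dds/ddsi/q_thread.h"

/* Hypothetical callee: touches state protected from the garbage collector. */
static void do_cleanup (void)
{
  struct thread_state1 * const ts1 = lookup_thread_state ();
  thread_state_awake (ts1);   /* nests if the caller is already awake */
  /* ... walk entity tables, delete things ... */
  thread_state_asleep (ts1);  /* restores whatever state the caller had */
}

/* Hypothetical caller: both invocations are safe, because a nested
   awake/asleep pair no longer forces a transition through "asleep". */
static void caller (void)
{
  struct thread_state1 * const ts1 = lookup_thread_state ();
  do_cleanup ();              /* called while asleep */
  thread_state_awake (ts1);
  do_cleanup ();              /* called while awake */
  thread_state_asleep (ts1);
}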

@ -79,7 +79,7 @@ static int threads_vtime_check (unsigned *nivs, struct idx_vtime *ivs)

static uint32_t gcreq_queue_thread (struct gcreq_queue *q)
{
struct thread_state1 *self = lookup_thread_state ();
struct thread_state1 * const ts1 = lookup_thread_state ();
nn_mtime_t next_thread_cputime = { 0 };
dds_time_t shortsleep = 1 * T_MILLISECOND;
int64_t delay = T_MILLISECOND; /* force evaluation after startup */

@ -124,9 +124,9 @@ static uint32_t gcreq_queue_thread (struct gcreq_queue *q)
very little impact on its primary purpose and be less of a
burden on the system than having a separate thread or adding it
to the workload of the data handling threads. */
thread_state_awake (self);
delay = check_and_handle_lease_expiration (self, now_et ());
thread_state_asleep (self);
thread_state_awake (ts1);
delay = check_and_handle_lease_expiration (now_et ());
thread_state_asleep (ts1);

if (gcreq)
{

@ -151,9 +151,9 @@ static uint32_t gcreq_queue_thread (struct gcreq_queue *q)
multi-phase delete) or freeing the delete request. Reset
the current gcreq as this one obviously is no more. */
DDS_TRACE("gc %p: deleting\n", (void*)gcreq);
thread_state_awake (self);
thread_state_awake (ts1);
gcreq->cb (gcreq);
thread_state_asleep (self);
thread_state_asleep (ts1);
gcreq = NULL;
trace_shortsleep = 1;
}

@ -174,9 +174,15 @@ struct gcreq_queue *gcreq_queue_new (void)
q->count = 0;
ddsrt_mutex_init (&q->lock);
ddsrt_cond_init (&q->cond);
q->ts = create_thread ("gc", (uint32_t (*) (void *)) gcreq_queue_thread, q);
assert (q->ts);
return q;
if (create_thread (&q->ts, "gc", (uint32_t (*) (void *)) gcreq_queue_thread, q) == DDS_RETCODE_OK)
return q;
else
{
ddsrt_mutex_destroy (&q->lock);
ddsrt_cond_destroy (&q->cond);
ddsrt_free (q);
return NULL;
}
}

void gcreq_queue_drain (struct gcreq_queue *q)

@ -839,7 +839,7 @@ static int setup_and_start_recv_threads (void)
goto fail;
}
}
if ((gv.recv_threads[i].ts = create_thread (gv.recv_threads[i].name, recv_thread, &gv.recv_threads[i].arg)) == NULL)
if (create_thread (&gv.recv_threads[i].ts, gv.recv_threads[i].name, recv_thread, &gv.recv_threads[i].arg) != DDS_RETCODE_OK)
{
DDS_ERROR("rtps_init: failed to start thread %s\n", gv.recv_threads[i].name);
goto fail;

@ -1399,7 +1399,7 @@ int rtps_start (void)
}
if (gv.listener)
{
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
create_thread (&gv.listen_ts, "listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
}
if (gv.startup_mode)
{

@ -1429,7 +1429,8 @@ static void builtins_dqueue_ready_cb (void *varg)

void rtps_stop (void)
{
struct thread_state1 *self = lookup_thread_state ();
struct thread_state1 * const ts1 = lookup_thread_state ();

#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
struct config_channel_listelem * chptr;
#endif

@ -1496,14 +1497,14 @@ void rtps_stop (void)
/* Clean up proxy readers, proxy writers and proxy
participants. Deleting a proxy participants deletes all its
readers and writers automatically */
thread_state_awake (self);
thread_state_awake (ts1);
ephash_enum_proxy_participant_init (&est);
while ((proxypp = ephash_enum_proxy_participant_next (&est)) != NULL)
{
delete_proxy_participant_by_guid(&proxypp->e.guid, tnow, 1);
}
ephash_enum_proxy_participant_fini (&est);
thread_state_asleep (self);
thread_state_asleep (ts1);
}

{

@ -1518,7 +1519,7 @@ void rtps_stop (void)
rwriters to get all SEDP and SPDP dispose+unregister messages
out. FIXME: need to keep xevent thread alive for a while
longer. */
thread_state_awake (self);
thread_state_awake (ts1);
ephash_enum_writer_init (&est_wr);
while ((wr = ephash_enum_writer_next (&est_wr)) != NULL)
{

@ -1526,7 +1527,7 @@ void rtps_stop (void)
delete_writer_nolinger (&wr->e.guid);
}
ephash_enum_writer_fini (&est_wr);
thread_state_awake (self);
thread_state_awake_to_awake_no_nest (ts1);
ephash_enum_reader_init (&est_rd);
while ((rd = ephash_enum_reader_next (&est_rd)) != NULL)
{

@ -1534,14 +1535,14 @@ void rtps_stop (void)
(void)delete_reader (&rd->e.guid);
}
ephash_enum_reader_fini (&est_rd);
thread_state_awake (self);
thread_state_awake_to_awake_no_nest (ts1);
ephash_enum_participant_init (&est_pp);
while ((pp = ephash_enum_participant_next (&est_pp)) != NULL)
{
delete_participant (&pp->e.guid);
}
ephash_enum_participant_fini (&est_pp);
thread_state_asleep (self);
thread_state_asleep (ts1);
}

/* Wait until all participants are really gone => by then we can be
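
The shutdown loops above now stay awake for their whole duration and only refresh the virtual time between phases. Judging by these call sites, thread_state_awake_to_awake_no_nest signals progress without pushing another nesting level; a hedged sketch of that usage pattern (work_item and process_one are hypothetical):

#include "dds/ddsi/q_thread.h"

struct work_item { struct work_item *next; int id; };  /* hypothetical */
void process_one (const struct work_item *w);           /* hypothetical */

static void process_all (struct work_item *todo)
{
  struct thread_state1 * const ts1 = lookup_thread_state ();
  thread_state_awake (ts1);
  for (struct work_item *x = todo; x; x = x->next)
  {
    /* stay awake, but let the GC / liveliness monitoring observe progress */
    thread_state_awake_to_awake_no_nest (ts1);
    process_one (x);
  }
  thread_state_asleep (ts1);
}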

@ -205,7 +205,7 @@ void lease_set_expiry (struct lease *l, nn_etime_t when)
force_lease_check();
}

int64_t check_and_handle_lease_expiration (UNUSED_ARG (struct thread_state1 *self), nn_etime_t tnowE)
int64_t check_and_handle_lease_expiration (nn_etime_t tnowE)
{
struct lease *l;
int64_t delay;
|
|||
|
||||
static uint32_t dqueue_thread (struct nn_dqueue *q)
|
||||
{
|
||||
struct thread_state1 *self = lookup_thread_state ();
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
nn_mtime_t next_thread_cputime = { 0 };
|
||||
int keepgoing = 1;
|
||||
nn_guid_t rdguid, *prdguid = NULL;
|
||||
|
@ -2416,6 +2416,7 @@ static uint32_t dqueue_thread (struct nn_dqueue *q)
|
|||
q->sc.first = q->sc.last = NULL;
|
||||
ddsrt_mutex_unlock (&q->lock);
|
||||
|
||||
thread_state_awake (ts1);
|
||||
while (sc.first)
|
||||
{
|
||||
struct nn_rsample_chain_elem *e = sc.first;
|
||||
|
@ -2424,7 +2425,7 @@ static uint32_t dqueue_thread (struct nn_dqueue *q)
|
|||
if (ddsrt_atomic_dec32_ov (&q->nof_samples) == 1) {
|
||||
ddsrt_cond_broadcast (&q->cond);
|
||||
}
|
||||
thread_state_awake (self);
|
||||
thread_state_awake_to_awake_no_nest (ts1);
|
||||
switch (dqueue_elem_kind (e))
|
||||
{
|
||||
case DQEK_DATA:
|
||||
|
@ -2474,9 +2475,9 @@ static uint32_t dqueue_thread (struct nn_dqueue *q)
|
|||
break;
|
||||
}
|
||||
}
|
||||
thread_state_asleep (self);
|
||||
}
|
||||
|
||||
thread_state_asleep (ts1);
|
||||
ddsrt_mutex_lock (&q->lock);
|
||||
}
|
||||
ddsrt_mutex_unlock (&q->lock);
|
||||
|
@ -2506,7 +2507,7 @@ struct nn_dqueue *nn_dqueue_new (const char *name, uint32_t max_samples, nn_dque
|
|||
if ((thrname = ddsrt_malloc (thrnamesz)) == NULL)
|
||||
goto fail_thrname;
|
||||
snprintf (thrname, thrnamesz, "dq.%s", name);
|
||||
if ((q->ts = create_thread (thrname, (uint32_t (*) (void *)) dqueue_thread, q)) == NULL)
|
||||
if (create_thread (&q->ts, thrname, (uint32_t (*) (void *)) dqueue_thread, q) != DDS_RETCODE_OK)
|
||||
goto fail_thread;
|
||||
ddsrt_free (thrname);
|
||||
return q;
|
||||
|
|
|

@ -2660,9 +2660,9 @@ static struct receiver_state *rst_cow_if_needed (int *rst_live, struct nn_rmsg *

static int handle_submsg_sequence
(
struct thread_state1 * const ts1,
ddsi_tran_conn_t conn,
const nn_locator_t *srcloc,
struct thread_state1 * const self,
nn_wctime_t tnowWC,
nn_etime_t tnowE,
const nn_guid_prefix_t * const src_prefix,

@ -2709,10 +2709,12 @@ static int handle_submsg_sequence
ts_for_latmeas = 0;
timestamp = invalid_ddsi_timestamp;

assert (thread_is_asleep ());
thread_state_awake (ts1);
while (submsg <= (end - sizeof (SubmessageHeader_t)))
{
Submessage_t *sm = (Submessage_t *) submsg;
int byteswap;
bool byteswap;
unsigned octetsToNextHeader;

if (sm->smhdr.flags & SMFLAG_ENDIANNESS)

@ -2749,7 +2751,7 @@ static int handle_submsg_sequence
break;
}

thread_state_awake (self);
thread_state_awake_to_awake_no_nest (ts1);
state_smkind = sm->smhdr.submessageId;
switch (sm->smhdr.submessageId)
{

@ -2855,9 +2857,7 @@ static int handle_submsg_sequence
unsigned char *datap;
/* valid_Data does not validate the payload */
if (!valid_Data (rst, rmsg, &sm->data, submsg_size, byteswap, &sampleinfo, &datap))
{
goto malformed;
}
sampleinfo.timestamp = timestamp;
sampleinfo.reception_timestamp = tnowWC;
handle_Data (rst, tnowE, rmsg, &sm->data, submsg_size, &sampleinfo, datap);

@ -2886,8 +2886,10 @@ static int handle_submsg_sequence
size_t len2 = decode_container (submsg1, len1);
if ( len2 != 0 ) {
TRACE ((")\n"));
if (handle_submsg_sequence (conn, srcloc, self, tnowWC, tnowE, src_prefix, dst_prefix, msg, (size_t) (submsg1 - msg) + len2, submsg1, rmsg) < 0)
goto malformed;
thread_state_asleep (ts1);
if (handle_submsg_sequence (conn, srcloc, tnowWC, tnowE, src_prefix, dst_prefix, msg, (size_t) (submsg1 - msg) + len2, submsg1, rmsg) < 0)
goto malformed_asleep;
thread_state_awake (ts1);
}
TRACE (("PT_INFO_CONTAINER END"));
}

@ -2952,19 +2954,24 @@ static int handle_submsg_sequence
state = "parse:shortmsg";
state_smkind = SMID_PAD;
DDS_TRACE("short (size %"PRIuSIZE" exp %p act %p)", submsg_size, (void *) submsg, (void *) end);
goto malformed;
goto malformed_asleep;
}
thread_state_asleep (ts1);
assert (thread_is_asleep ());
return 0;

malformed:

thread_state_asleep (ts1);
assert (thread_is_asleep ());
malformed_asleep:
assert (thread_is_asleep ());
malformed_packet_received (msg, submsg, len, state, state_smkind, hdr->vendorid);
return -1;
}

static bool do_packet
(
struct thread_state1 *self,
struct thread_state1 * const ts1,
ddsi_tran_conn_t conn,
const nn_guid_prefix_t * guidprefix,
struct nn_rbufpool *rbpool
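
The two error labels above encode where in the awake bracket an error was detected: one for errors raised while still awake, one for errors raised after the thread has already gone back to sleep. A schematic version of that control flow (parse_step_* are hypothetical; only the thread-state calls are taken from the diff):

#include <assert.h>
#include "dds/ddsi/q_thread.h"

int parse_step_while_awake (void);   /* hypothetical */
int parse_step_after_asleep (void);  /* hypothetical */

static int parse_packet (void)
{
  struct thread_state1 * const ts1 = lookup_thread_state ();
  thread_state_awake (ts1);
  if (parse_step_while_awake () < 0)
    goto malformed;                  /* still awake: the label goes asleep first */
  thread_state_asleep (ts1);
  if (parse_step_after_asleep () < 0)
    goto malformed_asleep;           /* already asleep: skip the extra asleep */
  return 0;

malformed:
  thread_state_asleep (ts1);
malformed_asleep:
  assert (thread_is_asleep ());
  return -1;
}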

@ -3052,7 +3059,7 @@ static bool do_packet
if (sz > 0 && !gv.deaf)
{
nn_rmsg_setsize (rmsg, (uint32_t) sz);
assert (vtime_asleep_p (self->vtime));
assert (thread_is_asleep ());

if ((size_t)sz < RTPS_MESSAGE_HEADER_SIZE || *(uint32_t *)buff != NN_PROTOCOLID_AS_UINT32)
{

@ -3078,9 +3085,8 @@ static bool do_packet
PGUIDPREFIX (hdr->guid_prefix), hdr->vendorid.id[0], hdr->vendorid.id[1], (unsigned long) sz, addrstr);
}

handle_submsg_sequence (conn, &srcloc, self, now (), now_et (), &hdr->guid_prefix, guidprefix, buff, (size_t) sz, buff + RTPS_MESSAGE_HEADER_SIZE, rmsg);
handle_submsg_sequence (ts1, conn, &srcloc, now (), now_et (), &hdr->guid_prefix, guidprefix, buff, (size_t) sz, buff + RTPS_MESSAGE_HEADER_SIZE, rmsg);
}
thread_state_asleep (self);
}
nn_rmsg_commit (rmsg);
return (sz > 0);

@ -3144,13 +3150,13 @@ static void local_participant_set_fini (struct local_participant_set *lps)
ddsrt_free (lps->ps);
}

static void rebuild_local_participant_set (struct thread_state1 *self, struct local_participant_set *lps)
static void rebuild_local_participant_set (struct thread_state1 * const ts1, struct local_participant_set *lps)
{
struct ephash_enum_participant est;
struct participant *pp;
unsigned nps_alloc;
DDS_TRACE("pp set gen changed: local %u global %"PRIu32"\n", lps->gen, ddsrt_atomic_ld32(&gv.participant_set_generation));
thread_state_awake (self);
thread_state_awake (ts1);
restart:
lps->gen = ddsrt_atomic_ld32 (&gv.participant_set_generation);
/* Actual local set of participants may never be older than the

@ -3195,7 +3201,7 @@ static void rebuild_local_participant_set (struct thread_state1 *self, struct lo
DDS_TRACE(" set changed - restarting\n");
goto restart;
}
thread_state_asleep (self);
thread_state_asleep (ts1);

/* The definition of the hash enumeration allows visiting one
participant multiple times, so guard against that, too. Note

@ -3361,8 +3367,8 @@ void trigger_recv_threads (void)

uint32_t recv_thread (void *vrecv_thread_arg)
{
struct thread_state1 * const ts1 = lookup_thread_state ();
struct recv_thread_arg *recv_thread_arg = vrecv_thread_arg;
struct thread_state1 *self = lookup_thread_state ();
struct nn_rbufpool *rbpool = recv_thread_arg->rbpool;
os_sockWaitset waitset = recv_thread_arg->mode == RTM_MANY ? recv_thread_arg->u.many.ws : NULL;
nn_mtime_t next_thread_cputime = { 0 };

@ -3373,7 +3379,7 @@ uint32_t recv_thread (void *vrecv_thread_arg)
while (gv.rtps_keepgoing)
{
LOG_THREAD_CPUTIME (next_thread_cputime);
(void) do_packet (self, recv_thread_arg->u.single.conn, NULL, rbpool);
(void) do_packet (ts1, recv_thread_arg->u.single.conn, NULL, rbpool);
}
}
else

@ -3422,7 +3428,7 @@ uint32_t recv_thread (void *vrecv_thread_arg)
{
/* first rebuild local participant set - unless someone's toggling "deafness", this
only happens when the participant set has changed, so might as well rebuild it */
rebuild_local_participant_set (self, &lps);
rebuild_local_participant_set (ts1, &lps);
os_sockWaitsetPurge (waitset, num_fixed);
for (i = 0; i < lps.nps; i++)
{

@ -3443,7 +3449,7 @@ uint32_t recv_thread (void *vrecv_thread_arg)
else
guid_prefix = &lps.ps[(unsigned)idx - num_fixed].guid_prefix;
/* Process message and clean out connection if failed or closed */
if (!do_packet (self, conn, guid_prefix, rbpool) && !conn->m_connless)
if (!do_packet (ts1, conn, guid_prefix, rbpool) && !conn->m_connless)
ddsi_conn_free (conn);
}
}

@ -22,7 +22,7 @@
#include "dds/ddsrt/misc.h"

#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_servicelease.h"
#include "dds/ddsi/ddsi_threadmon.h"
#include "dds/ddsi/q_error.h"
#include "dds/ddsi/q_log.h"
#include "dds/ddsi/q_config.h"

@ -34,6 +34,16 @@ static char main_thread_name[] = "main";
struct thread_states thread_states;
ddsrt_thread_local struct thread_state1 *tsd_thread_state;

extern inline bool vtime_awake_p (vtime_t vtime);
extern inline bool vtime_asleep_p (vtime_t vtime);
extern inline bool vtime_gt (vtime_t vtime1, vtime_t vtime0);

extern inline struct thread_state1 *lookup_thread_state (void);
extern inline bool thread_is_asleep (void);
extern inline bool thread_is_awake (void);
extern inline void thread_state_asleep (struct thread_state1 *ts1);
extern inline void thread_state_awake (struct thread_state1 *ts1);
extern inline void thread_state_awake_to_awake_no_nest (struct thread_state1 *ts1);

void *ddsrt_malloc_aligned_cacheline (size_t size)
{

@ -63,9 +73,10 @@ static void ddsrt_free_aligned (void *ptr)

void thread_states_init_static (void)
{
static struct thread_state1 ts =
{ .state = THREAD_STATE_ALIVE, .vtime = 1, .watchdog = 1, .name = "(anon)" };
tsd_thread_state = &ts;
static struct thread_state1 ts = {
.state = THREAD_STATE_ALIVE, .vtime = 0u, .name = "(anon)"
};
tsd_thread_state = &ts;
}

void thread_states_init (unsigned maxthreads)

@ -82,8 +93,7 @@ DDSRT_WARNING_MSVC_OFF(6386);
for (i = 0; i < thread_states.nthreads; i++)
{
thread_states.ts[i].state = THREAD_STATE_ZERO;
thread_states.ts[i].vtime = 1;
thread_states.ts[i].watchdog = 1;
thread_states.ts[i].vtime = 0u;
thread_states.ts[i].name = NULL;
}
DDSRT_WARNING_MSVC_ON(6386);
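
Initialising vtime to 0 instead of 1 suggests the virtual-time counter now also carries the awake-nesting depth. One plausible encoding -- an assumption for illustration, not necessarily the layout this commit uses -- keeps the nesting count in the low bits and a generation counter in the remaining bits, so that 0 means asleep and nested awake calls simply increment and decrement:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t vtime_t;
#define VTIME_NEST_MASK  0xfu        /* low bits: awake-nesting depth (assumed layout) */
#define VTIME_TIME_UNIT  0x10u       /* remaining bits: generation counter */

static int awake_p (vtime_t v)  { return (v & VTIME_NEST_MASK) != 0; }

static vtime_t awake (vtime_t v)  { assert ((v & VTIME_NEST_MASK) != VTIME_NEST_MASK); return v + 1; }
static vtime_t asleep (vtime_t v) { assert (awake_p (v)); v--; return awake_p (v) ? v : v + VTIME_TIME_UNIT; }
static vtime_t awake_to_awake_no_nest (vtime_t v) { assert (awake_p (v)); return v + VTIME_TIME_UNIT; }

int main (void)
{
  vtime_t v = 0;                     /* matches .vtime = 0u: asleep */
  v = awake (v);                     /* outer awake */
  v = awake (v);                     /* nested awake: no forced "asleep" in between */
  assert (awake_p (v));
  v = asleep (v);                    /* unwinds one level, still awake */
  assert (awake_p (v));
  v = awake_to_awake_no_nest (v);    /* progress visible to observers, nesting unchanged */
  v = asleep (v);                    /* outermost asleep: generation advances */
  assert (!awake_p (v));
  printf ("final vtime 0x%x\n", v);
  return 0;
}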

@ -115,43 +125,9 @@ static void cleanup_thread_state (void *data)
ddsrt_fini();
}

struct thread_state1 *lookup_thread_state (void)
{
struct thread_state1 *ts1 = NULL;
char name[128];
ddsrt_thread_t thr;

if ((ts1 = tsd_thread_state) == NULL) {
if ((ts1 = lookup_thread_state_real()) == NULL) {
/* This situation only arises for threads that were not created using
create_thread, aka application threads. Since registering thread
state should be fully automatic the name is simply the identifier. */
thr = ddsrt_thread_self();
ddsrt_thread_getname(name, sizeof(name));
ddsrt_mutex_lock(&thread_states.lock);
ts1 = init_thread_state(name);
if (ts1 != NULL) {
ddsrt_init();
ts1->extTid = thr;
ts1->tid = thr;
DDS_TRACE("started application thread %s\n", name);
ddsrt_thread_cleanup_push(&cleanup_thread_state, NULL);
}
ddsrt_mutex_unlock(&thread_states.lock);
}

tsd_thread_state = ts1;
}

assert(ts1 != NULL);

return ts1;
}

struct thread_state1 *lookup_thread_state_real (void)
static struct thread_state1 *find_thread_state (ddsrt_thread_t tid)
{
if (thread_states.ts) {
ddsrt_thread_t tid = ddsrt_thread_self ();
unsigned i;
for (i = 0; i < thread_states.nthreads; i++) {
if (ddsrt_thread_equal (thread_states.ts[i].tid, tid)) {

@ -162,6 +138,40 @@ struct thread_state1 *lookup_thread_state_real (void)
return NULL;
}

static struct thread_state1 *lazy_create_thread_state (ddsrt_thread_t self)
{
/* This situation only arises for threads that were not created using
create_thread, aka application threads. Since registering thread
state should be fully automatic the name is simply the identifier. */
struct thread_state1 *ts1;
char name[128];
ddsrt_thread_getname (name, sizeof (name));
ddsrt_mutex_lock (&thread_states.lock);
if ((ts1 = init_thread_state (name)) != NULL) {
ddsrt_init ();
ts1->extTid = self;
ts1->tid = self;
DDS_TRACE ("started application thread %s\n", name);
ddsrt_thread_cleanup_push (&cleanup_thread_state, NULL);
}
ddsrt_mutex_unlock (&thread_states.lock);
return ts1;
}

struct thread_state1 *lookup_thread_state_real (void)
{
struct thread_state1 *ts1 = tsd_thread_state;
if (ts1 == NULL)
{
ddsrt_thread_t self = ddsrt_thread_self ();
if ((ts1 = find_thread_state (self)) == NULL)
ts1 = lazy_create_thread_state (self);
tsd_thread_state = ts1;
}
assert(ts1 != NULL);
return ts1;
}

struct thread_context {
struct thread_state1 *self;
uint32_t (*f) (void *arg);

@ -221,7 +231,7 @@ const struct config_thread_properties_listelem *lookup_thread_properties (const
return e;
}

struct thread_state1 * init_thread_state (const char *tname)
struct thread_state1 *init_thread_state (const char *tname)
{
int cand;
struct thread_state1 *ts;

@ -231,29 +241,29 @@ struct thread_state1 * init_thread_state (const char *tname)

ts = &thread_states.ts[cand];
if (ts->state == THREAD_STATE_ZERO)
{
assert (vtime_asleep_p (ts->vtime));
}
ts->name = ddsrt_strdup (tname);
ts->state = THREAD_STATE_ALIVE;

return ts;
}

struct thread_state1 *create_thread (const char *name, uint32_t (*f) (void *arg), void *arg)
dds_retcode_t create_thread (struct thread_state1 **ts1, const char *name, uint32_t (*f) (void *arg), void *arg)
{
struct config_thread_properties_listelem const * const tprops = lookup_thread_properties (name);
ddsrt_threadattr_t tattr;
struct thread_state1 *ts1;
ddsrt_thread_t tid;
struct thread_context *ctxt;
ctxt = ddsrt_malloc (sizeof (*ctxt));
ddsrt_mutex_lock (&thread_states.lock);

ts1 = init_thread_state (name);

if (ts1 == NULL)
*ts1 = init_thread_state (name);
if (*ts1 == NULL)
goto fatal;

ctxt->self = ts1;
ctxt->self = *ts1;
ctxt->f = f;
ctxt->arg = arg;
ddsrt_threadattr_init (&tattr);

@ -269,26 +279,27 @@ struct thread_state1 *create_thread (const char *name, uint32_t (*f) (void *arg)

if (ddsrt_thread_create (&tid, name, &tattr, &create_thread_wrapper, ctxt) != DDS_RETCODE_OK)
{
ts1->state = THREAD_STATE_ZERO;
(*ts1)->state = THREAD_STATE_ZERO;
DDS_FATAL("create_thread: %s: ddsrt_thread_create failed\n", name);
goto fatal;
}
ts1->extTid = tid; /* overwrite the temporary value with the correct external one */
(*ts1)->extTid = tid; /* overwrite the temporary value with the correct external one */
ddsrt_mutex_unlock (&thread_states.lock);
return ts1;
return DDS_RETCODE_OK;
fatal:
ddsrt_mutex_unlock (&thread_states.lock);
ddsrt_free (ctxt);
*ts1 = NULL;
abort ();
return NULL;
return DDS_RETCODE_ERROR;
}

static void reap_thread_state (struct thread_state1 *ts1, int sync_with_servicelease)
{
ddsrt_mutex_lock (&thread_states.lock);
ts1->state = THREAD_STATE_ZERO;
if (sync_with_servicelease && gv.servicelease)
nn_servicelease_statechange_barrier (gv.servicelease);
if (sync_with_servicelease && gv.threadmon)
ddsi_threadmon_statechange_barrier (gv.threadmon);
if (ts1->name != main_thread_name)
ddsrt_free (ts1->name);
ddsrt_mutex_unlock (&thread_states.lock);
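
With the signature changed to return a dds_retcode_t and hand the thread state back through an out-parameter, callers can recover from a failed start instead of relying on the old abort path, as gcreq_queue_new now does. A usage sketch (worker_main and start_worker are hypothetical; the retcode header location is an assumption):

#include "dds/ddsrt/retcode.h"
#include "dds/ddsi/q_thread.h"

static uint32_t worker_main (void *arg) { (void) arg; return 0; }  /* hypothetical thread function */

static struct thread_state1 *start_worker (void)
{
  struct thread_state1 *ts;
  if (create_thread (&ts, "worker", worker_main, NULL) != DDS_RETCODE_OK)
    return NULL;   /* on failure create_thread sets the out-parameter to NULL */
  return ts;
}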

@ -319,7 +330,7 @@ void reset_thread_state (struct thread_state1 *ts1)
void downgrade_main_thread (void)
{
struct thread_state1 *ts1 = lookup_thread_state ();
thread_state_asleep (ts1);
assert (vtime_asleep_p (ts1->vtime));
/* no need to sync with service lease: already stopped */
reap_thread_state (ts1, 0);
thread_states_init_static ();

@ -1,21 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "dds/ddsi/q_thread.h"

extern inline int vtime_awake_p (vtime_t vtime);
extern inline int vtime_asleep_p (vtime_t vtime);
extern inline int vtime_gt (vtime_t vtime1, vtime_t vtime0);

extern inline void thread_state_asleep (struct thread_state1 *ts1);
extern inline void thread_state_awake (struct thread_state1 *ts1);
extern inline void thread_state_blocked (struct thread_state1 *ts1);
extern inline void thread_state_unblocked (struct thread_state1 *ts1);

@ -911,7 +911,7 @@ static int writer_may_continue (const struct writer *wr, const struct whc_state
}


static dds_retcode_t throttle_writer (struct nn_xpack *xp, struct writer *wr)
static dds_retcode_t throttle_writer (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr)
{
/* Sleep (cond_wait) without updating the thread's vtime: the
garbage collector won't free the writer while we leave it

@ -956,7 +956,7 @@ static dds_retcode_t throttle_writer (struct nn_xpack *xp, struct writer *wr)
{
ASSERT_MUTEX_HELD (&wr->e.lock);
assert (wr->throttling == 0);
assert (vtime_awake_p (lookup_thread_state ()->vtime));
assert (thread_is_awake ());
assert (!is_builtin_entityid(wr->e.guid.entityid, NN_VENDORID_ECLIPSE));
}

@ -988,10 +988,10 @@ static dds_retcode_t throttle_writer (struct nn_xpack *xp, struct writer *wr)
result = DDS_RETCODE_TIMEOUT;
if (reltimeout > 0)
{
thread_state_asleep (lookup_thread_state());
thread_state_asleep (ts1);
if (ddsrt_cond_waitfor (&wr->throttle_cond, &wr->e.lock, reltimeout))
result = DDS_RETCODE_OK;
thread_state_awake (lookup_thread_state());
thread_state_awake (ts1);
whc_get_state(wr->whc, &whcst);
}
if (result == DDS_RETCODE_TIMEOUT)

@ -1028,7 +1028,7 @@ static int maybe_grow_whc (struct writer *wr)
return 0;
}

static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, int end_of_txn, int gc_allowed)
static int write_sample_eot (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk, int end_of_txn, int gc_allowed)
{
int r;
seqno_t seq;

@ -1070,14 +1070,14 @@ static int write_sample_eot (struct nn_xpack *xp, struct writer *wr, struct nn_p
dds_retcode_t ores;
assert(gc_allowed); /* also see beginning of the function */
if (config.prioritize_retransmit && wr->retransmitting)
ores = throttle_writer (xp, wr);
ores = throttle_writer (ts1, xp, wr);
else
{
maybe_grow_whc (wr);
if (whcst.unacked_bytes <= wr->whc_high)
ores = DDS_RETCODE_OK;
else
ores = throttle_writer (xp, wr);
ores = throttle_writer (ts1, xp, wr);
}
if (ores == DDS_RETCODE_TIMEOUT)
{

@ -1168,32 +1168,34 @@ drop:
return r;
}

int write_sample_gc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
int write_sample_gc (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
return write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1);
return write_sample_eot (ts1, xp, wr, NULL, serdata, tk, 0, 1);
}

int write_sample_nogc (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
int write_sample_nogc (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
return write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0);
return write_sample_eot (ts1, xp, wr, NULL, serdata, tk, 0, 0);
}

int write_sample_gc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
int write_sample_gc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
{
struct ddsi_tkmap_instance *tk;
int res;
assert (thread_is_awake ());
tk = ddsi_tkmap_lookup_instance_ref (serdata);
res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 1);
res = write_sample_eot (ts1, xp, wr, NULL, serdata, tk, 0, 1);
ddsi_tkmap_instance_unref (tk);
return res;
}

int write_sample_nogc_notk (struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
int write_sample_nogc_notk (struct thread_state1 * const ts1, struct nn_xpack *xp, struct writer *wr, struct ddsi_serdata *serdata)
{
struct ddsi_tkmap_instance *tk;
int res;
assert (thread_is_awake ());
tk = ddsi_tkmap_lookup_instance_ref (serdata);
res = write_sample_eot (xp, wr, NULL, serdata, tk, 0, 0);
res = write_sample_eot (ts1, xp, wr, NULL, serdata, tk, 0, 0);
ddsi_tkmap_instance_unref (tk);
return res;
}
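
All write_sample_* entry points now take the thread state explicitly and assert that the caller is already awake, so the lookup happens once at the call site rather than inside every helper. A call-site sketch (publish_once is hypothetical; the header paths are assumptions):

#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_transmit.h"   /* assumed location of the write_sample_* declarations */

static int publish_once (struct writer *wr, struct ddsi_serdata *serdata)
{
  struct thread_state1 * const ts1 = lookup_thread_state ();
  int res;
  thread_state_awake (ts1);
  res = write_sample_gc_notk (ts1, NULL, wr, serdata);  /* NULL xpack, as in write_mpayload above */
  thread_state_asleep (ts1);
  return res;
}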

@ -504,6 +504,7 @@ struct xeventq * xeventq_new

int xeventq_start (struct xeventq *evq, const char *name)
{
dds_retcode_t rc;
char * evqname = "tev";
assert (evq->ts == NULL);

@ -515,13 +516,13 @@ int xeventq_start (struct xeventq *evq, const char *name)
}

evq->terminate = 0;
evq->ts = create_thread (evqname, (uint32_t (*) (void *)) xevent_thread, evq);
rc = create_thread (&evq->ts, evqname, (uint32_t (*) (void *)) xevent_thread, evq);

if (name)
{
ddsrt_free (evqname);
}
return (evq->ts == NULL) ? Q_ERR_UNSPECIFIED : 0;
return (rc != DDS_RETCODE_OK) ? Q_ERR_UNSPECIFIED : 0;
}

void xeventq_stop (struct xeventq *evq)

@ -1114,7 +1115,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
}
}

static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
static void write_pmd_message (struct thread_state1 * const ts1, struct nn_xpack *xp, struct participant *pp, unsigned pmd_kind)
{
#define PMD_DATA_LENGTH 1
struct writer *wr;

@ -1146,12 +1147,12 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi
serdata->timestamp = now ();

tk = ddsi_tkmap_lookup_instance_ref(serdata);
write_sample_nogc (xp, wr, serdata, tk);
write_sample_nogc (ts1, xp, wr, serdata, tk);
ddsi_tkmap_instance_unref(tk);
#undef PMD_DATA_LENGTH
}

static void handle_xevk_pmd_update (struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow)
static void handle_xevk_pmd_update (struct thread_state1 * const ts1, struct nn_xpack *xp, struct xevent *ev, nn_mtime_t tnow)
{
struct participant *pp;
int64_t intv;

@ -1162,7 +1163,7 @@ static void handle_xevk_pmd_update (struct nn_xpack *xp, struct xevent *ev, nn_m
return;
}

write_pmd_message (xp, pp, PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);
write_pmd_message (ts1, xp, pp, PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE);

/* QoS changes can't change lease durations. So the only thing that
could cause trouble here is that the addition or removal of a

@ -1217,7 +1218,7 @@ static void handle_xevk_delete_writer (UNUSED_ARG (struct nn_xpack *xp), struct
delete_xevent (ev);
}

static void handle_individual_xevent (struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow)
static void handle_individual_xevent (struct thread_state1 * const ts1, struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow)
{
switch (xev->kind)
{

@ -1231,7 +1232,7 @@ static void handle_individual_xevent (struct xevent *xev, struct nn_xpack *xp, n
handle_xevk_spdp (xp, xev, tnow);
break;
case XEVK_PMD_UPDATE:
handle_xevk_pmd_update (xp, xev, tnow);
handle_xevk_pmd_update (ts1, xp, xev, tnow);
break;
case XEVK_END_STARTUP_MODE:
handle_xevk_end_startup_mode (xp, xev, tnow);

@ -1262,7 +1263,7 @@ static void handle_individual_xevent_nt (struct xevent_nt *xev, struct nn_xpack
ddsrt_free (xev);
}

static void handle_timed_xevent (struct thread_state1 *self, struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow /* monotonic */)
static void handle_timed_xevent (struct thread_state1 * const ts1, struct xevent *xev, struct nn_xpack *xp, nn_mtime_t tnow /* monotonic */)
{
/* This function handles the individual xevent irrespective of
whether it is a "timed" or "non-timed" xevent */

@ -1276,14 +1277,13 @@ static void handle_timed_xevent (struct thread_state1 *self, struct xevent *xev,
assert (xev->tsched.v != TSCHED_DELETE);

ddsrt_mutex_unlock (&xevq->lock);
thread_state_awake (self);
handle_individual_xevent (xev, xp, tnow /* monotonic */);
handle_individual_xevent (ts1, xev, xp, tnow /* monotonic */);
ddsrt_mutex_lock (&xevq->lock);

ASSERT_MUTEX_HELD (&xevq->lock);
}

static void handle_nontimed_xevent (struct thread_state1 *self, struct xevent_nt *xev, struct nn_xpack *xp)
static void handle_nontimed_xevent (struct xevent_nt *xev, struct nn_xpack *xp)
{
/* This function handles the individual xevent irrespective of
whether it is a "timed" or "non-timed" xevent */

@ -1296,7 +1296,6 @@ static void handle_nontimed_xevent (struct thread_state1 *self, struct xevent_nt
assert (xev->evq == xevq);

ddsrt_mutex_unlock (&xevq->lock);
thread_state_awake (self);
handle_individual_xevent_nt (xev, xp);
/* non-timed xevents are freed by the handlers */
ddsrt_mutex_lock (&xevq->lock);

@ -1304,11 +1303,12 @@ static void handle_nontimed_xevent (struct thread_state1 *self, struct xevent_nt
ASSERT_MUTEX_HELD (&xevq->lock);
}

static void handle_xevents (struct thread_state1 *self, struct xeventq *xevq, struct nn_xpack *xp, nn_mtime_t tnow /* monotonic */)
static void handle_xevents (struct thread_state1 * const ts1, struct xeventq *xevq, struct nn_xpack *xp, nn_mtime_t tnow /* monotonic */)
{
int xeventsToProcess = 1;

ASSERT_MUTEX_HELD (&xevq->lock);
assert (thread_is_awake ());

/* The following loops give priority to the "timed" events (heartbeats,
acknacks etc) if there are any. The algorithm is that we handle all

@ -1334,7 +1334,8 @@ static void handle_xevents (struct thread_state1 *self, struct xeventq *xevq, st
scheduled or not), so set to TSCHED_NEVER to indicate it
currently isn't. */
xev->tsched.v = T_NEVER;
handle_timed_xevent (self, xev, xp, tnow);
thread_state_awake_to_awake_no_nest (ts1);
handle_timed_xevent (ts1, xev, xp, tnow);
}

/* Limited-bandwidth channels means events can take a LONG time

@ -1345,7 +1346,8 @@ static void handle_xevents (struct thread_state1 *self, struct xeventq *xevq, st
if (!non_timed_xmit_list_is_empty (xevq))
{
struct xevent_nt *xev = getnext_from_non_timed_xmit_list (xevq);
handle_nontimed_xevent (self, xev, xp);
thread_state_awake_to_awake_no_nest (ts1);
handle_nontimed_xevent (xev, xp);
tnow = now_mt ();
}
else

@ -1359,7 +1361,7 @@ static void handle_xevents (struct thread_state1 *self, struct xeventq *xevq, st

static uint32_t xevent_thread (struct xeventq * xevq)
{
struct thread_state1 *self = lookup_thread_state ();
struct thread_state1 * const ts1 = lookup_thread_state ();
struct nn_xpack *xp;
nn_mtime_t next_thread_cputime = { 0 };

@ -1372,15 +1374,13 @@ static uint32_t xevent_thread (struct xeventq * xevq)

LOG_THREAD_CPUTIME (next_thread_cputime);

handle_xevents (self, xevq, xp, tnow);

/* Send to the network unlocked, as it may sleep due to bandwidth
limitation */
thread_state_awake (ts1);
handle_xevents (ts1, xevq, xp, tnow);
/* Send to the network unlocked, as it may sleep due to bandwidth limitation */
ddsrt_mutex_unlock (&xevq->lock);
nn_xpack_send (xp, false);
ddsrt_mutex_lock (&xevq->lock);

thread_state_asleep (self);
thread_state_asleep (ts1);

if (!non_timed_xmit_list_is_empty (xevq) || xevq->terminate)
{

@ -1459,7 +1459,7 @@ void nn_xpack_sendq_init (void)

void nn_xpack_sendq_start (void)
{
gv.sendq_ts = create_thread("sendq", nn_xpack_sendq_thread, NULL);
create_thread (&gv.sendq_ts, "sendq", nn_xpack_sendq_thread, NULL);
}

void nn_xpack_sendq_stop (void)

@ -51,14 +51,14 @@ void log_stacktrace (const char *name, ddsrt_thread_t tid)
if (dds_get_log_mask() == 0)
; /* no op if nothing logged */
else if (!config.noprogress_log_stacktraces)
DDS_LOG(~0u, "-- stack trace of %s requested, but traces disabled --\n", name);
DDS_LOG(~DDS_LC_FATAL, "-- stack trace of %s requested, but traces disabled --\n", name);
else
{
const dds_time_t d = 1000000;
struct sigaction act, oact;
char **strs;
int i;
DDS_LOG(~0u, "-- stack trace of %s requested --\n", name);
DDS_LOG(~DDS_LC_FATAL, "-- stack trace of %s requested --\n", name);
act.sa_handler = log_stacktrace_sigh;
act.sa_flags = 0;
sigfillset (&act.sa_mask);

@ -70,15 +70,15 @@ void log_stacktrace (const char *name, ddsrt_thread_t tid)
dds_sleepfor (d);
sigaction (SIGXCPU, &oact, NULL);
if (pthread_kill (tid.v, 0) != 0)
DDS_LOG(~0u, "-- thread exited --\n");
DDS_LOG(~DDS_LC_FATAL, "-- thread exited --\n");
else
{
DDS_LOG(~0u, "-- stack trace follows --\n");
DDS_LOG(~DDS_LC_FATAL, "-- stack trace follows --\n");
strs = backtrace_symbols (log_stacktrace_stk.stk, log_stacktrace_stk.depth);
for (i = 0; i < log_stacktrace_stk.depth; i++)
DDS_LOG(~0u, "%s\n", strs[i]);
DDS_LOG(~DDS_LC_FATAL, "%s\n", strs[i]);
free (strs);
DDS_LOG(~0u, "-- end of stack trace --\n");
DDS_LOG(~DDS_LC_FATAL, "-- end of stack trace --\n");
}
ddsrt_atomic_st32 (&log_stacktrace_flag, 0);
}

@ -108,7 +108,7 @@ static uint64_t store (struct rhc *rhc, struct proxy_writer *wr, struct ddsi_ser
/* beware: unrefs sd */
struct ddsi_tkmap_instance *tk;
struct proxy_writer_info pwr_info;
thread_state_awake (mainthread);
thread_state_awake (lookup_thread_state ());
tk = ddsi_tkmap_lookup_instance_ref(sd);
uint64_t iid = tk->m_iid;
if (print)

@ -132,7 +132,7 @@ static uint64_t store (struct rhc *rhc, struct proxy_writer *wr, struct ddsi_ser
pwr_info.ownership_strength = wr->c.xqos->ownership_strength.value;
dds_rhc_store (rhc, &pwr_info, sd, tk);
ddsi_tkmap_instance_unref (tk);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
ddsi_serdata_unref (sd);
return iid;
}

@ -171,18 +171,18 @@ static struct rhc *mkrhc (dds_reader *rd, nn_history_kind_t hk, int32_t hdepth,
rqos.history.depth = hdepth;
rqos.destination_order.kind = dok;
nn_xqos_mergein_missing (&rqos, &gv.default_xqos_rd);
thread_state_awake (mainthread);
thread_state_awake (lookup_thread_state ());
rhc = dds_rhc_new (rd, mdtopic);
dds_rhc_set_qos(rhc, &rqos);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
return rhc;
}

static void frhc (struct rhc *rhc)
{
thread_state_awake (mainthread);
thread_state_awake (lookup_thread_state ());
dds_rhc_free (rhc);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
}

static char si2is (const dds_sample_info_t *si)

@ -288,9 +288,9 @@ static void rdtkcond (struct rhc *rhc, dds_readcond *cond, const struct check *c
if (print)
printf ("%s:\n", opname);

thread_state_awake (mainthread);
thread_state_awake (lookup_thread_state ());
cnt = op (rhc, true, rres_ptrs, rres_iseq, (max <= 0) ? (uint32_t) (sizeof (rres_iseq) / sizeof (rres_iseq[0])) : (uint32_t) max, cond ? NO_STATE_MASK_SET : (DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE), 0, cond);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
if (max > 0 && cnt > max) {
printf ("%s TOO MUCH DATA (%d > %d)\n", opname, cnt, max);
abort ();

@ -764,8 +764,8 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
tkcond (rhc[k], rhcconds[cond], NULL, 1, print && k == 0, states_seen);
break;
}
case 11:
thread_state_awake (mainthread);
case 11: {
thread_state_awake (lookup_thread_state ());
struct proxy_writer_info wr_info;
wr_info.auto_dispose = wr[which]->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances;
wr_info.guid = wr[which]->e.guid;

@ -773,8 +773,9 @@ static void test_conditions (dds_entity_t pp, dds_entity_t tp, const int count,
wr_info.ownership_strength = wr[which]->c.xqos->ownership_strength.value;
for (size_t k = 0; k < nrd; k++)
dds_rhc_unregister_wr (rhc[k], &wr_info);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
break;
}
}

if ((i % 200) == 0)

@ -869,14 +870,14 @@ int main (int argc, char **argv)
{ 0, 0, 0, 0, 0, 0, 0, 0 }
};
rdall (rhc, c1, print, states_seen);
thread_state_awake (mainthread);
thread_state_awake (lookup_thread_state ());
struct proxy_writer_info wr0_info;
wr0_info.auto_dispose = wr0->c.xqos->writer_data_lifecycle.autodispose_unregistered_instances;
wr0_info.guid = wr0->e.guid;
wr0_info.iid = wr0->e.iid;
wr0_info.ownership_strength = wr0->c.xqos->ownership_strength.value;
dds_rhc_unregister_wr (rhc, &wr0_info);
thread_state_asleep (mainthread);
thread_state_asleep (lookup_thread_state ());
const struct check c2[] = {
{ "ROU", iid0, wr0->e.iid, 0,0, 1, 0,1 },
{ "ROU", iid0, 0, 0,0, 0, 0,0 },