Liveliness monitoring to dds_global
Thread liveliness monitoring moves to dds_global and there is one monitor running if there is at least one domain that requests it. The synchronization over freeing the thread name when reaping the thread state is gone by no longer dynamically allocating the thread name. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
f05dd6ac07
commit
8a591fdc9b
7 changed files with 48 additions and 46 deletions
|
@ -309,6 +309,9 @@ typedef struct dds_globals {
|
|||
ddsrt_avl_tree_t m_domains;
|
||||
ddsrt_mutex_t m_mutex;
|
||||
|
||||
uint32_t threadmon_count;
|
||||
struct ddsi_threadmon *threadmon;
|
||||
|
||||
struct ddsi_sertopic *builtin_participant_topic;
|
||||
struct ddsi_sertopic *builtin_reader_topic;
|
||||
struct ddsi_sertopic *builtin_writer_topic;
|
||||
|
|
|
@ -93,16 +93,17 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
}
|
||||
|
||||
/* Start monitoring the liveliness of all threads. */
|
||||
if (!config.liveliness_monitoring)
|
||||
gv.threadmon = NULL;
|
||||
else
|
||||
if (config.liveliness_monitoring)
|
||||
{
|
||||
gv.threadmon = ddsi_threadmon_new ();
|
||||
if (gv.threadmon == NULL)
|
||||
if (++dds_global.threadmon_count == 0)
|
||||
{
|
||||
DDS_ERROR ("Failed to create a thread monitor\n");
|
||||
ret = DDS_RETCODE_OUT_OF_RESOURCES;
|
||||
goto fail_threadmon_new;
|
||||
dds_global.threadmon = ddsi_threadmon_new ();
|
||||
if (dds_global.threadmon == NULL)
|
||||
{
|
||||
DDS_ERROR ("Failed to create a thread liveliness monitor\n");
|
||||
ret = DDS_RETCODE_OUT_OF_RESOURCES;
|
||||
goto fail_threadmon_new;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -122,11 +123,14 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
goto fail_rtps_start;
|
||||
}
|
||||
|
||||
if (gv.threadmon && ddsi_threadmon_start (gv.threadmon) < 0)
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
{
|
||||
DDS_ERROR ("Failed to start the servicelease\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
goto fail_threadmon_start;
|
||||
if (ddsi_threadmon_start (dds_global.threadmon) < 0)
|
||||
{
|
||||
DDS_ERROR ("Failed to start the thread liveliness monitor\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
goto fail_threadmon_start;
|
||||
}
|
||||
}
|
||||
|
||||
/* Set additional default participant properties */
|
||||
|
@ -151,16 +155,19 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
return DDS_RETCODE_OK;
|
||||
|
||||
fail_threadmon_start:
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_stop (gv.threadmon);
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
ddsi_threadmon_stop (dds_global.threadmon);
|
||||
rtps_stop ();
|
||||
fail_rtps_start:
|
||||
rtps_fini ();
|
||||
fail_rtps_init:
|
||||
if (gv.threadmon)
|
||||
if (config.liveliness_monitoring)
|
||||
{
|
||||
ddsi_threadmon_free (gv.threadmon);
|
||||
gv.threadmon = NULL;
|
||||
if (--dds_global.threadmon_count == 0)
|
||||
{
|
||||
ddsi_threadmon_free (dds_global.threadmon);
|
||||
dds_global.threadmon = NULL;
|
||||
}
|
||||
}
|
||||
fail_threadmon_new:
|
||||
downgrade_main_thread ();
|
||||
|
@ -174,14 +181,19 @@ fail_config:
|
|||
|
||||
static void dds_domain_fini (struct dds_domain *domain)
|
||||
{
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_stop (gv.threadmon);
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
ddsi_threadmon_stop (dds_global.threadmon);
|
||||
rtps_stop ();
|
||||
dds__builtin_fini (domain);
|
||||
rtps_fini ();
|
||||
if (gv.threadmon)
|
||||
ddsi_threadmon_free (gv.threadmon);
|
||||
gv.threadmon = NULL;
|
||||
if (config.liveliness_monitoring)
|
||||
{
|
||||
if (--dds_global.threadmon_count == 0)
|
||||
{
|
||||
ddsi_threadmon_free (dds_global.threadmon);
|
||||
dds_global.threadmon = NULL;
|
||||
}
|
||||
}
|
||||
config_fini (domain->cfgst);
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ struct ddsi_threadmon *ddsi_threadmon_new (void);
|
|||
dds_return_t ddsi_threadmon_start (struct ddsi_threadmon *sl);
|
||||
void ddsi_threadmon_stop (struct ddsi_threadmon *sl);
|
||||
void ddsi_threadmon_free (struct ddsi_threadmon *sl);
|
||||
void ddsi_threadmon_statechange_barrier (struct ddsi_threadmon *sl);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -101,7 +101,6 @@ struct q_globals {
|
|||
|
||||
/* Queue for garbage collection requests */
|
||||
struct gcreq_queue *gcreq_queue;
|
||||
struct ddsi_threadmon *threadmon;
|
||||
|
||||
/* Lease junk */
|
||||
ddsrt_mutex_t leaseheap_lock;
|
||||
|
|
|
@ -65,10 +65,10 @@ struct logbuf;
|
|||
*/
|
||||
#define THREAD_BASE \
|
||||
ddsrt_atomic_uint32_t vtime; \
|
||||
enum thread_state state; \
|
||||
ddsrt_thread_t tid; \
|
||||
ddsrt_thread_t extTid; \
|
||||
enum thread_state state; \
|
||||
char *name /* note: no semicolon! */
|
||||
char name[24] /* note: no semicolon! */
|
||||
|
||||
struct thread_state_base {
|
||||
THREAD_BASE;
|
||||
|
|
|
@ -180,12 +180,6 @@ dds_return_t ddsi_threadmon_start (struct ddsi_threadmon *sl)
|
|||
return DDS_RETCODE_ERROR;
|
||||
}
|
||||
|
||||
void ddsi_threadmon_statechange_barrier (struct ddsi_threadmon *sl)
|
||||
{
|
||||
ddsrt_mutex_lock (&sl->lock);
|
||||
ddsrt_mutex_unlock (&sl->lock);
|
||||
}
|
||||
|
||||
void ddsi_threadmon_stop (struct ddsi_threadmon *sl)
|
||||
{
|
||||
if (sl->keepgoing != -1)
|
||||
|
|
|
@ -93,7 +93,7 @@ void thread_states_init (unsigned maxthreads)
|
|||
{
|
||||
thread_states.ts[i].state = THREAD_STATE_ZERO;
|
||||
ddsrt_atomic_st32 (&thread_states.ts[i].vtime, 0);
|
||||
thread_states.ts[i].name = NULL;
|
||||
memset (thread_states.ts[i].name, 0, sizeof (thread_states.ts[i].name));
|
||||
}
|
||||
DDSRT_WARNING_MSVC_ON(6386);
|
||||
}
|
||||
|
@ -211,7 +211,8 @@ void upgrade_main_thread (void)
|
|||
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
|
||||
ts1->state = THREAD_STATE_LAZILY_CREATED;
|
||||
ts1->tid = ddsrt_thread_self ();
|
||||
ts1->name = main_thread_name;
|
||||
strncpy (ts1->name, main_thread_name, sizeof (ts1->name));
|
||||
ts1->name[sizeof (ts1->name) - 1] = 0;
|
||||
ddsrt_mutex_unlock (&thread_states.lock);
|
||||
tsd_thread_state = ts1;
|
||||
}
|
||||
|
@ -235,7 +236,8 @@ static struct thread_state1 *init_thread_state (const char *tname, enum thread_s
|
|||
|
||||
ts = &thread_states.ts[cand];
|
||||
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts->vtime)));
|
||||
ts->name = ddsrt_strdup (tname);
|
||||
strncpy (ts->name, tname, sizeof (ts->name));
|
||||
ts->name[sizeof (ts->name) - 1] = 0;
|
||||
ts->state = state;
|
||||
|
||||
return ts;
|
||||
|
@ -285,14 +287,10 @@ dds_return_t create_thread (struct thread_state1 **ts1, const char *name, uint32
|
|||
return DDS_RETCODE_ERROR;
|
||||
}
|
||||
|
||||
static void reap_thread_state (struct thread_state1 *ts1, int sync_with_servicelease)
|
||||
static void reap_thread_state (struct thread_state1 *ts1)
|
||||
{
|
||||
ddsrt_mutex_lock (&thread_states.lock);
|
||||
ts1->state = THREAD_STATE_ZERO;
|
||||
if (sync_with_servicelease && gv.threadmon)
|
||||
ddsi_threadmon_statechange_barrier (gv.threadmon);
|
||||
if (ts1->name != main_thread_name)
|
||||
ddsrt_free (ts1->name);
|
||||
ddsrt_mutex_unlock (&thread_states.lock);
|
||||
}
|
||||
|
||||
|
@ -302,17 +300,14 @@ dds_return_t join_thread (struct thread_state1 *ts1)
|
|||
assert (ts1->state == THREAD_STATE_ALIVE);
|
||||
ret = ddsrt_thread_join (ts1->extTid, NULL);
|
||||
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
|
||||
reap_thread_state (ts1, 1);
|
||||
reap_thread_state (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void reset_thread_state (struct thread_state1 *ts1)
|
||||
{
|
||||
if (ts1)
|
||||
{
|
||||
reap_thread_state (ts1, 1);
|
||||
ts1->name = NULL;
|
||||
}
|
||||
reap_thread_state (ts1);
|
||||
}
|
||||
|
||||
void downgrade_main_thread (void)
|
||||
|
@ -320,7 +315,7 @@ void downgrade_main_thread (void)
|
|||
struct thread_state1 *ts1 = lookup_thread_state ();
|
||||
assert (vtime_asleep_p (ddsrt_atomic_ld32 (&ts1->vtime)));
|
||||
/* no need to sync with service lease: already stopped */
|
||||
reap_thread_state (ts1, 0);
|
||||
reap_thread_state (ts1);
|
||||
thread_states_init_static ();
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue