timely initialization of builtin topics (#138)

The rtps_init function used to initialize all data structures and start
all threads used by the protocol stack, allowing discovery of remote
entities before the built-in topic data structures were initialized.
(Very) early discovery of a remote participant thus led to a crash.

This commit splits the initialisation, providing a separate function for
starting, in particular, the threads receiving data from the network.
In terms of threads created, it matches exactly with the rtps_stop /
rtps_fini split that already existed to address the exact same problem
on termination.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-03-27 09:15:41 +01:00
parent b9406b2dee
commit f0f76454c7
3 changed files with 67 additions and 47 deletions

View file

@ -113,6 +113,7 @@ dds_init(dds_domainid_t domain)
goto fail_rtps_config;
}
upgrade_main_thread();
ut_avlInit(&dds_domaintree_def, &dds_global.m_domains);
/* Start monitoring the liveliness of all threads. */
@ -129,7 +130,7 @@ dds_init(dds_domainid_t domain)
}
}
if (rtps_init() < 0)
if (rtps_init () < 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
@ -138,6 +139,13 @@ dds_init(dds_domainid_t domain)
dds__builtin_init ();
if (rtps_start () < 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
goto fail_rtps_start;
}
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
{
DDS_ERROR("Failed to start the servicelease\n");
@ -145,8 +153,6 @@ dds_init(dds_domainid_t domain)
goto fail_servicelease_start;
}
upgrade_main_thread();
/* Set additional default participant properties */
gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
@ -173,6 +179,8 @@ fail_servicelease_start:
if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease);
rtps_stop ();
fail_rtps_start:
dds__builtin_fini ();
rtps_fini ();
fail_rtps_init:
if (gv.servicelease)
@ -181,6 +189,7 @@ fail_rtps_init:
gv.servicelease = NULL;
}
fail_servicelease_new:
downgrade_main_thread ();
thread_states_fini();
fail_rtps_config:
fail_config_domainid:

View file

@ -76,6 +76,7 @@ struct cfgst;
int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void);
int rtps_init (void);
int rtps_start (void);
void ddsi_plugin_init (void);
void rtps_stop (void);
void rtps_fini (void);

View file

@ -1275,61 +1275,20 @@ int rtps_init (void)
gv.rtps_keepgoing = 1;
ddsrt_rwlock_init (&gv.qoslock);
{
int r;
gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
if ((r = xeventq_start (gv.xevents, NULL)) < 0)
{
DDS_FATAL("failed to start global event processing thread (%d)\n", r);
}
}
if (config.xpack_send_async)
{
nn_xpack_sendq_init();
nn_xpack_sendq_start();
}
gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
/* Create a delivery queue and start tev for each channel */
{
struct config_channel_listelem * chptr = config.channels;
while (chptr)
{
chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
if (chptr->evq)
{
int r;
if ((r = xeventq_start (chptr->evq, chptr->name)) < 0)
DDS_FATAL("failed to start event processing thread for channel '%s' (%d)\n", chptr->name, r);
}
chptr = chptr->next;
}
}
for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
#else
gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
#endif
if (setup_and_start_recv_threads () < 0)
{
DDS_FATAL("failed to start receive threads\n");
}
if (gv.listener)
{
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
}
if (gv.startup_mode)
{
qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
}
if (config.monitor_port >= 0)
{
gv.debmon = new_debug_monitor (config.monitor_port);
}
return 0;
err_mc_conn:
@ -1402,6 +1361,57 @@ err_udp_tcp_init:
return -1;
}
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
static void stop_all_xeventq_upto (struct config_channel_listelem *chptr)
{
for (struct config_channel_listelem *chptr1 = config.channels; chptr1 != chptr; chptr1 = chptr1->next)
if (chptr1->evq)
xeventq_stop (chptr1->evq);
}
#endif
int rtps_start (void)
{
if (xeventq_start (gv.xevents, NULL) < 0)
return -1;
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
{
if (chptr->evq)
{
if (xeventq_start (chptr->evq, chptr->name) < 0)
{
stop_all_xeventq_upto (chptr);
xeventq_stop (gv.xevents);
return -1;
}
}
}
#endif
if (setup_and_start_recv_threads () < 0)
{
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
stop_all_xeventq_upto (NULL);
#endif
xeventq_stop (gv.xevents);
return -1;
}
if (gv.listener)
{
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
}
if (gv.startup_mode)
{
qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
}
if (config.monitor_port >= 0)
{
gv.debmon = new_debug_monitor (config.monitor_port);
}
return 0;
}
struct dq_builtins_ready_arg {
ddsrt_mutex_t lock;
ddsrt_cond_t cond;