From f0f76454c7d261b2a515cfe9ad7a50392d6deb72 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Wed, 27 Mar 2019 09:15:41 +0100 Subject: [PATCH] timely initialization of builtin topics (#138) The rtps_init function used to initialize all data structures and start all threads used by the protocol stack, allowing discovery of remote entities before the built-in topic data structures were initialized. (Very) early discovery of a remote participant thus led to a crash. This commit splits the initialisation, providing a separate function for starting, in particular, the threads receiving data from the network. In terms of threads created, it matches exactly with the rtps_stop / rtps_fini split that already existed to address the exact same problem on termination. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_init.c | 15 +++- src/core/ddsi/include/dds/ddsi/q_rtps.h | 1 + src/core/ddsi/src/q_init.c | 98 ++++++++++++++----------- 3 files changed, 67 insertions(+), 47 deletions(-) diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c index 1a434b5..3aa8751 100644 --- a/src/core/ddsc/src/dds_init.c +++ b/src/core/ddsc/src/dds_init.c @@ -113,6 +113,7 @@ dds_init(dds_domainid_t domain) goto fail_rtps_config; } + upgrade_main_thread(); ut_avlInit(&dds_domaintree_def, &dds_global.m_domains); /* Start monitoring the liveliness of all threads. */ @@ -129,7 +130,7 @@ dds_init(dds_domainid_t domain) } } - if (rtps_init() < 0) + if (rtps_init () < 0) { DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n"); ret = DDS_ERRNO(DDS_RETCODE_ERROR); @@ -138,6 +139,13 @@ dds_init(dds_domainid_t domain) dds__builtin_init (); + if (rtps_start () < 0) + { + DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n"); + ret = DDS_ERRNO(DDS_RETCODE_ERROR); + goto fail_rtps_start; + } + if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0) { DDS_ERROR("Failed to start the servicelease\n"); @@ -145,8 +153,6 @@ dds_init(dds_domainid_t domain) goto fail_servicelease_start; } - upgrade_main_thread(); - /* Set additional default participant properties */ gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid(); @@ -173,6 +179,8 @@ fail_servicelease_start: if (gv.servicelease) nn_servicelease_stop_renewing (gv.servicelease); rtps_stop (); +fail_rtps_start: + dds__builtin_fini (); rtps_fini (); fail_rtps_init: if (gv.servicelease) @@ -181,6 +189,7 @@ fail_rtps_init: gv.servicelease = NULL; } fail_servicelease_new: + downgrade_main_thread (); thread_states_fini(); fail_rtps_config: fail_config_domainid: diff --git a/src/core/ddsi/include/dds/ddsi/q_rtps.h b/src/core/ddsi/include/dds/ddsi/q_rtps.h index e9793d6..731fbd9 100644 --- a/src/core/ddsi/include/dds/ddsi/q_rtps.h +++ b/src/core/ddsi/include/dds/ddsi/q_rtps.h @@ -76,6 +76,7 @@ struct cfgst; int rtps_config_prep (struct cfgst *cfgst); int rtps_config_open (void); int rtps_init (void); +int rtps_start (void); void ddsi_plugin_init (void); void rtps_stop (void); void rtps_fini (void); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index d5ca562..97ae856 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -1275,61 +1275,20 @@ int rtps_init (void) gv.rtps_keepgoing = 1; ddsrt_rwlock_init (&gv.qoslock); - { - int r; - gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL); - if ((r = xeventq_start (gv.xevents, NULL)) < 0) - { - DDS_FATAL("failed to start global event processing thread (%d)\n", r); - } - } - if (config.xpack_send_async) { nn_xpack_sendq_init(); nn_xpack_sendq_start(); } + gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL); #ifdef DDSI_INCLUDE_NETWORK_CHANNELS - /* Create a delivery queue and start tev for each channel */ - { - struct config_channel_listelem * chptr = config.channels; - while (chptr) - { - chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL); - if (chptr->evq) - { - int r; - if ((r = xeventq_start (chptr->evq, chptr->name)) < 0) - DDS_FATAL("failed to start event processing thread for channel '%s' (%d)\n", chptr->name, r); - } - chptr = chptr->next; - } - } + for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next) + chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL); #else gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL); #endif - if (setup_and_start_recv_threads () < 0) - { - DDS_FATAL("failed to start receive threads\n"); - } - - if (gv.listener) - { - gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener); - } - - if (gv.startup_mode) - { - qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration)); - } - - if (config.monitor_port >= 0) - { - gv.debmon = new_debug_monitor (config.monitor_port); - } - return 0; err_mc_conn: @@ -1402,6 +1361,57 @@ err_udp_tcp_init: return -1; } +#ifdef DDSI_INCLUDE_NETWORK_CHANNELS +static void stop_all_xeventq_upto (struct config_channel_listelem *chptr) +{ + for (struct config_channel_listelem *chptr1 = config.channels; chptr1 != chptr; chptr1 = chptr1->next) + if (chptr1->evq) + xeventq_stop (chptr1->evq); +} +#endif + +int rtps_start (void) +{ + if (xeventq_start (gv.xevents, NULL) < 0) + return -1; +#ifdef DDSI_INCLUDE_NETWORK_CHANNELS + for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next) + { + if (chptr->evq) + { + if (xeventq_start (chptr->evq, chptr->name) < 0) + { + stop_all_xeventq_upto (chptr); + xeventq_stop (gv.xevents); + return -1; + } + } + } +#endif + + if (setup_and_start_recv_threads () < 0) + { +#ifdef DDSI_INCLUDE_NETWORK_CHANNELS + stop_all_xeventq_upto (NULL); +#endif + xeventq_stop (gv.xevents); + return -1; + } + if (gv.listener) + { + gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener); + } + if (gv.startup_mode) + { + qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration)); + } + if (config.monitor_port >= 0) + { + gv.debmon = new_debug_monitor (config.monitor_port); + } + return 0; +} + struct dq_builtins_ready_arg { ddsrt_mutex_t lock; ddsrt_cond_t cond;