From f0f76454c7d261b2a515cfe9ad7a50392d6deb72 Mon Sep 17 00:00:00 2001
From: Erik Boasson
Date: Wed, 27 Mar 2019 09:15:41 +0100
Subject: [PATCH 1/3] timely initialization of builtin topics (#138)

The rtps_init function used to initialize all data structures and start
all threads used by the protocol stack, allowing discovery of remote
entities before the built-in topic data structures were initialized.
(Very) early discovery of a remote participant thus led to a crash.

This commit splits the initialization, providing a separate function,
rtps_start, that starts the threads, in particular those receiving data
from the network. In terms of threads created, it matches exactly the
rtps_stop / rtps_fini split that already existed to address the same
problem on termination.

Signed-off-by: Erik Boasson
---
 src/core/ddsc/src/dds_init.c            | 15 +++-
 src/core/ddsi/include/dds/ddsi/q_rtps.h |  1 +
 src/core/ddsi/src/q_init.c              | 98 ++++++++++++++-----------
 3 files changed, 67 insertions(+), 47 deletions(-)

diff --git a/src/core/ddsc/src/dds_init.c b/src/core/ddsc/src/dds_init.c
index 1a434b5..3aa8751 100644
--- a/src/core/ddsc/src/dds_init.c
+++ b/src/core/ddsc/src/dds_init.c
@@ -113,6 +113,7 @@ dds_init(dds_domainid_t domain)
     goto fail_rtps_config;
   }

+  upgrade_main_thread();
   ut_avlInit(&dds_domaintree_def, &dds_global.m_domains);

   /* Start monitoring the liveliness of all threads. */
@@ -129,7 +130,7 @@ dds_init(dds_domainid_t domain)
     }
   }

-  if (rtps_init() < 0)
+  if (rtps_init () < 0)
   {
     DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
     ret = DDS_ERRNO(DDS_RETCODE_ERROR);
@@ -138,6 +139,13 @@ dds_init(dds_domainid_t domain)

   dds__builtin_init ();

+  if (rtps_start () < 0)
+  {
+    DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n");
+    ret = DDS_ERRNO(DDS_RETCODE_ERROR);
+    goto fail_rtps_start;
+  }
+
   if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
   {
     DDS_ERROR("Failed to start the servicelease\n");
@@ -145,8 +153,6 @@ dds_init(dds_domainid_t domain)
     goto fail_servicelease_start;
   }

-  upgrade_main_thread();
-
   /* Set additional default participant properties */

   gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
@@ -173,6 +179,8 @@ fail_servicelease_start:
   if (gv.servicelease)
     nn_servicelease_stop_renewing (gv.servicelease);
   rtps_stop ();
+fail_rtps_start:
+  dds__builtin_fini ();
   rtps_fini ();
 fail_rtps_init:
   if (gv.servicelease)
@@ -181,6 +189,7 @@ fail_rtps_init:
     gv.servicelease = NULL;
   }
 fail_servicelease_new:
+  downgrade_main_thread ();
   thread_states_fini();
 fail_rtps_config:
 fail_config_domainid:
diff --git a/src/core/ddsi/include/dds/ddsi/q_rtps.h b/src/core/ddsi/include/dds/ddsi/q_rtps.h
index e9793d6..731fbd9 100644
--- a/src/core/ddsi/include/dds/ddsi/q_rtps.h
+++ b/src/core/ddsi/include/dds/ddsi/q_rtps.h
@@ -76,6 +76,7 @@ struct cfgst;
 int rtps_config_prep (struct cfgst *cfgst);
 int rtps_config_open (void);
 int rtps_init (void);
+int rtps_start (void);
 void ddsi_plugin_init (void);
 void rtps_stop (void);
 void rtps_fini (void);
diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c
index d5ca562..97ae856 100644
--- a/src/core/ddsi/src/q_init.c
+++ b/src/core/ddsi/src/q_init.c
@@ -1275,61 +1275,20 @@ int rtps_init (void)
   gv.rtps_keepgoing = 1;
   ddsrt_rwlock_init (&gv.qoslock);

-  {
-    int r;
-    gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
-    if ((r = xeventq_start (gv.xevents, NULL)) < 0)
-    {
-      DDS_FATAL("failed to start global event processing thread (%d)\n", r);
-    }
-  }
-
   if (config.xpack_send_async)
   {
     nn_xpack_sendq_init();
     nn_xpack_sendq_start();
   }

+  gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
+
 #ifdef DDSI_INCLUDE_NETWORK_CHANNELS
-  /* Create a delivery queue and start tev for each channel */
-  {
-    struct config_channel_listelem * chptr = config.channels;
-    while (chptr)
-    {
-      chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
-      if (chptr->evq)
-      {
-        int r;
-        if ((r = xeventq_start (chptr->evq, chptr->name)) < 0)
-          DDS_FATAL("failed to start event processing thread for channel '%s' (%d)\n", chptr->name, r);
-      }
-      chptr = chptr->next;
-    }
-  }
+  for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
+    chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 #else
   gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 #endif

-  if (setup_and_start_recv_threads () < 0)
-  {
-    DDS_FATAL("failed to start receive threads\n");
-  }
-
-  if (gv.listener)
-  {
-    gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
-  }
-
-  if (gv.startup_mode)
-  {
-    qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
-  }
-
-  if (config.monitor_port >= 0)
-  {
-    gv.debmon = new_debug_monitor (config.monitor_port);
-  }
-
   return 0;

 err_mc_conn:
@@ -1402,6 +1361,57 @@ err_udp_tcp_init:
   return -1;
 }

+#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
+static void stop_all_xeventq_upto (struct config_channel_listelem *chptr)
+{
+  for (struct config_channel_listelem *chptr1 = config.channels; chptr1 != chptr; chptr1 = chptr1->next)
+    if (chptr1->evq)
+      xeventq_stop (chptr1->evq);
+}
+#endif
+
+int rtps_start (void)
+{
+  if (xeventq_start (gv.xevents, NULL) < 0)
+    return -1;
+#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
+  for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
+  {
+    if (chptr->evq)
+    {
+      if (xeventq_start (chptr->evq, chptr->name) < 0)
+      {
+        stop_all_xeventq_upto (chptr);
+        xeventq_stop (gv.xevents);
+        return -1;
+      }
+    }
+  }
+#endif
+
+  if (setup_and_start_recv_threads () < 0)
+  {
+#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
+    stop_all_xeventq_upto (NULL);
+#endif
+    xeventq_stop (gv.xevents);
+    return -1;
+  }
+  if (gv.listener)
+  {
+    gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
+  }
+  if (gv.startup_mode)
+  {
+    qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
+  }
+  if (config.monitor_port >= 0)
+  {
+    gv.debmon = new_debug_monitor (config.monitor_port);
+  }
+  return 0;
+}
+
 struct dq_builtins_ready_arg {
   ddsrt_mutex_t lock;
   ddsrt_cond_t cond;
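The split makes the safe bring-up order easy to state: build every data
structure first, let dependent subsystems such as the built-in topics hook
in, and only then create the threads that can receive network traffic. The
following minimal, self-contained sketch shows the same init/start pattern
in isolation (plain pthreads; all names are illustrative stand-ins, not
Cyclone DDS API):

  /* init/start split: init() builds all state, start() creates threads,
     so no thread can ever observe half-initialized state */
  #include <pthread.h>
  #include <stdio.h>

  struct stack {
    int builtin_topics_ready;   /* stands in for the built-in topic tables */
    pthread_t recv_thread;      /* stands in for the DDSI receive threads */
  };

  static void *recv_main (void *arg)
  {
    struct stack *s = arg;
    /* safe: start() runs only after all initialization has completed */
    printf ("recv thread sees builtin_topics_ready=%d\n", s->builtin_topics_ready);
    return NULL;
  }

  static int stack_init (struct stack *s)
  {
    s->builtin_topics_ready = 0;  /* data structures only, no threads yet */
    return 0;
  }

  static int stack_start (struct stack *s)
  {
    /* only now do threads come to life */
    return (pthread_create (&s->recv_thread, NULL, recv_main, s) == 0) ? 0 : -1;
  }

  int main (void)
  {
    struct stack s;
    if (stack_init (&s) < 0)
      return 1;
    s.builtin_topics_ready = 1;   /* cf. dds__builtin_init() between the two */
    if (stack_start (&s) < 0)
      return 1;
    pthread_join (s.recv_thread, NULL);
    return 0;
  }

Compile with "cc -pthread"; the receive thread cannot race the
initialization because it does not exist until stack_start runs.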
From 774e52069daefce305ccd205a827cfc7e0bf383b Mon Sep 17 00:00:00 2001
From: Erik Boasson
Date: Wed, 27 Mar 2019 09:21:09 +0100
Subject: [PATCH 2/3] trace correct thread id during thread creation

The rewrite of the abstraction layer changed some details in the thread
ids used in tracing and in the functions to get those ids, with the
result that create_thread always printed the parent thread's id rather
than the newly created thread's id. As all supported platforms use
thread names in the trace it is a rather insignificant matter, so this
provides the trivial fix of letting the new thread log the message
itself.
Signed-off-by: Erik Boasson
---
 src/core/ddsi/src/q_thread.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core/ddsi/src/q_thread.c b/src/core/ddsi/src/q_thread.c
index cadbb6e..4d3fe45 100644
--- a/src/core/ddsi/src/q_thread.c
+++ b/src/core/ddsi/src/q_thread.c
@@ -172,6 +172,7 @@ static uint32_t create_thread_wrapper (void *ptr)
 {
   uint32_t ret;
   struct thread_context *ctx = ptr;
+  DDS_TRACE ("started new thread %"PRIdTID": %s\n", ddsrt_gettid (), ctx->self->name);
   ctx->self->tid = ddsrt_thread_self ();
   ret = ctx->f (ctx->arg);
   ddsrt_free (ctx);
@@ -272,7 +273,6 @@ struct thread_state1 *create_thread (const char *name, uint32_t (*f) (void *arg)
     DDS_FATAL("create_thread: %s: ddsrt_thread_create failed\n", name);
     goto fatal;
   }
-  DDS_TRACE("started new thread %"PRIdTID" : %s\n", ddsrt_gettid(), name);
   ts1->extTid = tid; /* overwrite the temporary value with the correct external one */
   ddsrt_mutex_unlock (&thread_states.lock);
   return ts1;
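The pitfall fixed here is generic: a thread id queried in the parent is the
parent's id, so a "thread started" trace line must be emitted from inside
the new thread. A minimal, self-contained sketch of the pattern (plain
pthreads on Linux, using SYS_gettid; not Cyclone DDS code):

  #define _GNU_SOURCE
  #include <pthread.h>
  #include <stdio.h>
  #include <unistd.h>
  #include <sys/syscall.h>

  static long my_tid (void) { return (long) syscall (SYS_gettid); }

  static void *thread_main (void *arg)
  {
    /* correct: executes in the new thread, so this is the new thread's id */
    printf ("started new thread %ld: %s\n", my_tid (), (const char *) arg);
    return NULL;
  }

  int main (void)
  {
    pthread_t tid;
    if (pthread_create (&tid, NULL, thread_main, "worker") != 0)
      return 1;
    /* the buggy variant logged here: my_tid() now yields the parent's id */
    printf ("parent thread is %ld\n", my_tid ());
    pthread_join (tid, NULL);
    return 0;
  }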
From a15fc3594b91c55611bc7eaff9dfcce77ecb699e Mon Sep 17 00:00:00 2001
From: Erik Boasson
Date: Wed, 27 Mar 2019 09:25:09 +0100
Subject: [PATCH 3/3] avoid debmon thread shutdown logging write errors

During shutdown, the optional "debmon" thread for getting some
information about the internal state of the DDSI stack had a tendency
to run into errors from calling write on a connection that had already
been closed: the thread is woken for shutdown by connecting to it and
immediately closing that connection again. Instead of blindly writing
into the connection, the thread now checks whether it is supposed to
shut down before doing anything, avoiding this particular problem.

Signed-off-by: Erik Boasson
---
 src/core/ddsi/src/q_debmon.c | 49 +++++++++++++++++++++---------------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c
index eb83d81..beaa9bf 100644
--- a/src/core/ddsi/src/q_debmon.c
+++ b/src/core/ddsi/src/q_debmon.c
@@ -300,37 +300,46 @@ static int print_proxy_participants (struct thread_state1 *self, ddsi_tran_conn_
   return x;
 }

+static void debmon_handle_connection (struct debug_monitor *dm, ddsi_tran_conn_t conn)
+{
+  struct plugin *p;
+  int r = 0;
+  r += print_participants (dm->servts, conn);
+  if (r == 0)
+    r += print_proxy_participants (dm->servts, conn);
+
+  /* Note: can only add plugins (at the tail) */
+  ddsrt_mutex_lock (&dm->lock);
+  p = dm->plugins;
+  while (r == 0 && p != NULL)
+  {
+    ddsrt_mutex_unlock (&dm->lock);
+    r += p->fn (conn, cpf, p->arg);
+    ddsrt_mutex_lock (&dm->lock);
+    p = p->next;
+  }
+  ddsrt_mutex_unlock (&dm->lock);
+}
+
 static uint32_t debmon_main (void *vdm)
 {
   struct debug_monitor *dm = vdm;
   ddsrt_mutex_lock (&dm->lock);
   while (!dm->stop)
   {
-    ddsi_tran_conn_t conn;
     ddsrt_mutex_unlock (&dm->lock);
-    if ((conn = ddsi_listener_accept (dm->servsock)) != NULL)
+    ddsi_tran_conn_t conn = ddsi_listener_accept (dm->servsock);
+    ddsrt_mutex_lock (&dm->lock);
+    if (conn != NULL && !dm->stop)
     {
-      struct plugin *p;
-      int r = 0;
-      r += print_participants (dm->servts, conn);
-      if (r == 0)
-        r += print_proxy_participants (dm->servts, conn);
-
-      /* Note: can only add plugins (at the tail) */
-      ddsrt_mutex_lock (&dm->lock);
-      p = dm->plugins;
-      while (r == 0 && p != NULL)
-      {
-        ddsrt_mutex_unlock (&dm->lock);
-        r += p->fn (conn, cpf, p->arg);
-        ddsrt_mutex_lock (&dm->lock);
-        p = p->next;
-      }
       ddsrt_mutex_unlock (&dm->lock);
-
+      debmon_handle_connection (dm, conn);
+      ddsrt_mutex_lock (&dm->lock);
+    }
+    if (conn != NULL)
+    {
       ddsi_conn_free (conn);
     }
-    ddsrt_mutex_lock (&dm->lock);
   }
   ddsrt_mutex_unlock (&dm->lock);
   return 0;
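The shape of the fix is a reusable accept-loop shutdown pattern: once accept
returns, re-take the lock and re-check the stop flag before touching the
connection, serve it only when not stopping, and free it either way. A
minimal, self-contained sketch with stand-ins for the DDSI transport calls
(plain pthreads; not Cyclone DDS code):

  #include <pthread.h>
  #include <stdbool.h>
  #include <stdio.h>

  struct monitor {
    pthread_mutex_t lock;
    bool stop;
  };

  /* stand-ins for ddsi_listener_accept / ddsi_conn_free / the state dump */
  struct conn { int dummy; };

  static struct conn *fake_accept (struct monitor *m)
  {
    /* stands in for a blocking accept; here it simulates the shutdown
       wakeup: the stop flag is raised just before the connection arrives */
    static struct conn c;
    pthread_mutex_lock (&m->lock);
    m->stop = true;
    pthread_mutex_unlock (&m->lock);
    return &c;
  }

  static void fake_free (struct conn *c) { (void) c; printf ("conn freed, nothing written\n"); }
  static void serve (struct conn *c) { (void) c; printf ("writing state dump\n"); }

  static void monitor_loop (struct monitor *m)
  {
    pthread_mutex_lock (&m->lock);
    while (!m->stop)
    {
      pthread_mutex_unlock (&m->lock);
      struct conn *c = fake_accept (m);
      pthread_mutex_lock (&m->lock);
      if (c != NULL && !m->stop)          /* the added stop check */
      {
        pthread_mutex_unlock (&m->lock);  /* serve without the lock held */
        serve (c);
        pthread_mutex_lock (&m->lock);
      }
      if (c != NULL)
        fake_free (c);                    /* freed even when stopping */
    }
    pthread_mutex_unlock (&m->lock);
  }

  int main (void)
  {
    struct monitor m = { PTHREAD_MUTEX_INITIALIZER, false };
    monitor_loop (&m);
    return 0;
  }

The program prints "conn freed, nothing written" and exits: the wakeup
connection made during shutdown is released without ever being written to,
which is exactly the behavior the patch establishes for the debmon thread.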