timely initialization of builtin topics (#138)
The rtps_init function used to initialize all data structures and start all threads used by the protocol stack, allowing discovery of remote entities before the built-in topic data structures were initialized. (Very) early discovery of a remote participant thus led to a crash. This commit splits the initialisation, providing a separate function for starting, in particular, the threads receiving data from the network. In terms of threads created, it matches exactly with the rtps_stop / rtps_fini split that already existed to address the exact same problem on termination. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									b9406b2dee
								
							
						
					
					
						commit
						cb0d1a9e50
					
				
					 3 changed files with 67 additions and 47 deletions
				
			
		| 
						 | 
					@ -113,6 +113,7 @@ dds_init(dds_domainid_t domain)
 | 
				
			||||||
    goto fail_rtps_config;
 | 
					    goto fail_rtps_config;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  upgrade_main_thread();
 | 
				
			||||||
  ut_avlInit(&dds_domaintree_def, &dds_global.m_domains);
 | 
					  ut_avlInit(&dds_domaintree_def, &dds_global.m_domains);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  /* Start monitoring the liveliness of all threads. */
 | 
					  /* Start monitoring the liveliness of all threads. */
 | 
				
			||||||
| 
						 | 
					@ -129,7 +130,7 @@ dds_init(dds_domainid_t domain)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (rtps_init() < 0)
 | 
					  if (rtps_init () < 0)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
 | 
					    DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
 | 
				
			||||||
    ret = DDS_ERRNO(DDS_RETCODE_ERROR);
 | 
					    ret = DDS_ERRNO(DDS_RETCODE_ERROR);
 | 
				
			||||||
| 
						 | 
					@ -138,6 +139,13 @@ dds_init(dds_domainid_t domain)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  dds__builtin_init ();
 | 
					  dds__builtin_init ();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (rtps_start () < 0)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n");
 | 
				
			||||||
 | 
					    ret = DDS_ERRNO(DDS_RETCODE_ERROR);
 | 
				
			||||||
 | 
					    goto fail_rtps_start;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
 | 
					  if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    DDS_ERROR("Failed to start the servicelease\n");
 | 
					    DDS_ERROR("Failed to start the servicelease\n");
 | 
				
			||||||
| 
						 | 
					@ -145,8 +153,6 @@ dds_init(dds_domainid_t domain)
 | 
				
			||||||
    goto fail_servicelease_start;
 | 
					    goto fail_servicelease_start;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  upgrade_main_thread();
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  /* Set additional default participant properties */
 | 
					  /* Set additional default participant properties */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
 | 
					  gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
 | 
				
			||||||
| 
						 | 
					@ -173,6 +179,8 @@ fail_servicelease_start:
 | 
				
			||||||
  if (gv.servicelease)
 | 
					  if (gv.servicelease)
 | 
				
			||||||
    nn_servicelease_stop_renewing (gv.servicelease);
 | 
					    nn_servicelease_stop_renewing (gv.servicelease);
 | 
				
			||||||
  rtps_stop ();
 | 
					  rtps_stop ();
 | 
				
			||||||
 | 
					fail_rtps_start:
 | 
				
			||||||
 | 
					  dds__builtin_fini ();
 | 
				
			||||||
  rtps_fini ();
 | 
					  rtps_fini ();
 | 
				
			||||||
fail_rtps_init:
 | 
					fail_rtps_init:
 | 
				
			||||||
  if (gv.servicelease)
 | 
					  if (gv.servicelease)
 | 
				
			||||||
| 
						 | 
					@ -181,6 +189,7 @@ fail_rtps_init:
 | 
				
			||||||
    gv.servicelease = NULL;
 | 
					    gv.servicelease = NULL;
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
fail_servicelease_new:
 | 
					fail_servicelease_new:
 | 
				
			||||||
 | 
					  downgrade_main_thread ();
 | 
				
			||||||
  thread_states_fini();
 | 
					  thread_states_fini();
 | 
				
			||||||
fail_rtps_config:
 | 
					fail_rtps_config:
 | 
				
			||||||
fail_config_domainid:
 | 
					fail_config_domainid:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -76,6 +76,7 @@ struct cfgst;
 | 
				
			||||||
int rtps_config_prep (struct cfgst *cfgst);
 | 
					int rtps_config_prep (struct cfgst *cfgst);
 | 
				
			||||||
int rtps_config_open (void);
 | 
					int rtps_config_open (void);
 | 
				
			||||||
int rtps_init (void);
 | 
					int rtps_init (void);
 | 
				
			||||||
 | 
					int rtps_start (void);
 | 
				
			||||||
void ddsi_plugin_init (void);
 | 
					void ddsi_plugin_init (void);
 | 
				
			||||||
void rtps_stop (void);
 | 
					void rtps_stop (void);
 | 
				
			||||||
void rtps_fini (void);
 | 
					void rtps_fini (void);
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1275,61 +1275,20 @@ int rtps_init (void)
 | 
				
			||||||
  gv.rtps_keepgoing = 1;
 | 
					  gv.rtps_keepgoing = 1;
 | 
				
			||||||
  ddsrt_rwlock_init (&gv.qoslock);
 | 
					  ddsrt_rwlock_init (&gv.qoslock);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    int r;
 | 
					 | 
				
			||||||
    gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
 | 
					 | 
				
			||||||
    if ((r = xeventq_start (gv.xevents, NULL)) < 0)
 | 
					 | 
				
			||||||
    {
 | 
					 | 
				
			||||||
      DDS_FATAL("failed to start global event processing thread (%d)\n", r);
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (config.xpack_send_async)
 | 
					  if (config.xpack_send_async)
 | 
				
			||||||
  {
 | 
					  {
 | 
				
			||||||
    nn_xpack_sendq_init();
 | 
					    nn_xpack_sendq_init();
 | 
				
			||||||
    nn_xpack_sendq_start();
 | 
					    nn_xpack_sendq_start();
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
 | 
				
			||||||
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
 | 
					#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
 | 
				
			||||||
  /* Create a delivery queue and start tev for each channel */
 | 
					  for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
 | 
				
			||||||
  {
 | 
					    chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 | 
				
			||||||
    struct config_channel_listelem * chptr = config.channels;
 | 
					 | 
				
			||||||
    while (chptr)
 | 
					 | 
				
			||||||
    {
 | 
					 | 
				
			||||||
      chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 | 
					 | 
				
			||||||
      if (chptr->evq)
 | 
					 | 
				
			||||||
      {
 | 
					 | 
				
			||||||
        int r;
 | 
					 | 
				
			||||||
        if ((r = xeventq_start (chptr->evq, chptr->name)) < 0)
 | 
					 | 
				
			||||||
          DDS_FATAL("failed to start event processing thread for channel '%s' (%d)\n", chptr->name, r);
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
      chptr = chptr->next;
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
#else
 | 
					#else
 | 
				
			||||||
  gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 | 
					  gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
 | 
				
			||||||
#endif
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  if (setup_and_start_recv_threads () < 0)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    DDS_FATAL("failed to start receive threads\n");
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (gv.listener)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (gv.startup_mode)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  if (config.monitor_port >= 0)
 | 
					 | 
				
			||||||
  {
 | 
					 | 
				
			||||||
    gv.debmon = new_debug_monitor (config.monitor_port);
 | 
					 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  return 0;
 | 
					  return 0;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
err_mc_conn:
 | 
					err_mc_conn:
 | 
				
			||||||
| 
						 | 
					@ -1402,6 +1361,57 @@ err_udp_tcp_init:
 | 
				
			||||||
  return -1;
 | 
					  return -1;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
 | 
				
			||||||
 | 
					static void stop_all_xeventq_upto (struct config_channel_listelem *chptr)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  for (struct config_channel_listelem *chptr1 = config.channels; chptr1 != chptr; chptr1 = chptr1->next)
 | 
				
			||||||
 | 
					    if (chptr1->evq)
 | 
				
			||||||
 | 
					      xeventq_stop (chptr1->evq);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					int rtps_start (void)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					  if (xeventq_start (gv.xevents, NULL) < 0)
 | 
				
			||||||
 | 
					    return -1;
 | 
				
			||||||
 | 
					#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
 | 
				
			||||||
 | 
					  for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    if (chptr->evq)
 | 
				
			||||||
 | 
					    {
 | 
				
			||||||
 | 
					      if (xeventq_start (chptr->evq, chptr->name) < 0)
 | 
				
			||||||
 | 
					      {
 | 
				
			||||||
 | 
					        stop_all_xeventq_upto (chptr);
 | 
				
			||||||
 | 
					        xeventq_stop (gv.xevents);
 | 
				
			||||||
 | 
					        return -1;
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if (setup_and_start_recv_threads () < 0)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
 | 
				
			||||||
 | 
					    stop_all_xeventq_upto (NULL);
 | 
				
			||||||
 | 
					#endif
 | 
				
			||||||
 | 
					    xeventq_stop (gv.xevents);
 | 
				
			||||||
 | 
					    return -1;
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  if (gv.listener)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  if (gv.startup_mode)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  if (config.monitor_port >= 0)
 | 
				
			||||||
 | 
					  {
 | 
				
			||||||
 | 
					    gv.debmon = new_debug_monitor (config.monitor_port);
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return 0;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct dq_builtins_ready_arg {
 | 
					struct dq_builtins_ready_arg {
 | 
				
			||||||
  ddsrt_mutex_t lock;
 | 
					  ddsrt_mutex_t lock;
 | 
				
			||||||
  ddsrt_cond_t cond;
 | 
					  ddsrt_cond_t cond;
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue