Merge pull request #139 from eboasson/master

Fix for crash in ddsi_serdata_from_keyhash
This commit is contained in:
eboasson 2019-03-27 13:38:38 +01:00 committed by GitHub
commit e97adcace0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 68 deletions

View file

@ -113,6 +113,7 @@ dds_init(dds_domainid_t domain)
goto fail_rtps_config; goto fail_rtps_config;
} }
upgrade_main_thread();
ut_avlInit(&dds_domaintree_def, &dds_global.m_domains); ut_avlInit(&dds_domaintree_def, &dds_global.m_domains);
/* Start monitoring the liveliness of all threads. */ /* Start monitoring the liveliness of all threads. */
@ -129,7 +130,7 @@ dds_init(dds_domainid_t domain)
} }
} }
if (rtps_init() < 0) if (rtps_init () < 0)
{ {
DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n"); DDS_LOG(DDS_LC_CONFIG, "Failed to initialize RTPS\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR); ret = DDS_ERRNO(DDS_RETCODE_ERROR);
@ -138,6 +139,13 @@ dds_init(dds_domainid_t domain)
dds__builtin_init (); dds__builtin_init ();
if (rtps_start () < 0)
{
DDS_LOG(DDS_LC_CONFIG, "Failed to start RTPS\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
goto fail_rtps_start;
}
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0) if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
{ {
DDS_ERROR("Failed to start the servicelease\n"); DDS_ERROR("Failed to start the servicelease\n");
@ -145,8 +153,6 @@ dds_init(dds_domainid_t domain)
goto fail_servicelease_start; goto fail_servicelease_start;
} }
upgrade_main_thread();
/* Set additional default participant properties */ /* Set additional default participant properties */
gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid(); gv.default_plist_pp.process_id = (unsigned)ddsrt_getpid();
@ -173,6 +179,8 @@ fail_servicelease_start:
if (gv.servicelease) if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease); nn_servicelease_stop_renewing (gv.servicelease);
rtps_stop (); rtps_stop ();
fail_rtps_start:
dds__builtin_fini ();
rtps_fini (); rtps_fini ();
fail_rtps_init: fail_rtps_init:
if (gv.servicelease) if (gv.servicelease)
@ -181,6 +189,7 @@ fail_rtps_init:
gv.servicelease = NULL; gv.servicelease = NULL;
} }
fail_servicelease_new: fail_servicelease_new:
downgrade_main_thread ();
thread_states_fini(); thread_states_fini();
fail_rtps_config: fail_rtps_config:
fail_config_domainid: fail_config_domainid:

View file

@ -76,6 +76,7 @@ struct cfgst;
int rtps_config_prep (struct cfgst *cfgst); int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void); int rtps_config_open (void);
int rtps_init (void); int rtps_init (void);
int rtps_start (void);
void ddsi_plugin_init (void); void ddsi_plugin_init (void);
void rtps_stop (void); void rtps_stop (void);
void rtps_fini (void); void rtps_fini (void);

View file

@ -300,37 +300,46 @@ static int print_proxy_participants (struct thread_state1 *self, ddsi_tran_conn_
return x; return x;
} }
static void debmon_handle_connection (struct debug_monitor *dm, ddsi_tran_conn_t conn)
{
struct plugin *p;
int r = 0;
r += print_participants (dm->servts, conn);
if (r == 0)
r += print_proxy_participants (dm->servts, conn);
/* Note: can only add plugins (at the tail) */
ddsrt_mutex_lock (&dm->lock);
p = dm->plugins;
while (r == 0 && p != NULL)
{
ddsrt_mutex_unlock (&dm->lock);
r += p->fn (conn, cpf, p->arg);
ddsrt_mutex_lock (&dm->lock);
p = p->next;
}
ddsrt_mutex_unlock (&dm->lock);
}
static uint32_t debmon_main (void *vdm) static uint32_t debmon_main (void *vdm)
{ {
struct debug_monitor *dm = vdm; struct debug_monitor *dm = vdm;
ddsrt_mutex_lock (&dm->lock); ddsrt_mutex_lock (&dm->lock);
while (!dm->stop) while (!dm->stop)
{ {
ddsi_tran_conn_t conn;
ddsrt_mutex_unlock (&dm->lock); ddsrt_mutex_unlock (&dm->lock);
if ((conn = ddsi_listener_accept (dm->servsock)) != NULL) ddsi_tran_conn_t conn = ddsi_listener_accept (dm->servsock);
ddsrt_mutex_lock (&dm->lock);
if (conn != NULL && !dm->stop)
{ {
struct plugin *p;
int r = 0;
r += print_participants (dm->servts, conn);
if (r == 0)
r += print_proxy_participants (dm->servts, conn);
/* Note: can only add plugins (at the tail) */
ddsrt_mutex_lock (&dm->lock);
p = dm->plugins;
while (r == 0 && p != NULL)
{
ddsrt_mutex_unlock (&dm->lock);
r += p->fn (conn, cpf, p->arg);
ddsrt_mutex_lock (&dm->lock);
p = p->next;
}
ddsrt_mutex_unlock (&dm->lock); ddsrt_mutex_unlock (&dm->lock);
debmon_handle_connection (dm, conn);
ddsrt_mutex_lock (&dm->lock);
}
if (conn != NULL)
{
ddsi_conn_free (conn); ddsi_conn_free (conn);
} }
ddsrt_mutex_lock (&dm->lock);
} }
ddsrt_mutex_unlock (&dm->lock); ddsrt_mutex_unlock (&dm->lock);
return 0; return 0;

View file

@ -1275,61 +1275,20 @@ int rtps_init (void)
gv.rtps_keepgoing = 1; gv.rtps_keepgoing = 1;
ddsrt_rwlock_init (&gv.qoslock); ddsrt_rwlock_init (&gv.qoslock);
{
int r;
gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
if ((r = xeventq_start (gv.xevents, NULL)) < 0)
{
DDS_FATAL("failed to start global event processing thread (%d)\n", r);
}
}
if (config.xpack_send_async) if (config.xpack_send_async)
{ {
nn_xpack_sendq_init(); nn_xpack_sendq_init();
nn_xpack_sendq_start(); nn_xpack_sendq_start();
} }
gv.builtins_dqueue = nn_dqueue_new ("builtins", config.delivery_queue_maxsamples, builtins_dqueue_handler, NULL);
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS #ifdef DDSI_INCLUDE_NETWORK_CHANNELS
/* Create a delivery queue and start tev for each channel */ for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
{ chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
struct config_channel_listelem * chptr = config.channels;
while (chptr)
{
chptr->dqueue = nn_dqueue_new (chptr->name, config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
if (chptr->evq)
{
int r;
if ((r = xeventq_start (chptr->evq, chptr->name)) < 0)
DDS_FATAL("failed to start event processing thread for channel '%s' (%d)\n", chptr->name, r);
}
chptr = chptr->next;
}
}
#else #else
gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL); gv.user_dqueue = nn_dqueue_new ("user", config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
#endif #endif
if (setup_and_start_recv_threads () < 0)
{
DDS_FATAL("failed to start receive threads\n");
}
if (gv.listener)
{
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
}
if (gv.startup_mode)
{
qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
}
if (config.monitor_port >= 0)
{
gv.debmon = new_debug_monitor (config.monitor_port);
}
return 0; return 0;
err_mc_conn: err_mc_conn:
@ -1402,6 +1361,57 @@ err_udp_tcp_init:
return -1; return -1;
} }
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
static void stop_all_xeventq_upto (struct config_channel_listelem *chptr)
{
for (struct config_channel_listelem *chptr1 = config.channels; chptr1 != chptr; chptr1 = chptr1->next)
if (chptr1->evq)
xeventq_stop (chptr1->evq);
}
#endif
int rtps_start (void)
{
if (xeventq_start (gv.xevents, NULL) < 0)
return -1;
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
for (struct config_channel_listelem *chptr = config.channels; chptr; chptr = chptr->next)
{
if (chptr->evq)
{
if (xeventq_start (chptr->evq, chptr->name) < 0)
{
stop_all_xeventq_upto (chptr);
xeventq_stop (gv.xevents);
return -1;
}
}
}
#endif
if (setup_and_start_recv_threads () < 0)
{
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
stop_all_xeventq_upto (NULL);
#endif
xeventq_stop (gv.xevents);
return -1;
}
if (gv.listener)
{
gv.listen_ts = create_thread ("listen", (uint32_t (*) (void *)) listen_thread, gv.listener);
}
if (gv.startup_mode)
{
qxev_end_startup_mode (add_duration_to_mtime (now_mt (), config.startup_mode_duration));
}
if (config.monitor_port >= 0)
{
gv.debmon = new_debug_monitor (config.monitor_port);
}
return 0;
}
struct dq_builtins_ready_arg { struct dq_builtins_ready_arg {
ddsrt_mutex_t lock; ddsrt_mutex_t lock;
ddsrt_cond_t cond; ddsrt_cond_t cond;

View file

@ -172,6 +172,7 @@ static uint32_t create_thread_wrapper (void *ptr)
{ {
uint32_t ret; uint32_t ret;
struct thread_context *ctx = ptr; struct thread_context *ctx = ptr;
DDS_TRACE ("started new thread %"PRIdTID": %s\n", ddsrt_gettid (), ctx->self->name);
ctx->self->tid = ddsrt_thread_self (); ctx->self->tid = ddsrt_thread_self ();
ret = ctx->f (ctx->arg); ret = ctx->f (ctx->arg);
ddsrt_free (ctx); ddsrt_free (ctx);
@ -272,7 +273,6 @@ struct thread_state1 *create_thread (const char *name, uint32_t (*f) (void *arg)
DDS_FATAL("create_thread: %s: ddsrt_thread_create failed\n", name); DDS_FATAL("create_thread: %s: ddsrt_thread_create failed\n", name);
goto fatal; goto fatal;
} }
DDS_TRACE("started new thread %"PRIdTID" : %s\n", ddsrt_gettid(), name);
ts1->extTid = tid; /* overwrite the temporary value with the correct external one */ ts1->extTid = tid; /* overwrite the temporary value with the correct external one */
ddsrt_mutex_unlock (&thread_states.lock); ddsrt_mutex_unlock (&thread_states.lock);
return ts1; return ts1;