diff --git a/src/core/ddsc/include/dds/dds.h b/src/core/ddsc/include/dds/dds.h index b78c2e1..b50f1d0 100644 --- a/src/core/ddsc/include/dds/dds.h +++ b/src/core/ddsc/include/dds/dds.h @@ -3370,6 +3370,40 @@ DDS_EXPORT dds_return_t dds_assert_liveliness ( dds_entity_t entity); +/** + * @brief This operation allows making the domain's network stack + * temporarily deaf and/or mute. It is a support function for testing and, + * other special uses and is subject to change. + * + * @param[in] entity A domain entity or an entity bound to a domain, such + * as a participant, reader or writer. + * @param[in] deaf Whether to network stack should pretend to be deaf and + * ignore any incoming packets. + * @param[in] mute Whether to network stack should pretend to be mute and + * discard any outgoing packets where it normally would. + * pass them to the operating system kernel for transmission. + * @param[in] reset_after Any value less than INFINITY will cause it to + * set deaf = mute = false after reset_after ns have passed. + * This is done by an event scheduled for the appropriate + * time and otherwise forgotten. These events are not + * affected by subsequent calls to this function. + * + * @returns A dds_return_t indicating success or failure. + * + * @retval DDS_RETCODE_OK + * The operation was successful. + * @retval DDS_BAD_PARAMETER + * The entity parameter is not a valid parameter. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. +*/ +DDS_EXPORT dds_return_t +dds_domain_set_deafmute ( + dds_entity_t entity, + bool deaf, + bool mute, + dds_duration_t reset_after); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index 489b49c..dcc25a3 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -286,6 +286,23 @@ static dds_return_t dds_domain_free (dds_entity *vdomain) return DDS_RETCODE_NO_DATA; } +dds_return_t dds_domain_set_deafmute (dds_entity_t entity, bool deaf, bool mute, dds_duration_t reset_after) +{ + struct dds_entity *e; + dds_return_t rc; + if ((rc = dds_entity_pin (entity, &e)) < 0) + return rc; + if (e->m_domain == NULL) + rc = DDS_RETCODE_ILLEGAL_OPERATION; + else + { + ddsi_set_deafmute (&e->m_domain->gv, deaf, mute, reset_after); + rc = DDS_RETCODE_OK; + } + dds_entity_unpin (e); + return rc; +} + #include "dds__entity.h" static void pushdown_set_batch (struct dds_entity *e, bool enable) { diff --git a/src/core/ddsi/include/dds/ddsi/q_rtps.h b/src/core/ddsi/include/dds/ddsi/q_rtps.h index a822d4e..384a15e 100644 --- a/src/core/ddsi/include/dds/ddsi/q_rtps.h +++ b/src/core/ddsi/include/dds/ddsi/q_rtps.h @@ -12,6 +12,7 @@ #ifndef NN_RTPS_H #define NN_RTPS_H +#include "dds/export.h" #include "dds/ddsi/ddsi_vendor.h" #include "dds/ddsi/ddsi_guid.h" @@ -59,12 +60,14 @@ typedef int64_t seqno_t; struct cfgst; struct ddsi_domaingv; -int rtps_config_prep (struct ddsi_domaingv *config, struct cfgst *cfgst); -int rtps_config_open_trace (struct ddsi_domaingv *config); -int rtps_init (struct ddsi_domaingv *config); -int rtps_start (struct ddsi_domaingv *config); -void rtps_stop (struct ddsi_domaingv *config); -void rtps_fini (struct ddsi_domaingv *config); +int rtps_config_prep (struct ddsi_domaingv *gv, struct cfgst *cfgst); +int rtps_config_open_trace (struct ddsi_domaingv *gv); +int rtps_init (struct ddsi_domaingv *gv); +int rtps_start (struct ddsi_domaingv *gv); +void rtps_stop (struct ddsi_domaingv *gv); +void rtps_fini (struct ddsi_domaingv *gv); + +DDS_EXPORT void ddsi_set_deafmute (struct ddsi_domaingv *gv, bool deaf, bool mute, int64_t reset_after); #if defined (__cplusplus) } diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index 3d48219..46ac147 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -926,11 +926,35 @@ static uint32_t ddsi_sertopic_hash_wrap (const void *tp) return ddsi_sertopic_hash (tp); } +static void reset_deaf_mute (struct xevent *xev, void *varg, UNUSED_ARG (nn_mtime_t tnow)) +{ + struct ddsi_domaingv *gv = varg; + gv->deaf = 0; + gv->mute = 0; + GVLOGDISC ("DEAFMUTE auto-reset to [deaf, mute]=[%d, %d]\n", gv->deaf, gv->mute); + delete_xevent (xev); +} + +void ddsi_set_deafmute (struct ddsi_domaingv *gv, bool deaf, bool mute, int64_t reset_after) +{ + gv->deaf = deaf; + gv->mute = mute; + GVLOGDISC (" DEAFMUTE set [deaf, mute]=[%d, %d]", gv->deaf, gv->mute); + if (reset_after < DDS_INFINITY) + { + nn_mtime_t when = add_duration_to_mtime (now_mt (), reset_after); + GVTRACE (" reset after %"PRId64".%09u ns", reset_after / DDS_NSECS_IN_SEC, (unsigned) (reset_after % DDS_NSECS_IN_SEC)); + qxev_callback (gv->xevents, when, reset_deaf_mute, gv); + } + GVLOGDISC ("\n"); +} + int rtps_init (struct ddsi_domaingv *gv) { uint32_t port_disc_uc = 0; uint32_t port_data_uc = 0; bool mc_available = true; + nn_mtime_t reset_deaf_mute_time = { T_NEVER }; gv->tstart = now (); /* wall clock time, used in logs */ @@ -954,6 +978,15 @@ int rtps_init (struct ddsi_domaingv *gv) GVLOG (DDS_LC_CONFIG, "started at %d.06%d -- %s\n", sec, usec, str); } + /* Allow configuration to set "deaf_mute" in case we want to start out that way */ + gv->deaf = gv->config.initial_deaf; + gv->mute = gv->config.initial_mute; + if (gv->deaf || gv->mute) + { + GVLOG (DDS_LC_CONFIG | DDS_LC_DISCOVERY, "DEAFMUTE initial deaf=%d mute=%d reset after %"PRId64"d ns\n", gv->deaf, gv->mute, gv->config.initial_deaf_mute_reset); + reset_deaf_mute_time = add_duration_to_mtime (now_mt (), gv->config.initial_deaf_mute_reset); + } + /* Initialize thread pool */ if (gv->config.tp_enable) { @@ -1400,6 +1433,8 @@ int rtps_init (struct ddsi_domaingv *gv) gv->user_dqueue = nn_dqueue_new ("user", gv, gv->config.delivery_queue_maxsamples, user_dqueue_handler, NULL); #endif + if (reset_deaf_mute_time.v < T_NEVER) + qxev_callback (gv->xevents, reset_deaf_mute_time, reset_deaf_mute, gv); return 0; err_mc_conn: diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 7f0bf28..6ae7d41 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -3140,97 +3140,6 @@ static int recv_thread_waitset_add_conn (os_sockWaitset ws, ddsi_tran_conn_t con } } -enum local_deaf_state_recover { - LDSR_NORMAL = 0, /* matches gv.deaf for normal operation */ - LDSR_DEAF = 1, /* matches gv.deaf for "deaf" state */ - LDSR_REJOIN = 2 -}; - -struct local_deaf_state { - enum local_deaf_state_recover state; - nn_mtime_t tnext; -}; - -static int check_and_handle_deafness_recover (struct ddsi_domaingv *gv, struct local_deaf_state *st, unsigned num_fixed_uc) -{ - int rebuildws = 0; - if (now_mt().v < st->tnext.v) - { - GVTRACE ("check_and_handle_deafness_recover: state %d too early\n", (int)st->state); - return 0; - } - switch (st->state) - { - case LDSR_NORMAL: - assert(0); - break; - case LDSR_DEAF: { - ddsi_tran_conn_t disc = gv->disc_conn_mc, data = gv->data_conn_mc; - GVTRACE ("check_and_handle_deafness_recover: state %d create new sockets\n", (int) st->state); - if (!create_multicast_sockets (gv)) - goto error; - GVTRACE ("check_and_handle_deafness_recover: state %d transfer group membership admin\n", (int) st->state); - ddsi_transfer_group_membership (gv->mship, disc, gv->disc_conn_mc); - ddsi_transfer_group_membership (gv->mship, data, gv->data_conn_mc); - GVTRACE ("check_and_handle_deafness_recover: state %d drop from waitset and add new\n", (int) st->state); - /* see waitset construction code in recv_thread */ - os_sockWaitsetPurge (gv->recv_threads[0].arg.u.many.ws, num_fixed_uc); - if (recv_thread_waitset_add_conn (gv->recv_threads[0].arg.u.many.ws, gv->disc_conn_mc) < 0) - DDS_FATAL("check_and_handle_deafness_recover: failed to add disc_conn_mc to waitset\n"); - if (recv_thread_waitset_add_conn (gv->recv_threads[0].arg.u.many.ws, gv->data_conn_mc) < 0) - DDS_FATAL("check_and_handle_deafness_recover: failed to add data_conn_mc to waitset\n"); - GVTRACE ("check_and_handle_deafness_recover: state %d close sockets\n", (int)st->state); - ddsi_conn_free(disc); - ddsi_conn_free(data); - rebuildws = 1; - st->state = LDSR_REJOIN; - } - /* FALLS THROUGH */ - case LDSR_REJOIN: - GVTRACE ("check_and_handle_deafness_recover: state %d rejoin on disc socket\n", (int)st->state); - if (ddsi_rejoin_transferred_mcgroups (gv, gv->mship, gv->disc_conn_mc) < 0) - goto error; - GVTRACE ("check_and_handle_deafness_recover: state %d rejoin on data socket\n", (int)st->state); - if (ddsi_rejoin_transferred_mcgroups (gv, gv->mship, gv->data_conn_mc) < 0) - goto error; - GVTRACE ("check_and_handle_deafness_recover: state %d done\n", (int)st->state); - st->state = LDSR_NORMAL; - break; - } - GVTRACE ("check_and_handle_deafness_recover: state %d returning %d\n", (int)st->state, rebuildws); - return rebuildws; -error: - GVTRACE ("check_and_handle_deafness_recover: state %d failed, returning %d\n", (int)st->state, rebuildws); - st->state = LDSR_DEAF; - st->tnext = add_duration_to_mtime(now_mt(), T_SECOND); - return rebuildws; -} - -static int check_and_handle_deafness (struct ddsi_domaingv *gv, struct local_deaf_state *st, unsigned num_fixed_uc) -{ - const int gv_deaf = gv->deaf; - assert (gv_deaf == 0 || gv_deaf == 1); - if (gv_deaf == (int)st->state) - return 0; - else if (gv_deaf) - { - GVTRACE ("check_and_handle_deafness: going deaf (%d -> %d)\n", (int)st->state, (int)LDSR_DEAF); - st->state = LDSR_DEAF; - st->tnext = now_mt(); - return 0; - } - else if (!gv->config.allowMulticast) - { - GVTRACE ("check_and_handle_deafness: no longer deaf (multicast disabled)\n"); - st->state = LDSR_NORMAL; - return 0; - } - else - { - return check_and_handle_deafness_recover (gv, st, num_fixed_uc); - } -} - void trigger_recv_threads (const struct ddsi_domaingv *gv) { for (uint32_t i = 0; i < gv->n_recv_threads; i++) @@ -3283,9 +3192,6 @@ uint32_t recv_thread (void *vrecv_thread_arg) struct local_participant_set lps; unsigned num_fixed = 0, num_fixed_uc = 0; os_sockWaitsetCtx ctx; - struct local_deaf_state lds; - lds.state = gv->deaf ? LDSR_DEAF : LDSR_NORMAL; - lds.tnext = now_mt(); local_participant_set_init (&lps, &gv->participant_set_generation); if (gv->m_factory->m_connless) { @@ -3307,9 +3213,8 @@ uint32_t recv_thread (void *vrecv_thread_arg) while (ddsrt_atomic_ld32 (&gv->rtps_keepgoing)) { - int rebuildws; + int rebuildws = 0; LOG_THREAD_CPUTIME (&gv->logconfig, next_thread_cputime); - rebuildws = check_and_handle_deafness (gv, &lds, num_fixed_uc); if (gv->config.many_sockets_mode != MSM_MANY_UNICAST) { /* no other sockets to check */