Test interface dropping incoming/outgoing packets
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
9c272c98b8
commit
0845337f47
5 changed files with 96 additions and 102 deletions
|
@ -3370,6 +3370,40 @@ DDS_EXPORT dds_return_t
|
||||||
dds_assert_liveliness (
|
dds_assert_liveliness (
|
||||||
dds_entity_t entity);
|
dds_entity_t entity);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief This operation allows making the domain's network stack
|
||||||
|
* temporarily deaf and/or mute. It is a support function for testing and,
|
||||||
|
* other special uses and is subject to change.
|
||||||
|
*
|
||||||
|
* @param[in] entity A domain entity or an entity bound to a domain, such
|
||||||
|
* as a participant, reader or writer.
|
||||||
|
* @param[in] deaf Whether to network stack should pretend to be deaf and
|
||||||
|
* ignore any incoming packets.
|
||||||
|
* @param[in] mute Whether to network stack should pretend to be mute and
|
||||||
|
* discard any outgoing packets where it normally would.
|
||||||
|
* pass them to the operating system kernel for transmission.
|
||||||
|
* @param[in] reset_after Any value less than INFINITY will cause it to
|
||||||
|
* set deaf = mute = false after reset_after ns have passed.
|
||||||
|
* This is done by an event scheduled for the appropriate
|
||||||
|
* time and otherwise forgotten. These events are not
|
||||||
|
* affected by subsequent calls to this function.
|
||||||
|
*
|
||||||
|
* @returns A dds_return_t indicating success or failure.
|
||||||
|
*
|
||||||
|
* @retval DDS_RETCODE_OK
|
||||||
|
* The operation was successful.
|
||||||
|
* @retval DDS_BAD_PARAMETER
|
||||||
|
* The entity parameter is not a valid parameter.
|
||||||
|
* @retval DDS_RETCODE_ILLEGAL_OPERATION
|
||||||
|
* The operation is invoked on an inappropriate object.
|
||||||
|
*/
|
||||||
|
DDS_EXPORT dds_return_t
|
||||||
|
dds_domain_set_deafmute (
|
||||||
|
dds_entity_t entity,
|
||||||
|
bool deaf,
|
||||||
|
bool mute,
|
||||||
|
dds_duration_t reset_after);
|
||||||
|
|
||||||
#if defined (__cplusplus)
|
#if defined (__cplusplus)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -286,6 +286,23 @@ static dds_return_t dds_domain_free (dds_entity *vdomain)
|
||||||
return DDS_RETCODE_NO_DATA;
|
return DDS_RETCODE_NO_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dds_return_t dds_domain_set_deafmute (dds_entity_t entity, bool deaf, bool mute, dds_duration_t reset_after)
|
||||||
|
{
|
||||||
|
struct dds_entity *e;
|
||||||
|
dds_return_t rc;
|
||||||
|
if ((rc = dds_entity_pin (entity, &e)) < 0)
|
||||||
|
return rc;
|
||||||
|
if (e->m_domain == NULL)
|
||||||
|
rc = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ddsi_set_deafmute (&e->m_domain->gv, deaf, mute, reset_after);
|
||||||
|
rc = DDS_RETCODE_OK;
|
||||||
|
}
|
||||||
|
dds_entity_unpin (e);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
#include "dds__entity.h"
|
#include "dds__entity.h"
|
||||||
static void pushdown_set_batch (struct dds_entity *e, bool enable)
|
static void pushdown_set_batch (struct dds_entity *e, bool enable)
|
||||||
{
|
{
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
#ifndef NN_RTPS_H
|
#ifndef NN_RTPS_H
|
||||||
#define NN_RTPS_H
|
#define NN_RTPS_H
|
||||||
|
|
||||||
|
#include "dds/export.h"
|
||||||
#include "dds/ddsi/ddsi_vendor.h"
|
#include "dds/ddsi/ddsi_vendor.h"
|
||||||
#include "dds/ddsi/ddsi_guid.h"
|
#include "dds/ddsi/ddsi_guid.h"
|
||||||
|
|
||||||
|
@ -59,12 +60,14 @@ typedef int64_t seqno_t;
|
||||||
|
|
||||||
struct cfgst;
|
struct cfgst;
|
||||||
struct ddsi_domaingv;
|
struct ddsi_domaingv;
|
||||||
int rtps_config_prep (struct ddsi_domaingv *config, struct cfgst *cfgst);
|
int rtps_config_prep (struct ddsi_domaingv *gv, struct cfgst *cfgst);
|
||||||
int rtps_config_open_trace (struct ddsi_domaingv *config);
|
int rtps_config_open_trace (struct ddsi_domaingv *gv);
|
||||||
int rtps_init (struct ddsi_domaingv *config);
|
int rtps_init (struct ddsi_domaingv *gv);
|
||||||
int rtps_start (struct ddsi_domaingv *config);
|
int rtps_start (struct ddsi_domaingv *gv);
|
||||||
void rtps_stop (struct ddsi_domaingv *config);
|
void rtps_stop (struct ddsi_domaingv *gv);
|
||||||
void rtps_fini (struct ddsi_domaingv *config);
|
void rtps_fini (struct ddsi_domaingv *gv);
|
||||||
|
|
||||||
|
DDS_EXPORT void ddsi_set_deafmute (struct ddsi_domaingv *gv, bool deaf, bool mute, int64_t reset_after);
|
||||||
|
|
||||||
#if defined (__cplusplus)
|
#if defined (__cplusplus)
|
||||||
}
|
}
|
||||||
|
|
|
@ -926,11 +926,35 @@ static uint32_t ddsi_sertopic_hash_wrap (const void *tp)
|
||||||
return ddsi_sertopic_hash (tp);
|
return ddsi_sertopic_hash (tp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void reset_deaf_mute (struct xevent *xev, void *varg, UNUSED_ARG (nn_mtime_t tnow))
|
||||||
|
{
|
||||||
|
struct ddsi_domaingv *gv = varg;
|
||||||
|
gv->deaf = 0;
|
||||||
|
gv->mute = 0;
|
||||||
|
GVLOGDISC ("DEAFMUTE auto-reset to [deaf, mute]=[%d, %d]\n", gv->deaf, gv->mute);
|
||||||
|
delete_xevent (xev);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ddsi_set_deafmute (struct ddsi_domaingv *gv, bool deaf, bool mute, int64_t reset_after)
|
||||||
|
{
|
||||||
|
gv->deaf = deaf;
|
||||||
|
gv->mute = mute;
|
||||||
|
GVLOGDISC (" DEAFMUTE set [deaf, mute]=[%d, %d]", gv->deaf, gv->mute);
|
||||||
|
if (reset_after < DDS_INFINITY)
|
||||||
|
{
|
||||||
|
nn_mtime_t when = add_duration_to_mtime (now_mt (), reset_after);
|
||||||
|
GVTRACE (" reset after %"PRId64".%09u ns", reset_after / DDS_NSECS_IN_SEC, (unsigned) (reset_after % DDS_NSECS_IN_SEC));
|
||||||
|
qxev_callback (gv->xevents, when, reset_deaf_mute, gv);
|
||||||
|
}
|
||||||
|
GVLOGDISC ("\n");
|
||||||
|
}
|
||||||
|
|
||||||
int rtps_init (struct ddsi_domaingv *gv)
|
int rtps_init (struct ddsi_domaingv *gv)
|
||||||
{
|
{
|
||||||
uint32_t port_disc_uc = 0;
|
uint32_t port_disc_uc = 0;
|
||||||
uint32_t port_data_uc = 0;
|
uint32_t port_data_uc = 0;
|
||||||
bool mc_available = true;
|
bool mc_available = true;
|
||||||
|
nn_mtime_t reset_deaf_mute_time = { T_NEVER };
|
||||||
|
|
||||||
gv->tstart = now (); /* wall clock time, used in logs */
|
gv->tstart = now (); /* wall clock time, used in logs */
|
||||||
|
|
||||||
|
@ -954,6 +978,15 @@ int rtps_init (struct ddsi_domaingv *gv)
|
||||||
GVLOG (DDS_LC_CONFIG, "started at %d.06%d -- %s\n", sec, usec, str);
|
GVLOG (DDS_LC_CONFIG, "started at %d.06%d -- %s\n", sec, usec, str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Allow configuration to set "deaf_mute" in case we want to start out that way */
|
||||||
|
gv->deaf = gv->config.initial_deaf;
|
||||||
|
gv->mute = gv->config.initial_mute;
|
||||||
|
if (gv->deaf || gv->mute)
|
||||||
|
{
|
||||||
|
GVLOG (DDS_LC_CONFIG | DDS_LC_DISCOVERY, "DEAFMUTE initial deaf=%d mute=%d reset after %"PRId64"d ns\n", gv->deaf, gv->mute, gv->config.initial_deaf_mute_reset);
|
||||||
|
reset_deaf_mute_time = add_duration_to_mtime (now_mt (), gv->config.initial_deaf_mute_reset);
|
||||||
|
}
|
||||||
|
|
||||||
/* Initialize thread pool */
|
/* Initialize thread pool */
|
||||||
if (gv->config.tp_enable)
|
if (gv->config.tp_enable)
|
||||||
{
|
{
|
||||||
|
@ -1400,6 +1433,8 @@ int rtps_init (struct ddsi_domaingv *gv)
|
||||||
gv->user_dqueue = nn_dqueue_new ("user", gv, gv->config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
|
gv->user_dqueue = nn_dqueue_new ("user", gv, gv->config.delivery_queue_maxsamples, user_dqueue_handler, NULL);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
if (reset_deaf_mute_time.v < T_NEVER)
|
||||||
|
qxev_callback (gv->xevents, reset_deaf_mute_time, reset_deaf_mute, gv);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
err_mc_conn:
|
err_mc_conn:
|
||||||
|
|
|
@ -3140,97 +3140,6 @@ static int recv_thread_waitset_add_conn (os_sockWaitset ws, ddsi_tran_conn_t con
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum local_deaf_state_recover {
|
|
||||||
LDSR_NORMAL = 0, /* matches gv.deaf for normal operation */
|
|
||||||
LDSR_DEAF = 1, /* matches gv.deaf for "deaf" state */
|
|
||||||
LDSR_REJOIN = 2
|
|
||||||
};
|
|
||||||
|
|
||||||
struct local_deaf_state {
|
|
||||||
enum local_deaf_state_recover state;
|
|
||||||
nn_mtime_t tnext;
|
|
||||||
};
|
|
||||||
|
|
||||||
static int check_and_handle_deafness_recover (struct ddsi_domaingv *gv, struct local_deaf_state *st, unsigned num_fixed_uc)
|
|
||||||
{
|
|
||||||
int rebuildws = 0;
|
|
||||||
if (now_mt().v < st->tnext.v)
|
|
||||||
{
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d too early\n", (int)st->state);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
switch (st->state)
|
|
||||||
{
|
|
||||||
case LDSR_NORMAL:
|
|
||||||
assert(0);
|
|
||||||
break;
|
|
||||||
case LDSR_DEAF: {
|
|
||||||
ddsi_tran_conn_t disc = gv->disc_conn_mc, data = gv->data_conn_mc;
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d create new sockets\n", (int) st->state);
|
|
||||||
if (!create_multicast_sockets (gv))
|
|
||||||
goto error;
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d transfer group membership admin\n", (int) st->state);
|
|
||||||
ddsi_transfer_group_membership (gv->mship, disc, gv->disc_conn_mc);
|
|
||||||
ddsi_transfer_group_membership (gv->mship, data, gv->data_conn_mc);
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d drop from waitset and add new\n", (int) st->state);
|
|
||||||
/* see waitset construction code in recv_thread */
|
|
||||||
os_sockWaitsetPurge (gv->recv_threads[0].arg.u.many.ws, num_fixed_uc);
|
|
||||||
if (recv_thread_waitset_add_conn (gv->recv_threads[0].arg.u.many.ws, gv->disc_conn_mc) < 0)
|
|
||||||
DDS_FATAL("check_and_handle_deafness_recover: failed to add disc_conn_mc to waitset\n");
|
|
||||||
if (recv_thread_waitset_add_conn (gv->recv_threads[0].arg.u.many.ws, gv->data_conn_mc) < 0)
|
|
||||||
DDS_FATAL("check_and_handle_deafness_recover: failed to add data_conn_mc to waitset\n");
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d close sockets\n", (int)st->state);
|
|
||||||
ddsi_conn_free(disc);
|
|
||||||
ddsi_conn_free(data);
|
|
||||||
rebuildws = 1;
|
|
||||||
st->state = LDSR_REJOIN;
|
|
||||||
}
|
|
||||||
/* FALLS THROUGH */
|
|
||||||
case LDSR_REJOIN:
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d rejoin on disc socket\n", (int)st->state);
|
|
||||||
if (ddsi_rejoin_transferred_mcgroups (gv, gv->mship, gv->disc_conn_mc) < 0)
|
|
||||||
goto error;
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d rejoin on data socket\n", (int)st->state);
|
|
||||||
if (ddsi_rejoin_transferred_mcgroups (gv, gv->mship, gv->data_conn_mc) < 0)
|
|
||||||
goto error;
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d done\n", (int)st->state);
|
|
||||||
st->state = LDSR_NORMAL;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d returning %d\n", (int)st->state, rebuildws);
|
|
||||||
return rebuildws;
|
|
||||||
error:
|
|
||||||
GVTRACE ("check_and_handle_deafness_recover: state %d failed, returning %d\n", (int)st->state, rebuildws);
|
|
||||||
st->state = LDSR_DEAF;
|
|
||||||
st->tnext = add_duration_to_mtime(now_mt(), T_SECOND);
|
|
||||||
return rebuildws;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int check_and_handle_deafness (struct ddsi_domaingv *gv, struct local_deaf_state *st, unsigned num_fixed_uc)
|
|
||||||
{
|
|
||||||
const int gv_deaf = gv->deaf;
|
|
||||||
assert (gv_deaf == 0 || gv_deaf == 1);
|
|
||||||
if (gv_deaf == (int)st->state)
|
|
||||||
return 0;
|
|
||||||
else if (gv_deaf)
|
|
||||||
{
|
|
||||||
GVTRACE ("check_and_handle_deafness: going deaf (%d -> %d)\n", (int)st->state, (int)LDSR_DEAF);
|
|
||||||
st->state = LDSR_DEAF;
|
|
||||||
st->tnext = now_mt();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
else if (!gv->config.allowMulticast)
|
|
||||||
{
|
|
||||||
GVTRACE ("check_and_handle_deafness: no longer deaf (multicast disabled)\n");
|
|
||||||
st->state = LDSR_NORMAL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
return check_and_handle_deafness_recover (gv, st, num_fixed_uc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void trigger_recv_threads (const struct ddsi_domaingv *gv)
|
void trigger_recv_threads (const struct ddsi_domaingv *gv)
|
||||||
{
|
{
|
||||||
for (uint32_t i = 0; i < gv->n_recv_threads; i++)
|
for (uint32_t i = 0; i < gv->n_recv_threads; i++)
|
||||||
|
@ -3283,9 +3192,6 @@ uint32_t recv_thread (void *vrecv_thread_arg)
|
||||||
struct local_participant_set lps;
|
struct local_participant_set lps;
|
||||||
unsigned num_fixed = 0, num_fixed_uc = 0;
|
unsigned num_fixed = 0, num_fixed_uc = 0;
|
||||||
os_sockWaitsetCtx ctx;
|
os_sockWaitsetCtx ctx;
|
||||||
struct local_deaf_state lds;
|
|
||||||
lds.state = gv->deaf ? LDSR_DEAF : LDSR_NORMAL;
|
|
||||||
lds.tnext = now_mt();
|
|
||||||
local_participant_set_init (&lps, &gv->participant_set_generation);
|
local_participant_set_init (&lps, &gv->participant_set_generation);
|
||||||
if (gv->m_factory->m_connless)
|
if (gv->m_factory->m_connless)
|
||||||
{
|
{
|
||||||
|
@ -3307,9 +3213,8 @@ uint32_t recv_thread (void *vrecv_thread_arg)
|
||||||
|
|
||||||
while (ddsrt_atomic_ld32 (&gv->rtps_keepgoing))
|
while (ddsrt_atomic_ld32 (&gv->rtps_keepgoing))
|
||||||
{
|
{
|
||||||
int rebuildws;
|
int rebuildws = 0;
|
||||||
LOG_THREAD_CPUTIME (&gv->logconfig, next_thread_cputime);
|
LOG_THREAD_CPUTIME (&gv->logconfig, next_thread_cputime);
|
||||||
rebuildws = check_and_handle_deafness (gv, &lds, num_fixed_uc);
|
|
||||||
if (gv->config.many_sockets_mode != MSM_MANY_UNICAST)
|
if (gv->config.many_sockets_mode != MSM_MANY_UNICAST)
|
||||||
{
|
{
|
||||||
/* no other sockets to check */
|
/* no other sockets to check */
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue