Add wait-for-acknowledgement to oneliner tests

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-05-12 09:22:56 +02:00 committed by eboasson
parent fe81a6bda5
commit 1b448dee9b
3 changed files with 124 additions and 45 deletions

View file

@ -286,16 +286,33 @@ CU_Test (ddsc_listener, matched)
dotest ("sm r pm w' ?pm w' ?sm r");
// Disconnect + reconnect; "deaf P" means the disconnect is asymmetrical: P no longer observes P'
// but P' still observes P. If r did not ACK the data before losing connectivity, w' will hold
// but P' still observes P. If r did not ack the data before losing connectivity, w' will hold
// the data and it will be re-delivered after reconnecting, depending on QoS settings (the "..."
// allows for extra samples) and whether the instance was taken or not
//
// the uncertainty also means we don't really know how many "data available" events there will be
// and the "sleep 0.3" simply gives it a bit more time after the first event
// If r did ack the data, w will drop it and it can't be delivered. If there is another r'' that
// did not ack, r will still not get the data because the writer determines that it was ack'd
// already and it won't retransmit.
// FIXME: this differs from what the spec says should happen, maybe it should be changed?
// (It is a fall-out from changes to make sure a volatile reader doesn't get historical data, but
// it could be handled differently.)
//
// Waiting for an acknowledgement therefore makes sense (and in the other runs, a 0.3s sleep
// kind-a solves the problem of not known exactly how many events there will be: it means at
// least one event has been observed, and behaviour of Cyclone in a simple case like this means
// the full retransmit request will be replied to with a single packet, and that therefore the
// likelihood of the retransmitted data arriving within a window of 0.3s is very high. (Where
// 0.3s is an attempt to pick a duration on the long side of what's needed and short enough not
// to delay things much.)
dotest ("sm da r pm w' ?sm r ?pm w' ;" // matched reader/writer pair
" wr w' 1 ; ?da r take{(1,0,0)} r ?ack w' ;" // wait-for-acks => writer drops data
" deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" // write lost on "wire"
" hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0)} r ; ?!pm");
dotest ("sm da r pm w' ; ?sm r ?pm w' ;"
" wr w' 1 ; ?da r take{(1,0,0)} r sleep 1;"
" deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;"
" hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0),...} r ; ?!pm");
" r'' ?pm w' deaf P'' ;" // with second reader: reader is deaf so won't ACK
" wr w' 1 ; ?da r take{(1,0,0)} r ?ack(r) w' ;" // wait for ack from r' (not r'')
" deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" // write lost on "wire"
" hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0)} r ; ?!pm");
// same without taking the "dispose" after disconnect
// sample 1 will be delivered anew
dotest ("sm da r pm w' ; ?sm r ?pm w' ; wr w' 1 ; ?da r take{(1,0,0)} r ;"

View file

@ -25,6 +25,8 @@
#include "dds__types.h"
#include "dds__entity.h"
#include "dds__writer.h"
#include "dds/ddsi/q_bswap.h"
#include "dds/ddsi/q_lease.h"
#include "dds/ddsi/q_xevent.h"
#include "dds/ddsi/ddsi_entity_index.h"
@ -1502,30 +1504,70 @@ static void checklistener (struct oneliner_ctx *ctx, int ll, int ent, struct one
testfail (ctx, "listener %s: status mask not cleared", lldesc[ll].name);
}
static void dochecklistener (struct oneliner_ctx *ctx)
static void dowaitforack (struct oneliner_ctx *ctx)
{
const bool expectclear = nexttok_if (&ctx->l, '!');
const int ll = parse_listener (ctx);
if (ll < 0)
error (ctx, "check listener: requires listener name");
else if (expectclear)
dds_return_t ret;
int ent, ent1 = -1;
union { dds_guid_t x; ddsi_guid_t i; } rdguid;
if (*ctx->l.inp == '(') // reader present
{
nexttok (&ctx->l, NULL);
if ((ent1 = parse_entity (ctx)) < 0)
error (ctx, "wait for ack: expecting entity");
if ((ent1 % 9) < 3 || (ent1 % 9) > 5 || ctx->es[ent1] == 0)
error (ctx, "wait for ack: expecting existing reader as argument");
if ((ret = dds_get_guid (ctx->es[ent1], &rdguid.x)) != 0)
error_dds (ctx, ret, "wait for ack: failed to get GUID for reader %"PRId32, ctx->es[ent1]);
rdguid.i = nn_ntoh_guid (rdguid.i);
if (!nexttok_if (&ctx->l, ')'))
error (ctx, "wait for ack: expecting ')'");
}
if ((ent = parse_entity (ctx)) < 0)
error (ctx, "wait for ack: expecting writer");
if (ent1 >= 0 && ent / 9 == ent1 / 9)
error (ctx, "wait for ack: reader and writer must be in different domains");
if (ctx->es[ent] == 0)
make_entity (ctx, ent, NULL);
printf ("wait for ack %"PRId32" reader %"PRId32"\n", ctx->es[ent], ent1 < 0 ? 0 : ctx->es[ent1]);
// without a reader argument a simple dds_wait_for_acks (ctx->es[ent], DDS_SECS (5)) suffices
struct dds_entity *x;
if ((ret = dds_entity_pin (ctx->es[ent], &x)) < 0)
error_dds (ctx, ret, "wait for ack: pin entity failed %"PRId32, ctx->es[ent]);
if (dds_entity_kind (x) != DDS_KIND_WRITER)
error_dds (ctx, ret, "wait for ack: %"PRId32" is not a writer", ctx->es[ent]);
else
ret = dds__writer_wait_for_acks ((struct dds_writer *) x, (ent1 < 0) ? NULL : &rdguid.i, dds_time () + DDS_SECS (5));
dds_entity_unpin (x);
if (ret != 0)
{
if (ret == DDS_RETCODE_TIMEOUT)
testfail (ctx, "wait for acks timed out on entity %"PRId32, ctx->es[ent]);
else
error_dds (ctx, ret, "wait for acks failed on entity %"PRId32, ctx->es[ent]);
}
}
static void dowaitfornolistener (struct oneliner_ctx *ctx, int ll)
{
printf ("listener %s: check not called", lldesc[ll].name);
fflush (stdout);
ddsrt_mutex_lock (&ctx->g_mutex);
bool ret = true;
for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++)
{
printf (" cb_called %"PRIu32" %s\n", ctx->cb[i].cb_called[lldesc[ll].id], ctx->cb[i].cb_called[lldesc[ll].id] == 0 ? "ok" : "fail");
printf (" %"PRIu32, ctx->cb[i].cb_called[lldesc[ll].id]);
if (ctx->cb[i].cb_called[lldesc[ll].id] != 0)
ret = false;
}
printf (" (%s)\n", ret ? "ok" : "fail");
ddsrt_mutex_unlock (&ctx->g_mutex);
if (!ret)
testfail (ctx, "callback %s invoked unexpectedly", lldesc[ll].name);
}
else
{
}
static void dowaitforlistener (struct oneliner_ctx *ctx, int ll)
{
struct oneliner_lex l1 = ctx->l;
// no whitespace between name and args
const bool have_args = (*ctx->l.inp == '(');
@ -1542,6 +1584,26 @@ static void dochecklistener (struct oneliner_ctx *ctx)
if (ctx->es[ent] == 0)
setlistener (ctx, NULL, ll, ent);
checklistener (ctx, ll, ent, have_args ? &l1 : NULL);
}
static void dowait (struct oneliner_ctx *ctx)
{
union oneliner_tokval tokval;
if (peektok (&ctx->l, &tokval) == TOK_NAME && strcmp (tokval.n, "ack") == 0)
{
nexttok (&ctx->l, NULL);
dowaitforack (ctx);
}
else
{
const bool expectclear = nexttok_if (&ctx->l, '!');
const int ll = parse_listener (ctx);
if (ll < 0)
error (ctx, "check listener: requires listener name");
if (expectclear)
dowaitfornolistener (ctx, ll);
else
dowaitforlistener (ctx, ll);
}
}
@ -1628,7 +1690,7 @@ static void dispatchcmd (struct oneliner_ctx *ctx)
void (*fn) (struct oneliner_ctx *ct);
} cs[] = {
{ "-", dodelete },
{ "?", dochecklistener },
{ "?", dowait },
{ "wr", dowr },
{ "wrdisp", dowrdisp },
{ "disp", dodisp },
@ -1744,7 +1806,7 @@ int test_oneliner_fini (struct oneliner_ctx *ctx)
setresult (ctx, ret, "terminate: reset listener failed on %"PRId32, ctx->es[i]);
if (ctx->result == 0)
{
printf ("\n");
printf ("\n-- dumping content of readers after failure --\n");
for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++)
{
for (int j = 3; j <= 5; j++)

View file

@ -33,8 +33,8 @@ ddsi_guid_prefix_t nn_hton_guid_prefix (ddsi_guid_prefix_t p);
ddsi_guid_prefix_t nn_ntoh_guid_prefix (ddsi_guid_prefix_t p);
ddsi_entityid_t nn_hton_entityid (ddsi_entityid_t e);
ddsi_entityid_t nn_ntoh_entityid (ddsi_entityid_t e);
ddsi_guid_t nn_hton_guid (ddsi_guid_t g);
ddsi_guid_t nn_ntoh_guid (ddsi_guid_t g);
DDS_EXPORT ddsi_guid_t nn_hton_guid (ddsi_guid_t g);
DDS_EXPORT ddsi_guid_t nn_ntoh_guid (ddsi_guid_t g);
void bswap_sequence_number_set_hdr (nn_sequence_number_set_header_t *snset);
void bswap_sequence_number_set_bitmap (nn_sequence_number_set_header_t *snset, uint32_t *bits);