From 1b448dee9bf9c133e9c47a56db175faf69ddc26f Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 12 May 2020 09:22:56 +0200 Subject: [PATCH] Add wait-for-acknowledgement to oneliner tests Signed-off-by: Erik Boasson --- src/core/ddsc/tests/listener.c | 29 ++++- src/core/ddsc/tests/test_oneliner.c | 136 +++++++++++++++++------ src/core/ddsi/include/dds/ddsi/q_bswap.h | 4 +- 3 files changed, 124 insertions(+), 45 deletions(-) diff --git a/src/core/ddsc/tests/listener.c b/src/core/ddsc/tests/listener.c index 378f0b3..1bc7005 100644 --- a/src/core/ddsc/tests/listener.c +++ b/src/core/ddsc/tests/listener.c @@ -286,16 +286,33 @@ CU_Test (ddsc_listener, matched) dotest ("sm r pm w' ?pm w' ?sm r"); // Disconnect + reconnect; "deaf P" means the disconnect is asymmetrical: P no longer observes P' - // but P' still observes P. If r did not ACK the data before losing connectivity, w' will hold + // but P' still observes P. If r did not ack the data before losing connectivity, w' will hold // the data and it will be re-delivered after reconnecting, depending on QoS settings (the "..." // allows for extra samples) and whether the instance was taken or not // - // the uncertainty also means we don't really know how many "data available" events there will be - // and the "sleep 0.3" simply gives it a bit more time after the first event + // If r did ack the data, w will drop it and it can't be delivered. If there is another r'' that + // did not ack, r will still not get the data because the writer determines that it was ack'd + // already and it won't retransmit. + // FIXME: this differs from what the spec says should happen, maybe it should be changed? + // (It is a fall-out from changes to make sure a volatile reader doesn't get historical data, but + // it could be handled differently.) + // + // Waiting for an acknowledgement therefore makes sense (and in the other runs, a 0.3s sleep + // kind-a solves the problem of not known exactly how many events there will be: it means at + // least one event has been observed, and behaviour of Cyclone in a simple case like this means + // the full retransmit request will be replied to with a single packet, and that therefore the + // likelihood of the retransmitted data arriving within a window of 0.3s is very high. (Where + // 0.3s is an attempt to pick a duration on the long side of what's needed and short enough not + // to delay things much.) + dotest ("sm da r pm w' ?sm r ?pm w' ;" // matched reader/writer pair + " wr w' 1 ; ?da r take{(1,0,0)} r ?ack w' ;" // wait-for-acks => writer drops data + " deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" // write lost on "wire" + " hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0)} r ; ?!pm"); dotest ("sm da r pm w' ; ?sm r ?pm w' ;" - " wr w' 1 ; ?da r take{(1,0,0)} r sleep 1;" - " deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" - " hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0),...} r ; ?!pm"); + " r'' ?pm w' deaf P'' ;" // with second reader: reader is deaf so won't ACK + " wr w' 1 ; ?da r take{(1,0,0)} r ?ack(r) w' ;" // wait for ack from r' (not r'') + " deaf P ; ?sm(1,0,0,-1,w') r ?da r take{d1} r ; wr w' 2 ;" // write lost on "wire" + " hearing P ; ?sm(2,1,1,1,w') r ?da r sleep 0.3 take{(2,0,0)} r ; ?!pm"); // same without taking the "dispose" after disconnect // sample 1 will be delivered anew dotest ("sm da r pm w' ; ?sm r ?pm w' ; wr w' 1 ; ?da r take{(1,0,0)} r ;" diff --git a/src/core/ddsc/tests/test_oneliner.c b/src/core/ddsc/tests/test_oneliner.c index 255400a..2b635bd 100644 --- a/src/core/ddsc/tests/test_oneliner.c +++ b/src/core/ddsc/tests/test_oneliner.c @@ -25,6 +25,8 @@ #include "dds__types.h" #include "dds__entity.h" +#include "dds__writer.h" +#include "dds/ddsi/q_bswap.h" #include "dds/ddsi/q_lease.h" #include "dds/ddsi/q_xevent.h" #include "dds/ddsi/ddsi_entity_index.h" @@ -1502,46 +1504,106 @@ static void checklistener (struct oneliner_ctx *ctx, int ll, int ent, struct one testfail (ctx, "listener %s: status mask not cleared", lldesc[ll].name); } -static void dochecklistener (struct oneliner_ctx *ctx) +static void dowaitforack (struct oneliner_ctx *ctx) { - const bool expectclear = nexttok_if (&ctx->l, '!'); - const int ll = parse_listener (ctx); - if (ll < 0) - error (ctx, "check listener: requires listener name"); - else if (expectclear) + dds_return_t ret; + int ent, ent1 = -1; + union { dds_guid_t x; ddsi_guid_t i; } rdguid; + if (*ctx->l.inp == '(') // reader present { - printf ("listener %s: check not called", lldesc[ll].name); - fflush (stdout); - ddsrt_mutex_lock (&ctx->g_mutex); - bool ret = true; - for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) - { - printf (" cb_called %"PRIu32" %s\n", ctx->cb[i].cb_called[lldesc[ll].id], ctx->cb[i].cb_called[lldesc[ll].id] == 0 ? "ok" : "fail"); - if (ctx->cb[i].cb_called[lldesc[ll].id] != 0) - ret = false; - } - ddsrt_mutex_unlock (&ctx->g_mutex); - if (!ret) - testfail (ctx, "callback %s invoked unexpectedly", lldesc[ll].name); + nexttok (&ctx->l, NULL); + if ((ent1 = parse_entity (ctx)) < 0) + error (ctx, "wait for ack: expecting entity"); + if ((ent1 % 9) < 3 || (ent1 % 9) > 5 || ctx->es[ent1] == 0) + error (ctx, "wait for ack: expecting existing reader as argument"); + if ((ret = dds_get_guid (ctx->es[ent1], &rdguid.x)) != 0) + error_dds (ctx, ret, "wait for ack: failed to get GUID for reader %"PRId32, ctx->es[ent1]); + rdguid.i = nn_ntoh_guid (rdguid.i); + if (!nexttok_if (&ctx->l, ')')) + error (ctx, "wait for ack: expecting ')'"); + } + if ((ent = parse_entity (ctx)) < 0) + error (ctx, "wait for ack: expecting writer"); + if (ent1 >= 0 && ent / 9 == ent1 / 9) + error (ctx, "wait for ack: reader and writer must be in different domains"); + if (ctx->es[ent] == 0) + make_entity (ctx, ent, NULL); + printf ("wait for ack %"PRId32" reader %"PRId32"\n", ctx->es[ent], ent1 < 0 ? 0 : ctx->es[ent1]); + + // without a reader argument a simple dds_wait_for_acks (ctx->es[ent], DDS_SECS (5)) suffices + struct dds_entity *x; + if ((ret = dds_entity_pin (ctx->es[ent], &x)) < 0) + error_dds (ctx, ret, "wait for ack: pin entity failed %"PRId32, ctx->es[ent]); + if (dds_entity_kind (x) != DDS_KIND_WRITER) + error_dds (ctx, ret, "wait for ack: %"PRId32" is not a writer", ctx->es[ent]); + else + ret = dds__writer_wait_for_acks ((struct dds_writer *) x, (ent1 < 0) ? NULL : &rdguid.i, dds_time () + DDS_SECS (5)); + dds_entity_unpin (x); + if (ret != 0) + { + if (ret == DDS_RETCODE_TIMEOUT) + testfail (ctx, "wait for acks timed out on entity %"PRId32, ctx->es[ent]); + else + error_dds (ctx, ret, "wait for acks failed on entity %"PRId32, ctx->es[ent]); + } +} + +static void dowaitfornolistener (struct oneliner_ctx *ctx, int ll) +{ + printf ("listener %s: check not called", lldesc[ll].name); + fflush (stdout); + ddsrt_mutex_lock (&ctx->g_mutex); + bool ret = true; + for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) + { + printf (" %"PRIu32, ctx->cb[i].cb_called[lldesc[ll].id]); + if (ctx->cb[i].cb_called[lldesc[ll].id] != 0) + ret = false; + } + printf (" (%s)\n", ret ? "ok" : "fail"); + ddsrt_mutex_unlock (&ctx->g_mutex); + if (!ret) + testfail (ctx, "callback %s invoked unexpectedly", lldesc[ll].name); +} + +static void dowaitforlistener (struct oneliner_ctx *ctx, int ll) +{ + struct oneliner_lex l1 = ctx->l; + // no whitespace between name and args + const bool have_args = (*ctx->l.inp == '('); + if (have_args) + { + // skip args: we need the entity before we can interpret them + int tok; + while ((tok = nexttok (&ctx->l, NULL)) != EOF && tok != ')') + ; + } + const int ent = parse_entity (ctx); + if (ent < 0) + error (ctx, "check listener: requires an entity"); + if (ctx->es[ent] == 0) + setlistener (ctx, NULL, ll, ent); + checklistener (ctx, ll, ent, have_args ? &l1 : NULL); +} + +static void dowait (struct oneliner_ctx *ctx) +{ + union oneliner_tokval tokval; + if (peektok (&ctx->l, &tokval) == TOK_NAME && strcmp (tokval.n, "ack") == 0) + { + nexttok (&ctx->l, NULL); + dowaitforack (ctx); } else { - struct oneliner_lex l1 = ctx->l; - // no whitespace between name and args - const bool have_args = (*ctx->l.inp == '('); - if (have_args) - { - // skip args: we need the entity before we can interpret them - int tok; - while ((tok = nexttok (&ctx->l, NULL)) != EOF && tok != ')') - ; - } - const int ent = parse_entity (ctx); - if (ent < 0) - error (ctx, "check listener: requires an entity"); - if (ctx->es[ent] == 0) - setlistener (ctx, NULL, ll, ent); - checklistener (ctx, ll, ent, have_args ? &l1 : NULL); + const bool expectclear = nexttok_if (&ctx->l, '!'); + const int ll = parse_listener (ctx); + if (ll < 0) + error (ctx, "check listener: requires listener name"); + if (expectclear) + dowaitfornolistener (ctx, ll); + else + dowaitforlistener (ctx, ll); } } @@ -1628,7 +1690,7 @@ static void dispatchcmd (struct oneliner_ctx *ctx) void (*fn) (struct oneliner_ctx *ct); } cs[] = { { "-", dodelete }, - { "?", dochecklistener }, + { "?", dowait }, { "wr", dowr }, { "wrdisp", dowrdisp }, { "disp", dodisp }, @@ -1744,7 +1806,7 @@ int test_oneliner_fini (struct oneliner_ctx *ctx) setresult (ctx, ret, "terminate: reset listener failed on %"PRId32, ctx->es[i]); if (ctx->result == 0) { - printf ("\n"); + printf ("\n-- dumping content of readers after failure --\n"); for (int i = 0; i < (int) (sizeof (ctx->doms) / sizeof (ctx->doms[0])); i++) { for (int j = 3; j <= 5; j++) diff --git a/src/core/ddsi/include/dds/ddsi/q_bswap.h b/src/core/ddsi/include/dds/ddsi/q_bswap.h index 93a3706..2a2dded 100644 --- a/src/core/ddsi/include/dds/ddsi/q_bswap.h +++ b/src/core/ddsi/include/dds/ddsi/q_bswap.h @@ -33,8 +33,8 @@ ddsi_guid_prefix_t nn_hton_guid_prefix (ddsi_guid_prefix_t p); ddsi_guid_prefix_t nn_ntoh_guid_prefix (ddsi_guid_prefix_t p); ddsi_entityid_t nn_hton_entityid (ddsi_entityid_t e); ddsi_entityid_t nn_ntoh_entityid (ddsi_entityid_t e); -ddsi_guid_t nn_hton_guid (ddsi_guid_t g); -ddsi_guid_t nn_ntoh_guid (ddsi_guid_t g); +DDS_EXPORT ddsi_guid_t nn_hton_guid (ddsi_guid_t g); +DDS_EXPORT ddsi_guid_t nn_ntoh_guid (ddsi_guid_t g); void bswap_sequence_number_set_hdr (nn_sequence_number_set_header_t *snset); void bswap_sequence_number_set_bitmap (nn_sequence_number_set_header_t *snset, uint32_t *bits);