ensure delivery of writes immediately following pub match event (#165)

A long-standing bug of Cyclone is that a sample written immediately
after a publication-matched event may never arrive at the reader that
was just matched.  This happened because the reader need not have
completed discovery of the writer by the time the writer discovers the
reader, at which point the reader ignores the sample because it either
doesn't know the writer at all, or it hasn't yet seen a Heartbeat from
it.

That Heartbeat arrives shortly after, but by then it is too late: the
reader slavishly decides to accept the next sample to be written by the
writer.  (It has no choice, really: either you risk losing some data, or
you will be requesting all historical data, which is emphatically not
what a volatile reader is about ...)

A related issue is the handling of historical data for transient-local
readers: it used to deliver this out-of-order, but that is firstly
against the specification, and secondly, against reasonable expectations
of those who use DDS as a mere publish-subscribe messaging system.  To
add insult to injury, it didn't completely handle some reordering issues
with disposes ...

This commit changes the way writers respond to a request for
retransmission from volatile proxy readers and the way the
in-sync/out-of-sync setting of a reader with respect to a proxy-writer
is used.  The first makes it safe for a Cyclone reader to ask a Cyclone
writer for all data (all these details not being covered in the specs it
errs on the reasonable side for other vendors, but that may cause the
data loss mentioned above): the writer simply sends a Gap message to the
reader for all the sequence numbers prior to the matching.

The second changes the rule for switching from out-of-sync to in-sync:
that transition is now simply once the next sequence number to be
delivered to the reader equals the next sequence number that will be
delivered directly from the proxy writer object to all readers.  (I.e.,
a much more intuitive notion than reaching some seemingly arbitrary
sequence number.)

To avoid duplicates the rule for delivery straight from a proxy writer
has changed: where samples were delivered from the proxy writer to all
matching readers, they are now delivered only to the matching readers
that are in-sync.  To avoid ordering problems, the idea that historical
data can be delivered through the asynchronous delivery path even when
the regular data goes through the synchronous delivery path has been
abandoned.  All data now always follows the same path.

As these same mechanisms are used for getting historical data into
transient-local readers, the ordering problem for the historical data
also disappeared.

The test stuff in src/core/xtests/initsampledeliv covers a lot of the
interesting cases: data published before the existence of a reader, after
it, mixes of volatile and transient-local.  Running them takes quite a
bit of time, and they are not yet integrated in the CI builds (if ever,
because of that time).

Note: the "conservative built-in startup" option has been removed,
because it really makes no sense to keep a vague compatibility option
added a decade ago "just in case" that has never been used ...

Note: the workaround in the src/mpt/tests/basic/procs/hello.c (use
transient-local to ensure delivery of data) has been removed, as has
its workaround for the already-fixed #146.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-05-22 20:53:57 +02:00 committed by eboasson
parent e822dba9c1
commit a652ecb78e
16 changed files with 677 additions and 172 deletions

View file

@ -9,18 +9,5 @@
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
idlc_generate(RhcTypes RhcTypes.idl)
add_executable(rhc_torture rhc_torture.c)
target_include_directories(
rhc_torture PRIVATE
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../ddsc/src>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../ddsi/include>")
target_link_libraries(rhc_torture RhcTypes ddsc)
add_test(
NAME rhc_torture
COMMAND rhc_torture 314159265 0 5000 0)
set_property(TEST rhc_torture PROPERTY TIMEOUT 20)
add_subdirectory(rhc_torture)
add_subdirectory(initsampledeliv)

View file

@ -0,0 +1,20 @@
#
# Copyright(c) 2019 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
# Build description for the initial-sample-delivery test programs.
cmake_minimum_required(VERSION 3.5)
# idlc_generate is a project-provided helper; presumably it turns the IDL file
# into the InitSampleDeliv_lib target that both executables link against below.
idlc_generate(InitSampleDeliv_lib InitSampleDelivData.idl)
# One executable per side of the test: a publisher and a subscriber.
add_executable(InitSampleDelivPub publisher.c)
add_executable(InitSampleDelivSub subscriber.c)
# Both sides need the generated type support and the ddsc library.
target_link_libraries(InitSampleDelivPub InitSampleDeliv_lib ddsc)
target_link_libraries(InitSampleDelivSub InitSampleDeliv_lib ddsc)

View file

@ -0,0 +1,19 @@
// Copyright(c) 2019 ADLINK Technology Limited and others
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
// v. 1.0 which is available at
// http://www.eclipse.org/org/documents/edl-v10.php.
//
// SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
// Sample type exchanged between the InitSampleDeliv publisher and subscriber.
struct Msg {
// Key field (see the keylist pragma below); the publisher always writes 0.
long keyval;
// Sequence number of the sample: the publisher starts at 1 and increments it
// after every successful write.
long seq;
// History depth the publisher configures in its durability-service QoS; the
// subscriber uses it to compute the oldest sample a transient-local reader may see.
long tldepth;
// Sequence number of the last sample the publisher will write.
long final_seq;
// seq_at_match[i] is the value of seq at the time the publisher noticed reader i,
// or 0 as long as it has not been notified of that reader.
long seq_at_match[2];
};
#pragma keylist Msg keyval

View file

@ -0,0 +1,163 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "dds/dds.h"
#include "dds/ddsrt/atomics.h"
#include "InitSampleDelivData.h"
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
/* Fatal-error helper: flushes stdout, prints "file:line" on stderr and aborts
   the process.  Never returns. */
static void oops (const char *file, int line)
{
/* write out buffered progress output before reporting the failure location */
fflush (stdout);
fprintf (stderr, "%s:%d\n", file, line);
abort ();
}
/* Convenience wrapper so that a plain oops () reports the location of the call
   site.  The macro is defined after the function so that the function
   definition itself is not subject to it. */
#define oops() oops(__FILE__, __LINE__)
/* Publication-matched listener for the test writer.
 *
 * Looks up the reader that triggered the event in the DCPSSubscription
 * built-in topic, decodes the reader id that reader advertises in its user
 * data and sets the corresponding bit in the atomic bitmask passed in via
 * varg, so that main () can record the sequence number at which that reader
 * was matched.
 *
 *   wr    writer on which the event was raised
 *   st    publication-matched status; last_subscription_handle is used as the
 *         instance handle of the reader in the built-in topic
 *   varg  listener argument: pointer to a ddsrt_atomic_uint32_t bitmask
 *
 * Any unexpected condition aborts the program via oops ().
 */
static void on_pub_matched (dds_entity_t wr, const dds_publication_matched_status_t st, void *varg)
{
  ddsrt_atomic_uint32_t *new_readers = varg;
  dds_sample_info_t info;
  void *raw = NULL;
  dds_entity_t rd;
  printf ("pubmatched\n");
  /* temporary reader on the built-in topic, used only for this one lookup */
  if ((rd = dds_create_reader (dds_get_participant (wr), DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION, NULL, NULL)) < 0)
    oops ();
  if (dds_read_instance (rd, &raw, &info, 1, 1, st.last_subscription_handle) != 1)
    oops ();
  const dds_builtintopic_endpoint_t *sample = raw;
  /* in our test the user data must be present */
  void *ud;
  size_t udsz;
  if (!dds_qget_userdata (sample->qos, &ud, &udsz))
    oops ();
  /* NOTE(review): atoi relies on ud being NUL-terminated -- confirm that
     dds_qget_userdata guarantees this */
  int rdid = atoi (ud);
  /* the id is used as a bit index in a 32-bit mask */
  if (rdid < 0 || rdid > 31)
    oops ();
  printf ("pubmatched: %d\n", rdid);
  fflush (stdout);
  ddsrt_atomic_or32 (new_readers, UINT32_C (1) << rdid);
  dds_free (ud);
  dds_return_loan (rd, &raw, 1);
  /* the built-in topic reader was created just for this event: delete it
     again, otherwise every match event leaves another reader behind until
     the participant is deleted */
  (void) dds_delete (rd);
}
/* Returns the number of readers currently matched with writer wr (the
   current_count field of its publication-matched status); aborts via oops ()
   if the status cannot be retrieved. */
static uint32_t get_publication_matched_count (dds_entity_t wr)
{
  dds_publication_matched_status_t matched;
  const bool failed = (dds_get_publication_matched_status (wr, &matched) < 0);
  if (failed)
  {
    oops ();
  }
  return matched.current_count;
}
/* Publisher side of the initial-sample-delivery test.
 *
 * Options:
 *   -t  use a transient-local writer (default: volatile)
 *   -p  "pre-write": write the first tlhist+1 samples even while no reader
 *       is matched
 *
 * Writes samples with seq 1 up to and including final_seq, recording in
 * seq_at_match[i] the sequence number that was current when reader i was
 * reported by the publication-matched listener.  Aborts via oops () on any
 * unexpected condition; returns 0 otherwise.
 */
int main (int argc, char ** argv)
{
dds_entity_t ppant;
dds_entity_t tp;
dds_entity_t wr;
dds_qos_t *qos;
/* bitmask of reader ids reported by on_pub_matched and not yet processed by the loop below */
ddsrt_atomic_uint32_t newreaders = DDSRT_ATOMIC_UINT32_INIT (0);
int opt;
bool flag_prewrite = false;
bool flag_translocal = false;
/* history depth used for the durability-service QoS and published in every sample */
const int32_t tlhist = 10;
while ((opt = getopt (argc, argv, "tp")) != EOF)
{
switch (opt)
{
case 't':
flag_translocal = true;
break;
case 'p':
flag_prewrite = true;
break;
default:
fprintf (stderr, "usage error: see source code\n");
exit (2);
}
}
if ((ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0)
oops ();
/* the topic is always reliable + transient-local; the writer overrides durability below */
qos = dds_create_qos ();
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL);
if ((tp = dds_create_topic (ppant, &Msg_desc, "Msg", qos, NULL)) < 0)
oops ();
/* Writer has overrides for history, durability */
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
dds_qset_durability (qos, flag_translocal ? DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE);
dds_qset_durability_service (qos, 0, DDS_HISTORY_KEEP_LAST, tlhist, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
/* the listener receives &newreaders as its argument */
dds_listener_t *list = dds_create_listener (&newreaders);
dds_lset_publication_matched (list, on_pub_matched);
if ((wr = dds_create_writer (ppant, tp, qos, list)) < 0)
oops ();
/* listener and QoS objects are no longer needed once the writer exists */
dds_delete_listener (list);
dds_delete_qos (qos);
Msg sample = {
.keyval = 0,
.seq = 1,
.tldepth = tlhist,
.final_seq = 30,
.seq_at_match = { 0, 0 }
};
/* tlast: time of the most recent write; tnewrd: time the most recent new reader was processed */
dds_time_t tlast = 0, tnewrd = 0;
while (sample.seq <= sample.final_seq)
{
/* fetch and clear the set of readers reported since the previous iteration
   (presumably "_ov" means the old value is returned -- confirm) */
uint32_t newrd = ddsrt_atomic_and32_ov (&newreaders, 0);
for (uint32_t i = 0; i < 32; i++)
{
if (newrd & (UINT32_C (1) << i))
{
/* only reader ids that fit in seq_at_match are expected ... */
if (i >= (uint32_t) (sizeof (sample.seq_at_match) / sizeof (sample.seq_at_match[0])))
oops ();
/* ... and each of them may be reported only once */
if (sample.seq_at_match[i] != 0)
oops ();
sample.seq_at_match[i] = sample.seq;
tnewrd = dds_time ();
printf ("%d.%09d newreader %d: start seq %d\n", (int) (tnewrd / DDS_NSECS_IN_SEC), (int) (tnewrd % DDS_NSECS_IN_SEC), (int) i, (int) sample.seq_at_match[i]);
fflush (stdout);
}
}
/* write only while a reader is matched, or (with -p) for the first tlhist+1 samples */
if (get_publication_matched_count (wr) || (flag_prewrite && sample.seq <= tlhist + 1))
{
dds_time_t tnow = dds_time ();
/* at most one sample per 100ms, except that a newly reported reader triggers a write right away */
if (tnow - tlast > DDS_MSECS (100) || newrd)
{
if (dds_write (wr, &sample) < 0)
oops ();
sample.seq++;
tlast = tnow;
if (sample.seq > sample.final_seq)
{
tnow = dds_time ();
printf ("%d.%09d done writing\n", (int) (tnow / DDS_NSECS_IN_SEC), (int) (tnow % DDS_NSECS_IN_SEC));
fflush (stdout);
}
}
}
dds_sleepfor (DDS_MSECS (1));
}
/* wait for the written data to be acknowledged before deleting the participant */
dds_sleepfor (DDS_MSECS (100));
dds_wait_for_acks (wr, DDS_INFINITY);
dds_delete (ppant);
return 0;
}

View file

@ -0,0 +1,46 @@
#!/bin/bash
#
# Copyright(c) 2019 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
# Runs the subscriber/publisher pair for every selected combination of options,
# once with the subscriber started first and once with the publisher started
# 2s ahead of the subscriber.  Stops starting new runs after the first failure
# and exits with a non-zero status if any process of any run failed.
#
# Option variables are expanded unquoted on purpose: an empty value must
# disappear from the command line instead of becoming an empty argument.
ok=true
for sd in "" "-d1" "-d12" ; do
  for st in "" "-t" ; do
    for sT in "" "-T" ; do
      # maybeV: true iff no reader is transient-local, in which case a
      # volatile publisher (no -t) is also a combination worth running
      if [ "$st" = "-t" ] || [ "$sT" = "-T" ] ; then
        maybeV=false
      else
        maybeV=true
      fi
      for sw in "" "-w" ; do
        for pt in "" "-t" ; do
          for pp in "" "-p" ; do
            # only run with a second reader (-d...) that is not transient-local (no -T)
            if [ -z "$sT" ] && [ -n "$sd" ] && { [ "$pt" = "-t" ] || $maybeV ; } ; then
              if $ok ; then
                echo "bin/InitSampleDelivSub $sw $sd $st $sT & bin/InitSampleDelivPub $pt $pp"
                bin/InitSampleDelivSub $sw $sd $st $sT & spid=$!
                # a publisher that aborts is a test failure as well
                bin/InitSampleDelivPub $pt $pp || ok=false
                wait $spid || ok=false
              fi
              if $ok ; then
                echo "bin/InitSampleDelivPub $pt $pp & sleep 2 ; bin/InitSampleDelivSub $sw $sd $st $sT "
                bin/InitSampleDelivPub $pt $pp & ppid=$!
                sleep 2
                bin/InitSampleDelivSub $sw $sd $st $sT & spid=$!
                wait $spid || ok=false
                # wait for the publisher by pid so that its exit status is checked too
                wait $ppid || ok=false
              fi
            fi
          done
        done
      done
    done
  done
done
# report failure to the caller through the exit status
if ! $ok ; then
  exit 1
fi

View file

@ -0,0 +1,198 @@
/*
* Copyright(c) 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include "dds/dds.h"
#include "InitSampleDelivData.h"
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <getopt.h>
/* Fatal-error helper: flushes stdout, prints "file:line" on stderr and aborts
   the process.  Never returns. */
static void oops (const char *file, int line)
{
/* write out buffered progress output before reporting the failure location */
fflush (stdout);
fprintf (stderr, "%s:%d\n", file, line);
abort ();
}
/* Convenience wrapper so that a plain oops () reports the location of the call
   site.  The macro is defined after the function so that the function
   definition itself is not subject to it. */
#define oops() oops(__FILE__, __LINE__)
/* Blocks until a writer on topic "Msg" shows up in the DCPSPublication
 * built-in topic of participant ppant.
 *
 * Polls a temporary built-in topic reader every 100ms, taking one sample at a
 * time, and deletes that reader again before returning.  Aborts via oops ()
 * if the reader cannot be created or a take fails.
 */
static void wait_for_writer (dds_entity_t ppant)
{
  dds_entity_t rd;
  dds_sample_info_t info;
  void *raw = NULL;
  int32_t n;
  if ((rd = dds_create_reader (ppant, DDS_BUILTIN_TOPIC_DCPSPUBLICATION, NULL, NULL)) < 0)
    oops ();
  bool done = false;
  do {
    dds_sleepfor (DDS_MSECS (100));
    while ((n = dds_take (rd, &raw, &info, 1, 1)) == 1)
    {
      const dds_builtintopic_endpoint_t *sample = raw;
      /* only samples with valid data carry meaningful non-key fields: do not
         look at topic_name for an invalid sample */
      if (info.valid_data && strcmp (sample->topic_name, "Msg") == 0)
        done = true;
      dds_return_loan (rd, &raw, n);
    }
    /* the inner loop ends on 0 (nothing left to take) or on an error */
    if (n < 0)
      oops ();
  } while (!done);
  dds_delete (rd);
}
/* Returns the number of writers currently matched with reader rd (the
   current_count field of its subscription-matched status); aborts via
   oops () if the status cannot be retrieved. */
static uint32_t get_subscription_matched_count (dds_entity_t rd)
{
  dds_subscription_matched_status_t matched;
  const bool failed = (dds_get_subscription_matched_status (rd, &matched) < 0);
  if (failed)
  {
    oops ();
  }
  return matched.current_count;
}
/* Subscriber side of the initial-sample-delivery test.
 *
 * Options:
 *   -d N  create a second reader once the last sequence number received by
 *         reader 0 equals N (default: never)
 *   -t    make reader 0 transient-local (default: volatile)
 *   -T    make reader 1 transient-local (default: volatile)
 *   -w    wait until a writer on topic "Msg" has been discovered, plus one
 *         more second, before creating the first reader
 *
 * Takes samples one at a time, checking that each reader sees consecutive
 * sequence numbers, until reader 0 has received something and no longer has a
 * matched writer.  It then verifies that every reader received the final
 * sample and that its first sample was not later than expected given the
 * sequence number the publisher recorded when it matched that reader.
 * Aborts via oops () on any violation; returns 0 otherwise.
 */
int main (int argc, char ** argv)
{
  dds_entity_t ppant;
  dds_entity_t tp;
  dds_entity_t rd[2] = { 0, 0 };
  dds_qos_t *qos;
  int opt;
  bool flag_wait = false;
  bool flag_translocal[sizeof (rd) / sizeof (rd[0])] = { false };
  int flag_create_2nd_rd = -1;
  while ((opt = getopt (argc, argv, "d:tTw")) != EOF)
  {
    switch (opt)
    {
      case 'd':
        flag_create_2nd_rd = atoi (optarg);
        break;
      case 't':
        flag_translocal[0] = true;
        break;
      case 'T':
        flag_translocal[1] = true;
        break;
      case 'w':
        flag_wait = true;
        break;
      default:
        fprintf (stderr, "usage error: see source code\n");
        exit (2);
    }
  }
  if ((ppant = dds_create_participant (DDS_DOMAIN_DEFAULT, NULL, NULL)) < 0)
    oops ();
  /* the topic is always reliable + transient-local; the readers override durability below */
  qos = dds_create_qos ();
  dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_SECS (10));
  dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL);
  if ((tp = dds_create_topic (ppant, &Msg_desc, "Msg", qos, NULL)) < 0)
    oops ();
  if (flag_wait)
  {
    printf ("waiting for writer ...\n");
    fflush (stdout);
    wait_for_writer (ppant);
    printf ("writer seen; giving it some time to discover us and publish data ...\n");
    fflush (stdout);
    dds_sleepfor (DDS_SECS (1));
    printf ("continuing ...\n");
    fflush (stdout);
  }
  /* Reader has overrides for history, durability */
  dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
  dds_qset_durability (qos, flag_translocal[0] ? DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE);
  /* the user data carries the reader id, which the publisher decodes on a match */
  dds_qset_userdata (qos, "0", 1);
  if ((rd[0] = dds_create_reader (ppant, tp, qos, NULL)) < 0)
    oops ();
  /* prepare the QoS for the second reader, which is created later inside the loop (if at all) */
  dds_qset_durability (qos, flag_translocal[1] ? DDS_DURABILITY_TRANSIENT_LOCAL : DDS_DURABILITY_VOLATILE);
  dds_qset_userdata (qos, "1", 1);
  int32_t firstmsg[2] = { 0 };   /* seq of the first sample received, per reader */
  int32_t prevmsg[2] = { 0 };    /* seq of the most recent sample received, per reader; 0 = none yet */
  int32_t seqatmatch[2] = { 0 }; /* first non-zero seq_at_match[i] seen by reader i */
  int32_t tldepth = 0;
  int32_t endmsg = 0;
  /* keep going until reader 0 has received something and has no matched writer left */
  while (prevmsg[0] == 0 || get_subscription_matched_count (rd[0]) > 0)
  {
    void *raw = NULL;
    dds_sample_info_t info;
    int32_t n;
    for (int i = 0; i < 2 && rd[i]; i++)
    {
      if ((n = dds_take (rd[i], &raw, &info, 1, 1)) < 0)
        oops ();
      else if (n > 0)
      {
        if (info.valid_data)
        {
          const Msg *msg = raw;
          if (prevmsg[i] == 0)
          {
            /* have to postpone first seq# check for transient-local data because the limit
               t-l history means the first sample we read may have an arbitrary sequence
               that antedated the matching */
            printf ("reader %d: first seq %d\n", i, (int) msg->seq);
            fflush (stdout);
            firstmsg[i] = msg->seq;
          }
          else if (msg->seq != prevmsg[i] + 1)
          {
            printf ("reader %d: received %d, previous %d\n", i, (int) msg->seq, (int) prevmsg[i]);
            oops ();
          }
          prevmsg[i] = msg->seq;
          endmsg = msg->final_seq;
          tldepth = msg->tldepth;
          if (seqatmatch[i] == 0)
            seqatmatch[i] = msg->seq_at_match[i];
        }
        /* always hand back the loan, also for an invalid sample: otherwise it
           leaks and raw would still be set on the next take, which may be on
           the other reader */
        dds_return_loan (rd[i], &raw, n);
      }
    }
    if (rd[1] == 0 && prevmsg[0] == flag_create_2nd_rd)
    {
      if ((rd[1] = dds_create_reader (ppant, tp, qos, NULL)) < 0)
        oops ();
    }
    dds_sleepfor (DDS_MSECS (10));
  }
  /* both values are copied from received samples, so 0 means nothing valid was received */
  if (tldepth == 0 || endmsg == 0)
    oops ();
  for (int i = 0; i < 2; i++)
  {
    if (rd[i] == 0)
      continue;
    /* every reader must have received the final sample */
    if (prevmsg[i] != endmsg)
      oops ();
    /* refseq: latest acceptable first sequence number.  For a volatile reader
       it is the seq at which the publisher matched it; for a transient-local
       reader it is tldepth samples earlier, but never before the first sample */
    int32_t refseq;
    if (!flag_translocal[i])
      refseq = seqatmatch[i];
    else if (seqatmatch[i] <= tldepth)
      refseq = 1;
    else
      refseq = seqatmatch[i] - tldepth;
    /* transient-local readers get a slack of one sample, volatile readers none.
       NOTE(review): the original comment here attributed the slack to volatile
       data and explained that for t-l data the publisher waits so that the
       subscriber can get the data in time; that does not obviously match the
       condition below -- confirm the intent */
    if (flag_translocal[i] ? (firstmsg[i] > refseq + 1) : firstmsg[i] > refseq)
    {
      printf ("reader %d: first seq %d but refseq %d\n", i, (int) firstmsg[i], refseq);
      oops ();
    }
  }
  dds_delete_qos (qos);
  dds_delete (ppant);
  return 0;
}

View file

@ -0,0 +1,26 @@
#
# Copyright(c) 2019 ADLINK Technology Limited and others
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0 which is available at
# http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
# v. 1.0 which is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
#
# idlc_generate is a project-provided helper; presumably it turns RhcTypes.idl
# into the RhcTypes target that rhc_torture links against below.
idlc_generate(RhcTypes RhcTypes.idl)
add_executable(rhc_torture rhc_torture.c)
# The test needs headers from the ddsc sources and the ddsi include directory,
# both located two directory levels up from this one.
target_include_directories(
rhc_torture PRIVATE
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsc/src>"
"$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../../ddsi/include>")
target_link_libraries(rhc_torture RhcTypes ddsc)
# Register the executable as a CTest test with a fixed argument list.
# NOTE(review): the meaning of the four arguments is defined by rhc_torture.c -- see there.
add_test(
NAME rhc_torture
COMMAND rhc_torture 314159265 0 5000 0)
# Fail the test if it runs for more than 20 seconds.
set_property(TEST rhc_torture PROPERTY TIMEOUT 20)