Perform type conversion for transient-local data
The changes in d92d491b83
to deal with
local readers and writers with the same topic and type name but
different underlying `struct ddsi_sertopic`s did not include the
provisioning of historical data from a (local) transient-local writer to
a (local) transient-local reader.
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
efefb5009f
commit
84abf48d4a
5 changed files with 180 additions and 99 deletions
|
@ -96,37 +96,21 @@ struct local_sourceinfo {
|
|||
static struct ddsi_serdata *local_make_sample (struct ddsi_tkmap_instance **tk, struct ddsi_domaingv *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo)
|
||||
{
|
||||
struct local_sourceinfo *si = vsourceinfo;
|
||||
if (topic == si->src_topic)
|
||||
{
|
||||
*tk = si->src_tk;
|
||||
/* FIXME: see if this pair of refc increments can't be avoided
|
||||
They're needed because free_sample_after_delivery will always be called, but
|
||||
in the common case of a local writer and a single sertopic, make_sample doesn't
|
||||
actually create a sample, and so free_sample_after_delivery doesn't actually
|
||||
have to free anything */
|
||||
ddsi_tkmap_instance_ref (si->src_tk);
|
||||
return ddsi_serdata_ref (si->src_payload);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* ouch ... convert a serdata from one sertopic to another ... */
|
||||
ddsrt_iovec_t iov;
|
||||
uint32_t size = ddsi_serdata_size (si->src_payload);
|
||||
(void) ddsi_serdata_to_ser_ref (si->src_payload, 0, size, &iov);
|
||||
struct ddsi_serdata *d = ddsi_serdata_from_ser_iov (topic, si->src_payload->kind, 1, &iov, size);
|
||||
ddsi_serdata_to_ser_unref (si->src_payload, &iov);
|
||||
if (d)
|
||||
{
|
||||
d->statusinfo = si->src_payload->statusinfo;
|
||||
d->timestamp = si->src_payload->timestamp;
|
||||
*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d);
|
||||
}
|
||||
else
|
||||
struct ddsi_serdata *d = ddsi_serdata_ref_as_topic (topic, si->src_payload);
|
||||
if (d == NULL)
|
||||
{
|
||||
DDS_CWARNING (&gv->logconfig, "local: deserialization %s/%s failed in topic type conversion\n", topic->name, topic->type_name);
|
||||
return NULL;
|
||||
}
|
||||
if (topic != si->src_topic)
|
||||
*tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d);
|
||||
else
|
||||
{
|
||||
// if the topic is the same, we can avoid the lookup
|
||||
ddsi_tkmap_instance_ref (si->src_tk);
|
||||
*tk = si->src_tk;
|
||||
}
|
||||
return d;
|
||||
}
|
||||
}
|
||||
|
||||
static dds_return_t local_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo)
|
||||
|
|
|
@ -328,7 +328,48 @@ static void logsink (void *arg, const dds_log_data_t *msg)
|
|||
ddsrt_atomic_inc32 (deser_fail);
|
||||
}
|
||||
|
||||
static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, bool fastpath)
|
||||
enum multi_sertopic_mode {
|
||||
MSM_FASTPATH,
|
||||
MSM_SLOWPATH,
|
||||
MSM_TRANSLOCAL
|
||||
};
|
||||
|
||||
static const char *multi_sertopic_modestr (enum multi_sertopic_mode mode)
|
||||
{
|
||||
switch (mode)
|
||||
{
|
||||
case MSM_FASTPATH: return "fastpath";
|
||||
case MSM_SLOWPATH: return "slowpath";
|
||||
case MSM_TRANSLOCAL: return "transient-local";
|
||||
}
|
||||
return "?";
|
||||
}
|
||||
|
||||
static void create_readers (dds_entity_t pp_sub, size_t nrds, dds_entity_t *rds, size_t ntps, const dds_entity_t *tps, const dds_qos_t *qos, dds_entity_t waitset)
|
||||
{
|
||||
assert (nrds >= ntps && (nrds % ntps) == 0);
|
||||
for (size_t i = 0; i < ntps; i++)
|
||||
{
|
||||
rds[i] = dds_create_reader (pp_sub, tps[i], qos, NULL);
|
||||
CU_ASSERT_FATAL (rds[i] > 0);
|
||||
}
|
||||
for (size_t i = ntps; i < nrds; i++)
|
||||
{
|
||||
rds[i] = dds_create_reader (pp_sub, tps[(i - ntps) / (nrds / ntps - 1)], qos, NULL);
|
||||
CU_ASSERT_FATAL (rds[i] > 0);
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < nrds; i++)
|
||||
{
|
||||
dds_return_t rc;
|
||||
rc = dds_set_status_mask (rds[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS);
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
rc = dds_waitset_attach (waitset, rds[i], (dds_attach_t)i);
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
}
|
||||
}
|
||||
|
||||
static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, enum multi_sertopic_mode mode)
|
||||
{
|
||||
#define SEQ_IDX 0
|
||||
#define ARY_IDX 1
|
||||
|
@ -345,7 +386,12 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
dds_qos_t *qos;
|
||||
dds_return_t rc;
|
||||
|
||||
printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", fastpath ? "fastpath" : "slowpath");
|
||||
printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", multi_sertopic_modestr (mode));
|
||||
|
||||
/* Transient-local mode is for checking the local historical data delivery path (for remote, there
|
||||
is nothing special about it), and knowing it is local means we don't have to wait for historical
|
||||
data to arrive. So check. */
|
||||
assert (pp_pub == pp_sub || mode != MSM_TRANSLOCAL);
|
||||
|
||||
waitset = dds_create_waitset (DDS_CYCLONEDDS_HANDLE);
|
||||
CU_ASSERT_FATAL (waitset > 0);
|
||||
|
@ -353,8 +399,13 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
qos = dds_create_qos ();
|
||||
CU_ASSERT_FATAL (qos != NULL);
|
||||
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY);
|
||||
dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP);
|
||||
dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_RECEPTION_TIMESTAMP);
|
||||
dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0);
|
||||
if (mode == MSM_TRANSLOCAL)
|
||||
{
|
||||
dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL);
|
||||
dds_qset_durability_service (qos, 0, DDS_HISTORY_KEEP_ALL, 0, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED);
|
||||
}
|
||||
|
||||
create_unique_topic_name ("ddsc_multi_sertopic_lease_duration_zero", name, sizeof name);
|
||||
|
||||
|
@ -373,24 +424,11 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
sub_topics[i] = dds_create_topic (pp_sub, descs[i], name, qos, NULL);
|
||||
CU_ASSERT_FATAL (sub_topics[i] > 0);
|
||||
}
|
||||
DDSRT_STATIC_ASSERT (sizeof (readers) >= sizeof (sub_topics));
|
||||
DDSRT_STATIC_ASSERT ((sizeof (readers) % sizeof (sub_topics)) == 0);
|
||||
for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++)
|
||||
{
|
||||
readers[i] = dds_create_reader (pp_sub, sub_topics[i], qos, NULL);
|
||||
CU_ASSERT_FATAL (readers[i] > 0);
|
||||
}
|
||||
for (size_t i = sizeof (sub_topics) / sizeof (sub_topics[0]); i < sizeof (readers) / sizeof (readers[0]); i++)
|
||||
{
|
||||
const size_t nrd = sizeof (readers) / sizeof (readers[0]);
|
||||
const size_t ntp = sizeof (sub_topics) / sizeof (sub_topics[0]);
|
||||
readers[i] = dds_create_reader (pp_sub, sub_topics[(i - ntp) / (nrd / ntp - 1)], qos, NULL);
|
||||
CU_ASSERT_FATAL (readers[i] > 0);
|
||||
}
|
||||
|
||||
dds_delete_qos (qos);
|
||||
if (mode != MSM_TRANSLOCAL)
|
||||
{
|
||||
create_readers (pp_sub, sizeof (readers) / sizeof (readers[0]), readers, sizeof (sub_topics) / sizeof (sub_topics[0]), sub_topics, qos, waitset);
|
||||
|
||||
/* wait for discovery to complete */
|
||||
for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++)
|
||||
{
|
||||
rc = dds_set_status_mask (writers[i], DDS_PUBLICATION_MATCHED_STATUS);
|
||||
|
@ -398,13 +436,6 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
rc = dds_waitset_attach (waitset, writers[i], -(dds_attach_t)i - 1);
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
}
|
||||
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
|
||||
{
|
||||
rc = dds_set_status_mask (readers[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS);
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
rc = dds_waitset_attach (waitset, readers[i], (dds_attach_t)i);
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
printf ("wait for discovery, fastpath_ok; delete & recreate readers\n");
|
||||
while (!(get_and_check_writer_status (sizeof (writers) / sizeof (writers[0]), writers, sizeof (readers) / sizeof (readers[0])) &&
|
||||
|
@ -418,12 +449,13 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
for it to be set on all (proxy) writers, then possibly reset it */
|
||||
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
|
||||
waitfor_or_reset_fastpath (readers[i], true, sizeof (writers) / sizeof (writers[0]));
|
||||
if (!fastpath)
|
||||
if (mode == MSM_SLOWPATH)
|
||||
{
|
||||
printf ("clear fastpath_ok\n");
|
||||
for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++)
|
||||
waitfor_or_reset_fastpath (readers[i], false, sizeof (writers) / sizeof (writers[0]));
|
||||
}
|
||||
}
|
||||
|
||||
/* check the log output for deserialization failures */
|
||||
ddsrt_atomic_uint32_t deser_fail = DDSRT_ATOMIC_UINT32_INIT (0);
|
||||
|
@ -465,6 +497,11 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
}
|
||||
|
||||
if (mode == MSM_TRANSLOCAL)
|
||||
{
|
||||
create_readers (pp_sub, sizeof (readers) / sizeof (readers[0]), readers, sizeof (sub_topics) / sizeof (sub_topics[0]), sub_topics, qos, waitset);
|
||||
}
|
||||
|
||||
/* All readers should have received three samples, and those that are of type seq
|
||||
should have received one extra (whereas the others should cause deserialization
|
||||
failure warnings) */
|
||||
|
@ -472,8 +509,14 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
const size_t nexp = ((sizeof (writers) / sizeof (writers[0])) *
|
||||
(sizeof (readers) / sizeof (readers[0])) +
|
||||
((sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0]))));
|
||||
/* expecting exactly as many deserialization failures as there are topics other than seq */
|
||||
const size_t nexp_fail = sizeof (sub_topics) / sizeof (sub_topics[0]) - 1;
|
||||
/* For the volatile case, expecting exactly as many deserialization failures as there
|
||||
are topics other than seq because the conversion is done only once for each topic,
|
||||
even if there are multiple readers. For transient-local data, the data set is
|
||||
converted for each new reader (of a different topic) and there will therefore be more
|
||||
conversion failures. */
|
||||
const size_t nexp_fail =
|
||||
(sizeof (sub_topics) / sizeof (sub_topics[0]) - 1) *
|
||||
(mode != MSM_TRANSLOCAL ? 1 : (sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0])));
|
||||
uint32_t nseen = 0;
|
||||
while (nseen < nexp)
|
||||
{
|
||||
|
@ -572,6 +615,7 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
/* deleting the waitset is important: it is bound to the library rather than to
|
||||
a domain and consequently won't be deleted simply because all domains are */
|
||||
rc = dds_delete (waitset);
|
||||
dds_delete_qos (qos);
|
||||
|
||||
CU_ASSERT_FATAL (rc == DDS_RETCODE_OK);
|
||||
dds_set_log_sink (0, NULL);
|
||||
|
@ -579,20 +623,25 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub,
|
|||
|
||||
CU_Test(ddsc_multi_sertopic, local, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
|
||||
{
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, true);
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_FASTPATH);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_multi_sertopic, remote, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
|
||||
{
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, true);
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, MSM_FASTPATH);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_multi_sertopic, local_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
|
||||
{
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, false);
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_SLOWPATH);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_multi_sertopic, remote_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
|
||||
{
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, false);
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, MSM_SLOWPATH);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_multi_sertopic, transient_local, .init = multi_sertopic_init, .fini = multi_sertopic_fini)
|
||||
{
|
||||
ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_TRANSLOCAL);
|
||||
}
|
||||
|
|
|
@ -163,6 +163,22 @@ struct ddsi_serdata_ops {
|
|||
|
||||
DDS_EXPORT void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, enum ddsi_serdata_kind kind);
|
||||
|
||||
/**
|
||||
* @brief Return a reference to a serdata with possible topic conversion
|
||||
*
|
||||
* If `serdata` is of topic `topic`, this increments the reference count and returns
|
||||
* `serdata`. Otherwise, it constructs a new one from the serialised representation of
|
||||
* `serdata`. This can fail, in which case it returns NULL.
|
||||
*
|
||||
* @param[in] topic sertopic the returned serdata must have
|
||||
* @param[in] serdata source sample (untouched except for the reference count and/or
|
||||
* extracting the serialised representation)
|
||||
* @returns A reference to a serdata that is equivalent to the input with the correct
|
||||
* topic, or a null pointer on failure. The reference must be released with @ref
|
||||
* ddsi_serdata_unref.
|
||||
*/
|
||||
DDS_EXPORT struct ddsi_serdata *ddsi_serdata_ref_as_topic (const struct ddsi_sertopic *topic, struct ddsi_serdata *serdata);
|
||||
|
||||
DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const) {
|
||||
struct ddsi_serdata *serdata = (struct ddsi_serdata *)serdata_const;
|
||||
ddsrt_atomic_inc32 (&serdata->refc);
|
||||
|
|
|
@ -32,6 +32,27 @@ void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp,
|
|||
ddsrt_atomic_st32 (&d->refc, 1);
|
||||
}
|
||||
|
||||
struct ddsi_serdata *ddsi_serdata_ref_as_topic (const struct ddsi_sertopic *topic, struct ddsi_serdata *serdata)
|
||||
{
|
||||
if (serdata->topic == topic)
|
||||
return ddsi_serdata_ref (serdata);
|
||||
else
|
||||
{
|
||||
/* ouch ... convert a serdata from one sertopic to another ... */
|
||||
struct ddsi_serdata *converted;
|
||||
ddsrt_iovec_t iov;
|
||||
uint32_t size = ddsi_serdata_size (serdata);
|
||||
(void) ddsi_serdata_to_ser_ref (serdata, 0, size, &iov);
|
||||
if ((converted = ddsi_serdata_from_ser_iov (topic, serdata->kind, 1, &iov, size)) != NULL)
|
||||
{
|
||||
converted->statusinfo = serdata->statusinfo;
|
||||
converted->timestamp = serdata->timestamp;
|
||||
}
|
||||
ddsi_serdata_to_ser_unref (serdata, &iov);
|
||||
return converted;
|
||||
}
|
||||
}
|
||||
|
||||
extern inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const);
|
||||
extern inline void ddsi_serdata_unref (struct ddsi_serdata *serdata);
|
||||
extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d);
|
||||
|
|
|
@ -2274,6 +2274,34 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd,
|
|||
}
|
||||
}
|
||||
|
||||
static void deliver_historical_data (const struct writer *wr, const struct reader *rd)
|
||||
{
|
||||
struct ddsi_domaingv * const gv = wr->e.gv;
|
||||
struct ddsi_tkmap * const tkmap = gv->m_tkmap;
|
||||
struct whc_sample_iter it;
|
||||
struct whc_borrowed_sample sample;
|
||||
/* FIXME: should limit ourselves to what it is available because of durability history, not writer history */
|
||||
whc_sample_iter_init (wr->whc, &it);
|
||||
while (whc_sample_iter_borrow_next (&it, &sample))
|
||||
{
|
||||
struct ddsi_serdata *payload;
|
||||
if ((payload = ddsi_serdata_ref_as_topic (rd->topic, sample.serdata)) == NULL)
|
||||
{
|
||||
GVWARNING ("local: deserialization of %s/%s as %s/%s failed in topic type conversion\n",
|
||||
wr->topic->name, wr->topic->type_name, rd->topic->name, rd->topic->type_name);
|
||||
}
|
||||
else
|
||||
{
|
||||
struct ddsi_writer_info wrinfo;
|
||||
struct ddsi_tkmap_instance *tk = ddsi_tkmap_lookup_instance_ref (tkmap, payload);
|
||||
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo);
|
||||
(void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk);
|
||||
ddsi_tkmap_instance_unref (tkmap, tk);
|
||||
ddsi_serdata_unref (payload);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void writer_add_local_connection (struct writer *wr, struct reader *rd)
|
||||
{
|
||||
struct wr_rd_match *m = ddsrt_malloc (sizeof (*m));
|
||||
|
@ -2296,26 +2324,9 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd)
|
|||
local_reader_ary_insert (&wr->rdary, rd);
|
||||
|
||||
/* Store available data into the late joining reader when it is reliable (we don't do
|
||||
historical data for best-effort data over the wire, so also not locally).
|
||||
FIXME: should limit ourselves to what it is available because of durability history,
|
||||
not writer history */
|
||||
historical data for best-effort data over the wire, so also not locally). */
|
||||
if (rd->xqos->reliability.kind > DDS_RELIABILITY_BEST_EFFORT && rd->xqos->durability.kind > DDS_DURABILITY_VOLATILE)
|
||||
{
|
||||
struct ddsi_tkmap *tkmap = rd->e.gv->m_tkmap;
|
||||
struct whc_sample_iter it;
|
||||
struct whc_borrowed_sample sample;
|
||||
whc_sample_iter_init(wr->whc, &it);
|
||||
while (whc_sample_iter_borrow_next(&it, &sample))
|
||||
{
|
||||
struct ddsi_writer_info wrinfo;
|
||||
struct ddsi_serdata *payload = sample.serdata;
|
||||
/* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */
|
||||
struct ddsi_tkmap_instance *tk = ddsi_tkmap_lookup_instance_ref (tkmap, payload);
|
||||
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, sample.serdata->statusinfo);
|
||||
(void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk);
|
||||
ddsi_tkmap_instance_unref (tkmap, tk);
|
||||
}
|
||||
}
|
||||
deliver_historical_data (wr, rd);
|
||||
|
||||
ddsrt_mutex_unlock (&wr->e.lock);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue