diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index cc0b1e8..5d7539c 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -96,37 +96,21 @@ struct local_sourceinfo { static struct ddsi_serdata *local_make_sample (struct ddsi_tkmap_instance **tk, struct ddsi_domaingv *gv, struct ddsi_sertopic const * const topic, void *vsourceinfo) { struct local_sourceinfo *si = vsourceinfo; - if (topic == si->src_topic) + struct ddsi_serdata *d = ddsi_serdata_ref_as_topic (topic, si->src_payload); + if (d == NULL) { - *tk = si->src_tk; - /* FIXME: see if this pair of refc increments can't be avoided - They're needed because free_sample_after_delivery will always be called, but - in the common case of a local writer and a single sertopic, make_sample doesn't - actually create a sample, and so free_sample_after_delivery doesn't actually - have to free anything */ - ddsi_tkmap_instance_ref (si->src_tk); - return ddsi_serdata_ref (si->src_payload); + DDS_CWARNING (&gv->logconfig, "local: deserialization %s/%s failed in topic type conversion\n", topic->name, topic->type_name); + return NULL; } + if (topic != si->src_topic) + *tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d); else { - /* ouch ... convert a serdata from one sertopic to another ... */ - ddsrt_iovec_t iov; - uint32_t size = ddsi_serdata_size (si->src_payload); - (void) ddsi_serdata_to_ser_ref (si->src_payload, 0, size, &iov); - struct ddsi_serdata *d = ddsi_serdata_from_ser_iov (topic, si->src_payload->kind, 1, &iov, size); - ddsi_serdata_to_ser_unref (si->src_payload, &iov); - if (d) - { - d->statusinfo = si->src_payload->statusinfo; - d->timestamp = si->src_payload->timestamp; - *tk = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, d); - } - else - { - DDS_CWARNING (&gv->logconfig, "local: deserialization %s/%s failed in topic type conversion\n", topic->name, topic->type_name); - } - return d; + // if the topic is the same, we can avoid the lookup + ddsi_tkmap_instance_ref (si->src_tk); + *tk = si->src_tk; } + return d; } static dds_return_t local_on_delivery_failure_fastpath (struct entity_common *source_entity, bool source_entity_locked, struct local_reader_ary *fastpath_rdary, void *vsourceinfo) diff --git a/src/core/ddsc/tests/multi_sertopic.c b/src/core/ddsc/tests/multi_sertopic.c index f66b89b..31518ed 100644 --- a/src/core/ddsc/tests/multi_sertopic.c +++ b/src/core/ddsc/tests/multi_sertopic.c @@ -328,7 +328,48 @@ static void logsink (void *arg, const dds_log_data_t *msg) ddsrt_atomic_inc32 (deser_fail); } -static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, bool fastpath) +enum multi_sertopic_mode { + MSM_FASTPATH, + MSM_SLOWPATH, + MSM_TRANSLOCAL +}; + +static const char *multi_sertopic_modestr (enum multi_sertopic_mode mode) +{ + switch (mode) + { + case MSM_FASTPATH: return "fastpath"; + case MSM_SLOWPATH: return "slowpath"; + case MSM_TRANSLOCAL: return "transient-local"; + } + return "?"; +} + +static void create_readers (dds_entity_t pp_sub, size_t nrds, dds_entity_t *rds, size_t ntps, const dds_entity_t *tps, const dds_qos_t *qos, dds_entity_t waitset) +{ + assert (nrds >= ntps && (nrds % ntps) == 0); + for (size_t i = 0; i < ntps; i++) + { + rds[i] = dds_create_reader (pp_sub, tps[i], qos, NULL); + CU_ASSERT_FATAL (rds[i] > 0); + } + for (size_t i = ntps; i < nrds; i++) + { + rds[i] = dds_create_reader (pp_sub, tps[(i - ntps) / (nrds / ntps - 1)], qos, NULL); + CU_ASSERT_FATAL (rds[i] > 0); + } + + for (size_t i = 0; i < nrds; i++) + { + dds_return_t rc; + rc = dds_set_status_mask (rds[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_waitset_attach (waitset, rds[i], (dds_attach_t)i); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } +} + +static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, enum multi_sertopic_mode mode) { #define SEQ_IDX 0 #define ARY_IDX 1 @@ -345,7 +386,12 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, dds_qos_t *qos; dds_return_t rc; - printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", fastpath ? "fastpath" : "slowpath"); + printf ("multi_sertopic: %s %s\n", (pp_pub == pp_sub) ? "local" : "remote", multi_sertopic_modestr (mode)); + + /* Transient-local mode is for checking the local historical data delivery path (for remote, there + is nothing special about it), and knowing it is local means we don't have to wait for historical + data to arrive. So check. */ + assert (pp_pub == pp_sub || mode != MSM_TRANSLOCAL); waitset = dds_create_waitset (DDS_CYCLONEDDS_HANDLE); CU_ASSERT_FATAL (waitset > 0); @@ -353,8 +399,13 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, qos = dds_create_qos (); CU_ASSERT_FATAL (qos != NULL); dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_INFINITY); - dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_SOURCE_TIMESTAMP); + dds_qset_destination_order (qos, DDS_DESTINATIONORDER_BY_RECEPTION_TIMESTAMP); dds_qset_history (qos, DDS_HISTORY_KEEP_ALL, 0); + if (mode == MSM_TRANSLOCAL) + { + dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL); + dds_qset_durability_service (qos, 0, DDS_HISTORY_KEEP_ALL, 0, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED, DDS_LENGTH_UNLIMITED); + } create_unique_topic_name ("ddsc_multi_sertopic_lease_duration_zero", name, sizeof name); @@ -373,56 +424,37 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, sub_topics[i] = dds_create_topic (pp_sub, descs[i], name, qos, NULL); CU_ASSERT_FATAL (sub_topics[i] > 0); } - DDSRT_STATIC_ASSERT (sizeof (readers) >= sizeof (sub_topics)); - DDSRT_STATIC_ASSERT ((sizeof (readers) % sizeof (sub_topics)) == 0); - for (size_t i = 0; i < sizeof (sub_topics) / sizeof (sub_topics[0]); i++) - { - readers[i] = dds_create_reader (pp_sub, sub_topics[i], qos, NULL); - CU_ASSERT_FATAL (readers[i] > 0); - } - for (size_t i = sizeof (sub_topics) / sizeof (sub_topics[0]); i < sizeof (readers) / sizeof (readers[0]); i++) - { - const size_t nrd = sizeof (readers) / sizeof (readers[0]); - const size_t ntp = sizeof (sub_topics) / sizeof (sub_topics[0]); - readers[i] = dds_create_reader (pp_sub, sub_topics[(i - ntp) / (nrd / ntp - 1)], qos, NULL); - CU_ASSERT_FATAL (readers[i] > 0); - } - dds_delete_qos (qos); + if (mode != MSM_TRANSLOCAL) + { + create_readers (pp_sub, sizeof (readers) / sizeof (readers[0]), readers, sizeof (sub_topics) / sizeof (sub_topics[0]), sub_topics, qos, waitset); - /* wait for discovery to complete */ - for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++) - { - rc = dds_set_status_mask (writers[i], DDS_PUBLICATION_MATCHED_STATUS); - CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); - rc = dds_waitset_attach (waitset, writers[i], -(dds_attach_t)i - 1); - CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); - } - for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) - { - rc = dds_set_status_mask (readers[i], DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_DATA_AVAILABLE_STATUS); - CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); - rc = dds_waitset_attach (waitset, readers[i], (dds_attach_t)i); - CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); - } + for (size_t i = 0; i < sizeof (writers) / sizeof (writers[0]); i++) + { + rc = dds_set_status_mask (writers[i], DDS_PUBLICATION_MATCHED_STATUS); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + rc = dds_waitset_attach (waitset, writers[i], -(dds_attach_t)i - 1); + CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); + } - printf ("wait for discovery, fastpath_ok; delete & recreate readers\n"); - while (!(get_and_check_writer_status (sizeof (writers) / sizeof (writers[0]), writers, sizeof (readers) / sizeof (readers[0])) && - get_and_check_reader_status (sizeof (readers) / sizeof (readers[0]), readers, sizeof (writers) / sizeof (writers[0])))) - { - rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS(5)); - CU_ASSERT_FATAL (rc >= 1); - } + printf ("wait for discovery, fastpath_ok; delete & recreate readers\n"); + while (!(get_and_check_writer_status (sizeof (writers) / sizeof (writers[0]), writers, sizeof (readers) / sizeof (readers[0])) && + get_and_check_reader_status (sizeof (readers) / sizeof (readers[0]), readers, sizeof (writers) / sizeof (writers[0])))) + { + rc = dds_waitset_wait (waitset, NULL, 0, DDS_SECS(5)); + CU_ASSERT_FATAL (rc >= 1); + } - /* we want to check both the fast path and the slow path ... so first wait - for it to be set on all (proxy) writers, then possibly reset it */ - for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) - waitfor_or_reset_fastpath (readers[i], true, sizeof (writers) / sizeof (writers[0])); - if (!fastpath) - { - printf ("clear fastpath_ok\n"); + /* we want to check both the fast path and the slow path ... so first wait + for it to be set on all (proxy) writers, then possibly reset it */ for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) - waitfor_or_reset_fastpath (readers[i], false, sizeof (writers) / sizeof (writers[0])); + waitfor_or_reset_fastpath (readers[i], true, sizeof (writers) / sizeof (writers[0])); + if (mode == MSM_SLOWPATH) + { + printf ("clear fastpath_ok\n"); + for (size_t i = 0; i < sizeof (readers) / sizeof (readers[0]); i++) + waitfor_or_reset_fastpath (readers[i], false, sizeof (writers) / sizeof (writers[0])); + } } /* check the log output for deserialization failures */ @@ -465,6 +497,11 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); } + if (mode == MSM_TRANSLOCAL) + { + create_readers (pp_sub, sizeof (readers) / sizeof (readers[0]), readers, sizeof (sub_topics) / sizeof (sub_topics[0]), sub_topics, qos, waitset); + } + /* All readers should have received three samples, and those that are of type seq should have received one extra (whereas the others should cause deserialization failure warnings) */ @@ -472,8 +509,14 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, const size_t nexp = ((sizeof (writers) / sizeof (writers[0])) * (sizeof (readers) / sizeof (readers[0])) + ((sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0])))); - /* expecting exactly as many deserialization failures as there are topics other than seq */ - const size_t nexp_fail = sizeof (sub_topics) / sizeof (sub_topics[0]) - 1; + /* For the volatile case, expecting exactly as many deserialization failures as there + are topics other than seq because the conversion is done only once for each topic, + even if there are multiple readers. For transient-local data, the data set is + converted for each new reader (of a different topic) and there will therefore be more + conversion failures. */ + const size_t nexp_fail = + (sizeof (sub_topics) / sizeof (sub_topics[0]) - 1) * + (mode != MSM_TRANSLOCAL ? 1 : (sizeof (readers) / sizeof (readers[0])) / (sizeof (sub_topics) / sizeof (sub_topics[0]))); uint32_t nseen = 0; while (nseen < nexp) { @@ -572,6 +615,7 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, /* deleting the waitset is important: it is bound to the library rather than to a domain and consequently won't be deleted simply because all domains are */ rc = dds_delete (waitset); + dds_delete_qos (qos); CU_ASSERT_FATAL (rc == DDS_RETCODE_OK); dds_set_log_sink (0, NULL); @@ -579,20 +623,25 @@ static void ddsc_multi_sertopic_impl (dds_entity_t pp_pub, dds_entity_t pp_sub, CU_Test(ddsc_multi_sertopic, local, .init = multi_sertopic_init, .fini = multi_sertopic_fini) { - ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, true); + ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_FASTPATH); } CU_Test(ddsc_multi_sertopic, remote, .init = multi_sertopic_init, .fini = multi_sertopic_fini) { - ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, true); + ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, MSM_FASTPATH); } CU_Test(ddsc_multi_sertopic, local_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini) { - ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, false); + ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_SLOWPATH); } CU_Test(ddsc_multi_sertopic, remote_slowpath, .init = multi_sertopic_init, .fini = multi_sertopic_fini) { - ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, false); + ddsc_multi_sertopic_impl (g_pub_participant, g_sub_participant, MSM_SLOWPATH); +} + +CU_Test(ddsc_multi_sertopic, transient_local, .init = multi_sertopic_init, .fini = multi_sertopic_fini) +{ + ddsc_multi_sertopic_impl (g_pub_participant, g_pub_participant, MSM_TRANSLOCAL); } diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h index 1343c23..39a8841 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_serdata.h @@ -163,6 +163,22 @@ struct ddsi_serdata_ops { DDS_EXPORT void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, enum ddsi_serdata_kind kind); +/** + * @brief Return a reference to a serdata with possible topic conversion + * + * If `serdata` is of topic `topic`, this increments the reference count and returns + * `serdata`. Otherwise, it constructs a new one from the serialised representation of + * `serdata`. This can fail, in which case it returns NULL. + * + * @param[in] topic sertopic the returned serdata must have + * @param[in] serdata source sample (untouched except for the reference count and/or + * extracting the serialised representation) + * @returns A reference to a serdata that is equivalent to the input with the correct + * topic, or a null pointer on failure. The reference must be released with @ref + * ddsi_serdata_unref. + */ +DDS_EXPORT struct ddsi_serdata *ddsi_serdata_ref_as_topic (const struct ddsi_sertopic *topic, struct ddsi_serdata *serdata); + DDS_EXPORT inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const) { struct ddsi_serdata *serdata = (struct ddsi_serdata *)serdata_const; ddsrt_atomic_inc32 (&serdata->refc); diff --git a/src/core/ddsi/src/ddsi_serdata.c b/src/core/ddsi/src/ddsi_serdata.c index d1f5fa6..9bae824 100644 --- a/src/core/ddsi/src/ddsi_serdata.c +++ b/src/core/ddsi/src/ddsi_serdata.c @@ -32,6 +32,27 @@ void ddsi_serdata_init (struct ddsi_serdata *d, const struct ddsi_sertopic *tp, ddsrt_atomic_st32 (&d->refc, 1); } +struct ddsi_serdata *ddsi_serdata_ref_as_topic (const struct ddsi_sertopic *topic, struct ddsi_serdata *serdata) +{ + if (serdata->topic == topic) + return ddsi_serdata_ref (serdata); + else + { + /* ouch ... convert a serdata from one sertopic to another ... */ + struct ddsi_serdata *converted; + ddsrt_iovec_t iov; + uint32_t size = ddsi_serdata_size (serdata); + (void) ddsi_serdata_to_ser_ref (serdata, 0, size, &iov); + if ((converted = ddsi_serdata_from_ser_iov (topic, serdata->kind, 1, &iov, size)) != NULL) + { + converted->statusinfo = serdata->statusinfo; + converted->timestamp = serdata->timestamp; + } + ddsi_serdata_to_ser_unref (serdata, &iov); + return converted; + } +} + extern inline struct ddsi_serdata *ddsi_serdata_ref (const struct ddsi_serdata *serdata_const); extern inline void ddsi_serdata_unref (struct ddsi_serdata *serdata); extern inline uint32_t ddsi_serdata_size (const struct ddsi_serdata *d); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 3fa1806..51ef631 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2274,6 +2274,34 @@ static void writer_add_connection (struct writer *wr, struct proxy_reader *prd, } } +static void deliver_historical_data (const struct writer *wr, const struct reader *rd) +{ + struct ddsi_domaingv * const gv = wr->e.gv; + struct ddsi_tkmap * const tkmap = gv->m_tkmap; + struct whc_sample_iter it; + struct whc_borrowed_sample sample; + /* FIXME: should limit ourselves to what it is available because of durability history, not writer history */ + whc_sample_iter_init (wr->whc, &it); + while (whc_sample_iter_borrow_next (&it, &sample)) + { + struct ddsi_serdata *payload; + if ((payload = ddsi_serdata_ref_as_topic (rd->topic, sample.serdata)) == NULL) + { + GVWARNING ("local: deserialization of %s/%s as %s/%s failed in topic type conversion\n", + wr->topic->name, wr->topic->type_name, rd->topic->name, rd->topic->type_name); + } + else + { + struct ddsi_writer_info wrinfo; + struct ddsi_tkmap_instance *tk = ddsi_tkmap_lookup_instance_ref (tkmap, payload); + ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, payload->statusinfo); + (void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk); + ddsi_tkmap_instance_unref (tkmap, tk); + ddsi_serdata_unref (payload); + } + } +} + static void writer_add_local_connection (struct writer *wr, struct reader *rd) { struct wr_rd_match *m = ddsrt_malloc (sizeof (*m)); @@ -2296,26 +2324,9 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) local_reader_ary_insert (&wr->rdary, rd); /* Store available data into the late joining reader when it is reliable (we don't do - historical data for best-effort data over the wire, so also not locally). - FIXME: should limit ourselves to what it is available because of durability history, - not writer history */ + historical data for best-effort data over the wire, so also not locally). */ if (rd->xqos->reliability.kind > DDS_RELIABILITY_BEST_EFFORT && rd->xqos->durability.kind > DDS_DURABILITY_VOLATILE) - { - struct ddsi_tkmap *tkmap = rd->e.gv->m_tkmap; - struct whc_sample_iter it; - struct whc_borrowed_sample sample; - whc_sample_iter_init(wr->whc, &it); - while (whc_sample_iter_borrow_next(&it, &sample)) - { - struct ddsi_writer_info wrinfo; - struct ddsi_serdata *payload = sample.serdata; - /* FIXME: whc has tk reference in its index nodes, which is what we really should be iterating over anyway, and so we don't really have to look them up anymore */ - struct ddsi_tkmap_instance *tk = ddsi_tkmap_lookup_instance_ref (tkmap, payload); - ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos, sample.serdata->statusinfo); - (void) ddsi_rhc_store (rd->rhc, &wrinfo, payload, tk); - ddsi_tkmap_instance_unref (tkmap, tk); - } - } + deliver_historical_data (wr, rd); ddsrt_mutex_unlock (&wr->e.lock);