From fe81a6bda50cd84cb6537524c8f07884f122fafd Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Tue, 12 May 2020 09:07:08 +0200 Subject: [PATCH] Make wait_for_acks implementation capable of waiting for one reader The dds_wait_for_acks function follows the DCPS specification and allows waiting for all matching readers to have acknowledged all data written prior to that point. This commit leaves the API unchanged but extends the implementation to make it possible to wait until a specific reader has acknowledged everything, as this is a useful device in testing with deliberate one-way disconnections using dds_domain_set_deafmute. Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds__writer.h | 2 +- src/core/ddsc/src/dds_publisher.c | 2 +- src/core/ddsc/src/dds_writer.c | 4 ++-- src/core/ddsi/include/dds/ddsi/q_entity.h | 2 +- src/core/ddsi/src/q_entity.c | 24 ++++++++++++++++++----- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index 69ff7e1..111e93d 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -23,7 +23,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER) struct status_cb_data; void dds_writer_status_cb (void *entity, const struct status_cb_data * data); -dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout); +DDS_EXPORT dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index 00ea520..ca17999 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -114,7 +114,7 @@ dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t return DDS_RETCODE_UNSUPPORTED; case DDS_KIND_WRITER: - ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, abstimeout); + ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, NULL, abstimeout); dds_entity_unpin (p_or_w_ent); return ret; diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index bebebcc..cad0492 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -410,14 +410,14 @@ dds_entity_t dds_get_publisher (dds_entity_t writer) } } -dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout) +dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout) { /* during lifetime of the writer m_wr is constant, it is only during deletion that it gets erased at some point */ if (wr->m_wr == NULL) return DDS_RETCODE_OK; else - return writer_wait_for_acks (wr->m_wr, abstimeout); + return writer_wait_for_acks (wr->m_wr, rdguid, abstimeout); } DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 14f59f8..289ab84 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -655,7 +655,7 @@ seqno_t writer_max_drop_seq (const struct writer *wr); int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); void writer_set_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr); -dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout); +dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout); dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); dds_return_t delete_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 4fb30cd..9a8b5ef 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3969,16 +3969,30 @@ dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct dd return 0; } -dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout) +dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout) { dds_return_t rc; seqno_t ref_seq; ddsrt_mutex_lock (&wr->e.lock); ref_seq = wr->seq; - while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) - if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) - break; - rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + if (rdguid == NULL) + { + while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) + if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) + break; + rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + } + else + { + struct wr_prd_match *m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid); + while (wr->state == WRST_OPERATIONAL && m && ref_seq > m->seq) + { + if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) + break; + m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid); + } + rc = (m == NULL || ref_seq <= m->seq) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + } ddsrt_mutex_unlock (&wr->e.lock); return rc; }