diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index 69ff7e1..111e93d 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -23,7 +23,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER) struct status_cb_data; void dds_writer_status_cb (void *entity, const struct status_cb_data * data); -dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout); +DDS_EXPORT dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index 00ea520..ca17999 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -114,7 +114,7 @@ dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t return DDS_RETCODE_UNSUPPORTED; case DDS_KIND_WRITER: - ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, abstimeout); + ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, NULL, abstimeout); dds_entity_unpin (p_or_w_ent); return ret; diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index bebebcc..cad0492 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -410,14 +410,14 @@ dds_entity_t dds_get_publisher (dds_entity_t writer) } } -dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout) +dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout) { /* during lifetime of the writer m_wr is constant, it is only during deletion that it gets erased at some point */ if (wr->m_wr == NULL) return DDS_RETCODE_OK; else - return writer_wait_for_acks (wr->m_wr, abstimeout); + return writer_wait_for_acks (wr->m_wr, rdguid, abstimeout); } DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change) diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 14f59f8..289ab84 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -655,7 +655,7 @@ seqno_t writer_max_drop_seq (const struct writer *wr); int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); void writer_set_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr); -dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout); +dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout); dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); dds_return_t delete_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 4fb30cd..9a8b5ef 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -3969,16 +3969,30 @@ dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct dd return 0; } -dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout) +dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout) { dds_return_t rc; seqno_t ref_seq; ddsrt_mutex_lock (&wr->e.lock); ref_seq = wr->seq; - while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) - if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) - break; - rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + if (rdguid == NULL) + { + while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) + if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) + break; + rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + } + else + { + struct wr_prd_match *m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid); + while (wr->state == WRST_OPERATIONAL && m && ref_seq > m->seq) + { + if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) + break; + m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid); + } + rc = (m == NULL || ref_seq <= m->seq) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + } ddsrt_mutex_unlock (&wr->e.lock); return rc; }