diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index 0f1b7ef..69ff7e1 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -23,6 +23,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER) struct status_cb_data; void dds_writer_status_cb (void *entity, const struct status_cb_data * data); +dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout); #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index ae20658..b6f0792 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -15,6 +15,7 @@ #include "dds__listener.h" #include "dds__participant.h" #include "dds__publisher.h" +#include "dds__writer.h" #include "dds__qos.h" #include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/q_entity.h" @@ -94,10 +95,33 @@ dds_return_t dds_resume (dds_entity_t publisher) dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t timeout) { + dds_return_t ret; + dds_entity *p_or_w_ent; + if (timeout < 0) return DDS_RETCODE_BAD_PARAMETER; - static const dds_entity_kind_t kinds[] = { DDS_KIND_WRITER, DDS_KIND_PUBLISHER }; - return dds_generic_unimplemented_operation_manykinds (publisher_or_writer, sizeof (kinds) / sizeof (kinds[0]), kinds); + + if ((ret = dds_entity_pin (publisher_or_writer, &p_or_w_ent)) < 0) + return ret; + + const dds_time_t tnow = dds_time (); + const dds_time_t abstimeout = (DDS_INFINITY - timeout <= tnow) ? DDS_NEVER : (tnow + timeout); + switch (dds_entity_kind (p_or_w_ent)) + { + case DDS_KIND_PUBLISHER: + /* FIXME: wait_for_acks on all writers of the same publisher */ + dds_entity_unpin (p_or_w_ent); + return DDS_RETCODE_UNSUPPORTED; + + case DDS_KIND_WRITER: + ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, abstimeout); + dds_entity_unpin (p_or_w_ent); + return ret; + + default: + dds_entity_unpin (p_or_w_ent); + return DDS_RETCODE_ILLEGAL_OPERATION; + } } dds_return_t dds_publisher_begin_coherent (dds_entity_t publisher) diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 1e023c1..163d4e1 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -379,6 +379,16 @@ dds_entity_t dds_get_publisher (dds_entity_t writer) } } +dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout) +{ + /* during lifetime of the writer m_wr is constant, it is only during deletion that it + gets erased at some point */ + if (wr->m_wr == NULL) + return DDS_RETCODE_OK; + else + return writer_wait_for_acks (wr->m_wr, abstimeout); +} + DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change) DDS_GET_STATUS(writer, liveliness_lost, LIVELINESS_LOST, total_count_change) DDS_GET_STATUS(writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED, total_count_change) diff --git a/src/core/ddsc/tests/unsupported.c b/src/core/ddsc/tests/unsupported.c index b7e63cb..9c538ce 100644 --- a/src/core/ddsc/tests/unsupported.c +++ b/src/core/ddsc/tests/unsupported.c @@ -84,21 +84,6 @@ CU_Test(ddsc_unsupported, dds_begin_end_coherent, .init = setup, .fini = teardow } } -CU_Test(ddsc_unsupported, dds_wait_for_acks, .init = setup, .fini = teardown) -{ - dds_return_t result; - static struct index_result pars[] = { - {PUB, DDS_RETCODE_UNSUPPORTED}, - {WRI, DDS_RETCODE_UNSUPPORTED}, - {BAD, DDS_RETCODE_BAD_PARAMETER} - }; - - for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) { - result = dds_wait_for_acks(e[pars[i].index], 0); - CU_ASSERT_EQUAL(result, pars[i].exp_res); - } -} - CU_Test(ddsc_unsupported, dds_suspend_resume, .init = setup, .fini = teardown) { dds_return_t result; diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 778d5c0..f177a83 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -240,7 +240,7 @@ struct writer struct endpoint_common c; status_cb_t status_cb; void * status_cb_entity; - ddsrt_cond_t throttle_cond; /* used to trigger a transmit thread blocked in throttle_writer() */ + ddsrt_cond_t throttle_cond; /* used to trigger a transmit thread blocked in throttle_writer() or wait_for_acks() */ seqno_t seq; /* last sequence number (transmitted seqs are 1 ... seq) */ seqno_t cs_seq; /* 1st seq in coherent set (or 0) */ seq_xmit_t seq_xmit; /* last sequence number actually transmitted */ @@ -605,6 +605,7 @@ seqno_t writer_max_drop_seq (const struct writer *wr); int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); void writer_set_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr); +dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout); dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_guid *guid); dds_return_t delete_writer (struct q_globals *gv, const struct ddsi_guid *guid); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index a4380c8..1bed076 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -2690,10 +2690,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); ASSERT_MUTEX_HELD (&wr->e.lock); n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list); - /* when transitioning from >= low-water to < low-water, signal - anyone waiting in throttle_writer() */ - if (wr->throttling && whcst->unacked_bytes <= wr->whc_low) - ddsrt_cond_broadcast (&wr->throttle_cond); + /* trigger anyone waiting in throttle_writer() or wait_for_acks() */ + ddsrt_cond_broadcast (&wr->throttle_cond); if (wr->retransmitting && whcst->unacked_bytes == 0) writer_clear_retransmitting (wr); if (wr->state == WRST_LINGERING && whcst->unacked_bytes == 0) @@ -3092,6 +3090,20 @@ dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_g return 0; } +dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout) +{ + dds_return_t rc; + seqno_t ref_seq; + ddsrt_mutex_lock (&wr->e.lock); + ref_seq = wr->seq; + while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) + if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) + break; + rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; + ddsrt_mutex_unlock (&wr->e.lock); + return rc; +} + dds_return_t delete_writer_nolinger_locked (struct writer *wr) { ELOGDISC (wr, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid));