Implement dds_wait_for_acks (writer only)

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-01-15 12:42:08 +01:00 committed by eboasson
parent 60e51479c1
commit 46e0c6dc94
6 changed files with 55 additions and 22 deletions

View file

@ -23,6 +23,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER)
struct status_cb_data; struct status_cb_data;
void dds_writer_status_cb (void *entity, const struct status_cb_data * data); void dds_writer_status_cb (void *entity, const struct status_cb_data * data);
dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -15,6 +15,7 @@
#include "dds__listener.h" #include "dds__listener.h"
#include "dds__participant.h" #include "dds__participant.h"
#include "dds__publisher.h" #include "dds__publisher.h"
#include "dds__writer.h"
#include "dds__qos.h" #include "dds__qos.h"
#include "dds/ddsi/ddsi_iid.h" #include "dds/ddsi/ddsi_iid.h"
#include "dds/ddsi/q_entity.h" #include "dds/ddsi/q_entity.h"
@ -94,10 +95,33 @@ dds_return_t dds_resume (dds_entity_t publisher)
dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t timeout) dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t timeout)
{ {
dds_return_t ret;
dds_entity *p_or_w_ent;
if (timeout < 0) if (timeout < 0)
return DDS_RETCODE_BAD_PARAMETER; return DDS_RETCODE_BAD_PARAMETER;
static const dds_entity_kind_t kinds[] = { DDS_KIND_WRITER, DDS_KIND_PUBLISHER };
return dds_generic_unimplemented_operation_manykinds (publisher_or_writer, sizeof (kinds) / sizeof (kinds[0]), kinds); if ((ret = dds_entity_pin (publisher_or_writer, &p_or_w_ent)) < 0)
return ret;
const dds_time_t tnow = dds_time ();
const dds_time_t abstimeout = (DDS_INFINITY - timeout <= tnow) ? DDS_NEVER : (tnow + timeout);
switch (dds_entity_kind (p_or_w_ent))
{
case DDS_KIND_PUBLISHER:
/* FIXME: wait_for_acks on all writers of the same publisher */
dds_entity_unpin (p_or_w_ent);
return DDS_RETCODE_UNSUPPORTED;
case DDS_KIND_WRITER:
ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, abstimeout);
dds_entity_unpin (p_or_w_ent);
return ret;
default:
dds_entity_unpin (p_or_w_ent);
return DDS_RETCODE_ILLEGAL_OPERATION;
}
} }
dds_return_t dds_publisher_begin_coherent (dds_entity_t publisher) dds_return_t dds_publisher_begin_coherent (dds_entity_t publisher)

View file

@ -379,6 +379,16 @@ dds_entity_t dds_get_publisher (dds_entity_t writer)
} }
} }
dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout)
{
/* during lifetime of the writer m_wr is constant, it is only during deletion that it
gets erased at some point */
if (wr->m_wr == NULL)
return DDS_RETCODE_OK;
else
return writer_wait_for_acks (wr->m_wr, abstimeout);
}
DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change) DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change)
DDS_GET_STATUS(writer, liveliness_lost, LIVELINESS_LOST, total_count_change) DDS_GET_STATUS(writer, liveliness_lost, LIVELINESS_LOST, total_count_change)
DDS_GET_STATUS(writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED, total_count_change) DDS_GET_STATUS(writer, offered_deadline_missed, OFFERED_DEADLINE_MISSED, total_count_change)

View file

@ -84,21 +84,6 @@ CU_Test(ddsc_unsupported, dds_begin_end_coherent, .init = setup, .fini = teardow
} }
} }
CU_Test(ddsc_unsupported, dds_wait_for_acks, .init = setup, .fini = teardown)
{
dds_return_t result;
static struct index_result pars[] = {
{PUB, DDS_RETCODE_UNSUPPORTED},
{WRI, DDS_RETCODE_UNSUPPORTED},
{BAD, DDS_RETCODE_BAD_PARAMETER}
};
for (size_t i=0; i < sizeof (pars) / sizeof (pars[0]);i++) {
result = dds_wait_for_acks(e[pars[i].index], 0);
CU_ASSERT_EQUAL(result, pars[i].exp_res);
}
}
CU_Test(ddsc_unsupported, dds_suspend_resume, .init = setup, .fini = teardown) CU_Test(ddsc_unsupported, dds_suspend_resume, .init = setup, .fini = teardown)
{ {
dds_return_t result; dds_return_t result;

View file

@ -240,7 +240,7 @@ struct writer
struct endpoint_common c; struct endpoint_common c;
status_cb_t status_cb; status_cb_t status_cb;
void * status_cb_entity; void * status_cb_entity;
ddsrt_cond_t throttle_cond; /* used to trigger a transmit thread blocked in throttle_writer() */ ddsrt_cond_t throttle_cond; /* used to trigger a transmit thread blocked in throttle_writer() or wait_for_acks() */
seqno_t seq; /* last sequence number (transmitted seqs are 1 ... seq) */ seqno_t seq; /* last sequence number (transmitted seqs are 1 ... seq) */
seqno_t cs_seq; /* 1st seq in coherent set (or 0) */ seqno_t cs_seq; /* 1st seq in coherent set (or 0) */
seq_xmit_t seq_xmit; /* last sequence number actually transmitted */ seq_xmit_t seq_xmit; /* last sequence number actually transmitted */
@ -605,6 +605,7 @@ seqno_t writer_max_drop_seq (const struct writer *wr);
int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst);
void writer_set_retransmitting (struct writer *wr); void writer_set_retransmitting (struct writer *wr);
void writer_clear_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr);
dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout);
dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_guid *guid); dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_guid *guid);
dds_return_t delete_writer (struct q_globals *gv, const struct ddsi_guid *guid); dds_return_t delete_writer (struct q_globals *gv, const struct ddsi_guid *guid);

View file

@ -2690,10 +2690,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru
assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); assert (wr->e.guid.entityid.u != NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
ASSERT_MUTEX_HELD (&wr->e.lock); ASSERT_MUTEX_HELD (&wr->e.lock);
n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list); n = whc_remove_acked_messages (wr->whc, writer_max_drop_seq (wr), whcst, deferred_free_list);
/* when transitioning from >= low-water to < low-water, signal /* trigger anyone waiting in throttle_writer() or wait_for_acks() */
anyone waiting in throttle_writer() */ ddsrt_cond_broadcast (&wr->throttle_cond);
if (wr->throttling && whcst->unacked_bytes <= wr->whc_low)
ddsrt_cond_broadcast (&wr->throttle_cond);
if (wr->retransmitting && whcst->unacked_bytes == 0) if (wr->retransmitting && whcst->unacked_bytes == 0)
writer_clear_retransmitting (wr); writer_clear_retransmitting (wr);
if (wr->state == WRST_LINGERING && whcst->unacked_bytes == 0) if (wr->state == WRST_LINGERING && whcst->unacked_bytes == 0)
@ -3092,6 +3090,20 @@ dds_return_t unblock_throttled_writer (struct q_globals *gv, const struct ddsi_g
return 0; return 0;
} }
dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout)
{
dds_return_t rc;
seqno_t ref_seq;
ddsrt_mutex_lock (&wr->e.lock);
ref_seq = wr->seq;
while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr))
if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout))
break;
rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
ddsrt_mutex_unlock (&wr->e.lock);
return rc;
}
dds_return_t delete_writer_nolinger_locked (struct writer *wr) dds_return_t delete_writer_nolinger_locked (struct writer *wr)
{ {
ELOGDISC (wr, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid)); ELOGDISC (wr, "delete_writer_nolinger(guid "PGUIDFMT") ...\n", PGUID (wr->e.guid));