Make wait_for_acks implementation capable of waiting for one reader

The dds_wait_for_acks function follows the DCPS specification and allows
waiting for all matching readers to have acknowledged all data written
prior to that point.  This commit leaves the API unchanged but extends
the implementation to make it possible to wait until a specific reader
has acknowledged everything, as this is a useful device in testing with
deliberate one-way disconnections using dds_domain_set_deafmute.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-05-12 09:07:08 +02:00 committed by eboasson
parent 8fea8d5673
commit fe81a6bda5
5 changed files with 24 additions and 10 deletions

View file

@ -23,7 +23,7 @@ DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER)
struct status_cb_data; struct status_cb_data;
void dds_writer_status_cb (void *entity, const struct status_cb_data * data); void dds_writer_status_cb (void *entity, const struct status_cb_data * data);
dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout); DDS_EXPORT dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -114,7 +114,7 @@ dds_return_t dds_wait_for_acks (dds_entity_t publisher_or_writer, dds_duration_t
return DDS_RETCODE_UNSUPPORTED; return DDS_RETCODE_UNSUPPORTED;
case DDS_KIND_WRITER: case DDS_KIND_WRITER:
ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, abstimeout); ret = dds__writer_wait_for_acks ((struct dds_writer *) p_or_w_ent, NULL, abstimeout);
dds_entity_unpin (p_or_w_ent); dds_entity_unpin (p_or_w_ent);
return ret; return ret;

View file

@ -410,14 +410,14 @@ dds_entity_t dds_get_publisher (dds_entity_t writer)
} }
} }
dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, dds_time_t abstimeout) dds_return_t dds__writer_wait_for_acks (struct dds_writer *wr, ddsi_guid_t *rdguid, dds_time_t abstimeout)
{ {
/* during lifetime of the writer m_wr is constant, it is only during deletion that it /* during lifetime of the writer m_wr is constant, it is only during deletion that it
gets erased at some point */ gets erased at some point */
if (wr->m_wr == NULL) if (wr->m_wr == NULL)
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
else else
return writer_wait_for_acks (wr->m_wr, abstimeout); return writer_wait_for_acks (wr->m_wr, rdguid, abstimeout);
} }
DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change) DDS_GET_STATUS(writer, publication_matched, PUBLICATION_MATCHED, total_count_change, current_count_change)

View file

@ -655,7 +655,7 @@ seqno_t writer_max_drop_seq (const struct writer *wr);
int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst); int writer_must_have_hb_scheduled (const struct writer *wr, const struct whc_state *whcst);
void writer_set_retransmitting (struct writer *wr); void writer_set_retransmitting (struct writer *wr);
void writer_clear_retransmitting (struct writer *wr); void writer_clear_retransmitting (struct writer *wr);
dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout); dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout);
dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid);
dds_return_t delete_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid); dds_return_t delete_writer (struct ddsi_domaingv *gv, const struct ddsi_guid *guid);

View file

@ -3969,16 +3969,30 @@ dds_return_t unblock_throttled_writer (struct ddsi_domaingv *gv, const struct dd
return 0; return 0;
} }
dds_return_t writer_wait_for_acks (struct writer *wr, dds_time_t abstimeout) dds_return_t writer_wait_for_acks (struct writer *wr, const ddsi_guid_t *rdguid, dds_time_t abstimeout)
{ {
dds_return_t rc; dds_return_t rc;
seqno_t ref_seq; seqno_t ref_seq;
ddsrt_mutex_lock (&wr->e.lock); ddsrt_mutex_lock (&wr->e.lock);
ref_seq = wr->seq; ref_seq = wr->seq;
while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr)) if (rdguid == NULL)
if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout)) {
break; while (wr->state == WRST_OPERATIONAL && ref_seq > writer_max_drop_seq (wr))
rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT; if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout))
break;
rc = (ref_seq <= writer_max_drop_seq (wr)) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
}
else
{
struct wr_prd_match *m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid);
while (wr->state == WRST_OPERATIONAL && m && ref_seq > m->seq)
{
if (!ddsrt_cond_waituntil (&wr->throttle_cond, &wr->e.lock, abstimeout))
break;
m = ddsrt_avl_lookup (&wr_readers_treedef, &wr->readers, rdguid);
}
rc = (m == NULL || ref_seq <= m->seq) ? DDS_RETCODE_OK : DDS_RETCODE_TIMEOUT;
}
ddsrt_mutex_unlock (&wr->e.lock); ddsrt_mutex_unlock (&wr->e.lock);
return rc; return rc;
} }