Add a dds_readcdr analogous to dds_takecdr

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2020-04-19 17:38:25 +02:00 committed by eboasson
parent 9aef05542f
commit 0006e09566
5 changed files with 87 additions and 1 deletions

View file

@ -2756,6 +2756,53 @@ dds_take_mask_wl(
uint32_t maxs, uint32_t maxs,
uint32_t mask); uint32_t mask);
#define DDS_HAS_READCDR 1
/**
* @brief Access the collection of serialized data values (of same type) and
* sample info from the data reader, readcondition or querycondition.
*
* This call accesses the serialized data from the data reader, readcondition or
* querycondition and makes it available to the application. The serialized data
* is made available through \ref ddsi_serdata structures. Returned samples are
* marked as READ.
*
* Return value provides information about the number of samples read, which will
* be <= maxs. Based on the count, the buffer will contain serialized data to be
* read only when valid_data bit in sample info structure is set.
* The buffer required for data values, could be allocated explicitly or can
* use the memory from data reader to prevent copy. In the latter case, buffer and
* sample_info should be returned back, once it is no longer using the data.
*
* @param[in] reader_or_condition Reader, readcondition or querycondition entity.
* @param[out] buf An array of pointers to \ref ddsi_serdata structures that contain
* the serialized data. The pointers can be NULL.
* @param[in] maxs Maximum number of samples to read.
* @param[out] si Pointer to an array of \ref dds_sample_info_t returned for each data value.
* @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t.
*
* @returns A dds_return_t with the number of samples read or an error code.
*
* @retval >=0
* Number of samples read.
* @retval DDS_RETCODE_ERROR
* An internal error has occurred.
* @retval DDS_RETCODE_BAD_PARAMETER
* One of the given arguments is not valid.
* @retval DDS_RETCODE_ILLEGAL_OPERATION
* The operation is invoked on an inappropriate object.
* @retval DDS_RETCODE_ALREADY_DELETED
* The entity has already been deleted.
* @retval DDS_RETCODE_PRECONDITION_NOT_MET
* The precondition for this operation is not met.
*/
DDS_EXPORT dds_return_t
dds_readcdr(
dds_entity_t reader_or_condition,
struct ddsi_serdata **buf,
uint32_t maxs,
dds_sample_info_t *si,
uint32_t mask);
/** /**
* @brief Access the collection of serialized data values (of same type) and * @brief Access the collection of serialized data values (of same type) and
* sample info from the data reader, readcondition or querycondition. * sample info from the data reader, readcondition or querycondition.

View file

@ -41,6 +41,7 @@ struct dds_rhc_ops {
struct ddsi_rhc_ops rhc_ops; struct ddsi_rhc_ops rhc_ops;
dds_rhc_read_take_t read; dds_rhc_read_take_t read;
dds_rhc_read_take_t take; dds_rhc_read_take_t take;
dds_rhc_read_take_cdr_t readcdr;
dds_rhc_read_take_cdr_t takecdr; dds_rhc_read_take_cdr_t takecdr;
dds_rhc_add_readcondition_t add_readcondition; dds_rhc_add_readcondition_t add_readcondition;
dds_rhc_remove_readcondition_t remove_readcondition; dds_rhc_remove_readcondition_t remove_readcondition;
@ -81,6 +82,9 @@ DDS_EXPORT inline int32_t dds_rhc_read (struct dds_rhc *rhc, bool lock, void **v
DDS_EXPORT inline int32_t dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) { DDS_EXPORT inline int32_t dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
return rhc->common.ops->take (rhc, lock, values, info_seq, max_samples, mask, handle, cond); return rhc->common.ops->take (rhc, lock, values, info_seq, max_samples, mask, handle, cond);
} }
DDS_EXPORT inline int32_t dds_rhc_readcdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
return rhc->common.ops->readcdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
}
DDS_EXPORT inline int32_t dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { DDS_EXPORT inline int32_t dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
} }

View file

@ -171,7 +171,11 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER); assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER);
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); if (take)
ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
else
ret = dds_rhc_readcdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
dds_entity_unpin (entity); dds_entity_unpin (entity);
thread_state_asleep (ts1); thread_state_asleep (ts1);
return ret; return ret;
@ -225,6 +229,18 @@ dds_return_t dds_read_mask_wl (dds_entity_t rd_or_cnd, void **buf, dds_sample_in
return dds_read_impl (false, rd_or_cnd, buf, maxs, maxs, si, mask, DDS_HANDLE_NIL, lock, false); return dds_read_impl (false, rd_or_cnd, buf, maxs, maxs, si, mask, DDS_HANDLE_NIL, lock, false);
} }
dds_return_t dds_readcdr (dds_entity_t rd_or_cnd, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask)
{
bool lock = true;
if (maxs == DDS_READ_WITHOUT_LOCK)
{
lock = false;
/* FIXME: Fix the interface. */
maxs = 100;
}
return dds_readcdr_impl (false, rd_or_cnd, buf, maxs, si, mask, DDS_HANDLE_NIL, lock);
}
dds_return_t dds_read_instance (dds_entity_t rd_or_cnd, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, dds_instance_handle_t handle) dds_return_t dds_read_instance (dds_entity_t rd_or_cnd, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, dds_instance_handle_t handle)
{ {
bool lock = true; bool lock = true;

View file

@ -22,6 +22,7 @@ extern inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qos *q
extern inline void dds_rhc_free (struct dds_rhc *rhc); extern inline void dds_rhc_free (struct dds_rhc *rhc);
extern inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); extern inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
extern inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); extern inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
extern inline int dds_rhc_readcdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
extern inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); extern inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
extern inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond); extern inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond);
extern inline void dds_rhc_remove_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond); extern inline void dds_rhc_remove_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond);

View file

@ -370,6 +370,7 @@ static void dds_rhc_default_relinquish_ownership (struct dds_rhc_default * __res
static void dds_rhc_default_set_qos (struct dds_rhc_default *rhc, const struct dds_qos *qos); static void dds_rhc_default_set_qos (struct dds_rhc_default *rhc, const struct dds_qos *qos);
static int32_t dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond); static int32_t dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond); static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
static int32_t dds_rhc_default_readcdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond); static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond); static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
@ -396,6 +397,9 @@ static int32_t dds_rhc_default_read_wrap (struct dds_rhc *rhc, bool lock, void *
static int32_t dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) { static int32_t dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
return dds_rhc_default_take ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, mask, handle, cond); return dds_rhc_default_take ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, mask, handle, cond);
} }
static int32_t dds_rhc_default_readcdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
return dds_rhc_default_readcdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
}
static int32_t dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { static int32_t dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
return dds_rhc_default_takecdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); return dds_rhc_default_takecdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
} }
@ -425,6 +429,7 @@ static const struct dds_rhc_ops dds_rhc_default_ops = {
}, },
.read = dds_rhc_default_read_wrap, .read = dds_rhc_default_read_wrap,
.take = dds_rhc_default_take_wrap, .take = dds_rhc_default_take_wrap,
.readcdr = dds_rhc_default_readcdr_wrap,
.takecdr = dds_rhc_default_takecdr_wrap, .takecdr = dds_rhc_default_takecdr_wrap,
.add_readcondition = dds_rhc_default_add_readcondition_wrap, .add_readcondition = dds_rhc_default_add_readcondition_wrap,
.remove_readcondition = dds_rhc_default_remove_readcondition_wrap, .remove_readcondition = dds_rhc_default_remove_readcondition_wrap,
@ -2331,6 +2336,13 @@ static int32_t dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, voi
return take_w_qminv (rhc, lock, values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample, read_take_to_invsample); return take_w_qminv (rhc, lock, values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample, read_take_to_invsample);
} }
static int32_t dds_rhc_readcdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond)
{
DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *));
assert (max_samples <= INT32_MAX);
return read_w_qminv (rhc, lock, (void **) values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample_ref, read_take_to_invsample_ref);
}
static int32_t dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond) static int32_t dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond)
{ {
DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *)); DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *));
@ -2727,6 +2739,12 @@ static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, voi
return dds_rhc_take_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond); return dds_rhc_take_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond);
} }
static int32_t dds_rhc_default_readcdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle)
{
uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states);
return dds_rhc_readcdr_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, NULL);
}
static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle)
{ {
uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states); uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states);