diff --git a/src/core/ddsc/include/dds/dds.h b/src/core/ddsc/include/dds/dds.h index e62be5e..8383b06 100644 --- a/src/core/ddsc/include/dds/dds.h +++ b/src/core/ddsc/include/dds/dds.h @@ -2756,6 +2756,53 @@ dds_take_mask_wl( uint32_t maxs, uint32_t mask); +#define DDS_HAS_READCDR 1 +/** + * @brief Access the collection of serialized data values (of same type) and + * sample info from the data reader, readcondition or querycondition. + * + * This call accesses the serialized data from the data reader, readcondition or + * querycondition and makes it available to the application. The serialized data + * is made available through \ref ddsi_serdata structures. Returned samples are + * marked as READ. + * + * Return value provides information about the number of samples read, which will + * be <= maxs. Based on the count, the buffer will contain serialized data to be + * read only when valid_data bit in sample info structure is set. + * The buffer required for data values, could be allocated explicitly or can + * use the memory from data reader to prevent copy. In the latter case, buffer and + * sample_info should be returned back, once it is no longer using the data. + * + * @param[in] reader_or_condition Reader, readcondition or querycondition entity. + * @param[out] buf An array of pointers to \ref ddsi_serdata structures that contain + * the serialized data. The pointers can be NULL. + * @param[in] maxs Maximum number of samples to read. + * @param[out] si Pointer to an array of \ref dds_sample_info_t returned for each data value. + * @param[in] mask Filter the data based on dds_sample_state_t|dds_view_state_t|dds_instance_state_t. + * + * @returns A dds_return_t with the number of samples read or an error code. + * + * @retval >=0 + * Number of samples read. + * @retval DDS_RETCODE_ERROR + * An internal error has occurred. + * @retval DDS_RETCODE_BAD_PARAMETER + * One of the given arguments is not valid. + * @retval DDS_RETCODE_ILLEGAL_OPERATION + * The operation is invoked on an inappropriate object. + * @retval DDS_RETCODE_ALREADY_DELETED + * The entity has already been deleted. + * @retval DDS_RETCODE_PRECONDITION_NOT_MET + * The precondition for this operation is not met. + */ +DDS_EXPORT dds_return_t +dds_readcdr( + dds_entity_t reader_or_condition, + struct ddsi_serdata **buf, + uint32_t maxs, + dds_sample_info_t *si, + uint32_t mask); + /** * @brief Access the collection of serialized data values (of same type) and * sample info from the data reader, readcondition or querycondition. diff --git a/src/core/ddsc/include/dds/ddsc/dds_rhc.h b/src/core/ddsc/include/dds/ddsc/dds_rhc.h index b5cba34..e3bf4d6 100644 --- a/src/core/ddsc/include/dds/ddsc/dds_rhc.h +++ b/src/core/ddsc/include/dds/ddsc/dds_rhc.h @@ -41,6 +41,7 @@ struct dds_rhc_ops { struct ddsi_rhc_ops rhc_ops; dds_rhc_read_take_t read; dds_rhc_read_take_t take; + dds_rhc_read_take_cdr_t readcdr; dds_rhc_read_take_cdr_t takecdr; dds_rhc_add_readcondition_t add_readcondition; dds_rhc_remove_readcondition_t remove_readcondition; @@ -81,6 +82,9 @@ DDS_EXPORT inline int32_t dds_rhc_read (struct dds_rhc *rhc, bool lock, void **v DDS_EXPORT inline int32_t dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) { return rhc->common.ops->take (rhc, lock, values, info_seq, max_samples, mask, handle, cond); } +DDS_EXPORT inline int32_t dds_rhc_readcdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { + return rhc->common.ops->readcdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); +} DDS_EXPORT inline int32_t dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); } diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index fd9ae5b..a5c932d 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -171,7 +171,11 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER); dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); - ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); + if (take) + ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); + else + ret = dds_rhc_readcdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand); + dds_entity_unpin (entity); thread_state_asleep (ts1); return ret; @@ -225,6 +229,18 @@ dds_return_t dds_read_mask_wl (dds_entity_t rd_or_cnd, void **buf, dds_sample_in return dds_read_impl (false, rd_or_cnd, buf, maxs, maxs, si, mask, DDS_HANDLE_NIL, lock, false); } +dds_return_t dds_readcdr (dds_entity_t rd_or_cnd, struct ddsi_serdata **buf, uint32_t maxs, dds_sample_info_t *si, uint32_t mask) +{ + bool lock = true; + if (maxs == DDS_READ_WITHOUT_LOCK) + { + lock = false; + /* FIXME: Fix the interface. */ + maxs = 100; + } + return dds_readcdr_impl (false, rd_or_cnd, buf, maxs, si, mask, DDS_HANDLE_NIL, lock); +} + dds_return_t dds_read_instance (dds_entity_t rd_or_cnd, void **buf, dds_sample_info_t *si, size_t bufsz, uint32_t maxs, dds_instance_handle_t handle) { bool lock = true; diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index f159422..7db2ff0 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -22,6 +22,7 @@ extern inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qos *q extern inline void dds_rhc_free (struct dds_rhc *rhc); extern inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); extern inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); +extern inline int dds_rhc_readcdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); extern inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); extern inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond); extern inline void dds_rhc_remove_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond); diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index 07692e7..d5754f3 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -370,6 +370,7 @@ static void dds_rhc_default_relinquish_ownership (struct dds_rhc_default * __res static void dds_rhc_default_set_qos (struct dds_rhc_default *rhc, const struct dds_qos *qos); static int32_t dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond); static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond); +static int32_t dds_rhc_default_readcdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond); static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond); @@ -396,6 +397,9 @@ static int32_t dds_rhc_default_read_wrap (struct dds_rhc *rhc, bool lock, void * static int32_t dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) { return dds_rhc_default_take ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, mask, handle, cond); } +static int32_t dds_rhc_default_readcdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { + return dds_rhc_default_readcdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); +} static int32_t dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { return dds_rhc_default_takecdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle); } @@ -425,6 +429,7 @@ static const struct dds_rhc_ops dds_rhc_default_ops = { }, .read = dds_rhc_default_read_wrap, .take = dds_rhc_default_take_wrap, + .readcdr = dds_rhc_default_readcdr_wrap, .takecdr = dds_rhc_default_takecdr_wrap, .add_readcondition = dds_rhc_default_add_readcondition_wrap, .remove_readcondition = dds_rhc_default_remove_readcondition_wrap, @@ -2331,6 +2336,13 @@ static int32_t dds_rhc_take_w_qminv (struct dds_rhc_default *rhc, bool lock, voi return take_w_qminv (rhc, lock, values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample, read_take_to_invsample); } +static int32_t dds_rhc_readcdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond) +{ + DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *)); + assert (max_samples <= INT32_MAX); + return read_w_qminv (rhc, lock, (void **) values, info_seq, (int32_t) max_samples, qminv, handle, cond, read_take_to_sample_ref, read_take_to_invsample_ref); +} + static int32_t dds_rhc_takecdr_w_qminv (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t qminv, dds_instance_handle_t handle, dds_readcond *cond) { DDSRT_STATIC_ASSERT (sizeof (void *) == sizeof (struct ddsi_serdata *)); @@ -2727,6 +2739,12 @@ static int32_t dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, voi return dds_rhc_take_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, cond); } +static int32_t dds_rhc_default_readcdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) +{ + uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states); + return dds_rhc_readcdr_w_qminv (rhc, lock, values, info_seq, max_samples, qminv, handle, NULL); +} + static int32_t dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) { uint32_t qminv = qmask_from_dcpsquery (sample_states, view_states, instance_states);