From fbc05777f3945bdb073bc51b255965346549d16f Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Mon, 2 Sep 2019 09:08:13 +0200 Subject: [PATCH] Make it possible to create reader with custom RHC The default RHC implementation is not always ideal and rather than trying to squeeze everything in a fixed interface it makes more sense to allow the caller to provide an arbitrary implementation of the interface. This is not yet a stable interface. Signed-off-by: Erik Boasson --- src/core/ddsc/include/dds/dds.h | 29 +++++++++++++++++++++++ src/core/ddsc/include/dds/ddsc/dds_rhc.h | 17 +++++++++---- src/core/ddsc/src/dds__reader.h | 2 -- src/core/ddsc/src/dds_reader.c | 23 +++++++++++++++--- src/core/ddsc/src/dds_rhc.c | 1 + src/core/ddsc/src/dds_rhc_default.c | 9 ++++++- src/core/ddsi/include/dds/ddsi/ddsi_rhc.h | 8 +++---- 7 files changed, 75 insertions(+), 14 deletions(-) diff --git a/src/core/ddsc/include/dds/dds.h b/src/core/ddsc/include/dds/dds.h index c5e6d9b..884eb3f 100644 --- a/src/core/ddsc/include/dds/dds.h +++ b/src/core/ddsc/include/dds/dds.h @@ -49,6 +49,7 @@ typedef int32_t dds_entity_t; extern "C" { #endif +struct dds_rhc; struct ddsi_serdata; #define DDS_MIN_PSEUDO_HANDLE ((dds_entity_t) 0x7fff0000) @@ -1195,6 +1196,34 @@ dds_create_reader( const dds_qos_t *qos, const dds_listener_t *listener); +/** + * @brief Creates a new instance of a DDS reader with a custom history cache. + * + * This implicit subscriber will be deleted automatically when the created reader + * is deleted. + * + * @param[in] participant_or_subscriber The participant or subscriber on which the reader is being created. + * @param[in] topic The topic to read. + * @param[in] qos The QoS to set on the new reader (can be NULL). + * @param[in] listener Any listener functions associated with the new reader (can be NULL). + * @param[in] rhc Reader history cache to use, reader becomes the owner + * + * @returns A valid reader handle or an error code. + * + * @retval >0 + * A valid reader handle. + * @retval DDS_RETCODE_ERROR + * An internal error occurred. + */ +/* TODO: Complete list of error codes */ +DDS_EXPORT dds_entity_t +dds_create_reader_rhc( + dds_entity_t participant_or_subscriber, + dds_entity_t topic, + const dds_qos_t *qos, + const dds_listener_t *listener, + struct dds_rhc *rhc); + /** * @brief Wait until reader receives all historic data * diff --git a/src/core/ddsc/include/dds/ddsc/dds_rhc.h b/src/core/ddsc/include/dds/ddsc/dds_rhc.h index 61f9e3e..d0413e6 100644 --- a/src/core/ddsc/include/dds/ddsc/dds_rhc.h +++ b/src/core/ddsc/include/dds/ddsc/dds_rhc.h @@ -23,7 +23,10 @@ extern "C" { struct dds_rhc; struct dds_readcond; +struct dds_reader; +struct ddsi_tkmap; +typedef dds_return_t (*dds_rhc_associate_t) (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap); typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); @@ -43,6 +46,7 @@ struct dds_rhc_ops { dds_rhc_add_readcondition_t add_readcondition; dds_rhc_remove_readcondition_t remove_readcondition; dds_rhc_lock_samples_t lock_samples; + dds_rhc_associate_t associate; }; struct dds_rhc { @@ -54,11 +58,14 @@ struct dds_rhc { DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct ddsi_rhc, ops)); -DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { - return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, pwr_info, sample, tk); +DDS_EXPORT inline dds_return_t dds_rhc_associate (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap) { + return rhc->common.ops->associate (rhc, reader, topic, tkmap); } -DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info) { - rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, pwr_info); +DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { + return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, wrinfo, sample, tk); +} +DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo) { + rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, wrinfo); } DDS_EXPORT inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid) { rhc->common.ops->rhc_ops.relinquish_ownership (&rhc->common.rhc, wr_iid); @@ -88,6 +95,8 @@ DDS_EXPORT inline uint32_t dds_rhc_lock_samples (struct dds_rhc *rhc) { return rhc->common.ops->lock_samples (rhc); } +DDS_EXPORT void dds_reader_data_available_cb (struct dds_reader *rd); + #if defined (__cplusplus) } #endif diff --git a/src/core/ddsc/src/dds__reader.h b/src/core/ddsc/src/dds__reader.h index 5c9684b..48b1168 100644 --- a/src/core/ddsc/src/dds__reader.h +++ b/src/core/ddsc/src/dds__reader.h @@ -23,8 +23,6 @@ struct status_cb_data; void dds_reader_status_cb (void *entity, const struct status_cb_data * data); -void dds_reader_data_available_cb (struct dds_reader *entity); - /* dds_reader_lock_samples: Returns number of samples in read cache and locks the reader cache to make sure that the samples content doesn't change. diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 17105f4..b30137d 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -284,7 +284,7 @@ const struct dds_entity_deriver dds_entity_deriver_reader = { .validate_status = dds_reader_status_validate }; -dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener) +static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc) { dds_qos_t *rqos; dds_subscriber *sub = NULL; @@ -376,12 +376,17 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti goto err_bad_qos; } - /* Create reader and associated read cache */ + /* Create reader and associated read cache (if not provided by caller) */ rd = dds_alloc (sizeof (*rd)); reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK); rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; rd->m_topic = tp; - rd->m_rhc = dds_rhc_default_new (rd, tp->m_stopic); + rd->m_rhc = rhc ? rhc : dds_rhc_default_new (rd, tp->m_stopic); + if (dds_rhc_associate (rd->m_rhc, rd, tp->m_stopic, rd->m_entity.m_domain->gv.m_tkmap) < 0) + { + /* FIXME: see also create_querycond, need to be able to undo entity_init */ + abort (); + } dds_entity_add_ref_locked (&tp->m_entity); /* Extra claim of this reader to make sure that the delete waits until DDSI @@ -474,6 +479,18 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb dds_entity_unpin (dds_entity); } +dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener) +{ + return dds_create_reader_int (participant_or_subscriber, topic, qos, listener, NULL); +} + +dds_entity_t dds_create_reader_rhc (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc) +{ + if (rhc == NULL) + return DDS_RETCODE_BAD_PARAMETER; + return dds_create_reader_int (participant_or_subscriber, topic, qos, listener, rhc); +} + uint32_t dds_reader_lock_samples (dds_entity_t reader) { dds_reader *rd; diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index 1673930..f159422 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -14,6 +14,7 @@ #include "dds/ddsi/ddsi_rhc.h" #include "dds/ddsc/dds_rhc.h" +extern inline dds_return_t dds_rhc_associate (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap); extern inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk); extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info); extern inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid); diff --git a/src/core/ddsc/src/dds_rhc_default.c b/src/core/ddsc/src/dds_rhc_default.c index c1f6575..dee3eb6 100644 --- a/src/core/ddsc/src/dds_rhc_default.c +++ b/src/core/ddsc/src/dds_rhc_default.c @@ -390,6 +390,12 @@ static void dds_rhc_default_remove_readcondition_wrap (struct dds_rhc *rhc, dds_ static uint32_t dds_rhc_default_lock_samples_wrap (struct dds_rhc *rhc) { return dds_rhc_default_lock_samples ((struct dds_rhc_default *) rhc); } +static dds_return_t dds_rhc_default_associate (struct dds_rhc *rhc, dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap) +{ + /* ignored out of laziness */ + (void) rhc; (void) reader; (void) topic; (void) tkmap; + return DDS_RETCODE_OK; +} static const struct dds_rhc_ops dds_rhc_default_ops = { .rhc_ops = { @@ -404,7 +410,8 @@ static const struct dds_rhc_ops dds_rhc_default_ops = { .takecdr = dds_rhc_default_takecdr_wrap, .add_readcondition = dds_rhc_default_add_readcondition_wrap, .remove_readcondition = dds_rhc_default_remove_readcondition_wrap, - .lock_samples = dds_rhc_default_lock_samples_wrap + .lock_samples = dds_rhc_default_lock_samples_wrap, + .associate = dds_rhc_default_associate }; static unsigned qmask_of_sample (const struct rhc_sample *s) diff --git a/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h b/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h index aa4a207..2e57e08 100644 --- a/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h +++ b/src/core/ddsi/include/dds/ddsi/ddsi_rhc.h @@ -58,11 +58,11 @@ struct ddsi_rhc { const struct ddsi_rhc_ops *ops; }; -DDS_EXPORT inline bool ddsi_rhc_store (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { - return rhc->ops->store (rhc, pwr_info, sample, tk); +DDS_EXPORT inline bool ddsi_rhc_store (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { + return rhc->ops->store (rhc, wrinfo, sample, tk); } -DDS_EXPORT inline void ddsi_rhc_unregister_wr (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info) { - rhc->ops->unregister_wr (rhc, pwr_info); +DDS_EXPORT inline void ddsi_rhc_unregister_wr (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo) { + rhc->ops->unregister_wr (rhc, wrinfo); } DDS_EXPORT inline void ddsi_rhc_relinquish_ownership (struct ddsi_rhc * __restrict rhc, const uint64_t wr_iid) { rhc->ops->relinquish_ownership (rhc, wr_iid);