Make it possible to create reader with custom RHC

The default RHC implementation is not always ideal and rather than
trying to squeeze everything in a fixed interface it makes more sense to
allow the caller to provide an arbitrary implementation of the
interface.

This is not yet a stable interface.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-09-02 09:08:13 +02:00 committed by eboasson
parent 57d20e07a4
commit fbc05777f3
7 changed files with 75 additions and 14 deletions

View file

@ -49,6 +49,7 @@ typedef int32_t dds_entity_t;
extern "C" { extern "C" {
#endif #endif
struct dds_rhc;
struct ddsi_serdata; struct ddsi_serdata;
#define DDS_MIN_PSEUDO_HANDLE ((dds_entity_t) 0x7fff0000) #define DDS_MIN_PSEUDO_HANDLE ((dds_entity_t) 0x7fff0000)
@ -1195,6 +1196,34 @@ dds_create_reader(
const dds_qos_t *qos, const dds_qos_t *qos,
const dds_listener_t *listener); const dds_listener_t *listener);
/**
* @brief Creates a new instance of a DDS reader with a custom history cache.
*
* This implicit subscriber will be deleted automatically when the created reader
* is deleted.
*
* @param[in] participant_or_subscriber The participant or subscriber on which the reader is being created.
* @param[in] topic The topic to read.
* @param[in] qos The QoS to set on the new reader (can be NULL).
* @param[in] listener Any listener functions associated with the new reader (can be NULL).
* @param[in] rhc Reader history cache to use, reader becomes the owner
*
* @returns A valid reader handle or an error code.
*
* @retval >0
* A valid reader handle.
* @retval DDS_RETCODE_ERROR
* An internal error occurred.
*/
/* TODO: Complete list of error codes */
DDS_EXPORT dds_entity_t
dds_create_reader_rhc(
dds_entity_t participant_or_subscriber,
dds_entity_t topic,
const dds_qos_t *qos,
const dds_listener_t *listener,
struct dds_rhc *rhc);
/** /**
* @brief Wait until reader receives all historic data * @brief Wait until reader receives all historic data
* *

View file

@ -23,7 +23,10 @@ extern "C" {
struct dds_rhc; struct dds_rhc;
struct dds_readcond; struct dds_readcond;
struct dds_reader;
struct ddsi_tkmap;
typedef dds_return_t (*dds_rhc_associate_t) (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap);
typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond); typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle); typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
@ -43,6 +46,7 @@ struct dds_rhc_ops {
dds_rhc_add_readcondition_t add_readcondition; dds_rhc_add_readcondition_t add_readcondition;
dds_rhc_remove_readcondition_t remove_readcondition; dds_rhc_remove_readcondition_t remove_readcondition;
dds_rhc_lock_samples_t lock_samples; dds_rhc_lock_samples_t lock_samples;
dds_rhc_associate_t associate;
}; };
struct dds_rhc { struct dds_rhc {
@ -54,11 +58,14 @@ struct dds_rhc {
DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct ddsi_rhc, ops)); DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct ddsi_rhc, ops));
DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { DDS_EXPORT inline dds_return_t dds_rhc_associate (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap) {
return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, pwr_info, sample, tk); return rhc->common.ops->associate (rhc, reader, topic, tkmap);
} }
DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info) { DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, pwr_info); return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, wrinfo, sample, tk);
}
DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo) {
rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, wrinfo);
} }
DDS_EXPORT inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid) { DDS_EXPORT inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid) {
rhc->common.ops->rhc_ops.relinquish_ownership (&rhc->common.rhc, wr_iid); rhc->common.ops->rhc_ops.relinquish_ownership (&rhc->common.rhc, wr_iid);
@ -88,6 +95,8 @@ DDS_EXPORT inline uint32_t dds_rhc_lock_samples (struct dds_rhc *rhc) {
return rhc->common.ops->lock_samples (rhc); return rhc->common.ops->lock_samples (rhc);
} }
DDS_EXPORT void dds_reader_data_available_cb (struct dds_reader *rd);
#if defined (__cplusplus) #if defined (__cplusplus)
} }
#endif #endif

View file

@ -23,8 +23,6 @@ struct status_cb_data;
void dds_reader_status_cb (void *entity, const struct status_cb_data * data); void dds_reader_status_cb (void *entity, const struct status_cb_data * data);
void dds_reader_data_available_cb (struct dds_reader *entity);
/* /*
dds_reader_lock_samples: Returns number of samples in read cache and locks the dds_reader_lock_samples: Returns number of samples in read cache and locks the
reader cache to make sure that the samples content doesn't change. reader cache to make sure that the samples content doesn't change.

View file

@ -284,7 +284,7 @@ const struct dds_entity_deriver dds_entity_deriver_reader = {
.validate_status = dds_reader_status_validate .validate_status = dds_reader_status_validate
}; };
dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener) static dds_entity_t dds_create_reader_int (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc)
{ {
dds_qos_t *rqos; dds_qos_t *rqos;
dds_subscriber *sub = NULL; dds_subscriber *sub = NULL;
@ -376,12 +376,17 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
goto err_bad_qos; goto err_bad_qos;
} }
/* Create reader and associated read cache */ /* Create reader and associated read cache (if not provided by caller) */
rd = dds_alloc (sizeof (*rd)); rd = dds_alloc (sizeof (*rd));
reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK); reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK);
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
rd->m_topic = tp; rd->m_topic = tp;
rd->m_rhc = dds_rhc_default_new (rd, tp->m_stopic); rd->m_rhc = rhc ? rhc : dds_rhc_default_new (rd, tp->m_stopic);
if (dds_rhc_associate (rd->m_rhc, rd, tp->m_stopic, rd->m_entity.m_domain->gv.m_tkmap) < 0)
{
/* FIXME: see also create_querycond, need to be able to undo entity_init */
abort ();
}
dds_entity_add_ref_locked (&tp->m_entity); dds_entity_add_ref_locked (&tp->m_entity);
/* Extra claim of this reader to make sure that the delete waits until DDSI /* Extra claim of this reader to make sure that the delete waits until DDSI
@ -474,6 +479,18 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
dds_entity_unpin (dds_entity); dds_entity_unpin (dds_entity);
} }
dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener)
{
return dds_create_reader_int (participant_or_subscriber, topic, qos, listener, NULL);
}
dds_entity_t dds_create_reader_rhc (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener, struct dds_rhc *rhc)
{
if (rhc == NULL)
return DDS_RETCODE_BAD_PARAMETER;
return dds_create_reader_int (participant_or_subscriber, topic, qos, listener, rhc);
}
uint32_t dds_reader_lock_samples (dds_entity_t reader) uint32_t dds_reader_lock_samples (dds_entity_t reader)
{ {
dds_reader *rd; dds_reader *rd;

View file

@ -14,6 +14,7 @@
#include "dds/ddsi/ddsi_rhc.h" #include "dds/ddsi/ddsi_rhc.h"
#include "dds/ddsc/dds_rhc.h" #include "dds/ddsc/dds_rhc.h"
extern inline dds_return_t dds_rhc_associate (struct dds_rhc *rhc, struct dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap);
extern inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk); extern inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info); extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info);
extern inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid); extern inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid);

View file

@ -390,6 +390,12 @@ static void dds_rhc_default_remove_readcondition_wrap (struct dds_rhc *rhc, dds_
static uint32_t dds_rhc_default_lock_samples_wrap (struct dds_rhc *rhc) { static uint32_t dds_rhc_default_lock_samples_wrap (struct dds_rhc *rhc) {
return dds_rhc_default_lock_samples ((struct dds_rhc_default *) rhc); return dds_rhc_default_lock_samples ((struct dds_rhc_default *) rhc);
} }
static dds_return_t dds_rhc_default_associate (struct dds_rhc *rhc, dds_reader *reader, const struct ddsi_sertopic *topic, struct ddsi_tkmap *tkmap)
{
/* ignored out of laziness */
(void) rhc; (void) reader; (void) topic; (void) tkmap;
return DDS_RETCODE_OK;
}
static const struct dds_rhc_ops dds_rhc_default_ops = { static const struct dds_rhc_ops dds_rhc_default_ops = {
.rhc_ops = { .rhc_ops = {
@ -404,7 +410,8 @@ static const struct dds_rhc_ops dds_rhc_default_ops = {
.takecdr = dds_rhc_default_takecdr_wrap, .takecdr = dds_rhc_default_takecdr_wrap,
.add_readcondition = dds_rhc_default_add_readcondition_wrap, .add_readcondition = dds_rhc_default_add_readcondition_wrap,
.remove_readcondition = dds_rhc_default_remove_readcondition_wrap, .remove_readcondition = dds_rhc_default_remove_readcondition_wrap,
.lock_samples = dds_rhc_default_lock_samples_wrap .lock_samples = dds_rhc_default_lock_samples_wrap,
.associate = dds_rhc_default_associate
}; };
static unsigned qmask_of_sample (const struct rhc_sample *s) static unsigned qmask_of_sample (const struct rhc_sample *s)

View file

@ -58,11 +58,11 @@ struct ddsi_rhc {
const struct ddsi_rhc_ops *ops; const struct ddsi_rhc_ops *ops;
}; };
DDS_EXPORT inline bool ddsi_rhc_store (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) { DDS_EXPORT inline bool ddsi_rhc_store (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
return rhc->ops->store (rhc, pwr_info, sample, tk); return rhc->ops->store (rhc, wrinfo, sample, tk);
} }
DDS_EXPORT inline void ddsi_rhc_unregister_wr (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info) { DDS_EXPORT inline void ddsi_rhc_unregister_wr (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo) {
rhc->ops->unregister_wr (rhc, pwr_info); rhc->ops->unregister_wr (rhc, wrinfo);
} }
DDS_EXPORT inline void ddsi_rhc_relinquish_ownership (struct ddsi_rhc * __restrict rhc, const uint64_t wr_iid) { DDS_EXPORT inline void ddsi_rhc_relinquish_ownership (struct ddsi_rhc * __restrict rhc, const uint64_t wr_iid) {
rhc->ops->relinquish_ownership (rhc, wr_iid); rhc->ops->relinquish_ownership (rhc, wr_iid);