Rearrange things to make RHC interface public
This makes it possible to write one's own RHC implementation. This is not a stable interface. It shuffles a few things around and renames some types used throughout the code to stick to having a "dds" prefix for all the external things. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
801ae26872
commit
57d20e07a4
50 changed files with 574 additions and 555 deletions
|
@ -50,6 +50,7 @@ PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/dd
|
|||
ddsc/dds_public_qos.h
|
||||
ddsc/dds_public_qosdefs.h
|
||||
ddsc/dds_public_status.h
|
||||
ddsc/dds_rhc.h
|
||||
)
|
||||
|
||||
PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
|
||||
|
@ -67,7 +68,6 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
|
|||
dds__readcond.h
|
||||
dds__guardcond.h
|
||||
dds__reader.h
|
||||
dds__rhc.h
|
||||
dds__rhc_default.h
|
||||
dds__stream.h
|
||||
dds__subscriber.h
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
#define _DDS_RHC_H_
|
||||
|
||||
#include "dds/ddsrt/static_assert.h"
|
||||
#include "dds/ddsi/q_rhc.h"
|
||||
#include "dds__types.h" /* for dds_readcond */
|
||||
#include "dds/ddsi/ddsi_rhc.h"
|
||||
|
||||
#define NO_STATE_MASK_SET (DDS_ANY_STATE + 1)
|
||||
|
||||
|
@ -23,20 +22,21 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
struct dds_rhc;
|
||||
struct dds_readcond;
|
||||
|
||||
typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
|
||||
typedef bool (*dds_rhc_add_readcondition_t) (struct dds_readcond *cond);
|
||||
typedef void (*dds_rhc_remove_readcondition_t) (struct dds_readcond *cond);
|
||||
typedef bool (*dds_rhc_add_readcondition_t) (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
typedef void (*dds_rhc_remove_readcondition_t) (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
|
||||
typedef uint32_t (*dds_rhc_lock_samples_t) (struct dds_rhc *rhc);
|
||||
|
||||
struct dds_rhc_ops {
|
||||
/* A copy of DDSI rhc ops comes first so we can use either interface without
|
||||
additional indirections */
|
||||
struct rhc_ops rhc_ops;
|
||||
struct ddsi_rhc_ops rhc_ops;
|
||||
dds_rhc_read_t read;
|
||||
dds_rhc_take_t take;
|
||||
dds_rhc_takecdr_t takecdr;
|
||||
|
@ -48,16 +48,16 @@ struct dds_rhc_ops {
|
|||
struct dds_rhc {
|
||||
union {
|
||||
const struct dds_rhc_ops *ops;
|
||||
struct rhc rhc;
|
||||
struct ddsi_rhc rhc;
|
||||
} common;
|
||||
};
|
||||
|
||||
DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct rhc, ops));
|
||||
DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct ddsi_rhc, ops));
|
||||
|
||||
DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
|
||||
DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
|
||||
return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, pwr_info, sample, tk);
|
||||
}
|
||||
DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info) {
|
||||
DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info) {
|
||||
rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, pwr_info);
|
||||
}
|
||||
DDS_EXPORT inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid) {
|
||||
|
@ -78,11 +78,11 @@ DDS_EXPORT inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **value
|
|||
DDS_EXPORT inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
|
||||
}
|
||||
DDS_EXPORT inline bool dds_rhc_add_readcondition (struct dds_readcond *cond) {
|
||||
return cond->m_rhc->common.ops->add_readcondition (cond);
|
||||
DDS_EXPORT inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond) {
|
||||
return rhc->common.ops->add_readcondition (rhc, cond);
|
||||
}
|
||||
DDS_EXPORT inline void dds_rhc_remove_readcondition (struct dds_readcond *cond) {
|
||||
cond->m_rhc->common.ops->remove_readcondition (cond);
|
||||
DDS_EXPORT inline void dds_rhc_remove_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond) {
|
||||
rhc->common.ops->remove_readcondition (rhc, cond);
|
||||
}
|
||||
DDS_EXPORT inline uint32_t dds_rhc_lock_samples (struct dds_rhc *rhc) {
|
||||
return rhc->common.ops->lock_samples (rhc);
|
|
@ -22,7 +22,7 @@ extern "C" {
|
|||
|
||||
struct ddsi_serdata_builtintopic {
|
||||
struct ddsi_serdata c;
|
||||
nn_guid_t key;
|
||||
ddsi_guid_t key;
|
||||
dds_instance_handle_t pphandle;
|
||||
dds_qos_t xqos;
|
||||
};
|
||||
|
|
|
@ -39,7 +39,7 @@ struct dds_guardcond;
|
|||
struct dds_statuscond;
|
||||
|
||||
struct ddsi_sertopic;
|
||||
struct rhc;
|
||||
struct ddsi_rhc;
|
||||
|
||||
typedef uint16_t status_mask_t;
|
||||
typedef ddsrt_atomic_uint32_t status_and_enabled_t;
|
||||
|
@ -124,7 +124,7 @@ typedef struct dds_entity {
|
|||
ddsrt_avl_tree_t m_children; /* [m_mutex] tree on m_iid using m_avlnode_child */
|
||||
struct dds_domain *m_domain; /* constant */
|
||||
dds_qos_t *m_qos; /* [m_mutex] */
|
||||
nn_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */
|
||||
ddsi_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */
|
||||
dds_instance_handle_t m_iid; /* unique for all time, constant; FIXME: like GUID */
|
||||
uint32_t m_flags; /* [m_mutex] */
|
||||
|
||||
|
@ -290,12 +290,10 @@ typedef uint32_t dds_querycond_mask_t;
|
|||
|
||||
typedef struct dds_readcond {
|
||||
dds_entity m_entity;
|
||||
struct dds_rhc *m_rhc;
|
||||
uint32_t m_qminv;
|
||||
uint32_t m_sample_states;
|
||||
uint32_t m_view_states;
|
||||
uint32_t m_instance_states;
|
||||
nn_guid_t m_rd_guid;
|
||||
struct dds_readcond *m_next;
|
||||
struct {
|
||||
dds_querycondition_filter_fn m_filter;
|
||||
|
|
|
@ -162,7 +162,7 @@ static bool dds__builtin_is_builtintopic (const struct ddsi_sertopic *tp, void *
|
|||
return tp->ops == &ddsi_sertopic_ops_builtintopic;
|
||||
}
|
||||
|
||||
static bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendorid, void *vdomain)
|
||||
static bool dds__builtin_is_visible (const ddsi_guid_t *guid, nn_vendorid_t vendorid, void *vdomain)
|
||||
{
|
||||
(void) vdomain;
|
||||
if (is_builtin_endpoint (guid->entityid, vendorid))
|
||||
|
@ -170,7 +170,7 @@ static bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendor
|
|||
return true;
|
||||
}
|
||||
|
||||
static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid, void *vdomain)
|
||||
static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct ddsi_guid *guid, void *vdomain)
|
||||
{
|
||||
struct dds_domain *domain = vdomain;
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
#include "dds/ddsrt/process.h"
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds__init.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__domain.h"
|
||||
#include "dds__builtin.h"
|
||||
#include "dds__whc_builtintopic.h"
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#include "dds/ddsrt/process.h"
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds__init.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__domain.h"
|
||||
#include "dds__builtin.h"
|
||||
#include "dds__whc_builtintopic.h"
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "dds__entity.h"
|
||||
#include "dds__write.h"
|
||||
#include "dds__writer.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds/ddsi/ddsi_tkmap.h"
|
||||
#include "dds/ddsi/ddsi_serdata.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
|
|
|
@ -123,10 +123,10 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
|
|||
}
|
||||
}
|
||||
|
||||
static dds_builtintopic_endpoint_t *make_builtintopic_endpoint (const nn_guid_t *guid, const nn_guid_t *ppguid, dds_instance_handle_t ppiid, const dds_qos_t *qos)
|
||||
static dds_builtintopic_endpoint_t *make_builtintopic_endpoint (const ddsi_guid_t *guid, const ddsi_guid_t *ppguid, dds_instance_handle_t ppiid, const dds_qos_t *qos)
|
||||
{
|
||||
dds_builtintopic_endpoint_t *ep;
|
||||
nn_guid_t tmp;
|
||||
ddsi_guid_t tmp;
|
||||
ep = dds_alloc (sizeof (*ep));
|
||||
tmp = nn_hton_guid (*guid);
|
||||
memcpy (&ep->key, &tmp, sizeof (ep->key));
|
||||
|
|
|
@ -77,7 +77,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
{
|
||||
dds_domain *dom;
|
||||
dds_entity_t ret;
|
||||
nn_guid_t guid;
|
||||
ddsi_guid_t guid;
|
||||
dds_participant * pp;
|
||||
nn_plist_t plist;
|
||||
dds_qos_t *new_qos = NULL;
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
#include "dds__entity.h"
|
||||
#include "dds__reader.h"
|
||||
#include "dds/ddsi/ddsi_tkmap.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds/ddsi/q_thread.h"
|
||||
#include "dds/ddsi/q_ephash.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
#include <assert.h>
|
||||
#include "dds__reader.h"
|
||||
#include "dds__readcond.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__entity.h"
|
||||
#include "dds/ddsi/ddsi_iid.h"
|
||||
#include "dds/ddsi/q_ephash.h"
|
||||
|
@ -23,7 +23,9 @@ static dds_return_t dds_readcond_delete (dds_entity *e) ddsrt_nonnull_all;
|
|||
|
||||
static dds_return_t dds_readcond_delete (dds_entity *e)
|
||||
{
|
||||
dds_rhc_remove_readcondition ((dds_readcond *) e);
|
||||
struct dds_reader * const rd = (struct dds_reader *) e->m_parent;
|
||||
assert (dds_entity_kind (&rd->m_entity) == DDS_KIND_READER);
|
||||
dds_rhc_remove_readcondition (rd->m_rhc, (dds_readcond *) e);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
|
@ -41,17 +43,15 @@ dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint3
|
|||
(void) dds_entity_init (&cond->m_entity, &rd->m_entity, kind, NULL, NULL, 0);
|
||||
cond->m_entity.m_iid = ddsi_iid_gen ();
|
||||
dds_entity_register_child (&rd->m_entity, &cond->m_entity);
|
||||
cond->m_rhc = rd->m_rhc;
|
||||
cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE;
|
||||
cond->m_view_states = mask & DDS_ANY_VIEW_STATE;
|
||||
cond->m_instance_states = mask & DDS_ANY_INSTANCE_STATE;
|
||||
cond->m_rd_guid = rd->m_entity.m_guid;
|
||||
if (kind == DDS_KIND_COND_QUERY)
|
||||
{
|
||||
cond->m_query.m_filter = filter;
|
||||
cond->m_query.m_qcmask = 0;
|
||||
}
|
||||
if (!dds_rhc_add_readcondition (cond))
|
||||
if (!dds_rhc_add_readcondition (rd->m_rhc, cond))
|
||||
{
|
||||
/* FIXME: current entity management code can't deal with an error late in the creation of the
|
||||
entity because it doesn't allow deleting it again ... */
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#include "dds__reader.h"
|
||||
#include "dds__listener.h"
|
||||
#include "dds__init.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__rhc_default.h"
|
||||
#include "dds__topic.h"
|
||||
#include "dds__get_status.h"
|
||||
|
@ -437,7 +437,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
|
|||
|
||||
dds_reader *dds_rd = (dds_reader *) dds_entity;
|
||||
struct reader *rd = dds_rd->m_rd;
|
||||
nn_guid_t pwrguid;
|
||||
ddsi_guid_t pwrguid;
|
||||
struct proxy_writer *pwr;
|
||||
struct rd_pwr_match *m;
|
||||
memset (&pwrguid, 0, sizeof (pwrguid));
|
||||
|
@ -450,7 +450,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
|
|||
/* have to be careful walking the tree -- pretty is different, but
|
||||
I want to check this before I write a lookup_succ function. */
|
||||
struct rd_pwr_match *m_next;
|
||||
nn_guid_t pwrguid_next;
|
||||
ddsi_guid_t pwrguid_next;
|
||||
pwrguid = m->pwr_guid;
|
||||
if ((m_next = ddsrt_avl_find_succ (&rd_writers_treedef, &rd->writers, m)) != NULL)
|
||||
pwrguid_next = m_next->pwr_guid;
|
||||
|
|
|
@ -11,17 +11,17 @@
|
|||
*/
|
||||
|
||||
#include "dds/dds.h"
|
||||
#include "dds/ddsi/q_rhc.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsi/ddsi_rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
|
||||
extern inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
|
||||
extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
|
||||
extern inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
|
||||
extern inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict pwr_info);
|
||||
extern inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid);
|
||||
extern inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qos *qos);
|
||||
extern inline void dds_rhc_free (struct dds_rhc *rhc);
|
||||
extern inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
extern inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
extern inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
extern inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
|
||||
extern inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
extern inline bool dds_rhc_add_readcondition (struct dds_readcond *cond);
|
||||
extern inline void dds_rhc_remove_readcondition (struct dds_readcond *cond);
|
||||
extern inline bool dds_rhc_add_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
extern inline void dds_rhc_remove_readcondition (struct dds_rhc *rhc, struct dds_readcond *cond);
|
||||
extern inline uint32_t dds_rhc_lock_samples (struct dds_rhc *rhc);
|
||||
|
|
|
@ -25,12 +25,12 @@
|
|||
|
||||
#include "dds__entity.h"
|
||||
#include "dds__reader.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds__rhc_default.h"
|
||||
#include "dds/ddsi/ddsi_tkmap.h"
|
||||
#include "dds/ddsrt/hopscotch.h"
|
||||
#include "dds/ddsrt/avl.h"
|
||||
#include "dds/ddsi/q_rhc.h"
|
||||
#include "dds/ddsi/ddsi_rhc.h"
|
||||
#include "dds/ddsi/q_xqos.h"
|
||||
#include "dds/ddsi/q_unused.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
|
@ -263,7 +263,7 @@ struct rhc_instance {
|
|||
uint32_t disposed_gen; /* bloody generation counters - worst invention of mankind */
|
||||
uint32_t no_writers_gen; /* __/ */
|
||||
int32_t strength; /* "current" ownership strength */
|
||||
nn_guid_t wr_guid; /* guid of last writer (if wr_iid != 0 then wr_guid is the corresponding guid, else undef) */
|
||||
ddsi_guid_t wr_guid; /* guid of last writer (if wr_iid != 0 then wr_guid is the corresponding guid, else undef) */
|
||||
nn_wctime_t tstamp; /* source time stamp of last update */
|
||||
struct rhc_instance *next; /* next non-empty instance in arbitrary ordering */
|
||||
struct rhc_instance *prev;
|
||||
|
@ -346,30 +346,30 @@ struct trigger_info_post {
|
|||
};
|
||||
|
||||
static void dds_rhc_default_free (struct dds_rhc_default *rhc);
|
||||
static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
|
||||
static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
|
||||
static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
|
||||
static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo);
|
||||
static void dds_rhc_default_relinquish_ownership (struct dds_rhc_default * __restrict rhc, const uint64_t wr_iid);
|
||||
static void dds_rhc_default_set_qos (struct dds_rhc_default *rhc, const struct dds_qos *qos);
|
||||
static int dds_rhc_default_read (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int dds_rhc_default_take (struct dds_rhc_default *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond);
|
||||
static int dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
|
||||
static bool dds_rhc_default_add_readcondition (dds_readcond *cond);
|
||||
static void dds_rhc_default_remove_readcondition (dds_readcond *cond);
|
||||
static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
|
||||
static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond);
|
||||
static uint32_t dds_rhc_default_lock_samples (struct dds_rhc_default *rhc);
|
||||
|
||||
static void dds_rhc_default_free_wrap (struct rhc *rhc) {
|
||||
static void dds_rhc_default_free_wrap (struct ddsi_rhc *rhc) {
|
||||
dds_rhc_default_free ((struct dds_rhc_default *) rhc);
|
||||
}
|
||||
static bool dds_rhc_default_store_wrap (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
|
||||
return dds_rhc_default_store ((struct dds_rhc_default *) rhc, pwr_info, sample, tk);
|
||||
static bool dds_rhc_default_store_wrap (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
|
||||
return dds_rhc_default_store ((struct dds_rhc_default *) rhc, wrinfo, sample, tk);
|
||||
}
|
||||
static void dds_rhc_default_unregister_wr_wrap (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info) {
|
||||
dds_rhc_default_unregister_wr ((struct dds_rhc_default *) rhc, pwr_info);
|
||||
static void dds_rhc_default_unregister_wr_wrap (struct ddsi_rhc * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo) {
|
||||
dds_rhc_default_unregister_wr ((struct dds_rhc_default *) rhc, wrinfo);
|
||||
}
|
||||
static void dds_rhc_default_relinquish_ownership_wrap (struct rhc * __restrict rhc, const uint64_t wr_iid) {
|
||||
static void dds_rhc_default_relinquish_ownership_wrap (struct ddsi_rhc * __restrict rhc, const uint64_t wr_iid) {
|
||||
dds_rhc_default_relinquish_ownership ((struct dds_rhc_default *) rhc, wr_iid);
|
||||
}
|
||||
static void dds_rhc_default_set_qos_wrap (struct rhc *rhc, const struct dds_qos *qos) {
|
||||
static void dds_rhc_default_set_qos_wrap (struct ddsi_rhc *rhc, const struct dds_qos *qos) {
|
||||
dds_rhc_default_set_qos ((struct dds_rhc_default *) rhc, qos);
|
||||
}
|
||||
static int dds_rhc_default_read_wrap (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, dds_readcond *cond) {
|
||||
|
@ -381,11 +381,11 @@ static int dds_rhc_default_take_wrap (struct dds_rhc *rhc, bool lock, void **val
|
|||
static int dds_rhc_default_takecdr_wrap (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
|
||||
return dds_rhc_default_takecdr ((struct dds_rhc_default *) rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
|
||||
}
|
||||
static bool dds_rhc_default_add_readcondition_wrap (dds_readcond *cond) {
|
||||
return dds_rhc_default_add_readcondition (cond);
|
||||
static bool dds_rhc_default_add_readcondition_wrap (struct dds_rhc *rhc, dds_readcond *cond) {
|
||||
return dds_rhc_default_add_readcondition ((struct dds_rhc_default *) rhc, cond);
|
||||
}
|
||||
static void dds_rhc_default_remove_readcondition_wrap (dds_readcond *cond) {
|
||||
dds_rhc_default_remove_readcondition (cond);
|
||||
static void dds_rhc_default_remove_readcondition_wrap (struct dds_rhc *rhc, dds_readcond *cond) {
|
||||
dds_rhc_default_remove_readcondition ((struct dds_rhc_default *) rhc, cond);
|
||||
}
|
||||
static uint32_t dds_rhc_default_lock_samples_wrap (struct dds_rhc *rhc) {
|
||||
return dds_rhc_default_lock_samples ((struct dds_rhc_default *) rhc);
|
||||
|
@ -758,7 +758,7 @@ static bool trigger_info_differs (const struct dds_rhc_default *rhc, const struc
|
|||
trig_qc->dec_sample_read != trig_qc->inc_sample_read);
|
||||
}
|
||||
|
||||
static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct proxy_writer_info *pwr_info, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc)
|
||||
static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, status_cb_data_t *cb_data, struct trigger_info_qcond *trig_qc)
|
||||
{
|
||||
struct rhc_sample *s;
|
||||
|
||||
|
@ -832,7 +832,7 @@ static bool add_sample (struct dds_rhc_default *rhc, struct rhc_instance *inst,
|
|||
}
|
||||
|
||||
s->sample = ddsi_serdata_ref (sample); /* drops const (tho refcount does change) */
|
||||
s->wr_iid = pwr_info->iid;
|
||||
s->wr_iid = wrinfo->iid;
|
||||
s->isread = false;
|
||||
s->disposed_gen = inst->disposed_gen;
|
||||
s->no_writers_gen = inst->no_writers_gen;
|
||||
|
@ -867,12 +867,12 @@ static bool content_filter_accepts (const dds_reader *reader, const struct ddsi_
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info)
|
||||
static int inst_accepts_sample_by_writer_guid (const struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo)
|
||||
{
|
||||
return (inst->wr_iid_islive && inst->wr_iid == pwr_info->iid) || memcmp (&pwr_info->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
|
||||
return (inst->wr_iid_islive && inst->wr_iid == wrinfo->iid) || memcmp (&wrinfo->guid, &inst->wr_guid, sizeof (inst->wr_guid)) < 0;
|
||||
}
|
||||
|
||||
static int inst_accepts_sample (const struct dds_rhc_default *rhc, const struct rhc_instance *inst, const struct proxy_writer_info *pwr_info, const struct ddsi_serdata *sample, const bool has_data)
|
||||
static int inst_accepts_sample (const struct dds_rhc_default *rhc, const struct rhc_instance *inst, const struct ddsi_writer_info *wrinfo, const struct ddsi_serdata *sample, const bool has_data)
|
||||
{
|
||||
if (rhc->by_source_ordering)
|
||||
{
|
||||
|
@ -884,7 +884,7 @@ static int inst_accepts_sample (const struct dds_rhc_default *rhc, const struct
|
|||
{
|
||||
return 0;
|
||||
}
|
||||
else if (inst_accepts_sample_by_writer_guid (inst, pwr_info))
|
||||
else if (inst_accepts_sample_by_writer_guid (inst, wrinfo))
|
||||
{
|
||||
/* ok */
|
||||
}
|
||||
|
@ -893,14 +893,14 @@ static int inst_accepts_sample (const struct dds_rhc_default *rhc, const struct
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
if (rhc->exclusive_ownership && inst->wr_iid_islive && inst->wr_iid != pwr_info->iid)
|
||||
if (rhc->exclusive_ownership && inst->wr_iid_islive && inst->wr_iid != wrinfo->iid)
|
||||
{
|
||||
int32_t strength = pwr_info->ownership_strength;
|
||||
int32_t strength = wrinfo->ownership_strength;
|
||||
if (strength > inst->strength) {
|
||||
/* ok */
|
||||
} else if (strength < inst->strength) {
|
||||
return 0;
|
||||
} else if (inst_accepts_sample_by_writer_guid (inst, pwr_info)) {
|
||||
} else if (inst_accepts_sample_by_writer_guid (inst, wrinfo)) {
|
||||
/* ok */
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -913,17 +913,17 @@ static int inst_accepts_sample (const struct dds_rhc_default *rhc, const struct
|
|||
return 1;
|
||||
}
|
||||
|
||||
static void update_inst (struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, bool wr_iid_valid, nn_wctime_t tstamp)
|
||||
static void update_inst (struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, bool wr_iid_valid, nn_wctime_t tstamp)
|
||||
{
|
||||
inst->tstamp = tstamp;
|
||||
inst->wr_iid_islive = wr_iid_valid;
|
||||
if (wr_iid_valid)
|
||||
{
|
||||
inst->wr_iid = pwr_info->iid;
|
||||
if (inst->wr_iid != pwr_info->iid)
|
||||
inst->wr_guid = pwr_info->guid;
|
||||
inst->wr_iid = wrinfo->iid;
|
||||
if (inst->wr_iid != wrinfo->iid)
|
||||
inst->wr_guid = wrinfo->guid;
|
||||
}
|
||||
inst->strength = pwr_info->ownership_strength;
|
||||
inst->strength = wrinfo->ownership_strength;
|
||||
}
|
||||
|
||||
static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, struct rhc_instance *inst)
|
||||
|
@ -1109,13 +1109,13 @@ static int rhc_unregister_isreg_w_sideeffects (struct dds_rhc_default *rhc, cons
|
|||
}
|
||||
}
|
||||
|
||||
static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool *nda)
|
||||
static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, nn_wctime_t tstamp, struct trigger_info_qcond *trig_qc, bool *nda)
|
||||
{
|
||||
assert (inst->wrcount > 0);
|
||||
|
||||
if (--inst->wrcount > 0)
|
||||
{
|
||||
if (inst->wr_iid_islive && pwr_info->iid == inst->wr_iid)
|
||||
if (inst->wr_iid_islive && wrinfo->iid == inst->wr_iid)
|
||||
{
|
||||
/* Next register will have to do real work before we have a cached
|
||||
wr_iid again */
|
||||
|
@ -1142,7 +1142,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
|
|||
if (inst->latest == NULL || inst->latest->isread)
|
||||
{
|
||||
inst_set_invsample (rhc, inst, trig_qc, nda);
|
||||
update_inst (inst, pwr_info, false, tstamp);
|
||||
update_inst (inst, wrinfo, false, tstamp);
|
||||
}
|
||||
if (!inst->isdisposed)
|
||||
{
|
||||
|
@ -1164,7 +1164,7 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
|
|||
TRACE (",#0,empty,nowriters");
|
||||
assert (inst_is_empty (inst));
|
||||
inst_set_invsample (rhc, inst, trig_qc, nda);
|
||||
update_inst (inst, pwr_info, false, tstamp);
|
||||
update_inst (inst, wrinfo, false, tstamp);
|
||||
account_for_empty_to_nonempty_transition (rhc, inst);
|
||||
inst->wr_iid_islive = 0;
|
||||
return 0;
|
||||
|
@ -1172,18 +1172,18 @@ static int rhc_unregister_updateinst (struct dds_rhc_default *rhc, struct rhc_in
|
|||
}
|
||||
}
|
||||
|
||||
static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct proxy_writer_info * __restrict pwr_info, nn_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
|
||||
static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance *inst, const struct ddsi_writer_info * __restrict wrinfo, nn_wctime_t tstamp, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
|
||||
{
|
||||
bool notify_data_available = false;
|
||||
|
||||
/* 'post' always gets set; instance may have been freed upon return. */
|
||||
TRACE (" unregister:");
|
||||
if (!rhc_unregister_isreg_w_sideeffects (rhc, inst, pwr_info->iid))
|
||||
if (!rhc_unregister_isreg_w_sideeffects (rhc, inst, wrinfo->iid))
|
||||
{
|
||||
/* other registrations remain */
|
||||
get_trigger_info_cmn (&post->c, inst);
|
||||
}
|
||||
else if (rhc_unregister_updateinst (rhc, inst, pwr_info, tstamp, trig_qc, ¬ify_data_available))
|
||||
else if (rhc_unregister_updateinst (rhc, inst, wrinfo, tstamp, trig_qc, ¬ify_data_available))
|
||||
{
|
||||
/* instance dropped */
|
||||
init_trigger_info_cmn_nonmatch (&post->c);
|
||||
|
@ -1196,7 +1196,7 @@ static bool dds_rhc_unregister (struct dds_rhc_default *rhc, struct rhc_instance
|
|||
return notify_data_available;
|
||||
}
|
||||
|
||||
static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
|
||||
static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
|
||||
{
|
||||
struct rhc_instance *inst;
|
||||
|
||||
|
@ -1210,11 +1210,11 @@ static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rh
|
|||
inst->isnew = 1;
|
||||
inst->a_sample_free = 1;
|
||||
inst->conds = 0;
|
||||
inst->wr_iid = pwr_info->iid;
|
||||
inst->wr_iid = wrinfo->iid;
|
||||
inst->wr_iid_islive = (inst->wrcount != 0);
|
||||
inst->wr_guid = pwr_info->guid;
|
||||
inst->wr_guid = wrinfo->guid;
|
||||
inst->tstamp = serdata->timestamp;
|
||||
inst->strength = pwr_info->ownership_strength;
|
||||
inst->strength = wrinfo->ownership_strength;
|
||||
|
||||
if (rhc->nqconds != 0)
|
||||
{
|
||||
|
@ -1230,7 +1230,7 @@ static struct rhc_instance *alloc_new_instance (const struct dds_rhc_default *rh
|
|||
return inst;
|
||||
}
|
||||
|
||||
static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
|
||||
static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst, struct dds_rhc_default *rhc, const struct ddsi_writer_info *wrinfo, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk, const bool has_data, status_cb_data_t *cb_data, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc)
|
||||
{
|
||||
struct rhc_instance *inst;
|
||||
int ret;
|
||||
|
@ -1265,10 +1265,10 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
|
|||
return RHC_REJECTED;
|
||||
}
|
||||
|
||||
inst = alloc_new_instance (rhc, pwr_info, sample, tk);
|
||||
inst = alloc_new_instance (rhc, wrinfo, sample, tk);
|
||||
if (has_data)
|
||||
{
|
||||
if (!add_sample (rhc, inst, pwr_info, sample, cb_data, trig_qc))
|
||||
if (!add_sample (rhc, inst, wrinfo, sample, cb_data, trig_qc))
|
||||
{
|
||||
free_empty_instance (inst, rhc);
|
||||
return RHC_REJECTED;
|
||||
|
@ -1298,9 +1298,9 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
|
|||
delivered (true unless a reliable sample rejected).
|
||||
*/
|
||||
|
||||
static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk)
|
||||
static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk)
|
||||
{
|
||||
const uint64_t wr_iid = pwr_info->iid;
|
||||
const uint64_t wr_iid = wrinfo->iid;
|
||||
const unsigned statusinfo = sample->statusinfo;
|
||||
const bool has_data = (sample->kind == SDK_DATA);
|
||||
const int is_dispose = (statusinfo & NN_STATUSINFO_DISPOSE) != 0;
|
||||
|
@ -1347,7 +1347,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
else
|
||||
{
|
||||
TRACE (" new instance");
|
||||
stored = rhc_store_new_instance (&inst, rhc, pwr_info, sample, tk, has_data, &cb_data, &post, &trig_qc);
|
||||
stored = rhc_store_new_instance (&inst, rhc, wrinfo, sample, tk, has_data, &cb_data, &post, &trig_qc);
|
||||
if (stored != RHC_STORED)
|
||||
{
|
||||
goto error_or_nochange;
|
||||
|
@ -1356,7 +1356,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
notify_data_available = true;
|
||||
}
|
||||
}
|
||||
else if (!inst_accepts_sample (rhc, inst, pwr_info, sample, has_data))
|
||||
else if (!inst_accepts_sample (rhc, inst, wrinfo, sample, has_data))
|
||||
{
|
||||
/* Rejected samples (and disposes) should still register the writer;
|
||||
unregister *must* be processed, or we have a memory leak. (We
|
||||
|
@ -1372,7 +1372,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
}
|
||||
if (statusinfo & NN_STATUSINFO_UNREGISTER)
|
||||
{
|
||||
if (dds_rhc_unregister (rhc, inst, pwr_info, sample->timestamp, &post, &trig_qc))
|
||||
if (dds_rhc_unregister (rhc, inst, wrinfo, sample->timestamp, &post, &trig_qc))
|
||||
notify_data_available = true;
|
||||
}
|
||||
else
|
||||
|
@ -1450,7 +1450,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
if (has_data)
|
||||
{
|
||||
TRACE (" add_sample");
|
||||
if (!add_sample (rhc, inst, pwr_info, sample, &cb_data, &trig_qc))
|
||||
if (!add_sample (rhc, inst, wrinfo, sample, &cb_data, &trig_qc))
|
||||
{
|
||||
TRACE ("(reject)");
|
||||
stored = RHC_REJECTED;
|
||||
|
@ -1469,7 +1469,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
if (inst_became_disposed && inst->latest == NULL)
|
||||
inst_set_invsample (rhc, inst, &trig_qc, ¬ify_data_available);
|
||||
|
||||
update_inst (inst, pwr_info, true, sample->timestamp);
|
||||
update_inst (inst, wrinfo, true, sample->timestamp);
|
||||
|
||||
/* Can only add samples => only need to give special treatment
|
||||
to instances that were empty before. It is, however, not
|
||||
|
@ -1513,7 +1513,7 @@ static bool dds_rhc_default_store (struct dds_rhc_default * __restrict rhc, cons
|
|||
mean an application reading "x" after the write and reading it
|
||||
again after the unregister will see a change in the
|
||||
no_writers_generation field? */
|
||||
dds_rhc_unregister (rhc, inst, pwr_info, sample->timestamp, &post, &trig_qc);
|
||||
dds_rhc_unregister (rhc, inst, wrinfo, sample->timestamp, &post, &trig_qc);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1559,7 +1559,7 @@ error_or_nochange:
|
|||
return delivered;
|
||||
}
|
||||
|
||||
static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info)
|
||||
static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict rhc, const struct ddsi_writer_info * __restrict wrinfo)
|
||||
{
|
||||
/* Only to be called when writer with ID WR_IID has died.
|
||||
|
||||
|
@ -1579,8 +1579,8 @@ static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict r
|
|||
bool notify_data_available = false;
|
||||
struct rhc_instance *inst;
|
||||
struct ddsrt_hh_iter iter;
|
||||
const uint64_t wr_iid = pwr_info->iid;
|
||||
const int auto_dispose = pwr_info->auto_dispose;
|
||||
const uint64_t wr_iid = wrinfo->iid;
|
||||
const int auto_dispose = wrinfo->auto_dispose;
|
||||
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
|
||||
|
@ -1620,7 +1620,7 @@ static void dds_rhc_default_unregister_wr (struct dds_rhc_default * __restrict r
|
|||
}
|
||||
}
|
||||
|
||||
dds_rhc_unregister (rhc, inst, pwr_info, inst->tstamp, &post, &trig_qc);
|
||||
dds_rhc_unregister (rhc, inst, wrinfo, inst->tstamp, &post, &trig_qc);
|
||||
|
||||
TRACE ("\n");
|
||||
|
||||
|
@ -2292,13 +2292,12 @@ static bool cond_is_sample_state_dependent (const struct dds_readcond *cond)
|
|||
}
|
||||
}
|
||||
|
||||
static bool dds_rhc_default_add_readcondition (dds_readcond *cond)
|
||||
static bool dds_rhc_default_add_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond)
|
||||
{
|
||||
/* On the assumption that a readcondition will be attached to a
|
||||
waitset for nearly all of its life, we keep track of all
|
||||
readconditions on a reader in one set, without distinguishing
|
||||
between those attached to a waitset or not. */
|
||||
struct dds_rhc_default *rhc = (struct dds_rhc_default *) cond->m_rhc;
|
||||
struct ddsrt_hh_iter it;
|
||||
|
||||
assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) ||
|
||||
|
@ -2397,9 +2396,8 @@ static bool dds_rhc_default_add_readcondition (dds_readcond *cond)
|
|||
return true;
|
||||
}
|
||||
|
||||
static void dds_rhc_default_remove_readcondition (dds_readcond *cond)
|
||||
static void dds_rhc_default_remove_readcondition (struct dds_rhc_default *rhc, dds_readcond *cond)
|
||||
{
|
||||
struct dds_rhc_default *rhc = (struct dds_rhc_default *) cond->m_rhc;
|
||||
dds_readcond **ptr;
|
||||
ddsrt_mutex_lock (&rhc->lock);
|
||||
ptr = &rhc->conds;
|
||||
|
|
|
@ -32,7 +32,7 @@ static const uint64_t unihashconsts[] = {
|
|||
UINT64_C (16728792139623414127)
|
||||
};
|
||||
|
||||
static uint32_t hash_guid (const nn_guid_t *g)
|
||||
static uint32_t hash_guid (const ddsi_guid_t *g)
|
||||
{
|
||||
return
|
||||
(uint32_t) (((((uint32_t) g->prefix.u[0] + unihashconsts[0]) *
|
||||
|
@ -131,7 +131,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
|
|||
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
|
||||
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
|
||||
/* keyhash must in host format (which the GUIDs always are internally) */
|
||||
struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value);
|
||||
struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const ddsi_guid_t *) keyhash->value);
|
||||
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
|
||||
memcpy (&d->key, keyhash->value, sizeof (d->key));
|
||||
if (entity)
|
||||
|
@ -175,9 +175,9 @@ static struct ddsi_serdata *serdata_builtin_to_topicless (const struct ddsi_serd
|
|||
return ddsi_serdata_ref (serdata_common);
|
||||
}
|
||||
|
||||
static void convkey (dds_builtintopic_guid_t *key, const nn_guid_t *guid)
|
||||
static void convkey (dds_builtintopic_guid_t *key, const ddsi_guid_t *guid)
|
||||
{
|
||||
nn_guid_t tmp;
|
||||
ddsi_guid_t tmp;
|
||||
tmp = nn_hton_guid (*guid);
|
||||
memcpy (key, &tmp, sizeof (*key));
|
||||
}
|
||||
|
@ -214,7 +214,7 @@ static bool to_sample_pp (const struct ddsi_serdata_builtintopic *d, struct dds_
|
|||
|
||||
static bool to_sample_endpoint (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_endpoint *sample)
|
||||
{
|
||||
nn_guid_t ppguid;
|
||||
ddsi_guid_t ppguid;
|
||||
convkey (&sample->key, &d->key);
|
||||
ppguid = d->key;
|
||||
ppguid.entityid.u = NN_ENTITYID_PARTICIPANT;
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#include "dds__querycond.h"
|
||||
#include "dds__readcond.h"
|
||||
#include "dds__init.h"
|
||||
#include "dds__rhc.h"
|
||||
#include "dds/ddsc/dds_rhc.h"
|
||||
#include "dds/ddsi/ddsi_iid.h"
|
||||
|
||||
DEFINE_ENTITY_LOCK_UNLOCK_ONLY (static, dds_waitset, DDS_KIND_WAITSET)
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "dds/ddsi/ddsi_tkmap.h"
|
||||
#include "dds/ddsi/q_thread.h"
|
||||
#include "dds/ddsi/q_xmsg.h"
|
||||
#include "dds/ddsi/q_rhc.h"
|
||||
#include "dds/ddsi/ddsi_rhc.h"
|
||||
#include "dds/ddsi/ddsi_serdata.h"
|
||||
#include "dds__stream.h"
|
||||
#include "dds/ddsi/q_transmit.h"
|
||||
|
@ -71,9 +71,9 @@ dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t tim
|
|||
return ret;
|
||||
}
|
||||
|
||||
static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
|
||||
static dds_return_t try_store (struct ddsi_rhc *rhc, const struct ddsi_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
|
||||
{
|
||||
while (! rhc_store (rhc, pwr_info, payload, tk))
|
||||
while (! ddsi_rhc_store (rhc, pwr_info, payload, tk))
|
||||
{
|
||||
if (*max_block_ms > 0)
|
||||
{
|
||||
|
@ -98,8 +98,8 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
if (rdary[0])
|
||||
{
|
||||
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
|
||||
struct proxy_writer_info pwr_info;
|
||||
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
|
||||
struct ddsi_writer_info pwr_info;
|
||||
ddsi_make_writer_info (&pwr_info, &wr->e, wr->xqos);
|
||||
for (uint32_t i = 0; rdary[i]; i++) {
|
||||
DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
|
||||
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
|
||||
|
@ -120,11 +120,11 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
reliable samples that are rejected are simply discarded. */
|
||||
ddsrt_avl_iter_t it;
|
||||
struct pwr_rd_match *m;
|
||||
struct proxy_writer_info pwr_info;
|
||||
struct ddsi_writer_info wrinfo;
|
||||
const struct ephash *gh = wr->e.gv->guid_hash;
|
||||
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
|
||||
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
|
||||
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
|
||||
ddsi_make_writer_info (&wrinfo, &wr->e, wr->xqos);
|
||||
ddsrt_mutex_lock (&wr->e.lock);
|
||||
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
|
@ -133,7 +133,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
{
|
||||
DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
|
||||
/* Copied the return value ignore from DDSI deliver_user_data () function. */
|
||||
if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
|
||||
if ((ret = try_store (rd->rhc, &wrinfo, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue