Abstract RHC interface

This makes it possible to use a different RHC implementations for
different readers and removes the need for the RHC interface to be part
of the global state.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-06-27 09:51:12 +02:00 committed by eboasson
parent 483f4d2b77
commit 2e9ce9b4c1
20 changed files with 3047 additions and 2872 deletions

View file

@ -19,6 +19,7 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_init.c
dds_publisher.c
dds_rhc.c
dds_rhc_default.c
dds_domain.c
dds_instance.c
dds_qos.c
@ -67,6 +68,7 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__guardcond.h
dds__reader.h
dds__rhc.h
dds__rhc_default.h
dds__stream.h
dds__subscriber.h
dds__topic.h

View file

@ -12,60 +12,81 @@
#ifndef _DDS_RHC_H_
#define _DDS_RHC_H_
#include "dds/ddsrt/static_assert.h"
#include "dds/ddsi/q_rhc.h"
#include "dds__types.h" /* for dds_readcond */
#define NO_STATE_MASK_SET (DDS_ANY_STATE + 1)
#if defined (__cplusplus)
extern "C" {
#endif
struct rhc;
struct dds_qos;
struct ddsi_serdata;
struct ddsi_tkmap_instance;
struct proxy_writer_info;
struct dds_rhc;
DDS_EXPORT struct rhc *dds_rhc_new (dds_reader *reader, const struct ddsi_sertopic *topic);
DDS_EXPORT void dds_rhc_free (struct rhc *rhc);
typedef int (*dds_rhc_read_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
typedef int (*dds_rhc_take_t) (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond);
typedef int (*dds_rhc_takecdr_t) (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle);
DDS_EXPORT uint32_t dds_rhc_lock_samples (struct rhc *rhc);
typedef bool (*dds_rhc_add_readcondition_t) (struct dds_readcond *cond);
typedef void (*dds_rhc_remove_readcondition_t) (struct dds_readcond *cond);
DDS_EXPORT bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk);
DDS_EXPORT void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info);
DDS_EXPORT void dds_rhc_relinquish_ownership (struct rhc * __restrict rhc, const uint64_t wr_iid);
typedef uint32_t (*dds_rhc_lock_samples_t) (struct dds_rhc *rhc);
DDS_EXPORT int
dds_rhc_read(
struct rhc *rhc,
bool lock,
void ** values,
dds_sample_info_t *info_seq,
uint32_t max_samples,
uint32_t mask,
dds_instance_handle_t handle,
dds_readcond *cond);
DDS_EXPORT int
dds_rhc_take(
struct rhc *rhc,
bool lock,
void ** values,
dds_sample_info_t *info_seq,
uint32_t max_samples,
uint32_t mask,
dds_instance_handle_t handle,
dds_readcond *cond);
struct dds_rhc_ops {
/* A copy of DDSI rhc ops comes first so we can use either interface without
additional indirections */
struct rhc_ops rhc_ops;
dds_rhc_read_t read;
dds_rhc_take_t take;
dds_rhc_takecdr_t takecdr;
dds_rhc_add_readcondition_t add_readcondition;
dds_rhc_remove_readcondition_t remove_readcondition;
dds_rhc_lock_samples_t lock_samples;
};
DDS_EXPORT void dds_rhc_set_qos (struct rhc * rhc, const struct dds_qos * qos);
struct dds_rhc {
union {
const struct dds_rhc_ops *ops;
struct rhc rhc;
} common;
};
DDS_EXPORT bool dds_rhc_add_readcondition (dds_readcond * cond);
DDS_EXPORT void dds_rhc_remove_readcondition (dds_readcond * cond);
DDSRT_STATIC_ASSERT (offsetof (struct dds_rhc, common.ops) == offsetof (struct rhc, ops));
DDS_EXPORT int dds_rhc_takecdr
(
struct rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq,
uint32_t max_samples, uint32_t sample_states,
uint32_t view_states, uint32_t instance_states,
dds_instance_handle_t handle
);
DDS_EXPORT inline bool dds_rhc_store (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info, struct ddsi_serdata * __restrict sample, struct ddsi_tkmap_instance * __restrict tk) {
return rhc->common.ops->rhc_ops.store (&rhc->common.rhc, pwr_info, sample, tk);
}
DDS_EXPORT inline void dds_rhc_unregister_wr (struct dds_rhc * __restrict rhc, const struct proxy_writer_info * __restrict pwr_info) {
rhc->common.ops->rhc_ops.unregister_wr (&rhc->common.rhc, pwr_info);
}
DDS_EXPORT inline void dds_rhc_relinquish_ownership (struct dds_rhc * __restrict rhc, const uint64_t wr_iid) {
rhc->common.ops->rhc_ops.relinquish_ownership (&rhc->common.rhc, wr_iid);
}
DDS_EXPORT inline void dds_rhc_set_qos (struct dds_rhc *rhc, const struct dds_qos *qos) {
rhc->common.ops->rhc_ops.set_qos (&rhc->common.rhc, qos);
}
DDS_EXPORT inline void dds_rhc_free (struct dds_rhc *rhc) {
rhc->common.ops->rhc_ops.free (&rhc->common.rhc);
}
DDS_EXPORT inline int dds_rhc_read (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
return rhc->common.ops->read (rhc, lock, values, info_seq, max_samples, mask, handle, cond);
}
DDS_EXPORT inline int dds_rhc_take (struct dds_rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t mask, dds_instance_handle_t handle, struct dds_readcond *cond) {
return rhc->common.ops->take (rhc, lock, values, info_seq, max_samples, mask, handle, cond);
}
DDS_EXPORT inline int dds_rhc_takecdr (struct dds_rhc *rhc, bool lock, struct ddsi_serdata **values, dds_sample_info_t *info_seq, uint32_t max_samples, uint32_t sample_states, uint32_t view_states, uint32_t instance_states, dds_instance_handle_t handle) {
return rhc->common.ops->takecdr (rhc, lock, values, info_seq, max_samples, sample_states, view_states, instance_states, handle);
}
DDS_EXPORT inline bool dds_rhc_add_readcondition (struct dds_readcond *cond) {
return cond->m_rhc->common.ops->add_readcondition (cond);
}
DDS_EXPORT inline void dds_rhc_remove_readcondition (struct dds_readcond *cond) {
cond->m_rhc->common.ops->remove_readcondition (cond);
}
DDS_EXPORT inline uint32_t dds_rhc_lock_samples (struct dds_rhc *rhc) {
return rhc->common.ops->lock_samples (rhc);
}
#if defined (__cplusplus)
}

View file

@ -0,0 +1,28 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef _DDS_RHC_DEFAULT_H_
#define _DDS_RHC_DEFAULT_H_
#if defined (__cplusplus)
extern "C" {
#endif
struct dds_rhc;
struct dds_reader;
struct ddsi_sertopic;
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
#if defined (__cplusplus)
}
#endif
#endif

View file

@ -211,6 +211,7 @@ typedef struct dds_participant {
typedef struct dds_reader {
struct dds_entity m_entity;
const struct dds_topic *m_topic;
struct dds_rhc *m_rhc; /* aliases m_rd->rhc with a wider interface, FIXME: but m_rd owns it for resource management */
struct reader *m_rd;
bool m_data_on_readers;
bool m_loan_out;
@ -263,7 +264,7 @@ typedef uint32_t dds_querycond_mask_t;
typedef struct dds_readcond {
dds_entity m_entity;
struct rhc *m_rhc;
struct dds_rhc *m_rhc;
uint32_t m_qminv;
uint32_t m_sample_states;
uint32_t m_view_states;

View file

@ -1,5 +1,5 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
* Copyright(c) 2006 to 2019 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at

View file

@ -272,12 +272,6 @@ void ddsi_plugin_init (void)
ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible;
ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry;
ddsi_plugin.builtintopic_write = dds__builtin_write;
ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
ddsi_plugin.rhc_plugin.rhc_store_fn = dds_rhc_store;
ddsi_plugin.rhc_plugin.rhc_unregister_wr_fn = dds_rhc_unregister_wr;
ddsi_plugin.rhc_plugin.rhc_relinquish_ownership_fn = dds_rhc_relinquish_ownership;
ddsi_plugin.rhc_plugin.rhc_set_qos_fn = dds_rhc_set_qos;
}
//provides explicit default domain id.

View file

@ -117,9 +117,9 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
if (take)
ret = dds_rhc_take (rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond);
ret = dds_rhc_take (rd->m_rhc, lock, buf, si, maxs, mask, hand, cond);
else
ret = dds_rhc_read (rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond);
ret = dds_rhc_read (rd->m_rhc, lock, buf, si, maxs, mask, hand, cond);
/* if no data read, restore the state to what it was before the call, with the sole
exception of holding on to a buffer we just allocated and that is pointed to by
@ -186,7 +186,7 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER);
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
dds_entity_unpin (entity);
fail_awake:

View file

@ -41,7 +41,7 @@ dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint3
(void) dds_entity_init (&cond->m_entity, &rd->m_entity, kind, NULL, NULL, 0);
cond->m_entity.m_iid = ddsi_iid_gen ();
dds_entity_register_child (&rd->m_entity, &cond->m_entity);
cond->m_rhc = rd->m_rd->rhc;
cond->m_rhc = rd->m_rhc;
cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE;
cond->m_view_states = mask & DDS_ANY_VIEW_STATE;
cond->m_instance_states = mask & DDS_ANY_INSTANCE_STATE;

View file

@ -19,6 +19,7 @@
#include "dds__listener.h"
#include "dds__init.h"
#include "dds__rhc.h"
#include "dds__rhc_default.h"
#include "dds__topic.h"
#include "dds__get_status.h"
#include "dds__qos.h"
@ -296,7 +297,6 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
dds_subscriber *sub = NULL;
dds_entity_t subscriber;
dds_reader *rd;
struct rhc *rhc;
dds_topic *tp;
dds_entity_t reader;
dds_entity_t t;
@ -382,7 +382,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
reader = dds_entity_init (&rd->m_entity, &sub->m_entity, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK);
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
rd->m_topic = tp;
rhc = dds_rhc_new (rd, tp->m_stopic);
rd->m_rhc = dds_rhc_default_new (rd, tp->m_stopic);
dds_entity_add_ref_locked (&tp->m_entity);
/* Extra claim of this reader to make sure that the delete waits until DDSI
@ -393,7 +393,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
ddsrt_mutex_unlock (&sub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state ());
ret = new_reader (&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, rqos, rhc, dds_reader_status_cb, rd);
ret = new_reader (&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
ddsrt_mutex_lock (&sub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
@ -404,7 +404,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
/* For persistent data register reader with durability */
if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > DDS_DURABILITY_TRANSIENT_LOCAL)) {
(dds_global.m_dur_reader) (rd, rhc);
(dds_global.m_dur_reader) (rd, &rd->m_rhc->common.rhc);
}
dds_topic_unlock (tp);
dds_subscriber_unlock (sub);
@ -484,7 +484,7 @@ uint32_t dds_reader_lock_samples (dds_entity_t reader)
uint32_t n;
if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK)
return 0;
n = dds_rhc_lock_samples (rd->m_rd->rhc);
n = dds_rhc_lock_samples (rd->m_rhc);
dds_reader_unlock (rd);
return n;
}

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -16,6 +16,7 @@
#include "dds/ddsi/ddsi_tkmap.h"
#include "dds/ddsi/q_thread.h"
#include "dds/ddsi/q_xmsg.h"
#include "dds/ddsi/q_rhc.h"
#include "dds/ddsi/ddsi_serdata.h"
#include "dds__stream.h"
#include "dds/ddsi/q_transmit.h"
@ -71,7 +72,7 @@ dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t tim
static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
{
while (!(ddsi_plugin.rhc_plugin.rhc_store_fn) (rhc, pwr_info, payload, tk))
while (! rhc_store (rhc, pwr_info, payload, tk))
{
if (*max_block_ms > 0)
{