Address locking order for entity locks
There were inconsistencies in the order in which entity locks were taken when multiple entities needed to be locked at the same time. In most cases, the order was first locking entity X, then locking the parent entity of X. However, in some cases the order was reversed, a likely cause of deadlocks. This commit sorts these problems, and in particular propagating operations into children. The entity refcount is now part of the handle administration so that it is no longer necessary to lock an entity to determine whether it is still allowed to be used (previously it had to check the CLOSED flag afterward). This allows recursing into the children while holding handles and the underlying objects alive, but without violating lock order. Attendant changes that would warrant there own commits but are too hard to split off: * Children are now no longer in a singly linked list, but in an AVL tree; this was necessary at some intermediate stage to allow unlocking an entity and restarting iteration over all children at the "next" child (all thanks to the eternally unique instance handle); * Waitsets shifted to using arrays of attached entities instead of linked lists; this was a consequence of dealing with some locking issues in reading triggers and considering which operations on the "triggered" and "observed" sets are actually needed. * Entity status flags and waitset/condition trigger counts are now handled using atomic operations. Entities are now classified as having a "status" with a corresponding mask, or as having a "trigger count" (conditions). As there are fewer than 16 status bits, the status and its mask can squeeze into the same 32-bits as the trigger count. These atomic updates avoid the need for a separate lock just for the trigger/status values and results in a significant speedup with waitsets. * Create topic now has a more rational behaviour when multiple participants attempt to create the same topic: each participant now gets its own topic definition, but the underlying type representation is shared. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
f61b2d54da
commit
647f7466d6
26 changed files with 1224 additions and 1004 deletions
|
@ -79,7 +79,6 @@ DDS_EXPORT dds_domainid_t dds_domain_default (void);
|
|||
|
||||
/** @name Communication Status definitions
|
||||
@{**/
|
||||
/** Another topic exists with the same name but with different characteristics. */
|
||||
typedef enum dds_status_id {
|
||||
DDS_INCONSISTENT_TOPIC_STATUS_ID,
|
||||
DDS_OFFERED_DEADLINE_MISSED_STATUS_ID,
|
||||
|
@ -94,8 +93,9 @@ typedef enum dds_status_id {
|
|||
DDS_LIVELINESS_CHANGED_STATUS_ID,
|
||||
DDS_PUBLICATION_MATCHED_STATUS_ID,
|
||||
DDS_SUBSCRIPTION_MATCHED_STATUS_ID
|
||||
}
|
||||
dds_status_id_t;
|
||||
} dds_status_id_t;
|
||||
|
||||
/** Another topic exists with the same name but with different characteristics. */
|
||||
#define DDS_INCONSISTENT_TOPIC_STATUS (1u << DDS_INCONSISTENT_TOPIC_STATUS_ID)
|
||||
/** The deadline that the writer has committed through its deadline QoS policy was not respected for a specific instance. */
|
||||
#define DDS_OFFERED_DEADLINE_MISSED_STATUS (1u << DDS_OFFERED_DEADLINE_MISSED_STATUS_ID)
|
||||
|
|
|
@ -98,19 +98,18 @@ dds_topic_descriptor_t;
|
|||
|
||||
typedef enum dds_entity_kind
|
||||
{
|
||||
DDS_KIND_DONTCARE = 0x00000000,
|
||||
DDS_KIND_TOPIC = 0x01000000,
|
||||
DDS_KIND_PARTICIPANT = 0x02000000,
|
||||
DDS_KIND_READER = 0x03000000,
|
||||
DDS_KIND_WRITER = 0x04000000,
|
||||
DDS_KIND_SUBSCRIBER = 0x05000000,
|
||||
DDS_KIND_PUBLISHER = 0x06000000,
|
||||
DDS_KIND_COND_READ = 0x07000000,
|
||||
DDS_KIND_COND_QUERY = 0x08000000,
|
||||
DDS_KIND_COND_GUARD = 0x09000000,
|
||||
DDS_KIND_WAITSET = 0x0A000000
|
||||
}
|
||||
dds_entity_kind_t;
|
||||
DDS_KIND_DONTCARE,
|
||||
DDS_KIND_TOPIC,
|
||||
DDS_KIND_PARTICIPANT,
|
||||
DDS_KIND_READER,
|
||||
DDS_KIND_WRITER,
|
||||
DDS_KIND_SUBSCRIBER,
|
||||
DDS_KIND_PUBLISHER,
|
||||
DDS_KIND_COND_READ,
|
||||
DDS_KIND_COND_QUERY,
|
||||
DDS_KIND_COND_GUARD,
|
||||
DDS_KIND_WAITSET
|
||||
} dds_entity_kind_t;
|
||||
|
||||
/* Handles are opaque pointers to implementation types */
|
||||
typedef uint64_t dds_instance_handle_t;
|
||||
|
|
|
@ -26,20 +26,22 @@ dds_entity_init(
|
|||
dds_entity_kind_t kind,
|
||||
dds_qos_t * qos,
|
||||
const dds_listener_t *listener,
|
||||
uint32_t mask);
|
||||
status_mask_t mask);
|
||||
|
||||
DDS_EXPORT void
|
||||
dds_entity_add_ref(dds_entity *e);
|
||||
dds_entity_register_child (
|
||||
dds_entity *parent,
|
||||
dds_entity *child);
|
||||
|
||||
DDS_EXPORT void
|
||||
dds_entity_add_ref_nolock(dds_entity *e);
|
||||
dds_entity_add_ref_locked(dds_entity *e);
|
||||
|
||||
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
|
||||
qualifier_ dds_return_t type_##_lock (dds_entity_t hdl, type_ **x) \
|
||||
{ \
|
||||
dds_return_t rc; \
|
||||
dds_entity *e; \
|
||||
if ((rc = dds_entity_lock (hdl, kind_, &e)) != DDS_RETCODE_OK) \
|
||||
if ((rc = dds_entity_lock (hdl, kind_, &e)) < 0) \
|
||||
return rc; \
|
||||
*x = (type_ *) e; \
|
||||
return DDS_RETCODE_OK; \
|
||||
|
@ -61,30 +63,27 @@ DDS_EXPORT inline bool dds_entity_is_enabled (const dds_entity *e) {
|
|||
return (e->m_flags & DDS_ENTITY_ENABLED) != 0;
|
||||
}
|
||||
|
||||
DDS_EXPORT void dds_entity_status_set (dds_entity *e, uint32_t t);
|
||||
DDS_EXPORT void dds_entity_status_set (dds_entity *e, status_mask_t t);
|
||||
DDS_EXPORT void dds_entity_trigger_set (dds_entity *e, uint32_t t);
|
||||
|
||||
DDS_EXPORT inline void dds_entity_status_reset (dds_entity *e, uint32_t t) {
|
||||
e->m_trigger &= ~t;
|
||||
}
|
||||
|
||||
DDS_EXPORT inline bool dds_entity_status_match (const dds_entity *e, uint32_t t) {
|
||||
return (e->m_trigger & t) != 0;
|
||||
DDS_EXPORT inline void dds_entity_status_reset (dds_entity *e, status_mask_t t) {
|
||||
ddsrt_atomic_and32 (&e->m_status.m_status_and_mask, SAM_ENABLED_MASK | (status_mask_t) ~t);
|
||||
}
|
||||
|
||||
DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) {
|
||||
return e->m_kind;
|
||||
}
|
||||
|
||||
DDS_EXPORT void dds_entity_status_signal (dds_entity *e);
|
||||
DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status);
|
||||
|
||||
DDS_EXPORT void dds_entity_invoke_listener (const dds_entity *entity, enum dds_status_id which, const void *vst);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_entity_claim (
|
||||
dds_entity_pin (
|
||||
dds_entity_t hdl,
|
||||
dds_entity **eptr);
|
||||
|
||||
DDS_EXPORT void dds_entity_release (
|
||||
DDS_EXPORT void dds_entity_unpin (
|
||||
dds_entity *e);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
|
@ -96,27 +95,17 @@ dds_entity_lock(
|
|||
DDS_EXPORT void
|
||||
dds_entity_unlock(dds_entity *e);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_entity_observer_register_nl(
|
||||
dds_entity *observed,
|
||||
dds_entity_t observer,
|
||||
dds_entity_callback cb);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_entity_observer_register(
|
||||
dds_entity_t observed,
|
||||
dds_entity_t observer,
|
||||
dds_entity_callback cb);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_entity_observer_unregister_nl(
|
||||
dds_entity *observed,
|
||||
dds_entity_t observer);
|
||||
dds_entity *observer,
|
||||
dds_entity_callback cb,
|
||||
dds_entity_delete_callback delete_cb);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_entity_observer_unregister(
|
||||
dds_entity_t observed,
|
||||
dds_entity_t observer);
|
||||
dds_entity *observed,
|
||||
dds_entity *observer);
|
||||
|
||||
DDS_EXPORT dds_return_t
|
||||
dds_delete_impl(
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
{ \
|
||||
if (status) \
|
||||
*status = ent->m_##status_##_status; \
|
||||
if (ent->m_entity.m_status_enable & DDS_##STATUS_##_STATUS) { \
|
||||
if (ddsrt_atomic_ld32 (&ent->m_entity.m_status.m_status_and_mask) & (DDS_##STATUS_##_STATUS << SAM_ENABLED_SHIFT)) { \
|
||||
do { DDS_GET_STATUS_LOCKED_RESET_N (DDSRT_COUNT_ARGS (__VA_ARGS__), status_, __VA_ARGS__) } while (0); \
|
||||
dds_entity_status_reset (&ent->m_entity, DDS_##STATUS_##_STATUS); \
|
||||
} \
|
||||
|
|
|
@ -145,13 +145,13 @@ dds_handle_delete(
|
|||
* Returns OK when succeeded.
|
||||
*/
|
||||
DDS_EXPORT int32_t
|
||||
dds_handle_claim(
|
||||
dds_handle_pin(
|
||||
dds_handle_t hdl,
|
||||
struct dds_handle_link **entity);
|
||||
|
||||
|
||||
DDS_EXPORT void
|
||||
dds_handle_claim_inc(
|
||||
dds_handle_repin(
|
||||
struct dds_handle_link *link);
|
||||
|
||||
|
||||
|
@ -159,7 +159,7 @@ dds_handle_claim_inc(
|
|||
* The active claims count is decreased.
|
||||
*/
|
||||
DDS_EXPORT void
|
||||
dds_handle_release(
|
||||
dds_handle_unpin(
|
||||
struct dds_handle_link *link);
|
||||
|
||||
|
||||
|
@ -177,6 +177,9 @@ dds_handle_is_closed(
|
|||
struct dds_handle_link *link);
|
||||
|
||||
|
||||
DDS_EXPORT void dds_handle_add_ref (struct dds_handle_link *link);
|
||||
DDS_EXPORT bool dds_handle_drop_ref (struct dds_handle_link *link);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -39,12 +39,11 @@ struct dds_statuscond;
|
|||
struct ddsi_sertopic;
|
||||
struct rhc;
|
||||
|
||||
/* Internal entity status flags */
|
||||
|
||||
#define DDS_INTERNAL_STATUS_MASK (0xFF000000u)
|
||||
|
||||
#define DDS_WAITSET_TRIGGER_STATUS (0x01000000u)
|
||||
#define DDS_DELETING_STATUS (0x02000000u)
|
||||
typedef uint16_t status_mask_t;
|
||||
typedef ddsrt_atomic_uint32_t status_and_enabled_t;
|
||||
#define SAM_STATUS_MASK 0xffffu
|
||||
#define SAM_ENABLED_MASK 0xffff0000u
|
||||
#define SAM_ENABLED_SHIFT 16
|
||||
|
||||
/* This can be used when polling for various states.
|
||||
* Obviously, it is encouraged to use condition variables and such. But
|
||||
|
@ -92,14 +91,12 @@ struct dds_listener {
|
|||
#define DDS_ENTITY_ENABLED 0x0001u
|
||||
#define DDS_ENTITY_IMPLICIT 0x0002u
|
||||
|
||||
typedef struct dds_domain
|
||||
{
|
||||
typedef struct dds_domain {
|
||||
ddsrt_avl_node_t m_node;
|
||||
dds_domainid_t m_id;
|
||||
ddsrt_avl_tree_t m_topics;
|
||||
uint32_t m_refc;
|
||||
}
|
||||
dds_domain;
|
||||
} dds_domain;
|
||||
|
||||
struct dds_entity;
|
||||
typedef struct dds_entity_deriver {
|
||||
|
@ -111,28 +108,28 @@ typedef struct dds_entity_deriver {
|
|||
dds_return_t (*validate_status)(uint32_t mask);
|
||||
} dds_entity_deriver;
|
||||
|
||||
typedef void (*dds_entity_callback)(dds_entity_t observer, dds_entity_t observed, uint32_t status);
|
||||
typedef void (*dds_entity_callback)(struct dds_entity *observer, dds_entity_t observed, uint32_t status);
|
||||
typedef void (*dds_entity_delete_callback)(struct dds_entity *observer, dds_entity_t observed);
|
||||
|
||||
typedef struct dds_entity_observer {
|
||||
dds_entity_callback m_cb;
|
||||
dds_entity_t m_observer;
|
||||
dds_entity_delete_callback m_delete_cb;
|
||||
struct dds_entity *m_observer;
|
||||
struct dds_entity_observer *m_next;
|
||||
} dds_entity_observer;
|
||||
|
||||
typedef struct dds_entity {
|
||||
struct dds_handle_link m_hdllink; /* handle is constant, cnt_flags private to dds_handle.c */
|
||||
dds_entity_kind_t m_kind; /* constant */
|
||||
dds_entity_deriver m_deriver; /* constant; FIXME: no point in having function pointers embedded */
|
||||
uint32_t m_refc; /* [m_mutex] */
|
||||
struct dds_entity *m_next; /* [m_mutex] */
|
||||
struct dds_entity *m_parent; /* constant */
|
||||
struct dds_entity *m_children; /* [m_mutex] */
|
||||
ddsrt_avl_node_t m_avlnode_child; /* [m_mutex of m_parent] */
|
||||
ddsrt_avl_tree_t m_children; /* [m_mutex] tree on m_iid using m_avlnode_child */
|
||||
struct dds_entity *m_participant; /* constant */
|
||||
struct dds_domain *m_domain; /* constant */
|
||||
dds_qos_t *m_qos; /* [m_mutex] */
|
||||
dds_domainid_t m_domainid; /* constant; FIXME: why? hardly ever used, m_domain should give that info, too */
|
||||
nn_guid_t m_guid; /* ~ constant: FIXME: set during creation, but possibly after becoming visible */
|
||||
dds_instance_handle_t m_iid; /* ~ constant: FIXME: like GUID */
|
||||
nn_guid_t m_guid; /* unique (if not 0) and constant; FIXME: set during creation, but possibly after becoming visible */
|
||||
dds_instance_handle_t m_iid; /* unique for all time, constant; FIXME: like GUID */
|
||||
uint32_t m_flags; /* [m_mutex] */
|
||||
|
||||
/* Allowed:
|
||||
|
@ -145,41 +142,72 @@ typedef struct dds_entity {
|
|||
ddsrt_mutex_t m_mutex;
|
||||
ddsrt_cond_t m_cond;
|
||||
|
||||
ddsrt_mutex_t m_observers_lock;
|
||||
union {
|
||||
status_and_enabled_t m_status_and_mask; /* for most entities */
|
||||
ddsrt_atomic_uint32_t m_trigger; /* for conditions & waitsets */
|
||||
} m_status;
|
||||
|
||||
ddsrt_mutex_t m_observers_lock; /* locking parent->...->m_observers_lock while holding it is allowed */
|
||||
ddsrt_cond_t m_observers_cond;
|
||||
dds_listener_t m_listener;
|
||||
uint32_t m_trigger;
|
||||
uint32_t m_status_enable;
|
||||
uint32_t m_cb_count;
|
||||
dds_entity_observer *m_observers;
|
||||
}
|
||||
dds_entity;
|
||||
dds_listener_t m_listener; /* [m_observers_lock] */
|
||||
uint32_t m_cb_count; /* [m_observers_lock] */
|
||||
dds_entity_observer *m_observers; /* [m_observers_lock] */
|
||||
} dds_entity;
|
||||
|
||||
extern const ddsrt_avl_treedef_t dds_topictree_def;
|
||||
extern const ddsrt_avl_treedef_t dds_entity_children_td;
|
||||
|
||||
typedef struct dds_subscriber
|
||||
{
|
||||
struct dds_entity m_entity;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_topic;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_participant;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_reader;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_writer;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_subscriber;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_publisher;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_readcondition;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_guardcondition;
|
||||
extern const struct dds_entity_deriver dds_entity_deriver_waitset;
|
||||
extern const struct dds_entity_deriver *dds_entity_deriver_table[];
|
||||
|
||||
dds_return_t dds_entity_deriver_dummy_close (struct dds_entity *e);
|
||||
dds_return_t dds_entity_deriver_dummy_delete (struct dds_entity *e);
|
||||
dds_return_t dds_entity_deriver_dummy_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled);
|
||||
dds_return_t dds_entity_deriver_dummy_validate_status (uint32_t mask);
|
||||
|
||||
inline dds_return_t dds_entity_deriver_close (struct dds_entity *e) {
|
||||
return dds_entity_deriver_table[e->m_kind]->close (e);
|
||||
}
|
||||
dds_subscriber;
|
||||
|
||||
typedef struct dds_publisher
|
||||
{
|
||||
struct dds_entity m_entity;
|
||||
inline dds_return_t dds_entity_deriver_delete (struct dds_entity *e) {
|
||||
return dds_entity_deriver_table[e->m_kind]->delete (e);
|
||||
}
|
||||
inline dds_return_t dds_entity_deriver_set_qos (struct dds_entity *e, const dds_qos_t *qos, bool enabled) {
|
||||
return dds_entity_deriver_table[e->m_kind]->set_qos (e, qos, enabled);
|
||||
}
|
||||
inline dds_return_t dds_entity_deriver_validate_status (struct dds_entity *e, uint32_t mask) {
|
||||
return dds_entity_deriver_table[e->m_kind]->validate_status (mask);
|
||||
}
|
||||
inline bool dds_entity_supports_set_qos (struct dds_entity *e) {
|
||||
return dds_entity_deriver_table[e->m_kind]->set_qos != dds_entity_deriver_dummy_set_qos;
|
||||
}
|
||||
inline bool dds_entity_supports_validate_status (struct dds_entity *e) {
|
||||
return dds_entity_deriver_table[e->m_kind]->validate_status != dds_entity_deriver_dummy_validate_status;
|
||||
}
|
||||
dds_publisher;
|
||||
|
||||
typedef struct dds_participant
|
||||
{
|
||||
typedef struct dds_subscriber {
|
||||
struct dds_entity m_entity;
|
||||
} dds_subscriber;
|
||||
|
||||
typedef struct dds_publisher {
|
||||
struct dds_entity m_entity;
|
||||
} dds_publisher;
|
||||
|
||||
typedef struct dds_participant {
|
||||
struct dds_entity m_entity;
|
||||
struct dds_entity * m_dur_reader;
|
||||
struct dds_entity * m_dur_writer;
|
||||
dds_entity_t m_builtin_subscriber;
|
||||
}
|
||||
dds_participant;
|
||||
} dds_participant;
|
||||
|
||||
typedef struct dds_reader
|
||||
{
|
||||
typedef struct dds_reader {
|
||||
struct dds_entity m_entity;
|
||||
const struct dds_topic *m_topic;
|
||||
struct reader *m_rd;
|
||||
|
@ -196,11 +224,9 @@ typedef struct dds_reader
|
|||
dds_requested_incompatible_qos_status_t m_requested_incompatible_qos_status;
|
||||
dds_sample_lost_status_t m_sample_lost_status;
|
||||
dds_subscription_matched_status_t m_subscription_matched_status;
|
||||
}
|
||||
dds_reader;
|
||||
} dds_reader;
|
||||
|
||||
typedef struct dds_writer
|
||||
{
|
||||
typedef struct dds_writer {
|
||||
struct dds_entity m_entity;
|
||||
const struct dds_topic *m_topic;
|
||||
struct nn_xpack *m_xp;
|
||||
|
@ -213,16 +239,14 @@ typedef struct dds_writer
|
|||
dds_offered_deadline_missed_status_t m_offered_deadline_missed_status;
|
||||
dds_offered_incompatible_qos_status_t m_offered_incompatible_qos_status;
|
||||
dds_publication_matched_status_t m_publication_matched_status;
|
||||
}
|
||||
dds_writer;
|
||||
} dds_writer;
|
||||
|
||||
#ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||
#define DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||
typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx);
|
||||
#endif
|
||||
|
||||
typedef struct dds_topic
|
||||
{
|
||||
typedef struct dds_topic {
|
||||
struct dds_entity m_entity;
|
||||
struct ddsi_sertopic *m_stopic;
|
||||
|
||||
|
@ -232,13 +256,11 @@ typedef struct dds_topic
|
|||
/* Status metrics */
|
||||
|
||||
dds_inconsistent_topic_status_t m_inconsistent_topic_status;
|
||||
}
|
||||
dds_topic;
|
||||
} dds_topic;
|
||||
|
||||
typedef uint32_t dds_querycond_mask_t;
|
||||
|
||||
typedef struct dds_readcond
|
||||
{
|
||||
typedef struct dds_readcond {
|
||||
dds_entity m_entity;
|
||||
struct rhc *m_rhc;
|
||||
uint32_t m_qminv;
|
||||
|
@ -247,40 +269,32 @@ typedef struct dds_readcond
|
|||
uint32_t m_instance_states;
|
||||
nn_guid_t m_rd_guid;
|
||||
struct dds_readcond *m_next;
|
||||
struct
|
||||
{
|
||||
struct {
|
||||
dds_querycondition_filter_fn m_filter;
|
||||
dds_querycond_mask_t m_qcmask; /* condition mask in RHC*/
|
||||
} m_query;
|
||||
}
|
||||
dds_readcond;
|
||||
} dds_readcond;
|
||||
|
||||
typedef struct dds_guardcond
|
||||
{
|
||||
typedef struct dds_guardcond {
|
||||
dds_entity m_entity;
|
||||
}
|
||||
dds_guardcond;
|
||||
} dds_guardcond;
|
||||
|
||||
typedef struct dds_attachment
|
||||
{
|
||||
typedef struct dds_attachment {
|
||||
dds_entity *entity;
|
||||
dds_entity_t handle;
|
||||
dds_attach_t arg;
|
||||
struct dds_attachment* next;
|
||||
}
|
||||
dds_attachment;
|
||||
} dds_attachment;
|
||||
|
||||
typedef struct dds_waitset
|
||||
{
|
||||
typedef struct dds_waitset {
|
||||
dds_entity m_entity;
|
||||
dds_attachment *observed;
|
||||
dds_attachment *triggered;
|
||||
}
|
||||
dds_waitset;
|
||||
size_t nentities; /* [m_entity.m_mutex] */
|
||||
size_t ntriggered; /* [m_entity.m_mutex] */
|
||||
dds_attachment *entities; /* [m_entity.m_mutex] 0 .. ntriggered are triggred, ntriggred .. nentities are not */
|
||||
} dds_waitset;
|
||||
|
||||
/* Globals */
|
||||
|
||||
typedef struct dds_globals
|
||||
{
|
||||
typedef struct dds_globals {
|
||||
dds_domainid_t m_default_domain;
|
||||
int32_t m_init_count;
|
||||
void (*m_dur_reader) (struct dds_reader * reader, struct rhc * rhc);
|
||||
|
@ -289,8 +303,7 @@ typedef struct dds_globals
|
|||
void (*m_dur_fini) (void);
|
||||
ddsrt_avl_tree_t m_domains;
|
||||
ddsrt_mutex_t m_mutex;
|
||||
}
|
||||
dds_globals;
|
||||
} dds_globals;
|
||||
|
||||
DDS_EXPORT extern dds_globals dds_global;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -21,6 +21,13 @@
|
|||
|
||||
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_guardcond)
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_guardcondition = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_entity_deriver_dummy_delete,
|
||||
.set_qos = dds_entity_deriver_dummy_set_qos,
|
||||
.validate_status = dds_entity_deriver_dummy_validate_status
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_guardcondition (dds_entity_t participant)
|
||||
{
|
||||
dds_participant *pp;
|
||||
|
@ -33,6 +40,7 @@ dds_entity_t dds_create_guardcondition (dds_entity_t participant)
|
|||
dds_guardcond *gcond = dds_alloc (sizeof (*gcond));
|
||||
dds_entity_t hdl = dds_entity_init (&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0);
|
||||
gcond->m_entity.m_iid = ddsi_iid_gen ();
|
||||
dds_entity_register_child (&pp->m_entity, &gcond->m_entity);
|
||||
dds_participant_unlock (pp);
|
||||
return hdl;
|
||||
}
|
||||
|
@ -47,12 +55,10 @@ dds_return_t dds_set_guardcondition (dds_entity_t condition, bool triggered)
|
|||
return rc;
|
||||
else
|
||||
{
|
||||
ddsrt_mutex_lock (&gcond->m_entity.m_observers_lock);
|
||||
if (triggered)
|
||||
dds_entity_status_set (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
dds_entity_trigger_set (&gcond->m_entity, 1);
|
||||
else
|
||||
dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
ddsrt_mutex_unlock (&gcond->m_entity.m_observers_lock);
|
||||
ddsrt_atomic_st32 (&gcond->m_entity.m_status.m_trigger, 0);
|
||||
dds_guardcond_unlock (gcond);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
@ -71,9 +77,7 @@ dds_return_t dds_read_guardcondition (dds_entity_t condition, bool *triggered)
|
|||
return rc;
|
||||
else
|
||||
{
|
||||
ddsrt_mutex_lock (&gcond->m_entity.m_observers_lock);
|
||||
*triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
ddsrt_mutex_unlock (&gcond->m_entity.m_observers_lock);
|
||||
*triggered = (ddsrt_atomic_ld32 (&gcond->m_entity.m_status.m_trigger) != 0);
|
||||
dds_guardcond_unlock (gcond);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
@ -92,10 +96,7 @@ dds_return_t dds_take_guardcondition (dds_entity_t condition, bool *triggered)
|
|||
return rc;
|
||||
else
|
||||
{
|
||||
ddsrt_mutex_lock (&gcond->m_entity.m_observers_lock);
|
||||
*triggered = dds_entity_status_match (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
dds_entity_status_reset (&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
ddsrt_mutex_unlock (&gcond->m_entity.m_observers_lock);
|
||||
*triggered = (ddsrt_atomic_and32_ov (&gcond->m_entity.m_status.m_trigger, 0) != 0);
|
||||
dds_guardcond_unlock (gcond);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,19 @@
|
|||
#define USE_CHH 0
|
||||
|
||||
#define HDL_FLAG_CLOSED (0x80000000u)
|
||||
#define HDL_COUNT_MASK (0x00ffffffu)
|
||||
|
||||
/* ref count: # outstanding references to this handle/object (not so sure it is
|
||||
ideal to have a one-to-one mapping between the two, but that is what the rest
|
||||
of the code assumes at the moment); so this limits one to having, e.g., no
|
||||
more than 64k endpoints referencing the same topic */
|
||||
#define HDL_REFCOUNT_MASK (0x0ffff000u)
|
||||
#define HDL_REFCOUNT_UNIT (0x00001000u)
|
||||
#define HDL_REFCOUNT_SHIFT 12
|
||||
|
||||
/* pin count: # concurrent operations, so allowing up to 4096 threads had better
|
||||
be enough ... */
|
||||
#define HDL_PINCOUNT_MASK (0x00000fffu)
|
||||
#define HDL_PINCOUNT_UNIT (0x00000001u)
|
||||
|
||||
/* Maximum number of handles is INT32_MAX - 1, but as the allocator relies on a
|
||||
random generator for finding a free one, the time spent in the dds_handle_create
|
||||
|
@ -105,7 +117,7 @@ static bool hhadd (struct ddsrt_hh *ht, void *elem) { return ddsrt_hh_add (ht, e
|
|||
#endif
|
||||
static dds_handle_t dds_handle_create_int (struct dds_handle_link *link)
|
||||
{
|
||||
ddsrt_atomic_st32 (&link->cnt_flags, 0);
|
||||
ddsrt_atomic_st32 (&link->cnt_flags, HDL_REFCOUNT_UNIT);
|
||||
do {
|
||||
do {
|
||||
link->hdl = (int32_t) (ddsrt_random () & INT32_MAX);
|
||||
|
@ -159,11 +171,12 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
|
|||
#endif
|
||||
assert (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED);
|
||||
ddsrt_mutex_lock (&handles.lock);
|
||||
if ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_COUNT_MASK) != 0)
|
||||
if ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 0)
|
||||
{
|
||||
/* FIXME: */
|
||||
/* FIXME: there is no sensible solution when this times out, so it must
|
||||
never do that ... */
|
||||
const dds_time_t abstimeout = dds_time () + timeout;
|
||||
while ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_COUNT_MASK) != 0)
|
||||
while ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_PINCOUNT_MASK) != 0)
|
||||
{
|
||||
if (!ddsrt_cond_waituntil (&handles.cond, &handles.lock, abstimeout))
|
||||
{
|
||||
|
@ -188,7 +201,7 @@ int32_t dds_handle_delete (struct dds_handle_link *link, dds_duration_t timeout)
|
|||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
||||
int32_t dds_handle_pin (dds_handle_t hdl, struct dds_handle_link **link)
|
||||
{
|
||||
#if USE_CHH
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
|
@ -196,7 +209,7 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
|||
struct dds_handle_link dummy = { .hdl = hdl };
|
||||
int32_t rc;
|
||||
/* it makes sense to check here for initialization: the first thing any operation
|
||||
(other than create_participant) does is to call dds_handle_claim on the supplied
|
||||
(other than create_participant) does is to call dds_handle_pin on the supplied
|
||||
entity, so checking here whether the library has been initialised helps avoid
|
||||
crashes if someone forgets to create a participant (or allows a program to
|
||||
continue after failing to create one).
|
||||
|
@ -228,7 +241,7 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
|||
rc = DDS_RETCODE_BAD_PARAMETER;
|
||||
break;
|
||||
}
|
||||
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cnt_flags, cnt_flags + 1));
|
||||
} while (!ddsrt_atomic_cas32 (&(*link)->cnt_flags, cnt_flags, cnt_flags + HDL_PINCOUNT_UNIT));
|
||||
}
|
||||
#if USE_CHH
|
||||
thread_state_asleep (ts1);
|
||||
|
@ -238,16 +251,18 @@ int32_t dds_handle_claim (dds_handle_t hdl, struct dds_handle_link **link)
|
|||
return rc;
|
||||
}
|
||||
|
||||
void dds_handle_claim_inc (struct dds_handle_link *link)
|
||||
void dds_handle_repin (struct dds_handle_link *link)
|
||||
{
|
||||
DDSRT_STATIC_ASSERT (HDL_PINCOUNT_UNIT == 1);
|
||||
uint32_t x = ddsrt_atomic_inc32_nv (&link->cnt_flags);
|
||||
assert (!(x & HDL_FLAG_CLOSED));
|
||||
(void) x;
|
||||
}
|
||||
|
||||
void dds_handle_release (struct dds_handle_link *link)
|
||||
void dds_handle_unpin (struct dds_handle_link *link)
|
||||
{
|
||||
if (ddsrt_atomic_dec32_ov (&link->cnt_flags) == (HDL_FLAG_CLOSED | 1))
|
||||
DDSRT_STATIC_ASSERT (HDL_PINCOUNT_UNIT == 1);
|
||||
if ((ddsrt_atomic_dec32_ov (&link->cnt_flags) & (HDL_FLAG_CLOSED | HDL_PINCOUNT_MASK)) == (HDL_FLAG_CLOSED | HDL_PINCOUNT_UNIT))
|
||||
{
|
||||
ddsrt_mutex_lock (&handles.lock);
|
||||
ddsrt_cond_broadcast (&handles.cond);
|
||||
|
@ -255,6 +270,25 @@ void dds_handle_release (struct dds_handle_link *link)
|
|||
}
|
||||
}
|
||||
|
||||
void dds_handle_add_ref (struct dds_handle_link *link)
|
||||
{
|
||||
ddsrt_atomic_add32 (&link->cnt_flags, HDL_REFCOUNT_UNIT);
|
||||
}
|
||||
|
||||
bool dds_handle_drop_ref (struct dds_handle_link *link)
|
||||
{
|
||||
assert ((ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_REFCOUNT_MASK) != 0);
|
||||
uint32_t old, new;
|
||||
do {
|
||||
old = ddsrt_atomic_ld32 (&link->cnt_flags);
|
||||
if ((old & HDL_REFCOUNT_MASK) != HDL_REFCOUNT_UNIT)
|
||||
new = old - HDL_REFCOUNT_UNIT;
|
||||
else
|
||||
new = (old - HDL_REFCOUNT_UNIT) | HDL_FLAG_CLOSED;
|
||||
} while (!ddsrt_atomic_cas32 (&link->cnt_flags, old, new));
|
||||
return (new & HDL_REFCOUNT_MASK) == 0;
|
||||
}
|
||||
|
||||
bool dds_handle_is_closed (struct dds_handle_link *link)
|
||||
{
|
||||
return (ddsrt_atomic_ld32 (&link->cnt_flags) & HDL_FLAG_CLOSED) != 0;
|
||||
|
|
|
@ -62,46 +62,6 @@ static void dds_instance_remove (const dds_topic *topic, const void *data, dds_i
|
|||
}
|
||||
}
|
||||
|
||||
static const dds_topic *dds_instance_info (dds_entity *e)
|
||||
{
|
||||
const dds_topic *topic;
|
||||
switch (dds_entity_kind (e))
|
||||
{
|
||||
case DDS_KIND_READER:
|
||||
topic = ((dds_reader*) e)->m_topic;
|
||||
break;
|
||||
case DDS_KIND_WRITER:
|
||||
topic = ((dds_writer*) e)->m_topic;
|
||||
break;
|
||||
default:
|
||||
assert (0);
|
||||
topic = NULL;
|
||||
}
|
||||
return topic;
|
||||
}
|
||||
|
||||
static const dds_topic *dds_instance_info_by_hdl (dds_entity_t e)
|
||||
{
|
||||
const dds_topic *topic;
|
||||
dds_entity *w_or_r;
|
||||
|
||||
if (dds_entity_lock (e, DDS_KIND_DONTCARE, &w_or_r) != DDS_RETCODE_OK)
|
||||
return NULL;
|
||||
|
||||
switch (dds_entity_kind (w_or_r))
|
||||
{
|
||||
case DDS_KIND_WRITER:
|
||||
case DDS_KIND_READER:
|
||||
topic = dds_instance_info (w_or_r);
|
||||
break;
|
||||
default:
|
||||
topic = NULL;
|
||||
break;
|
||||
}
|
||||
dds_entity_unlock (w_or_r);
|
||||
return topic;
|
||||
}
|
||||
|
||||
dds_return_t dds_register_instance (dds_entity_t writer, dds_instance_handle_t *handle, const void *data)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
|
@ -284,18 +244,32 @@ dds_instance_handle_t dds_lookup_instance (dds_entity_t entity, const void *data
|
|||
dds_instance_handle_t ih = DDS_HANDLE_NIL;
|
||||
const dds_topic *topic;
|
||||
struct ddsi_serdata *sd;
|
||||
dds_entity *w_or_r;
|
||||
|
||||
if (data == NULL)
|
||||
return DDS_HANDLE_NIL;
|
||||
|
||||
if ((topic = dds_instance_info_by_hdl (entity)) == NULL)
|
||||
if (dds_entity_lock (entity, DDS_KIND_DONTCARE, &w_or_r) < 0)
|
||||
return DDS_HANDLE_NIL;
|
||||
switch (dds_entity_kind (w_or_r))
|
||||
{
|
||||
case DDS_KIND_WRITER:
|
||||
topic = ((dds_writer *) w_or_r)->m_topic;
|
||||
break;
|
||||
case DDS_KIND_READER:
|
||||
topic = ((dds_reader *) w_or_r)->m_topic;
|
||||
break;
|
||||
default:
|
||||
dds_entity_unlock (w_or_r);
|
||||
return DDS_HANDLE_NIL;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1);
|
||||
sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
|
||||
ih = ddsi_tkmap_lookup (gv.m_tkmap, sd);
|
||||
ddsi_serdata_unref (sd);
|
||||
thread_state_asleep (ts1);
|
||||
dds_entity_unlock (w_or_r);
|
||||
return ih;
|
||||
}
|
||||
|
||||
|
@ -310,12 +284,25 @@ dds_return_t dds_instance_get_key (dds_entity_t entity, dds_instance_handle_t ih
|
|||
dds_return_t ret;
|
||||
const dds_topic *topic;
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
dds_entity *w_or_r;
|
||||
|
||||
if (data == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((topic = dds_instance_info_by_hdl (entity)) == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
if ((ret = dds_entity_lock (entity, DDS_KIND_DONTCARE, &w_or_r)) < 0)
|
||||
return ret;
|
||||
switch (dds_entity_kind (w_or_r))
|
||||
{
|
||||
case DDS_KIND_WRITER:
|
||||
topic = ((dds_writer *) w_or_r)->m_topic;
|
||||
break;
|
||||
case DDS_KIND_READER:
|
||||
topic = ((dds_reader *) w_or_r)->m_topic;
|
||||
break;
|
||||
default:
|
||||
dds_entity_unlock (w_or_r);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1);
|
||||
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, ih)) == NULL)
|
||||
|
@ -328,5 +315,6 @@ dds_return_t dds_instance_get_key (dds_entity_t entity, dds_instance_handle_t ih
|
|||
ret = DDS_RETCODE_OK;
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
dds_entity_unlock (w_or_r);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -84,6 +84,13 @@ static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos
|
|||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_participant = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_participant_delete,
|
||||
.set_qos = dds_participant_qos_set,
|
||||
.validate_status = dds_participant_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
dds_entity_t ret;
|
||||
|
@ -128,10 +135,6 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
pp->m_entity.m_guid = guid;
|
||||
pp->m_entity.m_iid = get_entity_instance_id (&guid);
|
||||
pp->m_entity.m_domain = dds_domain_create (dds_domain_default ());
|
||||
pp->m_entity.m_domainid = dds_domain_default ();
|
||||
pp->m_entity.m_deriver.delete = dds_participant_delete;
|
||||
pp->m_entity.m_deriver.set_qos = dds_participant_qos_set;
|
||||
pp->m_entity.m_deriver.validate_status = dds_participant_status_validate;
|
||||
pp->m_builtin_subscriber = 0;
|
||||
|
||||
/* Add participant to extent */
|
||||
|
@ -175,7 +178,7 @@ dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
|
|||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
for (dds_entity *iter = dds_pp_head; iter; iter = iter->m_next)
|
||||
{
|
||||
if (iter->m_domainid == domain_id)
|
||||
if (iter->m_domain->m_id == domain_id)
|
||||
{
|
||||
if ((size_t) ret < size)
|
||||
participants[ret] = iter->m_hdllink.hdl;
|
||||
|
|
|
@ -36,6 +36,13 @@ static dds_return_t dds_publisher_status_validate (uint32_t mask)
|
|||
return (mask & ~DDS_PUBLISHER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_publisher = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_entity_deriver_dummy_delete,
|
||||
.set_qos = dds_publisher_qos_set,
|
||||
.validate_status = dds_publisher_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
dds_participant *par;
|
||||
|
@ -62,8 +69,7 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
|
|||
pub = dds_alloc (sizeof (*pub));
|
||||
hdl = dds_entity_init (&pub->m_entity, &par->m_entity, DDS_KIND_PUBLISHER, new_qos, listener, DDS_PUBLISHER_STATUS_MASK);
|
||||
pub->m_entity.m_iid = ddsi_iid_gen ();
|
||||
pub->m_entity.m_deriver.set_qos = dds_publisher_qos_set;
|
||||
pub->m_entity.m_deriver.validate_status = dds_publisher_status_validate;
|
||||
dds_entity_register_child (&par->m_entity, &pub->m_entity);
|
||||
dds_participant_unlock (par);
|
||||
return hdl;
|
||||
}
|
||||
|
|
|
@ -33,15 +33,8 @@ dds_entity_t dds_create_querycondition (dds_entity_t reader, uint32_t mask, dds_
|
|||
dds_entity_t hdl;
|
||||
dds_readcond *cond = dds_create_readcond (r, DDS_KIND_COND_QUERY, mask, filter);
|
||||
assert (cond);
|
||||
const bool success = (cond->m_entity.m_deriver.delete != 0);
|
||||
dds_reader_unlock (r);
|
||||
if (success)
|
||||
hdl = cond->m_entity.m_hdllink.hdl;
|
||||
else
|
||||
{
|
||||
dds_delete (cond->m_entity.m_hdllink.hdl);
|
||||
hdl = DDS_RETCODE_OUT_OF_RESOURCES;
|
||||
}
|
||||
return hdl;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -134,12 +134,11 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
|
||||
/* read/take resets data available status -- must reset before reading because
|
||||
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
|
||||
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
|
||||
dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
|
||||
/* reset DATA_ON_READERS status on subscriber after successful read/take */
|
||||
if (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER)
|
||||
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER);
|
||||
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
|
||||
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
|
||||
|
||||
if (take)
|
||||
ret = dds_rhc_take (rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond);
|
||||
|
@ -192,12 +191,11 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
|
|||
{
|
||||
/* read/take resets data available status -- must reset before reading because
|
||||
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
|
||||
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
|
||||
dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
|
||||
/* reset DATA_ON_READERS status on subscriber after successful read/take */
|
||||
if (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER)
|
||||
assert (dds_entity_kind (rd->m_entity.m_parent) == DDS_KIND_SUBSCRIBER);
|
||||
dds_entity_status_reset (rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
|
||||
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
|
||||
|
||||
ret = dds_rhc_takecdr (rd->m_rd->rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
|
||||
dds_read_unlock (rd, cond);
|
||||
|
|
|
@ -27,13 +27,20 @@ static dds_return_t dds_readcond_delete (dds_entity *e)
|
|||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_readcondition = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_readcond_delete,
|
||||
.set_qos = dds_entity_deriver_dummy_set_qos,
|
||||
.validate_status = dds_entity_deriver_dummy_validate_status
|
||||
};
|
||||
|
||||
dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint32_t mask, dds_querycondition_filter_fn filter)
|
||||
{
|
||||
dds_readcond *cond = dds_alloc (sizeof (*cond));
|
||||
assert ((kind == DDS_KIND_COND_READ && filter == 0) || (kind == DDS_KIND_COND_QUERY && filter != 0));
|
||||
(void) dds_entity_init (&cond->m_entity, &rd->m_entity, kind, NULL, NULL, 0);
|
||||
cond->m_entity.m_iid = ddsi_iid_gen ();
|
||||
cond->m_entity.m_deriver.delete = dds_readcond_delete;
|
||||
dds_entity_register_child (&rd->m_entity, &cond->m_entity);
|
||||
cond->m_rhc = rd->m_rd->rhc;
|
||||
cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE;
|
||||
cond->m_view_states = mask & DDS_ANY_VIEW_STATE;
|
||||
|
@ -47,9 +54,8 @@ dds_readcond *dds_create_readcond (dds_reader *rd, dds_entity_kind_t kind, uint3
|
|||
if (!dds_rhc_add_readcondition (cond))
|
||||
{
|
||||
/* FIXME: current entity management code can't deal with an error late in the creation of the
|
||||
entity because it doesn't allow deleting it again ... instead use a hack to signal a problem
|
||||
to the caller and let that one handle it. */
|
||||
cond->m_entity.m_deriver.delete = 0;
|
||||
entity because it doesn't allow deleting it again ... */
|
||||
abort();
|
||||
}
|
||||
return cond;
|
||||
}
|
||||
|
@ -65,7 +71,6 @@ dds_entity_t dds_create_readcondition (dds_entity_t reader, uint32_t mask)
|
|||
dds_entity_t hdl;
|
||||
dds_readcond *cond = dds_create_readcond(rd, DDS_KIND_COND_READ, mask, 0);
|
||||
assert (cond);
|
||||
assert (cond->m_entity.m_deriver.delete);
|
||||
hdl = cond->m_entity.m_hdllink.hdl;
|
||||
dds_reader_unlock (rd);
|
||||
return hdl;
|
||||
|
@ -76,7 +81,7 @@ dds_entity_t dds_get_datareader (dds_entity_t condition)
|
|||
{
|
||||
struct dds_entity *e;
|
||||
dds_return_t rc;
|
||||
if ((rc = dds_entity_claim (condition, &e)) != DDS_RETCODE_OK)
|
||||
if ((rc = dds_entity_pin (condition, &e)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
else
|
||||
{
|
||||
|
@ -92,7 +97,7 @@ dds_entity_t dds_get_datareader (dds_entity_t condition)
|
|||
rdh = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
break;
|
||||
}
|
||||
dds_entity_release (e);
|
||||
dds_entity_unpin (e);
|
||||
return rdh;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,13 +96,11 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
|
|||
overhead really matters. Otherwise, it is pretty much like
|
||||
dds_reader_status_cb. */
|
||||
|
||||
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
|
||||
if (!(rd->m_entity.m_status_enable & DDS_DATA_AVAILABLE_STATUS))
|
||||
{
|
||||
ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock);
|
||||
const bool data_av_enabled = (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & (DDS_DATA_AVAILABLE_STATUS << SAM_ENABLED_SHIFT));
|
||||
if (!data_av_enabled)
|
||||
return;
|
||||
}
|
||||
|
||||
ddsrt_mutex_lock (&rd->m_entity.m_observers_lock);
|
||||
while (rd->m_entity.m_cb_count > 0)
|
||||
ddsrt_cond_wait (&rd->m_entity.m_observers_cond, &rd->m_entity.m_observers_lock);
|
||||
rd->m_entity.m_cb_count++;
|
||||
|
@ -136,7 +134,6 @@ void dds_reader_data_available_cb (struct dds_reader *rd)
|
|||
else
|
||||
{
|
||||
dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
|
||||
ddsrt_mutex_lock (&sub->m_observers_lock);
|
||||
dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS);
|
||||
ddsrt_mutex_unlock (&sub->m_observers_lock);
|
||||
|
@ -156,7 +153,7 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
{
|
||||
/* Release the initial claim that was done during the create. This
|
||||
* will indicate that further API deletion is now possible. */
|
||||
dds_handle_release (&entity->m_hdllink);
|
||||
dds_handle_unpin (&entity->m_hdllink);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -278,7 +275,7 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
}
|
||||
else
|
||||
{
|
||||
dds_entity_status_set (entity, 1u << status_id);
|
||||
dds_entity_status_set (entity, (status_mask_t) (1u << status_id));
|
||||
}
|
||||
|
||||
entity->m_cb_count--;
|
||||
|
@ -286,6 +283,13 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
ddsrt_mutex_unlock (&entity->m_observers_lock);
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_reader = {
|
||||
.close = dds_reader_close,
|
||||
.delete = dds_reader_delete,
|
||||
.set_qos = dds_reader_qos_set,
|
||||
.validate_status = dds_reader_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
dds_qos_t *rqos;
|
||||
|
@ -312,13 +316,13 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
|
||||
default: {
|
||||
dds_entity *p_or_s;
|
||||
if ((ret = dds_entity_claim (participant_or_subscriber, &p_or_s)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_entity_pin (participant_or_subscriber, &p_or_s)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
if (dds_entity_kind (p_or_s) == DDS_KIND_PARTICIPANT)
|
||||
subscriber = dds_create_subscriber (participant_or_subscriber, qos, NULL);
|
||||
else
|
||||
subscriber = participant_or_subscriber;
|
||||
dds_entity_release (p_or_s);
|
||||
dds_entity_unpin (p_or_s);
|
||||
internal_topic = false;
|
||||
t = topic;
|
||||
break;
|
||||
|
@ -379,15 +383,11 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
|
||||
rd->m_topic = tp;
|
||||
rhc = dds_rhc_new (rd, tp->m_stopic);
|
||||
dds_entity_add_ref_nolock (&tp->m_entity);
|
||||
rd->m_entity.m_deriver.close = dds_reader_close;
|
||||
rd->m_entity.m_deriver.delete = dds_reader_delete;
|
||||
rd->m_entity.m_deriver.set_qos = dds_reader_qos_set;
|
||||
rd->m_entity.m_deriver.validate_status = dds_reader_status_validate;
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
|
||||
/* Extra claim of this reader to make sure that the delete waits until DDSI
|
||||
has deleted its reader as well. This can be known through the callback. */
|
||||
dds_handle_claim_inc (&rd->m_entity.m_hdllink);
|
||||
dds_handle_repin (&rd->m_entity.m_hdllink);
|
||||
|
||||
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&sub->m_entity.m_mutex);
|
||||
|
@ -398,7 +398,9 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
rd->m_entity.m_iid = get_entity_instance_id (&rd->m_entity.m_guid);
|
||||
dds_entity_register_child (&sub->m_entity, &rd->m_entity);
|
||||
|
||||
/* For persistent data register reader with durability */
|
||||
if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > DDS_DURABILITY_TRANSIENT_LOCAL)) {
|
||||
|
@ -429,11 +431,11 @@ err_sub_lock:
|
|||
void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb, void *cbarg)
|
||||
{
|
||||
dds_entity *dds_entity;
|
||||
if (dds_entity_claim (entity, &dds_entity) != DDS_RETCODE_OK)
|
||||
if (dds_entity_pin (entity, &dds_entity) != DDS_RETCODE_OK)
|
||||
return;
|
||||
if (dds_entity_kind (dds_entity) != DDS_KIND_READER)
|
||||
{
|
||||
dds_entity_release (dds_entity);
|
||||
dds_entity_unpin (dds_entity);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -473,7 +475,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
|
|||
ddsrt_mutex_lock (&rd->e.lock);
|
||||
}
|
||||
ddsrt_mutex_unlock (&rd->e.lock);
|
||||
dds_entity_release (dds_entity);
|
||||
dds_entity_unpin (dds_entity);
|
||||
}
|
||||
|
||||
uint32_t dds_reader_lock_samples (dds_entity_t reader)
|
||||
|
@ -513,7 +515,7 @@ dds_entity_t dds_get_subscriber (dds_entity_t entity)
|
|||
{
|
||||
dds_entity *e;
|
||||
dds_return_t ret;
|
||||
if ((ret = dds_entity_claim (entity, &e)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_entity_pin (entity, &e)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
else
|
||||
{
|
||||
|
@ -534,7 +536,7 @@ dds_entity_t dds_get_subscriber (dds_entity_t entity)
|
|||
subh = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
break;
|
||||
}
|
||||
dds_entity_release (e);
|
||||
dds_entity_unpin (e);
|
||||
return subh;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,6 +153,7 @@
|
|||
the tkmap data. */
|
||||
|
||||
#define MAX_ATTACHED_QUERYCONDS (CHAR_BIT * sizeof (dds_querycond_mask_t))
|
||||
#define MAX_FAST_TRIGGERS 32
|
||||
|
||||
#define INCLUDE_TRACE 1
|
||||
#if INCLUDE_TRACE
|
||||
|
@ -381,7 +382,7 @@ static void topicless_to_clean_invsample (const struct ddsi_sertopic *topic, con
|
|||
}
|
||||
|
||||
static unsigned qmask_of_inst (const struct rhc_instance *inst);
|
||||
static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst);
|
||||
static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst, struct dds_entity *triggers[], size_t *ntriggers);
|
||||
#ifndef NDEBUG
|
||||
static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds, bool check_qcmask);
|
||||
#endif
|
||||
|
@ -769,16 +770,19 @@ static bool add_sample (struct rhc *rhc, struct rhc_instance *inst, const struct
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool content_filter_accepts (const struct ddsi_sertopic *sertopic, const struct ddsi_serdata *sample)
|
||||
static bool content_filter_accepts (const dds_reader *reader, const struct ddsi_serdata *sample)
|
||||
{
|
||||
bool ret = true;
|
||||
const struct dds_topic *tp = sertopic->status_cb_entity;
|
||||
if (reader)
|
||||
{
|
||||
const struct dds_topic *tp = reader->m_topic;
|
||||
if (tp->filter_fn)
|
||||
{
|
||||
char *tmp = ddsi_sertopic_alloc_sample (sertopic);
|
||||
char *tmp = ddsi_sertopic_alloc_sample (tp->m_stopic);
|
||||
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
||||
ret = (tp->filter_fn) (tmp, tp->filter_ctx);
|
||||
ddsi_sertopic_free_sample (sertopic, tmp, DDS_FREE_ALL);
|
||||
ddsi_sertopic_free_sample (tp->m_stopic, tmp, DDS_FREE_ALL);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -822,7 +826,7 @@ static int inst_accepts_sample (const struct rhc *rhc, const struct rhc_instance
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
if (has_data && !content_filter_accepts (rhc->topic, sample))
|
||||
if (has_data && !content_filter_accepts (rhc->reader, sample))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
@ -1166,7 +1170,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
|
|||
attribute (rather than a key), an empty instance should be
|
||||
instantiated. */
|
||||
|
||||
if (has_data && !content_filter_accepts (rhc->topic, sample))
|
||||
if (has_data && !content_filter_accepts (rhc->reader, sample))
|
||||
{
|
||||
return RHC_FILTERED;
|
||||
}
|
||||
|
@ -1225,7 +1229,6 @@ bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info
|
|||
struct trigger_info_pre pre;
|
||||
struct trigger_info_post post;
|
||||
struct trigger_info_qcond trig_qc;
|
||||
bool trigger_waitsets;
|
||||
rhc_store_result_t stored;
|
||||
status_cb_data_t cb_data; /* Callback data for reader status callback */
|
||||
bool delivered = true;
|
||||
|
@ -1440,7 +1443,10 @@ bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info
|
|||
|
||||
TRACE (")\n");
|
||||
|
||||
trigger_waitsets = trigger_info_differs (rhc, &pre, &post, &trig_qc) && update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst);
|
||||
dds_entity *triggers[MAX_FAST_TRIGGERS];
|
||||
size_t ntriggers = 0;
|
||||
if (trigger_info_differs (rhc, &pre, &post, &trig_qc))
|
||||
update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, triggers, &ntriggers);
|
||||
|
||||
assert (rhc_check_counts_locked (rhc, true, true));
|
||||
|
||||
|
@ -1450,8 +1456,8 @@ bool dds_rhc_store (struct rhc * __restrict rhc, const struct proxy_writer_info
|
|||
{
|
||||
if (notify_data_available)
|
||||
dds_reader_data_available_cb (rhc->reader);
|
||||
if (trigger_waitsets)
|
||||
dds_entity_status_signal (&rhc->reader->m_entity);
|
||||
for (size_t i = 0; i < ntriggers; i++)
|
||||
dds_entity_status_signal (triggers[i], 0);
|
||||
}
|
||||
|
||||
return delivered;
|
||||
|
@ -1490,13 +1496,14 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ
|
|||
need to get two IIDs: the one visible to the application in the
|
||||
built-in topics and in get_instance_handle, and one used internally
|
||||
for tracking registrations and unregistrations. */
|
||||
bool trigger_waitsets = false;
|
||||
bool notify_data_available = false;
|
||||
struct rhc_instance *inst;
|
||||
struct ddsrt_hh_iter iter;
|
||||
const uint64_t wr_iid = pwr_info->iid;
|
||||
const int auto_dispose = pwr_info->auto_dispose;
|
||||
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
|
||||
ddsrt_mutex_lock (&rhc->lock);
|
||||
TRACE ("rhc_unregister_wr_iid(%"PRIx64",%d:\n", wr_iid, auto_dispose);
|
||||
for (inst = ddsrt_hh_iter_first (rhc->instances, &iter); inst; inst = ddsrt_hh_iter_next (&iter))
|
||||
|
@ -1538,8 +1545,8 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ
|
|||
TRACE ("\n");
|
||||
|
||||
notify_data_available = true;
|
||||
if (trigger_info_differs (rhc, &pre, &post, &trig_qc) && update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst))
|
||||
trigger_waitsets = true;
|
||||
if (trigger_info_differs (rhc, &pre, &post, &trig_qc))
|
||||
update_conditions_locked (rhc, true, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
}
|
||||
}
|
||||
|
@ -1551,8 +1558,6 @@ void dds_rhc_unregister_wr (struct rhc * __restrict rhc, const struct proxy_writ
|
|||
{
|
||||
if (notify_data_available)
|
||||
dds_reader_data_available_cb (rhc->reader);
|
||||
if (trigger_waitsets)
|
||||
dds_entity_status_signal (&rhc->reader->m_entity);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1726,10 +1731,11 @@ static bool read_sample_update_conditions (struct rhc *rhc, struct trigger_info_
|
|||
trig_qc->dec_sample_read = sample_wasread;
|
||||
trig_qc->inc_sample_read = true;
|
||||
get_trigger_info_cmn (&post->c, inst);
|
||||
const bool trigger_waitsets = update_conditions_locked (rhc, false, pre, post, trig_qc, inst);
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
update_conditions_locked (rhc, false, pre, post, trig_qc, inst, NULL, &ntriggers);
|
||||
trig_qc->dec_conds_sample = trig_qc->inc_conds_sample = 0;
|
||||
pre->c = post->c;
|
||||
return trigger_waitsets;
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool take_sample_update_conditions (struct rhc *rhc, struct trigger_info_pre *pre, struct trigger_info_post *post, struct trigger_info_qcond *trig_qc, struct rhc_instance *inst, dds_querycond_mask_t conds, bool sample_wasread)
|
||||
|
@ -1742,15 +1748,15 @@ static bool take_sample_update_conditions (struct rhc *rhc, struct trigger_info_
|
|||
trig_qc->dec_conds_sample = conds;
|
||||
trig_qc->dec_sample_read = sample_wasread;
|
||||
get_trigger_info_cmn (&post->c, inst);
|
||||
const bool trigger_waitsets = update_conditions_locked (rhc, false, pre, post, trig_qc, inst);
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
update_conditions_locked (rhc, false, pre, post, trig_qc, inst, NULL, &ntriggers);
|
||||
trig_qc->dec_conds_sample = 0;
|
||||
pre->c = post->c;
|
||||
return trigger_waitsets;
|
||||
return false;
|
||||
}
|
||||
|
||||
static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
bool trigger_waitsets = false;
|
||||
uint32_t n = 0;
|
||||
|
||||
if (lock)
|
||||
|
@ -1797,8 +1803,7 @@ static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
if (!sample->isread)
|
||||
{
|
||||
TRACE ("s");
|
||||
if (read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false))
|
||||
trigger_waitsets = true;
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, false);
|
||||
sample->isread = true;
|
||||
inst->nvread++;
|
||||
rhc->n_vread++;
|
||||
|
@ -1820,8 +1825,7 @@ static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
if (!inst->inv_isread)
|
||||
{
|
||||
TRACE ("i");
|
||||
if (read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false))
|
||||
trigger_waitsets = true;
|
||||
read_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, false);
|
||||
inst->inv_isread = 1;
|
||||
rhc->n_invread++;
|
||||
}
|
||||
|
@ -1837,13 +1841,13 @@ static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
}
|
||||
if (nread != inst_nread (inst) || inst_became_old)
|
||||
{
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
assert (trig_qc.dec_conds_invsample == 0);
|
||||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst))
|
||||
trigger_waitsets = true;
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (n > n_first) {
|
||||
|
@ -1862,19 +1866,15 @@ static int dds_rhc_read_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
TRACE ("read: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
|
||||
if (trigger_waitsets)
|
||||
dds_entity_status_signal (&rhc->reader->m_entity);
|
||||
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
}
|
||||
|
||||
static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
bool trigger_waitsets = false;
|
||||
uint64_t iid;
|
||||
uint32_t n = 0;
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
|
||||
if (lock)
|
||||
{
|
||||
|
@ -1923,8 +1923,7 @@ static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
}
|
||||
else
|
||||
{
|
||||
if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread))
|
||||
trigger_waitsets = true;
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread);
|
||||
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
ddsi_serdata_to_sample (sample->sample, values[n], 0, 0);
|
||||
|
@ -1963,8 +1962,7 @@ static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
#ifndef NDEBUG
|
||||
init_trigger_info_qcond (&dummy_trig_qc);
|
||||
#endif
|
||||
if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread))
|
||||
trigger_waitsets = true;
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread);
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
topicless_to_clean_invsample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
|
||||
inst_clear_invsample (rhc, inst, &dummy_trig_qc);
|
||||
|
@ -1986,8 +1984,7 @@ static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
assert (trig_qc.dec_conds_sample == 0);
|
||||
assert (trig_qc.inc_conds_invsample == 0);
|
||||
assert (trig_qc.inc_conds_sample == 0);
|
||||
if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst))
|
||||
trigger_waitsets = true;
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (inst_is_empty (inst))
|
||||
|
@ -2024,17 +2021,12 @@ static int dds_rhc_take_w_qminv (struct rhc *rhc, bool lock, void **values, dds_
|
|||
TRACE ("take: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
|
||||
if (trigger_waitsets)
|
||||
dds_entity_status_signal(&rhc->reader->m_entity);
|
||||
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
}
|
||||
|
||||
static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serdata ** values, dds_sample_info_t *info_seq, uint32_t max_samples, unsigned qminv, dds_instance_handle_t handle, dds_readcond *cond)
|
||||
{
|
||||
bool trigger_waitsets = false;
|
||||
uint64_t iid;
|
||||
uint32_t n = 0;
|
||||
(void)cond;
|
||||
|
@ -2085,9 +2077,7 @@ static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serd
|
|||
}
|
||||
else
|
||||
{
|
||||
if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread))
|
||||
trigger_waitsets = true;
|
||||
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, sample->conds, sample->isread);
|
||||
set_sample_info (info_seq + n, inst, sample);
|
||||
values[n] = ddsi_serdata_ref(sample->sample);
|
||||
rhc->n_vsamples--;
|
||||
|
@ -2119,8 +2109,7 @@ static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serd
|
|||
#ifndef NDEBUG
|
||||
init_trigger_info_qcond (&dummy_trig_qc);
|
||||
#endif
|
||||
if (take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread))
|
||||
trigger_waitsets = true;
|
||||
take_sample_update_conditions (rhc, &pre, &post, &trig_qc, inst, inst->conds, inst->inv_isread);
|
||||
set_sample_info_invsample (info_seq + n, inst);
|
||||
values[n] = ddsi_serdata_ref(inst->tk->m_sample);
|
||||
inst_clear_invsample (rhc, inst, &dummy_trig_qc);
|
||||
|
@ -2137,9 +2126,9 @@ static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serd
|
|||
{
|
||||
/* if nsamples = 0, it won't match anything, so no need to do
|
||||
anything here for drop_instance_noupdate_no_writers */
|
||||
size_t ntriggers = SIZE_MAX;
|
||||
get_trigger_info_cmn (&post.c, inst);
|
||||
if (update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst))
|
||||
trigger_waitsets = true;
|
||||
update_conditions_locked (rhc, false, &pre, &post, &trig_qc, inst, NULL, &ntriggers);
|
||||
}
|
||||
|
||||
if (inst_is_empty (inst))
|
||||
|
@ -2176,10 +2165,6 @@ static int dds_rhc_takecdr_w_qminv (struct rhc *rhc, bool lock, struct ddsi_serd
|
|||
TRACE ("take: returning %"PRIu32"\n", n);
|
||||
assert (rhc_check_counts_locked (rhc, true, false));
|
||||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
|
||||
if (trigger_waitsets)
|
||||
dds_entity_status_signal (&rhc->reader->m_entity);
|
||||
|
||||
assert (n <= INT_MAX);
|
||||
return (int)n;
|
||||
}
|
||||
|
@ -2239,7 +2224,7 @@ bool dds_rhc_add_readcondition (dds_readcond *cond)
|
|||
|
||||
assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) ||
|
||||
(dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_QUERY && cond->m_query.m_filter != 0));
|
||||
assert (cond->m_entity.m_trigger == 0);
|
||||
assert (ddsrt_atomic_ld32 (&cond->m_entity.m_status.m_trigger) == 0);
|
||||
assert (cond->m_query.m_qcmask == 0);
|
||||
|
||||
cond->m_qminv = qmask_from_dcpsquery (cond->m_sample_states, cond->m_view_states, cond->m_instance_states);
|
||||
|
@ -2270,6 +2255,7 @@ bool dds_rhc_add_readcondition (dds_readcond *cond)
|
|||
cond->m_next = rhc->conds;
|
||||
rhc->conds = cond;
|
||||
|
||||
uint32_t trigger = 0;
|
||||
if (cond->m_query.m_filter == 0)
|
||||
{
|
||||
/* Read condition is not cached inside the instances and samples, so it only needs
|
||||
|
@ -2278,7 +2264,7 @@ bool dds_rhc_add_readcondition (dds_readcond *cond)
|
|||
{
|
||||
struct rhc_instance *inst = rhc->nonempty_instances;
|
||||
do {
|
||||
cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond);
|
||||
trigger += rhc_get_cond_trigger (inst, cond);
|
||||
inst = inst->next;
|
||||
} while (inst != rhc->nonempty_instances);
|
||||
}
|
||||
|
@ -2296,7 +2282,6 @@ bool dds_rhc_add_readcondition (dds_readcond *cond)
|
|||
/* Attaching a query condition means clearing the allocated bit in all instances and
|
||||
samples, except for those that match the predicate. */
|
||||
const dds_querycond_mask_t qcmask = cond->m_query.m_qcmask;
|
||||
uint32_t trigger = 0;
|
||||
for (struct rhc_instance *inst = ddsrt_hh_iter_first (rhc->instances, &it); inst != NULL; inst = ddsrt_hh_iter_next (&it))
|
||||
{
|
||||
const bool instmatch = eval_predicate_invsample (rhc, inst, cond->m_query.m_filter);;
|
||||
|
@ -2317,11 +2302,13 @@ bool dds_rhc_add_readcondition (dds_readcond *cond)
|
|||
if (!inst_is_empty (inst) && rhc_get_cond_trigger (inst, cond))
|
||||
trigger += (inst->inv_exists ? instmatch : 0) + matches;
|
||||
}
|
||||
cond->m_entity.m_trigger = trigger;
|
||||
}
|
||||
|
||||
if (cond->m_entity.m_trigger)
|
||||
dds_entity_status_signal (&cond->m_entity);
|
||||
if (trigger)
|
||||
{
|
||||
ddsrt_atomic_st32 (&cond->m_entity.m_status.m_trigger, trigger);
|
||||
dds_entity_status_signal (&cond->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
}
|
||||
|
||||
TRACE ("add_readcondition(%p, %"PRIx32", %"PRIx32", %"PRIx32") => %p qminv %"PRIx32" ; rhc %"PRIu32" conds\n",
|
||||
(void *) rhc, cond->m_sample_states, cond->m_view_states,
|
||||
|
@ -2356,7 +2343,7 @@ void dds_rhc_remove_readcondition (dds_readcond *cond)
|
|||
ddsrt_mutex_unlock (&rhc->lock);
|
||||
}
|
||||
|
||||
static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst)
|
||||
static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert, const struct trigger_info_pre *pre, const struct trigger_info_post *post, const struct trigger_info_qcond *trig_qc, const struct rhc_instance *inst, struct dds_entity *triggers[], size_t *ntriggers)
|
||||
{
|
||||
/* Pre: rhc->lock held; returns 1 if triggering required, else 0. */
|
||||
bool trigger = false;
|
||||
|
@ -2417,14 +2404,14 @@ static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert,
|
|||
else if (m_pre < m_post)
|
||||
{
|
||||
TRACE ("now matches");
|
||||
trigger = (iter->m_entity.m_trigger++ == 0);
|
||||
trigger = (ddsrt_atomic_inc32_ov (&iter->m_entity.m_status.m_trigger) == 0);
|
||||
if (trigger)
|
||||
TRACE (" (cond now triggers)");
|
||||
}
|
||||
else
|
||||
{
|
||||
TRACE ("no longer matches");
|
||||
if (--iter->m_entity.m_trigger == 0)
|
||||
if (ddsrt_atomic_dec32_nv (&iter->m_entity.m_status.m_trigger) == 0)
|
||||
TRACE (" (cond no longer triggers)");
|
||||
}
|
||||
}
|
||||
|
@ -2477,19 +2464,19 @@ static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert,
|
|||
there is always space for a valid and an invalid sample, both add and remove
|
||||
inserting an update always has unread data added, but a read pretends it is a removal
|
||||
of whatever and an insertion of read data */
|
||||
assert (mdelta >= 0 || iter->m_entity.m_trigger >= (uint32_t) -mdelta);
|
||||
assert (mdelta >= 0 || ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) >= (uint32_t) -mdelta);
|
||||
if (mdelta == 0)
|
||||
TRACE ("no change @ %"PRIu32" (0)", iter->m_entity.m_trigger);
|
||||
TRACE ("no change @ %"PRIu32" (0)", ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger));
|
||||
else
|
||||
TRACE ("m=%"PRId32" @ %"PRIu32" (0)", mdelta, iter->m_entity.m_trigger + (uint32_t) mdelta);
|
||||
TRACE ("m=%"PRId32" @ %"PRIu32" (0)", mdelta, ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) + (uint32_t) mdelta);
|
||||
/* even though it matches now and matched before, it is not a given that any of the samples
|
||||
matched before, so m_trigger may still be 0 */
|
||||
if (mdelta > 0 && iter->m_entity.m_trigger == 0)
|
||||
const uint32_t ov = ddsrt_atomic_add32_ov (&iter->m_entity.m_status.m_trigger, (uint32_t) mdelta);
|
||||
if (mdelta > 0 && ov == 0)
|
||||
trigger = true;
|
||||
iter->m_entity.m_trigger += (uint32_t) mdelta;
|
||||
if (trigger)
|
||||
TRACE (" (cond now triggers)");
|
||||
else if (mdelta < 0 && iter->m_entity.m_trigger == 0)
|
||||
else if (mdelta < 0 && ov == (uint32_t) -mdelta)
|
||||
TRACE (" (cond no longer triggers)");
|
||||
}
|
||||
else
|
||||
|
@ -2509,7 +2496,7 @@ static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert,
|
|||
} while (sample != end);
|
||||
}
|
||||
if (mdelta == 0 && mcurrent == 0)
|
||||
TRACE ("no change @ %"PRIu32" (2)", iter->m_entity.m_trigger);
|
||||
TRACE ("no change @ %"PRIu32" (2)", ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger));
|
||||
else if (m_pre < m_post)
|
||||
{
|
||||
/* No match previously, so the instance wasn't accounted for at all in the trigger value.
|
||||
|
@ -2519,10 +2506,9 @@ static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert,
|
|||
sample, so mrem reflects the state before the change, and the incremental change needs
|
||||
to be taken into account. */
|
||||
const int32_t m = called_from_insert ? mcurrent : mcurrent + mdelta;
|
||||
TRACE ("mdelta=%"PRId32" mcurrent=%"PRId32" => %"PRId32" => %"PRIu32" (2a)", mdelta, mcurrent, m, iter->m_entity.m_trigger + (uint32_t) m);
|
||||
assert (m >= 0 || iter->m_entity.m_trigger >= (uint32_t) -m);
|
||||
trigger = (iter->m_entity.m_trigger == 0) && m > 0;
|
||||
iter->m_entity.m_trigger += (uint32_t) m;
|
||||
TRACE ("mdelta=%"PRId32" mcurrent=%"PRId32" => %"PRId32" => %"PRIu32" (2a)", mdelta, mcurrent, m, ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) + (uint32_t) m);
|
||||
assert (m >= 0 || ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) >= (uint32_t) -m);
|
||||
trigger = (ddsrt_atomic_add32_ov (&iter->m_entity.m_status.m_trigger, (uint32_t) m) == 0 && m > 0);
|
||||
if (trigger)
|
||||
TRACE (" (cond now triggers)");
|
||||
}
|
||||
|
@ -2532,18 +2518,21 @@ static bool update_conditions_locked (struct rhc *rhc, bool called_from_insert,
|
|||
of matches as well as those that were removed just before, hence need the incremental
|
||||
change as well */
|
||||
const int32_t m = mcurrent - mdelta;
|
||||
TRACE ("mdelta=%"PRId32" mcurrent=%"PRId32" => %"PRId32" => %"PRIu32" (2b)", mdelta, mcurrent, m, iter->m_entity.m_trigger - (uint32_t) m);
|
||||
assert (m < 0 || iter->m_entity.m_trigger >= (uint32_t) m);
|
||||
iter->m_entity.m_trigger -= (uint32_t) m;
|
||||
if (iter->m_entity.m_trigger == 0)
|
||||
TRACE ("mdelta=%"PRId32" mcurrent=%"PRId32" => %"PRId32" => %"PRIu32" (2b)", mdelta, mcurrent, m, ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) - (uint32_t) m);
|
||||
assert (m < 0 || ddsrt_atomic_ld32 (&iter->m_entity.m_status.m_trigger) >= (uint32_t) m);
|
||||
if (ddsrt_atomic_sub32_nv (&iter->m_entity.m_status.m_trigger, (uint32_t) m) == 0)
|
||||
TRACE (" (cond no longer triggers)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (iter->m_entity.m_trigger)
|
||||
dds_entity_status_signal (&iter->m_entity);
|
||||
|
||||
if (trigger)
|
||||
{
|
||||
if (*ntriggers < MAX_FAST_TRIGGERS)
|
||||
triggers[(*ntriggers)++] = &iter->m_entity;
|
||||
else
|
||||
dds_entity_status_signal (&iter->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
}
|
||||
TRACE ("\n");
|
||||
iter = iter->m_next;
|
||||
}
|
||||
|
@ -2717,7 +2706,9 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds, bool chec
|
|||
if (check_conds)
|
||||
{
|
||||
for (i = 0, rciter = rhc->conds; i < ncheck; i++, rciter = rciter->m_next)
|
||||
assert (cond_match_count[i] == rciter->m_entity.m_trigger);
|
||||
{
|
||||
assert (cond_match_count[i] == ddsrt_atomic_ld32 (&rciter->m_entity.m_status.m_trigger));
|
||||
}
|
||||
}
|
||||
|
||||
if (rhc->n_nonempty_instances == 0)
|
||||
|
|
|
@ -38,8 +38,6 @@ struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic
|
|||
tp->c.ops = &ddsi_sertopic_ops_builtintopic;
|
||||
tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic;
|
||||
tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops);
|
||||
tp->c.status_cb = 0;
|
||||
tp->c.status_cb_entity = NULL;
|
||||
ddsrt_atomic_st32 (&tp->c.refc, 1);
|
||||
tp->type = type;
|
||||
return &tp->c;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "dds__qos.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
#include "dds/ddsi/q_globals.h"
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/version.h"
|
||||
|
||||
DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_subscriber)
|
||||
|
@ -35,6 +36,13 @@ static dds_return_t dds_subscriber_status_validate (uint32_t mask)
|
|||
return (mask & ~DDS_SUBSCRIBER_STATUS_MASK) ? DDS_RETCODE_BAD_PARAMETER : DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_subscriber = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_entity_deriver_dummy_delete,
|
||||
.set_qos = dds_subscriber_qos_set,
|
||||
.validate_status = dds_subscriber_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
/* participant entity lock must be held */
|
||||
|
@ -56,8 +64,7 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q
|
|||
sub = dds_alloc (sizeof (*sub));
|
||||
subscriber = dds_entity_init (&sub->m_entity, &participant->m_entity, DDS_KIND_SUBSCRIBER, new_qos, listener, DDS_SUBSCRIBER_STATUS_MASK);
|
||||
sub->m_entity.m_iid = ddsi_iid_gen ();
|
||||
sub->m_entity.m_deriver.set_qos = dds_subscriber_qos_set;
|
||||
sub->m_entity.m_deriver.validate_status = dds_subscriber_status_validate;
|
||||
dds_entity_register_child (&participant->m_entity, &sub->m_entity);
|
||||
return subscriber;
|
||||
}
|
||||
|
||||
|
@ -77,19 +84,10 @@ dds_return_t dds_notify_readers (dds_entity_t subscriber)
|
|||
{
|
||||
dds_subscriber *sub;
|
||||
dds_return_t ret;
|
||||
|
||||
if ((ret = dds_subscriber_lock (subscriber, &sub)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
ret = DDS_RETCODE_UNSUPPORTED;
|
||||
for (dds_entity *iter = sub->m_entity.m_children; iter; iter = iter->m_next)
|
||||
{
|
||||
ddsrt_mutex_lock (&iter->m_mutex);
|
||||
// FIXME: check if reader has data available, call listener
|
||||
ddsrt_mutex_unlock(&iter->m_mutex);
|
||||
}
|
||||
dds_subscriber_unlock (sub);
|
||||
return ret;
|
||||
return DDS_RETCODE_UNSUPPORTED;
|
||||
}
|
||||
|
||||
dds_return_t dds_subscriber_begin_coherent (dds_entity_t e)
|
||||
|
|
|
@ -37,12 +37,20 @@ DECL_ENTITY_LOCK_UNLOCK (extern inline, dds_topic)
|
|||
#define DDS_TOPIC_STATUS_MASK \
|
||||
(DDS_INCONSISTENT_TOPIC_STATUS)
|
||||
|
||||
static int strcmp_wrapper (const void *va, const void *vb)
|
||||
struct topic_sertopic_node {
|
||||
ddsrt_avl_node_t avlnode;
|
||||
uint32_t refc;
|
||||
const struct ddsi_sertopic *st;
|
||||
};
|
||||
|
||||
static int topic_sertopic_node_cmp (const void *va, const void *vb)
|
||||
{
|
||||
return strcmp (va, vb);
|
||||
const struct ddsi_sertopic *a = va;
|
||||
const struct ddsi_sertopic *b = vb;
|
||||
return strcmp (a->name, b->name);
|
||||
}
|
||||
|
||||
const ddsrt_avl_treedef_t dds_topictree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct ddsi_sertopic, avlnode), offsetof (struct ddsi_sertopic, name_type_name), strcmp_wrapper, 0);
|
||||
const ddsrt_avl_treedef_t dds_topictree_def = DDSRT_AVL_TREEDEF_INITIALIZER_INDKEY (offsetof (struct topic_sertopic_node, avlnode), offsetof (struct topic_sertopic_node, st), topic_sertopic_node_cmp, 0);
|
||||
|
||||
static bool is_valid_name (const char *name) ddsrt_nonnull_all;
|
||||
|
||||
|
@ -72,9 +80,10 @@ static dds_return_t dds_topic_status_validate (uint32_t mask)
|
|||
|
||||
/*
|
||||
Topic status change callback handler. Supports INCONSISTENT_TOPIC
|
||||
status (only defined status on a topic).
|
||||
status (only defined status on a topic). Irrelevant until inconsistent topic
|
||||
definitions can be detected, so until topic discovery is added.
|
||||
*/
|
||||
|
||||
#if 0
|
||||
static void dds_topic_status_cb (struct dds_topic *tp)
|
||||
{
|
||||
struct dds_listener const * const lst = &tp->m_entity.m_listener;
|
||||
|
@ -99,66 +108,31 @@ static void dds_topic_status_cb (struct dds_topic *tp)
|
|||
ddsrt_cond_broadcast (&tp->m_entity.m_observers_cond);
|
||||
ddsrt_mutex_unlock (&tp->m_entity.m_observers_lock);
|
||||
}
|
||||
|
||||
struct ddsi_sertopic *dds_topic_lookup_locked (dds_domain *domain, const char *name) ddsrt_nonnull_all;
|
||||
|
||||
struct ddsi_sertopic *dds_topic_lookup_locked (dds_domain *domain, const char *name)
|
||||
{
|
||||
ddsrt_avl_iter_t iter;
|
||||
for (struct ddsi_sertopic *st = ddsrt_avl_iter_first (&dds_topictree_def, &domain->m_topics, &iter); st; st = ddsrt_avl_iter_next (&iter))
|
||||
if (strcmp (st->name, name) == 0)
|
||||
return st;
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
struct ddsi_sertopic *dds_topic_lookup (dds_domain *domain, const char *name)
|
||||
{
|
||||
const struct ddsi_sertopic key = { .name = (char *) name };
|
||||
struct ddsi_sertopic *st;
|
||||
struct topic_sertopic_node *nst;
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
st = dds_topic_lookup_locked (domain, name);
|
||||
if ((nst = ddsrt_avl_lookup (&dds_topictree_def, &domain->m_topics, &key)) == NULL)
|
||||
st = NULL;
|
||||
else
|
||||
st = ddsi_sertopic_ref (nst->st);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
return st;
|
||||
}
|
||||
|
||||
void dds_topic_free (dds_domainid_t domainid, struct ddsi_sertopic *st)
|
||||
static bool dds_find_topic_check_and_add_ref (dds_entity_t participant, dds_entity_t topic, const char *name)
|
||||
{
|
||||
dds_domain *domain;
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
domain = ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &domainid);
|
||||
if (domain != NULL)
|
||||
{
|
||||
assert (ddsrt_avl_lookup (&dds_topictree_def, &domain->m_topics, st->name_type_name) != NULL);
|
||||
ddsrt_avl_delete (&dds_topictree_def, &domain->m_topics, st);
|
||||
}
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
st->status_cb_entity = NULL;
|
||||
ddsi_sertopic_unref (st);
|
||||
}
|
||||
dds_topic *tp;
|
||||
if (dds_topic_lock (topic, &tp) != DDS_RETCODE_OK)
|
||||
return false;
|
||||
|
||||
static void dds_topic_add_locked (dds_domainid_t id, struct ddsi_sertopic *st)
|
||||
{
|
||||
dds_domain *dom = dds_domain_find_locked (id);
|
||||
assert (dom);
|
||||
assert (ddsrt_avl_lookup (&dds_topictree_def, &dom->m_topics, st->name_type_name) == NULL);
|
||||
ddsrt_avl_insert (&dds_topictree_def, &dom->m_topics, st);
|
||||
}
|
||||
|
||||
dds_entity_t dds_find_topic (dds_entity_t participant, const char *name)
|
||||
{
|
||||
dds_entity_t tp;
|
||||
dds_participant *p;
|
||||
struct ddsi_sertopic *st;
|
||||
dds_return_t rc;
|
||||
|
||||
if (name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((rc = dds_participant_lock (participant, &p)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
if ((st = dds_topic_lookup_locked (p->m_entity.m_domain, name)) == NULL)
|
||||
tp = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
bool ret;
|
||||
if (tp->m_entity.m_participant->m_hdllink.hdl != participant || strcmp (tp->m_stopic->name, name) != 0)
|
||||
ret = false;
|
||||
else
|
||||
{
|
||||
/* FIXME: calling addref is wrong because the Cyclone library has no
|
||||
|
@ -166,19 +140,76 @@ dds_entity_t dds_find_topic (dds_entity_t participant, const char *name)
|
|||
won't make the ref count drop to 0. On the other hand, the DDS spec
|
||||
says find_topic (and a second call to create_topic) return a new
|
||||
proxy that must separately be deleted. */
|
||||
dds_entity_add_ref (&st->status_cb_entity->m_entity);
|
||||
tp = st->status_cb_entity->m_entity.m_hdllink.hdl;
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
ret = true;
|
||||
}
|
||||
dds_topic_unlock (tp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
dds_entity_t dds_find_topic (dds_entity_t participant, const char *name)
|
||||
{
|
||||
dds_entity *pe;
|
||||
dds_return_t ret;
|
||||
dds_entity_t topic;
|
||||
|
||||
if (name == NULL)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
/* claim participant handle to guarantee the handle remains valid after
|
||||
unlocking the participant prior to verifying the found topic still
|
||||
exists */
|
||||
if ((ret = dds_entity_pin (participant, &pe)) < 0)
|
||||
return ret;
|
||||
if (dds_entity_kind (pe) != DDS_KIND_PARTICIPANT)
|
||||
{
|
||||
dds_entity_unpin (pe);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
|
||||
do {
|
||||
dds_participant *p;
|
||||
topic = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
if ((ret = dds_participant_lock (participant, &p)) == DDS_RETCODE_OK)
|
||||
{
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &p->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, name) == 0)
|
||||
{
|
||||
topic = e->m_hdllink.hdl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
dds_participant_unlock (p);
|
||||
return tp;
|
||||
}
|
||||
} while (topic > 0 && !dds_find_topic_check_and_add_ref (participant, topic, name));
|
||||
|
||||
dds_entity_unpin (pe);
|
||||
return topic;
|
||||
}
|
||||
|
||||
static dds_return_t dds_topic_delete (dds_entity *e) ddsrt_nonnull_all;
|
||||
|
||||
static dds_return_t dds_topic_delete (dds_entity *e)
|
||||
{
|
||||
dds_topic_free (e->m_domainid, ((dds_topic *) e)->m_stopic);
|
||||
dds_topic *tp = (dds_topic *) e;
|
||||
dds_domain *domain = tp->m_entity.m_domain;
|
||||
ddsrt_avl_dpath_t dp;
|
||||
struct topic_sertopic_node *stn;
|
||||
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
|
||||
stn = ddsrt_avl_lookup_dpath (&dds_topictree_def, &domain->m_topics, tp->m_stopic, &dp);
|
||||
assert (stn != NULL);
|
||||
if (--stn->refc == 0)
|
||||
{
|
||||
ddsrt_avl_delete_dpath (&dds_topictree_def, &domain->m_topics, stn, &dp);
|
||||
ddsrt_free (stn);
|
||||
}
|
||||
|
||||
ddsi_sertopic_unref (tp->m_stopic);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
|
@ -189,14 +220,14 @@ static dds_return_t dds_topic_qos_set (dds_entity *e, const dds_qos_t *qos, bool
|
|||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
static bool dupdef_qos_ok (const dds_qos_t *qos, const struct ddsi_sertopic *st)
|
||||
static bool dupdef_qos_ok (const dds_qos_t *qos, const dds_topic *tp)
|
||||
{
|
||||
if ((qos == NULL) != (st->status_cb_entity->m_entity.m_qos == NULL))
|
||||
if ((qos == NULL) != (tp->m_entity.m_qos == NULL))
|
||||
return false;
|
||||
else if (qos == NULL)
|
||||
return true;
|
||||
else
|
||||
return dds_qos_equal (st->status_cb_entity->m_entity.m_qos, qos);
|
||||
return dds_qos_equal (tp->m_entity.m_qos, qos);
|
||||
}
|
||||
|
||||
static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b)
|
||||
|
@ -212,11 +243,46 @@ static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct dds
|
|||
return true;
|
||||
}
|
||||
|
||||
static dds_return_t create_topic_topic_arbirary_check_sertopic (dds_entity_t participant, dds_entity_t topic, struct ddsi_sertopic *sertopic, const dds_qos_t *qos)
|
||||
{
|
||||
dds_topic *tp;
|
||||
dds_return_t ret;
|
||||
|
||||
if (dds_topic_lock (topic, &tp) < 0)
|
||||
return DDS_RETCODE_NOT_FOUND;
|
||||
|
||||
if (tp->m_entity.m_participant->m_hdllink.hdl != participant)
|
||||
ret = DDS_RETCODE_NOT_FOUND;
|
||||
else if (!sertopic_equivalent (tp->m_stopic, sertopic))
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
else if (!dupdef_qos_ok (qos, tp))
|
||||
ret = DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
else
|
||||
{
|
||||
/* FIXME: calling addref is wrong because the Cyclone library has no
|
||||
knowledge of the reference and hence simply deleting the participant
|
||||
won't make the ref count drop to 0. On the other hand, the DDS spec
|
||||
says find_topic (and a second call to create_topic) return a new
|
||||
proxy that must separately be deleted. */
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
ret = DDS_RETCODE_OK;
|
||||
}
|
||||
dds_topic_unlock (tp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_topic = {
|
||||
.close = dds_entity_deriver_dummy_close,
|
||||
.delete = dds_topic_delete,
|
||||
.set_qos = dds_topic_qos_set,
|
||||
.validate_status = dds_topic_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_sertopic *sertopic, const dds_qos_t *qos, const dds_listener_t *listener, const nn_plist_t *sedp_plist)
|
||||
{
|
||||
struct ddsi_sertopic *stgeneric;
|
||||
dds_return_t rc;
|
||||
dds_participant *par;
|
||||
dds_entity *par_ent;
|
||||
dds_topic *top;
|
||||
dds_qos_t *new_qos = NULL;
|
||||
dds_entity_t hdl;
|
||||
|
@ -242,45 +308,122 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
|
|||
if ((rc = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
|
||||
goto err_invalid_qos;
|
||||
|
||||
/* Claim participant handle so we can be sure the handle will not be
|
||||
reused if we temporarily unlock the participant to check the an
|
||||
existing topic's compatibility */
|
||||
if ((rc = dds_entity_pin (participant, &par_ent)) < 0)
|
||||
goto err_claim_participant;
|
||||
|
||||
/* FIXME: just mutex_lock ought to be good enough, but there is the
|
||||
pesky "closed" check still ... */
|
||||
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
goto err_lock_participant;
|
||||
|
||||
/* Check if topic already exists with same name */
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
if ((stgeneric = dds_topic_lookup_locked (par->m_entity.m_domain, sertopic->name)) != NULL) {
|
||||
if (!sertopic_equivalent (stgeneric, sertopic)) {
|
||||
/* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
|
||||
rc = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
goto err_mismatch;
|
||||
} else if (!dupdef_qos_ok (new_qos, stgeneric)) {
|
||||
/* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
|
||||
rc = DDS_RETCODE_INCONSISTENT_POLICY;
|
||||
goto err_mismatch;
|
||||
} else {
|
||||
/* FIXME: calling addref is wrong because the Cyclone library has no
|
||||
knowledge of the reference and hence simply deleting the participant
|
||||
won't make the ref count drop to 0. On the other hand, the DDS spec
|
||||
says find_topic (and a second call to create_topic) return a new
|
||||
proxy that must separately be deleted. */
|
||||
dds_entity_add_ref (&stgeneric->status_cb_entity->m_entity);
|
||||
hdl = stgeneric->status_cb_entity->m_entity.m_hdllink.hdl;
|
||||
dds_delete_qos (new_qos);
|
||||
bool retry_lookup;
|
||||
do {
|
||||
dds_entity_t topic;
|
||||
|
||||
/* claim participant handle to guarantee the handle remains valid after
|
||||
unlocking the participant prior to verifying the found topic still
|
||||
exists */
|
||||
topic = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &par->m_entity.m_children, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if (dds_entity_kind (e) == DDS_KIND_TOPIC && strcmp (((dds_topic *) e)->m_stopic->name, sertopic->name) == 0)
|
||||
{
|
||||
topic = e->m_hdllink.hdl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (topic < 0)
|
||||
{
|
||||
/* no topic with the name exists; we have locked the participant, and
|
||||
so we can proceed with creating the topic */
|
||||
retry_lookup = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* some topic with the same name exists; need to lock the topic to
|
||||
perform the checks, but locking the topic while holding the
|
||||
participant lock violates the lock order (child -> parent). So
|
||||
unlock that participant and check the topic while accounting
|
||||
for the various scary cases. */
|
||||
dds_participant_unlock (par);
|
||||
|
||||
rc = create_topic_topic_arbirary_check_sertopic (participant, topic, sertopic, new_qos);
|
||||
switch (rc)
|
||||
{
|
||||
case DDS_RETCODE_OK: /* duplicate definition */
|
||||
dds_entity_unpin (par_ent);
|
||||
dds_delete_qos (new_qos);
|
||||
return topic;
|
||||
|
||||
case DDS_RETCODE_NOT_FOUND:
|
||||
/* either participant is now being deleted, topic was deleted, or
|
||||
topic was deleted & the handle reused for something else -- so */
|
||||
retry_lookup = true;
|
||||
break;
|
||||
|
||||
case DDS_RETCODE_PRECONDITION_NOT_MET: /* incompatible sertopic */
|
||||
case DDS_RETCODE_INCONSISTENT_POLICY: /* different QoS */
|
||||
/* inconsistent definition */
|
||||
dds_entity_unpin (par_ent);
|
||||
dds_delete_qos (new_qos);
|
||||
return rc;
|
||||
|
||||
default:
|
||||
abort ();
|
||||
}
|
||||
|
||||
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
goto err_lock_participant;
|
||||
}
|
||||
} while (retry_lookup);
|
||||
|
||||
/* FIXME: make this a function
|
||||
Add sertopic to domain -- but note that it may have been created by another thread
|
||||
on another participant that is attached to the same domain */
|
||||
{
|
||||
struct dds_domain *domain = par->m_entity.m_domain;
|
||||
|
||||
ddsrt_avl_ipath_t ip;
|
||||
struct topic_sertopic_node *stn;
|
||||
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
|
||||
stn = ddsrt_avl_lookup_ipath (&dds_topictree_def, &domain->m_topics, sertopic, &ip);
|
||||
if (stn == NULL)
|
||||
{
|
||||
/* no existing definition: use new */
|
||||
stn = ddsrt_malloc (sizeof (*stn));
|
||||
stn->refc = 1;
|
||||
stn->st = ddsi_sertopic_ref (sertopic);
|
||||
ddsrt_avl_insert (&dds_topictree_def, &domain->m_topics, stn);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
} else {
|
||||
}
|
||||
else if (sertopic_equivalent (stn->st, sertopic))
|
||||
{
|
||||
/* ok -- same definition, so use existing one instead */
|
||||
sertopic = ddsi_sertopic_ref (stn->st);
|
||||
stn->refc++;
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* bummer, delete */
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
rc = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
goto err_sertopic_reuse;
|
||||
}
|
||||
}
|
||||
|
||||
/* Create topic */
|
||||
top = dds_alloc (sizeof (*top));
|
||||
hdl = dds_entity_init (&top->m_entity, &par->m_entity, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
|
||||
top->m_entity.m_iid = ddsi_iid_gen ();
|
||||
top->m_entity.m_deriver.delete = dds_topic_delete;
|
||||
top->m_entity.m_deriver.set_qos = dds_topic_qos_set;
|
||||
top->m_entity.m_deriver.validate_status = dds_topic_status_validate;
|
||||
top->m_stopic = ddsi_sertopic_ref (sertopic);
|
||||
sertopic->status_cb_entity = top;
|
||||
|
||||
/* Add topic to extent */
|
||||
dds_topic_add_locked (par->m_entity.m_domainid, sertopic);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
dds_entity_register_child (&par->m_entity, &top->m_entity);
|
||||
top->m_stopic = sertopic;
|
||||
|
||||
/* Publish Topic */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
|
@ -296,14 +439,15 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
|
|||
nn_plist_fini (&plist);
|
||||
}
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
}
|
||||
dds_participant_unlock (par);
|
||||
dds_entity_unpin (par_ent);
|
||||
return hdl;
|
||||
|
||||
err_mismatch:
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
err_sertopic_reuse:
|
||||
dds_participant_unlock (par);
|
||||
err_lock_participant:
|
||||
dds_entity_unpin (par_ent);
|
||||
err_claim_participant:
|
||||
err_invalid_qos:
|
||||
dds_delete_qos (new_qos);
|
||||
return rc;
|
||||
|
@ -330,8 +474,6 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
|
||||
ddsrt_atomic_st32 (&st->c.refc, 1);
|
||||
st->c.iid = ddsi_iid_gen ();
|
||||
st->c.status_cb = dds_topic_status_cb;
|
||||
st->c.status_cb_entity = NULL; /* set by dds_create_topic_arbitrary */
|
||||
st->c.name_type_name = key;
|
||||
st->c.name = ddsrt_strdup (name);
|
||||
st->c.type_name = ddsrt_strdup (typename);
|
||||
|
|
|
@ -22,127 +22,96 @@
|
|||
|
||||
DEFINE_ENTITY_LOCK_UNLOCK (static, dds_waitset, DDS_KIND_WAITSET)
|
||||
|
||||
static void dds_waitset_swap (dds_attachment **dst, dds_attachment **src, dds_attachment *prev, dds_attachment *idx)
|
||||
static bool is_triggered (struct dds_entity *e)
|
||||
{
|
||||
/* Remove from source. */
|
||||
if (prev == NULL)
|
||||
*src = idx->next;
|
||||
else
|
||||
prev->next = idx->next;
|
||||
|
||||
/* Add to destination. */
|
||||
idx->next = *dst;
|
||||
*dst = idx;
|
||||
bool t;
|
||||
switch (e->m_kind)
|
||||
{
|
||||
case DDS_KIND_COND_READ:
|
||||
case DDS_KIND_COND_QUERY:
|
||||
case DDS_KIND_COND_GUARD:
|
||||
case DDS_KIND_WAITSET:
|
||||
t = ddsrt_atomic_ld32 (&e->m_status.m_trigger) != 0;
|
||||
break;
|
||||
default:
|
||||
t = (ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask) & SAM_STATUS_MASK) != 0;
|
||||
break;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
static dds_return_t dds_waitset_wait_impl (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_time_t abstimeout)
|
||||
{
|
||||
dds_waitset *ws;
|
||||
dds_return_t ret;
|
||||
dds_attachment *idx;
|
||||
dds_attachment *prev;
|
||||
|
||||
if (xs == NULL && nxs != 0)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
if (xs != NULL && nxs == 0)
|
||||
if ((xs == NULL) != (nxs == 0))
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
/* Locking the waitset here will delay a possible deletion until it is
|
||||
* unlocked. Even when the related mutex is unlocked by a conditioned wait. */
|
||||
if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
|
||||
{
|
||||
dds_entity *ent;
|
||||
if ((ret = dds_entity_pin (waitset, &ent)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
if (dds_entity_kind (ent) != DDS_KIND_WAITSET)
|
||||
{
|
||||
dds_entity_unpin (ent);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
ws = (dds_waitset *) ent;
|
||||
}
|
||||
|
||||
/* Move any previously but no longer triggering entities back to the observed list */
|
||||
idx = ws->triggered;
|
||||
prev = NULL;
|
||||
while (idx != NULL)
|
||||
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
|
||||
ws->ntriggered = 0;
|
||||
for (size_t i = 0; i < ws->nentities; i++)
|
||||
{
|
||||
dds_attachment *next = idx->next;
|
||||
if (idx->entity->m_trigger == 0)
|
||||
dds_waitset_swap (&ws->observed, &ws->triggered, prev, idx);
|
||||
else
|
||||
prev = idx;
|
||||
idx = next;
|
||||
if (is_triggered (ws->entities[i].entity))
|
||||
{
|
||||
dds_attachment tmp = ws->entities[i];
|
||||
ws->entities[i] = ws->entities[ws->ntriggered];
|
||||
ws->entities[ws->ntriggered++] = tmp;
|
||||
}
|
||||
/* Check if any of the observed entities are currently triggered, moving them
|
||||
to the triggered list */
|
||||
idx = ws->observed;
|
||||
prev = NULL;
|
||||
while (idx != NULL)
|
||||
{
|
||||
dds_attachment *next = idx->next;
|
||||
if (idx->entity->m_trigger > 0)
|
||||
dds_waitset_swap (&ws->triggered, &ws->observed, prev, idx);
|
||||
else
|
||||
prev = idx;
|
||||
idx = next;
|
||||
}
|
||||
|
||||
/* Only wait/keep waiting when we have something to observe and there aren't any triggers yet. */
|
||||
while (ws->observed != NULL && ws->triggered == NULL)
|
||||
while (ws->nentities > 0 && ws->ntriggered == 0)
|
||||
if (!ddsrt_cond_waituntil (&ws->m_entity.m_cond, &ws->m_entity.m_mutex, abstimeout))
|
||||
break;
|
||||
|
||||
/* Get number of triggered entities
|
||||
* - set attach array when needed
|
||||
* - swap them back to observed */
|
||||
ret = 0;
|
||||
idx = ws->triggered;
|
||||
while (idx != NULL)
|
||||
{
|
||||
if ((uint32_t) ret < (uint32_t) nxs)
|
||||
xs[ret] = idx->arg;
|
||||
ret++;
|
||||
|
||||
/* The idx is always the first in triggered, so no prev. */
|
||||
dds_attachment *next = idx->next;
|
||||
dds_waitset_swap (&ws->observed, &ws->triggered, NULL, idx);
|
||||
idx = next;
|
||||
}
|
||||
dds_waitset_unlock (ws);
|
||||
ret = (int32_t) ws->ntriggered;
|
||||
for (size_t i = 0; i < ws->ntriggered && i < nxs; i++)
|
||||
xs[i] = ws->entities[i].arg;
|
||||
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
|
||||
dds_entity_unpin (&ws->m_entity);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void dds_waitset_close_list (dds_attachment **list, dds_entity_t waitset)
|
||||
{
|
||||
dds_attachment *idx = *list;
|
||||
dds_attachment *next;
|
||||
while (idx != NULL)
|
||||
{
|
||||
next = idx->next;
|
||||
(void) dds_entity_observer_unregister (idx->entity->m_hdllink.hdl, waitset);
|
||||
ddsrt_free (idx);
|
||||
idx = next;
|
||||
}
|
||||
*list = NULL;
|
||||
}
|
||||
|
||||
static bool dds_waitset_remove_from_list (dds_attachment **list, dds_entity_t observed)
|
||||
{
|
||||
dds_attachment *idx, *prev;
|
||||
for (idx = *list, prev = NULL; idx != NULL; prev = idx, idx = idx->next)
|
||||
if (idx->entity->m_hdllink.hdl == observed)
|
||||
break;
|
||||
if (idx == NULL)
|
||||
return false;
|
||||
|
||||
if (prev == NULL)
|
||||
*list = idx->next;
|
||||
else
|
||||
prev->next = idx->next;
|
||||
ddsrt_free (idx);
|
||||
return true;
|
||||
}
|
||||
|
||||
static dds_return_t dds_waitset_close (struct dds_entity *e)
|
||||
{
|
||||
/* deep in the process of deleting the entity, so this is the only thread */
|
||||
dds_waitset *ws = (dds_waitset *) e;
|
||||
dds_waitset_close_list (&ws->observed, e->m_hdllink.hdl);
|
||||
dds_waitset_close_list (&ws->triggered, e->m_hdllink.hdl);
|
||||
ddsrt_cond_broadcast (&e->m_cond);
|
||||
for (size_t i = 0; i < ws->nentities; i++)
|
||||
(void) dds_entity_observer_unregister (ws->entities[i].entity, &ws->m_entity);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
static dds_return_t dds_waitset_delete (struct dds_entity *e)
|
||||
{
|
||||
/* deep in the process of deleting the entity, so this is the only thread */
|
||||
dds_waitset *ws = (dds_waitset *) e;
|
||||
ddsrt_free (ws->entities);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_waitset = {
|
||||
.close = dds_waitset_close,
|
||||
.delete = dds_waitset_delete,
|
||||
.set_qos = dds_entity_deriver_dummy_set_qos,
|
||||
.validate_status = dds_entity_deriver_dummy_validate_status
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_waitset (dds_entity_t participant)
|
||||
{
|
||||
dds_entity_t hdl;
|
||||
|
@ -155,14 +124,14 @@ dds_entity_t dds_create_waitset (dds_entity_t participant)
|
|||
dds_waitset *waitset = dds_alloc (sizeof (*waitset));
|
||||
hdl = dds_entity_init (&waitset->m_entity, &par->m_entity, DDS_KIND_WAITSET, NULL, NULL, 0);
|
||||
waitset->m_entity.m_iid = ddsi_iid_gen ();
|
||||
waitset->m_entity.m_deriver.close = dds_waitset_close;
|
||||
waitset->observed = NULL;
|
||||
waitset->triggered = NULL;
|
||||
dds_entity_register_child (&par->m_entity, &waitset->m_entity);
|
||||
waitset->nentities = 0;
|
||||
waitset->ntriggered = 0;
|
||||
waitset->entities = NULL;
|
||||
dds_participant_unlock (par);
|
||||
return hdl;
|
||||
}
|
||||
|
||||
|
||||
dds_return_t dds_waitset_get_entities (dds_entity_t waitset, dds_entity_t *entities, size_t size)
|
||||
{
|
||||
dds_return_t ret;
|
||||
|
@ -171,62 +140,72 @@ dds_return_t dds_waitset_get_entities (dds_entity_t waitset, dds_entity_t *entit
|
|||
if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
ret = 0;
|
||||
for (dds_attachment *iter = ws->observed; iter != NULL; iter = iter->next)
|
||||
if (entities != NULL)
|
||||
{
|
||||
if ((size_t) ret < size && entities != NULL)
|
||||
entities[ret] = iter->entity->m_hdllink.hdl;
|
||||
ret++;
|
||||
}
|
||||
for (dds_attachment *iter = ws->triggered; iter != NULL; iter = iter->next)
|
||||
{
|
||||
if ((size_t) ret < size && entities != NULL)
|
||||
entities[ret] = iter->entity->m_hdllink.hdl;
|
||||
ret++;
|
||||
for (size_t i = 0; i < ws->nentities && i < size; i++)
|
||||
entities[i] = ws->entities[i].handle;
|
||||
}
|
||||
ret = (int32_t) ws->nentities;
|
||||
dds_waitset_unlock (ws);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void dds_waitset_move (dds_attachment **src, dds_attachment **dst, dds_entity_t entity)
|
||||
{
|
||||
dds_attachment *idx, *prev;
|
||||
for (idx = *src, prev = NULL; idx != NULL; prev = idx, idx = idx->next)
|
||||
if (idx->entity->m_hdllink.hdl == entity)
|
||||
break;
|
||||
if (idx != NULL)
|
||||
{
|
||||
/* Swap idx from src to dst. */
|
||||
dds_waitset_swap (dst, src, prev, idx);
|
||||
}
|
||||
}
|
||||
|
||||
static void dds_waitset_remove (dds_waitset *ws, dds_entity_t observed)
|
||||
{
|
||||
if (!dds_waitset_remove_from_list (&ws->observed, observed))
|
||||
(void) dds_waitset_remove_from_list (&ws->triggered, observed);
|
||||
size_t i;
|
||||
for (i = 0; i < ws->nentities; i++)
|
||||
if (ws->entities[i].handle == observed)
|
||||
break;
|
||||
if (i < ws->nentities)
|
||||
{
|
||||
if (i < ws->ntriggered)
|
||||
{
|
||||
ws->entities[i] = ws->entities[--ws->ntriggered];
|
||||
ws->entities[ws->ntriggered] = ws->entities[--ws->nentities];
|
||||
}
|
||||
else
|
||||
{
|
||||
ws->entities[i] = ws->entities[--ws->nentities];
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* This is called when the observed entity signals a status change. */
|
||||
static void dds_waitset_observer (dds_entity_t observer, dds_entity_t observed, uint32_t status)
|
||||
static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32_t status)
|
||||
{
|
||||
dds_waitset *ws;
|
||||
if (dds_waitset_lock (observer, &ws) == DDS_RETCODE_OK) {
|
||||
if (status & DDS_DELETING_STATUS) {
|
||||
/* Remove this observed entity, which is being deleted, from the waitset. */
|
||||
dds_waitset_remove (ws, observed);
|
||||
/* Our registration to this observed entity will be removed automatically. */
|
||||
} else if (status != 0) {
|
||||
assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
|
||||
dds_waitset *ws = (dds_waitset *) ent;
|
||||
(void) status;
|
||||
|
||||
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
|
||||
/* Move observed entity to triggered list. */
|
||||
dds_waitset_move (&ws->observed, &ws->triggered, observed);
|
||||
} else {
|
||||
/* Remove observed entity from triggered list (which it possibly resides in). */
|
||||
dds_waitset_move (&ws->triggered, &ws->observed, observed);
|
||||
size_t i;
|
||||
for (i = 0; i < ws->nentities; i++)
|
||||
if (ws->entities[i].handle == observed)
|
||||
break;
|
||||
if (i < ws->nentities && i >= ws->ntriggered)
|
||||
{
|
||||
dds_attachment tmp = ws->entities[i];
|
||||
ws->entities[i] = ws->entities[ws->ntriggered];
|
||||
ws->entities[ws->ntriggered++] = tmp;
|
||||
}
|
||||
/* Trigger waitset to wake up. */
|
||||
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
|
||||
dds_waitset_unlock (ws);
|
||||
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
|
||||
}
|
||||
|
||||
static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed)
|
||||
{
|
||||
assert (dds_entity_kind (ent) == DDS_KIND_WAITSET);
|
||||
dds_waitset *ws = (dds_waitset *) ent;
|
||||
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
|
||||
/* Remove this observed entity, which is being deleted, from the waitset. */
|
||||
dds_waitset_remove (ws, observed);
|
||||
/* Our registration to this observed entity will be removed automatically. */
|
||||
/* Trigger waitset to wake up. */
|
||||
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
|
||||
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
|
||||
}
|
||||
|
||||
dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x)
|
||||
|
@ -235,35 +214,35 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_
|
|||
dds_waitset *ws;
|
||||
dds_return_t ret;
|
||||
|
||||
if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_waitset_lock (waitset, &ws)) < 0)
|
||||
return ret;
|
||||
|
||||
if (waitset == entity)
|
||||
e = &ws->m_entity;
|
||||
else if ((ret = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
|
||||
{
|
||||
ret = DDS_RETCODE_BAD_PARAMETER;
|
||||
else if ((ret = dds_entity_pin (entity, &e)) < 0)
|
||||
goto err_waitset;
|
||||
}
|
||||
|
||||
/* This will fail if given entity is already attached (or deleted). */
|
||||
if ((ret = dds_entity_observer_register_nl (e, waitset, dds_waitset_observer)) != DDS_RETCODE_OK)
|
||||
if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_delete_observer)) != DDS_RETCODE_OK)
|
||||
goto err_entity;
|
||||
|
||||
dds_attachment *a = ddsrt_malloc (sizeof (*a));
|
||||
a->arg = x;
|
||||
a->entity = e;
|
||||
if (e->m_trigger > 0) {
|
||||
a->next = ws->triggered;
|
||||
ws->triggered = a;
|
||||
} else {
|
||||
a->next = ws->observed;
|
||||
ws->observed = a;
|
||||
ws->entities = ddsrt_realloc (ws->entities, (ws->nentities + 1) * sizeof (*ws->entities));
|
||||
ws->entities[ws->nentities].arg = x;
|
||||
ws->entities[ws->nentities].entity = e;
|
||||
ws->entities[ws->nentities].handle = e->m_hdllink.hdl;
|
||||
ws->nentities++;
|
||||
if (is_triggered (e))
|
||||
{
|
||||
const size_t i = ws->nentities - 1;
|
||||
dds_attachment tmp = ws->entities[i];
|
||||
ws->entities[i] = ws->entities[ws->ntriggered];
|
||||
ws->entities[ws->ntriggered++] = tmp;
|
||||
}
|
||||
ddsrt_cond_broadcast (&ws->m_entity.m_cond);
|
||||
|
||||
err_entity:
|
||||
if (e != &ws->m_entity)
|
||||
dds_entity_unlock (e);
|
||||
dds_entity_unpin (e);
|
||||
err_waitset:
|
||||
dds_waitset_unlock (ws);
|
||||
return ret;
|
||||
|
@ -272,6 +251,7 @@ err_waitset:
|
|||
dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)
|
||||
{
|
||||
dds_waitset *ws;
|
||||
dds_entity *e;
|
||||
dds_return_t ret;
|
||||
|
||||
if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
|
||||
|
@ -279,12 +259,19 @@ dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity)
|
|||
|
||||
/* Possibly fails when entity was not attached. */
|
||||
if (waitset == entity)
|
||||
ret = dds_entity_observer_unregister_nl (&ws->m_entity, waitset);
|
||||
ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity);
|
||||
else if ((ret = dds_entity_pin (entity, &e)) < 0)
|
||||
; /* entity invalid */
|
||||
else
|
||||
ret = dds_entity_observer_unregister (entity, waitset);
|
||||
{
|
||||
ret = dds_entity_observer_unregister (e, &ws->m_entity);
|
||||
dds_entity_unpin (e);
|
||||
}
|
||||
|
||||
if (ret == DDS_RETCODE_OK)
|
||||
{
|
||||
dds_waitset_remove (ws, entity);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (ret != DDS_RETCODE_PRECONDITION_NOT_MET)
|
||||
|
@ -310,22 +297,21 @@ dds_return_t dds_waitset_wait (dds_entity_t waitset, dds_attach_t *xs, size_t nx
|
|||
|
||||
dds_return_t dds_waitset_set_trigger (dds_entity_t waitset, bool trigger)
|
||||
{
|
||||
dds_waitset *ws;
|
||||
dds_entity *ent;
|
||||
dds_return_t rc;
|
||||
|
||||
if ((rc = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK)
|
||||
if ((rc = dds_entity_pin (waitset, &ent)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
else if (dds_entity_kind (ent) != DDS_KIND_WAITSET)
|
||||
{
|
||||
dds_entity_unpin (ent);
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
|
||||
ddsrt_mutex_unlock (&ws->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock (&ent->m_observers_lock);
|
||||
dds_entity_trigger_set (ent, trigger);
|
||||
ddsrt_mutex_unlock (&ent->m_observers_lock);
|
||||
|
||||
ddsrt_mutex_lock (&ws->m_entity.m_observers_lock);
|
||||
if (trigger)
|
||||
dds_entity_status_set (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
else
|
||||
dds_entity_status_reset (&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
|
||||
ddsrt_mutex_unlock (&ws->m_entity.m_observers_lock);
|
||||
|
||||
ddsrt_mutex_lock (&ws->m_entity.m_mutex);
|
||||
dds_waitset_unlock (ws);
|
||||
dds_entity_unpin (ent);
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
#include "dds/dds.h"
|
||||
#include "dds/version.h"
|
||||
#include "dds/ddsrt/static_assert.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_globals.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
|
@ -57,7 +58,7 @@ static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
{
|
||||
/* Release the initial claim that was done during the create. This
|
||||
* will indicate that further API deletion is now possible. */
|
||||
dds_handle_release (&entity->m_hdllink);
|
||||
dds_handle_unpin (&entity->m_hdllink);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -74,7 +75,7 @@ static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
|
||||
/* Reset the status for possible Listener call.
|
||||
* When a listener is not called, the status will be set (again). */
|
||||
dds_entity_status_reset (entity, 1u << status_id);
|
||||
dds_entity_status_reset (entity, (status_mask_t) (1u << status_id));
|
||||
|
||||
/* Update status metrics. */
|
||||
dds_writer * const wr = (dds_writer *) entity;
|
||||
|
@ -146,7 +147,7 @@ static void dds_writer_status_cb (void *ventity, const status_cb_data_t *data)
|
|||
}
|
||||
else
|
||||
{
|
||||
dds_entity_status_set (entity, 1u << status_id);
|
||||
dds_entity_status_set (entity, (status_mask_t) (1u << status_id));
|
||||
}
|
||||
|
||||
entity->m_cb_count--;
|
||||
|
@ -236,6 +237,13 @@ static struct whc *make_whc (const dds_qos_t *qos)
|
|||
return whc_new (handle_as_transient_local, hdepth, tldepth);
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_writer = {
|
||||
.close = dds_writer_close,
|
||||
.delete = dds_writer_delete,
|
||||
.set_qos = dds_writer_qos_set,
|
||||
.validate_status = dds_writer_status_validate
|
||||
};
|
||||
|
||||
dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entity_t topic, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
dds_return_t rc;
|
||||
|
@ -249,13 +257,13 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
|
||||
{
|
||||
dds_entity *p_or_p;
|
||||
if ((rc = dds_entity_claim (participant_or_publisher, &p_or_p)) != DDS_RETCODE_OK)
|
||||
if ((rc = dds_entity_pin (participant_or_publisher, &p_or_p)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
if (dds_entity_kind (p_or_p) == DDS_KIND_PARTICIPANT)
|
||||
publisher = dds_create_publisher(participant_or_publisher, qos, NULL);
|
||||
else
|
||||
publisher = participant_or_publisher;
|
||||
dds_entity_release (p_or_p);
|
||||
dds_entity_unpin (p_or_p);
|
||||
}
|
||||
|
||||
if ((rc = dds_publisher_lock (publisher, &pub)) != DDS_RETCODE_OK)
|
||||
|
@ -291,17 +299,13 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
writer = dds_entity_init (&wr->m_entity, &pub->m_entity, DDS_KIND_WRITER, wqos, listener, DDS_WRITER_STATUS_MASK);
|
||||
|
||||
wr->m_topic = tp;
|
||||
dds_entity_add_ref_nolock (&tp->m_entity);
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), config.xpack_send_async);
|
||||
wr->m_entity.m_deriver.close = dds_writer_close;
|
||||
wr->m_entity.m_deriver.delete = dds_writer_delete;
|
||||
wr->m_entity.m_deriver.set_qos = dds_writer_qos_set;
|
||||
wr->m_entity.m_deriver.validate_status = dds_writer_status_validate;
|
||||
wr->m_whc = make_whc (wqos);
|
||||
|
||||
/* Extra claim of this writer to make sure that the delete waits until DDSI
|
||||
* has deleted its writer as well. This can be known through the callback. */
|
||||
dds_handle_claim_inc (&wr->m_entity.m_hdllink);
|
||||
dds_handle_repin (&wr->m_entity.m_hdllink);
|
||||
|
||||
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
|
||||
|
@ -312,7 +316,10 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert(rc == DDS_RETCODE_OK);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
wr->m_entity.m_iid = get_entity_instance_id (&wr->m_entity.m_guid);
|
||||
dds_entity_register_child (&pub->m_entity, &wr->m_entity);
|
||||
|
||||
dds_topic_unlock (tp);
|
||||
dds_publisher_unlock (pub);
|
||||
return writer;
|
||||
|
@ -331,7 +338,7 @@ dds_entity_t dds_get_publisher (dds_entity_t writer)
|
|||
{
|
||||
dds_entity *e;
|
||||
dds_return_t rc;
|
||||
if ((rc = dds_entity_claim (writer, &e)) != DDS_RETCODE_OK)
|
||||
if ((rc = dds_entity_pin (writer, &e)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
else
|
||||
{
|
||||
|
@ -343,7 +350,7 @@ dds_entity_t dds_get_publisher (dds_entity_t writer)
|
|||
assert (dds_entity_kind (e->m_parent) == DDS_KIND_PUBLISHER);
|
||||
pubh = e->m_parent->m_hdllink.hdl;
|
||||
}
|
||||
dds_entity_release (e);
|
||||
dds_entity_unpin (e);
|
||||
return pubh;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ CU_Test(ddsc_instance_get_key, bad_entity, .init=setup, .fini=teardown)
|
|||
dds_return_t ret;
|
||||
|
||||
ret = dds_instance_get_key(participant, handle, &data);
|
||||
CU_ASSERT_EQUAL(ret, DDS_RETCODE_BAD_PARAMETER);
|
||||
CU_ASSERT_EQUAL(ret, DDS_RETCODE_ILLEGAL_OPERATION);
|
||||
}
|
||||
|
||||
CU_Test(ddsc_instance_get_key, null_data, .init=setup, .fini=teardown)
|
||||
|
|
|
@ -29,7 +29,6 @@ typedef void (*topic_cb_t) (struct dds_topic * topic);
|
|||
struct ddsi_sertopic_ops;
|
||||
|
||||
struct ddsi_sertopic {
|
||||
ddsrt_avl_node_t avlnode; /* index on name_typename */
|
||||
const struct ddsi_sertopic_ops *ops;
|
||||
const struct ddsi_serdata_ops *serdata_ops;
|
||||
uint32_t serdata_basehash;
|
||||
|
@ -38,9 +37,6 @@ struct ddsi_sertopic {
|
|||
char *type_name;
|
||||
uint64_t iid;
|
||||
ddsrt_atomic_uint32_t refc; /* counts refs from entities, not from data */
|
||||
|
||||
topic_cb_t status_cb;
|
||||
struct dds_topic * status_cb_entity;
|
||||
};
|
||||
|
||||
/* Called when the refcount dropped to zero */
|
||||
|
|
|
@ -2063,13 +2063,7 @@ static void writer_qos_mismatch (struct writer * wr, dds_qos_policy_id_t reason)
|
|||
{
|
||||
/* When the reason is DDS_INVALID_QOS_POLICY_ID, it means that we compared
|
||||
* readers/writers from different topics: ignore that. */
|
||||
if (reason != DDS_INVALID_QOS_POLICY_ID)
|
||||
{
|
||||
if (wr->topic->status_cb) {
|
||||
/* Handle INCONSISTENT_TOPIC on topic */
|
||||
(wr->topic->status_cb) (wr->topic->status_cb_entity);
|
||||
}
|
||||
if (wr->status_cb)
|
||||
if (reason != DDS_INVALID_QOS_POLICY_ID && wr->status_cb)
|
||||
{
|
||||
status_cb_data_t data;
|
||||
data.raw_status_id = (int) DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID;
|
||||
|
@ -2077,20 +2071,12 @@ static void writer_qos_mismatch (struct writer * wr, dds_qos_policy_id_t reason)
|
|||
(wr->status_cb) (wr->status_cb_entity, &data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void reader_qos_mismatch (struct reader * rd, dds_qos_policy_id_t reason)
|
||||
{
|
||||
/* When the reason is DDS_INVALID_QOS_POLICY_ID, it means that we compared
|
||||
* readers/writers from different topics: ignore that. */
|
||||
if (reason != DDS_INVALID_QOS_POLICY_ID)
|
||||
{
|
||||
if (rd->topic->status_cb)
|
||||
{
|
||||
/* Handle INCONSISTENT_TOPIC on topic */
|
||||
(rd->topic->status_cb) (rd->topic->status_cb_entity);
|
||||
}
|
||||
if (rd->status_cb)
|
||||
if (reason != DDS_INVALID_QOS_POLICY_ID && rd->status_cb)
|
||||
{
|
||||
status_cb_data_t data;
|
||||
data.raw_status_id = (int) DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID;
|
||||
|
@ -2098,7 +2084,6 @@ static void reader_qos_mismatch (struct reader * rd, dds_qos_policy_id_t reason)
|
|||
(rd->status_cb) (rd->status_cb_entity, &data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_reader *prd, nn_mtime_t tnow)
|
||||
{
|
||||
|
|
|
@ -982,6 +982,7 @@ int main (int argc, char **argv)
|
|||
for (size_t i = 0; i < sizeof (rres_iseq) / sizeof (rres_iseq[0]); i++)
|
||||
RhcTypes_T_free (&rres_mseq[i], DDS_FREE_CONTENTS);
|
||||
|
||||
ddsi_sertopic_unref (mdtopic);
|
||||
dds_delete(pp);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue