code cleanup: replacement of lots of function-like macros by inline functions, removal of unnecessary casts

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-01-10 10:41:27 +01:00
parent cf7eab5298
commit e4360d25a0
39 changed files with 748 additions and 678 deletions

View file

@ -76,11 +76,11 @@ DDS_EXPORT double dds_stream_read_double (dds_stream_t * is);
DDS_EXPORT char * dds_stream_read_string (dds_stream_t * is);
DDS_EXPORT void dds_stream_read_buffer (dds_stream_t * is, uint8_t * buffer, uint32_t len);
#define dds_stream_read_char(s) ((char) dds_stream_read_uint8 (s))
#define dds_stream_read_int8(s) ((int8_t) dds_stream_read_uint8 (s))
#define dds_stream_read_int16(s) ((int16_t) dds_stream_read_uint16 (s))
#define dds_stream_read_int32(s) ((int32_t) dds_stream_read_uint32 (s))
#define dds_stream_read_int64(s) ((int64_t) dds_stream_read_uint64 (s))
inline char dds_stream_read_char (dds_stream_t *is) { return (char) dds_stream_read_uint8 (is); }
inline int8_t dds_stream_read_int8 (dds_stream_t *is) { return (int8_t) dds_stream_read_uint8 (is); }
inline int16_t dds_stream_read_int16 (dds_stream_t *is) { return (int16_t) dds_stream_read_uint16 (is); }
inline int32_t dds_stream_read_int32 (dds_stream_t *is) { return (int32_t) dds_stream_read_uint32 (is); }
inline int64_t dds_stream_read_int64 (dds_stream_t *is) { return (int64_t) dds_stream_read_uint64 (is); }
DDS_EXPORT void dds_stream_write_bool (dds_stream_t * os, bool val);
DDS_EXPORT void dds_stream_write_uint8 (dds_stream_t * os, uint8_t val);
@ -94,11 +94,11 @@ DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const
DDS_EXPORT void *dds_stream_address (dds_stream_t * s);
DDS_EXPORT void *dds_stream_alignto (dds_stream_t * s, uint32_t a);
#define dds_stream_write_char(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v)))
#define dds_stream_write_int8(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v)))
#define dds_stream_write_int16(s,v) (dds_stream_write_uint16 ((s), (uint16_t)(v)))
#define dds_stream_write_int32(s,v) (dds_stream_write_uint32 ((s), (uint32_t)(v)))
#define dds_stream_write_int64(s,v) (dds_stream_write_uint64 ((s), (uint64_t)(v)))
inline void dds_stream_write_char (dds_stream_t * os, char val) { dds_stream_write_uint8 (os, (uint8_t) val); }
inline void dds_stream_write_int8 (dds_stream_t * os, int8_t val) { dds_stream_write_uint8 (os, (uint8_t) val); }
inline void dds_stream_write_int16 (dds_stream_t * os, int16_t val) { dds_stream_write_uint16 (os, (uint16_t) val); }
inline void dds_stream_write_int32 (dds_stream_t * os, int32_t val) { dds_stream_write_uint32 (os, (uint32_t) val); }
inline void dds_stream_write_int64 (dds_stream_t * os, int64_t val) { dds_stream_write_uint64 (os, (uint64_t) val); }
#if defined (__cplusplus)
}

View file

@ -45,11 +45,48 @@ dds_entity_listener_propagation(
_In_opt_ void *metrics,
_In_ bool propagate);
#define dds_entity_is_enabled(e, k) (((dds_entity*)e)->m_flags & DDS_ENTITY_ENABLED)
#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \
qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x) \
{ \
dds__retcode_t rc; \
dds_entity *e; \
if ((rc = dds_entity_lock (hdl, kind_, &e)) != DDS_RETCODE_OK) \
return rc; \
*x = (type_ *) e; \
return DDS_RETCODE_OK; \
} \
\
qualifier_ void type_##_unlock (type_ *x) \
{ \
dds_entity_unlock (&x->m_entity); \
}
#define DECL_ENTITY_LOCK_UNLOCK(qualifier_, type_) \
qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x); \
qualifier_ void type_##_unlock (type_ *x);
#define dds_entity_status_set(e, t) (((dds_entity*)e)->m_trigger |= (((dds_entity*)e)->m_status_enable & t))
#define dds_entity_status_reset(e,t) (((dds_entity*)e)->m_trigger &= ~t)
#define dds_entity_status_match(e,t) (((dds_entity*)e)->m_trigger & t)
inline bool dds_entity_is_enabled (const dds_entity *e) {
return (e->m_flags & DDS_ENTITY_ENABLED) != 0;
}
inline void dds_entity_status_set (dds_entity *e, uint32_t t) {
e->m_trigger |= e->m_status_enable & t;
}
inline void dds_entity_status_reset (dds_entity *e, uint32_t t) {
e->m_trigger &= ~t;
}
inline bool dds_entity_status_match (const dds_entity *e, uint32_t t) {
return (e->m_trigger & t) != 0;
}
inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) {
return (dds_entity_kind_t) (e->m_hdl & DDS_ENTITY_KIND_MASK);
}
inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl) {
return (hdl > 0) ? (dds_entity_kind_t) (hdl & DDS_ENTITY_KIND_MASK) : DDS_KIND_DONTCARE;
}
/* The mutex needs to be unlocked when calling this because the entity can be called
* within the signal callback from other contexts. That shouldn't deadlock. */
@ -74,8 +111,6 @@ void
dds_entity_unlock(
_Inout_ dds_entity *e);
#define dds_entity_kind(hdl) ((hdl > 0) ? (hdl & DDS_ENTITY_KIND_MASK) : 0)
_Check_return_ dds__retcode_t
dds_entity_observer_register_nl(
_In_ dds_entity* observed,

View file

@ -18,4 +18,6 @@ _Must_inspect_result_ dds_guardcond*
dds_create_guardcond(
_In_ dds_participant *pp);
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_guardcond, DDS_KIND_COND_GUARD)
#endif

View file

@ -12,10 +12,13 @@
#ifndef _DDS_PPANT_H_
#define _DDS_PPANT_H_
#include "dds__entity.h"
#if defined (__cplusplus)
extern "C" {
#endif
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_participant, DDS_KIND_PARTICIPANT)
#if defined (__cplusplus)
}

View file

@ -13,6 +13,7 @@
#define _DDS_READER_H_
#include "dds__types.h"
#include "dds__entity.h"
#if defined (__cplusplus)
extern "C" {
@ -20,7 +21,7 @@ extern "C" {
struct status_cb_data;
void dds_reader_status_cb (void * entity, const struct status_cb_data * data);
void dds_reader_status_cb (void *entity, const struct status_cb_data * data);
/*
dds_reader_lock_samples: Returns number of samples in read cache and locks the
@ -40,8 +41,7 @@ struct nn_rsample_info;
struct nn_rdata;
DDS_EXPORT void dds_reader_ddsi2direct (dds_entity_t entity, void (*cb) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg), void *cbarg);
#define dds_reader_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_READER, (dds_entity**)obj)
#define dds_reader_unlock(obj) dds_entity_unlock((dds_entity*)obj);
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_reader, DDS_KIND_READER)
#if defined (__cplusplus)
}

View file

@ -13,13 +13,13 @@
#define _DDS_TOPIC_H_
#include "dds__types.h"
#include "dds__entity.h"
#if defined (__cplusplus)
extern "C" {
#endif
#define dds_topic_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_TOPIC, (dds_entity**)obj)
#define dds_topic_unlock(obj) dds_entity_unlock((dds_entity*)obj);
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_topic, DDS_KIND_TOPIC)
extern struct ddsi_sertopic * dds_topic_lookup (dds_domain * domain, const char * name);
extern void dds_topic_free (dds_domainid_t domainid, struct ddsi_sertopic * st);

View file

@ -18,8 +18,7 @@
extern "C" {
#endif
#define dds_writer_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_WRITER, (dds_entity**)obj)
#define dds_writer_unlock(obj) dds_entity_unlock((dds_entity*)obj);
DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER)
#if defined (__cplusplus)
}

View file

@ -148,19 +148,17 @@ dds_entity_t dds__get_builtin_subscriber (dds_entity_t e)
dds_return_t ret;
dds_entity_t pp;
dds_participant *p;
dds_entity *part_entity;
if ((pp = dds_get_participant (e)) <= 0)
return pp;
if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0)
if ((ret = dds_participant_lock (pp, &p)) != DDS_RETCODE_OK)
return ret;
p = (dds_participant *) part_entity;
if (p->m_builtin_subscriber <= 0) {
p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity);
p->m_builtin_subscriber = dds__create_builtin_subscriber (&p->m_entity);
}
sub = p->m_builtin_subscriber;
dds_entity_unlock(part_entity);
dds_participant_unlock(p);
return sub;
}

View file

@ -27,7 +27,7 @@ dds_begin_coherent(
{
dds_return_t ret;
switch(dds_entity_kind(entity)) {
switch(dds_entity_kind_from_handle(entity)) {
case DDS_KIND_READER:
case DDS_KIND_WRITER:
/* Invoking on a writer/reader behaves as if invoked on
@ -58,7 +58,7 @@ dds_end_coherent(
{
dds_return_t ret;
switch(dds_entity_kind(entity)) {
switch(dds_entity_kind_from_handle(entity)) {
case DDS_KIND_READER:
case DDS_KIND_WRITER:
/* Invoking on a writer/reader behaves as if invoked on

View file

@ -27,7 +27,7 @@ const ut_avlTreedef_t dds_domaintree_def = UT_AVL_TREEDEF_INITIALIZER
dds_domain * dds_domain_find_locked (dds_domainid_t id)
{
return (dds_domain*) ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &id);
return ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &id);
}
dds_domain * dds_domain_create (dds_domainid_t id)

View file

@ -25,6 +25,12 @@
#endif
extern inline bool dds_entity_is_enabled (const dds_entity *e);
extern inline void dds_entity_status_set (dds_entity *e, uint32_t t);
extern inline void dds_entity_status_reset (dds_entity *e, uint32_t t);
extern inline bool dds_entity_status_match (const dds_entity *e, uint32_t t);
extern inline dds_entity_kind_t dds_entity_kind (const dds_entity *e);
extern inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl);
static void
dds_entity_observers_delete(
@ -83,7 +89,7 @@ dds_entity_listener_propagation(
if (e) {
rc = dds_entity_lock(e->m_hdl, DDS_KIND_DONTCARE, &dummy);
if (rc == DDS_RETCODE_OK) {
dds_listener_t *l = (dds_listener_t *)(&e->m_listener);
dds_listener_t *l = &e->m_listener;
assert(e == dummy);
@ -360,12 +366,12 @@ dds_delete_impl(
* To circumvent the problem. We ignore topics in the first loop.
*/
child = e->m_children;
while ((child != NULL) && (dds_entity_kind(child->m_hdl) == DDS_KIND_TOPIC)) {
while ((child != NULL) && (dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC)) {
child = child->m_next;
}
while ((child != NULL) && (ret == DDS_RETCODE_OK)) {
next = child->m_next;
while ((next != NULL) && (dds_entity_kind(next->m_hdl) == DDS_KIND_TOPIC)) {
while ((next != NULL) && (dds_entity_kind_from_handle(next->m_hdl) == DDS_KIND_TOPIC)) {
next = next->m_next;
}
/* This will probably delete the child entry from
@ -377,7 +383,7 @@ dds_delete_impl(
child = e->m_children;
while ((child != NULL) && (ret == DDS_RETCODE_OK)) {
next = child->m_next;
assert(dds_entity_kind(child->m_hdl) == DDS_KIND_TOPIC);
assert(dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC);
/* This will probably delete the child entry from
* the current childrens list */
ret = dds_delete(child->m_hdl);
@ -1282,7 +1288,7 @@ dds_get_topic(
if (rc == DDS_RETCODE_OK) {
hdl = wr->m_topic->m_entity.m_hdl;
dds_writer_unlock(wr);
} else if (dds_entity_kind(entity) == DDS_KIND_COND_READ || dds_entity_kind(entity) == DDS_KIND_COND_QUERY) {
} else if (dds_entity_kind_from_handle(entity) == DDS_KIND_COND_READ || dds_entity_kind_from_handle(entity) == DDS_KIND_COND_QUERY) {
hdl = dds_get_topic(dds_get_parent(entity));
rc = DDS_RETCODE_OK;
}

View file

@ -13,18 +13,20 @@
#include <string.h>
#include "dds__reader.h"
#include "dds__guardcond.h"
#include "dds__entity.h"
#include "dds__participant.h"
#include "dds__err.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_thread.h"
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_guardcond)
_Must_inspect_result_ dds_guardcond*
dds_create_guardcond(
_In_ dds_participant *pp)
{
dds_guardcond * gcond = dds_alloc(sizeof(*gcond));
gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, (dds_entity*)pp, DDS_KIND_COND_GUARD, NULL, NULL, 0);
gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0);
return gcond;
}
@ -34,15 +36,15 @@ dds_create_guardcondition(
_In_ dds_entity_t participant)
{
dds_entity_t hdl;
dds_entity * pp;
dds_participant * pp;
dds__retcode_t rc;
rc = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, &pp);
rc = dds_participant_lock(participant, &pp);
if (rc == DDS_RETCODE_OK) {
dds_guardcond *cond = dds_create_guardcond((dds_participant *)pp);
dds_guardcond *cond = dds_create_guardcond(pp);
assert(cond);
hdl = cond->m_entity.m_hdl;
dds_entity_unlock(pp);
dds_participant_unlock(pp);
} else {
DDS_ERROR("Error occurred on locking reader\n");
hdl = DDS_ERRNO(rc);
@ -64,10 +66,10 @@ dds_set_guardcondition(
rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond);
if (rc == DDS_RETCODE_OK) {
if (triggered) {
dds_entity_status_set(gcond, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_set(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_signal(&gcond->m_entity);
} else {
dds_entity_status_reset(gcond, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
}
dds_entity_unlock(&gcond->m_entity);
ret = DDS_RETCODE_OK;
@ -91,10 +93,10 @@ dds_read_guardcondition(
if (triggered != NULL) {
*triggered = false;
rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond);
rc = dds_guardcond_lock(condition, &gcond);
if (rc == DDS_RETCODE_OK) {
*triggered = dds_entity_status_match(gcond, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_unlock((dds_entity*)gcond);
*triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_guardcond_unlock(gcond);
ret = DDS_RETCODE_OK;
} else {
DDS_ERROR("Argument condition is not valid\n");
@ -120,11 +122,11 @@ dds_take_guardcondition(
if (triggered != NULL) {
*triggered = false;
rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond);
rc = dds_guardcond_lock(condition, &gcond);
if (rc == DDS_RETCODE_OK) {
*triggered = dds_entity_status_match(gcond, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset(gcond, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_unlock((dds_entity*)gcond);
*triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS);
dds_guardcond_unlock (gcond);
ret = DDS_RETCODE_OK;
} else {
DDS_ERROR("Argument condition is not valid\n");

View file

@ -81,19 +81,20 @@ dds_instance_remove(
}
}
static const dds_topic*
dds_instance_info(
_In_ dds_entity *e)
static const dds_topic *dds_instance_info (dds_entity *e)
{
const dds_topic *topic = NULL;
assert (e);
assert ((dds_entity_kind(e->m_hdl) == DDS_KIND_READER) || (dds_entity_kind(e->m_hdl) == DDS_KIND_WRITER));
if (dds_entity_kind(e->m_hdl) == DDS_KIND_READER) {
topic = ((dds_reader*)e)->m_topic;
} else {
topic = ((dds_writer*)e)->m_topic;
const dds_topic *topic;
switch (dds_entity_kind (e))
{
case DDS_KIND_READER:
topic = ((dds_reader*) e)->m_topic;
break;
case DDS_KIND_WRITER:
topic = ((dds_writer*) e)->m_topic;
break;
default:
assert (0);
topic = NULL;
}
return topic;
}
@ -127,7 +128,7 @@ dds_register_instance(
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
struct ddsi_tkmap_instance * inst;
dds_entity *wr;
dds_writer *wr;
dds_return_t ret;
dds__retcode_t rc;
@ -141,7 +142,7 @@ dds_register_instance(
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr);
rc = dds_writer_lock(writer, &wr);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
@ -150,7 +151,7 @@ dds_register_instance(
if (asleep) {
thread_state_awake(thr);
}
inst = dds_instance_find (((dds_writer*) wr)->m_topic, data, true);
inst = dds_instance_find (wr->m_topic, data, true);
if(inst != NULL){
*handle = inst->m_iid;
ret = DDS_RETCODE_OK;
@ -161,7 +162,7 @@ dds_register_instance(
if (asleep) {
thread_state_asleep(thr);
}
dds_entity_unlock(wr);
dds_writer_unlock(wr);
err:
return ret;
}
@ -197,8 +198,7 @@ dds_unregister_instance_ts(
dds__retcode_t rc;
bool autodispose = true;
dds_write_action action = DDS_WR_ACTION_UNREGISTER;
void * sample = (void*) data;
dds_entity *wr;
dds_writer *wr;
if (data == NULL){
DDS_ERROR("Argument data is NULL\n");
@ -210,28 +210,28 @@ dds_unregister_instance_ts(
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr);
rc = dds_writer_lock(writer, &wr);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
goto err;
}
if (wr->m_qos) {
dds_qget_writer_data_lifecycle (wr->m_qos, &autodispose);
if (wr->m_entity.m_qos) {
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
}
if (asleep) {
thread_state_awake(thr);
}
if (autodispose) {
dds_instance_remove (((dds_writer*) wr)->m_topic, data, DDS_HANDLE_NIL);
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
action |= DDS_WR_DISPOSE_BIT;
}
ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action);
ret = dds_write_impl (wr, data, timestamp, action);
if (asleep) {
thread_state_asleep(thr);
}
dds_entity_unlock(wr);
dds_writer_unlock(wr);
err:
return ret;
}
@ -249,21 +249,21 @@ dds_unregister_instance_ih_ts(
dds__retcode_t rc;
bool autodispose = true;
dds_write_action action = DDS_WR_ACTION_UNREGISTER;
dds_entity *wr;
dds_writer *wr;
struct ddsi_tkmap_instance *tk;
rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr);
rc = dds_writer_lock(writer, &wr);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
goto err;
}
if (wr->m_qos) {
dds_qget_writer_data_lifecycle (wr->m_qos, &autodispose);
if (wr->m_entity.m_qos) {
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
}
if (autodispose) {
dds_instance_remove (((dds_writer*) wr)->m_topic, NULL, handle);
dds_instance_remove (wr->m_topic, NULL, handle);
action |= DDS_WR_DISPOSE_BIT;
}
@ -272,11 +272,11 @@ dds_unregister_instance_ih_ts(
}
tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle);
if (tk) {
struct ddsi_sertopic *tp = ((dds_writer*) wr)->m_topic->m_stopic;
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
void *sample = ddsi_sertopic_alloc_sample (tp);
ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL);
ddsi_tkmap_instance_unref (tk);
ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action);
ret = dds_write_impl (wr, sample, timestamp, action);
ddsi_sertopic_free_sample (tp, sample, DDS_FREE_ALL);
} else {
DDS_ERROR("No instance related with the provided handle is found\n");
@ -285,7 +285,7 @@ dds_unregister_instance_ih_ts(
if (asleep) {
thread_state_asleep(thr);
}
dds_entity_unlock(wr);
dds_writer_unlock(wr);
err:
return ret;
}
@ -392,7 +392,7 @@ dds_dispose_ih_ts(
thread_state_awake(thr);
}
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) != NULL) {
struct ddsi_sertopic *tp = ((dds_writer*) wr)->m_topic->m_stopic;
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
void *sample = ddsi_sertopic_alloc_sample (tp);
ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL);
ddsi_tkmap_instance_unref (tk);

View file

@ -180,7 +180,7 @@ void
dds_lset_data_available (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_data_available_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_data_available = callback;
listener->on_data_available = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -190,7 +190,7 @@ void
dds_lset_data_on_readers (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_data_on_readers_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_data_on_readers = callback;
listener->on_data_on_readers = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -200,7 +200,7 @@ void
dds_lset_inconsistent_topic (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_inconsistent_topic_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_inconsistent_topic = callback;
listener->on_inconsistent_topic = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -210,7 +210,7 @@ void
dds_lset_liveliness_changed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_liveliness_changed_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_liveliness_changed = callback;
listener->on_liveliness_changed = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -220,7 +220,7 @@ void
dds_lset_liveliness_lost (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_liveliness_lost_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_liveliness_lost = callback;
listener->on_liveliness_lost = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -230,7 +230,7 @@ void
dds_lset_offered_deadline_missed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_offered_deadline_missed_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_offered_deadline_missed = callback;
listener->on_offered_deadline_missed = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -240,7 +240,7 @@ void
dds_lset_offered_incompatible_qos (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_offered_incompatible_qos_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_offered_incompatible_qos = callback;
listener->on_offered_incompatible_qos = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -250,7 +250,7 @@ void
dds_lset_publication_matched (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_publication_matched_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_publication_matched = callback;
listener->on_publication_matched = callback;
} else {
DDS_ERROR("Argument listener is NULL");
}
@ -260,7 +260,7 @@ void
dds_lset_requested_deadline_missed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_requested_deadline_missed_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_requested_deadline_missed = callback;
listener->on_requested_deadline_missed = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -270,7 +270,7 @@ void
dds_lset_requested_incompatible_qos (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_requested_incompatible_qos_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_requested_incompatible_qos = callback;
listener->on_requested_incompatible_qos = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -280,7 +280,7 @@ void
dds_lset_sample_lost (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_sample_lost_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_sample_lost = callback;
listener->on_sample_lost = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -290,7 +290,7 @@ void
dds_lset_sample_rejected (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_sample_rejected_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_sample_rejected = callback;
listener->on_sample_rejected = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -300,7 +300,7 @@ void
dds_lset_subscription_matched (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_subscription_matched_fn callback)
{
if (listener) {
((c_listener_t*)listener)->on_subscription_matched = callback;
listener->on_subscription_matched = callback;
} else {
DDS_ERROR("Argument listener is NULL\n");
}
@ -321,7 +321,7 @@ dds_lget_data_available (_In_ const dds_listener_t * __restrict listener, _Outpt
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_data_available;
*callback = listener->on_data_available;
}
void
@ -335,7 +335,7 @@ dds_lget_data_on_readers (_In_ const dds_listener_t * __restrict listener, _Outp
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_data_on_readers;
*callback = listener->on_data_on_readers;
}
void dds_lget_inconsistent_topic (_In_ const dds_listener_t * __restrict listener, _Outptr_result_maybenull_ dds_on_inconsistent_topic_fn *callback)
@ -348,7 +348,7 @@ void dds_lget_inconsistent_topic (_In_ const dds_listener_t * __restrict listene
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_inconsistent_topic;
*callback = listener->on_inconsistent_topic;
}
void
@ -362,7 +362,7 @@ dds_lget_liveliness_changed (_In_ const dds_listener_t * __restrict listener, _O
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_liveliness_changed;
*callback = listener->on_liveliness_changed;
}
void
@ -376,7 +376,7 @@ dds_lget_liveliness_lost (_In_ const dds_listener_t * __restrict listener, _Outp
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_liveliness_lost;
*callback = listener->on_liveliness_lost;
}
void
@ -390,7 +390,7 @@ dds_lget_offered_deadline_missed (_In_ const dds_listener_t * __restrict listene
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_offered_deadline_missed;
*callback = listener->on_offered_deadline_missed;
}
void
@ -404,7 +404,7 @@ dds_lget_offered_incompatible_qos (_In_ const dds_listener_t * __restrict listen
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_offered_incompatible_qos;
*callback = listener->on_offered_incompatible_qos;
}
void
@ -418,7 +418,7 @@ dds_lget_publication_matched (_In_ const dds_listener_t * __restrict listener, _
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_publication_matched;
*callback = listener->on_publication_matched;
}
void
@ -432,7 +432,7 @@ dds_lget_requested_deadline_missed (_In_ const dds_listener_t * __restrict liste
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_requested_deadline_missed;
*callback = listener->on_requested_deadline_missed;
}
void
@ -446,7 +446,7 @@ dds_lget_requested_incompatible_qos (_In_ const dds_listener_t * __restrict list
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_requested_incompatible_qos;
*callback = listener->on_requested_incompatible_qos;
}
void
@ -460,7 +460,7 @@ dds_lget_sample_lost (_In_ const dds_listener_t *__restrict listener, _Outptr_re
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_sample_lost;
*callback = listener->on_sample_lost;
}
void
@ -474,7 +474,7 @@ dds_lget_sample_rejected (_In_ const dds_listener_t *__restrict listener, _Outp
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_sample_rejected;
*callback = listener->on_sample_rejected;
}
void
@ -488,5 +488,5 @@ dds_lget_subscription_matched (_In_ const dds_listener_t * __restrict listener,
DDS_ERROR("Argument listener is NULL\n");
return ;
}
*callback = ((c_listener_t*)listener)->on_subscription_matched;
*callback = listener->on_subscription_matched;
}

View file

@ -20,6 +20,8 @@
#include "dds__err.h"
#include "dds__builtin.h"
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_participant)
#define DDS_PARTICIPANT_STATUS_MASK 0u
/* List of created participants */
@ -51,7 +53,7 @@ dds_participant_delete(
assert(e);
assert(thr);
assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT);
assert(dds_entity_kind_from_handle(e->m_hdl) == DDS_KIND_PARTICIPANT);
if (asleep) {
thread_state_awake(thr);

View file

@ -149,7 +149,7 @@ dds_suspend(
{
dds_return_t ret;
if(dds_entity_kind(publisher) != DDS_KIND_PUBLISHER) {
if(dds_entity_kind_from_handle(publisher) != DDS_KIND_PUBLISHER) {
DDS_ERROR("Provided entity is not a publisher kind\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
@ -169,7 +169,7 @@ dds_resume(
{
dds_return_t ret = DDS_RETCODE_OK;
if(dds_entity_kind(publisher) != DDS_KIND_PUBLISHER) {
if(dds_entity_kind_from_handle(publisher) != DDS_KIND_PUBLISHER) {
DDS_ERROR("Provided entity is not a publisher kind\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
@ -194,7 +194,7 @@ dds_wait_for_acks(
/* TODO: CHAM-125 Currently unsupported. */
OS_UNUSED_ARG(timeout);
switch(dds_entity_kind(publisher_or_writer)) {
switch(dds_entity_kind_from_handle(publisher_or_writer)) {
case DDS_KIND_WRITER:
DDS_ERROR("Wait for acknowledgments on a writer is not being supported yet\n");
ret = DDS_ERRNO(DDS_RETCODE_UNSUPPORTED);

View file

@ -21,59 +21,53 @@
#include "ddsi/q_entity.h"
#include "ddsi/ddsi_sertopic.h"
static _Check_return_ dds__retcode_t
dds_read_lock(
_In_ dds_entity_t hdl,
_Out_ dds_reader **reader,
_Out_ dds_readcond **condition,
_In_ bool only_reader)
static dds__retcode_t dds_read_lock (dds_entity_t hdl, dds_reader **reader, dds_readcond **condition, bool only_reader)
{
dds__retcode_t rc = hdl;
assert(reader);
assert(condition);
*reader = NULL;
*condition = NULL;
rc = dds_entity_lock(hdl, DDS_KIND_READER, (dds_entity**)reader);
if (rc == DDS_RETCODE_ILLEGAL_OPERATION) {
if (!only_reader) {
if ((dds_entity_kind(hdl) == DDS_KIND_COND_READ ) || (dds_entity_kind(hdl) == DDS_KIND_COND_QUERY) ){
rc = dds_entity_lock(hdl, DDS_KIND_DONTCARE, (dds_entity**)condition);
if (rc == DDS_RETCODE_OK) {
dds_entity *parent = ((dds_entity*)*condition)->m_parent;
assert(parent);
rc = dds_entity_lock(parent->m_hdl, DDS_KIND_READER, (dds_entity**)reader);
if (rc != DDS_RETCODE_OK) {
dds_entity_unlock((dds_entity*)*condition);
DDS_ERROR("Failed to lock condition reader\n");
}
} else {
DDS_ERROR("Failed to lock condition\n");
}
} else {
DDS_ERROR("Given entity is not a reader nor a condition\n");
}
} else {
DDS_ERROR("Given entity is not a reader\n");
}
} else if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Failed to lock reader\n");
}
dds__retcode_t rc;
dds_entity *entity, *parent_entity;
if ((rc = dds_entity_lock (hdl, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK)
{
return rc;
}
static void
dds_read_unlock(
_In_ dds_reader *reader,
_In_ dds_readcond *condition)
{
assert(reader);
dds_entity_unlock((dds_entity*)reader);
if (condition) {
dds_entity_unlock((dds_entity*)condition);
}
else if (dds_entity_kind (entity) == DDS_KIND_READER)
{
*reader = (dds_reader *) entity;
*condition = NULL;
return DDS_RETCODE_OK;
}
else if (only_reader)
{
dds_entity_unlock (entity);
DDS_ERROR ("Given entity is not a reader\n");
return DDS_RETCODE_ILLEGAL_OPERATION;
}
else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY)
{
dds_entity_unlock (entity);
DDS_ERROR ("Given entity is a reader nor a condition\n");
return DDS_RETCODE_ILLEGAL_OPERATION;
}
else if ((rc = dds_entity_lock (entity->m_parent->m_hdl, DDS_KIND_READER, &parent_entity)) != DDS_RETCODE_OK)
{
dds_entity_unlock (entity);
DDS_ERROR ("Failed to lock condition's reader\n");
return rc;
}
else
{
*reader = (dds_reader *) parent_entity;
*condition = (dds_readcond *) entity;
return DDS_RETCODE_OK;
}
}
static void dds_read_unlock (dds_reader *reader, dds_readcond *condition)
{
dds_entity_unlock (&reader->m_entity);
if (condition)
dds_entity_unlock (&condition->m_entity);
}
/*
dds_read_impl: Core read/take function. Usually maxs is size of buf and si
into which samples/status are written, when set to zero is special case
@ -172,10 +166,10 @@ dds_read_impl(
ret = (dds_return_t)dds_rhc_read(rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond);
}
/* read/take resets data available status */
dds_entity_status_reset(rd, DDS_DATA_AVAILABLE_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
/* reset DATA_ON_READERS status on subscriber after successful read/take */
if (dds_entity_kind(((dds_entity*)rd)->m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) {
dds_entity_status_reset(((dds_entity*)rd)->m_parent, DDS_DATA_ON_READERS_STATUS);
if (dds_entity_kind_from_handle(rd->m_entity.m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) {
dds_entity_status_reset(rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
}
dds_read_unlock(rd, cond);
@ -227,13 +221,13 @@ dds_readcdr_impl(
);
/* read/take resets data available status */
dds_entity_status_reset(rd, DDS_DATA_AVAILABLE_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
/* reset DATA_ON_READERS status on subscriber after successful read/take */
if (dds_entity_kind(((dds_entity*)rd)->m_parent->m_hdl) == DDS_KIND_SUBSCRIBER)
if (dds_entity_kind_from_handle(rd->m_entity.m_parent->m_hdl) == DDS_KIND_SUBSCRIBER)
{
dds_entity_status_reset(((dds_entity*)rd)->m_parent, DDS_DATA_ON_READERS_STATUS);
dds_entity_status_reset(rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS);
}
dds_read_unlock(rd, cond);
} else {

View file

@ -41,7 +41,7 @@ dds_create_readcond(
cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE;
cond->m_view_states = mask & DDS_ANY_VIEW_STATE;
cond->m_instance_states = mask & DDS_ANY_INSTANCE_STATE;
cond->m_rd_guid = ((dds_entity*)rd)->m_guid;
cond->m_rd_guid = rd->m_entity.m_guid;
dds_rhc_add_readcondition (cond);
return cond;
}
@ -70,17 +70,13 @@ dds_create_readcondition(
return hdl;
}
_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \
((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) )
dds_entity_t
dds_get_datareader(
_In_ dds_entity_t condition)
dds_entity_t dds_get_datareader (dds_entity_t condition)
{
dds_entity_t hdl;
if (dds_entity_kind(condition) == DDS_KIND_COND_READ) {
if (dds_entity_kind_from_handle(condition) == DDS_KIND_COND_READ) {
hdl = dds_get_parent(condition);
} else if (dds_entity_kind(condition) == DDS_KIND_COND_QUERY) {
} else if (dds_entity_kind_from_handle(condition) == DDS_KIND_COND_QUERY) {
hdl = dds_get_parent(condition);
} else {
DDS_ERROR("Argument condition is not valid\n");
@ -90,39 +86,26 @@ dds_get_datareader(
return hdl;
}
_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \
((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) )
_Check_return_ dds_return_t
dds_get_mask(
_In_ dds_entity_t condition,
_Out_ uint32_t *mask)
dds_return_t dds_get_mask (dds_entity_t condition, uint32_t *mask)
{
dds_return_t ret;
dds_readcond *cond;
dds_entity *entity;
dds__retcode_t rc;
if (mask != NULL) {
*mask = 0;
if ((dds_entity_kind(condition) == DDS_KIND_COND_READ ) ||
(dds_entity_kind(condition) == DDS_KIND_COND_QUERY) ){
rc = dds_entity_lock(condition, DDS_KIND_DONTCARE, (dds_entity**)&cond);
if (rc == DDS_RETCODE_OK) {
*mask = (cond->m_sample_states | cond->m_view_states | cond->m_instance_states);
dds_entity_unlock((dds_entity*)cond);
ret = DDS_RETCODE_OK;
} else{
DDS_ERROR("Error occurred on locking condition\n");
ret = DDS_ERRNO(rc);
}
} else {
DDS_ERROR("Argument condition is not valid\n");
ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_READ));
}
} else {
DDS_ERROR("Argument mask is NULL\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
if (mask == NULL)
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
return ret;
if ((rc = dds_entity_lock (condition, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY)
{
dds_entity_unlock (entity);
return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_READ));
}
else
{
dds_readcond *cond = (dds_readcond *) entity;
*mask = (cond->m_sample_states | cond->m_view_states | cond->m_instance_states);
dds_entity_unlock (entity);
return DDS_RETCODE_OK;
}
}

View file

@ -19,6 +19,7 @@
#include "dds__init.h"
#include "dds__rhc.h"
#include "dds__err.h"
#include "dds__topic.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_thread.h"
#include "dds__builtin.h"
@ -27,6 +28,7 @@
#include "os/os.h"
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_reader)
#define DDS_READER_STATUS_MASK \
DDS_SAMPLE_REJECTED_STATUS |\
@ -162,9 +164,10 @@ dds_reader_status_validate(
void
dds_reader_status_cb(
void *entity,
void *ventity,
const status_cb_data_t *data)
{
struct dds_entity * const entity = ventity;
dds_reader *rd;
dds__retcode_t rc;
void *metrics = NULL;
@ -173,14 +176,14 @@ dds_reader_status_cb(
if (data == NULL) {
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
ut_handle_release(((dds_entity*)entity)->m_hdl, ((dds_entity*)entity)->m_hdllink);
ut_handle_release(entity->m_hdl, ((dds_entity*)entity)->m_hdllink);
return;
}
if (dds_reader_lock(((dds_entity*)entity)->m_hdl, &rd) != DDS_RETCODE_OK) {
if (dds_reader_lock(entity->m_hdl, &rd) != DDS_RETCODE_OK) {
return;
}
assert(rd == entity);
assert(&rd->m_entity == entity);
/* Reset the status for possible Listener call.
* When a listener is not called, the status will be set (again). */
@ -192,20 +195,20 @@ dds_reader_status_cb(
rd->m_requested_deadline_missed_status.total_count++;
rd->m_requested_deadline_missed_status.total_count_change++;
rd->m_requested_deadline_missed_status.last_instance_handle = data->handle;
metrics = (void*)&(rd->m_requested_deadline_missed_status);
metrics = &rd->m_requested_deadline_missed_status;
break;
}
case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: {
rd->m_requested_incompatible_qos_status.total_count++;
rd->m_requested_incompatible_qos_status.total_count_change++;
rd->m_requested_incompatible_qos_status.last_policy_id = data->extra;
metrics = (void*)&(rd->m_requested_incompatible_qos_status);
metrics = &rd->m_requested_incompatible_qos_status;
break;
}
case DDS_SAMPLE_LOST_STATUS: {
rd->m_sample_lost_status.total_count++;
rd->m_sample_lost_status.total_count_change++;
metrics = (void*)&(rd->m_sample_lost_status);
metrics = &rd->m_sample_lost_status;
break;
}
case DDS_SAMPLE_REJECTED_STATUS: {
@ -213,7 +216,7 @@ dds_reader_status_cb(
rd->m_sample_rejected_status.total_count_change++;
rd->m_sample_rejected_status.last_reason = data->extra;
rd->m_sample_rejected_status.last_instance_handle = data->handle;
metrics = (void*)&(rd->m_sample_rejected_status);
metrics = &rd->m_sample_rejected_status;
break;
}
case DDS_DATA_AVAILABLE_STATUS: {
@ -233,7 +236,7 @@ dds_reader_status_cb(
rd->m_liveliness_changed_status.not_alive_count_change++;
}
rd->m_liveliness_changed_status.last_publication_handle = data->handle;
metrics = (void*)&(rd->m_liveliness_changed_status);
metrics = &rd->m_liveliness_changed_status;
break;
}
case DDS_SUBSCRIPTION_MATCHED_STATUS: {
@ -247,7 +250,7 @@ dds_reader_status_cb(
rd->m_subscription_matched_status.current_count_change--;
}
rd->m_subscription_matched_status.last_publication_handle = data->handle;
metrics = (void*)&(rd->m_subscription_matched_status);
metrics = &rd->m_subscription_matched_status;
break;
}
default: assert (0);
@ -283,8 +286,8 @@ dds_reader_status_cb(
if (rc == DDS_RETCODE_OK) {
/* Event was eaten by a listener. */
if (dds_reader_lock(((dds_entity*)entity)->m_hdl, &rd) == DDS_RETCODE_OK) {
assert(rd == entity);
if (dds_reader_lock(entity->m_hdl, &rd) == DDS_RETCODE_OK) {
assert(&rd->m_entity == entity);
/* Reset the change counts of the metrics. */
switch (data->status) {
@ -353,16 +356,16 @@ dds_create_reader(
dds_entity_t subscriber;
dds_reader * rd;
struct rhc * rhc;
dds_entity * tp;
dds_topic * tp;
dds_entity_t reader;
dds_entity_t t;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_return_t ret = DDS_RETCODE_OK;
if (dds_entity_kind(topic) != DDS_KIND_INTERNAL) {
if (dds_entity_kind_from_handle(topic) != DDS_KIND_INTERNAL) {
/* Try claiming a participant. If that's not working, then it could be a subscriber. */
if (dds_entity_kind(participant_or_subscriber) == DDS_KIND_PARTICIPANT) {
if (dds_entity_kind_from_handle(participant_or_subscriber) == DDS_KIND_PARTICIPANT) {
subscriber = dds_create_subscriber(participant_or_subscriber, qos, NULL);
} else {
subscriber = participant_or_subscriber;
@ -381,19 +384,19 @@ dds_create_reader(
}
if ((subscriber != participant_or_subscriber) &&
(dds_entity_kind(topic) != DDS_KIND_INTERNAL)) {
(dds_entity_kind_from_handle(topic) != DDS_KIND_INTERNAL)) {
/* Delete implicit subscriber if reader creation fails */
sub->m_flags |= DDS_ENTITY_IMPLICIT;
}
rc = dds_entity_lock(t, DDS_KIND_TOPIC, &tp);
rc = dds_topic_lock(t, &tp);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking topic\n");
reader = DDS_ERRNO(rc);
goto err_tp_lock;
}
assert (((dds_topic*)tp)->m_stopic);
assert (sub->m_domain == tp->m_domain);
assert (tp->m_stopic);
assert (sub->m_domain == tp->m_entity.m_domain);
/* Merge qos from topic and subscriber */
rqos = dds_create_qos ();
@ -407,8 +410,8 @@ dds_create_reader(
dds_merge_qos (rqos, sub->m_qos);
}
if (tp->m_qos) {
dds_merge_qos (rqos, tp->m_qos);
if (tp->m_entity.m_qos) {
dds_merge_qos (rqos, tp->m_entity.m_qos);
/* reset the following qos policies if set during topic qos merge as they aren't applicable for reader */
rqos->present &= ~(QP_DURABILITY_SERVICE | QP_TRANSPORT_PRIORITY | QP_LIFESPAN);
@ -423,7 +426,7 @@ dds_create_reader(
}
/* Additional checks required for built-in topics */
if (dds_entity_kind(topic) == DDS_KIND_INTERNAL && !dds__validate_builtin_reader_qos(topic, qos)) {
if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL && !dds__validate_builtin_reader_qos(topic, qos)) {
dds_delete_qos(rqos);
DDS_ERROR("Invalid QoS specified for built-in topic reader");
reader = DDS_ERRNO(DDS_RETCODE_INCONSISTENT_POLICY);
@ -434,9 +437,9 @@ dds_create_reader(
rd = dds_alloc (sizeof (*rd));
reader = dds_entity_init (&rd->m_entity, sub, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK);
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
rd->m_topic = (dds_topic*)tp;
rhc = dds_rhc_new (rd, ((dds_topic*)tp)->m_stopic);
dds_entity_add_ref_nolock (tp);
rd->m_topic = tp;
rhc = dds_rhc_new (rd, tp->m_stopic);
dds_entity_add_ref_nolock (&tp->m_entity);
rd->m_entity.m_deriver.close = dds_reader_close;
rd->m_entity.m_deriver.delete = dds_reader_delete;
rd->m_entity.m_deriver.set_qos = dds_reader_qos_set;
@ -449,16 +452,16 @@ dds_create_reader(
assert(0);
}
os_mutexUnlock(&tp->m_mutex);
os_mutexUnlock(&tp->m_entity.m_mutex);
os_mutexUnlock(&sub->m_mutex);
if (asleep) {
thread_state_awake (thr);
}
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic,
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_participant->m_guid, tp->m_stopic,
rqos, rhc, dds_reader_status_cb, rd);
os_mutexLock(&sub->m_mutex);
os_mutexLock(&tp->m_mutex);
os_mutexLock(&tp->m_entity.m_mutex);
assert (rd->m_rd);
if (asleep) {
thread_state_asleep (thr);
@ -468,10 +471,10 @@ dds_create_reader(
if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS)) {
(dds_global.m_dur_reader) (rd, rhc);
}
dds_entity_unlock(tp);
dds_topic_unlock(tp);
dds_entity_unlock(sub);
if (dds_entity_kind(topic) == DDS_KIND_INTERNAL) {
if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL) {
/* If topic is builtin, then the topic entity is local and should
* be deleted because the application won't. */
dds_delete(t);
@ -480,14 +483,14 @@ dds_create_reader(
return reader;
err_bad_qos:
dds_entity_unlock(tp);
dds_topic_unlock(tp);
err_tp_lock:
dds_entity_unlock(sub);
if((sub->m_flags & DDS_ENTITY_IMPLICIT) != 0){
(void)dds_delete(subscriber);
}
err_sub_lock:
if (dds_entity_kind(topic) == DDS_KIND_INTERNAL) {
if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL) {
/* If topic is builtin, then the topic entity is local and should
* be deleted because the application won't. */
dds_delete(t);
@ -495,11 +498,7 @@ err_sub_lock:
return reader;
}
void
dds_reader_ddsi2direct(
dds_entity_t entity,
ddsi2direct_directread_cb_t cb,
void *cbarg)
void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb, void *cbarg)
{
dds_reader *dds_rd;
@ -540,74 +539,59 @@ dds_reader_ddsi2direct(
os_mutexLock (&rd->e.lock);
}
os_mutexUnlock (&rd->e.lock);
ut_handle_release(entity, ((dds_entity*)rd)->m_hdllink);
ut_handle_release(entity, dds_rd->m_entity.m_hdllink);
}
}
uint32_t
dds_reader_lock_samples(
dds_entity_t reader)
uint32_t dds_reader_lock_samples (dds_entity_t reader)
{
uint32_t ret = 0;
dds_reader *rd;
if (dds_reader_lock(reader, &rd) == DDS_RETCODE_OK) {
ret = dds_rhc_lock_samples(rd->m_rd->rhc);
dds_reader_unlock(rd);
} else {
ret = 0;
}
return ret;
uint32_t n;
if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK)
return 0;
n = dds_rhc_lock_samples (rd->m_rd->rhc);
dds_reader_unlock (rd);
return n;
}
_Pre_satisfies_((reader & DDS_ENTITY_KIND_MASK) == DDS_KIND_READER)
int
dds_reader_wait_for_historical_data(
dds_entity_t reader,
dds_duration_t max_wait)
int dds_reader_wait_for_historical_data (dds_entity_t reader, dds_duration_t max_wait)
{
dds_reader *rd;
int ret;
dds_reader *rd;
assert (reader);
ret = dds_reader_lock(reader, &rd);
if (ret == DDS_RETCODE_OK) {
if (((dds_entity*)rd)->m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS) {
if ((ret = dds_reader_lock (reader, &rd)) != DDS_RETCODE_OK)
return DDS_ERRNO (ret);
switch (rd->m_entity.m_qos->durability.kind)
{
case DDS_DURABILITY_VOLATILE:
ret = DDS_RETCODE_OK;
break;
case DDS_DURABILITY_TRANSIENT_LOCAL:
break;
case DDS_DURABILITY_TRANSIENT:
case DDS_DURABILITY_PERSISTENT:
ret = (dds_global.m_dur_wait) (rd, max_wait);
} else {
DDS_ERROR("Can not wait for historical data on a reader with volatile durability\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
break;
}
dds_reader_unlock(rd);
} else {
DDS_ERROR("Error occurred on locking reader\n");
ret = DDS_ERRNO(ret);
}
return ret;
}
_Pre_satisfies_(((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_READER ) || \
((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \
((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) )
dds_entity_t
dds_get_subscriber(
_In_ dds_entity_t entity)
dds_entity_t dds_get_subscriber (dds_entity_t entity)
{
dds_entity_t hdl;
if (dds_entity_kind(entity) == DDS_KIND_READER) {
hdl = dds_get_parent(entity);
} else if (dds_entity_kind(entity) == DDS_KIND_COND_READ || dds_entity_kind(entity) == DDS_KIND_COND_QUERY) {
hdl = dds_get_parent(entity);
if(hdl > 0){
hdl = dds_get_subscriber(hdl);
} else {
DDS_ERROR("Reader of this condition is already deleted\n");
if (dds_entity_kind_from_handle (entity) == DDS_KIND_READER)
hdl = dds_get_parent (entity);
else if (dds_entity_kind_from_handle (entity) == DDS_KIND_COND_READ || dds_entity_kind_from_handle (entity) == DDS_KIND_COND_QUERY)
{
hdl = dds_get_parent (entity);
if (hdl > 0)
hdl = dds_get_subscriber (hdl);
DDS_ERROR ("Reader of this condition is already deleted\n");
}
} else {
DDS_ERROR("Provided entity is not a reader nor a condition\n");
hdl = DDS_ERRNO(dds_valid_hdl(entity, DDS_KIND_READER));
else
{
DDS_ERROR ("Provided entity is not a reader nor a condition\n");
hdl = DDS_ERRNO (dds_valid_hdl (entity, DDS_KIND_READER));
}
return hdl;
@ -633,10 +617,10 @@ dds_get_subscription_matched_status (
if (status) {
*status = rd->m_subscription_matched_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_SUBSCRIPTION_MATCHED_STATUS) {
if (rd->m_entity.m_status_enable & DDS_SUBSCRIPTION_MATCHED_STATUS) {
rd->m_subscription_matched_status.total_count_change = 0;
rd->m_subscription_matched_status.current_count_change = 0;
dds_entity_status_reset(rd, DDS_SUBSCRIPTION_MATCHED_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_SUBSCRIPTION_MATCHED_STATUS);
}
dds_reader_unlock(rd);
fail:
@ -663,10 +647,10 @@ dds_get_liveliness_changed_status (
if (status) {
*status = rd->m_liveliness_changed_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_LIVELINESS_CHANGED_STATUS) {
if (rd->m_entity.m_status_enable & DDS_LIVELINESS_CHANGED_STATUS) {
rd->m_liveliness_changed_status.alive_count_change = 0;
rd->m_liveliness_changed_status.not_alive_count_change = 0;
dds_entity_status_reset(rd, DDS_LIVELINESS_CHANGED_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_LIVELINESS_CHANGED_STATUS);
}
dds_reader_unlock(rd);
fail:
@ -692,10 +676,10 @@ dds_return_t dds_get_sample_rejected_status (
if (status) {
*status = rd->m_sample_rejected_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_SAMPLE_REJECTED_STATUS) {
if (rd->m_entity.m_status_enable & DDS_SAMPLE_REJECTED_STATUS) {
rd->m_sample_rejected_status.total_count_change = 0;
rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED;
dds_entity_status_reset(rd, DDS_SAMPLE_REJECTED_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_REJECTED_STATUS);
}
dds_reader_unlock(rd);
fail:
@ -721,9 +705,9 @@ dds_return_t dds_get_sample_lost_status (
if (status) {
*status = rd->m_sample_lost_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_SAMPLE_LOST_STATUS) {
if (rd->m_entity.m_status_enable & DDS_SAMPLE_LOST_STATUS) {
rd->m_sample_lost_status.total_count_change = 0;
dds_entity_status_reset(rd, DDS_SAMPLE_LOST_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_LOST_STATUS);
}
dds_reader_unlock(rd);
fail:
@ -749,9 +733,9 @@ dds_return_t dds_get_requested_deadline_missed_status (
if (status) {
*status = rd->m_requested_deadline_missed_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_REQUESTED_DEADLINE_MISSED_STATUS) {
if (rd->m_entity.m_status_enable & DDS_REQUESTED_DEADLINE_MISSED_STATUS) {
rd->m_requested_deadline_missed_status.total_count_change = 0;
dds_entity_status_reset(rd, DDS_REQUESTED_DEADLINE_MISSED_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_DEADLINE_MISSED_STATUS);
}
dds_reader_unlock(rd);
fail:
@ -777,9 +761,9 @@ dds_return_t dds_get_requested_incompatible_qos_status (
if (status) {
*status = rd->m_requested_incompatible_qos_status;
}
if (((dds_entity*)rd)->m_status_enable & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) {
if (rd->m_entity.m_status_enable & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) {
rd->m_requested_incompatible_qos_status.total_count_change = 0;
dds_entity_status_reset(rd, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS);
dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS);
}
dds_reader_unlock(rd);
fail:

View file

@ -310,13 +310,40 @@ struct trigger_info
bool has_changed;
};
#define QMASK_OF_SAMPLE(s) ((s)->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE)
#define QMASK_OF_INVSAMPLE(i) ((i)->inv_isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE)
#define INST_NSAMPLES(i) ((i)->nvsamples + (i)->inv_exists)
#define INST_NREAD(i) ((i)->nvread + (unsigned)((i)->inv_exists & (i)->inv_isread))
#define INST_IS_EMPTY(i) (INST_NSAMPLES (i) == 0)
#define INST_HAS_READ(i) (INST_NREAD (i) > 0)
#define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i))
static unsigned qmask_of_sample (const struct rhc_sample *s)
{
return s->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE;
}
static unsigned qmask_of_invsample (const struct rhc_instance *i)
{
return i->inv_isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE;
}
static uint32_t inst_nsamples (const struct rhc_instance *i)
{
return i->nvsamples + i->inv_exists;
}
static uint32_t inst_nread (const struct rhc_instance *i)
{
return i->nvread + (uint32_t) (i->inv_exists & i->inv_isread);
}
static bool inst_is_empty (const struct rhc_instance *i)
{
return inst_nsamples (i) == 0;
}
static bool inst_has_read (const struct rhc_instance *i)
{
return inst_nread (i) > 0;
}
static bool inst_has_unread (const struct rhc_instance *i)
{
return inst_nread (i) < inst_nsamples (i);
}
static unsigned qmask_of_inst (const struct rhc_instance *inst);
static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct ddsi_serdata *sample);
@ -363,7 +390,7 @@ static void add_inst_to_nonempty_list (_Inout_ struct rhc *rhc, _Inout_ struct r
static void remove_inst_from_nonempty_list (struct rhc *rhc, struct rhc_instance *inst)
{
assert (INST_IS_EMPTY (inst));
assert (inst_is_empty (inst));
#ifndef NDEBUG
{
const struct rhc_instance *x = rhc->nonempty_instances;
@ -488,7 +515,7 @@ static void free_instance (void *vnode, void *varg)
struct rhc *rhc = varg;
struct rhc_instance *inst = vnode;
struct rhc_sample *s = inst->latest;
const bool was_empty = INST_IS_EMPTY (inst);
const bool was_empty = inst_is_empty (inst);
if (s)
{
do {
@ -559,8 +586,8 @@ static void init_trigger_info_nonmatch (struct trigger_info *info)
static void get_trigger_info (struct trigger_info *info, struct rhc_instance *inst, bool pre)
{
info->qminst = qmask_of_inst (inst);
info->has_read = INST_HAS_READ (inst);
info->has_not_read = INST_HAS_UNREAD (inst);
info->has_read = inst_has_read (inst);
info->has_not_read = inst_has_unread (inst);
/* reset instance has_changed before adding/overwriting a sample */
if (pre)
{
@ -749,7 +776,7 @@ static void update_inst
static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_instance *inst)
{
int ret;
assert (INST_IS_EMPTY (inst));
assert (inst_is_empty (inst));
rhc->n_instances--;
@ -799,7 +826,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
inst->no_writers_gen++;
DDS_TRACE("new1");
if (!INST_IS_EMPTY (inst) && !inst->isdisposed)
if (!inst_is_empty (inst) && !inst->isdisposed)
rhc->n_not_alive_no_writers--;
}
else if (inst_wr_iid == 0 && inst->wrcount == 1)
@ -876,7 +903,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64
static void account_for_empty_to_nonempty_transition (struct rhc *rhc, struct rhc_instance *inst)
{
assert (INST_NSAMPLES (inst) == 1);
assert (inst_nsamples (inst) == 1);
add_inst_to_nonempty_list (rhc, inst);
rhc->n_new += inst->isnew;
if (inst->isdisposed)
@ -953,7 +980,7 @@ static int rhc_unregister_updateinst
}
else
{
if (!INST_IS_EMPTY (inst))
if (!inst_is_empty (inst))
{
/* Instance still has content - do not drop until application
takes the last sample. Set the invalid sample if the latest
@ -985,7 +1012,7 @@ static int rhc_unregister_updateinst
{
/* Add invalid samples for transition to no-writers */
DDS_TRACE(",#0,empty,nowriters");
assert (INST_IS_EMPTY (inst));
assert (inst_is_empty (inst));
inst_set_invsample (rhc, inst);
update_inst (rhc, inst, pwr_info, false, tstamp);
account_for_empty_to_nonempty_transition (rhc, inst);
@ -1228,7 +1255,7 @@ bool dds_rhc_store
const int not_alive = inst->wrcount == 0 || inst->isdisposed;
const bool old_isdisposed = inst->isdisposed;
const bool old_isnew = inst->isnew;
const bool was_empty = INST_IS_EMPTY (inst);
const bool was_empty = inst_is_empty (inst);
int inst_became_disposed = 0;
/* Not just an unregister, so a write and/or a dispose (possibly
@ -1314,7 +1341,7 @@ bool dds_rhc_store
}
else
{
assert (INST_IS_EMPTY (inst) == was_empty);
assert (inst_is_empty (inst) == was_empty);
}
}
@ -1369,7 +1396,7 @@ bool dds_rhc_store
if (rhc->reader && trigger_waitsets)
{
dds_entity_status_signal((dds_entity*)(rhc->reader));
dds_entity_status_signal(&rhc->reader->m_entity);
}
return delivered;
@ -1447,7 +1474,7 @@ void dds_rhc_unregister_wr
}
else
{
const bool was_empty = INST_IS_EMPTY (inst);
const bool was_empty = inst_is_empty (inst);
inst_set_invsample (rhc, inst);
if (was_empty)
account_for_empty_to_nonempty_transition (rhc, inst);
@ -1476,7 +1503,7 @@ void dds_rhc_unregister_wr
if (trigger_waitsets)
{
dds_entity_status_signal((dds_entity*)(rhc->reader));
dds_entity_status_signal(&rhc->reader->m_entity);
}
}
@ -1663,11 +1690,11 @@ static int dds_rhc_read_w_qminv
{
if (handle == DDS_HANDLE_NIL || inst->iid == handle)
{
if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0)
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
{
/* samples present & instance, view state matches */
struct trigger_info pre, post;
const unsigned nread = INST_NREAD (inst);
const unsigned nread = inst_nread (inst);
const uint32_t n_first = n;
get_trigger_info (&pre, inst, true);
@ -1676,13 +1703,13 @@ static int dds_rhc_read_w_qminv
struct rhc_sample *sample = inst->latest->next, * const end1 = sample;
do
{
if ((QMASK_OF_SAMPLE (sample) & qminv) == 0)
if ((qmask_of_sample (sample) & qminv) == 0)
{
/* sample state matches too */
set_sample_info (info_seq + n, inst, sample);
ddsi_serdata_to_sample (sample->sample, values[n], 0, 0);
if (cond == NULL
|| (dds_entity_kind(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| (cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n])))
{
if (!sample->isread)
@ -1709,7 +1736,7 @@ static int dds_rhc_read_w_qminv
while (sample != end1);
}
if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0)
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0)
{
set_sample_info_invsample (info_seq + n, inst);
ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
@ -1726,7 +1753,7 @@ static int dds_rhc_read_w_qminv
inst->isnew = 0;
rhc->n_new--;
}
if (nread != INST_NREAD (inst))
if (nread != inst_nread (inst))
{
get_trigger_info (&post, inst, false);
if (update_conditions_locked (rhc, &pre, &post, NULL))
@ -1754,7 +1781,7 @@ static int dds_rhc_read_w_qminv
if (trigger_waitsets)
{
dds_entity_status_signal((dds_entity*)(rhc->reader));
dds_entity_status_signal(&rhc->reader->m_entity);
}
assert (n <= INT_MAX);
@ -1792,7 +1819,7 @@ static int dds_rhc_take_w_qminv
iid = inst->iid;
if (handle == DDS_HANDLE_NIL || iid == handle)
{
if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0)
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
{
struct trigger_info pre, post;
unsigned nvsamples = inst->nvsamples;
@ -1807,7 +1834,7 @@ static int dds_rhc_take_w_qminv
{
struct rhc_sample * const sample1 = sample->next;
if ((QMASK_OF_SAMPLE (sample) & qminv) != 0)
if ((qmask_of_sample (sample) & qminv) != 0)
{
psample = sample;
}
@ -1816,7 +1843,7 @@ static int dds_rhc_take_w_qminv
set_sample_info (info_seq + n, inst, sample);
ddsi_serdata_to_sample (sample->sample, values[n], 0, 0);
if (cond == NULL
|| (dds_entity_kind(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| ( cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n])))
{
rhc->n_vsamples--;
@ -1855,7 +1882,7 @@ static int dds_rhc_take_w_qminv
}
}
if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0)
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0)
{
set_sample_info_invsample (info_seq + n, inst);
ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0);
@ -1880,7 +1907,7 @@ static int dds_rhc_take_w_qminv
}
}
if (INST_IS_EMPTY (inst))
if (inst_is_empty (inst))
{
remove_inst_from_nonempty_list (rhc, inst);
@ -1918,7 +1945,7 @@ static int dds_rhc_take_w_qminv
if (trigger_waitsets)
{
dds_entity_status_signal((dds_entity*)(rhc->reader));
dds_entity_status_signal(&rhc->reader->m_entity);
}
assert (n <= INT_MAX);
@ -1957,7 +1984,7 @@ static int dds_rhc_takecdr_w_qminv
iid = inst->iid;
if (handle == DDS_HANDLE_NIL || iid == handle)
{
if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0)
if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0)
{
struct trigger_info pre, post;
unsigned nvsamples = inst->nvsamples;
@ -1972,7 +1999,7 @@ static int dds_rhc_takecdr_w_qminv
{
struct rhc_sample * const sample1 = sample->next;
if ((QMASK_OF_SAMPLE (sample) & qminv) != 0)
if ((qmask_of_sample (sample) & qminv) != 0)
{
psample = sample;
}
@ -2006,7 +2033,7 @@ static int dds_rhc_takecdr_w_qminv
}
}
if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0)
if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0)
{
set_sample_info_invsample (info_seq + n, inst);
values[n] = ddsi_serdata_ref(inst->tk->m_sample);
@ -2031,7 +2058,7 @@ static int dds_rhc_takecdr_w_qminv
}
}
if (INST_IS_EMPTY (inst))
if (inst_is_empty (inst))
{
remove_inst_from_nonempty_list (rhc, inst);
@ -2069,7 +2096,7 @@ static int dds_rhc_takecdr_w_qminv
if (trigger_waitsets)
{
dds_entity_status_signal((dds_entity*)(rhc->reader));
dds_entity_status_signal(&rhc->reader->m_entity);
}
assert (n <= INT_MAX);
@ -2086,15 +2113,15 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd
switch (c->m_sample_states)
{
case DDS_SST_READ:
m = m && INST_HAS_READ (inst);
m = m && inst_has_read (inst);
break;
case DDS_SST_NOT_READ:
m = m && INST_HAS_UNREAD (inst);
m = m && inst_has_unread (inst);
break;
case DDS_SST_READ | DDS_SST_NOT_READ:
case 0:
/* note: we get here only if inst not empty, so this is a no-op */
m = m && !INST_IS_EMPTY (inst);
m = m && !inst_is_empty (inst);
break;
default:
DDS_FATAL("update_readconditions: sample_states invalid: %x\n", c->m_sample_states);
@ -2118,11 +2145,11 @@ void dds_rhc_add_readcondition (dds_readcond * cond)
os_mutexLock (&rhc->lock);
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
{
if (dds_entity_kind(cond->m_entity.m_hdl) == DDS_KIND_COND_READ)
if (dds_entity_kind_from_handle(cond->m_entity.m_hdl) == DDS_KIND_COND_READ)
{
((dds_entity*)cond)->m_trigger += rhc_get_cond_trigger (inst, cond);
if (((dds_entity*)cond)->m_trigger) {
dds_entity_status_signal((dds_entity*)cond);
cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond);
if (cond->m_entity.m_trigger) {
dds_entity_status_signal(&cond->m_entity);
}
}
}
@ -2225,7 +2252,7 @@ static bool update_conditions_locked
}
else if (m_pre < m_post)
{
if (sample && tmp == NULL && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
if (sample && tmp == NULL && (dds_entity_kind_from_handle(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
{
tmp = ddsi_sertopic_alloc_sample (rhc->topic);
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
@ -2233,7 +2260,7 @@ static bool update_conditions_locked
if
(
(sample == NULL)
|| (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| (dds_entity_kind_from_handle(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|| (iter->m_query.m_filter != NULL && iter->m_query.m_filter (tmp))
)
{
@ -2335,7 +2362,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds)
for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter))
{
n_instances++;
if (!INST_IS_EMPTY (inst))
if (!inst_is_empty (inst))
{
/* samples present (or an invalid sample is) */
unsigned n_vsamples_in_instance = 0, n_read_vsamples_in_instance = 0;
@ -2387,7 +2414,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds)
dds_readcond * rciter = rhc->conds;
for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++)
{
if (dds_entity_kind(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ)
if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ)
{
cond_match_count[i] += rhc_get_cond_trigger (inst, rciter);
}
@ -2412,7 +2439,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds)
dds_readcond * rciter = rhc->conds;
for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++)
{
if (dds_entity_kind(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ)
if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ)
{
assert (cond_match_count[i] == rciter->m_entity.m_trigger);
}
@ -2433,7 +2460,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds)
inst = rhc->nonempty_instances;
n_nonempty_instances = 0;
do {
assert (!INST_IS_EMPTY (inst));
assert (!inst_is_empty (inst));
assert (prev->next == inst);
assert (inst->prev == prev);
prev = inst;

View file

@ -40,10 +40,8 @@ static const char * stream_op_type[11] =
const uint32_t dds_op_size[5] = { 0, 1u, 2u, 4u, 8u };
static void dds_stream_write
(dds_stream_t * os, const char * data, const uint32_t * ops);
static void dds_stream_read
(dds_stream_t * is, char * data, const uint32_t * ops);
static void dds_stream_write (dds_stream_t * os, const char * data, const uint32_t * ops);
static void dds_stream_read (dds_stream_t * is, char * data, const uint32_t * ops);
#define DDS_SWAP16(v) \
((uint16_t)(((v) >> 8) | ((v) << 8)))
@ -258,6 +256,12 @@ uint64_t dds_stream_read_uint64 (dds_stream_t * is)
return val;
}
extern inline char dds_stream_read_char (dds_stream_t *is);
extern inline int8_t dds_stream_read_int8 (dds_stream_t *is);
extern inline int16_t dds_stream_read_int16 (dds_stream_t *is);
extern inline int32_t dds_stream_read_int32 (dds_stream_t *is);
extern inline int64_t dds_stream_read_int64 (dds_stream_t *is);
float dds_stream_read_float (dds_stream_t * is)
{
float val = 0.0;
@ -417,6 +421,12 @@ void dds_stream_write_uint64 (dds_stream_t * os, uint64_t val)
DDS_OS_PUT8 (os, val, uint64_t);
}
extern inline void dds_stream_write_char (dds_stream_t * os, char val);
extern inline void dds_stream_write_int8 (dds_stream_t * os, int8_t val);
extern inline void dds_stream_write_int16 (dds_stream_t * os, int16_t val);
extern inline void dds_stream_write_int32 (dds_stream_t * os, int32_t val);
extern inline void dds_stream_write_int64 (dds_stream_t * os, int64_t val);
void dds_stream_write_float (dds_stream_t * os, float val)
{
union { float f; uint32_t u; } u;

View file

@ -101,16 +101,14 @@ dds_subscriber_status_validate(
Set boolean on readers that indicates state of DATA_ON_READERS
status on parent subscriber
*/
static dds_return_t
dds_subscriber_status_propagate(
dds_entity *sub,
uint32_t mask,
bool set)
static dds_return_t dds_subscriber_status_propagate (dds_entity *sub, uint32_t mask, bool set)
{
if (mask & DDS_DATA_ON_READERS_STATUS) {
if (mask & DDS_DATA_ON_READERS_STATUS)
{
dds_entity *iter = sub->m_children;
while (iter) {
os_mutexLock (&iter->m_mutex);
assert (dds_entity_kind (iter) == DDS_KIND_READER);
((dds_reader*) iter)->m_data_on_readers = set;
os_mutexUnlock (&iter->m_mutex);
iter = iter->m_next;

View file

@ -26,6 +26,8 @@
#include "os/os_atomics.h"
#include "ddsi/ddsi_iid.h"
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_topic)
#define DDS_TOPIC_STATUS_MASK \
DDS_INCONSISTENT_TOPIC_STATUS
@ -97,7 +99,7 @@ dds_topic_status_cb(
dds_topic *topic;
dds__retcode_t rc;
if (dds_topic_lock(((dds_entity*)cb_t)->m_hdl, &topic) != DDS_RETCODE_OK) {
if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) != DDS_RETCODE_OK) {
return;
}
assert(topic == cb_t);
@ -116,24 +118,24 @@ dds_topic_status_cb(
dds_topic_unlock(topic);
/* Is anybody interested within the entity hierarchy through listeners? */
rc = dds_entity_listener_propagation((dds_entity*)topic,
(dds_entity*)topic,
rc = dds_entity_listener_propagation(&topic->m_entity,
&topic->m_entity,
DDS_INCONSISTENT_TOPIC_STATUS,
(void*)&(topic->m_inconsistent_topic_status),
&topic->m_inconsistent_topic_status,
true);
if (rc == DDS_RETCODE_OK) {
/* Event was eaten by a listener. */
if (dds_topic_lock(((dds_entity*)cb_t)->m_hdl, &topic) == DDS_RETCODE_OK) {
if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) == DDS_RETCODE_OK) {
/* Reset the change counts of the metrics. */
topic->m_inconsistent_topic_status.total_count_change = 0;
dds_topic_unlock(topic);
}
} else if (rc == DDS_RETCODE_NO_DATA) {
/* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */
dds_entity_status_set((dds_entity*)topic, DDS_INCONSISTENT_TOPIC_STATUS);
dds_entity_status_set(&topic->m_entity, DDS_INCONSISTENT_TOPIC_STATUS);
/* Notify possible interested observers. */
dds_entity_status_signal((dds_entity*)topic);
dds_entity_status_signal(&topic->m_entity);
} else if (rc == DDS_RETCODE_ALREADY_DELETED) {
/* An entity up the hierarchy is being deleted; consider it successful. */
} else {
@ -184,7 +186,7 @@ dds_topic_free(
assert (st);
os_mutexLock (&dds_global.m_mutex);
domain = (dds_domain*) ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &domainid);
domain = ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &domainid);
if (domain != NULL) {
ut_avlDelete (&dds_topictree_def, &domain->m_topics, st);
}
@ -715,9 +717,9 @@ dds_get_inconsistent_topic_status(
if (status) {
*status = t->m_inconsistent_topic_status;
}
if (((dds_entity*)t)->m_status_enable & DDS_INCONSISTENT_TOPIC_STATUS) {
if (t->m_entity.m_status_enable & DDS_INCONSISTENT_TOPIC_STATUS) {
t->m_inconsistent_topic_status.total_count_change = 0;
dds_entity_status_reset(t, DDS_INCONSISTENT_TOPIC_STATUS);
dds_entity_status_reset(&t->m_entity, DDS_INCONSISTENT_TOPIC_STATUS);
}
dds_topic_unlock(t);
fail:

View file

@ -17,10 +17,7 @@
#include "dds__rhc.h"
#include "dds__err.h"
#define dds_waitset_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_WAITSET, (dds_entity**)obj)
#define dds_waitset_unlock(obj) dds_entity_unlock((dds_entity*)obj);
DEFINE_ENTITY_LOCK_UNLOCK(static, dds_waitset, DDS_KIND_WAITSET)
static void
dds_waitset_swap(
@ -41,18 +38,16 @@ dds_waitset_swap(
*dst = idx;
}
static void
dds_waitset_signal_entity(
_In_ dds_waitset *ws)
static void dds_waitset_signal_entity (dds_waitset *ws)
{
dds_entity *e = (dds_entity*)ws;
dds_entity *e = &ws->m_entity;
/* When signaling any observers of us through the entity,
* we need to be unlocked. We still have claimed the related
* handle, so possible deletions will be delayed until we
* release it. */
os_mutexUnlock(&(e->m_mutex));
dds_entity_status_signal(e);
os_mutexLock(&(e->m_mutex));
os_mutexUnlock (&e->m_mutex);
dds_entity_status_signal (e);
os_mutexLock (&e->m_mutex);
}
static dds_return_t
@ -214,8 +209,8 @@ dds_waitset_close(
{
dds_waitset *ws = (dds_waitset*)e;
dds_waitset_close_list(&(ws->observed), e->m_hdl);
dds_waitset_close_list(&(ws->triggered), e->m_hdl);
dds_waitset_close_list(&ws->observed, e->m_hdl);
dds_waitset_close_list(&ws->triggered, e->m_hdl);
/* Trigger waitset to wake up. */
os_condBroadcast(&e->m_cond);
@ -368,7 +363,7 @@ dds_waitset_attach(
e = NULL;
}
} else {
e = (dds_entity*)ws;
e = &ws->m_entity;
}
/* This will fail if given entity is already attached (or deleted). */
@ -421,7 +416,7 @@ dds_waitset_detach(
if (rc == DDS_RETCODE_OK) {
/* Possibly fails when entity was not attached. */
if (waitset == entity) {
rc = dds_entity_observer_unregister_nl((dds_entity*)ws, waitset);
rc = dds_entity_observer_unregister_nl(&ws->m_entity, waitset);
} else {
rc = dds_entity_observer_unregister(entity, waitset);
}
@ -497,9 +492,9 @@ dds_waitset_set_trigger(
goto fail;
}
if (trigger) {
dds_entity_status_set(ws, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_set(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
} else {
dds_entity_status_reset(ws, DDS_WAITSET_TRIGGER_STATUS);
dds_entity_status_reset(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS);
}
dds_waitset_signal_entity(ws);
dds_waitset_unlock(ws);

View file

@ -20,10 +20,13 @@
#include "dds__qos.h"
#include "dds__err.h"
#include "dds__init.h"
#include "dds__topic.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__whc.h"
#include "ddsc/ddsc_project.h"
DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_writer)
#define DDS_WRITER_STATUS_MASK \
DDS_LIVELINESS_LOST_STATUS |\
DDS_OFFERED_DEADLINE_MISSED_STATUS |\
@ -64,119 +67,131 @@ dds_writer_status_validate(
static void
dds_writer_status_cb(
void *entity,
void *ventity,
const status_cb_data_t *data)
{
struct dds_entity * const entity = ventity;
dds_writer *wr;
dds__retcode_t rc;
void *metrics = NULL;
/* When data is NULL, it means that the writer is deleted. */
if (data == NULL) {
if (data == NULL)
{
/* Release the initial claim that was done during the create. This
* will indicate that further API deletion is now possible. */
ut_handle_release(((dds_entity*)entity)->m_hdl, ((dds_entity*)entity)->m_hdllink);
ut_handle_release (entity->m_hdl, entity->m_hdllink);
return;
}
if (dds_writer_lock(((dds_entity*)entity)->m_hdl, &wr) != DDS_RETCODE_OK) {
if (dds_writer_lock (entity->m_hdl, &wr) != DDS_RETCODE_OK) {
/* There's a deletion or closing going on. */
return;
}
assert(wr == entity);
assert (&wr->m_entity == entity);
/* Reset the status for possible Listener call.
* When a listener is not called, the status will be set (again). */
dds_entity_status_reset(entity, data->status);
dds_entity_status_reset (entity, data->status);
/* Update status metrics. */
switch (data->status) {
switch (data->status)
{
case DDS_OFFERED_DEADLINE_MISSED_STATUS: {
wr->m_offered_deadline_missed_status.total_count++;
wr->m_offered_deadline_missed_status.total_count_change++;
wr->m_offered_deadline_missed_status.last_instance_handle = data->handle;
metrics = (void*)&(wr->m_offered_deadline_missed_status);
struct dds_offered_deadline_missed_status * const st = &wr->m_offered_deadline_missed_status;
st->total_count++;
st->total_count_change++;
st->last_instance_handle = data->handle;
metrics = st;
break;
}
case DDS_LIVELINESS_LOST_STATUS: {
wr->m_liveliness_lost_status.total_count++;
wr->m_liveliness_lost_status.total_count_change++;
metrics = (void*)&(wr->m_liveliness_lost_status);
struct dds_liveliness_lost_status * const st = &wr->m_liveliness_lost_status;
st->total_count++;
st->total_count_change++;
metrics = st;
break;
}
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: {
wr->m_offered_incompatible_qos_status.total_count++;
wr->m_offered_incompatible_qos_status.total_count_change++;
wr->m_offered_incompatible_qos_status.last_policy_id = data->extra;
metrics = (void*)&(wr->m_offered_incompatible_qos_status);
struct dds_offered_incompatible_qos_status * const st = &wr->m_offered_incompatible_qos_status;
st->total_count++;
st->total_count_change++;
st->last_policy_id = data->extra;
metrics = st;
break;
}
case DDS_PUBLICATION_MATCHED_STATUS: {
struct dds_publication_matched_status * const st = &wr->m_publication_matched_status;
if (data->add) {
wr->m_publication_matched_status.total_count++;
wr->m_publication_matched_status.total_count_change++;
wr->m_publication_matched_status.current_count++;
wr->m_publication_matched_status.current_count_change++;
st->total_count++;
st->total_count_change++;
st->current_count++;
st->current_count_change++;
} else {
wr->m_publication_matched_status.current_count--;
wr->m_publication_matched_status.current_count_change--;
st->current_count--;
st->current_count_change--;
}
wr->m_publication_matched_status.last_subscription_handle = data->handle;
metrics = (void*)&(wr->m_publication_matched_status);
st->last_subscription_handle = data->handle;
metrics = st;
break;
}
default: assert (0);
default:
assert (0);
}
/* The writer needs to be unlocked when propagating the (possible) listener
* call because the application should be able to call this writer within
* the callback function. */
dds_writer_unlock(wr);
dds_writer_unlock (wr);
/* Is anybody interested within the entity hierarchy through listeners? */
rc = dds_entity_listener_propagation(entity, entity, data->status, metrics, true);
rc = dds_entity_listener_propagation (entity, entity, data->status, metrics, true);
if (rc == DDS_RETCODE_OK) {
if (rc == DDS_RETCODE_OK)
{
/* Event was eaten by a listener. */
if (dds_writer_lock(((dds_entity*)entity)->m_hdl, &wr) == DDS_RETCODE_OK) {
assert(wr == entity);
if (dds_writer_lock (entity->m_hdl, &wr) == DDS_RETCODE_OK)
{
assert (&wr->m_entity == entity);
/* Reset the status. */
dds_entity_status_reset(entity, data->status);
dds_entity_status_reset (entity, data->status);
/* Reset the change counts of the metrics. */
switch (data->status) {
case DDS_OFFERED_DEADLINE_MISSED_STATUS: {
switch (data->status)
{
case DDS_OFFERED_DEADLINE_MISSED_STATUS:
wr->m_offered_deadline_missed_status.total_count_change = 0;
break;
}
case DDS_LIVELINESS_LOST_STATUS: {
case DDS_LIVELINESS_LOST_STATUS:
wr->m_liveliness_lost_status.total_count_change = 0;
break;
}
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: {
case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS:
wr->m_offered_incompatible_qos_status.total_count_change = 0;
break;
}
case DDS_PUBLICATION_MATCHED_STATUS: {
case DDS_PUBLICATION_MATCHED_STATUS:
wr->m_publication_matched_status.total_count_change = 0;
wr->m_publication_matched_status.current_count_change = 0;
break;
default:
assert (0);
}
default: assert (0);
dds_writer_unlock (wr);
}
dds_writer_unlock(wr);
} else {
/* There's a deletion or closing going on. */
}
} else if (rc == DDS_RETCODE_NO_DATA) {
else if (rc == DDS_RETCODE_NO_DATA)
{
/* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */
dds_entity_status_set(entity, data->status);
dds_entity_status_set (entity, data->status);
/* Notify possible interested observers. */
dds_entity_status_signal(entity);
} else if (rc == DDS_RETCODE_ALREADY_DELETED) {
dds_entity_status_signal (entity);
}
else if (rc == DDS_RETCODE_ALREADY_DELETED)
{
/* An entity up the hierarchy is being deleted; consider it successful. */
} else {
}
else
{
/* Something went wrong up the hierarchy. */
}
}
@ -412,7 +427,7 @@ dds_create_writer(
dds_writer * wr;
dds_entity_t writer;
dds_entity * pub = NULL;
dds_entity * tp;
dds_topic * tp;
dds_entity_t publisher;
struct thread_state1 * const thr = lookup_thread_state();
const bool asleep = !vtime_awake_p(thr->vtime);
@ -420,7 +435,7 @@ dds_create_writer(
dds_return_t ret;
/* Try claiming a participant. If that's not working, then it could be a subscriber. */
if(dds_entity_kind(participant_or_publisher) == DDS_KIND_PARTICIPANT){
if(dds_entity_kind_from_handle(participant_or_publisher) == DDS_KIND_PARTICIPANT){
publisher = dds_create_publisher(participant_or_publisher, qos, NULL);
} else{
publisher = participant_or_publisher;
@ -437,14 +452,14 @@ dds_create_writer(
pub->m_flags |= DDS_ENTITY_IMPLICIT;
}
rc = dds_entity_lock(topic, DDS_KIND_TOPIC, &tp);
rc = dds_topic_lock(topic, &tp);
if (rc != DDS_RETCODE_OK) {
DDS_ERROR("Error occurred on locking topic\n");
writer = DDS_ERRNO(rc);
goto err_tp_lock;
}
assert(((dds_topic*)tp)->m_stopic);
assert(pub->m_domain == tp->m_domain);
assert(tp->m_stopic);
assert(pub->m_domain == tp->m_entity.m_domain);
/* Merge Topic & Publisher qos */
wqos = dds_create_qos();
@ -458,9 +473,9 @@ dds_create_writer(
dds_merge_qos(wqos, pub->m_qos);
}
if (tp->m_qos) {
if (tp->m_entity.m_qos) {
/* merge topic qos data to writer qos */
dds_merge_qos(wqos, tp->m_qos);
dds_merge_qos(wqos, tp->m_entity.m_qos);
}
nn_xqos_mergein_missing(wqos, &gv.default_xqos_wr);
@ -475,8 +490,8 @@ dds_create_writer(
wr = dds_alloc(sizeof (*wr));
writer = dds_entity_init(&wr->m_entity, pub, DDS_KIND_WRITER, wqos, listener, DDS_WRITER_STATUS_MASK);
wr->m_topic = (dds_topic*)tp;
dds_entity_add_ref_nolock(tp);
wr->m_topic = tp;
dds_entity_add_ref_nolock(&tp->m_entity);
wr->m_xp = nn_xpack_new(conn, get_bandwidth_limit(wqos->transport_priority), config.xpack_send_async);
wr->m_entity.m_deriver.close = dds_writer_close;
wr->m_entity.m_deriver.delete = dds_writer_delete;
@ -491,25 +506,25 @@ dds_create_writer(
assert(0);
}
os_mutexUnlock(&tp->m_mutex);
os_mutexUnlock(&tp->m_entity.m_mutex);
os_mutexUnlock(&pub->m_mutex);
if (asleep) {
thread_state_awake(thr);
}
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
os_mutexLock(&pub->m_mutex);
os_mutexLock(&tp->m_mutex);
os_mutexLock(&tp->m_entity.m_mutex);
assert(wr->m_wr);
if (asleep) {
thread_state_asleep(thr);
}
dds_entity_unlock(tp);
dds_topic_unlock(tp);
dds_entity_unlock(pub);
return writer;
err_bad_qos:
dds_entity_unlock(tp);
dds_topic_unlock(tp);
err_tp_lock:
dds_entity_unlock(pub);
if((pub->m_flags & DDS_ENTITY_IMPLICIT) != 0){
@ -559,10 +574,10 @@ dds_get_publication_matched_status (
if (status) {
*status = wr->m_publication_matched_status;
}
if (((dds_entity*)wr)->m_status_enable & DDS_PUBLICATION_MATCHED_STATUS) {
if (wr->m_entity.m_status_enable & DDS_PUBLICATION_MATCHED_STATUS) {
wr->m_publication_matched_status.total_count_change = 0;
wr->m_publication_matched_status.current_count_change = 0;
dds_entity_status_reset(wr, DDS_PUBLICATION_MATCHED_STATUS);
dds_entity_status_reset(&wr->m_entity, DDS_PUBLICATION_MATCHED_STATUS);
}
dds_writer_unlock(wr);
fail:
@ -589,9 +604,9 @@ dds_get_liveliness_lost_status (
if (status) {
*status = wr->m_liveliness_lost_status;
}
if (((dds_entity*)wr)->m_status_enable & DDS_LIVELINESS_LOST_STATUS) {
if (wr->m_entity.m_status_enable & DDS_LIVELINESS_LOST_STATUS) {
wr->m_liveliness_lost_status.total_count_change = 0;
dds_entity_status_reset(wr, DDS_LIVELINESS_LOST_STATUS);
dds_entity_status_reset(&wr->m_entity, DDS_LIVELINESS_LOST_STATUS);
}
dds_writer_unlock(wr);
fail:
@ -618,9 +633,9 @@ dds_get_offered_deadline_missed_status(
if (status) {
*status = wr->m_offered_deadline_missed_status;
}
if (((dds_entity*)wr)->m_status_enable & DDS_OFFERED_DEADLINE_MISSED_STATUS) {
if (wr->m_entity.m_status_enable & DDS_OFFERED_DEADLINE_MISSED_STATUS) {
wr->m_offered_deadline_missed_status.total_count_change = 0;
dds_entity_status_reset(wr, DDS_OFFERED_DEADLINE_MISSED_STATUS);
dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_DEADLINE_MISSED_STATUS);
}
dds_writer_unlock(wr);
fail:
@ -647,9 +662,9 @@ dds_get_offered_incompatible_qos_status (
if (status) {
*status = wr->m_offered_incompatible_qos_status;
}
if (((dds_entity*)wr)->m_status_enable & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) {
if (wr->m_entity.m_status_enable & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) {
wr->m_offered_incompatible_qos_status.total_count_change = 0;
dds_entity_status_reset(wr, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS);
dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS);
}
dds_writer_unlock(wr);
fail:

View file

@ -79,7 +79,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_tkmap.h
probes-constants.h
q_addrset.h
q_align.h
q_bitset.h
q_bswap.h
q_config.h

View file

@ -178,27 +178,6 @@ struct ddsi_tran_qos
int m_diffserv;
};
/* Functions and pseudo functions (macro wrappers) */
void ddsi_factory_conn_init (ddsi_tran_factory_t, ddsi_tran_conn_t);
#define ddsi_tran_type(b) (((ddsi_tran_base_t) (b))->m_trantype)
#define ddsi_tran_port(b) (((ddsi_tran_base_t) (b))->m_port)
int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc);
void ddsi_tran_free (ddsi_tran_base_t base);
void ddsi_tran_free_qos (ddsi_tran_qos_t qos);
ddsi_tran_qos_t ddsi_tran_create_qos (void);
os_socket ddsi_tran_handle (ddsi_tran_base_t base);
#define ddsi_factory_create_listener(f,p,q) (((f)->m_create_listener_fn) ((p), (q)))
#define ddsi_factory_supports(f,k) (((f)->m_supports_fn) (k))
ddsi_tran_conn_t ddsi_factory_create_conn
(
ddsi_tran_factory_t factory,
uint32_t port,
ddsi_tran_qos_t qos
);
void ddsi_tran_factories_fini (void);
void ddsi_factory_add (ddsi_tran_factory_t factory);
void ddsi_factory_free (ddsi_tran_factory_t factory);
@ -206,24 +185,56 @@ ddsi_tran_factory_t ddsi_factory_find (const char * type);
ddsi_tran_factory_t ddsi_factory_find_supported_kind (int32_t kind);
void ddsi_factory_conn_init (ddsi_tran_factory_t factory, ddsi_tran_conn_t conn);
#define ddsi_conn_handle(c) (ddsi_tran_handle (&(c)->m_base))
#define ddsi_conn_locator(c,l) (ddsi_tran_locator (&(c)->m_base,(l)))
OSAPI_EXPORT ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags);
ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc);
inline bool ddsi_factory_supports (ddsi_tran_factory_t factory, int32_t kind) {
return factory->m_supports_fn (kind);
}
inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) {
return factory->m_create_conn_fn (port, qos);
}
inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, int port, ddsi_tran_qos_t qos) {
return factory->m_create_listener_fn (port, qos);
}
void ddsi_tran_free (ddsi_tran_base_t base);
void ddsi_tran_free_qos (ddsi_tran_qos_t qos);
ddsi_tran_qos_t ddsi_tran_create_qos (void);
inline os_socket ddsi_tran_handle (ddsi_tran_base_t base) {
return base->m_handle_fn (base);
}
inline int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc) {
return base->m_locator_fn (base, loc);
}
inline os_socket ddsi_conn_handle (ddsi_tran_conn_t conn) {
return conn->m_base.m_handle_fn (&conn->m_base);
}
inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn) {
return conn->m_base.m_trantype;
}
inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn) {
return conn->m_base.m_port;
}
inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) {
return conn->m_base.m_locator_fn (&conn->m_base, loc);
}
inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags) {
return conn->m_closed ? -1 : (conn->m_write_fn) (conn, dst, niov, iov, flags);
}
inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) {
return conn->m_closed ? -1 : conn->m_read_fn (conn, buf, len, srcloc);
}
bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn);
void ddsi_conn_add_ref (ddsi_tran_conn_t conn);
void ddsi_conn_free (ddsi_tran_conn_t conn);
int ddsi_conn_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcip, const nn_locator_t *mcip, const struct nn_interface *interf);
int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcip, const nn_locator_t *mcip, const struct nn_interface *interf);
void ddsi_conn_transfer_group_membership (ddsi_tran_conn_t conn, ddsi_tran_conn_t newconn);
int ddsi_conn_rejoin_transferred_mcgroups (ddsi_tran_conn_t conn);
int ddsi_is_mcaddr (const nn_locator_t *loc);
int ddsi_is_ssm_mcaddr (const nn_locator_t *loc);
enum ddsi_nearby_address_result ddsi_is_nearby_address (const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[]);
enum ddsi_locator_from_string_result ddsi_locator_from_string (nn_locator_t *loc, const char *str);
/* 8 for transport/
@ -242,9 +253,15 @@ char *ddsi_locator_to_string_no_port (char *dst, size_t sizeof_dst, const nn_loc
int ddsi_enumerate_interfaces (ddsi_tran_factory_t factory, os_ifaddrs_t **interfs);
#define ddsi_listener_locator(s,l) (ddsi_tran_locator (&(s)->m_base,(l)))
ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener);
int ddsi_listener_listen (ddsi_tran_listener_t listener);
inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc) {
return listener->m_base.m_locator_fn (&listener->m_base, loc);
}
inline int ddsi_listener_listen (ddsi_tran_listener_t listener) {
return listener->m_listen_fn (listener);
}
inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener) {
return listener->m_accept_fn (listener);
}
void ddsi_listener_unblock (ddsi_tran_listener_t listener);
void ddsi_listener_free (ddsi_tran_listener_t listener);

View file

@ -1,18 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef NN_ALIGN_H
#define NN_ALIGN_H
#define ALIGN4(x) (((x) + 3) & -4u)
#define ALIGN8(x) (((x) + 7) & -8u)
#endif /* NN_ALIGN_H */

View file

@ -17,13 +17,14 @@
#include "ddsi/q_rtps.h" /* for nn_guid_t, nn_guid_prefix_t */
#include "ddsi/q_protocol.h" /* for nn_sequence_number_t */
#define bswap2(x) ((int16_t) bswap2u ((uint16_t) (x)))
#define bswap4(x) ((int32_t) bswap4u ((uint32_t) (x)))
#define bswap8(x) ((int64_t) bswap8u ((uint64_t) (x)))
inline uint16_t bswap2u (uint16_t x)
{
return (unsigned short) ((x >> 8) | (x << 8));
return (uint16_t) ((x >> 8) | (x << 8));
}
inline int16_t bswap2 (int16_t x)
{
return (int16_t) bswap2u ((uint16_t) x);
}
inline uint32_t bswap4u (uint32_t x)
@ -31,6 +32,11 @@ inline uint32_t bswap4u (uint32_t x)
return (x >> 24) | ((x >> 8) & 0xff00) | ((x << 8) & 0xff0000) | (x << 24);
}
inline int32_t bswap4 (int32_t x)
{
return (int32_t) bswap4u ((uint32_t) x);
}
inline uint64_t bswap8u (uint64_t x)
{
const uint32_t newhi = bswap4u ((uint32_t) x);
@ -38,6 +44,11 @@ inline uint64_t bswap8u (uint64_t x)
return ((uint64_t) newhi << 32) | (uint64_t) newlo;
}
inline int64_t bswap8 (int64_t x)
{
return (int64_t) bswap8u ((uint64_t) x);
}
inline void bswapSN (nn_sequence_number_t *sn)
{
sn->high = bswap4 (sn->high);

View file

@ -51,7 +51,7 @@ typedef struct status_cb_data
}
status_cb_data_t;
typedef void (*status_cb_t) (void * entity, const status_cb_data_t * data);
typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data);
struct prd_wr_match {
ut_avlNode_t avlnode;

View file

@ -312,6 +312,22 @@ static int ddsi_raweth_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locat
return (loc->address[10] & 1);
}
static int ddsi_raweth_is_ssm_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc)
{
(void) tran;
(void) loc;
return 0;
}
static enum ddsi_nearby_address_result ddsi_raweth_is_nearby_address (ddsi_tran_factory_t tran, const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[])
{
(void) tran;
(void) loc;
(void) ninterf;
(void) interf;
return DNAR_LOCAL;
}
static enum ddsi_locator_from_string_result ddsi_raweth_address_from_string (ddsi_tran_factory_t tran, nn_locator_t *loc, const char *str)
{
int i = 0;
@ -371,7 +387,8 @@ int ddsi_raweth_init (void)
ddsi_raweth_factory_g.m_join_mc_fn = ddsi_raweth_join_mc;
ddsi_raweth_factory_g.m_leave_mc_fn = ddsi_raweth_leave_mc;
ddsi_raweth_factory_g.m_is_mcaddr_fn = ddsi_raweth_is_mcaddr;
ddsi_raweth_factory_g.m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address;
ddsi_raweth_factory_g.m_is_ssm_mcaddr_fn = ddsi_raweth_is_ssm_mcaddr;
ddsi_raweth_factory_g.m_is_nearby_address_fn = ddsi_raweth_is_nearby_address;
ddsi_raweth_factory_g.m_locator_from_string_fn = ddsi_raweth_address_from_string;
ddsi_raweth_factory_g.m_locator_to_string_fn = ddsi_raweth_to_string;
ddsi_raweth_factory_g.m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces;

View file

@ -1042,6 +1042,25 @@ static enum ddsi_locator_from_string_result ddsi_tcp_address_from_string (ddsi_t
return ddsi_ipaddr_from_string(tran, loc, str, ddsi_tcp_factory_g.m_kind);
}
static int ddsi_tcp_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc)
{
(void) tran;
(void) loc;
return 0;
}
static int ddsi_tcp_is_ssm_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc)
{
(void) tran;
(void) loc;
return 0;
}
static enum ddsi_nearby_address_result ddsi_tcp_is_nearby_address (ddsi_tran_factory_t tran, const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[])
{
return ddsi_ipaddr_is_nearby_address(tran, loc, ninterf, interf);
}
int ddsi_tcp_init (void)
{
static bool init = false;
@ -1063,6 +1082,9 @@ int ddsi_tcp_init (void)
ddsi_tcp_factory_g.m_locator_from_string_fn = ddsi_tcp_address_from_string;
ddsi_tcp_factory_g.m_locator_to_string_fn = ddsi_ipaddr_to_string;
ddsi_tcp_factory_g.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces;
ddsi_tcp_factory_g.m_is_mcaddr_fn = ddsi_tcp_is_mcaddr;
ddsi_tcp_factory_g.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr;
ddsi_tcp_factory_g.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address;
ddsi_factory_add (&ddsi_tcp_factory_g);
#if OS_SOCKET_HAS_IPV6

View file

@ -19,6 +19,21 @@
static ddsi_tran_factory_t ddsi_tran_factories = NULL;
extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn);
extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn);
extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, int port, ddsi_tran_qos_t qos);
extern inline bool ddsi_factory_supports (ddsi_tran_factory_t factory, int32_t kind);
extern inline os_socket ddsi_conn_handle (ddsi_tran_conn_t conn);
extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc);
extern inline os_socket ddsi_tran_handle (ddsi_tran_base_t base);
extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos);
extern inline int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc);
extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc);
extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener);
extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener);
extern inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc);
extern inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags);
void ddsi_factory_add (ddsi_tran_factory_t factory)
{
factory->m_factory = ddsi_tran_factories;
@ -144,32 +159,6 @@ void ddsi_factory_conn_init (ddsi_tran_factory_t factory, ddsi_tran_conn_t conn)
conn->m_factory = factory;
}
ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc)
{
return (conn->m_closed) ? -1 : (conn->m_read_fn) (conn, buf, len, srcloc);
}
ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags)
{
ssize_t ret = -1;
if (! conn->m_closed)
{
ret = (conn->m_write_fn) (conn, dst, niov, iov, flags);
}
/* Check that write function is atomic (all or nothing) */
#ifndef NDEBUG
{
size_t i, len;
for (i = 0, len = 0; i < niov; i++) {
len += iov[i].iov_len;
}
assert (ret == -1 || (size_t) ret == len);
}
#endif
return ret;
}
void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn)
{
if (conn->m_disable_multiplexing_fn) {
@ -202,11 +191,6 @@ int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const
return conn->m_factory->m_leave_mc_fn (conn, srcloc, mcloc, interf);
}
os_socket ddsi_tran_handle (ddsi_tran_base_t base)
{
return (base->m_handle_fn) (base);
}
ddsi_tran_qos_t ddsi_tran_create_qos (void)
{
ddsi_tran_qos_t qos;
@ -215,31 +199,6 @@ ddsi_tran_qos_t ddsi_tran_create_qos (void)
return qos;
}
ddsi_tran_conn_t ddsi_factory_create_conn
(
ddsi_tran_factory_t factory,
uint32_t port,
ddsi_tran_qos_t qos
)
{
return factory->m_create_conn_fn (port, qos);
}
int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc)
{
return (base->m_locator_fn) (base, loc);
}
int ddsi_listener_listen (ddsi_tran_listener_t listener)
{
return (listener->m_listen_fn) (listener);
}
ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener)
{
return (listener->m_accept_fn) (listener);
}
void ddsi_tran_free (ddsi_tran_base_t base)
{
if (base)
@ -274,21 +233,20 @@ void ddsi_listener_free (ddsi_tran_listener_t listener)
int ddsi_is_mcaddr (const nn_locator_t *loc)
{
/* FIXME: should set m_is_mcaddr_fn to a function returning false if transport doesn't provide an implementation, and get rid of the test */
ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind);
return tran && tran->m_is_mcaddr_fn ? tran->m_is_mcaddr_fn (tran, loc) : 0;
ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind (loc->kind);
return tran ? tran->m_is_mcaddr_fn (tran, loc) : 0;
}
int ddsi_is_ssm_mcaddr (const nn_locator_t *loc)
{
ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind);
return tran && tran->m_is_ssm_mcaddr_fn ? tran->m_is_ssm_mcaddr_fn (tran, loc) : 0;
return tran ? tran->m_is_ssm_mcaddr_fn (tran, loc) : 0;
}
enum ddsi_nearby_address_result ddsi_is_nearby_address (const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[])
{
ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind);
return tran->m_is_nearby_address_fn ? tran->m_is_nearby_address_fn (tran, loc, ninterf, interf) : DNAR_DISTANT;
return tran ? tran->m_is_nearby_address_fn (tran, loc, ninterf, interf) : DNAR_DISTANT;
}
enum ddsi_locator_from_string_result ddsi_locator_from_string (nn_locator_t *loc, const char *str)

View file

@ -14,5 +14,8 @@
extern inline uint16_t bswap2u (uint16_t x);
extern inline uint32_t bswap4u (uint32_t x);
extern inline uint64_t bswap8u (uint64_t x);
extern inline int16_t bswap2 (int16_t x);
extern inline int32_t bswap4 (int32_t x);
extern inline int64_t bswap8 (int64_t x);
extern inline void bswapSN (nn_sequence_number_t *sn);

View file

@ -29,7 +29,6 @@
#include "ddsi/q_lat_estim.h"
#include "ddsi/q_bitset.h"
#include "ddsi/q_xevent.h"
#include "ddsi/q_align.h"
#include "ddsi/q_addrset.h"
#include "ddsi/q_ddsi_discovery.h"
#include "ddsi/q_radmin.h"
@ -680,7 +679,7 @@ int create_multicast_sockets(void)
gv.disc_conn_mc = disc;
gv.data_conn_mc = data;
DDS_TRACE("Multicast Ports: discovery %d data %d \n",
ddsi_tran_port (gv.disc_conn_mc), ddsi_tran_port (gv.data_conn_mc));
ddsi_conn_port (gv.disc_conn_mc), ddsi_conn_port (gv.data_conn_mc));
return 1;
err_data:
@ -1113,7 +1112,7 @@ int rtps_init (void)
if (gv.m_factory->m_connless)
{
if (!(config.many_sockets_mode == MSM_NO_UNICAST && config.allowMulticast))
DDS_TRACE("Unicast Ports: discovery %d data %d\n", ddsi_tran_port (gv.disc_conn_uc), ddsi_tran_port (gv.data_conn_uc));
DDS_TRACE("Unicast Ports: discovery %d data %d\n", ddsi_conn_port (gv.disc_conn_uc), ddsi_conn_port (gv.data_conn_uc));
if (config.allowMulticast)
{
@ -1128,11 +1127,11 @@ int rtps_init (void)
/* Set multicast locators */
if (!is_unspec_locator(&gv.loc_spdp_mc))
gv.loc_spdp_mc.port = ddsi_tran_port (gv.disc_conn_mc);
gv.loc_spdp_mc.port = ddsi_conn_port (gv.disc_conn_mc);
if (!is_unspec_locator(&gv.loc_meta_mc))
gv.loc_meta_mc.port = ddsi_tran_port (gv.disc_conn_mc);
gv.loc_meta_mc.port = ddsi_conn_port (gv.disc_conn_mc);
if (!is_unspec_locator(&gv.loc_default_mc))
gv.loc_default_mc.port = ddsi_tran_port (gv.data_conn_mc);
gv.loc_default_mc.port = ddsi_conn_port (gv.data_conn_mc);
if (joinleave_spdp_defmcip (1) < 0)
goto err_mc_conn;
@ -1167,7 +1166,7 @@ int rtps_init (void)
/* Create shared transmit connection */
gv.tev_conn = gv.data_conn_uc;
DDS_TRACE("Timed event transmit port: %d\n", (int) ddsi_tran_port (gv.tev_conn));
DDS_TRACE("Timed event transmit port: %d\n", (int) ddsi_conn_port (gv.tev_conn));
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
{

View file

@ -21,7 +21,6 @@
#include "ddsi/q_bswap.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_align.h"
#include "ddsi/q_error.h"
#include "ddsi/q_plist.h"
#include "ddsi/q_time.h"

View file

@ -32,7 +32,6 @@
#include "ddsi/q_config.h"
#include "ddsi/q_log.h"
#include "ddsi/q_align.h"
#include "ddsi/q_plist.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_radmin.h"

View file

@ -30,7 +30,6 @@
#include "ddsi/q_lat_estim.h"
#include "ddsi/q_bitset.h"
#include "ddsi/q_xevent.h"
#include "ddsi/q_align.h"
#include "ddsi/q_addrset.h"
#include "ddsi/q_ddsi_discovery.h"
#include "ddsi/q_radmin.h"

View file

@ -34,7 +34,6 @@
#include "ddsi/q_log.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_xmsg.h"
#include "ddsi/q_align.h"
#include "ddsi/q_config.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_globals.h"
@ -1339,7 +1338,18 @@ static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg)
#endif
{
if (!gv.mute)
{
nbytes = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags);
#ifndef NDEBUG
{
size_t i, len;
for (i = 0, len = 0; i < xp->niov; i++) {
len += xp->iov[i].iov_len;
}
assert (nbytes == -1 || (size_t) nbytes == len);
}
#endif
}
else
{
DDS_TRACE("(dropped)");