diff --git a/src/core/ddsc/include/ddsc/dds_public_stream.h b/src/core/ddsc/include/ddsc/dds_public_stream.h index ef125c8..fed0f7a 100644 --- a/src/core/ddsc/include/ddsc/dds_public_stream.h +++ b/src/core/ddsc/include/ddsc/dds_public_stream.h @@ -76,11 +76,11 @@ DDS_EXPORT double dds_stream_read_double (dds_stream_t * is); DDS_EXPORT char * dds_stream_read_string (dds_stream_t * is); DDS_EXPORT void dds_stream_read_buffer (dds_stream_t * is, uint8_t * buffer, uint32_t len); -#define dds_stream_read_char(s) ((char) dds_stream_read_uint8 (s)) -#define dds_stream_read_int8(s) ((int8_t) dds_stream_read_uint8 (s)) -#define dds_stream_read_int16(s) ((int16_t) dds_stream_read_uint16 (s)) -#define dds_stream_read_int32(s) ((int32_t) dds_stream_read_uint32 (s)) -#define dds_stream_read_int64(s) ((int64_t) dds_stream_read_uint64 (s)) +inline char dds_stream_read_char (dds_stream_t *is) { return (char) dds_stream_read_uint8 (is); } +inline int8_t dds_stream_read_int8 (dds_stream_t *is) { return (int8_t) dds_stream_read_uint8 (is); } +inline int16_t dds_stream_read_int16 (dds_stream_t *is) { return (int16_t) dds_stream_read_uint16 (is); } +inline int32_t dds_stream_read_int32 (dds_stream_t *is) { return (int32_t) dds_stream_read_uint32 (is); } +inline int64_t dds_stream_read_int64 (dds_stream_t *is) { return (int64_t) dds_stream_read_uint64 (is); } DDS_EXPORT void dds_stream_write_bool (dds_stream_t * os, bool val); DDS_EXPORT void dds_stream_write_uint8 (dds_stream_t * os, uint8_t val); @@ -94,11 +94,11 @@ DDS_EXPORT void dds_stream_write_buffer (dds_stream_t * os, uint32_t len, const DDS_EXPORT void *dds_stream_address (dds_stream_t * s); DDS_EXPORT void *dds_stream_alignto (dds_stream_t * s, uint32_t a); -#define dds_stream_write_char(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) -#define dds_stream_write_int8(s,v) (dds_stream_write_uint8 ((s), (uint8_t)(v))) -#define dds_stream_write_int16(s,v) (dds_stream_write_uint16 ((s), (uint16_t)(v))) -#define dds_stream_write_int32(s,v) (dds_stream_write_uint32 ((s), (uint32_t)(v))) -#define dds_stream_write_int64(s,v) (dds_stream_write_uint64 ((s), (uint64_t)(v))) +inline void dds_stream_write_char (dds_stream_t * os, char val) { dds_stream_write_uint8 (os, (uint8_t) val); } +inline void dds_stream_write_int8 (dds_stream_t * os, int8_t val) { dds_stream_write_uint8 (os, (uint8_t) val); } +inline void dds_stream_write_int16 (dds_stream_t * os, int16_t val) { dds_stream_write_uint16 (os, (uint16_t) val); } +inline void dds_stream_write_int32 (dds_stream_t * os, int32_t val) { dds_stream_write_uint32 (os, (uint32_t) val); } +inline void dds_stream_write_int64 (dds_stream_t * os, int64_t val) { dds_stream_write_uint64 (os, (uint64_t) val); } #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 872278f..62285c9 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -45,11 +45,48 @@ dds_entity_listener_propagation( _In_opt_ void *metrics, _In_ bool propagate); -#define dds_entity_is_enabled(e, k) (((dds_entity*)e)->m_flags & DDS_ENTITY_ENABLED) +#define DEFINE_ENTITY_LOCK_UNLOCK(qualifier_, type_, kind_) \ + qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x) \ + { \ + dds__retcode_t rc; \ + dds_entity *e; \ + if ((rc = dds_entity_lock (hdl, kind_, &e)) != DDS_RETCODE_OK) \ + return rc; \ + *x = (type_ *) e; \ + return DDS_RETCODE_OK; \ + } \ + \ + qualifier_ void type_##_unlock (type_ *x) \ + { \ + dds_entity_unlock (&x->m_entity); \ + } +#define DECL_ENTITY_LOCK_UNLOCK(qualifier_, type_) \ + qualifier_ dds__retcode_t type_##_lock (dds_entity_t hdl, type_ **x); \ + qualifier_ void type_##_unlock (type_ *x); -#define dds_entity_status_set(e, t) (((dds_entity*)e)->m_trigger |= (((dds_entity*)e)->m_status_enable & t)) -#define dds_entity_status_reset(e,t) (((dds_entity*)e)->m_trigger &= ~t) -#define dds_entity_status_match(e,t) (((dds_entity*)e)->m_trigger & t) +inline bool dds_entity_is_enabled (const dds_entity *e) { + return (e->m_flags & DDS_ENTITY_ENABLED) != 0; +} + +inline void dds_entity_status_set (dds_entity *e, uint32_t t) { + e->m_trigger |= e->m_status_enable & t; +} + +inline void dds_entity_status_reset (dds_entity *e, uint32_t t) { + e->m_trigger &= ~t; +} + +inline bool dds_entity_status_match (const dds_entity *e, uint32_t t) { + return (e->m_trigger & t) != 0; +} + +inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) { + return (dds_entity_kind_t) (e->m_hdl & DDS_ENTITY_KIND_MASK); +} + +inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl) { + return (hdl > 0) ? (dds_entity_kind_t) (hdl & DDS_ENTITY_KIND_MASK) : DDS_KIND_DONTCARE; +} /* The mutex needs to be unlocked when calling this because the entity can be called * within the signal callback from other contexts. That shouldn't deadlock. */ @@ -74,8 +111,6 @@ void dds_entity_unlock( _Inout_ dds_entity *e); -#define dds_entity_kind(hdl) ((hdl > 0) ? (hdl & DDS_ENTITY_KIND_MASK) : 0) - _Check_return_ dds__retcode_t dds_entity_observer_register_nl( _In_ dds_entity* observed, diff --git a/src/core/ddsc/src/dds__guardcond.h b/src/core/ddsc/src/dds__guardcond.h index 6497f70..5f1b030 100644 --- a/src/core/ddsc/src/dds__guardcond.h +++ b/src/core/ddsc/src/dds__guardcond.h @@ -18,4 +18,6 @@ _Must_inspect_result_ dds_guardcond* dds_create_guardcond( _In_ dds_participant *pp); +DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_guardcond, DDS_KIND_COND_GUARD) + #endif diff --git a/src/core/ddsc/src/dds__participant.h b/src/core/ddsc/src/dds__participant.h index 0299a7a..ef52d86 100644 --- a/src/core/ddsc/src/dds__participant.h +++ b/src/core/ddsc/src/dds__participant.h @@ -12,10 +12,13 @@ #ifndef _DDS_PPANT_H_ #define _DDS_PPANT_H_ +#include "dds__entity.h" + #if defined (__cplusplus) extern "C" { #endif +DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_participant, DDS_KIND_PARTICIPANT) #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__reader.h b/src/core/ddsc/src/dds__reader.h index 85ce0a0..48b1168 100644 --- a/src/core/ddsc/src/dds__reader.h +++ b/src/core/ddsc/src/dds__reader.h @@ -13,6 +13,7 @@ #define _DDS_READER_H_ #include "dds__types.h" +#include "dds__entity.h" #if defined (__cplusplus) extern "C" { @@ -20,7 +21,7 @@ extern "C" { struct status_cb_data; -void dds_reader_status_cb (void * entity, const struct status_cb_data * data); +void dds_reader_status_cb (void *entity, const struct status_cb_data * data); /* dds_reader_lock_samples: Returns number of samples in read cache and locks the @@ -40,8 +41,7 @@ struct nn_rsample_info; struct nn_rdata; DDS_EXPORT void dds_reader_ddsi2direct (dds_entity_t entity, void (*cb) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg), void *cbarg); -#define dds_reader_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_READER, (dds_entity**)obj) -#define dds_reader_unlock(obj) dds_entity_unlock((dds_entity*)obj); +DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_reader, DDS_KIND_READER) #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds__topic.h b/src/core/ddsc/src/dds__topic.h index 32bd8ca..cb81561 100644 --- a/src/core/ddsc/src/dds__topic.h +++ b/src/core/ddsc/src/dds__topic.h @@ -13,13 +13,13 @@ #define _DDS_TOPIC_H_ #include "dds__types.h" +#include "dds__entity.h" #if defined (__cplusplus) extern "C" { #endif -#define dds_topic_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_TOPIC, (dds_entity**)obj) -#define dds_topic_unlock(obj) dds_entity_unlock((dds_entity*)obj); +DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_topic, DDS_KIND_TOPIC) extern struct ddsi_sertopic * dds_topic_lookup (dds_domain * domain, const char * name); extern void dds_topic_free (dds_domainid_t domainid, struct ddsi_sertopic * st); diff --git a/src/core/ddsc/src/dds__writer.h b/src/core/ddsc/src/dds__writer.h index 25b21e8..c2c9336 100644 --- a/src/core/ddsc/src/dds__writer.h +++ b/src/core/ddsc/src/dds__writer.h @@ -18,8 +18,7 @@ extern "C" { #endif -#define dds_writer_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_WRITER, (dds_entity**)obj) -#define dds_writer_unlock(obj) dds_entity_unlock((dds_entity*)obj); +DEFINE_ENTITY_LOCK_UNLOCK(inline, dds_writer, DDS_KIND_WRITER) #if defined (__cplusplus) } diff --git a/src/core/ddsc/src/dds_builtin.c b/src/core/ddsc/src/dds_builtin.c index bff293b..d5ad2f1 100644 --- a/src/core/ddsc/src/dds_builtin.c +++ b/src/core/ddsc/src/dds_builtin.c @@ -148,19 +148,17 @@ dds_entity_t dds__get_builtin_subscriber (dds_entity_t e) dds_return_t ret; dds_entity_t pp; dds_participant *p; - dds_entity *part_entity; if ((pp = dds_get_participant (e)) <= 0) return pp; - if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0) + if ((ret = dds_participant_lock (pp, &p)) != DDS_RETCODE_OK) return ret; - p = (dds_participant *) part_entity; if (p->m_builtin_subscriber <= 0) { - p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity); + p->m_builtin_subscriber = dds__create_builtin_subscriber (&p->m_entity); } sub = p->m_builtin_subscriber; - dds_entity_unlock(part_entity); + dds_participant_unlock(p); return sub; } diff --git a/src/core/ddsc/src/dds_coherent.c b/src/core/ddsc/src/dds_coherent.c index bf6832d..9556776 100644 --- a/src/core/ddsc/src/dds_coherent.c +++ b/src/core/ddsc/src/dds_coherent.c @@ -27,7 +27,7 @@ dds_begin_coherent( { dds_return_t ret; - switch(dds_entity_kind(entity)) { + switch(dds_entity_kind_from_handle(entity)) { case DDS_KIND_READER: case DDS_KIND_WRITER: /* Invoking on a writer/reader behaves as if invoked on @@ -58,7 +58,7 @@ dds_end_coherent( { dds_return_t ret; - switch(dds_entity_kind(entity)) { + switch(dds_entity_kind_from_handle(entity)) { case DDS_KIND_READER: case DDS_KIND_WRITER: /* Invoking on a writer/reader behaves as if invoked on diff --git a/src/core/ddsc/src/dds_domain.c b/src/core/ddsc/src/dds_domain.c index 4c10db5..08613b8 100644 --- a/src/core/ddsc/src/dds_domain.c +++ b/src/core/ddsc/src/dds_domain.c @@ -27,7 +27,7 @@ const ut_avlTreedef_t dds_domaintree_def = UT_AVL_TREEDEF_INITIALIZER dds_domain * dds_domain_find_locked (dds_domainid_t id) { - return (dds_domain*) ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &id); + return ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &id); } dds_domain * dds_domain_create (dds_domainid_t id) diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index a18976d..7e679f7 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -25,6 +25,12 @@ #endif +extern inline bool dds_entity_is_enabled (const dds_entity *e); +extern inline void dds_entity_status_set (dds_entity *e, uint32_t t); +extern inline void dds_entity_status_reset (dds_entity *e, uint32_t t); +extern inline bool dds_entity_status_match (const dds_entity *e, uint32_t t); +extern inline dds_entity_kind_t dds_entity_kind (const dds_entity *e); +extern inline dds_entity_kind_t dds_entity_kind_from_handle (dds_entity_t hdl); static void dds_entity_observers_delete( @@ -83,7 +89,7 @@ dds_entity_listener_propagation( if (e) { rc = dds_entity_lock(e->m_hdl, DDS_KIND_DONTCARE, &dummy); if (rc == DDS_RETCODE_OK) { - dds_listener_t *l = (dds_listener_t *)(&e->m_listener); + dds_listener_t *l = &e->m_listener; assert(e == dummy); @@ -360,12 +366,12 @@ dds_delete_impl( * To circumvent the problem. We ignore topics in the first loop. */ child = e->m_children; - while ((child != NULL) && (dds_entity_kind(child->m_hdl) == DDS_KIND_TOPIC)) { + while ((child != NULL) && (dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC)) { child = child->m_next; } while ((child != NULL) && (ret == DDS_RETCODE_OK)) { next = child->m_next; - while ((next != NULL) && (dds_entity_kind(next->m_hdl) == DDS_KIND_TOPIC)) { + while ((next != NULL) && (dds_entity_kind_from_handle(next->m_hdl) == DDS_KIND_TOPIC)) { next = next->m_next; } /* This will probably delete the child entry from @@ -377,7 +383,7 @@ dds_delete_impl( child = e->m_children; while ((child != NULL) && (ret == DDS_RETCODE_OK)) { next = child->m_next; - assert(dds_entity_kind(child->m_hdl) == DDS_KIND_TOPIC); + assert(dds_entity_kind_from_handle(child->m_hdl) == DDS_KIND_TOPIC); /* This will probably delete the child entry from * the current childrens list */ ret = dds_delete(child->m_hdl); @@ -1282,7 +1288,7 @@ dds_get_topic( if (rc == DDS_RETCODE_OK) { hdl = wr->m_topic->m_entity.m_hdl; dds_writer_unlock(wr); - } else if (dds_entity_kind(entity) == DDS_KIND_COND_READ || dds_entity_kind(entity) == DDS_KIND_COND_QUERY) { + } else if (dds_entity_kind_from_handle(entity) == DDS_KIND_COND_READ || dds_entity_kind_from_handle(entity) == DDS_KIND_COND_QUERY) { hdl = dds_get_topic(dds_get_parent(entity)); rc = DDS_RETCODE_OK; } diff --git a/src/core/ddsc/src/dds_guardcond.c b/src/core/ddsc/src/dds_guardcond.c index 3262aa6..a69f650 100644 --- a/src/core/ddsc/src/dds_guardcond.c +++ b/src/core/ddsc/src/dds_guardcond.c @@ -13,18 +13,20 @@ #include #include "dds__reader.h" #include "dds__guardcond.h" -#include "dds__entity.h" +#include "dds__participant.h" #include "dds__err.h" #include "ddsi/q_ephash.h" #include "ddsi/q_entity.h" #include "ddsi/q_thread.h" +DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_guardcond) + _Must_inspect_result_ dds_guardcond* dds_create_guardcond( _In_ dds_participant *pp) { dds_guardcond * gcond = dds_alloc(sizeof(*gcond)); - gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, (dds_entity*)pp, DDS_KIND_COND_GUARD, NULL, NULL, 0); + gcond->m_entity.m_hdl = dds_entity_init(&gcond->m_entity, &pp->m_entity, DDS_KIND_COND_GUARD, NULL, NULL, 0); return gcond; } @@ -34,15 +36,15 @@ dds_create_guardcondition( _In_ dds_entity_t participant) { dds_entity_t hdl; - dds_entity * pp; + dds_participant * pp; dds__retcode_t rc; - rc = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, &pp); + rc = dds_participant_lock(participant, &pp); if (rc == DDS_RETCODE_OK) { - dds_guardcond *cond = dds_create_guardcond((dds_participant *)pp); + dds_guardcond *cond = dds_create_guardcond(pp); assert(cond); hdl = cond->m_entity.m_hdl; - dds_entity_unlock(pp); + dds_participant_unlock(pp); } else { DDS_ERROR("Error occurred on locking reader\n"); hdl = DDS_ERRNO(rc); @@ -64,10 +66,10 @@ dds_set_guardcondition( rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond); if (rc == DDS_RETCODE_OK) { if (triggered) { - dds_entity_status_set(gcond, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_set(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); dds_entity_status_signal(&gcond->m_entity); } else { - dds_entity_status_reset(gcond, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); } dds_entity_unlock(&gcond->m_entity); ret = DDS_RETCODE_OK; @@ -91,10 +93,10 @@ dds_read_guardcondition( if (triggered != NULL) { *triggered = false; - rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond); + rc = dds_guardcond_lock(condition, &gcond); if (rc == DDS_RETCODE_OK) { - *triggered = dds_entity_status_match(gcond, DDS_WAITSET_TRIGGER_STATUS); - dds_entity_unlock((dds_entity*)gcond); + *triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + dds_guardcond_unlock(gcond); ret = DDS_RETCODE_OK; } else { DDS_ERROR("Argument condition is not valid\n"); @@ -120,11 +122,11 @@ dds_take_guardcondition( if (triggered != NULL) { *triggered = false; - rc = dds_entity_lock(condition, DDS_KIND_COND_GUARD, (dds_entity**)&gcond); + rc = dds_guardcond_lock(condition, &gcond); if (rc == DDS_RETCODE_OK) { - *triggered = dds_entity_status_match(gcond, DDS_WAITSET_TRIGGER_STATUS); - dds_entity_status_reset(gcond, DDS_WAITSET_TRIGGER_STATUS); - dds_entity_unlock((dds_entity*)gcond); + *triggered = dds_entity_status_match(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_reset(&gcond->m_entity, DDS_WAITSET_TRIGGER_STATUS); + dds_guardcond_unlock (gcond); ret = DDS_RETCODE_OK; } else { DDS_ERROR("Argument condition is not valid\n"); diff --git a/src/core/ddsc/src/dds_instance.c b/src/core/ddsc/src/dds_instance.c index ae2e05d..7912217 100644 --- a/src/core/ddsc/src/dds_instance.c +++ b/src/core/ddsc/src/dds_instance.c @@ -81,21 +81,22 @@ dds_instance_remove( } } -static const dds_topic* -dds_instance_info( - _In_ dds_entity *e) +static const dds_topic *dds_instance_info (dds_entity *e) { - const dds_topic *topic = NULL; - - assert (e); - assert ((dds_entity_kind(e->m_hdl) == DDS_KIND_READER) || (dds_entity_kind(e->m_hdl) == DDS_KIND_WRITER)); - - if (dds_entity_kind(e->m_hdl) == DDS_KIND_READER) { - topic = ((dds_reader*)e)->m_topic; - } else { - topic = ((dds_writer*)e)->m_topic; - } - return topic; + const dds_topic *topic; + switch (dds_entity_kind (e)) + { + case DDS_KIND_READER: + topic = ((dds_reader*) e)->m_topic; + break; + case DDS_KIND_WRITER: + topic = ((dds_writer*) e)->m_topic; + break; + default: + assert (0); + topic = NULL; + } + return topic; } static const dds_topic * dds_instance_info_by_hdl (dds_entity_t e) @@ -127,7 +128,7 @@ dds_register_instance( struct thread_state1 * const thr = lookup_thread_state(); const bool asleep = !vtime_awake_p(thr->vtime); struct ddsi_tkmap_instance * inst; - dds_entity *wr; + dds_writer *wr; dds_return_t ret; dds__retcode_t rc; @@ -141,7 +142,7 @@ dds_register_instance( ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto err; } - rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr); + rc = dds_writer_lock(writer, &wr); if (rc != DDS_RETCODE_OK) { DDS_ERROR("Error occurred on locking writer\n"); ret = DDS_ERRNO(rc); @@ -150,7 +151,7 @@ dds_register_instance( if (asleep) { thread_state_awake(thr); } - inst = dds_instance_find (((dds_writer*) wr)->m_topic, data, true); + inst = dds_instance_find (wr->m_topic, data, true); if(inst != NULL){ *handle = inst->m_iid; ret = DDS_RETCODE_OK; @@ -161,7 +162,7 @@ dds_register_instance( if (asleep) { thread_state_asleep(thr); } - dds_entity_unlock(wr); + dds_writer_unlock(wr); err: return ret; } @@ -197,8 +198,7 @@ dds_unregister_instance_ts( dds__retcode_t rc; bool autodispose = true; dds_write_action action = DDS_WR_ACTION_UNREGISTER; - void * sample = (void*) data; - dds_entity *wr; + dds_writer *wr; if (data == NULL){ DDS_ERROR("Argument data is NULL\n"); @@ -210,28 +210,28 @@ dds_unregister_instance_ts( ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto err; } - rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr); + rc = dds_writer_lock(writer, &wr); if (rc != DDS_RETCODE_OK) { DDS_ERROR("Error occurred on locking writer\n"); ret = DDS_ERRNO(rc); goto err; } - if (wr->m_qos) { - dds_qget_writer_data_lifecycle (wr->m_qos, &autodispose); + if (wr->m_entity.m_qos) { + dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose); } if (asleep) { thread_state_awake(thr); } if (autodispose) { - dds_instance_remove (((dds_writer*) wr)->m_topic, data, DDS_HANDLE_NIL); + dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL); action |= DDS_WR_DISPOSE_BIT; } - ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action); + ret = dds_write_impl (wr, data, timestamp, action); if (asleep) { thread_state_asleep(thr); } - dds_entity_unlock(wr); + dds_writer_unlock(wr); err: return ret; } @@ -249,21 +249,21 @@ dds_unregister_instance_ih_ts( dds__retcode_t rc; bool autodispose = true; dds_write_action action = DDS_WR_ACTION_UNREGISTER; - dds_entity *wr; + dds_writer *wr; struct ddsi_tkmap_instance *tk; - rc = dds_entity_lock(writer, DDS_KIND_WRITER, &wr); + rc = dds_writer_lock(writer, &wr); if (rc != DDS_RETCODE_OK) { DDS_ERROR("Error occurred on locking writer\n"); ret = DDS_ERRNO(rc); goto err; } - if (wr->m_qos) { - dds_qget_writer_data_lifecycle (wr->m_qos, &autodispose); + if (wr->m_entity.m_qos) { + dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose); } if (autodispose) { - dds_instance_remove (((dds_writer*) wr)->m_topic, NULL, handle); + dds_instance_remove (wr->m_topic, NULL, handle); action |= DDS_WR_DISPOSE_BIT; } @@ -272,11 +272,11 @@ dds_unregister_instance_ih_ts( } tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle); if (tk) { - struct ddsi_sertopic *tp = ((dds_writer*) wr)->m_topic->m_stopic; + struct ddsi_sertopic *tp = wr->m_topic->m_stopic; void *sample = ddsi_sertopic_alloc_sample (tp); ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL); ddsi_tkmap_instance_unref (tk); - ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action); + ret = dds_write_impl (wr, sample, timestamp, action); ddsi_sertopic_free_sample (tp, sample, DDS_FREE_ALL); } else { DDS_ERROR("No instance related with the provided handle is found\n"); @@ -285,7 +285,7 @@ dds_unregister_instance_ih_ts( if (asleep) { thread_state_asleep(thr); } - dds_entity_unlock(wr); + dds_writer_unlock(wr); err: return ret; } @@ -392,7 +392,7 @@ dds_dispose_ih_ts( thread_state_awake(thr); } if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) != NULL) { - struct ddsi_sertopic *tp = ((dds_writer*) wr)->m_topic->m_stopic; + struct ddsi_sertopic *tp = wr->m_topic->m_stopic; void *sample = ddsi_sertopic_alloc_sample (tp); ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL); ddsi_tkmap_instance_unref (tk); diff --git a/src/core/ddsc/src/dds_listener.c b/src/core/ddsc/src/dds_listener.c index dcc26a6..f4e5e51 100644 --- a/src/core/ddsc/src/dds_listener.c +++ b/src/core/ddsc/src/dds_listener.c @@ -180,7 +180,7 @@ void dds_lset_data_available (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_data_available_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_data_available = callback; + listener->on_data_available = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -190,7 +190,7 @@ void dds_lset_data_on_readers (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_data_on_readers_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_data_on_readers = callback; + listener->on_data_on_readers = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -200,7 +200,7 @@ void dds_lset_inconsistent_topic (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_inconsistent_topic_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_inconsistent_topic = callback; + listener->on_inconsistent_topic = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -210,7 +210,7 @@ void dds_lset_liveliness_changed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_liveliness_changed_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_liveliness_changed = callback; + listener->on_liveliness_changed = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -220,7 +220,7 @@ void dds_lset_liveliness_lost (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_liveliness_lost_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_liveliness_lost = callback; + listener->on_liveliness_lost = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -230,7 +230,7 @@ void dds_lset_offered_deadline_missed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_offered_deadline_missed_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_offered_deadline_missed = callback; + listener->on_offered_deadline_missed = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -240,7 +240,7 @@ void dds_lset_offered_incompatible_qos (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_offered_incompatible_qos_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_offered_incompatible_qos = callback; + listener->on_offered_incompatible_qos = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -250,7 +250,7 @@ void dds_lset_publication_matched (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_publication_matched_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_publication_matched = callback; + listener->on_publication_matched = callback; } else { DDS_ERROR("Argument listener is NULL"); } @@ -260,7 +260,7 @@ void dds_lset_requested_deadline_missed (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_requested_deadline_missed_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_requested_deadline_missed = callback; + listener->on_requested_deadline_missed = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -270,7 +270,7 @@ void dds_lset_requested_incompatible_qos (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_requested_incompatible_qos_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_requested_incompatible_qos = callback; + listener->on_requested_incompatible_qos = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -280,7 +280,7 @@ void dds_lset_sample_lost (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_sample_lost_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_sample_lost = callback; + listener->on_sample_lost = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -290,7 +290,7 @@ void dds_lset_sample_rejected (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_sample_rejected_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_sample_rejected = callback; + listener->on_sample_rejected = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -300,7 +300,7 @@ void dds_lset_subscription_matched (_Inout_ dds_listener_t * __restrict listener, _In_opt_ dds_on_subscription_matched_fn callback) { if (listener) { - ((c_listener_t*)listener)->on_subscription_matched = callback; + listener->on_subscription_matched = callback; } else { DDS_ERROR("Argument listener is NULL\n"); } @@ -321,7 +321,7 @@ dds_lget_data_available (_In_ const dds_listener_t * __restrict listener, _Outpt DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_data_available; + *callback = listener->on_data_available; } void @@ -335,7 +335,7 @@ dds_lget_data_on_readers (_In_ const dds_listener_t * __restrict listener, _Outp DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_data_on_readers; + *callback = listener->on_data_on_readers; } void dds_lget_inconsistent_topic (_In_ const dds_listener_t * __restrict listener, _Outptr_result_maybenull_ dds_on_inconsistent_topic_fn *callback) @@ -348,7 +348,7 @@ void dds_lget_inconsistent_topic (_In_ const dds_listener_t * __restrict listene DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_inconsistent_topic; + *callback = listener->on_inconsistent_topic; } void @@ -362,7 +362,7 @@ dds_lget_liveliness_changed (_In_ const dds_listener_t * __restrict listener, _O DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_liveliness_changed; + *callback = listener->on_liveliness_changed; } void @@ -376,7 +376,7 @@ dds_lget_liveliness_lost (_In_ const dds_listener_t * __restrict listener, _Outp DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_liveliness_lost; + *callback = listener->on_liveliness_lost; } void @@ -390,7 +390,7 @@ dds_lget_offered_deadline_missed (_In_ const dds_listener_t * __restrict listene DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_offered_deadline_missed; + *callback = listener->on_offered_deadline_missed; } void @@ -404,7 +404,7 @@ dds_lget_offered_incompatible_qos (_In_ const dds_listener_t * __restrict listen DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_offered_incompatible_qos; + *callback = listener->on_offered_incompatible_qos; } void @@ -418,7 +418,7 @@ dds_lget_publication_matched (_In_ const dds_listener_t * __restrict listener, _ DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_publication_matched; + *callback = listener->on_publication_matched; } void @@ -432,7 +432,7 @@ dds_lget_requested_deadline_missed (_In_ const dds_listener_t * __restrict liste DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_requested_deadline_missed; + *callback = listener->on_requested_deadline_missed; } void @@ -446,7 +446,7 @@ dds_lget_requested_incompatible_qos (_In_ const dds_listener_t * __restrict list DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_requested_incompatible_qos; + *callback = listener->on_requested_incompatible_qos; } void @@ -460,7 +460,7 @@ dds_lget_sample_lost (_In_ const dds_listener_t *__restrict listener, _Outptr_re DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_sample_lost; + *callback = listener->on_sample_lost; } void @@ -474,7 +474,7 @@ dds_lget_sample_rejected (_In_ const dds_listener_t *__restrict listener, _Outp DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_sample_rejected; + *callback = listener->on_sample_rejected; } void @@ -488,5 +488,5 @@ dds_lget_subscription_matched (_In_ const dds_listener_t * __restrict listener, DDS_ERROR("Argument listener is NULL\n"); return ; } - *callback = ((c_listener_t*)listener)->on_subscription_matched; + *callback = listener->on_subscription_matched; } diff --git a/src/core/ddsc/src/dds_participant.c b/src/core/ddsc/src/dds_participant.c index 5daf5a5..5a56ea5 100644 --- a/src/core/ddsc/src/dds_participant.c +++ b/src/core/ddsc/src/dds_participant.c @@ -20,6 +20,8 @@ #include "dds__err.h" #include "dds__builtin.h" +DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_participant) + #define DDS_PARTICIPANT_STATUS_MASK 0u /* List of created participants */ @@ -51,7 +53,7 @@ dds_participant_delete( assert(e); assert(thr); - assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT); + assert(dds_entity_kind_from_handle(e->m_hdl) == DDS_KIND_PARTICIPANT); if (asleep) { thread_state_awake(thr); diff --git a/src/core/ddsc/src/dds_publisher.c b/src/core/ddsc/src/dds_publisher.c index 807f5b7..33d10f9 100644 --- a/src/core/ddsc/src/dds_publisher.c +++ b/src/core/ddsc/src/dds_publisher.c @@ -149,7 +149,7 @@ dds_suspend( { dds_return_t ret; - if(dds_entity_kind(publisher) != DDS_KIND_PUBLISHER) { + if(dds_entity_kind_from_handle(publisher) != DDS_KIND_PUBLISHER) { DDS_ERROR("Provided entity is not a publisher kind\n"); ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto err; @@ -169,7 +169,7 @@ dds_resume( { dds_return_t ret = DDS_RETCODE_OK; - if(dds_entity_kind(publisher) != DDS_KIND_PUBLISHER) { + if(dds_entity_kind_from_handle(publisher) != DDS_KIND_PUBLISHER) { DDS_ERROR("Provided entity is not a publisher kind\n"); ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); goto err; @@ -194,7 +194,7 @@ dds_wait_for_acks( /* TODO: CHAM-125 Currently unsupported. */ OS_UNUSED_ARG(timeout); - switch(dds_entity_kind(publisher_or_writer)) { + switch(dds_entity_kind_from_handle(publisher_or_writer)) { case DDS_KIND_WRITER: DDS_ERROR("Wait for acknowledgments on a writer is not being supported yet\n"); ret = DDS_ERRNO(DDS_RETCODE_UNSUPPORTED); diff --git a/src/core/ddsc/src/dds_read.c b/src/core/ddsc/src/dds_read.c index ef7f16f..197aa9b 100644 --- a/src/core/ddsc/src/dds_read.c +++ b/src/core/ddsc/src/dds_read.c @@ -21,59 +21,53 @@ #include "ddsi/q_entity.h" #include "ddsi/ddsi_sertopic.h" - -static _Check_return_ dds__retcode_t -dds_read_lock( - _In_ dds_entity_t hdl, - _Out_ dds_reader **reader, - _Out_ dds_readcond **condition, - _In_ bool only_reader) +static dds__retcode_t dds_read_lock (dds_entity_t hdl, dds_reader **reader, dds_readcond **condition, bool only_reader) { - dds__retcode_t rc = hdl; - assert(reader); - assert(condition); - *reader = NULL; - *condition = NULL; - - rc = dds_entity_lock(hdl, DDS_KIND_READER, (dds_entity**)reader); - if (rc == DDS_RETCODE_ILLEGAL_OPERATION) { - if (!only_reader) { - if ((dds_entity_kind(hdl) == DDS_KIND_COND_READ ) || (dds_entity_kind(hdl) == DDS_KIND_COND_QUERY) ){ - rc = dds_entity_lock(hdl, DDS_KIND_DONTCARE, (dds_entity**)condition); - if (rc == DDS_RETCODE_OK) { - dds_entity *parent = ((dds_entity*)*condition)->m_parent; - assert(parent); - rc = dds_entity_lock(parent->m_hdl, DDS_KIND_READER, (dds_entity**)reader); - if (rc != DDS_RETCODE_OK) { - dds_entity_unlock((dds_entity*)*condition); - DDS_ERROR("Failed to lock condition reader\n"); - } - } else { - DDS_ERROR("Failed to lock condition\n"); - } - } else { - DDS_ERROR("Given entity is not a reader nor a condition\n"); - } - } else { - DDS_ERROR("Given entity is not a reader\n"); - } - } else if (rc != DDS_RETCODE_OK) { - DDS_ERROR("Failed to lock reader\n"); - } + dds__retcode_t rc; + dds_entity *entity, *parent_entity; + if ((rc = dds_entity_lock (hdl, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK) + { return rc; + } + else if (dds_entity_kind (entity) == DDS_KIND_READER) + { + *reader = (dds_reader *) entity; + *condition = NULL; + return DDS_RETCODE_OK; + } + else if (only_reader) + { + dds_entity_unlock (entity); + DDS_ERROR ("Given entity is not a reader\n"); + return DDS_RETCODE_ILLEGAL_OPERATION; + } + else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) + { + dds_entity_unlock (entity); + DDS_ERROR ("Given entity is a reader nor a condition\n"); + return DDS_RETCODE_ILLEGAL_OPERATION; + } + else if ((rc = dds_entity_lock (entity->m_parent->m_hdl, DDS_KIND_READER, &parent_entity)) != DDS_RETCODE_OK) + { + dds_entity_unlock (entity); + DDS_ERROR ("Failed to lock condition's reader\n"); + return rc; + } + else + { + *reader = (dds_reader *) parent_entity; + *condition = (dds_readcond *) entity; + return DDS_RETCODE_OK; + } } -static void -dds_read_unlock( - _In_ dds_reader *reader, - _In_ dds_readcond *condition) +static void dds_read_unlock (dds_reader *reader, dds_readcond *condition) { - assert(reader); - dds_entity_unlock((dds_entity*)reader); - if (condition) { - dds_entity_unlock((dds_entity*)condition); - } + dds_entity_unlock (&reader->m_entity); + if (condition) + dds_entity_unlock (&condition->m_entity); } + /* dds_read_impl: Core read/take function. Usually maxs is size of buf and si into which samples/status are written, when set to zero is special case @@ -172,10 +166,10 @@ dds_read_impl( ret = (dds_return_t)dds_rhc_read(rd->m_rd->rhc, lock, buf, si, maxs, mask, hand, cond); } /* read/take resets data available status */ - dds_entity_status_reset(rd, DDS_DATA_AVAILABLE_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); /* reset DATA_ON_READERS status on subscriber after successful read/take */ - if (dds_entity_kind(((dds_entity*)rd)->m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) { - dds_entity_status_reset(((dds_entity*)rd)->m_parent, DDS_DATA_ON_READERS_STATUS); + if (dds_entity_kind_from_handle(rd->m_entity.m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) { + dds_entity_status_reset(rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); } dds_read_unlock(rd, cond); @@ -227,13 +221,13 @@ dds_readcdr_impl( ); /* read/take resets data available status */ - dds_entity_status_reset(rd, DDS_DATA_AVAILABLE_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); /* reset DATA_ON_READERS status on subscriber after successful read/take */ - if (dds_entity_kind(((dds_entity*)rd)->m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) + if (dds_entity_kind_from_handle(rd->m_entity.m_parent->m_hdl) == DDS_KIND_SUBSCRIBER) { - dds_entity_status_reset(((dds_entity*)rd)->m_parent, DDS_DATA_ON_READERS_STATUS); + dds_entity_status_reset(rd->m_entity.m_parent, DDS_DATA_ON_READERS_STATUS); } dds_read_unlock(rd, cond); } else { diff --git a/src/core/ddsc/src/dds_readcond.c b/src/core/ddsc/src/dds_readcond.c index f555a14..1ec86cc 100644 --- a/src/core/ddsc/src/dds_readcond.c +++ b/src/core/ddsc/src/dds_readcond.c @@ -41,7 +41,7 @@ dds_create_readcond( cond->m_sample_states = mask & DDS_ANY_SAMPLE_STATE; cond->m_view_states = mask & DDS_ANY_VIEW_STATE; cond->m_instance_states = mask & DDS_ANY_INSTANCE_STATE; - cond->m_rd_guid = ((dds_entity*)rd)->m_guid; + cond->m_rd_guid = rd->m_entity.m_guid; dds_rhc_add_readcondition (cond); return cond; } @@ -70,17 +70,13 @@ dds_create_readcondition( return hdl; } -_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \ - ((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) ) -dds_entity_t -dds_get_datareader( - _In_ dds_entity_t condition) +dds_entity_t dds_get_datareader (dds_entity_t condition) { dds_entity_t hdl; - if (dds_entity_kind(condition) == DDS_KIND_COND_READ) { + if (dds_entity_kind_from_handle(condition) == DDS_KIND_COND_READ) { hdl = dds_get_parent(condition); - } else if (dds_entity_kind(condition) == DDS_KIND_COND_QUERY) { + } else if (dds_entity_kind_from_handle(condition) == DDS_KIND_COND_QUERY) { hdl = dds_get_parent(condition); } else { DDS_ERROR("Argument condition is not valid\n"); @@ -90,39 +86,26 @@ dds_get_datareader( return hdl; } - -_Pre_satisfies_(((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \ - ((condition & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) ) -_Check_return_ dds_return_t -dds_get_mask( - _In_ dds_entity_t condition, - _Out_ uint32_t *mask) +dds_return_t dds_get_mask (dds_entity_t condition, uint32_t *mask) { - dds_return_t ret; - dds_readcond *cond; - dds__retcode_t rc; + dds_entity *entity; + dds__retcode_t rc; - if (mask != NULL) { - *mask = 0; - if ((dds_entity_kind(condition) == DDS_KIND_COND_READ ) || - (dds_entity_kind(condition) == DDS_KIND_COND_QUERY) ){ - rc = dds_entity_lock(condition, DDS_KIND_DONTCARE, (dds_entity**)&cond); - if (rc == DDS_RETCODE_OK) { - *mask = (cond->m_sample_states | cond->m_view_states | cond->m_instance_states); - dds_entity_unlock((dds_entity*)cond); - ret = DDS_RETCODE_OK; - } else{ - DDS_ERROR("Error occurred on locking condition\n"); - ret = DDS_ERRNO(rc); - } - } else { - DDS_ERROR("Argument condition is not valid\n"); - ret = DDS_ERRNO(dds_valid_hdl(condition, DDS_KIND_COND_READ)); - } - } else { - DDS_ERROR("Argument mask is NULL\n"); - ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER); - } + if (mask == NULL) + return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); - return ret; + if ((rc = dds_entity_lock (condition, DDS_KIND_DONTCARE, &entity)) != DDS_RETCODE_OK) + return DDS_ERRNO (rc); + else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) + { + dds_entity_unlock (entity); + return DDS_ERRNO (dds_valid_hdl (condition, DDS_KIND_COND_READ)); + } + else + { + dds_readcond *cond = (dds_readcond *) entity; + *mask = (cond->m_sample_states | cond->m_view_states | cond->m_instance_states); + dds_entity_unlock (entity); + return DDS_RETCODE_OK; + } } diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index 87c6792..1526142 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -19,6 +19,7 @@ #include "dds__init.h" #include "dds__rhc.h" #include "dds__err.h" +#include "dds__topic.h" #include "ddsi/q_entity.h" #include "ddsi/q_thread.h" #include "dds__builtin.h" @@ -27,6 +28,7 @@ #include "os/os.h" +DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_reader) #define DDS_READER_STATUS_MASK \ DDS_SAMPLE_REJECTED_STATUS |\ @@ -162,9 +164,10 @@ dds_reader_status_validate( void dds_reader_status_cb( - void *entity, + void *ventity, const status_cb_data_t *data) { + struct dds_entity * const entity = ventity; dds_reader *rd; dds__retcode_t rc; void *metrics = NULL; @@ -173,14 +176,14 @@ dds_reader_status_cb( if (data == NULL) { /* Release the initial claim that was done during the create. This * will indicate that further API deletion is now possible. */ - ut_handle_release(((dds_entity*)entity)->m_hdl, ((dds_entity*)entity)->m_hdllink); + ut_handle_release(entity->m_hdl, ((dds_entity*)entity)->m_hdllink); return; } - if (dds_reader_lock(((dds_entity*)entity)->m_hdl, &rd) != DDS_RETCODE_OK) { + if (dds_reader_lock(entity->m_hdl, &rd) != DDS_RETCODE_OK) { return; } - assert(rd == entity); + assert(&rd->m_entity == entity); /* Reset the status for possible Listener call. * When a listener is not called, the status will be set (again). */ @@ -192,20 +195,20 @@ dds_reader_status_cb( rd->m_requested_deadline_missed_status.total_count++; rd->m_requested_deadline_missed_status.total_count_change++; rd->m_requested_deadline_missed_status.last_instance_handle = data->handle; - metrics = (void*)&(rd->m_requested_deadline_missed_status); + metrics = &rd->m_requested_deadline_missed_status; break; } case DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS: { rd->m_requested_incompatible_qos_status.total_count++; rd->m_requested_incompatible_qos_status.total_count_change++; rd->m_requested_incompatible_qos_status.last_policy_id = data->extra; - metrics = (void*)&(rd->m_requested_incompatible_qos_status); + metrics = &rd->m_requested_incompatible_qos_status; break; } case DDS_SAMPLE_LOST_STATUS: { rd->m_sample_lost_status.total_count++; rd->m_sample_lost_status.total_count_change++; - metrics = (void*)&(rd->m_sample_lost_status); + metrics = &rd->m_sample_lost_status; break; } case DDS_SAMPLE_REJECTED_STATUS: { @@ -213,7 +216,7 @@ dds_reader_status_cb( rd->m_sample_rejected_status.total_count_change++; rd->m_sample_rejected_status.last_reason = data->extra; rd->m_sample_rejected_status.last_instance_handle = data->handle; - metrics = (void*)&(rd->m_sample_rejected_status); + metrics = &rd->m_sample_rejected_status; break; } case DDS_DATA_AVAILABLE_STATUS: { @@ -233,7 +236,7 @@ dds_reader_status_cb( rd->m_liveliness_changed_status.not_alive_count_change++; } rd->m_liveliness_changed_status.last_publication_handle = data->handle; - metrics = (void*)&(rd->m_liveliness_changed_status); + metrics = &rd->m_liveliness_changed_status; break; } case DDS_SUBSCRIPTION_MATCHED_STATUS: { @@ -247,7 +250,7 @@ dds_reader_status_cb( rd->m_subscription_matched_status.current_count_change--; } rd->m_subscription_matched_status.last_publication_handle = data->handle; - metrics = (void*)&(rd->m_subscription_matched_status); + metrics = &rd->m_subscription_matched_status; break; } default: assert (0); @@ -283,8 +286,8 @@ dds_reader_status_cb( if (rc == DDS_RETCODE_OK) { /* Event was eaten by a listener. */ - if (dds_reader_lock(((dds_entity*)entity)->m_hdl, &rd) == DDS_RETCODE_OK) { - assert(rd == entity); + if (dds_reader_lock(entity->m_hdl, &rd) == DDS_RETCODE_OK) { + assert(&rd->m_entity == entity); /* Reset the change counts of the metrics. */ switch (data->status) { @@ -353,16 +356,16 @@ dds_create_reader( dds_entity_t subscriber; dds_reader * rd; struct rhc * rhc; - dds_entity * tp; + dds_topic * tp; dds_entity_t reader; dds_entity_t t; struct thread_state1 * const thr = lookup_thread_state (); const bool asleep = !vtime_awake_p (thr->vtime); dds_return_t ret = DDS_RETCODE_OK; - if (dds_entity_kind(topic) != DDS_KIND_INTERNAL) { + if (dds_entity_kind_from_handle(topic) != DDS_KIND_INTERNAL) { /* Try claiming a participant. If that's not working, then it could be a subscriber. */ - if (dds_entity_kind(participant_or_subscriber) == DDS_KIND_PARTICIPANT) { + if (dds_entity_kind_from_handle(participant_or_subscriber) == DDS_KIND_PARTICIPANT) { subscriber = dds_create_subscriber(participant_or_subscriber, qos, NULL); } else { subscriber = participant_or_subscriber; @@ -381,19 +384,19 @@ dds_create_reader( } if ((subscriber != participant_or_subscriber) && - (dds_entity_kind(topic) != DDS_KIND_INTERNAL)) { + (dds_entity_kind_from_handle(topic) != DDS_KIND_INTERNAL)) { /* Delete implicit subscriber if reader creation fails */ sub->m_flags |= DDS_ENTITY_IMPLICIT; } - rc = dds_entity_lock(t, DDS_KIND_TOPIC, &tp); + rc = dds_topic_lock(t, &tp); if (rc != DDS_RETCODE_OK) { DDS_ERROR("Error occurred on locking topic\n"); reader = DDS_ERRNO(rc); goto err_tp_lock; } - assert (((dds_topic*)tp)->m_stopic); - assert (sub->m_domain == tp->m_domain); + assert (tp->m_stopic); + assert (sub->m_domain == tp->m_entity.m_domain); /* Merge qos from topic and subscriber */ rqos = dds_create_qos (); @@ -407,8 +410,8 @@ dds_create_reader( dds_merge_qos (rqos, sub->m_qos); } - if (tp->m_qos) { - dds_merge_qos (rqos, tp->m_qos); + if (tp->m_entity.m_qos) { + dds_merge_qos (rqos, tp->m_entity.m_qos); /* reset the following qos policies if set during topic qos merge as they aren't applicable for reader */ rqos->present &= ~(QP_DURABILITY_SERVICE | QP_TRANSPORT_PRIORITY | QP_LIFESPAN); @@ -423,7 +426,7 @@ dds_create_reader( } /* Additional checks required for built-in topics */ - if (dds_entity_kind(topic) == DDS_KIND_INTERNAL && !dds__validate_builtin_reader_qos(topic, qos)) { + if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL && !dds__validate_builtin_reader_qos(topic, qos)) { dds_delete_qos(rqos); DDS_ERROR("Invalid QoS specified for built-in topic reader"); reader = DDS_ERRNO(DDS_RETCODE_INCONSISTENT_POLICY); @@ -434,9 +437,9 @@ dds_create_reader( rd = dds_alloc (sizeof (*rd)); reader = dds_entity_init (&rd->m_entity, sub, DDS_KIND_READER, rqos, listener, DDS_READER_STATUS_MASK); rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; - rd->m_topic = (dds_topic*)tp; - rhc = dds_rhc_new (rd, ((dds_topic*)tp)->m_stopic); - dds_entity_add_ref_nolock (tp); + rd->m_topic = tp; + rhc = dds_rhc_new (rd, tp->m_stopic); + dds_entity_add_ref_nolock (&tp->m_entity); rd->m_entity.m_deriver.close = dds_reader_close; rd->m_entity.m_deriver.delete = dds_reader_delete; rd->m_entity.m_deriver.set_qos = dds_reader_qos_set; @@ -449,16 +452,16 @@ dds_create_reader( assert(0); } - os_mutexUnlock(&tp->m_mutex); + os_mutexUnlock(&tp->m_entity.m_mutex); os_mutexUnlock(&sub->m_mutex); if (asleep) { thread_state_awake (thr); } - rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, + rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_participant->m_guid, tp->m_stopic, rqos, rhc, dds_reader_status_cb, rd); os_mutexLock(&sub->m_mutex); - os_mutexLock(&tp->m_mutex); + os_mutexLock(&tp->m_entity.m_mutex); assert (rd->m_rd); if (asleep) { thread_state_asleep (thr); @@ -468,10 +471,10 @@ dds_create_reader( if (dds_global.m_dur_reader && (rd->m_entity.m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS)) { (dds_global.m_dur_reader) (rd, rhc); } - dds_entity_unlock(tp); + dds_topic_unlock(tp); dds_entity_unlock(sub); - if (dds_entity_kind(topic) == DDS_KIND_INTERNAL) { + if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL) { /* If topic is builtin, then the topic entity is local and should * be deleted because the application won't. */ dds_delete(t); @@ -480,14 +483,14 @@ dds_create_reader( return reader; err_bad_qos: - dds_entity_unlock(tp); + dds_topic_unlock(tp); err_tp_lock: dds_entity_unlock(sub); if((sub->m_flags & DDS_ENTITY_IMPLICIT) != 0){ (void)dds_delete(subscriber); } err_sub_lock: - if (dds_entity_kind(topic) == DDS_KIND_INTERNAL) { + if (dds_entity_kind_from_handle(topic) == DDS_KIND_INTERNAL) { /* If topic is builtin, then the topic entity is local and should * be deleted because the application won't. */ dds_delete(t); @@ -495,11 +498,7 @@ err_sub_lock: return reader; } -void -dds_reader_ddsi2direct( - dds_entity_t entity, - ddsi2direct_directread_cb_t cb, - void *cbarg) +void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb, void *cbarg) { dds_reader *dds_rd; @@ -540,77 +539,62 @@ dds_reader_ddsi2direct( os_mutexLock (&rd->e.lock); } os_mutexUnlock (&rd->e.lock); - ut_handle_release(entity, ((dds_entity*)rd)->m_hdllink); + ut_handle_release(entity, dds_rd->m_entity.m_hdllink); } } -uint32_t -dds_reader_lock_samples( - dds_entity_t reader) +uint32_t dds_reader_lock_samples (dds_entity_t reader) { - uint32_t ret = 0; - dds_reader *rd; - if (dds_reader_lock(reader, &rd) == DDS_RETCODE_OK) { - ret = dds_rhc_lock_samples(rd->m_rd->rhc); - dds_reader_unlock(rd); - } else { - ret = 0; - } - return ret; + dds_reader *rd; + uint32_t n; + if (dds_reader_lock (reader, &rd) != DDS_RETCODE_OK) + return 0; + n = dds_rhc_lock_samples (rd->m_rd->rhc); + dds_reader_unlock (rd); + return n; } -_Pre_satisfies_((reader & DDS_ENTITY_KIND_MASK) == DDS_KIND_READER) -int -dds_reader_wait_for_historical_data( - dds_entity_t reader, - dds_duration_t max_wait) +int dds_reader_wait_for_historical_data (dds_entity_t reader, dds_duration_t max_wait) { - int ret; - dds_reader *rd; - - assert (reader); - - ret = dds_reader_lock(reader, &rd); - if (ret == DDS_RETCODE_OK) { - if (((dds_entity*)rd)->m_qos->durability.kind > NN_TRANSIENT_LOCAL_DURABILITY_QOS) { - ret = (dds_global.m_dur_wait) (rd, max_wait); - } else { - DDS_ERROR("Can not wait for historical data on a reader with volatile durability\n"); - ret = DDS_ERRNO(DDS_RETCODE_ERROR); - } - dds_reader_unlock(rd); - } else { - DDS_ERROR("Error occurred on locking reader\n"); - ret = DDS_ERRNO(ret); - } - - return ret; + dds_reader *rd; + int ret; + if ((ret = dds_reader_lock (reader, &rd)) != DDS_RETCODE_OK) + return DDS_ERRNO (ret); + switch (rd->m_entity.m_qos->durability.kind) + { + case DDS_DURABILITY_VOLATILE: + ret = DDS_RETCODE_OK; + break; + case DDS_DURABILITY_TRANSIENT_LOCAL: + break; + case DDS_DURABILITY_TRANSIENT: + case DDS_DURABILITY_PERSISTENT: + ret = (dds_global.m_dur_wait) (rd, max_wait); + break; + } + dds_reader_unlock(rd); + return ret; } -_Pre_satisfies_(((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_READER ) || \ - ((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_READ ) || \ - ((entity & DDS_ENTITY_KIND_MASK) == DDS_KIND_COND_QUERY) ) -dds_entity_t -dds_get_subscriber( - _In_ dds_entity_t entity) +dds_entity_t dds_get_subscriber (dds_entity_t entity) { - dds_entity_t hdl; + dds_entity_t hdl; + if (dds_entity_kind_from_handle (entity) == DDS_KIND_READER) + hdl = dds_get_parent (entity); + else if (dds_entity_kind_from_handle (entity) == DDS_KIND_COND_READ || dds_entity_kind_from_handle (entity) == DDS_KIND_COND_QUERY) + { + hdl = dds_get_parent (entity); + if (hdl > 0) + hdl = dds_get_subscriber (hdl); + DDS_ERROR ("Reader of this condition is already deleted\n"); + } + else + { + DDS_ERROR ("Provided entity is not a reader nor a condition\n"); + hdl = DDS_ERRNO (dds_valid_hdl (entity, DDS_KIND_READER)); + } - if (dds_entity_kind(entity) == DDS_KIND_READER) { - hdl = dds_get_parent(entity); - } else if (dds_entity_kind(entity) == DDS_KIND_COND_READ || dds_entity_kind(entity) == DDS_KIND_COND_QUERY) { - hdl = dds_get_parent(entity); - if(hdl > 0){ - hdl = dds_get_subscriber(hdl); - } else { - DDS_ERROR("Reader of this condition is already deleted\n"); - } - } else { - DDS_ERROR("Provided entity is not a reader nor a condition\n"); - hdl = DDS_ERRNO(dds_valid_hdl(entity, DDS_KIND_READER)); - } - - return hdl; + return hdl; } _Pre_satisfies_((reader & DDS_ENTITY_KIND_MASK) == DDS_KIND_READER) @@ -633,10 +617,10 @@ dds_get_subscription_matched_status ( if (status) { *status = rd->m_subscription_matched_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_SUBSCRIPTION_MATCHED_STATUS) { + if (rd->m_entity.m_status_enable & DDS_SUBSCRIPTION_MATCHED_STATUS) { rd->m_subscription_matched_status.total_count_change = 0; rd->m_subscription_matched_status.current_count_change = 0; - dds_entity_status_reset(rd, DDS_SUBSCRIPTION_MATCHED_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_SUBSCRIPTION_MATCHED_STATUS); } dds_reader_unlock(rd); fail: @@ -663,10 +647,10 @@ dds_get_liveliness_changed_status ( if (status) { *status = rd->m_liveliness_changed_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_LIVELINESS_CHANGED_STATUS) { + if (rd->m_entity.m_status_enable & DDS_LIVELINESS_CHANGED_STATUS) { rd->m_liveliness_changed_status.alive_count_change = 0; rd->m_liveliness_changed_status.not_alive_count_change = 0; - dds_entity_status_reset(rd, DDS_LIVELINESS_CHANGED_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_LIVELINESS_CHANGED_STATUS); } dds_reader_unlock(rd); fail: @@ -692,10 +676,10 @@ dds_return_t dds_get_sample_rejected_status ( if (status) { *status = rd->m_sample_rejected_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_SAMPLE_REJECTED_STATUS) { + if (rd->m_entity.m_status_enable & DDS_SAMPLE_REJECTED_STATUS) { rd->m_sample_rejected_status.total_count_change = 0; rd->m_sample_rejected_status.last_reason = DDS_NOT_REJECTED; - dds_entity_status_reset(rd, DDS_SAMPLE_REJECTED_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_REJECTED_STATUS); } dds_reader_unlock(rd); fail: @@ -721,9 +705,9 @@ dds_return_t dds_get_sample_lost_status ( if (status) { *status = rd->m_sample_lost_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_SAMPLE_LOST_STATUS) { + if (rd->m_entity.m_status_enable & DDS_SAMPLE_LOST_STATUS) { rd->m_sample_lost_status.total_count_change = 0; - dds_entity_status_reset(rd, DDS_SAMPLE_LOST_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_SAMPLE_LOST_STATUS); } dds_reader_unlock(rd); fail: @@ -749,9 +733,9 @@ dds_return_t dds_get_requested_deadline_missed_status ( if (status) { *status = rd->m_requested_deadline_missed_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_REQUESTED_DEADLINE_MISSED_STATUS) { + if (rd->m_entity.m_status_enable & DDS_REQUESTED_DEADLINE_MISSED_STATUS) { rd->m_requested_deadline_missed_status.total_count_change = 0; - dds_entity_status_reset(rd, DDS_REQUESTED_DEADLINE_MISSED_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_DEADLINE_MISSED_STATUS); } dds_reader_unlock(rd); fail: @@ -777,9 +761,9 @@ dds_return_t dds_get_requested_incompatible_qos_status ( if (status) { *status = rd->m_requested_incompatible_qos_status; } - if (((dds_entity*)rd)->m_status_enable & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) { + if (rd->m_entity.m_status_enable & DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS) { rd->m_requested_incompatible_qos_status.total_count_change = 0; - dds_entity_status_reset(rd, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); + dds_entity_status_reset(&rd->m_entity, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS); } dds_reader_unlock(rd); fail: diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index d71e599..f06754a 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -310,13 +310,40 @@ struct trigger_info bool has_changed; }; -#define QMASK_OF_SAMPLE(s) ((s)->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE) -#define QMASK_OF_INVSAMPLE(i) ((i)->inv_isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE) -#define INST_NSAMPLES(i) ((i)->nvsamples + (i)->inv_exists) -#define INST_NREAD(i) ((i)->nvread + (unsigned)((i)->inv_exists & (i)->inv_isread)) -#define INST_IS_EMPTY(i) (INST_NSAMPLES (i) == 0) -#define INST_HAS_READ(i) (INST_NREAD (i) > 0) -#define INST_HAS_UNREAD(i) (INST_NREAD (i) < INST_NSAMPLES (i)) +static unsigned qmask_of_sample (const struct rhc_sample *s) +{ + return s->isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE; +} + +static unsigned qmask_of_invsample (const struct rhc_instance *i) +{ + return i->inv_isread ? DDS_READ_SAMPLE_STATE : DDS_NOT_READ_SAMPLE_STATE; +} + +static uint32_t inst_nsamples (const struct rhc_instance *i) +{ + return i->nvsamples + i->inv_exists; +} + +static uint32_t inst_nread (const struct rhc_instance *i) +{ + return i->nvread + (uint32_t) (i->inv_exists & i->inv_isread); +} + +static bool inst_is_empty (const struct rhc_instance *i) +{ + return inst_nsamples (i) == 0; +} + +static bool inst_has_read (const struct rhc_instance *i) +{ + return inst_nread (i) > 0; +} + +static bool inst_has_unread (const struct rhc_instance *i) +{ + return inst_nread (i) < inst_nsamples (i); +} static unsigned qmask_of_inst (const struct rhc_instance *inst); static bool update_conditions_locked (struct rhc *rhc, const struct trigger_info *pre, const struct trigger_info *post, const struct ddsi_serdata *sample); @@ -363,7 +390,7 @@ static void add_inst_to_nonempty_list (_Inout_ struct rhc *rhc, _Inout_ struct r static void remove_inst_from_nonempty_list (struct rhc *rhc, struct rhc_instance *inst) { - assert (INST_IS_EMPTY (inst)); + assert (inst_is_empty (inst)); #ifndef NDEBUG { const struct rhc_instance *x = rhc->nonempty_instances; @@ -488,7 +515,7 @@ static void free_instance (void *vnode, void *varg) struct rhc *rhc = varg; struct rhc_instance *inst = vnode; struct rhc_sample *s = inst->latest; - const bool was_empty = INST_IS_EMPTY (inst); + const bool was_empty = inst_is_empty (inst); if (s) { do { @@ -559,8 +586,8 @@ static void init_trigger_info_nonmatch (struct trigger_info *info) static void get_trigger_info (struct trigger_info *info, struct rhc_instance *inst, bool pre) { info->qminst = qmask_of_inst (inst); - info->has_read = INST_HAS_READ (inst); - info->has_not_read = INST_HAS_UNREAD (inst); + info->has_read = inst_has_read (inst); + info->has_not_read = inst_has_unread (inst); /* reset instance has_changed before adding/overwriting a sample */ if (pre) { @@ -749,7 +776,7 @@ static void update_inst static void drop_instance_noupdate_no_writers (struct rhc *rhc, struct rhc_instance *inst) { int ret; - assert (INST_IS_EMPTY (inst)); + assert (inst_is_empty (inst)); rhc->n_instances--; @@ -799,7 +826,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 inst->no_writers_gen++; DDS_TRACE("new1"); - if (!INST_IS_EMPTY (inst) && !inst->isdisposed) + if (!inst_is_empty (inst) && !inst->isdisposed) rhc->n_not_alive_no_writers--; } else if (inst_wr_iid == 0 && inst->wrcount == 1) @@ -876,7 +903,7 @@ static void dds_rhc_register (struct rhc *rhc, struct rhc_instance *inst, uint64 static void account_for_empty_to_nonempty_transition (struct rhc *rhc, struct rhc_instance *inst) { - assert (INST_NSAMPLES (inst) == 1); + assert (inst_nsamples (inst) == 1); add_inst_to_nonempty_list (rhc, inst); rhc->n_new += inst->isnew; if (inst->isdisposed) @@ -953,7 +980,7 @@ static int rhc_unregister_updateinst } else { - if (!INST_IS_EMPTY (inst)) + if (!inst_is_empty (inst)) { /* Instance still has content - do not drop until application takes the last sample. Set the invalid sample if the latest @@ -985,7 +1012,7 @@ static int rhc_unregister_updateinst { /* Add invalid samples for transition to no-writers */ DDS_TRACE(",#0,empty,nowriters"); - assert (INST_IS_EMPTY (inst)); + assert (inst_is_empty (inst)); inst_set_invsample (rhc, inst); update_inst (rhc, inst, pwr_info, false, tstamp); account_for_empty_to_nonempty_transition (rhc, inst); @@ -1228,7 +1255,7 @@ bool dds_rhc_store const int not_alive = inst->wrcount == 0 || inst->isdisposed; const bool old_isdisposed = inst->isdisposed; const bool old_isnew = inst->isnew; - const bool was_empty = INST_IS_EMPTY (inst); + const bool was_empty = inst_is_empty (inst); int inst_became_disposed = 0; /* Not just an unregister, so a write and/or a dispose (possibly @@ -1314,7 +1341,7 @@ bool dds_rhc_store } else { - assert (INST_IS_EMPTY (inst) == was_empty); + assert (inst_is_empty (inst) == was_empty); } } @@ -1369,7 +1396,7 @@ bool dds_rhc_store if (rhc->reader && trigger_waitsets) { - dds_entity_status_signal((dds_entity*)(rhc->reader)); + dds_entity_status_signal(&rhc->reader->m_entity); } return delivered; @@ -1447,7 +1474,7 @@ void dds_rhc_unregister_wr } else { - const bool was_empty = INST_IS_EMPTY (inst); + const bool was_empty = inst_is_empty (inst); inst_set_invsample (rhc, inst); if (was_empty) account_for_empty_to_nonempty_transition (rhc, inst); @@ -1476,7 +1503,7 @@ void dds_rhc_unregister_wr if (trigger_waitsets) { - dds_entity_status_signal((dds_entity*)(rhc->reader)); + dds_entity_status_signal(&rhc->reader->m_entity); } } @@ -1663,11 +1690,11 @@ static int dds_rhc_read_w_qminv { if (handle == DDS_HANDLE_NIL || inst->iid == handle) { - if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0) + if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { /* samples present & instance, view state matches */ struct trigger_info pre, post; - const unsigned nread = INST_NREAD (inst); + const unsigned nread = inst_nread (inst); const uint32_t n_first = n; get_trigger_info (&pre, inst, true); @@ -1676,13 +1703,13 @@ static int dds_rhc_read_w_qminv struct rhc_sample *sample = inst->latest->next, * const end1 = sample; do { - if ((QMASK_OF_SAMPLE (sample) & qminv) == 0) + if ((qmask_of_sample (sample) & qminv) == 0) { /* sample state matches too */ set_sample_info (info_seq + n, inst, sample); ddsi_serdata_to_sample (sample->sample, values[n], 0, 0); if (cond == NULL - || (dds_entity_kind(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) + || (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) || (cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n]))) { if (!sample->isread) @@ -1709,7 +1736,7 @@ static int dds_rhc_read_w_qminv while (sample != end1); } - if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) { set_sample_info_invsample (info_seq + n, inst); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); @@ -1726,7 +1753,7 @@ static int dds_rhc_read_w_qminv inst->isnew = 0; rhc->n_new--; } - if (nread != INST_NREAD (inst)) + if (nread != inst_nread (inst)) { get_trigger_info (&post, inst, false); if (update_conditions_locked (rhc, &pre, &post, NULL)) @@ -1754,7 +1781,7 @@ static int dds_rhc_read_w_qminv if (trigger_waitsets) { - dds_entity_status_signal((dds_entity*)(rhc->reader)); + dds_entity_status_signal(&rhc->reader->m_entity); } assert (n <= INT_MAX); @@ -1792,7 +1819,7 @@ static int dds_rhc_take_w_qminv iid = inst->iid; if (handle == DDS_HANDLE_NIL || iid == handle) { - if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0) + if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { struct trigger_info pre, post; unsigned nvsamples = inst->nvsamples; @@ -1807,7 +1834,7 @@ static int dds_rhc_take_w_qminv { struct rhc_sample * const sample1 = sample->next; - if ((QMASK_OF_SAMPLE (sample) & qminv) != 0) + if ((qmask_of_sample (sample) & qminv) != 0) { psample = sample; } @@ -1816,7 +1843,7 @@ static int dds_rhc_take_w_qminv set_sample_info (info_seq + n, inst, sample); ddsi_serdata_to_sample (sample->sample, values[n], 0, 0); if (cond == NULL - || (dds_entity_kind(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) + || (dds_entity_kind_from_handle(cond->m_entity.m_hdl) != DDS_KIND_COND_QUERY) || ( cond->m_query.m_filter != NULL && cond->m_query.m_filter(values[n]))) { rhc->n_vsamples--; @@ -1855,7 +1882,7 @@ static int dds_rhc_take_w_qminv } } - if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) { set_sample_info_invsample (info_seq + n, inst); ddsi_serdata_topicless_to_sample (rhc->topic, inst->tk->m_sample, values[n], 0, 0); @@ -1880,7 +1907,7 @@ static int dds_rhc_take_w_qminv } } - if (INST_IS_EMPTY (inst)) + if (inst_is_empty (inst)) { remove_inst_from_nonempty_list (rhc, inst); @@ -1918,7 +1945,7 @@ static int dds_rhc_take_w_qminv if (trigger_waitsets) { - dds_entity_status_signal((dds_entity*)(rhc->reader)); + dds_entity_status_signal(&rhc->reader->m_entity); } assert (n <= INT_MAX); @@ -1957,7 +1984,7 @@ static int dds_rhc_takecdr_w_qminv iid = inst->iid; if (handle == DDS_HANDLE_NIL || iid == handle) { - if (!INST_IS_EMPTY (inst) && (qmask_of_inst (inst) & qminv) == 0) + if (!inst_is_empty (inst) && (qmask_of_inst (inst) & qminv) == 0) { struct trigger_info pre, post; unsigned nvsamples = inst->nvsamples; @@ -1972,7 +1999,7 @@ static int dds_rhc_takecdr_w_qminv { struct rhc_sample * const sample1 = sample->next; - if ((QMASK_OF_SAMPLE (sample) & qminv) != 0) + if ((qmask_of_sample (sample) & qminv) != 0) { psample = sample; } @@ -2006,7 +2033,7 @@ static int dds_rhc_takecdr_w_qminv } } - if (inst->inv_exists && n < max_samples && (QMASK_OF_INVSAMPLE (inst) & qminv) == 0) + if (inst->inv_exists && n < max_samples && (qmask_of_invsample (inst) & qminv) == 0) { set_sample_info_invsample (info_seq + n, inst); values[n] = ddsi_serdata_ref(inst->tk->m_sample); @@ -2031,7 +2058,7 @@ static int dds_rhc_takecdr_w_qminv } } - if (INST_IS_EMPTY (inst)) + if (inst_is_empty (inst)) { remove_inst_from_nonempty_list (rhc, inst); @@ -2069,7 +2096,7 @@ static int dds_rhc_takecdr_w_qminv if (trigger_waitsets) { - dds_entity_status_signal((dds_entity*)(rhc->reader)); + dds_entity_status_signal(&rhc->reader->m_entity); } assert (n <= INT_MAX); @@ -2086,15 +2113,15 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd switch (c->m_sample_states) { case DDS_SST_READ: - m = m && INST_HAS_READ (inst); + m = m && inst_has_read (inst); break; case DDS_SST_NOT_READ: - m = m && INST_HAS_UNREAD (inst); + m = m && inst_has_unread (inst); break; case DDS_SST_READ | DDS_SST_NOT_READ: case 0: /* note: we get here only if inst not empty, so this is a no-op */ - m = m && !INST_IS_EMPTY (inst); + m = m && !inst_is_empty (inst); break; default: DDS_FATAL("update_readconditions: sample_states invalid: %x\n", c->m_sample_states); @@ -2118,11 +2145,11 @@ void dds_rhc_add_readcondition (dds_readcond * cond) os_mutexLock (&rhc->lock); for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) { - if (dds_entity_kind(cond->m_entity.m_hdl) == DDS_KIND_COND_READ) + if (dds_entity_kind_from_handle(cond->m_entity.m_hdl) == DDS_KIND_COND_READ) { - ((dds_entity*)cond)->m_trigger += rhc_get_cond_trigger (inst, cond); - if (((dds_entity*)cond)->m_trigger) { - dds_entity_status_signal((dds_entity*)cond); + cond->m_entity.m_trigger += rhc_get_cond_trigger (inst, cond); + if (cond->m_entity.m_trigger) { + dds_entity_status_signal(&cond->m_entity); } } } @@ -2225,7 +2252,7 @@ static bool update_conditions_locked } else if (m_pre < m_post) { - if (sample && tmp == NULL && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY)) + if (sample && tmp == NULL && (dds_entity_kind_from_handle(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY)) { tmp = ddsi_sertopic_alloc_sample (rhc->topic); ddsi_serdata_to_sample (sample, tmp, NULL, NULL); @@ -2233,7 +2260,7 @@ static bool update_conditions_locked if ( (sample == NULL) - || (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY) + || (dds_entity_kind_from_handle(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY) || (iter->m_query.m_filter != NULL && iter->m_query.m_filter (tmp)) ) { @@ -2335,7 +2362,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) for (inst = ut_hhIterFirst (rhc->instances, &iter); inst; inst = ut_hhIterNext (&iter)) { n_instances++; - if (!INST_IS_EMPTY (inst)) + if (!inst_is_empty (inst)) { /* samples present (or an invalid sample is) */ unsigned n_vsamples_in_instance = 0, n_read_vsamples_in_instance = 0; @@ -2387,7 +2414,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) dds_readcond * rciter = rhc->conds; for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++) { - if (dds_entity_kind(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) + if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) { cond_match_count[i] += rhc_get_cond_trigger (inst, rciter); } @@ -2412,7 +2439,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) dds_readcond * rciter = rhc->conds; for (i = 0; i < (rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS); i++) { - if (dds_entity_kind(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) + if (dds_entity_kind_from_handle(rciter->m_entity.m_hdl) == DDS_KIND_COND_READ) { assert (cond_match_count[i] == rciter->m_entity.m_trigger); } @@ -2433,7 +2460,7 @@ static int rhc_check_counts_locked (struct rhc *rhc, bool check_conds) inst = rhc->nonempty_instances; n_nonempty_instances = 0; do { - assert (!INST_IS_EMPTY (inst)); + assert (!inst_is_empty (inst)); assert (prev->next == inst); assert (inst->prev == prev); prev = inst; diff --git a/src/core/ddsc/src/dds_stream.c b/src/core/ddsc/src/dds_stream.c index 339a73b..0861bb0 100644 --- a/src/core/ddsc/src/dds_stream.c +++ b/src/core/ddsc/src/dds_stream.c @@ -40,10 +40,8 @@ static const char * stream_op_type[11] = const uint32_t dds_op_size[5] = { 0, 1u, 2u, 4u, 8u }; -static void dds_stream_write - (dds_stream_t * os, const char * data, const uint32_t * ops); -static void dds_stream_read - (dds_stream_t * is, char * data, const uint32_t * ops); +static void dds_stream_write (dds_stream_t * os, const char * data, const uint32_t * ops); +static void dds_stream_read (dds_stream_t * is, char * data, const uint32_t * ops); #define DDS_SWAP16(v) \ ((uint16_t)(((v) >> 8) | ((v) << 8))) @@ -258,6 +256,12 @@ uint64_t dds_stream_read_uint64 (dds_stream_t * is) return val; } +extern inline char dds_stream_read_char (dds_stream_t *is); +extern inline int8_t dds_stream_read_int8 (dds_stream_t *is); +extern inline int16_t dds_stream_read_int16 (dds_stream_t *is); +extern inline int32_t dds_stream_read_int32 (dds_stream_t *is); +extern inline int64_t dds_stream_read_int64 (dds_stream_t *is); + float dds_stream_read_float (dds_stream_t * is) { float val = 0.0; @@ -417,6 +421,12 @@ void dds_stream_write_uint64 (dds_stream_t * os, uint64_t val) DDS_OS_PUT8 (os, val, uint64_t); } +extern inline void dds_stream_write_char (dds_stream_t * os, char val); +extern inline void dds_stream_write_int8 (dds_stream_t * os, int8_t val); +extern inline void dds_stream_write_int16 (dds_stream_t * os, int16_t val); +extern inline void dds_stream_write_int32 (dds_stream_t * os, int32_t val); +extern inline void dds_stream_write_int64 (dds_stream_t * os, int64_t val); + void dds_stream_write_float (dds_stream_t * os, float val) { union { float f; uint32_t u; } u; diff --git a/src/core/ddsc/src/dds_subscriber.c b/src/core/ddsc/src/dds_subscriber.c index b11ae58..7e6cab4 100644 --- a/src/core/ddsc/src/dds_subscriber.c +++ b/src/core/ddsc/src/dds_subscriber.c @@ -101,22 +101,20 @@ dds_subscriber_status_validate( Set boolean on readers that indicates state of DATA_ON_READERS status on parent subscriber */ -static dds_return_t -dds_subscriber_status_propagate( - dds_entity *sub, - uint32_t mask, - bool set) +static dds_return_t dds_subscriber_status_propagate (dds_entity *sub, uint32_t mask, bool set) { - if (mask & DDS_DATA_ON_READERS_STATUS) { - dds_entity *iter = sub->m_children; - while (iter) { - os_mutexLock (&iter->m_mutex); - ((dds_reader*) iter)->m_data_on_readers = set; - os_mutexUnlock (&iter->m_mutex); - iter = iter->m_next; - } + if (mask & DDS_DATA_ON_READERS_STATUS) + { + dds_entity *iter = sub->m_children; + while (iter) { + os_mutexLock (&iter->m_mutex); + assert (dds_entity_kind (iter) == DDS_KIND_READER); + ((dds_reader*) iter)->m_data_on_readers = set; + os_mutexUnlock (&iter->m_mutex); + iter = iter->m_next; } - return DDS_RETCODE_OK; + } + return DDS_RETCODE_OK; } _Requires_exclusive_lock_held_(participant) diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index 1466fde..9364199 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -26,6 +26,8 @@ #include "os/os_atomics.h" #include "ddsi/ddsi_iid.h" +DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_topic) + #define DDS_TOPIC_STATUS_MASK \ DDS_INCONSISTENT_TOPIC_STATUS @@ -97,7 +99,7 @@ dds_topic_status_cb( dds_topic *topic; dds__retcode_t rc; - if (dds_topic_lock(((dds_entity*)cb_t)->m_hdl, &topic) != DDS_RETCODE_OK) { + if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) != DDS_RETCODE_OK) { return; } assert(topic == cb_t); @@ -116,24 +118,24 @@ dds_topic_status_cb( dds_topic_unlock(topic); /* Is anybody interested within the entity hierarchy through listeners? */ - rc = dds_entity_listener_propagation((dds_entity*)topic, - (dds_entity*)topic, + rc = dds_entity_listener_propagation(&topic->m_entity, + &topic->m_entity, DDS_INCONSISTENT_TOPIC_STATUS, - (void*)&(topic->m_inconsistent_topic_status), + &topic->m_inconsistent_topic_status, true); if (rc == DDS_RETCODE_OK) { /* Event was eaten by a listener. */ - if (dds_topic_lock(((dds_entity*)cb_t)->m_hdl, &topic) == DDS_RETCODE_OK) { + if (dds_topic_lock(cb_t->m_entity.m_hdl, &topic) == DDS_RETCODE_OK) { /* Reset the change counts of the metrics. */ topic->m_inconsistent_topic_status.total_count_change = 0; dds_topic_unlock(topic); } } else if (rc == DDS_RETCODE_NO_DATA) { /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */ - dds_entity_status_set((dds_entity*)topic, DDS_INCONSISTENT_TOPIC_STATUS); + dds_entity_status_set(&topic->m_entity, DDS_INCONSISTENT_TOPIC_STATUS); /* Notify possible interested observers. */ - dds_entity_status_signal((dds_entity*)topic); + dds_entity_status_signal(&topic->m_entity); } else if (rc == DDS_RETCODE_ALREADY_DELETED) { /* An entity up the hierarchy is being deleted; consider it successful. */ } else { @@ -184,7 +186,7 @@ dds_topic_free( assert (st); os_mutexLock (&dds_global.m_mutex); - domain = (dds_domain*) ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &domainid); + domain = ut_avlLookup (&dds_domaintree_def, &dds_global.m_domains, &domainid); if (domain != NULL) { ut_avlDelete (&dds_topictree_def, &domain->m_topics, st); } @@ -715,9 +717,9 @@ dds_get_inconsistent_topic_status( if (status) { *status = t->m_inconsistent_topic_status; } - if (((dds_entity*)t)->m_status_enable & DDS_INCONSISTENT_TOPIC_STATUS) { + if (t->m_entity.m_status_enable & DDS_INCONSISTENT_TOPIC_STATUS) { t->m_inconsistent_topic_status.total_count_change = 0; - dds_entity_status_reset(t, DDS_INCONSISTENT_TOPIC_STATUS); + dds_entity_status_reset(&t->m_entity, DDS_INCONSISTENT_TOPIC_STATUS); } dds_topic_unlock(t); fail: diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index 3d927c5..a6c2c08 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -17,10 +17,7 @@ #include "dds__rhc.h" #include "dds__err.h" - -#define dds_waitset_lock(hdl, obj) dds_entity_lock(hdl, DDS_KIND_WAITSET, (dds_entity**)obj) -#define dds_waitset_unlock(obj) dds_entity_unlock((dds_entity*)obj); - +DEFINE_ENTITY_LOCK_UNLOCK(static, dds_waitset, DDS_KIND_WAITSET) static void dds_waitset_swap( @@ -41,18 +38,16 @@ dds_waitset_swap( *dst = idx; } -static void -dds_waitset_signal_entity( - _In_ dds_waitset *ws) +static void dds_waitset_signal_entity (dds_waitset *ws) { - dds_entity *e = (dds_entity*)ws; - /* When signaling any observers of us through the entity, - * we need to be unlocked. We still have claimed the related - * handle, so possible deletions will be delayed until we - * release it. */ - os_mutexUnlock(&(e->m_mutex)); - dds_entity_status_signal(e); - os_mutexLock(&(e->m_mutex)); + dds_entity *e = &ws->m_entity; + /* When signaling any observers of us through the entity, + * we need to be unlocked. We still have claimed the related + * handle, so possible deletions will be delayed until we + * release it. */ + os_mutexUnlock (&e->m_mutex); + dds_entity_status_signal (e); + os_mutexLock (&e->m_mutex); } static dds_return_t @@ -214,8 +209,8 @@ dds_waitset_close( { dds_waitset *ws = (dds_waitset*)e; - dds_waitset_close_list(&(ws->observed), e->m_hdl); - dds_waitset_close_list(&(ws->triggered), e->m_hdl); + dds_waitset_close_list(&ws->observed, e->m_hdl); + dds_waitset_close_list(&ws->triggered, e->m_hdl); /* Trigger waitset to wake up. */ os_condBroadcast(&e->m_cond); @@ -368,7 +363,7 @@ dds_waitset_attach( e = NULL; } } else { - e = (dds_entity*)ws; + e = &ws->m_entity; } /* This will fail if given entity is already attached (or deleted). */ @@ -421,7 +416,7 @@ dds_waitset_detach( if (rc == DDS_RETCODE_OK) { /* Possibly fails when entity was not attached. */ if (waitset == entity) { - rc = dds_entity_observer_unregister_nl((dds_entity*)ws, waitset); + rc = dds_entity_observer_unregister_nl(&ws->m_entity, waitset); } else { rc = dds_entity_observer_unregister(entity, waitset); } @@ -497,9 +492,9 @@ dds_waitset_set_trigger( goto fail; } if (trigger) { - dds_entity_status_set(ws, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_set(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); } else { - dds_entity_status_reset(ws, DDS_WAITSET_TRIGGER_STATUS); + dds_entity_status_reset(&ws->m_entity, DDS_WAITSET_TRIGGER_STATUS); } dds_waitset_signal_entity(ws); dds_waitset_unlock(ws); diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 044f2c6..b9d7cd3 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -20,10 +20,13 @@ #include "dds__qos.h" #include "dds__err.h" #include "dds__init.h" +#include "dds__topic.h" #include "ddsi/ddsi_tkmap.h" #include "dds__whc.h" #include "ddsc/ddsc_project.h" +DECL_ENTITY_LOCK_UNLOCK(extern inline, dds_writer) + #define DDS_WRITER_STATUS_MASK \ DDS_LIVELINESS_LOST_STATUS |\ DDS_OFFERED_DEADLINE_MISSED_STATUS |\ @@ -64,121 +67,133 @@ dds_writer_status_validate( static void dds_writer_status_cb( - void *entity, + void *ventity, const status_cb_data_t *data) { - dds_writer *wr; - dds__retcode_t rc; - void *metrics = NULL; + struct dds_entity * const entity = ventity; + dds_writer *wr; + dds__retcode_t rc; + void *metrics = NULL; - /* When data is NULL, it means that the writer is deleted. */ - if (data == NULL) { - /* Release the initial claim that was done during the create. This - * will indicate that further API deletion is now possible. */ - ut_handle_release(((dds_entity*)entity)->m_hdl, ((dds_entity*)entity)->m_hdllink); - return; + /* When data is NULL, it means that the writer is deleted. */ + if (data == NULL) + { + /* Release the initial claim that was done during the create. This + * will indicate that further API deletion is now possible. */ + ut_handle_release (entity->m_hdl, entity->m_hdllink); + return; + } + + if (dds_writer_lock (entity->m_hdl, &wr) != DDS_RETCODE_OK) { + /* There's a deletion or closing going on. */ + return; + } + assert (&wr->m_entity == entity); + + /* Reset the status for possible Listener call. + * When a listener is not called, the status will be set (again). */ + dds_entity_status_reset (entity, data->status); + + /* Update status metrics. */ + switch (data->status) + { + case DDS_OFFERED_DEADLINE_MISSED_STATUS: { + struct dds_offered_deadline_missed_status * const st = &wr->m_offered_deadline_missed_status; + st->total_count++; + st->total_count_change++; + st->last_instance_handle = data->handle; + metrics = st; + break; } - - if (dds_writer_lock(((dds_entity*)entity)->m_hdl, &wr) != DDS_RETCODE_OK) { - /* There's a deletion or closing going on. */ - return; + case DDS_LIVELINESS_LOST_STATUS: { + struct dds_liveliness_lost_status * const st = &wr->m_liveliness_lost_status; + st->total_count++; + st->total_count_change++; + metrics = st; + break; } - assert(wr == entity); - - /* Reset the status for possible Listener call. - * When a listener is not called, the status will be set (again). */ - dds_entity_status_reset(entity, data->status); - - /* Update status metrics. */ - switch (data->status) { - case DDS_OFFERED_DEADLINE_MISSED_STATUS: { - wr->m_offered_deadline_missed_status.total_count++; - wr->m_offered_deadline_missed_status.total_count_change++; - wr->m_offered_deadline_missed_status.last_instance_handle = data->handle; - metrics = (void*)&(wr->m_offered_deadline_missed_status); - break; - } - case DDS_LIVELINESS_LOST_STATUS: { - wr->m_liveliness_lost_status.total_count++; - wr->m_liveliness_lost_status.total_count_change++; - metrics = (void*)&(wr->m_liveliness_lost_status); - break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { - wr->m_offered_incompatible_qos_status.total_count++; - wr->m_offered_incompatible_qos_status.total_count_change++; - wr->m_offered_incompatible_qos_status.last_policy_id = data->extra; - metrics = (void*)&(wr->m_offered_incompatible_qos_status); - break; - } - case DDS_PUBLICATION_MATCHED_STATUS: { - if (data->add) { - wr->m_publication_matched_status.total_count++; - wr->m_publication_matched_status.total_count_change++; - wr->m_publication_matched_status.current_count++; - wr->m_publication_matched_status.current_count_change++; - } else { - wr->m_publication_matched_status.current_count--; - wr->m_publication_matched_status.current_count_change--; - } - wr->m_publication_matched_status.last_subscription_handle = data->handle; - metrics = (void*)&(wr->m_publication_matched_status); - break; - } - default: assert (0); + case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { + struct dds_offered_incompatible_qos_status * const st = &wr->m_offered_incompatible_qos_status; + st->total_count++; + st->total_count_change++; + st->last_policy_id = data->extra; + metrics = st; + break; } - - /* The writer needs to be unlocked when propagating the (possible) listener - * call because the application should be able to call this writer within - * the callback function. */ - dds_writer_unlock(wr); - - /* Is anybody interested within the entity hierarchy through listeners? */ - rc = dds_entity_listener_propagation(entity, entity, data->status, metrics, true); - - if (rc == DDS_RETCODE_OK) { - /* Event was eaten by a listener. */ - if (dds_writer_lock(((dds_entity*)entity)->m_hdl, &wr) == DDS_RETCODE_OK) { - assert(wr == entity); - - /* Reset the status. */ - dds_entity_status_reset(entity, data->status); - - /* Reset the change counts of the metrics. */ - switch (data->status) { - case DDS_OFFERED_DEADLINE_MISSED_STATUS: { - wr->m_offered_deadline_missed_status.total_count_change = 0; - break; - } - case DDS_LIVELINESS_LOST_STATUS: { - wr->m_liveliness_lost_status.total_count_change = 0; - break; - } - case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: { - wr->m_offered_incompatible_qos_status.total_count_change = 0; - break; - } - case DDS_PUBLICATION_MATCHED_STATUS: { - wr->m_publication_matched_status.total_count_change = 0; - wr->m_publication_matched_status.current_count_change = 0; - break; - } - default: assert (0); - } - dds_writer_unlock(wr); - } else { - /* There's a deletion or closing going on. */ - } - } else if (rc == DDS_RETCODE_NO_DATA) { - /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */ - dds_entity_status_set(entity, data->status); - /* Notify possible interested observers. */ - dds_entity_status_signal(entity); - } else if (rc == DDS_RETCODE_ALREADY_DELETED) { - /* An entity up the hierarchy is being deleted; consider it successful. */ - } else { - /* Something went wrong up the hierarchy. */ + case DDS_PUBLICATION_MATCHED_STATUS: { + struct dds_publication_matched_status * const st = &wr->m_publication_matched_status; + if (data->add) { + st->total_count++; + st->total_count_change++; + st->current_count++; + st->current_count_change++; + } else { + st->current_count--; + st->current_count_change--; + } + st->last_subscription_handle = data->handle; + metrics = st; + break; } + default: + assert (0); + } + + /* The writer needs to be unlocked when propagating the (possible) listener + * call because the application should be able to call this writer within + * the callback function. */ + dds_writer_unlock (wr); + + /* Is anybody interested within the entity hierarchy through listeners? */ + rc = dds_entity_listener_propagation (entity, entity, data->status, metrics, true); + + if (rc == DDS_RETCODE_OK) + { + /* Event was eaten by a listener. */ + if (dds_writer_lock (entity->m_hdl, &wr) == DDS_RETCODE_OK) + { + assert (&wr->m_entity == entity); + + /* Reset the status. */ + dds_entity_status_reset (entity, data->status); + + /* Reset the change counts of the metrics. */ + switch (data->status) + { + case DDS_OFFERED_DEADLINE_MISSED_STATUS: + wr->m_offered_deadline_missed_status.total_count_change = 0; + break; + case DDS_LIVELINESS_LOST_STATUS: + wr->m_liveliness_lost_status.total_count_change = 0; + break; + case DDS_OFFERED_INCOMPATIBLE_QOS_STATUS: + wr->m_offered_incompatible_qos_status.total_count_change = 0; + break; + case DDS_PUBLICATION_MATCHED_STATUS: + wr->m_publication_matched_status.total_count_change = 0; + wr->m_publication_matched_status.current_count_change = 0; + break; + default: + assert (0); + } + dds_writer_unlock (wr); + } + } + else if (rc == DDS_RETCODE_NO_DATA) + { + /* Nobody was interested through a listener (NO_DATA == NO_CALL): set the status; consider it successful. */ + dds_entity_status_set (entity, data->status); + /* Notify possible interested observers. */ + dds_entity_status_signal (entity); + } + else if (rc == DDS_RETCODE_ALREADY_DELETED) + { + /* An entity up the hierarchy is being deleted; consider it successful. */ + } + else + { + /* Something went wrong up the hierarchy. */ + } } static uint32_t @@ -412,7 +427,7 @@ dds_create_writer( dds_writer * wr; dds_entity_t writer; dds_entity * pub = NULL; - dds_entity * tp; + dds_topic * tp; dds_entity_t publisher; struct thread_state1 * const thr = lookup_thread_state(); const bool asleep = !vtime_awake_p(thr->vtime); @@ -420,7 +435,7 @@ dds_create_writer( dds_return_t ret; /* Try claiming a participant. If that's not working, then it could be a subscriber. */ - if(dds_entity_kind(participant_or_publisher) == DDS_KIND_PARTICIPANT){ + if(dds_entity_kind_from_handle(participant_or_publisher) == DDS_KIND_PARTICIPANT){ publisher = dds_create_publisher(participant_or_publisher, qos, NULL); } else{ publisher = participant_or_publisher; @@ -437,14 +452,14 @@ dds_create_writer( pub->m_flags |= DDS_ENTITY_IMPLICIT; } - rc = dds_entity_lock(topic, DDS_KIND_TOPIC, &tp); + rc = dds_topic_lock(topic, &tp); if (rc != DDS_RETCODE_OK) { DDS_ERROR("Error occurred on locking topic\n"); writer = DDS_ERRNO(rc); goto err_tp_lock; } - assert(((dds_topic*)tp)->m_stopic); - assert(pub->m_domain == tp->m_domain); + assert(tp->m_stopic); + assert(pub->m_domain == tp->m_entity.m_domain); /* Merge Topic & Publisher qos */ wqos = dds_create_qos(); @@ -458,9 +473,9 @@ dds_create_writer( dds_merge_qos(wqos, pub->m_qos); } - if (tp->m_qos) { + if (tp->m_entity.m_qos) { /* merge topic qos data to writer qos */ - dds_merge_qos(wqos, tp->m_qos); + dds_merge_qos(wqos, tp->m_entity.m_qos); } nn_xqos_mergein_missing(wqos, &gv.default_xqos_wr); @@ -475,8 +490,8 @@ dds_create_writer( wr = dds_alloc(sizeof (*wr)); writer = dds_entity_init(&wr->m_entity, pub, DDS_KIND_WRITER, wqos, listener, DDS_WRITER_STATUS_MASK); - wr->m_topic = (dds_topic*)tp; - dds_entity_add_ref_nolock(tp); + wr->m_topic = tp; + dds_entity_add_ref_nolock(&tp->m_entity); wr->m_xp = nn_xpack_new(conn, get_bandwidth_limit(wqos->transport_priority), config.xpack_send_async); wr->m_entity.m_deriver.close = dds_writer_close; wr->m_entity.m_deriver.delete = dds_writer_delete; @@ -491,25 +506,25 @@ dds_create_writer( assert(0); } - os_mutexUnlock(&tp->m_mutex); + os_mutexUnlock(&tp->m_entity.m_mutex); os_mutexUnlock(&pub->m_mutex); if (asleep) { thread_state_awake(thr); } - wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, ((dds_topic*)tp)->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); + wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); os_mutexLock(&pub->m_mutex); - os_mutexLock(&tp->m_mutex); + os_mutexLock(&tp->m_entity.m_mutex); assert(wr->m_wr); if (asleep) { thread_state_asleep(thr); } - dds_entity_unlock(tp); + dds_topic_unlock(tp); dds_entity_unlock(pub); return writer; err_bad_qos: - dds_entity_unlock(tp); + dds_topic_unlock(tp); err_tp_lock: dds_entity_unlock(pub); if((pub->m_flags & DDS_ENTITY_IMPLICIT) != 0){ @@ -559,10 +574,10 @@ dds_get_publication_matched_status ( if (status) { *status = wr->m_publication_matched_status; } - if (((dds_entity*)wr)->m_status_enable & DDS_PUBLICATION_MATCHED_STATUS) { + if (wr->m_entity.m_status_enable & DDS_PUBLICATION_MATCHED_STATUS) { wr->m_publication_matched_status.total_count_change = 0; wr->m_publication_matched_status.current_count_change = 0; - dds_entity_status_reset(wr, DDS_PUBLICATION_MATCHED_STATUS); + dds_entity_status_reset(&wr->m_entity, DDS_PUBLICATION_MATCHED_STATUS); } dds_writer_unlock(wr); fail: @@ -589,9 +604,9 @@ dds_get_liveliness_lost_status ( if (status) { *status = wr->m_liveliness_lost_status; } - if (((dds_entity*)wr)->m_status_enable & DDS_LIVELINESS_LOST_STATUS) { + if (wr->m_entity.m_status_enable & DDS_LIVELINESS_LOST_STATUS) { wr->m_liveliness_lost_status.total_count_change = 0; - dds_entity_status_reset(wr, DDS_LIVELINESS_LOST_STATUS); + dds_entity_status_reset(&wr->m_entity, DDS_LIVELINESS_LOST_STATUS); } dds_writer_unlock(wr); fail: @@ -618,9 +633,9 @@ dds_get_offered_deadline_missed_status( if (status) { *status = wr->m_offered_deadline_missed_status; } - if (((dds_entity*)wr)->m_status_enable & DDS_OFFERED_DEADLINE_MISSED_STATUS) { + if (wr->m_entity.m_status_enable & DDS_OFFERED_DEADLINE_MISSED_STATUS) { wr->m_offered_deadline_missed_status.total_count_change = 0; - dds_entity_status_reset(wr, DDS_OFFERED_DEADLINE_MISSED_STATUS); + dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_DEADLINE_MISSED_STATUS); } dds_writer_unlock(wr); fail: @@ -647,9 +662,9 @@ dds_get_offered_incompatible_qos_status ( if (status) { *status = wr->m_offered_incompatible_qos_status; } - if (((dds_entity*)wr)->m_status_enable & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) { + if (wr->m_entity.m_status_enable & DDS_OFFERED_INCOMPATIBLE_QOS_STATUS) { wr->m_offered_incompatible_qos_status.total_count_change = 0; - dds_entity_status_reset(wr, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS); + dds_entity_status_reset(&wr->m_entity, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS); } dds_writer_unlock(wr); fail: diff --git a/src/core/ddsi/CMakeLists.txt b/src/core/ddsi/CMakeLists.txt index 0ddb2c3..077a422 100644 --- a/src/core/ddsi/CMakeLists.txt +++ b/src/core/ddsi/CMakeLists.txt @@ -79,7 +79,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi" ddsi_tkmap.h probes-constants.h q_addrset.h - q_align.h q_bitset.h q_bswap.h q_config.h diff --git a/src/core/ddsi/include/ddsi/ddsi_tran.h b/src/core/ddsi/include/ddsi/ddsi_tran.h index 5f2fc89..e573591 100644 --- a/src/core/ddsi/include/ddsi/ddsi_tran.h +++ b/src/core/ddsi/include/ddsi/ddsi_tran.h @@ -178,27 +178,6 @@ struct ddsi_tran_qos int m_diffserv; }; -/* Functions and pseudo functions (macro wrappers) */ - -void ddsi_factory_conn_init (ddsi_tran_factory_t, ddsi_tran_conn_t); - -#define ddsi_tran_type(b) (((ddsi_tran_base_t) (b))->m_trantype) -#define ddsi_tran_port(b) (((ddsi_tran_base_t) (b))->m_port) -int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc); -void ddsi_tran_free (ddsi_tran_base_t base); -void ddsi_tran_free_qos (ddsi_tran_qos_t qos); -ddsi_tran_qos_t ddsi_tran_create_qos (void); -os_socket ddsi_tran_handle (ddsi_tran_base_t base); - -#define ddsi_factory_create_listener(f,p,q) (((f)->m_create_listener_fn) ((p), (q))) -#define ddsi_factory_supports(f,k) (((f)->m_supports_fn) (k)) - -ddsi_tran_conn_t ddsi_factory_create_conn -( - ddsi_tran_factory_t factory, - uint32_t port, - ddsi_tran_qos_t qos -); void ddsi_tran_factories_fini (void); void ddsi_factory_add (ddsi_tran_factory_t factory); void ddsi_factory_free (ddsi_tran_factory_t factory); @@ -206,24 +185,56 @@ ddsi_tran_factory_t ddsi_factory_find (const char * type); ddsi_tran_factory_t ddsi_factory_find_supported_kind (int32_t kind); void ddsi_factory_conn_init (ddsi_tran_factory_t factory, ddsi_tran_conn_t conn); -#define ddsi_conn_handle(c) (ddsi_tran_handle (&(c)->m_base)) -#define ddsi_conn_locator(c,l) (ddsi_tran_locator (&(c)->m_base,(l))) -OSAPI_EXPORT ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags); -ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc); +inline bool ddsi_factory_supports (ddsi_tran_factory_t factory, int32_t kind) { + return factory->m_supports_fn (kind); +} +inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos) { + return factory->m_create_conn_fn (port, qos); +} +inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, int port, ddsi_tran_qos_t qos) { + return factory->m_create_listener_fn (port, qos); +} + +void ddsi_tran_free (ddsi_tran_base_t base); +void ddsi_tran_free_qos (ddsi_tran_qos_t qos); +ddsi_tran_qos_t ddsi_tran_create_qos (void); +inline os_socket ddsi_tran_handle (ddsi_tran_base_t base) { + return base->m_handle_fn (base); +} +inline int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc) { + return base->m_locator_fn (base, loc); +} + +inline os_socket ddsi_conn_handle (ddsi_tran_conn_t conn) { + return conn->m_base.m_handle_fn (&conn->m_base); +} +inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn) { + return conn->m_base.m_trantype; +} +inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn) { + return conn->m_base.m_port; +} +inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc) { + return conn->m_base.m_locator_fn (&conn->m_base, loc); +} +inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags) { + return conn->m_closed ? -1 : (conn->m_write_fn) (conn, dst, niov, iov, flags); +} +inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) { + return conn->m_closed ? -1 : conn->m_read_fn (conn, buf, len, srcloc); +} bool ddsi_conn_peer_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn); void ddsi_conn_add_ref (ddsi_tran_conn_t conn); void ddsi_conn_free (ddsi_tran_conn_t conn); - int ddsi_conn_join_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcip, const nn_locator_t *mcip, const struct nn_interface *interf); int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcip, const nn_locator_t *mcip, const struct nn_interface *interf); - void ddsi_conn_transfer_group_membership (ddsi_tran_conn_t conn, ddsi_tran_conn_t newconn); int ddsi_conn_rejoin_transferred_mcgroups (ddsi_tran_conn_t conn); - int ddsi_is_mcaddr (const nn_locator_t *loc); int ddsi_is_ssm_mcaddr (const nn_locator_t *loc); enum ddsi_nearby_address_result ddsi_is_nearby_address (const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[]); + enum ddsi_locator_from_string_result ddsi_locator_from_string (nn_locator_t *loc, const char *str); /* 8 for transport/ @@ -242,9 +253,15 @@ char *ddsi_locator_to_string_no_port (char *dst, size_t sizeof_dst, const nn_loc int ddsi_enumerate_interfaces (ddsi_tran_factory_t factory, os_ifaddrs_t **interfs); -#define ddsi_listener_locator(s,l) (ddsi_tran_locator (&(s)->m_base,(l))) -ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener); -int ddsi_listener_listen (ddsi_tran_listener_t listener); +inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc) { + return listener->m_base.m_locator_fn (&listener->m_base, loc); +} +inline int ddsi_listener_listen (ddsi_tran_listener_t listener) { + return listener->m_listen_fn (listener); +} +inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener) { + return listener->m_accept_fn (listener); +} void ddsi_listener_unblock (ddsi_tran_listener_t listener); void ddsi_listener_free (ddsi_tran_listener_t listener); diff --git a/src/core/ddsi/include/ddsi/q_align.h b/src/core/ddsi/include/ddsi/q_align.h deleted file mode 100644 index 170936d..0000000 --- a/src/core/ddsi/include/ddsi/q_align.h +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v. 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License - * v. 1.0 which is available at - * http://www.eclipse.org/org/documents/edl-v10.php. - * - * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause - */ -#ifndef NN_ALIGN_H -#define NN_ALIGN_H - -#define ALIGN4(x) (((x) + 3) & -4u) -#define ALIGN8(x) (((x) + 7) & -8u) - -#endif /* NN_ALIGN_H */ diff --git a/src/core/ddsi/include/ddsi/q_bswap.h b/src/core/ddsi/include/ddsi/q_bswap.h index 33e62be..c3ae2a4 100644 --- a/src/core/ddsi/include/ddsi/q_bswap.h +++ b/src/core/ddsi/include/ddsi/q_bswap.h @@ -17,13 +17,14 @@ #include "ddsi/q_rtps.h" /* for nn_guid_t, nn_guid_prefix_t */ #include "ddsi/q_protocol.h" /* for nn_sequence_number_t */ -#define bswap2(x) ((int16_t) bswap2u ((uint16_t) (x))) -#define bswap4(x) ((int32_t) bswap4u ((uint32_t) (x))) -#define bswap8(x) ((int64_t) bswap8u ((uint64_t) (x))) - inline uint16_t bswap2u (uint16_t x) { - return (unsigned short) ((x >> 8) | (x << 8)); + return (uint16_t) ((x >> 8) | (x << 8)); +} + +inline int16_t bswap2 (int16_t x) +{ + return (int16_t) bswap2u ((uint16_t) x); } inline uint32_t bswap4u (uint32_t x) @@ -31,6 +32,11 @@ inline uint32_t bswap4u (uint32_t x) return (x >> 24) | ((x >> 8) & 0xff00) | ((x << 8) & 0xff0000) | (x << 24); } +inline int32_t bswap4 (int32_t x) +{ + return (int32_t) bswap4u ((uint32_t) x); +} + inline uint64_t bswap8u (uint64_t x) { const uint32_t newhi = bswap4u ((uint32_t) x); @@ -38,6 +44,11 @@ inline uint64_t bswap8u (uint64_t x) return ((uint64_t) newhi << 32) | (uint64_t) newlo; } +inline int64_t bswap8 (int64_t x) +{ + return (int64_t) bswap8u ((uint64_t) x); +} + inline void bswapSN (nn_sequence_number_t *sn) { sn->high = bswap4 (sn->high); diff --git a/src/core/ddsi/include/ddsi/q_entity.h b/src/core/ddsi/include/ddsi/q_entity.h index fbf99bd..64c3c87 100644 --- a/src/core/ddsi/include/ddsi/q_entity.h +++ b/src/core/ddsi/include/ddsi/q_entity.h @@ -51,7 +51,7 @@ typedef struct status_cb_data } status_cb_data_t; -typedef void (*status_cb_t) (void * entity, const status_cb_data_t * data); +typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data); struct prd_wr_match { ut_avlNode_t avlnode; diff --git a/src/core/ddsi/src/ddsi_raweth.c b/src/core/ddsi/src/ddsi_raweth.c index b538872..e68f201 100644 --- a/src/core/ddsi/src/ddsi_raweth.c +++ b/src/core/ddsi/src/ddsi_raweth.c @@ -312,6 +312,22 @@ static int ddsi_raweth_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locat return (loc->address[10] & 1); } +static int ddsi_raweth_is_ssm_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc) +{ + (void) tran; + (void) loc; + return 0; +} + +static enum ddsi_nearby_address_result ddsi_raweth_is_nearby_address (ddsi_tran_factory_t tran, const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[]) +{ + (void) tran; + (void) loc; + (void) ninterf; + (void) interf; + return DNAR_LOCAL; +} + static enum ddsi_locator_from_string_result ddsi_raweth_address_from_string (ddsi_tran_factory_t tran, nn_locator_t *loc, const char *str) { int i = 0; @@ -371,7 +387,8 @@ int ddsi_raweth_init (void) ddsi_raweth_factory_g.m_join_mc_fn = ddsi_raweth_join_mc; ddsi_raweth_factory_g.m_leave_mc_fn = ddsi_raweth_leave_mc; ddsi_raweth_factory_g.m_is_mcaddr_fn = ddsi_raweth_is_mcaddr; - ddsi_raweth_factory_g.m_is_nearby_address_fn = ddsi_ipaddr_is_nearby_address; + ddsi_raweth_factory_g.m_is_ssm_mcaddr_fn = ddsi_raweth_is_ssm_mcaddr; + ddsi_raweth_factory_g.m_is_nearby_address_fn = ddsi_raweth_is_nearby_address; ddsi_raweth_factory_g.m_locator_from_string_fn = ddsi_raweth_address_from_string; ddsi_raweth_factory_g.m_locator_to_string_fn = ddsi_raweth_to_string; ddsi_raweth_factory_g.m_enumerate_interfaces_fn = ddsi_raweth_enumerate_interfaces; diff --git a/src/core/ddsi/src/ddsi_tcp.c b/src/core/ddsi/src/ddsi_tcp.c index c2e54b4..b922578 100644 --- a/src/core/ddsi/src/ddsi_tcp.c +++ b/src/core/ddsi/src/ddsi_tcp.c @@ -1042,6 +1042,25 @@ static enum ddsi_locator_from_string_result ddsi_tcp_address_from_string (ddsi_t return ddsi_ipaddr_from_string(tran, loc, str, ddsi_tcp_factory_g.m_kind); } +static int ddsi_tcp_is_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc) +{ + (void) tran; + (void) loc; + return 0; +} + +static int ddsi_tcp_is_ssm_mcaddr (const ddsi_tran_factory_t tran, const nn_locator_t *loc) +{ + (void) tran; + (void) loc; + return 0; +} + +static enum ddsi_nearby_address_result ddsi_tcp_is_nearby_address (ddsi_tran_factory_t tran, const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[]) +{ + return ddsi_ipaddr_is_nearby_address(tran, loc, ninterf, interf); +} + int ddsi_tcp_init (void) { static bool init = false; @@ -1063,6 +1082,9 @@ int ddsi_tcp_init (void) ddsi_tcp_factory_g.m_locator_from_string_fn = ddsi_tcp_address_from_string; ddsi_tcp_factory_g.m_locator_to_string_fn = ddsi_ipaddr_to_string; ddsi_tcp_factory_g.m_enumerate_interfaces_fn = ddsi_eth_enumerate_interfaces; + ddsi_tcp_factory_g.m_is_mcaddr_fn = ddsi_tcp_is_mcaddr; + ddsi_tcp_factory_g.m_is_ssm_mcaddr_fn = ddsi_tcp_is_ssm_mcaddr; + ddsi_tcp_factory_g.m_is_nearby_address_fn = ddsi_tcp_is_nearby_address; ddsi_factory_add (&ddsi_tcp_factory_g); #if OS_SOCKET_HAS_IPV6 diff --git a/src/core/ddsi/src/ddsi_tran.c b/src/core/ddsi/src/ddsi_tran.c index 990abaf..85c3b8e 100644 --- a/src/core/ddsi/src/ddsi_tran.c +++ b/src/core/ddsi/src/ddsi_tran.c @@ -19,6 +19,21 @@ static ddsi_tran_factory_t ddsi_tran_factories = NULL; +extern inline uint32_t ddsi_conn_type (ddsi_tran_conn_t conn); +extern inline uint32_t ddsi_conn_port (ddsi_tran_conn_t conn); +extern inline ddsi_tran_listener_t ddsi_factory_create_listener (ddsi_tran_factory_t factory, int port, ddsi_tran_qos_t qos); +extern inline bool ddsi_factory_supports (ddsi_tran_factory_t factory, int32_t kind); +extern inline os_socket ddsi_conn_handle (ddsi_tran_conn_t conn); +extern inline int ddsi_conn_locator (ddsi_tran_conn_t conn, nn_locator_t * loc); +extern inline os_socket ddsi_tran_handle (ddsi_tran_base_t base); +extern inline ddsi_tran_conn_t ddsi_factory_create_conn (ddsi_tran_factory_t factory, uint32_t port, ddsi_tran_qos_t qos); +extern inline int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc); +extern inline int ddsi_listener_locator (ddsi_tran_listener_t listener, nn_locator_t * loc); +extern inline int ddsi_listener_listen (ddsi_tran_listener_t listener); +extern inline ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener); +extern inline ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc); +extern inline ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags); + void ddsi_factory_add (ddsi_tran_factory_t factory) { factory->m_factory = ddsi_tran_factories; @@ -144,32 +159,6 @@ void ddsi_factory_conn_init (ddsi_tran_factory_t factory, ddsi_tran_conn_t conn) conn->m_factory = factory; } -ssize_t ddsi_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, size_t len, nn_locator_t *srcloc) -{ - return (conn->m_closed) ? -1 : (conn->m_read_fn) (conn, buf, len, srcloc); -} - -ssize_t ddsi_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *dst, size_t niov, const os_iovec_t *iov, uint32_t flags) -{ - ssize_t ret = -1; - if (! conn->m_closed) - { - ret = (conn->m_write_fn) (conn, dst, niov, iov, flags); - } - - /* Check that write function is atomic (all or nothing) */ -#ifndef NDEBUG - { - size_t i, len; - for (i = 0, len = 0; i < niov; i++) { - len += iov[i].iov_len; - } - assert (ret == -1 || (size_t) ret == len); - } -#endif - return ret; -} - void ddsi_conn_disable_multiplexing (ddsi_tran_conn_t conn) { if (conn->m_disable_multiplexing_fn) { @@ -202,11 +191,6 @@ int ddsi_conn_leave_mc (ddsi_tran_conn_t conn, const nn_locator_t *srcloc, const return conn->m_factory->m_leave_mc_fn (conn, srcloc, mcloc, interf); } -os_socket ddsi_tran_handle (ddsi_tran_base_t base) -{ - return (base->m_handle_fn) (base); -} - ddsi_tran_qos_t ddsi_tran_create_qos (void) { ddsi_tran_qos_t qos; @@ -215,31 +199,6 @@ ddsi_tran_qos_t ddsi_tran_create_qos (void) return qos; } -ddsi_tran_conn_t ddsi_factory_create_conn -( - ddsi_tran_factory_t factory, - uint32_t port, - ddsi_tran_qos_t qos -) -{ - return factory->m_create_conn_fn (port, qos); -} - -int ddsi_tran_locator (ddsi_tran_base_t base, nn_locator_t * loc) -{ - return (base->m_locator_fn) (base, loc); -} - -int ddsi_listener_listen (ddsi_tran_listener_t listener) -{ - return (listener->m_listen_fn) (listener); -} - -ddsi_tran_conn_t ddsi_listener_accept (ddsi_tran_listener_t listener) -{ - return (listener->m_accept_fn) (listener); -} - void ddsi_tran_free (ddsi_tran_base_t base) { if (base) @@ -274,21 +233,20 @@ void ddsi_listener_free (ddsi_tran_listener_t listener) int ddsi_is_mcaddr (const nn_locator_t *loc) { - /* FIXME: should set m_is_mcaddr_fn to a function returning false if transport doesn't provide an implementation, and get rid of the test */ - ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind); - return tran && tran->m_is_mcaddr_fn ? tran->m_is_mcaddr_fn (tran, loc) : 0; + ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind (loc->kind); + return tran ? tran->m_is_mcaddr_fn (tran, loc) : 0; } int ddsi_is_ssm_mcaddr (const nn_locator_t *loc) { ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind); - return tran && tran->m_is_ssm_mcaddr_fn ? tran->m_is_ssm_mcaddr_fn (tran, loc) : 0; + return tran ? tran->m_is_ssm_mcaddr_fn (tran, loc) : 0; } enum ddsi_nearby_address_result ddsi_is_nearby_address (const nn_locator_t *loc, size_t ninterf, const struct nn_interface interf[]) { ddsi_tran_factory_t tran = ddsi_factory_find_supported_kind(loc->kind); - return tran->m_is_nearby_address_fn ? tran->m_is_nearby_address_fn (tran, loc, ninterf, interf) : DNAR_DISTANT; + return tran ? tran->m_is_nearby_address_fn (tran, loc, ninterf, interf) : DNAR_DISTANT; } enum ddsi_locator_from_string_result ddsi_locator_from_string (nn_locator_t *loc, const char *str) diff --git a/src/core/ddsi/src/q_bswap_inlines.c b/src/core/ddsi/src/q_bswap_inlines.c index ed2c5d0..1ac45d5 100644 --- a/src/core/ddsi/src/q_bswap_inlines.c +++ b/src/core/ddsi/src/q_bswap_inlines.c @@ -14,5 +14,8 @@ extern inline uint16_t bswap2u (uint16_t x); extern inline uint32_t bswap4u (uint32_t x); extern inline uint64_t bswap8u (uint64_t x); +extern inline int16_t bswap2 (int16_t x); +extern inline int32_t bswap4 (int32_t x); +extern inline int64_t bswap8 (int64_t x); extern inline void bswapSN (nn_sequence_number_t *sn); diff --git a/src/core/ddsi/src/q_init.c b/src/core/ddsi/src/q_init.c index c0758bd..3ba7d69 100644 --- a/src/core/ddsi/src/q_init.c +++ b/src/core/ddsi/src/q_init.c @@ -29,7 +29,6 @@ #include "ddsi/q_lat_estim.h" #include "ddsi/q_bitset.h" #include "ddsi/q_xevent.h" -#include "ddsi/q_align.h" #include "ddsi/q_addrset.h" #include "ddsi/q_ddsi_discovery.h" #include "ddsi/q_radmin.h" @@ -680,7 +679,7 @@ int create_multicast_sockets(void) gv.disc_conn_mc = disc; gv.data_conn_mc = data; DDS_TRACE("Multicast Ports: discovery %d data %d \n", - ddsi_tran_port (gv.disc_conn_mc), ddsi_tran_port (gv.data_conn_mc)); + ddsi_conn_port (gv.disc_conn_mc), ddsi_conn_port (gv.data_conn_mc)); return 1; err_data: @@ -1113,7 +1112,7 @@ int rtps_init (void) if (gv.m_factory->m_connless) { if (!(config.many_sockets_mode == MSM_NO_UNICAST && config.allowMulticast)) - DDS_TRACE("Unicast Ports: discovery %d data %d\n", ddsi_tran_port (gv.disc_conn_uc), ddsi_tran_port (gv.data_conn_uc)); + DDS_TRACE("Unicast Ports: discovery %d data %d\n", ddsi_conn_port (gv.disc_conn_uc), ddsi_conn_port (gv.data_conn_uc)); if (config.allowMulticast) { @@ -1128,11 +1127,11 @@ int rtps_init (void) /* Set multicast locators */ if (!is_unspec_locator(&gv.loc_spdp_mc)) - gv.loc_spdp_mc.port = ddsi_tran_port (gv.disc_conn_mc); + gv.loc_spdp_mc.port = ddsi_conn_port (gv.disc_conn_mc); if (!is_unspec_locator(&gv.loc_meta_mc)) - gv.loc_meta_mc.port = ddsi_tran_port (gv.disc_conn_mc); + gv.loc_meta_mc.port = ddsi_conn_port (gv.disc_conn_mc); if (!is_unspec_locator(&gv.loc_default_mc)) - gv.loc_default_mc.port = ddsi_tran_port (gv.data_conn_mc); + gv.loc_default_mc.port = ddsi_conn_port (gv.data_conn_mc); if (joinleave_spdp_defmcip (1) < 0) goto err_mc_conn; @@ -1167,7 +1166,7 @@ int rtps_init (void) /* Create shared transmit connection */ gv.tev_conn = gv.data_conn_uc; - DDS_TRACE("Timed event transmit port: %d\n", (int) ddsi_tran_port (gv.tev_conn)); + DDS_TRACE("Timed event transmit port: %d\n", (int) ddsi_conn_port (gv.tev_conn)); #ifdef DDSI_INCLUDE_NETWORK_CHANNELS { diff --git a/src/core/ddsi/src/q_plist.c b/src/core/ddsi/src/q_plist.c index 7594705..72e2da7 100644 --- a/src/core/ddsi/src/q_plist.c +++ b/src/core/ddsi/src/q_plist.c @@ -21,7 +21,6 @@ #include "ddsi/q_bswap.h" #include "ddsi/q_unused.h" -#include "ddsi/q_align.h" #include "ddsi/q_error.h" #include "ddsi/q_plist.h" #include "ddsi/q_time.h" diff --git a/src/core/ddsi/src/q_radmin.c b/src/core/ddsi/src/q_radmin.c index 99a1869..73b8cf0 100644 --- a/src/core/ddsi/src/q_radmin.c +++ b/src/core/ddsi/src/q_radmin.c @@ -32,7 +32,6 @@ #include "ddsi/q_config.h" #include "ddsi/q_log.h" -#include "ddsi/q_align.h" #include "ddsi/q_plist.h" #include "ddsi/q_unused.h" #include "ddsi/q_radmin.h" diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index a9a6200..542a408 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -30,7 +30,6 @@ #include "ddsi/q_lat_estim.h" #include "ddsi/q_bitset.h" #include "ddsi/q_xevent.h" -#include "ddsi/q_align.h" #include "ddsi/q_addrset.h" #include "ddsi/q_ddsi_discovery.h" #include "ddsi/q_radmin.h" diff --git a/src/core/ddsi/src/q_xmsg.c b/src/core/ddsi/src/q_xmsg.c index 9e5c232..5d1e95c 100644 --- a/src/core/ddsi/src/q_xmsg.c +++ b/src/core/ddsi/src/q_xmsg.c @@ -34,7 +34,6 @@ #include "ddsi/q_log.h" #include "ddsi/q_unused.h" #include "ddsi/q_xmsg.h" -#include "ddsi/q_align.h" #include "ddsi/q_config.h" #include "ddsi/q_entity.h" #include "ddsi/q_globals.h" @@ -1339,7 +1338,18 @@ static ssize_t nn_xpack_send1 (const nn_locator_t *loc, void * varg) #endif { if (!gv.mute) + { nbytes = ddsi_conn_write (xp->conn, loc, xp->niov, xp->iov, xp->call_flags); +#ifndef NDEBUG + { + size_t i, len; + for (i = 0, len = 0; i < xp->niov; i++) { + len += xp->iov[i].iov_len; + } + assert (nbytes == -1 || (size_t) nbytes == len); + } +#endif + } else { DDS_TRACE("(dropped)");