diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index 2928438..f7aae40 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -123,8 +123,9 @@ DDS_EXPORT dds_return_t dds_entity_observer_register( dds_entity *observed, dds_entity *observer, - dds_entity_callback cb, - dds_entity_delete_callback delete_cb); + dds_entity_callback_t cb, + dds_entity_attach_callback_t attach_cb, void *attach_arg, + dds_entity_delete_callback_t delete_cb); DDS_EXPORT dds_return_t dds_entity_observer_unregister( diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 6face9c..4efbc13 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -98,19 +98,20 @@ struct dds_entity; typedef struct dds_entity_deriver { /* Close can be used to terminate (blocking) actions on a entity before actually deleting it. */ - dds_return_t (*close)(struct dds_entity *e) ddsrt_nonnull_all; + dds_return_t (*close) (struct dds_entity *e) ddsrt_nonnull_all; /* Delete is used to actually free the entity. */ - dds_return_t (*delete)(struct dds_entity *e) ddsrt_nonnull_all; - dds_return_t (*set_qos)(struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; - dds_return_t (*validate_status)(uint32_t mask); + dds_return_t (*delete) (struct dds_entity *e) ddsrt_nonnull_all; + dds_return_t (*set_qos) (struct dds_entity *e, const dds_qos_t *qos, bool enabled) ddsrt_nonnull_all; + dds_return_t (*validate_status) (uint32_t mask); } dds_entity_deriver; -typedef void (*dds_entity_callback)(struct dds_entity *observer, dds_entity_t observed, uint32_t status); -typedef void (*dds_entity_delete_callback)(struct dds_entity *observer, dds_entity_t observed); +typedef void (*dds_entity_callback_t) (struct dds_entity *observer, dds_entity_t observed, uint32_t status); +typedef void (*dds_entity_attach_callback_t) (struct dds_entity *observer, struct dds_entity *observed, void *attach_arg); +typedef void (*dds_entity_delete_callback_t) (struct dds_entity *observer, dds_entity_t observed); typedef struct dds_entity_observer { - dds_entity_callback m_cb; - dds_entity_delete_callback m_delete_cb; + dds_entity_callback_t m_cb; + dds_entity_delete_callback_t m_delete_cb; struct dds_entity *m_observer; struct dds_entity_observer *m_next; } dds_entity_observer; diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index 06c66d4..73e266b 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -1143,7 +1143,7 @@ static bool in_observer_list_p (const struct dds_entity *observed, const dds_ent return false; } -dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *observer, dds_entity_callback cb, dds_entity_delete_callback delete_cb) +dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *observer, dds_entity_callback_t cb, dds_entity_attach_callback_t attach_cb, void *attach_arg, dds_entity_delete_callback_t delete_cb) { dds_return_t rc; assert (observed); @@ -1158,6 +1158,7 @@ dds_return_t dds_entity_observer_register (dds_entity *observed, dds_entity *obs o->m_observer = observer; o->m_next = observed->m_observers; observed->m_observers = o; + attach_cb (observer, observed, attach_arg); rc = DDS_RETCODE_OK; } ddsrt_mutex_unlock (&observed->m_observers_lock); @@ -1185,6 +1186,7 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_entity *o observed->m_observers = idx->m_next; else prev->m_next = idx->m_next; + idx->m_delete_cb (idx->m_observer, observed->m_hdllink.hdl); ddsrt_free (idx); rc = DDS_RETCODE_OK; } diff --git a/src/core/ddsc/src/dds_waitset.c b/src/core/ddsc/src/dds_waitset.c index d942414..8beb214 100644 --- a/src/core/ddsc/src/dds_waitset.c +++ b/src/core/ddsc/src/dds_waitset.c @@ -1,5 +1,5 @@ /* - * Copyright(c) 2006 to 2018 ADLINK Technology Limited and others + * Copyright(c) 2006 to 2019 ADLINK Technology Limited and others * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v. 2.0 which is available at @@ -93,8 +93,12 @@ static dds_return_t dds_waitset_close (struct dds_entity *e) { /* deep in the process of deleting the entity, so this is the only thread */ dds_waitset *ws = (dds_waitset *) e; - for (size_t i = 0; i < ws->nentities; i++) - (void) dds_entity_observer_unregister (ws->entities[i].entity, &ws->m_entity); + while (ws->nentities > 0) + { + dds_return_t rc = dds_entity_observer_unregister (ws->entities[0].entity, &ws->m_entity); + assert (rc == DDS_RETCODE_OK); + (void) rc; + } return DDS_RETCODE_OK; } @@ -175,27 +179,6 @@ dds_return_t dds_waitset_get_entities (dds_entity_t waitset, dds_entity_t *entit return ret; } -static void dds_waitset_remove (dds_waitset *ws, dds_entity_t observed) -{ - size_t i; - for (i = 0; i < ws->nentities; i++) - if (ws->entities[i].handle == observed) - break; - if (i < ws->nentities) - { - if (i < ws->ntriggered) - { - ws->entities[i] = ws->entities[--ws->ntriggered]; - ws->entities[ws->ntriggered] = ws->entities[--ws->nentities]; - } - else - { - ws->entities[i] = ws->entities[--ws->nentities]; - } - return; - } -} - /* This is called when the observed entity signals a status change. */ static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32_t status) { @@ -220,53 +203,22 @@ static void dds_waitset_observer (dds_entity *ent, dds_entity_t observed, uint32 ddsrt_mutex_unlock (&ws->m_entity.m_mutex); } -static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed) +struct dds_waitset_attach_observer_arg { + dds_attach_t x; +}; + +static void dds_waitset_attach_observer (struct dds_entity *observer, struct dds_entity *observed, void *varg) { - assert (dds_entity_kind (ent) == DDS_KIND_WAITSET); - dds_waitset *ws = (dds_waitset *) ent; + assert (dds_entity_kind (observer) == DDS_KIND_WAITSET); + dds_waitset *ws = (dds_waitset *) observer; + struct dds_waitset_attach_observer_arg *arg = varg; ddsrt_mutex_lock (&ws->m_entity.m_mutex); - /* Remove this observed entity, which is being deleted, from the waitset. */ - dds_waitset_remove (ws, observed); - /* Our registration to this observed entity will be removed automatically. */ - /* Trigger waitset to wake up. */ - ddsrt_cond_broadcast (&ws->m_entity.m_cond); - ddsrt_mutex_unlock (&ws->m_entity.m_mutex); -} - -dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x) -{ - dds_entity *e; - dds_waitset *ws; - dds_return_t ret; - - if ((ret = dds_waitset_lock (waitset, &ws)) < 0) - return ret; - - if (waitset == entity) - e = &ws->m_entity; - else if ((ret = dds_entity_pin (entity, &e)) < 0) - goto err_waitset; - - /* Entity must be "in scope": within the participant, domain or (self-evidently true) Cyclone DDS, - depending on the parent of the waitset, so that one can't use a waitset created in participant - A to wait for entities in participant B, &c. While there is no technical obstacle (though - there might be one for cross-domain use one day), it seems rather unhygienic practice. */ - if (!dds_entity_in_scope (e, ws->m_entity.m_parent)) - { - ret = DDS_RETCODE_BAD_PARAMETER; - goto err_entity; - } - - /* This will fail if given entity is already attached (or deleted). */ - if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_delete_observer)) != DDS_RETCODE_OK) - goto err_entity; - ws->entities = ddsrt_realloc (ws->entities, (ws->nentities + 1) * sizeof (*ws->entities)); - ws->entities[ws->nentities].arg = x; - ws->entities[ws->nentities].entity = e; - ws->entities[ws->nentities].handle = e->m_hdllink.hdl; + ws->entities[ws->nentities].arg = arg->x; + ws->entities[ws->nentities].entity = observed; + ws->entities[ws->nentities].handle = observed->m_hdllink.hdl; ws->nentities++; - if (is_triggered (e)) + if (is_triggered (observed)) { const size_t i = ws->nentities - 1; dds_attachment tmp = ws->entities[i]; @@ -274,46 +226,112 @@ dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_ ws->entities[ws->ntriggered++] = tmp; } ddsrt_cond_broadcast (&ws->m_entity.m_cond); + ddsrt_mutex_unlock (&ws->m_entity.m_mutex); +} -err_entity: - if (e != &ws->m_entity) - dds_entity_unpin (e); -err_waitset: - dds_waitset_unlock (ws); - return ret; +static void dds_waitset_delete_observer (dds_entity *ent, dds_entity_t observed) +{ + assert (dds_entity_kind (ent) == DDS_KIND_WAITSET); + dds_waitset *ws = (dds_waitset *) ent; + size_t i; + ddsrt_mutex_lock (&ws->m_entity.m_mutex); + for (i = 0; i < ws->nentities; i++) + if (ws->entities[i].handle == observed) + break; + if (i < ws->nentities) + { + if (i < ws->ntriggered) + { + ws->entities[i] = ws->entities[--ws->ntriggered]; + ws->entities[ws->ntriggered] = ws->entities[--ws->nentities]; + } + else + { + ws->entities[i] = ws->entities[--ws->nentities]; + } + } + ddsrt_cond_broadcast (&ws->m_entity.m_cond); + ddsrt_mutex_unlock (&ws->m_entity.m_mutex); +} + +dds_return_t dds_waitset_attach (dds_entity_t waitset, dds_entity_t entity, dds_attach_t x) +{ + dds_entity *wsent; + dds_entity *e; + dds_return_t ret; + + if ((ret = dds_entity_pin (waitset, &wsent)) < 0) + return ret; + else if (dds_entity_kind (wsent) != DDS_KIND_WAITSET) + { + dds_entity_unpin (wsent); + return DDS_RETCODE_ILLEGAL_OPERATION; + } + else + { + dds_waitset *ws = (dds_waitset *) wsent; + + if (waitset == entity) + e = &ws->m_entity; + else if ((ret = dds_entity_pin (entity, &e)) < 0) + goto err_entity; + + /* Entity must be "in scope": within the participant, domain or (self-evidently true) Cyclone DDS, + depending on the parent of the waitset, so that one can't use a waitset created in participant + A to wait for entities in participant B, &c. While there is no technical obstacle (though + there might be one for cross-domain use one day), it seems rather unhygienic practice. */ + if (!dds_entity_in_scope (e, ws->m_entity.m_parent)) + { + ret = DDS_RETCODE_BAD_PARAMETER; + goto err_scope; + } + + /* This will fail if given entity is already attached (or deleted). */ + struct dds_waitset_attach_observer_arg attach_arg = { .x = x }; + if ((ret = dds_entity_observer_register (e, &ws->m_entity, dds_waitset_observer, dds_waitset_attach_observer, &attach_arg, dds_waitset_delete_observer)) != DDS_RETCODE_OK) + goto err_entity; + + err_scope: + if (e != &ws->m_entity) + dds_entity_unpin (e); + err_entity: + dds_entity_unpin (&ws->m_entity); + return ret; + } } dds_return_t dds_waitset_detach (dds_entity_t waitset, dds_entity_t entity) { - dds_waitset *ws; - dds_entity *e; + dds_entity *wsent; dds_return_t ret; - if ((ret = dds_waitset_lock (waitset, &ws)) != DDS_RETCODE_OK) + if ((ret = dds_entity_pin (waitset, &wsent)) != DDS_RETCODE_OK) return ret; - - /* Possibly fails when entity was not attached. */ - if (waitset == entity) - ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity); - else if ((ret = dds_entity_pin (entity, &e)) < 0) - ; /* entity invalid */ - else + else if (dds_entity_kind (wsent) != DDS_KIND_WAITSET) { - ret = dds_entity_observer_unregister (e, &ws->m_entity); - dds_entity_unpin (e); - } - - if (ret == DDS_RETCODE_OK) - { - dds_waitset_remove (ws, entity); + dds_entity_unpin (wsent); + return DDS_RETCODE_ILLEGAL_OPERATION; } else { - if (ret != DDS_RETCODE_PRECONDITION_NOT_MET) + dds_waitset *ws = (dds_waitset *) wsent; + dds_entity *e; + /* Possibly fails when entity was not attached. */ + if (waitset == entity) + ret = dds_entity_observer_unregister (&ws->m_entity, &ws->m_entity); + else if ((ret = dds_entity_pin (entity, &e)) < 0) + ; /* entity invalid */ + else + { + ret = dds_entity_observer_unregister (e, &ws->m_entity); + dds_entity_unpin (e); + } + + dds_entity_unpin (&ws->m_entity); + if (ret != DDS_RETCODE_OK && ret != DDS_RETCODE_PRECONDITION_NOT_MET) ret = DDS_RETCODE_BAD_PARAMETER; + return ret; } - dds_waitset_unlock (ws); - return ret; } dds_return_t dds_waitset_wait_until (dds_entity_t waitset, dds_attach_t *xs, size_t nxs, dds_time_t abstimeout)