From 801def8bd563dc53b9d84516ff1eaa52c7f94682 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Thu, 21 Nov 2019 10:57:21 +0100 Subject: [PATCH] Invoke liveliness changed without holding pwr lock Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds_reader.c | 48 ++++- src/core/ddsi/include/dds/ddsi/q_entity.h | 25 ++- src/core/ddsi/src/ddsi_pmd.c | 5 +- src/core/ddsi/src/q_entity.c | 231 +++++++++++++++------- src/core/ddsi/src/q_lease.c | 21 +- src/core/ddsi/src/q_receive.c | 16 +- 6 files changed, 238 insertions(+), 108 deletions(-) diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index d754f10..be1c045 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -233,16 +233,46 @@ void dds_reader_status_cb (void *ventity, const status_cb_data_t *data) } case DDS_LIVELINESS_CHANGED_STATUS_ID: { struct dds_liveliness_changed_status * const st = vst = &rd->m_liveliness_changed_status; - if (data->add) { - st->alive_count++; - st->alive_count_change++; - if (st->not_alive_count > 0) { + DDSRT_STATIC_ASSERT ((uint32_t) LIVELINESS_CHANGED_ADD_ALIVE == 0 && + LIVELINESS_CHANGED_ADD_ALIVE < LIVELINESS_CHANGED_ADD_NOT_ALIVE && + LIVELINESS_CHANGED_ADD_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_NOT_ALIVE && + LIVELINESS_CHANGED_REMOVE_NOT_ALIVE < LIVELINESS_CHANGED_REMOVE_ALIVE && + LIVELINESS_CHANGED_REMOVE_ALIVE < LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE && + LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE < LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE && + LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE < LIVELINESS_CHANGED_TWITCH && + (uint32_t) LIVELINESS_CHANGED_TWITCH < UINT32_MAX); + assert (data->extra <= (uint32_t) LIVELINESS_CHANGED_TWITCH); + switch ((enum liveliness_changed_data_extra) data->extra) + { + case LIVELINESS_CHANGED_ADD_ALIVE: + st->alive_count++; + st->alive_count_change++; + break; + case LIVELINESS_CHANGED_ADD_NOT_ALIVE: + st->not_alive_count++; + st->not_alive_count_change++; + break; + case 
LIVELINESS_CHANGED_REMOVE_NOT_ALIVE: + break; + case LIVELINESS_CHANGED_REMOVE_ALIVE: + st->alive_count--; + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE: + st->alive_count--; + st->not_alive_count++; + st->not_alive_count_change++; + break; + case LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE: st->not_alive_count--; - } - } else { - st->alive_count--; - st->not_alive_count++; - st->not_alive_count_change++; + st->alive_count++; + st->alive_count_change++; + break; + case LIVELINESS_CHANGED_TWITCH: + st->alive_count_change++; + st->not_alive_count_change++; + break; } st->last_publication_handle = data->handle; invoke = (lst->on_liveliness_changed != 0); diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 12e62dd..c899296 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -48,14 +48,25 @@ struct proxy_group; struct proxy_endpoint_common; typedef void (*ddsi2direct_directread_cb_t) (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, void *arg); +/* Liveliness changed is more complicated than just add/remove. 
Encode the event + in status_cb_data_t::extra and ignore status_cb_data_t::add */ +enum liveliness_changed_data_extra { + LIVELINESS_CHANGED_ADD_ALIVE, + LIVELINESS_CHANGED_ADD_NOT_ALIVE, + LIVELINESS_CHANGED_REMOVE_NOT_ALIVE, + LIVELINESS_CHANGED_REMOVE_ALIVE, + LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE, + LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE, + LIVELINESS_CHANGED_TWITCH +}; + typedef struct status_cb_data { int raw_status_id; uint32_t extra; uint64_t handle; bool add; -} -status_cb_data_t; +} status_cb_data_t; typedef void (*status_cb_t) (void *entity, const status_cb_data_t *data); @@ -67,6 +78,8 @@ struct prd_wr_match { struct rd_pwr_match { ddsrt_avl_node_t avlnode; ddsi_guid_t pwr_guid; + unsigned pwr_alive: 1; /* tracks pwr's alive state */ + uint32_t pwr_alive_vclock; /* used to ensure progress */ #ifdef DDSI_INCLUDE_SSM nn_locator_t ssm_mc_loc; nn_locator_t ssm_src_loc; @@ -374,6 +387,7 @@ struct proxy_writer { #ifdef DDSI_INCLUDE_SSM unsigned supports_ssm: 1; /* iff 1, this proxy writer supports SSM */ #endif + uint32_t alive_vclock; /* virtual clock counting transitions between alive/not-alive */ struct nn_defrag *defrag; /* defragmenter for this proxy writer; FIXME: perhaps shouldn't be for historical data */ struct nn_reorder *reorder; /* message reordering for this proxy writer, out-of-sync readers can have their own, see pwr_rd_match */ struct nn_dqueue *dqueue; /* delivery queue for asynchronous delivery (historical data is always delivered asynchronously) */ @@ -652,8 +666,9 @@ int delete_proxy_reader (struct q_globals *gv, const struct ddsi_guid *guid, nn_ void update_proxy_reader (struct proxy_reader *prd, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); void update_proxy_writer (struct proxy_writer *pwr, seqno_t seq, struct addrset *as, const struct dds_qos *xqos, nn_wctime_t timestamp); -int proxy_writer_set_alive (struct proxy_writer *pwr); -int proxy_writer_set_notalive (struct proxy_writer *pwr); +void 
proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify); +int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify); +void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify); int new_proxy_group (const struct ddsi_guid *guid, const char *name, const struct dds_qos *xqos, nn_wctime_t timestamp); void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, nn_wctime_t timestamp, int isimplicit); @@ -662,8 +677,6 @@ void delete_proxy_group (struct ephash *guid_hash, const struct ddsi_guid *guid, rebuild them all (which only makes sense after previously having emptied them all). */ void rebuild_or_clear_writer_addrsets(struct q_globals *gv, int rebuild); -void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch); - void local_reader_ary_setfastpath_ok (struct local_reader_ary *x, bool fastpath_ok); struct ddsi_writer_info; diff --git a/src/core/ddsi/src/ddsi_pmd.c b/src/core/ddsi/src/ddsi_pmd.c index f187abb..035a9bf 100644 --- a/src/core/ddsi/src/ddsi_pmd.c +++ b/src/core/ddsi/src/ddsi_pmd.c @@ -129,9 +129,8 @@ void handle_pmd_message (const struct receiver_state *rst, nn_wctime_t timestamp ppguid.entityid.u = NN_ENTITYID_PARTICIPANT; if ((proxypp = ephash_lookup_proxy_participant_guid (rst->gv->guid_hash, &ppguid)) == NULL) RSTTRACE (" PPunknown"); - - if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE && - (l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL) + else if (kind == PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE && + (l = ddsrt_atomic_ldvoidp (&proxypp->minl_man)) != NULL) { /* Renew lease for entity with shortest manual-by-participant lease */ lease_renew (l, now_et ()); diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index b55606e..a199f63 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -64,6 +64,11 @@ struct 
deleted_participants_admin { int64_t delay; }; +struct proxy_writer_alive_state { + bool alive; + uint32_t vclock; +}; + static int compare_guid (const void *va, const void *vb); static void augment_wr_prd_match (void *vnode, const void *vleft, const void *vright); @@ -1431,6 +1436,19 @@ static void free_wr_rd_match (struct wr_rd_match *m) if (m) ddsrt_free (m); } +static void proxy_writer_get_alive_state_locked (struct proxy_writer *pwr, struct proxy_writer_alive_state *st) +{ + st->alive = pwr->alive; + st->vclock = pwr->alive_vclock; +} + +static void proxy_writer_get_alive_state (struct proxy_writer *pwr, struct proxy_writer_alive_state *st) +{ + ddsrt_mutex_lock (&pwr->e.lock); + proxy_writer_get_alive_state_locked (pwr, st); + ddsrt_mutex_unlock (&pwr->e.lock); +} + static void writer_drop_connection (const struct ddsi_guid *wr_guid, const struct proxy_reader *prd) { struct writer *wr; @@ -1488,14 +1506,63 @@ static void writer_drop_local_connection (const struct ddsi_guid *wr_guid, struc } } -void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, bool unmatch) +static void reader_update_notify_pwr_alive_state (struct reader *rd, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state) +{ + struct rd_pwr_match *m; + bool notify = false; + int delta = 0; /* -1: alive -> not_alive; 0: unchanged; 1: not_alive -> alive */ + ddsrt_mutex_lock (&rd->e.lock); + if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL) + { + if ((int32_t) (alive_state->vclock - m->pwr_alive_vclock) > 0) + { + delta = (int) alive_state->alive - (int) m->pwr_alive; + notify = true; + m->pwr_alive = alive_state->alive; + m->pwr_alive_vclock = alive_state->vclock; + } + } + ddsrt_mutex_unlock (&rd->e.lock); + + if (delta < 0 && rd->rhc) + { + struct ddsi_writer_info wrinfo; + ddsi_make_writer_info (&wrinfo, &pwr->e, pwr->c.xqos); + ddsi_rhc_unregister_wr (rd->rhc, &wrinfo); + } + + /* Liveliness 
changed events can race each other and can, potentially, be delivered + in a different order. */ + if (notify && rd->status_cb) + { + status_cb_data_t data; + data.handle = pwr->e.iid; + if (delta == 0) + data.extra = (uint32_t) LIVELINESS_CHANGED_TWITCH; + else if (delta < 0) + data.extra = (uint32_t) LIVELINESS_CHANGED_ALIVE_TO_NOT_ALIVE; + else + data.extra = (uint32_t) LIVELINESS_CHANGED_NOT_ALIVE_TO_ALIVE; + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + } +} + +static void reader_update_notify_pwr_alive_state_guid (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr, const struct proxy_writer_alive_state *alive_state) +{ + struct reader *rd; + if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL) + reader_update_notify_pwr_alive_state (rd, pwr, alive_state); +} + +static void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy_writer *pwr) { struct reader *rd; if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rd_guid)) != NULL) { struct rd_pwr_match *m; ddsrt_mutex_lock (&rd->e.lock); - if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL && unmatch) + if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL) ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m); ddsrt_mutex_unlock (&rd->e.lock); if (m != NULL) @@ -1509,20 +1576,18 @@ void reader_drop_connection (const struct ddsi_guid *rd_guid, const struct proxy if (rd->status_cb) { status_cb_data_t data; - data.add = false; data.handle = pwr->e.iid; + data.add = false; + data.extra = (uint32_t) (m->pwr_alive ? 
LIVELINESS_CHANGED_REMOVE_ALIVE : LIVELINESS_CHANGED_REMOVE_NOT_ALIVE); + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; (rd->status_cb) (rd->status_cb_entity, &data); - if (unmatch) - { - data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - } + data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); } } - if (unmatch) - free_rd_pwr_match (pwr->e.gv, m); + free_rd_pwr_match (pwr->e.gv, m); } } @@ -1548,9 +1613,9 @@ static void reader_drop_local_connection (const struct ddsi_guid *rd_guid, const if (rd->status_cb) { status_cb_data_t data; - - data.add = false; data.handle = wr->e.iid; + data.add = false; + data.extra = (uint32_t) LIVELINESS_CHANGED_REMOVE_ALIVE; data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; (rd->status_cb) (rd->status_cb_entity, &data); @@ -1789,12 +1854,14 @@ static void writer_add_local_connection (struct writer *wr, struct reader *rd) } } -static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count) +static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, nn_count_t *init_count, const struct proxy_writer_alive_state *alive_state) { struct rd_pwr_match *m = ddsrt_malloc (sizeof (*m)); ddsrt_avl_ipath_t path; m->pwr_guid = pwr->e.guid; + m->pwr_alive = alive_state->alive; + m->pwr_alive_vclock = alive_state->vclock; ddsrt_mutex_lock (&rd->e.lock); @@ -1848,9 +1915,14 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, if (rd->status_cb) { status_cb_data_t data; - data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; - data.add = true; data.handle = pwr->e.iid; + data.add = true; + data.extra = (uint32_t) (alive_state->alive ? 
LIVELINESS_CHANGED_ADD_ALIVE : LIVELINESS_CHANGED_ADD_NOT_ALIVE); + + data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; (rd->status_cb) (rd->status_cb_entity, &data); } } @@ -1882,14 +1954,15 @@ static void reader_add_local_connection (struct reader *rd, struct writer *wr) if (rd->status_cb) { status_cb_data_t data; - data.add = true; data.handle = wr->e.iid; - - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); + data.add = true; + data.extra = (uint32_t) LIVELINESS_CHANGED_ADD_ALIVE; data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; (rd->status_cb) (rd->status_cb_entity, &data); + + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); } } } @@ -2007,16 +2080,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader qxev_pwr_entityid (pwr, &rd->e.guid.prefix); ELOGDISC (pwr, "\n"); - - if (rd->status_cb) - { - status_cb_data_t data; - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; - data.add = true; - data.handle = pwr->e.iid; - (rd->status_cb) (rd->status_cb_entity, &data); - } - return; already_matched: @@ -2188,6 +2251,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r const int isb1 = (is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0); dds_qos_policy_id_t reason; nn_count_t init_count; + struct proxy_writer_alive_state alive_state; if (isb0 != isb1) return; if (rd->e.onlylocal) @@ -2197,8 +2261,18 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r reader_qos_mismatch (rd, reason); return; } - reader_add_connection (rd, pwr, &init_count); + + /* Initialize the reader's tracking information for the writer liveliness state to something + sensible, but that may be outdated by the time
the reader gets added to the writer's list + of matching readers. */ + proxy_writer_get_alive_state (pwr, &alive_state); + reader_add_connection (rd, pwr, &init_count, &alive_state); proxy_writer_add_connection (pwr, rd, tnow, init_count); + + /* Once everything is set up: update with the latest state, any updates to the alive state + happening in parallel will cause this to become a no-op. */ + proxy_writer_get_alive_state (pwr, &alive_state); + reader_update_notify_pwr_alive_state (rd, pwr, &alive_state); } static bool ignore_local_p (const ddsi_guid_t *guid1, const ddsi_guid_t *guid2, const struct dds_qos *xqos1, const struct dds_qos *xqos2) @@ -4274,6 +4348,7 @@ int new_proxy_writer (struct q_globals *gv, const struct ddsi_guid *ppguid, cons pwr->nackfragcount = 0; pwr->last_fragnum_reset = 0; pwr->alive = 1; + pwr->alive_vclock = 0; ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, 1); if (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)) { /* The DDSI built-in proxy writers always deliver @@ -4453,7 +4528,7 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq) { struct pwr_rd_match *m = ddsrt_avl_root_non_empty (&pwr_readers_treedef, &pwr->readers); ddsrt_avl_delete (&pwr_readers_treedef, &pwr->readers, m); - reader_drop_connection (&m->rd_guid, pwr, true); + reader_drop_connection (&m->rd_guid, pwr); update_reader_init_acknack_count (&pwr->e.gv->logconfig, pwr->e.gv->guid_hash, &m->rd_guid, m->count); free_pwr_rd_match (m); } @@ -4490,62 +4565,86 @@ int delete_proxy_writer (struct q_globals *gv, const struct ddsi_guid *guid, nn_ builtintopic_write (gv->builtin_topic_interface, &pwr->e, timestamp, false); ephash_remove_proxy_writer_guid (gv->guid_hash, pwr); ddsrt_mutex_unlock (&gv->lock); - if (proxy_writer_set_notalive (pwr) != DDS_RETCODE_OK) + if (proxy_writer_set_notalive (pwr, false) != DDS_RETCODE_OK) GVLOGDISC ("proxy_writer_set_notalive failed for "PGUIDFMT"\n", PGUID(*guid)); - /* Set lease expiry for this pwr to never so that the pwr 
will not be set - to alive again while it is scheduled for being deleted. */ - if (pwr->c.xqos->liveliness.lease_duration != T_NEVER) - lease_renew (pwr->lease, (nn_etime_t){ T_NEVER }); gcreq_proxy_writer (pwr); return DDS_RETCODE_OK; } -int proxy_writer_set_alive (struct proxy_writer *pwr) +static void proxy_writer_notify_liveliness_change_may_unlock (struct proxy_writer *pwr) { - /* Caller has pwr->e.lock, so we can safely read pwr->alive. For updating - * this field this function is also taking pwr->c.proxypp->e.lock */ - ddsrt_avl_iter_t it; - if (pwr->alive) - return DDS_RETCODE_PRECONDITION_NOT_MET; + struct proxy_writer_alive_state alive_state; + proxy_writer_get_alive_state_locked (pwr, &alive_state); + + struct ddsi_guid rdguid; + struct pwr_rd_match *m; + memset (&rdguid, 0, sizeof (rdguid)); + while (pwr->alive_vclock == alive_state.vclock && + (m = ddsrt_avl_lookup_succ (&pwr_readers_treedef, &pwr->readers, &rdguid)) != NULL) + { + rdguid = m->rd_guid; + ddsrt_mutex_unlock (&pwr->e.lock); + /* unlocking pwr means alive state may have changed already; we break out of the loop once we + detect this, but for the reader in the current iteration anything is possible */ + reader_update_notify_pwr_alive_state_guid (&rdguid, pwr, &alive_state); + ddsrt_mutex_lock (&pwr->e.lock); + } +} + +void proxy_writer_set_alive_may_unlock (struct proxy_writer *pwr, bool notify) +{ + /* Caller has pwr->e.lock, so we can safely read pwr->alive. Updating pwr->alive requires + also taking pwr->c.proxypp->e.lock because pwr->alive <=> (pwr->lease in proxypp's lease + heap).
*/ + assert (!pwr->alive); + ddsrt_mutex_lock (&pwr->c.proxypp->e.lock); pwr->alive = true; - - for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) - { - struct reader *rd; - if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL) - { - status_cb_data_t data; - data.add = true; - data.handle = pwr->e.iid; - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - } - } + pwr->alive_vclock++; if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) proxy_participant_add_pwr_lease_locked (pwr->c.proxypp, pwr); ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock); - return DDS_RETCODE_OK; + + if (notify) + proxy_writer_notify_liveliness_change_may_unlock (pwr); } -int proxy_writer_set_notalive (struct proxy_writer *pwr) +int proxy_writer_set_notalive (struct proxy_writer *pwr, bool notify) { /* Caller should not have taken pwr->e.lock and pwr->c.proxypp->e.lock; * this function takes both locks to update pwr->alive value */ - int ret = DDS_RETCODE_OK; ddsrt_mutex_lock (&pwr->e.lock); if (!pwr->alive) - ret = DDS_RETCODE_PRECONDITION_NOT_MET; + { + ddsrt_mutex_unlock (&pwr->e.lock); + return DDS_RETCODE_PRECONDITION_NOT_MET; + } + + ddsrt_mutex_lock (&pwr->c.proxypp->e.lock); + pwr->alive = false; + pwr->alive_vclock++; + if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) + proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr); + ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock); + + if (notify) + proxy_writer_notify_liveliness_change_may_unlock (pwr); + ddsrt_mutex_unlock (&pwr->e.lock); + return DDS_RETCODE_OK; +} + +void proxy_writer_set_notalive_guid (struct q_globals *gv, const struct ddsi_guid *pwrguid, bool notify) +{ + struct proxy_writer *pwr; + if ((pwr = 
ephash_lookup_proxy_writer_guid (gv->guid_hash, pwrguid)) == NULL) + GVLOGDISC (" "PGUIDFMT"?\n", PGUID (*pwrguid)); else { - ddsrt_mutex_lock (&pwr->c.proxypp->e.lock); - pwr->alive = false; - if (pwr->c.xqos->liveliness.lease_duration != T_NEVER && pwr->c.xqos->liveliness.kind != DDS_LIVELINESS_MANUAL_BY_TOPIC) - proxy_participant_remove_pwr_lease_locked (pwr->c.proxypp, pwr); - ddsrt_mutex_unlock (&pwr->c.proxypp->e.lock); + GVLOGDISC ("proxy_writer_set_notalive_guid ("PGUIDFMT")", PGUID (*pwrguid)); + if (proxy_writer_set_notalive (pwr, notify) == DDS_RETCODE_PRECONDITION_NOT_MET) + GVLOGDISC (" pwr was not alive"); + GVLOGDISC ("\n"); } - ddsrt_mutex_unlock (&pwr->e.lock); - return ret; } /* PROXY-READER ----------------------------------------------------- */ diff --git a/src/core/ddsi/src/q_lease.c b/src/core/ddsi/src/q_lease.c index 0db530c..ccd0306 100644 --- a/src/core/ddsi/src/q_lease.c +++ b/src/core/ddsi/src/q_lease.c @@ -275,27 +275,8 @@ int64_t check_and_handle_lease_expiration (struct q_globals *gv, nn_etime_t tnow delete_proxy_participant_by_guid (gv, &g, now(), 1); break; case EK_PROXY_WRITER: - { - struct proxy_writer *pwr; - ddsrt_avl_iter_t it; - if ((pwr = ephash_lookup_proxy_writer_guid (gv->guid_hash, &g)) == NULL) - { - GVLOGDISC (" "PGUIDFMT"?\n", PGUID (g)); - ddsrt_mutex_lock (&gv->leaseheap_lock); - continue; - } - GVLOGDISC ("proxy_writer_set_notalive ("PGUIDFMT")", PGUID (g)); - if (proxy_writer_set_notalive (pwr) == DDS_RETCODE_PRECONDITION_NOT_MET) - { - GVLOGDISC (" pwr was not alive"); - ddsrt_mutex_lock (&gv->leaseheap_lock); - continue; - } - GVLOGDISC ("\n"); - for (struct pwr_rd_match *m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) - reader_drop_connection (&m->rd_guid, pwr, false); + proxy_writer_set_notalive_guid (gv, &g, true); break; - } case EK_PARTICIPANT: case EK_READER: case EK_WRITER: diff --git a/src/core/ddsi/src/q_receive.c 
b/src/core/ddsi/src/q_receive.c index b026022..b20dd06 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -2114,18 +2114,26 @@ static void handle_regular (struct receiver_state *rst, nn_etime_t tnow, struct return; } + /* Proxy participant's "automatic" lease has to be renewed always, manual-by-participant one only + for data published by the application. If pwr->lease exists, it is in some manual lease mode, + so check whether it is actually in manual-by-topic mode before renewing it. As pwr->lease is + set once (during entity creation) we can read it outside the lock, keeping all the lease + renewals together. */ lease_renew (ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_auto), tnow); if ((lease = ddsrt_atomic_ldvoidp (&pwr->c.proxypp->minl_man)) != NULL && renew_manbypp_lease) lease_renew (lease, tnow); + if (pwr->lease && pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC) + lease_renew (pwr->lease, tnow); /* Shouldn't lock the full writer, but will do so for now */ ddsrt_mutex_lock (&pwr->e.lock); - if (pwr->c.xqos->liveliness.kind == DDS_LIVELINESS_MANUAL_BY_TOPIC && pwr->c.xqos->liveliness.lease_duration != T_NEVER) - lease_renew (pwr->lease, tnow); - + /* A change in transition from not-alive to alive is relatively complicated + and may involve temporarily unlocking the proxy writer during the process + (to avoid unnecessarily holding pwr->e.lock while invoking listeners on + the reader) */ if (!pwr->alive) - proxy_writer_set_alive (pwr); + proxy_writer_set_alive_may_unlock (pwr, true); /* Don't accept data when reliable readers exist and we haven't yet seen a heartbeat telling us what the "current" sequence number of the writer