From 8ae5f706d01873d1d5954695979b325679017612 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Sun, 25 Aug 2019 09:24:43 +0200 Subject: [PATCH] Do not update match counts on redundant unmatch Dropping a match from a data reader generated unregisters and signalled a SUBSCRIPTION_MATCHED event even when another thread raced it and did so before. The consequence is possible undercounting of the number of matched writers. Signed-off-by: Erik Boasson --- src/core/ddsi/src/q_entity.c | 116 ++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 56 deletions(-) diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 1695eab..0cacf73 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1454,27 +1454,29 @@ static void reader_drop_connection (const struct nn_guid *rd_guid, const struct if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL) ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m); ddsrt_mutex_unlock (&rd->e.lock); + if (m != NULL) + { + if (rd->rhc) + { + struct proxy_writer_info pwr_info; + make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos); + rhc_unregister_wr (rd->rhc, &pwr_info); + } + if (rd->status_cb) + { + status_cb_data_t data; + + data.add = false; + data.handle = pwr->e.iid; + + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + + data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + } + } free_rd_pwr_match (pwr->e.gv, m); - - if (rd->rhc) - { - struct proxy_writer_info pwr_info; - make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos); - rhc_unregister_wr (rd->rhc, &pwr_info); - } - if (rd->status_cb) - { - status_cb_data_t data; - - data.add = false; - data.handle = pwr->e.iid; - - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - - data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - } } } @@ -1488,28 +1490,30 @@ static void reader_drop_local_connection (const struct nn_guid *rd_guid, const s if ((m = ddsrt_avl_lookup (&rd_local_writers_treedef, &rd->local_writers, &wr->e.guid)) != NULL) ddsrt_avl_delete (&rd_local_writers_treedef, &rd->local_writers, m); ddsrt_mutex_unlock (&rd->e.lock); + if (m != NULL) + { + if (rd->rhc) + { + /* FIXME: */ + struct proxy_writer_info pwr_info; + make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); + rhc_unregister_wr (rd->rhc, &pwr_info); + } + if (rd->status_cb) + { + status_cb_data_t data; + + data.add = false; + data.handle = wr->e.iid; + + data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + + data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; + (rd->status_cb) (rd->status_cb_entity, &data); + } + } free_rd_wr_match (m); - - if (rd->rhc) - { - /* FIXME: */ - struct proxy_writer_info pwr_info; - make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos); - rhc_unregister_wr (rd->rhc, &pwr_info); - } - if (rd->status_cb) - { - status_cb_data_t data; - - data.add = false; - data.handle = wr->e.iid; - - data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - - data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID; - (rd->status_cb) (rd->status_cb_entity, &data); - } } } @@ -1773,24 +1777,24 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr, ddsrt_mutex_unlock (&rd->e.lock); #ifdef DDSI_INCLUDE_SSM - if (rd->favours_ssm && pwr->supports_ssm) - { - /* pwr->supports_ssm is set if addrset_contains_ssm(pwr->ssm), so + if (rd->favours_ssm && pwr->supports_ssm) + { + /* pwr->supports_ssm is set if addrset_contains_ssm(pwr->ssm), so any_ssm must succeed. */ - if (!addrset_any_uc (pwr->c.as, &m->ssm_src_loc)) - assert (0); - if (!addrset_any_ssm (rd->e.gv, pwr->c.as, &m->ssm_mc_loc)) - assert (0); - /* FIXME: for now, assume that the ports match for datasock_mc -- + if (!addrset_any_uc (pwr->c.as, &m->ssm_src_loc)) + assert (0); + if (!addrset_any_ssm (rd->e.gv, pwr->c.as, &m->ssm_mc_loc)) + assert (0); + /* FIXME: for now, assume that the ports match for datasock_mc -- 't would be better to dynamically create and destroy sockets on an as needed basis. */ - ddsi_join_mc (rd->e.gv, rd->e.gv->mship, rd->e.gv->data_conn_mc, &m->ssm_src_loc, &m->ssm_mc_loc); - } - else - { - set_unspec_locator (&m->ssm_src_loc); - set_unspec_locator (&m->ssm_mc_loc); - } + ddsi_join_mc (rd->e.gv, rd->e.gv->mship, rd->e.gv->data_conn_mc, &m->ssm_src_loc, &m->ssm_mc_loc); + } + else + { + set_unspec_locator (&m->ssm_src_loc); + set_unspec_locator (&m->ssm_mc_loc); + } #endif if (rd->status_cb)