Do not update match counts on redundant unmatch
Dropping a match from a data reader generated unregisters and signalled a SUBSCRIPTION_MATCHED event even when another thread raced it and did so before. The consequence is possible undercounting of the number of matched writers. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									87398bdc98
								
							
						
					
					
						commit
						8ae5f706d0
					
				
					 1 changed files with 60 additions and 56 deletions
				
			
		| 
						 | 
				
			
			@ -1454,27 +1454,29 @@ static void reader_drop_connection (const struct nn_guid *rd_guid, const struct
 | 
			
		|||
    if ((m = ddsrt_avl_lookup (&rd_writers_treedef, &rd->writers, &pwr->e.guid)) != NULL)
 | 
			
		||||
      ddsrt_avl_delete (&rd_writers_treedef, &rd->writers, m);
 | 
			
		||||
    ddsrt_mutex_unlock (&rd->e.lock);
 | 
			
		||||
    if (m != NULL)
 | 
			
		||||
    {
 | 
			
		||||
      if (rd->rhc)
 | 
			
		||||
      {
 | 
			
		||||
        struct proxy_writer_info pwr_info;
 | 
			
		||||
        make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos);
 | 
			
		||||
        rhc_unregister_wr (rd->rhc, &pwr_info);
 | 
			
		||||
      }
 | 
			
		||||
      if (rd->status_cb)
 | 
			
		||||
      {
 | 
			
		||||
        status_cb_data_t data;
 | 
			
		||||
 | 
			
		||||
        data.add = false;
 | 
			
		||||
        data.handle = pwr->e.iid;
 | 
			
		||||
 | 
			
		||||
        data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
 | 
			
		||||
        (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
 | 
			
		||||
        data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
 | 
			
		||||
        (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    free_rd_pwr_match (pwr->e.gv, m);
 | 
			
		||||
 | 
			
		||||
    if (rd->rhc)
 | 
			
		||||
    {
 | 
			
		||||
      struct proxy_writer_info pwr_info;
 | 
			
		||||
      make_proxy_writer_info(&pwr_info, &pwr->e, pwr->c.xqos);
 | 
			
		||||
      rhc_unregister_wr (rd->rhc, &pwr_info);
 | 
			
		||||
    }
 | 
			
		||||
    if (rd->status_cb)
 | 
			
		||||
    {
 | 
			
		||||
      status_cb_data_t data;
 | 
			
		||||
 | 
			
		||||
      data.add = false;
 | 
			
		||||
      data.handle = pwr->e.iid;
 | 
			
		||||
 | 
			
		||||
      data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
 | 
			
		||||
      (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
 | 
			
		||||
      data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
 | 
			
		||||
      (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1488,28 +1490,30 @@ static void reader_drop_local_connection (const struct nn_guid *rd_guid, const s
 | 
			
		|||
    if ((m = ddsrt_avl_lookup (&rd_local_writers_treedef, &rd->local_writers, &wr->e.guid)) != NULL)
 | 
			
		||||
      ddsrt_avl_delete (&rd_local_writers_treedef, &rd->local_writers, m);
 | 
			
		||||
    ddsrt_mutex_unlock (&rd->e.lock);
 | 
			
		||||
    if (m != NULL)
 | 
			
		||||
    {
 | 
			
		||||
      if (rd->rhc)
 | 
			
		||||
      {
 | 
			
		||||
        /* FIXME: */
 | 
			
		||||
        struct proxy_writer_info pwr_info;
 | 
			
		||||
        make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
 | 
			
		||||
        rhc_unregister_wr (rd->rhc, &pwr_info);
 | 
			
		||||
      }
 | 
			
		||||
      if (rd->status_cb)
 | 
			
		||||
      {
 | 
			
		||||
        status_cb_data_t data;
 | 
			
		||||
 | 
			
		||||
        data.add = false;
 | 
			
		||||
        data.handle = wr->e.iid;
 | 
			
		||||
 | 
			
		||||
        data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
 | 
			
		||||
        (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
 | 
			
		||||
        data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
 | 
			
		||||
        (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    free_rd_wr_match (m);
 | 
			
		||||
 | 
			
		||||
    if (rd->rhc)
 | 
			
		||||
    {
 | 
			
		||||
      /* FIXME: */
 | 
			
		||||
      struct proxy_writer_info pwr_info;
 | 
			
		||||
      make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
 | 
			
		||||
      rhc_unregister_wr (rd->rhc, &pwr_info);
 | 
			
		||||
    }
 | 
			
		||||
    if (rd->status_cb)
 | 
			
		||||
    {
 | 
			
		||||
      status_cb_data_t data;
 | 
			
		||||
 | 
			
		||||
      data.add = false;
 | 
			
		||||
      data.handle = wr->e.iid;
 | 
			
		||||
 | 
			
		||||
      data.raw_status_id = (int) DDS_LIVELINESS_CHANGED_STATUS_ID;
 | 
			
		||||
      (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
 | 
			
		||||
      data.raw_status_id = (int) DDS_SUBSCRIPTION_MATCHED_STATUS_ID;
 | 
			
		||||
      (rd->status_cb) (rd->status_cb_entity, &data);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1773,24 +1777,24 @@ static void reader_add_connection (struct reader *rd, struct proxy_writer *pwr,
 | 
			
		|||
    ddsrt_mutex_unlock (&rd->e.lock);
 | 
			
		||||
 | 
			
		||||
#ifdef DDSI_INCLUDE_SSM
 | 
			
		||||
  if (rd->favours_ssm && pwr->supports_ssm)
 | 
			
		||||
  {
 | 
			
		||||
    /* pwr->supports_ssm is set if addrset_contains_ssm(pwr->ssm), so
 | 
			
		||||
    if (rd->favours_ssm && pwr->supports_ssm)
 | 
			
		||||
    {
 | 
			
		||||
      /* pwr->supports_ssm is set if addrset_contains_ssm(pwr->ssm), so
 | 
			
		||||
       any_ssm must succeed. */
 | 
			
		||||
    if (!addrset_any_uc (pwr->c.as, &m->ssm_src_loc))
 | 
			
		||||
      assert (0);
 | 
			
		||||
    if (!addrset_any_ssm (rd->e.gv, pwr->c.as, &m->ssm_mc_loc))
 | 
			
		||||
      assert (0);
 | 
			
		||||
    /* FIXME: for now, assume that the ports match for datasock_mc --
 | 
			
		||||
      if (!addrset_any_uc (pwr->c.as, &m->ssm_src_loc))
 | 
			
		||||
        assert (0);
 | 
			
		||||
      if (!addrset_any_ssm (rd->e.gv, pwr->c.as, &m->ssm_mc_loc))
 | 
			
		||||
        assert (0);
 | 
			
		||||
      /* FIXME: for now, assume that the ports match for datasock_mc --
 | 
			
		||||
       't would be better to dynamically create and destroy sockets on
 | 
			
		||||
       an as needed basis. */
 | 
			
		||||
    ddsi_join_mc (rd->e.gv, rd->e.gv->mship, rd->e.gv->data_conn_mc, &m->ssm_src_loc, &m->ssm_mc_loc);
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    set_unspec_locator (&m->ssm_src_loc);
 | 
			
		||||
    set_unspec_locator (&m->ssm_mc_loc);
 | 
			
		||||
  }
 | 
			
		||||
      ddsi_join_mc (rd->e.gv, rd->e.gv->mship, rd->e.gv->data_conn_mc, &m->ssm_src_loc, &m->ssm_mc_loc);
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      set_unspec_locator (&m->ssm_src_loc);
 | 
			
		||||
      set_unspec_locator (&m->ssm_mc_loc);
 | 
			
		||||
    }
 | 
			
		||||
#endif
 | 
			
		||||
 | 
			
		||||
    if (rd->status_cb)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue