publish built-in topics prior to matching

The built-in topics for readers and writers should be published before a
subscription or publication matched listener is invoked, otherwise the
instance handle provided to the listener is not yet available in a
reader for the corresponding topic.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-04-15 11:43:39 +02:00 committed by eboasson
parent 4778d6c5df
commit 7fb9ef2ab0

View file

@ -2832,7 +2832,10 @@ static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_gu
the order if hash insert & matching creates a window during which the order if hash insert & matching creates a window during which
neither of two endpoints being created in parallel can discover neither of two endpoints being created in parallel can discover
the other. */ the other. */
ddsrt_mutex_lock (&wr->e.lock);
ephash_insert_writer_guid (wr); ephash_insert_writer_guid (wr);
ddsi_plugin.builtintopic_write (&wr->e, now(), true);
ddsrt_mutex_unlock (&wr->e.lock);
/* once it exists, match it with proxy writers and broadcast /* once it exists, match it with proxy writers and broadcast
existence (I don't think it matters much what the order of these existence (I don't think it matters much what the order of these
@ -2842,7 +2845,6 @@ static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_gu
deleted while we do so */ deleted while we do so */
match_writer_with_proxy_readers (wr, tnow); match_writer_with_proxy_readers (wr, tnow);
match_writer_with_local_readers (wr, tnow); match_writer_with_local_readers (wr, tnow);
ddsi_plugin.builtintopic_write (&wr->e, now(), true);
sedp_write_writer (wr); sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER) if (wr->lease_duration != T_NEVER)
@ -2890,8 +2892,8 @@ struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, str
memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid)); memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid));
new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL); new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL);
ephash_insert_writer_guid (wr); ephash_insert_writer_guid (wr);
match_writer_with_local_readers (wr, tnow);
ddsi_plugin.builtintopic_write (&wr->e, now(), true); ddsi_plugin.builtintopic_write (&wr->e, now(), true);
match_writer_with_local_readers (wr, tnow);
return lowr; return lowr;
} }
@ -3325,10 +3327,13 @@ static dds_retcode_t new_reader_guid
ut_avlInit (&rd_writers_treedef, &rd->writers); ut_avlInit (&rd_writers_treedef, &rd->writers);
ut_avlInit (&rd_local_writers_treedef, &rd->local_writers); ut_avlInit (&rd_local_writers_treedef, &rd->local_writers);
ddsrt_mutex_lock (&rd->e.lock);
ephash_insert_reader_guid (rd); ephash_insert_reader_guid (rd);
ddsi_plugin.builtintopic_write (&rd->e, now(), true);
ddsrt_mutex_unlock (&rd->e.lock);
match_reader_with_proxy_writers (rd, tnow); match_reader_with_proxy_writers (rd, tnow);
match_reader_with_local_writers (rd, tnow); match_reader_with_local_writers (rd, tnow);
ddsi_plugin.builtintopic_write (&rd->e, now(), true);
sedp_write_reader (rd); sedp_write_reader (rd);
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
} }
@ -4170,9 +4175,14 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
pwr->ddsi2direct_cbarg = 0; pwr->ddsi2direct_cbarg = 0;
local_reader_ary_init (&pwr->rdary); local_reader_ary_init (&pwr->rdary);
/* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&pwr->e.lock);
ephash_insert_proxy_writer_guid (pwr); ephash_insert_proxy_writer_guid (pwr);
match_proxy_writer_with_readers (pwr, tnow);
ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true); ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true);
ddsrt_mutex_unlock (&pwr->e.lock);
match_proxy_writer_with_readers (pwr, tnow);
ddsrt_mutex_lock (&pwr->e.lock); ddsrt_mutex_lock (&pwr->e.lock);
pwr->local_matching_inprogress = 0; pwr->local_matching_inprogress = 0;
@ -4344,9 +4354,14 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
prd->assert_pp_lease = (unsigned) !!config.arrival_of_data_asserts_pp_and_ep_liveliness; prd->assert_pp_lease = (unsigned) !!config.arrival_of_data_asserts_pp_and_ep_liveliness;
ut_avlInit (&prd_writers_treedef, &prd->writers); ut_avlInit (&prd_writers_treedef, &prd->writers);
/* locking the entity prevents matching while the built-in topic hasn't been published yet */
ddsrt_mutex_lock (&prd->e.lock);
ephash_insert_proxy_reader_guid (prd); ephash_insert_proxy_reader_guid (prd);
match_proxy_reader_with_writers (prd, tnow);
ddsi_plugin.builtintopic_write (&prd->e, timestamp, true); ddsi_plugin.builtintopic_write (&prd->e, timestamp, true);
ddsrt_mutex_unlock (&prd->e.lock);
match_proxy_reader_with_writers (prd, tnow);
return 0; return 0;
} }