crash invoking data available on built-in reader
The DDSI reader/writer pointers are now returned as out parameters instead of as a return value, so that the upper-layer reference is set before any listener can be invoked. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
31b8baa03b
commit
a6b5229510
4 changed files with 42 additions and 40 deletions
|
@ -472,11 +472,11 @@ dds_create_reader(
|
|||
ddsrt_mutex_unlock(&sub->m_entity.m_mutex);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
|
||||
rqos, rhc, dds_reader_status_cb, rd);
|
||||
ret = new_reader(&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
|
||||
rqos, rhc, dds_reader_status_cb, rd);
|
||||
ddsrt_mutex_lock(&sub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock(&tp->m_entity.m_mutex);
|
||||
assert (rd->m_rd);
|
||||
assert (ret == DDS_RETCODE_OK);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
/* For persistent data register reader with durability */
|
||||
|
|
|
@ -456,10 +456,10 @@ dds_create_writer(
|
|||
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
ret = new_writer(&wr->m_wr, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert(wr->m_wr);
|
||||
assert(ret == DDS_RETCODE_OK);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
dds_topic_unlock(tp);
|
||||
dds_publisher_unlock(pub);
|
||||
|
|
|
@ -479,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
|
|||
GUID "ppguid". May return NULL if participant unknown or
|
||||
writer/reader already known. */
|
||||
|
||||
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg);
|
||||
dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg);
|
||||
|
||||
struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
|
||||
dds_retcode_t new_reader (struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
|
||||
|
||||
struct whc_node;
|
||||
struct whc_state;
|
||||
|
|
|
@ -88,8 +88,8 @@ static const unsigned prismtech_builtin_writers_besmask =
|
|||
NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER |
|
||||
NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
|
||||
|
||||
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
|
||||
static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
|
||||
static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
|
||||
static dds_retcode_t new_reader_guid (struct reader **rd_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
|
||||
static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
|
||||
static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
|
||||
static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit);
|
||||
|
@ -488,7 +488,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
|
|||
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
|
||||
{
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
/* But we need the as_disc address set for SPDP, because we need to
|
||||
send it to everyone regardless of the existence of readers. */
|
||||
{
|
||||
|
@ -511,23 +511,23 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
|
|||
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
|
||||
{
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_WRITER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
|
||||
}
|
||||
|
||||
|
@ -535,7 +535,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
|
|||
{
|
||||
/* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER;
|
||||
}
|
||||
|
||||
|
@ -543,7 +543,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
|
|||
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
|
||||
{
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
|
||||
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
|
||||
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER;
|
||||
}
|
||||
|
||||
|
@ -551,31 +551,31 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
|
|||
if (!(flags & RTPS_PF_NO_BUILTIN_READERS))
|
||||
{
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_READER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_READER;
|
||||
|
||||
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_READER);
|
||||
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
|
||||
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_READER;
|
||||
|
||||
}
|
||||
|
@ -2779,7 +2779,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
|
|||
local_reader_ary_init (&wr->rdary);
|
||||
}
|
||||
|
||||
static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
|
||||
static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
|
||||
{
|
||||
struct writer *wr;
|
||||
nn_mtime_t tnow = now_mt ();
|
||||
|
@ -2790,6 +2790,8 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct
|
|||
|
||||
new_reader_writer_common (guid, topic, xqos);
|
||||
wr = ddsrt_malloc (sizeof (*wr));
|
||||
if (wr_out)
|
||||
*wr_out = wr;
|
||||
|
||||
/* want a pointer to the participant so that a parallel call to
|
||||
delete_participant won't interfere with our ability to address
|
||||
|
@ -2822,27 +2824,25 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct
|
|||
resched_xevent_if_earlier (pp->pmd_update_xevent, tsched);
|
||||
}
|
||||
|
||||
return wr;
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg)
|
||||
dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg)
|
||||
{
|
||||
struct participant *pp;
|
||||
struct writer * wr;
|
||||
|
||||
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
|
||||
{
|
||||
DDS_LOG(DDS_LC_DISCOVERY, "new_writer - participant %x:%x:%x:%x not found\n", PGUID (*ppguid));
|
||||
return NULL;
|
||||
return DDS_RETCODE_NOT_FOUND;
|
||||
}
|
||||
/* participant can't be freed while we're mucking around cos we are
|
||||
awake and do not touch the thread's vtime (ephash_lookup already
|
||||
verifies we're awake) */
|
||||
wrguid->prefix = pp->e.guid.prefix;
|
||||
if (pp_allocate_entityid (&wrguid->entityid, NN_ENTITYID_KIND_WRITER_WITH_KEY, pp) < 0)
|
||||
return NULL;
|
||||
wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
|
||||
return wr;
|
||||
return DDS_RETCODE_OUT_OF_RESOURCES;
|
||||
return new_writer_guid (wr_out, wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
|
||||
}
|
||||
|
||||
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc)
|
||||
|
@ -3170,8 +3170,9 @@ static void leave_mcast_helper (const nn_locator_t *n, void * varg)
|
|||
}
|
||||
#endif /* DDSI_INCLUDE_NETWORK_PARTITIONS */
|
||||
|
||||
static struct reader * new_reader_guid
|
||||
static dds_retcode_t new_reader_guid
|
||||
(
|
||||
struct reader **rd_out,
|
||||
const struct nn_guid *guid,
|
||||
const struct nn_guid *group_guid,
|
||||
struct participant *pp,
|
||||
|
@ -3193,6 +3194,8 @@ static struct reader * new_reader_guid
|
|||
|
||||
new_reader_writer_common (guid, topic, xqos);
|
||||
rd = ddsrt_malloc (sizeof (*rd));
|
||||
if (rd_out)
|
||||
*rd_out = rd;
|
||||
|
||||
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp);
|
||||
|
||||
|
@ -3300,11 +3303,12 @@ static struct reader * new_reader_guid
|
|||
match_reader_with_local_writers (rd, tnow);
|
||||
ddsi_plugin.builtintopic_write (&rd->e, now(), true);
|
||||
sedp_write_reader (rd);
|
||||
return rd;
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
struct reader * new_reader
|
||||
dds_retcode_t new_reader
|
||||
(
|
||||
struct reader **rd_out,
|
||||
struct nn_guid *rdguid,
|
||||
const struct nn_guid *group_guid,
|
||||
const struct nn_guid *ppguid,
|
||||
|
@ -3316,18 +3320,16 @@ struct reader * new_reader
|
|||
)
|
||||
{
|
||||
struct participant * pp;
|
||||
struct reader * rd;
|
||||
|
||||
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
|
||||
{
|
||||
DDS_LOG(DDS_LC_DISCOVERY, "new_reader - participant %x:%x:%x:%x not found\n", PGUID (*ppguid));
|
||||
return NULL;
|
||||
return DDS_RETCODE_NOT_FOUND;
|
||||
}
|
||||
rdguid->prefix = pp->e.guid.prefix;
|
||||
if (pp_allocate_entityid (&rdguid->entityid, NN_ENTITYID_KIND_READER_WITH_KEY, pp) < 0)
|
||||
return NULL;
|
||||
rd = new_reader_guid (rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
|
||||
return rd;
|
||||
return DDS_RETCODE_OUT_OF_RESOURCES;
|
||||
return new_reader_guid (rd_out, rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
|
||||
}
|
||||
|
||||
static void gc_delete_reader (struct gcreq *gcreq)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue