crash invoking data available on built-in reader

The DDSI reader/writer pointers are now returned as out parameters
instead of as a return value, so that the upper-layer reference is set
before any listener can be invoked.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-04-15 11:04:26 +02:00 committed by eboasson
parent c891fdc685
commit 6e2068173a
4 changed files with 42 additions and 40 deletions

View file

@ -472,11 +472,11 @@ dds_create_reader(
ddsrt_mutex_unlock(&sub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state ());
rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
rqos, rhc, dds_reader_status_cb, rd);
ret = new_reader(&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic,
rqos, rhc, dds_reader_status_cb, rd);
ddsrt_mutex_lock(&sub->m_entity.m_mutex);
ddsrt_mutex_lock(&tp->m_entity.m_mutex);
assert (rd->m_rd);
assert (ret == DDS_RETCODE_OK);
thread_state_asleep (lookup_thread_state ());
/* For persistent data register reader with durability */

View file

@ -456,10 +456,10 @@ dds_create_writer(
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
thread_state_awake (lookup_thread_state ());
wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
ret = new_writer(&wr->m_wr, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
assert(wr->m_wr);
assert(ret == DDS_RETCODE_OK);
thread_state_asleep (lookup_thread_state ());
dds_topic_unlock(tp);
dds_publisher_unlock(pub);

View file

@ -479,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
GUID "ppguid". May return NULL if participant unknown or
writer/reader already known. */
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg);
dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg);
struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
dds_retcode_t new_reader (struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
struct whc_node;
struct whc_state;

View file

@ -88,8 +88,8 @@ static const unsigned prismtech_builtin_writers_besmask =
NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER |
NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg);
static dds_retcode_t new_reader_guid (struct reader **rd_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg);
static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity);
static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit);
@ -488,7 +488,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS);
/* But we need the as_disc address set for SPDP, because we need to
send it to everyone regardless of the existence of readers. */
{
@ -511,23 +511,23 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_WRITER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER;
}
@ -535,7 +535,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{
/* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER;
}
@ -543,7 +543,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER);
new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER;
}
@ -551,31 +551,31 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
if (!(flags & RTPS_PF_NO_BUILTIN_READERS))
{
subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR;
subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_READER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_READER;
subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_READER);
new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL);
pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_READER;
}
@ -2779,7 +2779,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se
local_reader_ary_init (&wr->rdary);
}
static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
@ -2790,6 +2790,8 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct
new_reader_writer_common (guid, topic, xqos);
wr = ddsrt_malloc (sizeof (*wr));
if (wr_out)
*wr_out = wr;
/* want a pointer to the participant so that a parallel call to
delete_participant won't interfere with our ability to address
@ -2822,27 +2824,25 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct
resched_xevent_if_earlier (pp->pmd_update_xevent, tsched);
}
return wr;
return DDS_RETCODE_OK;
}
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg)
dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg)
{
struct participant *pp;
struct writer * wr;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
{
DDS_LOG(DDS_LC_DISCOVERY, "new_writer - participant %x:%x:%x:%x not found\n", PGUID (*ppguid));
return NULL;
return DDS_RETCODE_NOT_FOUND;
}
/* participant can't be freed while we're mucking around cos we are
awake and do not touch the thread's vtime (ephash_lookup already
verifies we're awake) */
wrguid->prefix = pp->e.guid.prefix;
if (pp_allocate_entityid (&wrguid->entityid, NN_ENTITYID_KIND_WRITER_WITH_KEY, pp) < 0)
return NULL;
wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
return wr;
return DDS_RETCODE_OUT_OF_RESOURCES;
return new_writer_guid (wr_out, wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
}
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc)
@ -3170,8 +3170,9 @@ static void leave_mcast_helper (const nn_locator_t *n, void * varg)
}
#endif /* DDSI_INCLUDE_NETWORK_PARTITIONS */
static struct reader * new_reader_guid
static dds_retcode_t new_reader_guid
(
struct reader **rd_out,
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
@ -3193,6 +3194,8 @@ static struct reader * new_reader_guid
new_reader_writer_common (guid, topic, xqos);
rd = ddsrt_malloc (sizeof (*rd));
if (rd_out)
*rd_out = rd;
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp);
@ -3300,11 +3303,12 @@ static struct reader * new_reader_guid
match_reader_with_local_writers (rd, tnow);
ddsi_plugin.builtintopic_write (&rd->e, now(), true);
sedp_write_reader (rd);
return rd;
return DDS_RETCODE_OK;
}
struct reader * new_reader
dds_retcode_t new_reader
(
struct reader **rd_out,
struct nn_guid *rdguid,
const struct nn_guid *group_guid,
const struct nn_guid *ppguid,
@ -3316,18 +3320,16 @@ struct reader * new_reader
)
{
struct participant * pp;
struct reader * rd;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
{
DDS_LOG(DDS_LC_DISCOVERY, "new_reader - participant %x:%x:%x:%x not found\n", PGUID (*ppguid));
return NULL;
return DDS_RETCODE_NOT_FOUND;
}
rdguid->prefix = pp->e.guid.prefix;
if (pp_allocate_entityid (&rdguid->entityid, NN_ENTITYID_KIND_READER_WITH_KEY, pp) < 0)
return NULL;
rd = new_reader_guid (rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
return rd;
return DDS_RETCODE_OUT_OF_RESOURCES;
return new_reader_guid (rd_out, rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
}
static void gc_delete_reader (struct gcreq *gcreq)