diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index b3f241b..b039495 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -472,11 +472,11 @@ dds_create_reader( ddsrt_mutex_unlock(&sub->m_entity.m_mutex); thread_state_awake (lookup_thread_state ()); - rd->m_rd = new_reader(&rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, - rqos, rhc, dds_reader_status_cb, rd); + ret = new_reader(&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, + rqos, rhc, dds_reader_status_cb, rd); ddsrt_mutex_lock(&sub->m_entity.m_mutex); ddsrt_mutex_lock(&tp->m_entity.m_mutex); - assert (rd->m_rd); + assert (ret == DDS_RETCODE_OK); thread_state_asleep (lookup_thread_state ()); /* For persistent data register reader with durability */ diff --git a/src/core/ddsc/src/dds_writer.c b/src/core/ddsc/src/dds_writer.c index 5bd81d6..cd83d3e 100644 --- a/src/core/ddsc/src/dds_writer.c +++ b/src/core/ddsc/src/dds_writer.c @@ -456,10 +456,10 @@ dds_create_writer( ddsrt_mutex_unlock (&pub->m_entity.m_mutex); thread_state_awake (lookup_thread_state ()); - wr->m_wr = new_writer(&wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); + ret = new_writer(&wr->m_wr, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr); ddsrt_mutex_lock (&pub->m_entity.m_mutex); ddsrt_mutex_lock (&tp->m_entity.m_mutex); - assert(wr->m_wr); + assert(ret == DDS_RETCODE_OK); thread_state_asleep (lookup_thread_state ()); dds_topic_unlock(tp); dds_publisher_unlock(pub); diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 6f79aac..9138e02 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -479,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity GUID "ppguid". May return NULL if participant unknown or writer/reader already known. */ -struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg); +dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg); -struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg); +dds_retcode_t new_reader (struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg); struct whc_node; struct whc_state; diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 3dffa18..565c72b 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -88,8 +88,8 @@ static const unsigned prismtech_builtin_writers_besmask = NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER | NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER; -static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg); -static struct reader * new_reader_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg); +static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_cbarg); +static dds_retcode_t new_reader_guid (struct reader **rd_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc *rhc, status_cb_t status_cb, void *status_cbarg); static struct participant *ref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity); static void unref_participant (struct participant *pp, const struct nn_guid *guid_of_refing_entity); static void delete_proxy_group_locked (struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit); @@ -488,7 +488,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, whc_new(1, 1, 1), LAST_WR_PARAMS); /* But we need the as_disc address set for SPDP, because we need to send it to everyone regardless of the existence of readers. */ { @@ -511,23 +511,23 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_WRITER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_WRITER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_WRITER; } @@ -535,7 +535,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis { /* TODO: make this one configurable, we don't want all participants to publish all topics (or even just those that they use themselves) */ subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_TOPIC_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_TOPIC_ANNOUNCER; } @@ -543,7 +543,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_WRITERS)) { subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER); - new_writer_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); + new_writer_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_wr, whc_new(1, 1, 1), LAST_WR_PARAMS); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER; } @@ -551,31 +551,31 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis if (!(flags & RTPS_PF_NO_BUILTIN_READERS)) { subguid.entityid = to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.spdp_endpoint_xqos, NULL, NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->bes |= NN_DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR; subguid.entityid = to_entityid (NN_ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->bes |= NN_BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PARTICIPANT_READER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_PUBLISHER_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_PUBLISHER_READER; subguid.entityid = to_entityid (NN_ENTITYID_SEDP_BUILTIN_CM_SUBSCRIBER_READER); - new_reader_guid (&subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); + new_reader_guid (NULL, &subguid, &group_guid, pp, NULL, &gv.builtin_endpoint_xqos_rd, NULL, NULL, NULL); pp->prismtech_bes |= NN_DISC_BUILTIN_ENDPOINT_CM_SUBSCRIBER_READER; } @@ -2779,7 +2779,7 @@ static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_se local_reader_ary_init (&wr->rdary); } -static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity) +static dds_retcode_t new_writer_guid (struct writer **wr_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity) { struct writer *wr; nn_mtime_t tnow = now_mt (); @@ -2790,6 +2790,8 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct new_reader_writer_common (guid, topic, xqos); wr = ddsrt_malloc (sizeof (*wr)); + if (wr_out) + *wr_out = wr; /* want a pointer to the participant so that a parallel call to delete_participant won't interfere with our ability to address @@ -2822,27 +2824,25 @@ static struct writer *new_writer_guid (const struct nn_guid *guid, const struct resched_xevent_if_earlier (pp->pmd_update_xevent, tsched); } - return wr; + return DDS_RETCODE_OK; } -struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg) +dds_retcode_t new_writer (struct writer **wr_out, struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg) { struct participant *pp; - struct writer * wr; if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) { DDS_LOG(DDS_LC_DISCOVERY, "new_writer - participant %x:%x:%x:%x not found\n", PGUID (*ppguid)); - return NULL; + return DDS_RETCODE_NOT_FOUND; } /* participant can't be freed while we're mucking around cos we are awake and do not touch the thread's vtime (ephash_lookup already verifies we're awake) */ wrguid->prefix = pp->e.guid.prefix; if (pp_allocate_entityid (&wrguid->entityid, NN_ENTITYID_KIND_WRITER_WITH_KEY, pp) < 0) - return NULL; - wr = new_writer_guid (wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg); - return wr; + return DDS_RETCODE_OUT_OF_RESOURCES; + return new_writer_guid (wr_out, wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg); } struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc) @@ -3170,8 +3170,9 @@ static void leave_mcast_helper (const nn_locator_t *n, void * varg) } #endif /* DDSI_INCLUDE_NETWORK_PARTITIONS */ -static struct reader * new_reader_guid +static dds_retcode_t new_reader_guid ( + struct reader **rd_out, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, @@ -3193,6 +3194,8 @@ static struct reader * new_reader_guid new_reader_writer_common (guid, topic, xqos); rd = ddsrt_malloc (sizeof (*rd)); + if (rd_out) + *rd_out = rd; endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp); @@ -3300,11 +3303,12 @@ static struct reader * new_reader_guid match_reader_with_local_writers (rd, tnow); ddsi_plugin.builtintopic_write (&rd->e, now(), true); sedp_write_reader (rd); - return rd; + return DDS_RETCODE_OK; } -struct reader * new_reader +dds_retcode_t new_reader ( + struct reader **rd_out, struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, @@ -3316,18 +3320,16 @@ struct reader * new_reader ) { struct participant * pp; - struct reader * rd; if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) { DDS_LOG(DDS_LC_DISCOVERY, "new_reader - participant %x:%x:%x:%x not found\n", PGUID (*ppguid)); - return NULL; + return DDS_RETCODE_NOT_FOUND; } rdguid->prefix = pp->e.guid.prefix; if (pp_allocate_entityid (&rdguid->entityid, NN_ENTITYID_KIND_READER_WITH_KEY, pp) < 0) - return NULL; - rd = new_reader_guid (rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg); - return rd; + return DDS_RETCODE_OUT_OF_RESOURCES; + return new_reader_guid (rd_out, rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg); } static void gc_delete_reader (struct gcreq *gcreq)