diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index fb5b825..78533c8 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -345,7 +345,6 @@ struct proxy_endpoint_common struct proxy_endpoint_common *next_ep; /* next \ endpoint belonging to this proxy participant */ struct proxy_endpoint_common *prev_ep; /* prev / -- this is in arbitrary ordering */ struct dds_qos *xqos; /* proxy endpoint QoS lives here; FIXME: local ones should have it moved to common as well */ - struct ddsi_sertopic * topic; /* topic may be NULL: for built-ins, but also for never-yet matched proxies (so we don't have to know the topic; when we match, we certainly do know) */ struct addrset *as; /* address set to use for communicating with this endpoint */ nn_guid_t group_guid; /* 0:0:0:0 if not available */ nn_vendorid_t vendor; /* cached from proxypp->vendor */ diff --git a/src/core/ddsi/src/q_ddsi_discovery.c b/src/core/ddsi/src/q_ddsi_discovery.c index 227c961..5b799d1 100644 --- a/src/core/ddsi/src/q_ddsi_discovery.c +++ b/src/core/ddsi/src/q_ddsi_discovery.c @@ -1643,7 +1643,6 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str /* Built-ins still do their own deserialization (SPDP <=> pwr == NULL)). */ - assert (pwr == NULL || pwr->c.topic == NULL); if (statusinfo == 0) { if (datasz == 0 || !(data_smhdr_flags & DATA_FLAG_DATAFLAG)) diff --git a/src/core/ddsi/src/q_debmon.c b/src/core/ddsi/src/q_debmon.c index 9012f7b..c11b78c 100644 --- a/src/core/ddsi/src/q_debmon.c +++ b/src/core/ddsi/src/q_debmon.c @@ -107,8 +107,7 @@ static int print_addrset_if_notempty (ddsi_tran_conn_t conn, const char *prefix, return print_addrset (conn, prefix, as, suffix); } -static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, - const struct dds_qos *xqos, const struct ddsi_sertopic *topic) +static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct dds_qos *xqos) { int x = 0; x += cpf (conn, " %s %x:%x:%x:%x ", label, PGUID (e->guid)); @@ -118,24 +117,24 @@ static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, for (uint32_t i = 0; i < xqos->partition.n; i++) x += cpf (conn, "%s%s", i == 0 ? "" : ",", xqos->partition.strs[i]); if (xqos->partition.n > 1) cpf (conn, "}"); - x += cpf (conn, ".%s/%s", - topic && topic->name ? topic->name : (xqos->present & QP_TOPIC_NAME) ? xqos->topic_name : "(null)", - topic && topic->type_name ? topic->type_name : (xqos->present & QP_TYPE_NAME) ? xqos->type_name : "(null)"); + const char *topic_name = (xqos->present & QP_TOPIC_NAME) ? xqos->topic_name : "null"; + const char *topic_typename = (xqos->present & QP_TYPE_NAME) ? xqos->type_name : "null"; + x += cpf (conn, ".%s/%s", topic_name, topic_typename); } cpf (conn, "\n"); return x; } -static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct dds_qos *xqos, const struct ddsi_sertopic *topic) +static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct dds_qos *xqos) { DDSRT_UNUSED_ARG (c); - return print_any_endpoint_common (conn, label, e, xqos, topic); + return print_any_endpoint_common (conn, label, e, xqos); } static int print_proxy_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct proxy_endpoint_common *c) { int x = 0; - x += print_any_endpoint_common (conn, label, e, c->xqos, c->topic); + x += print_any_endpoint_common (conn, label, e, c->xqos); x += print_addrset_if_notempty (conn, " as", c->as, "\n"); return x; } @@ -165,7 +164,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global if (r->c.pp != p) continue; ddsrt_mutex_lock (&r->e.lock); - print_endpoint_common (conn, "rd", &r->e, &r->c, r->xqos, r->topic); + print_endpoint_common (conn, "rd", &r->e, &r->c, r->xqos); #ifdef DDSI_INCLUDE_NETWORK_PARTITIONS x += print_addrset_if_notempty (conn, " as", r->as, "\n"); #endif @@ -188,7 +187,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global if (w->c.pp != p) continue; ddsrt_mutex_lock (&w->e.lock); - print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic); + print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos); whc_get_state(w->whc, &whcst); x += cpf (conn, " whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n", whcst.min_seq, whcst.max_seq, whcst.unacked_bytes, diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index 06afbbf..0c71cc0 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -1855,8 +1855,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader if (ddsrt_avl_lookup_ipath (&pwr_readers_treedef, &pwr->readers, &rd->e.guid, &path)) goto already_matched; - if (pwr->c.topic == NULL && rd->topic) - pwr->c.topic = ddsi_sertopic_ref (rd->topic); + assert (rd->topic || is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE)); if (pwr->ddsi2direct_cb == 0 && rd->ddsi2direct_cb != 0) { pwr->ddsi2direct_cb = rd->ddsi2direct_cb; @@ -1981,7 +1980,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader return; already_matched: - assert (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor) ? (pwr->c.topic == NULL) : (pwr->c.topic != NULL)); ELOGDISC (pwr, " proxy_writer_add_connection(pwr "PGUIDFMT" rd "PGUIDFMT") - already connected\n", PGUID (pwr->e.guid), PGUID (rd->e.guid)); ddsrt_mutex_unlock (&pwr->e.lock); @@ -1996,8 +1994,6 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer m->wr_guid = wr->e.guid; ddsrt_mutex_lock (&prd->e.lock); - if (prd->c.topic == NULL) - prd->c.topic = ddsi_sertopic_ref (wr->topic); if (ddsrt_avl_lookup_ipath (&prd_writers_treedef, &prd->writers, &wr->e.guid, &path)) { ELOGDISC (prd, " proxy_reader_add_connection(wr "PGUIDFMT" prd "PGUIDFMT") - already connected\n", @@ -2007,6 +2003,7 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer } else { + assert (wr->topic || is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE)); ELOGDISC (prd, " proxy_reader_add_connection(wr "PGUIDFMT" prd "PGUIDFMT")\n", PGUID (wr->e.guid), PGUID (prd->e.guid)); ddsrt_avl_insert_ipath (&prd_writers_treedef, &prd->writers, m, &path); @@ -4009,7 +4006,6 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en entity_common_init (e, proxypp->e.gv, guid, name, kind, tcreate, proxypp->vendor, false); c->xqos = nn_xqos_dup (&plist->qos); c->as = ref_addrset (as); - c->topic = NULL; /* set from first matching reader/writer */ c->vendor = proxypp->vendor; c->seq = seq; @@ -4024,12 +4020,9 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c) { unref_proxy_participant (c->proxypp, c); - - ddsi_sertopic_unref (c->topic); nn_xqos_fini (c->xqos); ddsrt_free (c->xqos); unref_addrset (c->as); - entity_common_fini (e); } diff --git a/src/core/ddsi/src/q_receive.c b/src/core/ddsi/src/q_receive.c index 00b678c..a54fd68 100644 --- a/src/core/ddsi/src/q_receive.c +++ b/src/core/ddsi/src/q_receive.c @@ -1739,16 +1739,10 @@ static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topi return sd; } -static struct ddsi_serdata *extract_sample_from_data -( - const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, - const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo, - nn_wctime_t tstamp, struct ddsi_sertopic const * const topic -) +static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **tk1, struct q_globals *gv, const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo, nn_wctime_t tstamp, struct ddsi_sertopic const * const topic) { - static const nn_guid_t null_guid = {{{0,0,0,0,0,0,0,0,0,0,0,0}},{0}}; const char *failmsg = NULL; - struct ddsi_serdata * sample = NULL; + struct ddsi_serdata *sample = NULL; if (statusinfo == 0) { @@ -1756,8 +1750,11 @@ static struct ddsi_serdata *extract_sample_from_data if (!(data_smhdr_flags & DATA_FLAG_DATAFLAG) || sampleinfo->size == 0) { const struct proxy_writer *pwr = sampleinfo->pwr; - nn_guid_t guid = pwr ? pwr->e.guid : null_guid; /* can't be null _yet_, but that might change some day */ - DDS_CTRACE (&sampleinfo->rst->gv->logconfig, + nn_guid_t guid; + /* pwr can't currently be null, but that might change some day, and this being + an error path, it doesn't hurt to survive that */ + if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid)); + DDS_CTRACE (&gv->logconfig, "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": write without proper payload (data_smhdr_flags 0x%x size %"PRIu32")\n", sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1], PGUID (guid), sampleinfo->seq, @@ -1785,7 +1782,7 @@ static struct ddsi_serdata *extract_sample_from_data { /* RTI always tries to make us survive on the keyhash. RTI must mend its ways. */ - if (NN_STRICT_P (sampleinfo->rst->gv->config)) + if (NN_STRICT_P (gv->config)) failmsg = "no content"; else if (!(qos->present & PP_KEYHASH)) failmsg = "qos present but without keyhash"; @@ -1805,17 +1802,32 @@ static struct ddsi_serdata *extract_sample_from_data { /* No message => error out */ const struct proxy_writer *pwr = sampleinfo->pwr; - nn_guid_t guid = pwr ? pwr->e.guid : null_guid; /* can't be null _yet_, but that might change some day */ - DDS_CWARNING (&sampleinfo->rst->gv->logconfig, + nn_guid_t guid; + if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid)); + DDS_CWARNING (&gv->logconfig, "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": deserialization %s/%s failed (%s)\n", sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1], PGUID (guid), sampleinfo->seq, topic->name, topic->type_name, failmsg ? failmsg : "for reasons unknown"); } + else + { + if ((*tk1 = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL) + { + ddsi_serdata_unref (sample); + sample = NULL; + } + } return sample; } +static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk) +{ + ddsi_tkmap_instance_unref (gv->m_tkmap, tk); + ddsi_serdata_unref (sample); +} + unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr) { switch ((SubmessageKind_t) smhdr->submessageId) @@ -1837,17 +1849,36 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr) } } +static struct reader *proxy_writer_first_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it) +{ + struct pwr_rd_match *m; + struct reader *rd; + for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, it); m != NULL; m = ddsrt_avl_iter_next (it)) + if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL) + return rd; + return NULL; +} + +static struct reader *proxy_writer_next_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it) +{ + struct pwr_rd_match *m; + struct reader *rd; + for (m = ddsrt_avl_iter_next (it); m != NULL; m = ddsrt_avl_iter_next (it)) + if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL) + return rd; + return NULL; +} + static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const nn_guid_t *rdguid, int pwr_locked) { struct receiver_state const * const rst = sampleinfo->rst; + struct q_globals * const gv = rst->gv; struct proxy_writer * const pwr = sampleinfo->pwr; - struct ddsi_sertopic const * const topic = pwr->c.topic; unsigned statusinfo; Data_DataFrag_common_t *msg; unsigned char data_smhdr_flags; nn_plist_t qos; int need_keyhash; - struct ddsi_serdata * payload; if (pwr->ddsi2direct_cb) { @@ -1855,22 +1886,11 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st return 0; } - /* NOTE: pwr->e.lock need not be held for correct processing (though - it may be useful to hold it for maintaining order all the way to - v_groupWrite): guid is constant, set_vmsg_header() explains about - the qos issue (and will have to deal with that); and - pwr->groupset takes care of itself. FIXME: groupset may be - taking care of itself, but it is currently doing so in an - annoyingly simplistic manner ... */ - /* FIXME: fragments are now handled by copying the message to freshly malloced memory (see defragment()) ... that'll have to change eventually */ assert (fragchain->min == 0); assert (!is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor)); - /* Can only get here if at some point readers existed => topic can't - still be NULL, even if there are no readers at the moment */ - assert (topic != NULL); /* Luckily, the Data header (up to inline QoS) is a prefix of the DataFrag header, so for the fixed-position things that we're @@ -1906,13 +1926,13 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st src.encoding = (msg->smhdr.flags & SMFLAG_ENDIANNESS) ? PL_CDR_LE : PL_CDR_BE; src.buf = NN_RMSG_PAYLOADOFF (fragchain->rmsg, qos_offset); src.bufsz = NN_RDATA_PAYLOAD_OFF (fragchain) - qos_offset; - src.strict = NN_STRICT_P (rst->gv->config); - src.factory = rst->gv->m_factory; - src.logconfig = &rst->gv->logconfig; + src.strict = NN_STRICT_P (gv->config); + src.factory = gv->m_factory; + src.logconfig = &gv->logconfig; if ((plist_ret = nn_plist_init_frommsg (&qos, NULL, PP_STATUSINFO | PP_KEYHASH | PP_COHERENT_SET, 0, &src)) < 0) { if (plist_ret != DDS_RETCODE_UNSUPPORTED) - DDS_CWARNING (&sampleinfo->rst->gv->logconfig, + DDS_CWARNING (&gv->logconfig, "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": invalid inline qos\n", src.vendorid.id[0], src.vendorid.id[1], PGUID (pwr->e.guid), sampleinfo->seq); return 0; @@ -1920,67 +1940,54 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st statusinfo = (qos.present & PP_STATUSINFO) ? qos.statusinfo : 0; } - /* Note: deserializing done potentially many times for a historical - data sample (once per reader that cares about that data). For - now, this is accepted as sufficiently abnormal behaviour to not - worry about it. */ - { - nn_wctime_t tstamp; - if (sampleinfo->timestamp.v != NN_WCTIME_INVALID.v) - tstamp = sampleinfo->timestamp; - else - tstamp.v = 0; - payload = extract_sample_from_data (sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, topic); - } - if (payload == NULL) - { - goto no_payload; - } + /* FIXME: should it be 0, local wall clock time or INVALID? */ + const nn_wctime_t tstamp = (sampleinfo->timestamp.v != NN_WCTIME_INVALID.v) ? sampleinfo->timestamp : ((nn_wctime_t) {0}); + struct proxy_writer_info pwr_info; + make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos); - /* Generate the DDS_SampleInfo (which is faked to some extent - because we don't actually have a data reader) */ - struct ddsi_tkmap_instance *tk; - if ((tk = ddsi_tkmap_lookup_instance_ref (pwr->e.gv->m_tkmap, payload)) != NULL) + if (rdguid == NULL) { - struct proxy_writer_info pwr_info; - make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos); + ETRACE (pwr, " %"PRId64"=>EVERYONE\n", sampleinfo->seq); - if (rdguid == NULL) + /* FIXME: Retry loop, for re-delivery of rejected reliable samples. Is a + temporary hack till throttling back of writer is implemented (with late + acknowledgement of sample and nack). */ + retry: + ddsrt_mutex_lock (&pwr->rdary.rdary_lock); + if (pwr->rdary.fastpath_ok) { - ETRACE (pwr, " %"PRId64"=>EVERYONE\n", sampleinfo->seq); - - /* FIXME: pwr->rdary is an array of pointers to attached - readers. There's only one thread delivering data for the - proxy writer (as long as there is only one receive thread), - so could get away with not locking at all, and doing safe - updates + GC of rdary instead. */ - - /* Retry loop, for re-delivery of rejected reliable samples. Is a - temporary hack till throttling back of writer is implemented - (with late acknowledgement of sample and nack). */ - retry: - - ddsrt_mutex_lock (&pwr->rdary.rdary_lock); - if (pwr->rdary.fastpath_ok) + struct reader ** const rdary = pwr->rdary.rdary; + if (rdary[0]) { - struct reader ** const rdary = pwr->rdary.rdary; - for (uint32_t i = 0; rdary[i]; i++) + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rdary[0]->topic)) != NULL) { - ETRACE (pwr, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); - if (!rhc_store (rdary[i]->rhc, &pwr_info, payload, tk)) - { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - dds_sleepfor (DDS_MSECS (10)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - goto retry; - } + uint32_t i = 0; + do { + ETRACE (pwr, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid)); + if (!rhc_store (rdary[i]->rhc, &pwr_info, payload, tk)) + { + if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + /* It is painful to drop the sample, but there is no guarantee that the readers + will still be there after unlocking; indeed, it is even possible that the + topic definition got replaced in the meantime. Fortunately, this is in + the midst of a FIXME for many other reasons. */ + free_sample_after_store (gv, payload, tk); + dds_sleepfor (DDS_MSECS (10)); + if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + goto retry; + } + } while (rdary[++i]); + free_sample_after_store (gv, payload, tk); } - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); } - else - { - /* When deleting, pwr is no longer accessible via the hash + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + } + else + { + /* When deleting, pwr is no longer accessible via the hash tables, and consequently, a reader may be deleted without it being possible to remove it from rdary. The primary reason rdary exists is to avoid locking the proxy writer @@ -1988,39 +1995,58 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st we fall back to using the GUIDs so that we can deliver all samples we received from it. As writer being deleted any reliable samples that are rejected are simply discarded. */ - ddsrt_avl_iter_t it; - struct pwr_rd_match *m; - ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); - if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); - for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it)) + ddsrt_avl_iter_t it; + struct reader *rd; + ddsrt_mutex_unlock (&pwr->rdary.rdary_lock); + if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + if ((rd = proxy_writer_first_in_sync_reader (pwr, &it)) != NULL) + { + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL) { - struct reader *rd; - if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL && m->in_sync == PRMSS_SYNC) - { + do { ETRACE (pwr, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid)); (void) rhc_store (rd->rhc, &pwr_info, payload, tk); + rd = proxy_writer_next_in_sync_reader (pwr, &it); + } while (rd != NULL); + free_sample_after_store (gv, payload, tk); + } + } + if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); + } + + ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); + } + else + { + struct reader *rd = ephash_lookup_reader_guid (gv->guid_hash, rdguid); + ETRACE (pwr, " %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?"); + if (rd != NULL) + { + struct ddsi_serdata *payload; + struct ddsi_tkmap_instance *tk; + if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL) + { + /* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays + "awake" (although a delete can be initiated), and blocking like this is a stopgap + anyway -- quite possibly to abort once either is deleted */ + while (!rhc_store (rd->rhc, &pwr_info, payload, tk)) + { + if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); + dds_sleepfor (DDS_MSECS (1)); + if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + if (ephash_lookup_reader_guid (gv->guid_hash, rdguid) == NULL || + ephash_lookup_proxy_writer_guid (gv->guid_hash, &pwr->e.guid) == NULL) + { + /* give up when reader or proxy writer no longer accessible */ + break; } } - if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - } - - ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1)); - } - else - { - struct reader *rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rdguid); - ETRACE (pwr, " %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?"); - while (rd && ! rhc_store (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (pwr->e.gv->guid_hash, &pwr->e.guid)) - { - if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock); - dds_sleepfor (DDS_MSECS (1)); - if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock); + free_sample_after_store (gv, payload, tk); } } - ddsi_tkmap_instance_unref (pwr->e.gv->m_tkmap, tk); } - ddsi_serdata_unref (payload); - no_payload: nn_plist_fini (&qos); return 0; } @@ -2308,8 +2334,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con if (gap_was_valuable) { - const char *tname = pwr->c.topic ? pwr->c.topic->name : "(null)"; - const char *ttname = pwr->c.topic ? pwr->c.topic->type_name : "(null)"; + const char *tname = (pwr->c.xqos->present & QP_TOPIC_NAME) ? pwr->c.xqos->topic_name : "(null)"; + const char *ttname = (pwr->c.xqos->present & QP_TYPE_NAME) ? pwr->c.xqos->type_name : "(null)"; DDS_CWARNING (&rst->gv->logconfig, "dropping oversize (%"PRIu32" > %"PRIu32") sample %"PRId64" from remote writer "PGUIDFMT" %s/%s\n", sampleinfo->size, rst->gv->config.max_sample_size, sampleinfo->seq, PGUIDPREFIX (rst->src_guid_prefix), msg->writerId.u,