Do not cache topic in proxy endpoint (#207)
The only place it was used for more than printing the name (which is available via the QoS object) was in deserializing the sample, but there is never a need to deserialize a sample if there is no local reader. So instead of caching the topic object in the proxy endpoint, take it from a reader to which it is to be delivered and avoid having to keep things in sync. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
		
							parent
							
								
									bf095a32d4
								
							
						
					
					
						commit
						68b85d0a2d
					
				
					 5 changed files with 147 additions and 131 deletions
				
			
		| 
						 | 
				
			
			@ -345,7 +345,6 @@ struct proxy_endpoint_common
 | 
			
		|||
  struct proxy_endpoint_common *next_ep; /* next \ endpoint belonging to this proxy participant */
 | 
			
		||||
  struct proxy_endpoint_common *prev_ep; /* prev / -- this is in arbitrary ordering */
 | 
			
		||||
  struct dds_qos *xqos; /* proxy endpoint QoS lives here; FIXME: local ones should have it moved to common as well */
 | 
			
		||||
  struct ddsi_sertopic * topic; /* topic may be NULL: for built-ins, but also for never-yet matched proxies (so we don't have to know the topic; when we match, we certainly do know) */
 | 
			
		||||
  struct addrset *as; /* address set to use for communicating with this endpoint */
 | 
			
		||||
  nn_guid_t group_guid; /* 0:0:0:0 if not available */
 | 
			
		||||
  nn_vendorid_t vendor; /* cached from proxypp->vendor */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1643,7 +1643,6 @@ int builtins_dqueue_handler (const struct nn_rsample_info *sampleinfo, const str
 | 
			
		|||
 | 
			
		||||
  /* Built-ins still do their own deserialization (SPDP <=> pwr ==
 | 
			
		||||
     NULL)). */
 | 
			
		||||
  assert (pwr == NULL || pwr->c.topic == NULL);
 | 
			
		||||
  if (statusinfo == 0)
 | 
			
		||||
  {
 | 
			
		||||
    if (datasz == 0 || !(data_smhdr_flags & DATA_FLAG_DATAFLAG))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -107,8 +107,7 @@ static int print_addrset_if_notempty (ddsi_tran_conn_t conn, const char *prefix,
 | 
			
		|||
    return print_addrset (conn, prefix, as, suffix);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e,
 | 
			
		||||
                                      const struct dds_qos *xqos, const struct ddsi_sertopic *topic)
 | 
			
		||||
static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct dds_qos *xqos)
 | 
			
		||||
{
 | 
			
		||||
  int x = 0;
 | 
			
		||||
  x += cpf (conn, "  %s %x:%x:%x:%x ", label, PGUID (e->guid));
 | 
			
		||||
| 
						 | 
				
			
			@ -118,24 +117,24 @@ static int print_any_endpoint_common (ddsi_tran_conn_t conn, const char *label,
 | 
			
		|||
    for (uint32_t i = 0; i < xqos->partition.n; i++)
 | 
			
		||||
      x += cpf (conn, "%s%s", i == 0 ? "" : ",", xqos->partition.strs[i]);
 | 
			
		||||
    if (xqos->partition.n > 1) cpf (conn, "}");
 | 
			
		||||
    x += cpf (conn, ".%s/%s",
 | 
			
		||||
              topic && topic->name ? topic->name : (xqos->present & QP_TOPIC_NAME) ? xqos->topic_name : "(null)",
 | 
			
		||||
              topic && topic->type_name ? topic->type_name : (xqos->present & QP_TYPE_NAME) ? xqos->type_name : "(null)");
 | 
			
		||||
    const char *topic_name = (xqos->present & QP_TOPIC_NAME) ? xqos->topic_name : "null";
 | 
			
		||||
    const char *topic_typename = (xqos->present & QP_TYPE_NAME) ? xqos->type_name : "null";
 | 
			
		||||
    x += cpf (conn, ".%s/%s", topic_name, topic_typename);
 | 
			
		||||
  }
 | 
			
		||||
  cpf (conn, "\n");
 | 
			
		||||
  return x;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct dds_qos *xqos, const struct ddsi_sertopic *topic)
 | 
			
		||||
static int print_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct endpoint_common *c, const struct dds_qos *xqos)
 | 
			
		||||
{
 | 
			
		||||
  DDSRT_UNUSED_ARG (c);
 | 
			
		||||
  return print_any_endpoint_common (conn, label, e, xqos, topic);
 | 
			
		||||
  return print_any_endpoint_common (conn, label, e, xqos);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int print_proxy_endpoint_common (ddsi_tran_conn_t conn, const char *label, const struct entity_common *e, const struct proxy_endpoint_common *c)
 | 
			
		||||
{
 | 
			
		||||
  int x = 0;
 | 
			
		||||
  x += print_any_endpoint_common (conn, label, e, c->xqos, c->topic);
 | 
			
		||||
  x += print_any_endpoint_common (conn, label, e, c->xqos);
 | 
			
		||||
  x += print_addrset_if_notempty (conn, "    as", c->as, "\n");
 | 
			
		||||
  return x;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -165,7 +164,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global
 | 
			
		|||
        if (r->c.pp != p)
 | 
			
		||||
          continue;
 | 
			
		||||
        ddsrt_mutex_lock (&r->e.lock);
 | 
			
		||||
        print_endpoint_common (conn, "rd", &r->e, &r->c, r->xqos, r->topic);
 | 
			
		||||
        print_endpoint_common (conn, "rd", &r->e, &r->c, r->xqos);
 | 
			
		||||
#ifdef DDSI_INCLUDE_NETWORK_PARTITIONS
 | 
			
		||||
        x += print_addrset_if_notempty (conn, "    as", r->as, "\n");
 | 
			
		||||
#endif
 | 
			
		||||
| 
						 | 
				
			
			@ -188,7 +187,7 @@ static int print_participants (struct thread_state1 * const ts1, struct q_global
 | 
			
		|||
        if (w->c.pp != p)
 | 
			
		||||
          continue;
 | 
			
		||||
        ddsrt_mutex_lock (&w->e.lock);
 | 
			
		||||
        print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos, w->topic);
 | 
			
		||||
        print_endpoint_common (conn, "wr", &w->e, &w->c, w->xqos);
 | 
			
		||||
        whc_get_state(w->whc, &whcst);
 | 
			
		||||
        x += cpf (conn, "    whc [%lld,%lld] unacked %"PRIuSIZE"%s [%u,%u] seq %lld seq_xmit %lld cs_seq %lld\n",
 | 
			
		||||
                  whcst.min_seq, whcst.max_seq, whcst.unacked_bytes,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1855,8 +1855,7 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
 | 
			
		|||
  if (ddsrt_avl_lookup_ipath (&pwr_readers_treedef, &pwr->readers, &rd->e.guid, &path))
 | 
			
		||||
    goto already_matched;
 | 
			
		||||
 | 
			
		||||
  if (pwr->c.topic == NULL && rd->topic)
 | 
			
		||||
    pwr->c.topic = ddsi_sertopic_ref (rd->topic);
 | 
			
		||||
  assert (rd->topic || is_builtin_endpoint (rd->e.guid.entityid, NN_VENDORID_ECLIPSE));
 | 
			
		||||
  if (pwr->ddsi2direct_cb == 0 && rd->ddsi2direct_cb != 0)
 | 
			
		||||
  {
 | 
			
		||||
    pwr->ddsi2direct_cb = rd->ddsi2direct_cb;
 | 
			
		||||
| 
						 | 
				
			
			@ -1981,7 +1980,6 @@ static void proxy_writer_add_connection (struct proxy_writer *pwr, struct reader
 | 
			
		|||
  return;
 | 
			
		||||
 | 
			
		||||
already_matched:
 | 
			
		||||
  assert (is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor) ? (pwr->c.topic == NULL) : (pwr->c.topic != NULL));
 | 
			
		||||
  ELOGDISC (pwr, "  proxy_writer_add_connection(pwr "PGUIDFMT" rd "PGUIDFMT") - already connected\n",
 | 
			
		||||
            PGUID (pwr->e.guid), PGUID (rd->e.guid));
 | 
			
		||||
  ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
| 
						 | 
				
			
			@ -1996,8 +1994,6 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer
 | 
			
		|||
 | 
			
		||||
  m->wr_guid = wr->e.guid;
 | 
			
		||||
  ddsrt_mutex_lock (&prd->e.lock);
 | 
			
		||||
  if (prd->c.topic == NULL)
 | 
			
		||||
    prd->c.topic = ddsi_sertopic_ref (wr->topic);
 | 
			
		||||
  if (ddsrt_avl_lookup_ipath (&prd_writers_treedef, &prd->writers, &wr->e.guid, &path))
 | 
			
		||||
  {
 | 
			
		||||
    ELOGDISC (prd, "  proxy_reader_add_connection(wr "PGUIDFMT" prd "PGUIDFMT") - already connected\n",
 | 
			
		||||
| 
						 | 
				
			
			@ -2007,6 +2003,7 @@ static void proxy_reader_add_connection (struct proxy_reader *prd, struct writer
 | 
			
		|||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    assert (wr->topic || is_builtin_endpoint (wr->e.guid.entityid, NN_VENDORID_ECLIPSE));
 | 
			
		||||
    ELOGDISC (prd, "  proxy_reader_add_connection(wr "PGUIDFMT" prd "PGUIDFMT")\n",
 | 
			
		||||
              PGUID (wr->e.guid), PGUID (prd->e.guid));
 | 
			
		||||
    ddsrt_avl_insert_ipath (&prd_writers_treedef, &prd->writers, m, &path);
 | 
			
		||||
| 
						 | 
				
			
			@ -4009,7 +4006,6 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
 | 
			
		|||
  entity_common_init (e, proxypp->e.gv, guid, name, kind, tcreate, proxypp->vendor, false);
 | 
			
		||||
  c->xqos = nn_xqos_dup (&plist->qos);
 | 
			
		||||
  c->as = ref_addrset (as);
 | 
			
		||||
  c->topic = NULL; /* set from first matching reader/writer */
 | 
			
		||||
  c->vendor = proxypp->vendor;
 | 
			
		||||
  c->seq = seq;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -4024,12 +4020,9 @@ static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_en
 | 
			
		|||
static void proxy_endpoint_common_fini (struct entity_common *e, struct proxy_endpoint_common *c)
 | 
			
		||||
{
 | 
			
		||||
  unref_proxy_participant (c->proxypp, c);
 | 
			
		||||
 | 
			
		||||
  ddsi_sertopic_unref (c->topic);
 | 
			
		||||
  nn_xqos_fini (c->xqos);
 | 
			
		||||
  ddsrt_free (c->xqos);
 | 
			
		||||
  unref_addrset (c->as);
 | 
			
		||||
 | 
			
		||||
  entity_common_fini (e);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1739,16 +1739,10 @@ static struct ddsi_serdata *get_serdata (struct ddsi_sertopic const * const topi
 | 
			
		|||
  return sd;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static struct ddsi_serdata *extract_sample_from_data
 | 
			
		||||
(
 | 
			
		||||
  const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags,
 | 
			
		||||
  const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo,
 | 
			
		||||
  nn_wctime_t tstamp, struct ddsi_sertopic const * const topic
 | 
			
		||||
)
 | 
			
		||||
static struct ddsi_serdata *new_sample_from_data (struct ddsi_tkmap_instance **tk1, struct q_globals *gv, const struct nn_rsample_info *sampleinfo, unsigned char data_smhdr_flags, const nn_plist_t *qos, const struct nn_rdata *fragchain, unsigned statusinfo, nn_wctime_t tstamp, struct ddsi_sertopic const * const topic)
 | 
			
		||||
{
 | 
			
		||||
  static const nn_guid_t null_guid = {{{0,0,0,0,0,0,0,0,0,0,0,0}},{0}};
 | 
			
		||||
  const char *failmsg = NULL;
 | 
			
		||||
  struct ddsi_serdata * sample = NULL;
 | 
			
		||||
  struct ddsi_serdata *sample = NULL;
 | 
			
		||||
 | 
			
		||||
  if (statusinfo == 0)
 | 
			
		||||
  {
 | 
			
		||||
| 
						 | 
				
			
			@ -1756,8 +1750,11 @@ static struct ddsi_serdata *extract_sample_from_data
 | 
			
		|||
    if (!(data_smhdr_flags & DATA_FLAG_DATAFLAG) || sampleinfo->size == 0)
 | 
			
		||||
    {
 | 
			
		||||
      const struct proxy_writer *pwr = sampleinfo->pwr;
 | 
			
		||||
      nn_guid_t guid = pwr ? pwr->e.guid : null_guid; /* can't be null _yet_, but that might change some day */
 | 
			
		||||
      DDS_CTRACE (&sampleinfo->rst->gv->logconfig,
 | 
			
		||||
      nn_guid_t guid;
 | 
			
		||||
      /* pwr can't currently be null, but that might change some day, and this being
 | 
			
		||||
         an error path, it doesn't hurt to survive that */
 | 
			
		||||
      if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
 | 
			
		||||
      DDS_CTRACE (&gv->logconfig,
 | 
			
		||||
                  "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": write without proper payload (data_smhdr_flags 0x%x size %"PRIu32")\n",
 | 
			
		||||
                  sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
 | 
			
		||||
                  PGUID (guid), sampleinfo->seq,
 | 
			
		||||
| 
						 | 
				
			
			@ -1785,7 +1782,7 @@ static struct ddsi_serdata *extract_sample_from_data
 | 
			
		|||
  {
 | 
			
		||||
    /* RTI always tries to make us survive on the keyhash. RTI must
 | 
			
		||||
       mend its ways. */
 | 
			
		||||
    if (NN_STRICT_P (sampleinfo->rst->gv->config))
 | 
			
		||||
    if (NN_STRICT_P (gv->config))
 | 
			
		||||
      failmsg = "no content";
 | 
			
		||||
    else if (!(qos->present & PP_KEYHASH))
 | 
			
		||||
      failmsg = "qos present but without keyhash";
 | 
			
		||||
| 
						 | 
				
			
			@ -1805,17 +1802,32 @@ static struct ddsi_serdata *extract_sample_from_data
 | 
			
		|||
  {
 | 
			
		||||
    /* No message => error out */
 | 
			
		||||
    const struct proxy_writer *pwr = sampleinfo->pwr;
 | 
			
		||||
    nn_guid_t guid = pwr ? pwr->e.guid : null_guid; /* can't be null _yet_, but that might change some day */
 | 
			
		||||
    DDS_CWARNING (&sampleinfo->rst->gv->logconfig,
 | 
			
		||||
    nn_guid_t guid;
 | 
			
		||||
    if (pwr) guid = pwr->e.guid; else memset (&guid, 0, sizeof (guid));
 | 
			
		||||
    DDS_CWARNING (&gv->logconfig,
 | 
			
		||||
                  "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": deserialization %s/%s failed (%s)\n",
 | 
			
		||||
                  sampleinfo->rst->vendor.id[0], sampleinfo->rst->vendor.id[1],
 | 
			
		||||
                  PGUID (guid), sampleinfo->seq,
 | 
			
		||||
                  topic->name, topic->type_name,
 | 
			
		||||
                  failmsg ? failmsg : "for reasons unknown");
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    if ((*tk1 = ddsi_tkmap_lookup_instance_ref (gv->m_tkmap, sample)) == NULL)
 | 
			
		||||
    {
 | 
			
		||||
      ddsi_serdata_unref (sample);
 | 
			
		||||
      sample = NULL;
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return sample;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static void free_sample_after_store (struct q_globals *gv, struct ddsi_serdata *sample, struct ddsi_tkmap_instance *tk)
 | 
			
		||||
{
 | 
			
		||||
  ddsi_tkmap_instance_unref (gv->m_tkmap, tk);
 | 
			
		||||
  ddsi_serdata_unref (sample);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr)
 | 
			
		||||
{
 | 
			
		||||
  switch ((SubmessageKind_t) smhdr->submessageId)
 | 
			
		||||
| 
						 | 
				
			
			@ -1837,17 +1849,36 @@ unsigned char normalize_data_datafrag_flags (const SubmessageHeader_t *smhdr)
 | 
			
		|||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static struct reader *proxy_writer_first_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it)
 | 
			
		||||
{
 | 
			
		||||
  struct pwr_rd_match *m;
 | 
			
		||||
  struct reader *rd;
 | 
			
		||||
  for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, it); m != NULL; m = ddsrt_avl_iter_next (it))
 | 
			
		||||
    if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
 | 
			
		||||
      return rd;
 | 
			
		||||
  return NULL;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static struct reader *proxy_writer_next_in_sync_reader (struct proxy_writer *pwr, ddsrt_avl_iter_t *it)
 | 
			
		||||
{
 | 
			
		||||
  struct pwr_rd_match *m;
 | 
			
		||||
  struct reader *rd;
 | 
			
		||||
  for (m = ddsrt_avl_iter_next (it); m != NULL; m = ddsrt_avl_iter_next (it))
 | 
			
		||||
    if (m->in_sync == PRMSS_SYNC && (rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL)
 | 
			
		||||
      return rd;
 | 
			
		||||
  return NULL;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const struct nn_rdata *fragchain, const nn_guid_t *rdguid, int pwr_locked)
 | 
			
		||||
{
 | 
			
		||||
  struct receiver_state const * const rst = sampleinfo->rst;
 | 
			
		||||
  struct q_globals * const gv = rst->gv;
 | 
			
		||||
  struct proxy_writer * const pwr = sampleinfo->pwr;
 | 
			
		||||
  struct ddsi_sertopic const * const topic = pwr->c.topic;
 | 
			
		||||
  unsigned statusinfo;
 | 
			
		||||
  Data_DataFrag_common_t *msg;
 | 
			
		||||
  unsigned char data_smhdr_flags;
 | 
			
		||||
  nn_plist_t qos;
 | 
			
		||||
  int need_keyhash;
 | 
			
		||||
  struct ddsi_serdata * payload;
 | 
			
		||||
 | 
			
		||||
  if (pwr->ddsi2direct_cb)
 | 
			
		||||
  {
 | 
			
		||||
| 
						 | 
				
			
			@ -1855,22 +1886,11 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
 | 
			
		|||
    return 0;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* NOTE: pwr->e.lock need not be held for correct processing (though
 | 
			
		||||
     it may be useful to hold it for maintaining order all the way to
 | 
			
		||||
     v_groupWrite): guid is constant, set_vmsg_header() explains about
 | 
			
		||||
     the qos issue (and will have to deal with that); and
 | 
			
		||||
     pwr->groupset takes care of itself.  FIXME: groupset may be
 | 
			
		||||
     taking care of itself, but it is currently doing so in an
 | 
			
		||||
     annoyingly simplistic manner ...  */
 | 
			
		||||
 | 
			
		||||
  /* FIXME: fragments are now handled by copying the message to
 | 
			
		||||
     freshly malloced memory (see defragment()) ... that'll have to
 | 
			
		||||
     change eventually */
 | 
			
		||||
  assert (fragchain->min == 0);
 | 
			
		||||
  assert (!is_builtin_entityid (pwr->e.guid.entityid, pwr->c.vendor));
 | 
			
		||||
  /* Can only get here if at some point readers existed => topic can't
 | 
			
		||||
     still be NULL, even if there are no readers at the moment */
 | 
			
		||||
  assert (topic != NULL);
 | 
			
		||||
 | 
			
		||||
  /* Luckily, the Data header (up to inline QoS) is a prefix of the
 | 
			
		||||
     DataFrag header, so for the fixed-position things that we're
 | 
			
		||||
| 
						 | 
				
			
			@ -1906,13 +1926,13 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
 | 
			
		|||
    src.encoding = (msg->smhdr.flags & SMFLAG_ENDIANNESS) ? PL_CDR_LE : PL_CDR_BE;
 | 
			
		||||
    src.buf = NN_RMSG_PAYLOADOFF (fragchain->rmsg, qos_offset);
 | 
			
		||||
    src.bufsz = NN_RDATA_PAYLOAD_OFF (fragchain) - qos_offset;
 | 
			
		||||
    src.strict = NN_STRICT_P (rst->gv->config);
 | 
			
		||||
    src.factory = rst->gv->m_factory;
 | 
			
		||||
    src.logconfig = &rst->gv->logconfig;
 | 
			
		||||
    src.strict = NN_STRICT_P (gv->config);
 | 
			
		||||
    src.factory = gv->m_factory;
 | 
			
		||||
    src.logconfig = &gv->logconfig;
 | 
			
		||||
    if ((plist_ret = nn_plist_init_frommsg (&qos, NULL, PP_STATUSINFO | PP_KEYHASH | PP_COHERENT_SET, 0, &src)) < 0)
 | 
			
		||||
    {
 | 
			
		||||
      if (plist_ret != DDS_RETCODE_UNSUPPORTED)
 | 
			
		||||
        DDS_CWARNING (&sampleinfo->rst->gv->logconfig,
 | 
			
		||||
        DDS_CWARNING (&gv->logconfig,
 | 
			
		||||
                      "data(application, vendor %u.%u): "PGUIDFMT" #%"PRId64": invalid inline qos\n",
 | 
			
		||||
                      src.vendorid.id[0], src.vendorid.id[1], PGUID (pwr->e.guid), sampleinfo->seq);
 | 
			
		||||
      return 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -1920,67 +1940,54 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
 | 
			
		|||
    statusinfo = (qos.present & PP_STATUSINFO) ? qos.statusinfo : 0;
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  /* Note: deserializing done potentially many times for a historical
 | 
			
		||||
     data sample (once per reader that cares about that data).  For
 | 
			
		||||
     now, this is accepted as sufficiently abnormal behaviour to not
 | 
			
		||||
     worry about it. */
 | 
			
		||||
  {
 | 
			
		||||
    nn_wctime_t tstamp;
 | 
			
		||||
    if (sampleinfo->timestamp.v != NN_WCTIME_INVALID.v)
 | 
			
		||||
      tstamp = sampleinfo->timestamp;
 | 
			
		||||
    else
 | 
			
		||||
      tstamp.v = 0;
 | 
			
		||||
    payload = extract_sample_from_data (sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, topic);
 | 
			
		||||
  }
 | 
			
		||||
  if (payload == NULL)
 | 
			
		||||
  {
 | 
			
		||||
    goto no_payload;
 | 
			
		||||
  }
 | 
			
		||||
  /* FIXME: should it be 0, local wall clock time or INVALID? */
 | 
			
		||||
  const nn_wctime_t tstamp = (sampleinfo->timestamp.v != NN_WCTIME_INVALID.v) ? sampleinfo->timestamp : ((nn_wctime_t) {0});
 | 
			
		||||
  struct proxy_writer_info pwr_info;
 | 
			
		||||
  make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos);
 | 
			
		||||
 | 
			
		||||
  /* Generate the DDS_SampleInfo (which is faked to some extent
 | 
			
		||||
     because we don't actually have a data reader) */
 | 
			
		||||
  struct ddsi_tkmap_instance *tk;
 | 
			
		||||
  if ((tk = ddsi_tkmap_lookup_instance_ref (pwr->e.gv->m_tkmap, payload)) != NULL)
 | 
			
		||||
  if (rdguid == NULL)
 | 
			
		||||
  {
 | 
			
		||||
    struct proxy_writer_info pwr_info;
 | 
			
		||||
    make_proxy_writer_info (&pwr_info, &pwr->e, pwr->c.xqos);
 | 
			
		||||
    ETRACE (pwr, " %"PRId64"=>EVERYONE\n", sampleinfo->seq);
 | 
			
		||||
 | 
			
		||||
    if (rdguid == NULL)
 | 
			
		||||
    /* FIXME: Retry loop, for re-delivery of rejected reliable samples. Is a
 | 
			
		||||
       temporary hack till throttling back of writer is implemented (with late
 | 
			
		||||
       acknowledgement of sample and nack). */
 | 
			
		||||
  retry:
 | 
			
		||||
    ddsrt_mutex_lock (&pwr->rdary.rdary_lock);
 | 
			
		||||
    if (pwr->rdary.fastpath_ok)
 | 
			
		||||
    {
 | 
			
		||||
      ETRACE (pwr, " %"PRId64"=>EVERYONE\n", sampleinfo->seq);
 | 
			
		||||
 | 
			
		||||
      /* FIXME: pwr->rdary is an array of pointers to attached
 | 
			
		||||
       readers.  There's only one thread delivering data for the
 | 
			
		||||
       proxy writer (as long as there is only one receive thread),
 | 
			
		||||
       so could get away with not locking at all, and doing safe
 | 
			
		||||
       updates + GC of rdary instead.  */
 | 
			
		||||
 | 
			
		||||
      /* Retry loop, for re-delivery of rejected reliable samples. Is a
 | 
			
		||||
       temporary hack till throttling back of writer is implemented
 | 
			
		||||
       (with late acknowledgement of sample and nack). */
 | 
			
		||||
    retry:
 | 
			
		||||
 | 
			
		||||
      ddsrt_mutex_lock (&pwr->rdary.rdary_lock);
 | 
			
		||||
      if (pwr->rdary.fastpath_ok)
 | 
			
		||||
      struct reader ** const rdary = pwr->rdary.rdary;
 | 
			
		||||
      if (rdary[0])
 | 
			
		||||
      {
 | 
			
		||||
        struct reader ** const rdary = pwr->rdary.rdary;
 | 
			
		||||
        for (uint32_t i = 0; rdary[i]; i++)
 | 
			
		||||
        struct ddsi_serdata *payload;
 | 
			
		||||
        struct ddsi_tkmap_instance *tk;
 | 
			
		||||
        if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rdary[0]->topic)) != NULL)
 | 
			
		||||
        {
 | 
			
		||||
          ETRACE (pwr, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
 | 
			
		||||
          if (!rhc_store (rdary[i]->rhc, &pwr_info, payload, tk))
 | 
			
		||||
          {
 | 
			
		||||
            if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
            ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
            dds_sleepfor (DDS_MSECS (10));
 | 
			
		||||
            if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
            goto retry;
 | 
			
		||||
          }
 | 
			
		||||
          uint32_t i = 0;
 | 
			
		||||
          do {
 | 
			
		||||
            ETRACE (pwr, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
 | 
			
		||||
            if (!rhc_store (rdary[i]->rhc, &pwr_info, payload, tk))
 | 
			
		||||
            {
 | 
			
		||||
              if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
              ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
              /* It is painful to drop the sample, but there is no guarantee that the readers
 | 
			
		||||
                 will still be there after unlocking; indeed, it is even possible that the
 | 
			
		||||
                 topic definition got replaced in the meantime.  Fortunately, this is in
 | 
			
		||||
                 the midst of a FIXME for many other reasons. */
 | 
			
		||||
              free_sample_after_store (gv, payload, tk);
 | 
			
		||||
              dds_sleepfor (DDS_MSECS (10));
 | 
			
		||||
              if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
              goto retry;
 | 
			
		||||
            }
 | 
			
		||||
          } while (rdary[++i]);
 | 
			
		||||
          free_sample_after_store (gv, payload, tk);
 | 
			
		||||
        }
 | 
			
		||||
        ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
      }
 | 
			
		||||
      else
 | 
			
		||||
      {
 | 
			
		||||
        /* When deleting, pwr is no longer accessible via the hash
 | 
			
		||||
      ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      /* When deleting, pwr is no longer accessible via the hash
 | 
			
		||||
         tables, and consequently, a reader may be deleted without
 | 
			
		||||
         it being possible to remove it from rdary. The primary
 | 
			
		||||
         reason rdary exists is to avoid locking the proxy writer
 | 
			
		||||
| 
						 | 
				
			
			@ -1988,39 +1995,58 @@ static int deliver_user_data (const struct nn_rsample_info *sampleinfo, const st
 | 
			
		|||
         we fall back to using the GUIDs so that we can deliver all
 | 
			
		||||
         samples we received from it. As writer being deleted any
 | 
			
		||||
         reliable samples that are rejected are simply discarded. */
 | 
			
		||||
        ddsrt_avl_iter_t it;
 | 
			
		||||
        struct pwr_rd_match *m;
 | 
			
		||||
        ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
        if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
        for (m = ddsrt_avl_iter_first (&pwr_readers_treedef, &pwr->readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
 | 
			
		||||
      ddsrt_avl_iter_t it;
 | 
			
		||||
      struct reader *rd;
 | 
			
		||||
      ddsrt_mutex_unlock (&pwr->rdary.rdary_lock);
 | 
			
		||||
      if (!pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
      if ((rd = proxy_writer_first_in_sync_reader (pwr, &it)) != NULL)
 | 
			
		||||
      {
 | 
			
		||||
        struct ddsi_serdata *payload;
 | 
			
		||||
        struct ddsi_tkmap_instance *tk;
 | 
			
		||||
        if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
 | 
			
		||||
        {
 | 
			
		||||
          struct reader *rd;
 | 
			
		||||
          if ((rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, &m->rd_guid)) != NULL && m->in_sync == PRMSS_SYNC)
 | 
			
		||||
          {
 | 
			
		||||
          do {
 | 
			
		||||
            ETRACE (pwr, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
 | 
			
		||||
            (void) rhc_store (rd->rhc, &pwr_info, payload, tk);
 | 
			
		||||
            rd = proxy_writer_next_in_sync_reader (pwr, &it);
 | 
			
		||||
          } while (rd != NULL);
 | 
			
		||||
          free_sample_after_store (gv, payload, tk);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1));
 | 
			
		||||
  }
 | 
			
		||||
  else
 | 
			
		||||
  {
 | 
			
		||||
    struct reader *rd = ephash_lookup_reader_guid (gv->guid_hash, rdguid);
 | 
			
		||||
    ETRACE (pwr, " %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?");
 | 
			
		||||
    if (rd != NULL)
 | 
			
		||||
    {
 | 
			
		||||
      struct ddsi_serdata *payload;
 | 
			
		||||
      struct ddsi_tkmap_instance *tk;
 | 
			
		||||
      if ((payload = new_sample_from_data (&tk, gv, sampleinfo, data_smhdr_flags, &qos, fragchain, statusinfo, tstamp, rd->topic)) != NULL)
 | 
			
		||||
      {
 | 
			
		||||
        /* FIXME: why look up rd,pwr again? Their states remains valid while the thread stays
 | 
			
		||||
           "awake" (although a delete can be initiated), and blocking like this is a stopgap
 | 
			
		||||
           anyway -- quite possibly to abort once either is deleted */
 | 
			
		||||
        while (!rhc_store (rd->rhc, &pwr_info, payload, tk))
 | 
			
		||||
        {
 | 
			
		||||
          if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
          dds_sleepfor (DDS_MSECS (1));
 | 
			
		||||
          if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
          if (ephash_lookup_reader_guid (gv->guid_hash, rdguid) == NULL ||
 | 
			
		||||
              ephash_lookup_proxy_writer_guid (gv->guid_hash, &pwr->e.guid) == NULL)
 | 
			
		||||
          {
 | 
			
		||||
            /* give up when reader or proxy writer no longer accessible */
 | 
			
		||||
            break;
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        if (!pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      ddsrt_atomic_st32 (&pwr->next_deliv_seq_lowword, (uint32_t) (sampleinfo->seq + 1));
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
      struct reader *rd = ephash_lookup_reader_guid (pwr->e.gv->guid_hash, rdguid);
 | 
			
		||||
      ETRACE (pwr, " %"PRId64"=>"PGUIDFMT"%s\n", sampleinfo->seq, PGUID (*rdguid), rd ? "" : "?");
 | 
			
		||||
      while (rd && ! rhc_store (rd->rhc, &pwr_info, payload, tk) && ephash_lookup_proxy_writer_guid (pwr->e.gv->guid_hash, &pwr->e.guid))
 | 
			
		||||
      {
 | 
			
		||||
        if (pwr_locked) ddsrt_mutex_unlock (&pwr->e.lock);
 | 
			
		||||
        dds_sleepfor (DDS_MSECS (1));
 | 
			
		||||
        if (pwr_locked) ddsrt_mutex_lock (&pwr->e.lock);
 | 
			
		||||
        free_sample_after_store (gv, payload, tk);
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    ddsi_tkmap_instance_unref (pwr->e.gv->m_tkmap, tk);
 | 
			
		||||
  }
 | 
			
		||||
  ddsi_serdata_unref (payload);
 | 
			
		||||
 no_payload:
 | 
			
		||||
  nn_plist_fini (&qos);
 | 
			
		||||
  return 0;
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -2308,8 +2334,8 @@ static void drop_oversize (struct receiver_state *rst, struct nn_rmsg *rmsg, con
 | 
			
		|||
 | 
			
		||||
    if (gap_was_valuable)
 | 
			
		||||
    {
 | 
			
		||||
      const char *tname = pwr->c.topic ? pwr->c.topic->name : "(null)";
 | 
			
		||||
      const char *ttname = pwr->c.topic ? pwr->c.topic->type_name : "(null)";
 | 
			
		||||
      const char *tname = (pwr->c.xqos->present & QP_TOPIC_NAME) ? pwr->c.xqos->topic_name : "(null)";
 | 
			
		||||
      const char *ttname = (pwr->c.xqos->present & QP_TYPE_NAME) ? pwr->c.xqos->type_name : "(null)";
 | 
			
		||||
      DDS_CWARNING (&rst->gv->logconfig, "dropping oversize (%"PRIu32" > %"PRIu32") sample %"PRId64" from remote writer "PGUIDFMT" %s/%s\n",
 | 
			
		||||
                    sampleinfo->size, rst->gv->config.max_sample_size, sampleinfo->seq,
 | 
			
		||||
                    PGUIDPREFIX (rst->src_guid_prefix), msg->writerId.u,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue