Use NO_KEY GUIDs for topics without key fields
Submit to the tyranny of the majority: The DCPS specification makes no distinction between topics with and topics without key fields: in the latter case, there is simply a single instance but that instance obeys all the normal rules. In particular, this implies dispose/unregister work regardless of whether a topic has key fields. The DDSI specification makes a distinction between WITH_KEY endpoints and NO_KEY endpoints and does not support dispose/unregister operations on NO_KEY endpoints. This implies that a DCPS implementation must limit itself to the use of WITH_KEY endpoints. Most implementations nonetheless map topics without key fields to the NO_KEY type, and as the DDSI specification also states that a WITH_KEY reader/writer does not match a NO_KEY writer/reader, Cyclone's correctly mapping everything to WITH_KEY means there are interoperability problems for topics without key fields. This commit changes Cyclone to use NO_KEY like the others, but without changing any other part of its behaviour: it continues to support dispose/unregister operations regardless of whether a topic has key fields or not. That is the lesser of the two evils. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
8964c0b1bc
commit
e42092af92
8 changed files with 97 additions and 55 deletions
|
@ -29,16 +29,7 @@
|
|||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv)
|
||||
{
|
||||
struct ddsi_sertopic_builtintopic *tp = ddsrt_malloc (sizeof (*tp));
|
||||
tp->c.iid = ddsi_iid_gen();
|
||||
tp->c.name = dds_string_dup (name);
|
||||
tp->c.type_name = dds_string_dup (typename);
|
||||
const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.type_name) + 1;
|
||||
tp->c.name_type_name = dds_alloc (name_typename_size);
|
||||
snprintf (tp->c.name_type_name, name_typename_size, "%s/%s", tp->c.name, tp->c.type_name);
|
||||
tp->c.ops = &ddsi_sertopic_ops_builtintopic;
|
||||
tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic;
|
||||
tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops);
|
||||
ddsrt_atomic_st32 (&tp->c.refc, 1);
|
||||
ddsi_sertopic_init (&tp->c, name, typename, &ddsi_sertopic_ops_builtintopic, &ddsi_serdata_ops_builtintopic, false);
|
||||
tp->type = type;
|
||||
tp->gv = gv;
|
||||
return &tp->c;
|
||||
|
@ -46,9 +37,7 @@ struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic
|
|||
|
||||
static void sertopic_builtin_free (struct ddsi_sertopic *tp)
|
||||
{
|
||||
ddsrt_free (tp->name_type_name);
|
||||
ddsrt_free (tp->name);
|
||||
ddsrt_free (tp->type_name);
|
||||
ddsi_sertopic_fini (tp);
|
||||
ddsrt_free (tp);
|
||||
}
|
||||
|
||||
|
|
|
@ -454,12 +454,9 @@ err_invalid_qos:
|
|||
|
||||
dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descriptor_t *desc, const char *name, const dds_qos_t *qos, const dds_listener_t *listener)
|
||||
{
|
||||
char *key = NULL;
|
||||
struct ddsi_sertopic_default *st;
|
||||
const char *typename;
|
||||
nn_plist_t plist;
|
||||
dds_entity_t hdl;
|
||||
size_t keysz;
|
||||
struct dds_entity *ppent;
|
||||
dds_return_t ret;
|
||||
|
||||
|
@ -469,21 +466,9 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
if ((ret = dds_entity_pin (participant, &ppent)) < 0)
|
||||
return ret;
|
||||
|
||||
typename = desc->m_typename;
|
||||
keysz = strlen (name) + strlen (typename) + 2;
|
||||
key = dds_alloc (keysz);
|
||||
(void) snprintf (key, keysz, "%s/%s", name, typename);
|
||||
|
||||
st = dds_alloc (sizeof (*st));
|
||||
|
||||
ddsrt_atomic_st32 (&st->c.refc, 1);
|
||||
st->c.iid = ddsi_iid_gen ();
|
||||
st->c.name_type_name = key;
|
||||
st->c.name = ddsrt_strdup (name);
|
||||
st->c.type_name = ddsrt_strdup (typename);
|
||||
st->c.ops = &ddsi_sertopic_ops_default;
|
||||
st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey;
|
||||
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
|
||||
ddsi_sertopic_init (&st->c, name, desc->m_typename, &ddsi_sertopic_ops_default, desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey, (desc->m_nkeys == 0));
|
||||
st->native_encoding_identifier = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
|
||||
st->serpool = ppent->m_domain->gv.serpool;
|
||||
st->type = (void*) desc;
|
||||
|
|
|
@ -22,16 +22,13 @@ extern "C" {
|
|||
|
||||
struct ddsi_serdata;
|
||||
struct ddsi_serdata_ops;
|
||||
|
||||
struct dds_topic;
|
||||
typedef void (*topic_cb_t) (struct dds_topic * topic);
|
||||
|
||||
struct ddsi_sertopic_ops;
|
||||
|
||||
struct ddsi_sertopic {
|
||||
const struct ddsi_sertopic_ops *ops;
|
||||
const struct ddsi_serdata_ops *serdata_ops;
|
||||
uint32_t serdata_basehash;
|
||||
bool topickind_no_key;
|
||||
char *name_type_name;
|
||||
char *name;
|
||||
char *type_name;
|
||||
|
@ -39,6 +36,18 @@ struct ddsi_sertopic {
|
|||
ddsrt_atomic_uint32_t refc; /* counts refs from entities, not from data */
|
||||
};
|
||||
|
||||
/* The old and the new happen to have the same memory layout on a 64-bit machine
|
||||
and so any user that memset's the ddsi_sertopic to 0 before filling out the
|
||||
required fields gets unchanged behaviour. 32-bit machines have a different
|
||||
layout and no such luck.
|
||||
|
||||
There are presumably very few users of this type outside Cyclone DDS itself,
|
||||
but the ROS2 RMW implementation does use it -- indeed, it prompted the change.
|
||||
This define makes it possible to have a single version of the source that is
|
||||
compatible with the old and the new definition, even if it is only partially
|
||||
binary compatible. */
|
||||
#define DDSI_SERTOPIC_HAS_TOPICKIND_NO_KEY 1
|
||||
|
||||
/* Called when the refcount dropped to zero */
|
||||
typedef void (*ddsi_sertopic_free_t) (struct ddsi_sertopic *tp);
|
||||
|
||||
|
@ -59,6 +68,9 @@ struct ddsi_sertopic_ops {
|
|||
ddsi_sertopic_free_samples_t free_samples;
|
||||
};
|
||||
|
||||
DDS_EXPORT void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key);
|
||||
DDS_EXPORT void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key);
|
||||
DDS_EXPORT void ddsi_sertopic_fini (struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT void ddsi_sertopic_unref (struct ddsi_sertopic *tp);
|
||||
DDS_EXPORT uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops);
|
||||
|
|
|
@ -410,6 +410,7 @@ int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid);
|
|||
bool is_local_orphan_endpoint (const struct entity_common *e);
|
||||
int is_writer_entityid (nn_entityid_t id);
|
||||
int is_reader_entityid (nn_entityid_t id);
|
||||
int is_keyed_endpoint_entityid (nn_entityid_t id);
|
||||
nn_vendorid_t get_entity_vendorid (const struct entity_common *e);
|
||||
|
||||
/* Interface for glue code between the OpenSplice kernel and the DDSI
|
||||
|
|
|
@ -16,9 +16,11 @@
|
|||
|
||||
#include "dds/ddsrt/heap.h"
|
||||
#include "dds/ddsrt/md5.h"
|
||||
#include "dds/ddsrt/string.h"
|
||||
#include "dds/ddsi/q_bswap.h"
|
||||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_freelist.h"
|
||||
#include "dds/ddsi/ddsi_iid.h"
|
||||
#include "dds/ddsi/ddsi_sertopic.h"
|
||||
#include "dds/ddsi/ddsi_serdata.h"
|
||||
|
||||
|
@ -41,6 +43,41 @@ void ddsi_sertopic_unref (struct ddsi_sertopic *sertopic)
|
|||
}
|
||||
}
|
||||
|
||||
void ddsi_sertopic_init (struct ddsi_sertopic *tp, const char *name, const char *type_name, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key)
|
||||
{
|
||||
ddsrt_atomic_st32 (&tp->refc, 1);
|
||||
tp->iid = ddsi_iid_gen ();
|
||||
tp->name = ddsrt_strdup (name);
|
||||
tp->type_name = ddsrt_strdup (type_name);
|
||||
size_t ntn_sz = strlen (tp->name) + 1 + strlen (tp->type_name) + 1;
|
||||
tp->name_type_name = ddsrt_malloc (ntn_sz);
|
||||
(void) snprintf (tp->name_type_name, ntn_sz, "%s/%s", tp->name, tp->type_name);
|
||||
tp->ops = sertopic_ops;
|
||||
tp->serdata_ops = serdata_ops;
|
||||
tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops);
|
||||
tp->topickind_no_key = topickind_no_key;
|
||||
}
|
||||
|
||||
void ddsi_sertopic_init_anon (struct ddsi_sertopic *tp, const struct ddsi_sertopic_ops *sertopic_ops, const struct ddsi_serdata_ops *serdata_ops, bool topickind_no_key)
|
||||
{
|
||||
ddsrt_atomic_st32 (&tp->refc, 1);
|
||||
tp->iid = ddsi_iid_gen ();
|
||||
tp->name = NULL;
|
||||
tp->type_name = NULL;
|
||||
tp->name_type_name = NULL;
|
||||
tp->ops = sertopic_ops;
|
||||
tp->serdata_ops = serdata_ops;
|
||||
tp->serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->serdata_ops);
|
||||
tp->topickind_no_key = topickind_no_key;
|
||||
}
|
||||
|
||||
void ddsi_sertopic_fini (struct ddsi_sertopic *tp)
|
||||
{
|
||||
ddsrt_free (tp->name);
|
||||
ddsrt_free (tp->type_name);
|
||||
ddsrt_free (tp->name_type_name);
|
||||
}
|
||||
|
||||
uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *ops)
|
||||
{
|
||||
ddsrt_md5_state_t md5st;
|
||||
|
|
|
@ -22,13 +22,9 @@
|
|||
#include "dds/ddsi/ddsi_sertopic.h"
|
||||
#include "dds/ddsi/ddsi_serdata_default.h"
|
||||
|
||||
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
|
||||
|
||||
static void sertopic_default_free (struct ddsi_sertopic *tp)
|
||||
{
|
||||
ddsrt_free (tp->name_type_name);
|
||||
ddsrt_free (tp->name);
|
||||
ddsrt_free (tp->type_name);
|
||||
ddsi_sertopic_fini (tp);
|
||||
ddsrt_free (tp);
|
||||
}
|
||||
|
||||
|
|
|
@ -145,6 +145,21 @@ int is_reader_entityid (nn_entityid_t id)
|
|||
}
|
||||
}
|
||||
|
||||
int is_keyed_endpoint_entityid (nn_entityid_t id)
|
||||
{
|
||||
switch (id.u & NN_ENTITYID_KIND_MASK)
|
||||
{
|
||||
case NN_ENTITYID_KIND_READER_WITH_KEY:
|
||||
case NN_ENTITYID_KIND_WRITER_WITH_KEY:
|
||||
return 1;
|
||||
case NN_ENTITYID_KIND_READER_NO_KEY:
|
||||
case NN_ENTITYID_KIND_WRITER_NO_KEY:
|
||||
return 0;
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid)
|
||||
{
|
||||
if ((id.u & NN_ENTITYID_SOURCE_MASK) == NN_ENTITYID_SOURCE_BUILTIN)
|
||||
|
@ -402,7 +417,7 @@ static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, cons
|
|||
return true;
|
||||
}
|
||||
|
||||
static dds_return_t pp_allocate_entityid(nn_entityid_t *id, unsigned kind, struct participant *pp)
|
||||
static dds_return_t pp_allocate_entityid(nn_entityid_t *id, uint32_t kind, struct participant *pp)
|
||||
{
|
||||
uint32_t id1;
|
||||
int ret = 0;
|
||||
|
@ -2092,14 +2107,20 @@ static void reader_qos_mismatch (struct reader * rd, dds_qos_policy_id_t reason)
|
|||
}
|
||||
}
|
||||
|
||||
static bool qos_match_p_lock (struct entity_common *ea, const dds_qos_t *a, struct entity_common *eb, const dds_qos_t *b, dds_qos_policy_id_t *reason)
|
||||
static bool topickind_qos_match_p_lock (struct entity_common *rd, const dds_qos_t *rdqos, struct entity_common *wr, const dds_qos_t *wrqos, dds_qos_policy_id_t *reason)
|
||||
{
|
||||
assert (ea != eb);
|
||||
ddsrt_mutex_t * const locks[] = { &ea->qos_lock, &eb->qos_lock, &ea->qos_lock };
|
||||
const int shift = (uintptr_t) ea > (uintptr_t) eb;
|
||||
assert (is_reader_entityid (rd->guid.entityid));
|
||||
assert (is_writer_entityid (wr->guid.entityid));
|
||||
if (is_keyed_endpoint_entityid (rd->guid.entityid) != is_keyed_endpoint_entityid (wr->guid.entityid))
|
||||
{
|
||||
*reason = DDS_INVALID_QOS_POLICY_ID;
|
||||
return false;
|
||||
}
|
||||
ddsrt_mutex_t * const locks[] = { &rd->qos_lock, &wr->qos_lock, &rd->qos_lock };
|
||||
const int shift = (uintptr_t) rd > (uintptr_t) wr;
|
||||
for (int i = 0; i < 2; i++)
|
||||
ddsrt_mutex_lock (locks[i + shift]);
|
||||
bool ret = qos_match_p (a, b, reason);
|
||||
bool ret = qos_match_p (rdqos, wrqos, reason);
|
||||
for (int i = 0; i < 2; i++)
|
||||
ddsrt_mutex_unlock (locks[i + shift]);
|
||||
return ret;
|
||||
|
@ -2115,7 +2136,7 @@ static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_re
|
|||
return;
|
||||
if (wr->e.onlylocal)
|
||||
return;
|
||||
if (!isb0 && !qos_match_p_lock (&prd->e, prd->c.xqos, &wr->e, wr->xqos, &reason))
|
||||
if (!isb0 && !topickind_qos_match_p_lock (&prd->e, prd->c.xqos, &wr->e, wr->xqos, &reason))
|
||||
{
|
||||
writer_qos_mismatch (wr, reason);
|
||||
return;
|
||||
|
@ -2134,7 +2155,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
|
|||
return;
|
||||
if (rd->e.onlylocal)
|
||||
return;
|
||||
if (!isb0 && !qos_match_p_lock (&rd->e, rd->xqos, &pwr->e, pwr->c.xqos, &reason))
|
||||
if (!isb0 && !topickind_qos_match_p_lock (&rd->e, rd->xqos, &pwr->e, pwr->c.xqos, &reason))
|
||||
{
|
||||
reader_qos_mismatch (rd, reason);
|
||||
return;
|
||||
|
@ -2176,7 +2197,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn
|
|||
return;
|
||||
if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos))
|
||||
return;
|
||||
if (!qos_match_p_lock (&rd->e, rd->xqos, &wr->e, wr->xqos, &reason))
|
||||
if (!topickind_qos_match_p_lock (&rd->e, rd->xqos, &wr->e, wr->xqos, &reason))
|
||||
{
|
||||
writer_qos_mismatch (wr, reason);
|
||||
reader_qos_mismatch (rd, reason);
|
||||
|
@ -2912,17 +2933,20 @@ dds_return_t new_writer (struct writer **wr_out, struct q_globals *gv, struct nn
|
|||
{
|
||||
struct participant *pp;
|
||||
dds_return_t rc;
|
||||
uint32_t kind;
|
||||
|
||||
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, ppguid)) == NULL)
|
||||
{
|
||||
GVLOGDISC ("new_writer - participant "PGUIDFMT" not found\n", PGUID (*ppguid));
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
}
|
||||
|
||||
/* participant can't be freed while we're mucking around cos we are
|
||||
awake and do not touch the thread's vtime (ephash_lookup already
|
||||
verifies we're awake) */
|
||||
wrguid->prefix = pp->e.guid.prefix;
|
||||
if ((rc = pp_allocate_entityid (&wrguid->entityid, NN_ENTITYID_KIND_WRITER_WITH_KEY, pp)) < 0)
|
||||
kind = topic->topickind_no_key ? NN_ENTITYID_KIND_WRITER_NO_KEY : NN_ENTITYID_KIND_WRITER_WITH_KEY;
|
||||
if ((rc = pp_allocate_entityid (&wrguid->entityid, kind, pp)) < 0)
|
||||
return rc;
|
||||
return new_writer_guid (wr_out, wrguid, group_guid, pp, topic, xqos, whc, status_cb, status_cb_arg);
|
||||
}
|
||||
|
@ -3391,6 +3415,7 @@ dds_return_t new_reader
|
|||
{
|
||||
struct participant * pp;
|
||||
dds_return_t rc;
|
||||
uint32_t kind;
|
||||
|
||||
if ((pp = ephash_lookup_participant_guid (gv->guid_hash, ppguid)) == NULL)
|
||||
{
|
||||
|
@ -3398,7 +3423,8 @@ dds_return_t new_reader
|
|||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
}
|
||||
rdguid->prefix = pp->e.guid.prefix;
|
||||
if ((rc = pp_allocate_entityid (&rdguid->entityid, NN_ENTITYID_KIND_READER_WITH_KEY, pp)) < 0)
|
||||
kind = topic->topickind_no_key ? NN_ENTITYID_KIND_READER_NO_KEY : NN_ENTITYID_KIND_READER_WITH_KEY;
|
||||
if ((rc = pp_allocate_entityid (&rdguid->entityid, kind, pp)) < 0)
|
||||
return rc;
|
||||
return new_reader_guid (rd_out, rdguid, group_guid, pp, topic, xqos, rhc, status_cb, status_cbarg);
|
||||
}
|
||||
|
|
|
@ -744,15 +744,11 @@ static struct ddsi_sertopic *make_special_topic (struct serdatapool *serpool, ui
|
|||
(kinda natural if they stop being "default" ones) */
|
||||
struct ddsi_sertopic_default *st = ddsrt_malloc (sizeof (*st));
|
||||
memset (st, 0, sizeof (*st));
|
||||
ddsrt_atomic_st32 (&st->c.refc, 1);
|
||||
st->c.ops = &ddsi_sertopic_ops_default;
|
||||
st->c.serdata_ops = ops;
|
||||
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
|
||||
st->c.iid = ddsi_iid_gen ();
|
||||
ddsi_sertopic_init_anon (&st->c, &ddsi_sertopic_ops_default, ops, false);
|
||||
st->native_encoding_identifier = enc_id;
|
||||
st->serpool = serpool;
|
||||
st->nkeys = 1;
|
||||
return (struct ddsi_sertopic *)st;
|
||||
return (struct ddsi_sertopic *) st;
|
||||
}
|
||||
|
||||
static void make_special_topics (struct q_globals *gv)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue