replace old builtin topics by new ones and add implementation of DCPSSubscription and DCPSPublication

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2018-12-03 17:40:39 +01:00
parent 78d49b52a0
commit 1a0fcea0c2
23 changed files with 1126 additions and 2828 deletions

View file

@ -31,7 +31,6 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
q_bitset_inlines.c
q_bswap.c
q_bswap_inlines.c
q_builtin_topic.c
q_config.c
q_ddsi_discovery.c
q_debmon.c
@ -88,7 +87,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
q_bitset_template.h
q_bswap.h
q_bswap_template.h
q_builtin_topic.h
q_config.h
q_ddsi_discovery.h
q_debmon.h

View file

@ -1,75 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef Q_BUILTIN_TOPIC_H
#define Q_BUILTIN_TOPIC_H
#include "ddsi/q_time.h"
#include "dds_builtinTopics.h"
struct entity_common;
struct nn_plist;
/* Functions called at proxy entity creation/deletion time, so they
can do whatever is necessary to get the builtin topics function
correctly.
These probably should return an error code, but I don't quite know
how to handle it yet and this way we have Coverity on our side.
Implementation is outside the common core.
These may assume the proxy entities are stable, without parallel QoS
changes. */
void
propagate_builtin_topic_participant(
_In_ const struct entity_common *proxypp,
_In_ const nn_plist_t *plist,
_In_ nn_wctime_t timestamp,
_In_ int alive);
void
propagate_builtin_topic_cmparticipant(
_In_ const struct entity_common *proxypp,
_In_ const nn_plist_t *plist,
_In_ nn_wctime_t timestamp,
_In_ int alive);
#if 0
void dispose_builtin_topic_proxy_participant (const struct proxy_participant *proxypp, nn_wctime_t timestamp, int isimplicit);
void write_builtin_topic_proxy_writer (const struct proxy_writer *pwr, nn_wctime_t timestamp);
void dispose_builtin_topic_proxy_writer (const struct proxy_writer *pwr, nn_wctime_t timestamp, int isimplicit);
void write_builtin_topic_proxy_reader (const struct proxy_reader *prd, nn_wctime_t timestamp);
void dispose_builtin_topic_proxy_reader (const struct proxy_reader *prd, nn_wctime_t timestamp, int isimplicit);
void write_builtin_topic_proxy_group (const struct proxy_group *pgroup, nn_wctime_t timestamp);
void dispose_builtin_topic_proxy_group (const struct proxy_group *pgroup, nn_wctime_t timestamp, int isimplicit);
void write_builtin_topic_proxy_topic (const struct nn_plist *datap, nn_wctime_t timestamp);
#endif
/*
* Let the layer on top of DDSI handle the received builtin data when it wants to.
*/
extern void
forward_builtin_participant(
_In_ DDS_ParticipantBuiltinTopicData *data,
_In_ nn_wctime_t timestamp,
_In_ int alive);
extern void
forward_builtin_cmparticipant(
_In_ DDS_CMParticipantBuiltinTopicData *data,
_In_ nn_wctime_t timestamp,
_In_ int alive);
#endif

View file

@ -22,6 +22,7 @@
#include "ddsi/q_xqos.h"
#include "ddsi/ddsi_tran.h"
#include "ddsi/q_feature_check.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_rhc_plugin.h"
#if defined (__cplusplus)
@ -411,6 +412,7 @@ struct ddsi_plugin
{
int (*init_fn) (void);
void (*fini_fn) (void);
void (*builtin_write) (enum ddsi_sertopic_builtin_type type, const nn_guid_t *guid, nn_wctime_t timestamp, bool alive);
/* Read cache */
struct ddsi_rhc_plugin rhc_plugin;

View file

@ -281,6 +281,11 @@ struct q_globals {
struct ddsi_sertopic *plist_topic; /* used for all discovery data */
struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */
/* Sertopics for built-in topics -- FIXME: these really have little to do with topics, but everything with topic types, in other words, they are the type supports in DDS ... so a bit of refactoring is required */
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
/* Network ID needed by v_groupWrite -- FIXME: might as well pass it
to the receive thread instead of making it global (and that would
remove the need to include kernelModule.h) */

View file

@ -1,183 +0,0 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <string.h>
#include "ddsi/q_misc.h"
#include "ddsi/q_config.h"
#include "ddsi/q_entity.h"
#include "ddsi/q_builtin_topic.h"
#include "dds_builtinTopics.h"
static void generate_user_data (_Out_ DDS_UserDataQosPolicy *a, _In_ const nn_xqos_t *xqos)
{
if (!(xqos->present & QP_USER_DATA) || (xqos->user_data.length == 0))
{
a->value._maximum = 0;
a->value._length = 0;
a->value._buffer = NULL;
a->value._release = false;
} else {
a->value._maximum = xqos->user_data.length;
a->value._length = xqos->user_data.length;
a->value._buffer = xqos->user_data.value;
a->value._release = false;
}
}
static void
generate_product_data(
_Out_ DDS_ProductDataQosPolicy *a,
_In_ const struct entity_common *participant,
_In_ const nn_plist_t *plist)
{
/* replicate format generated in v_builtinCreateCMParticipantInfo() */
static const char product_tag[] = "Product";
static const char exec_name_tag[] = "ExecName";
static const char participant_name_tag[] = "ParticipantName";
static const char process_id_tag[] = "PID";
static const char node_name_tag[] = "NodeName";
static const char federation_id_tag[] = "FederationId";
static const char vendor_id_tag[] = "VendorId";
static const char service_type_tag[] = "ServiceType";
const size_t cdata_overhead = 12; /* <![CDATA[ and ]]> */
const size_t tag_overhead = 5; /* <> and </> */
char pidstr[11]; /* unsigned 32-bits, so max < 5e9, or 10 chars + terminator */
char federationidstr[20]; /* max 2 * unsigned 32-bits hex + separator, terminator */
char vendoridstr[22]; /* max 2 * unsigned 32-bits + seperator, terminator */
char servicetypestr[11]; /* unsigned 32-bits */
unsigned servicetype;
size_t len = 1 + 2*(sizeof(product_tag)-1) + tag_overhead;
if (plist->present & PP_PRISMTECH_EXEC_NAME)
len += 2*(sizeof(exec_name_tag)-1) + cdata_overhead + tag_overhead + strlen(plist->exec_name);
if (plist->present & PP_ENTITY_NAME)
len += 2*(sizeof(participant_name_tag)-1) + cdata_overhead + tag_overhead + strlen(plist->entity_name);
if (plist->present & PP_PRISMTECH_PROCESS_ID)
{
int n = snprintf (pidstr, sizeof (pidstr), "%u", plist->process_id);
assert (n > 0 && (size_t) n < sizeof (pidstr));
len += 2*(sizeof(process_id_tag)-1) + tag_overhead + (size_t) n;
}
if (plist->present & PP_PRISMTECH_NODE_NAME)
len += 2*(sizeof(node_name_tag)-1) + cdata_overhead + tag_overhead + strlen(plist->node_name);
{
int n = snprintf (vendoridstr, sizeof (vendoridstr), "%u.%u", plist->vendorid.id[0], plist->vendorid.id[1]);
assert (n > 0 && (size_t) n < sizeof (vendoridstr));
len += 2*(sizeof(vendor_id_tag)-1) + tag_overhead + (size_t) n;
}
{
int n;
if (vendor_is_opensplice (plist->vendorid))
n = snprintf (federationidstr, sizeof (federationidstr), "%x", participant->guid.prefix.u[0]);
else
n = snprintf (federationidstr, sizeof (federationidstr), "%x:%x", participant->guid.prefix.u[0], participant->guid.prefix.u[1]);
assert (n > 0 && (size_t) n < sizeof (federationidstr));
len += 2*(sizeof(federation_id_tag)-1) + tag_overhead + (size_t) n;
}
if (plist->present & PP_PRISMTECH_SERVICE_TYPE)
servicetype = plist->service_type;
else
servicetype = 0;
{
int n = snprintf (servicetypestr, sizeof (servicetypestr), "%u", (unsigned) servicetype);
assert (n > 0 && (size_t) n < sizeof (servicetypestr));
len += 2*(sizeof(service_type_tag)-1) + tag_overhead + (size_t) n;
}
a->value = os_malloc(len);
{
char *p = a->value;
int n;
n = snprintf (p, len, "<%s>", product_tag); assert (n >= 0 && (size_t) n < len); p += n; len -= (size_t) n;
if (plist->present & PP_PRISMTECH_EXEC_NAME)
{
n = snprintf (p, len, "<%s><![CDATA[%s]]></%s>", exec_name_tag, plist->exec_name, exec_name_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
}
if (plist->present & PP_ENTITY_NAME)
{
n = snprintf (p, len, "<%s><![CDATA[%s]]></%s>", participant_name_tag, plist->entity_name, participant_name_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
}
if (plist->present & PP_PRISMTECH_PROCESS_ID)
{
n = snprintf (p, len, "<%s>%s</%s>", process_id_tag, pidstr, process_id_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
}
if (plist->present & PP_PRISMTECH_NODE_NAME)
{
n = snprintf (p, len, "<%s><![CDATA[%s]]></%s>", node_name_tag, plist->node_name, node_name_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
}
n = snprintf (p, len, "<%s>%s</%s>", federation_id_tag, federationidstr, federation_id_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
n = snprintf (p, len, "<%s>%s</%s>", vendor_id_tag, vendoridstr, vendor_id_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
{
n = snprintf (p, len, "<%s>%s</%s>", service_type_tag, servicetypestr, service_type_tag);
assert (n >= 0 && (size_t) n < len);
p += n; len -= (size_t) n;
}
n = snprintf (p, len, "</%s>", product_tag);
assert (n >= 0 && (size_t) n == len-1);
(void) n;
}
}
static void generate_key (_Out_ DDS_BuiltinTopicKey_t *a, _In_ const nn_guid_prefix_t *gid)
{
(*a)[0] = gid->u[0];
(*a)[1] = gid->u[1];
(*a)[2] = gid->u[2];
}
void
propagate_builtin_topic_participant(
_In_ const struct entity_common *participant,
_In_ const nn_plist_t *plist,
_In_ nn_wctime_t timestamp,
_In_ int alive)
{
DDS_ParticipantBuiltinTopicData data;
generate_key(&(data.key), &(participant->guid.prefix));
generate_user_data(&(data.user_data), &(plist->qos));
forward_builtin_participant(&data, timestamp, alive);
}
void
propagate_builtin_topic_cmparticipant(
_In_ const struct entity_common *participant,
_In_ const nn_plist_t *plist,
_In_ nn_wctime_t timestamp,
_In_ int alive)
{
DDS_CMParticipantBuiltinTopicData data;
generate_key(&(data.key), &(participant->guid.prefix));
generate_product_data(&(data.product), participant, plist);
forward_builtin_cmparticipant(&data, timestamp, alive);
os_free(data.product.value);
}

View file

@ -37,7 +37,6 @@
#include "ddsi/q_transmit.h"
#include "ddsi/q_lease.h"
#include "ddsi/q_error.h"
#include "ddsi/q_builtin_topic.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/q_md5.h"
#include "ddsi/q_feature_check.h"
@ -200,8 +199,6 @@ int spdp_write (struct participant *pp)
return 0;
}
propagate_builtin_topic_participant(&(pp->e), pp->plist, now(), true);
DDS_TRACE("spdp_write(%x:%x:%x:%x)\n", PGUID (pp->e.guid));
if ((wr = get_builtin_writer (pp, NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)) == NULL)
@ -1425,8 +1422,6 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
return 0;
}
propagate_builtin_topic_cmparticipant(&(pp->e), pp->plist, now(), alive);
sedp_wr = get_sedp_writer (pp, NN_ENTITYID_SEDP_BUILTIN_CM_PARTICIPANT_WRITER);
/* The message is only a temporary thing, used only for encoding

View file

@ -35,7 +35,6 @@
#include "ddsi/q_protocol.h" /* NN_ENTITYID_... */
#include "ddsi/q_unused.h"
#include "ddsi/q_error.h"
#include "ddsi/q_builtin_topic.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/q_receive.h"
@ -155,14 +154,30 @@ static int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid)
return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT;
}
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, bool onlylocal)
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_vendorid_t vendorid, bool onlylocal, struct ddsi_tkmap_instance **tk)
{
e->guid = *guid;
e->kind = kind;
e->name = os_strdup (name ? name : "");
e->iid = ddsi_iid_gen ();
os_mutexInit (&e->lock);
e->onlylocal = onlylocal;
os_mutexInit (&e->lock);
if (onlylocal || is_builtin_entityid (guid->entityid, vendorid))
{
e->iid = ddsi_iid_gen ();
*tk = NULL;
}
else
{
struct ddsi_serdata *sd;
struct nn_keyhash kh;
memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (gv.builtin_participant_topic, &kh);
/* FIXME: this makes the iid for a reincarnation of a proxy entity dependent on whether an application reader kept the corresponding built-in topic instance around, it may be attractive to reconsider and guarantee a new iid in these cases, at least for the publication handle */
*tk = ddsi_tkmap_find(sd, false, true);
ddsi_serdata_unref (sd);
e->iid = (*tk)->m_iid;
}
}
static void entity_common_fini (struct entity_common *e)
@ -223,6 +238,36 @@ void local_reader_ary_setinvalid (struct local_reader_ary *x)
os_mutexUnlock (&x->rdary_lock);
}
static void write_builtin_topic_any (const struct entity_common *e, nn_wctime_t timestamp, bool alive, nn_vendorid_t vendorid, struct ddsi_tkmap_instance *tk)
{
enum ddsi_sertopic_builtin_type type;
switch (e->kind)
{
case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT:
type = DSBT_PARTICIPANT;
break;
case EK_READER:
case EK_PROXY_READER:
type = DSBT_READER;
break;
case EK_WRITER:
case EK_PROXY_WRITER:
type = DSBT_WRITER;
break;
}
if (!(e->onlylocal || is_builtin_endpoint(e->guid.entityid, vendorid)))
ddsi_plugin.builtin_write (type, &e->guid, timestamp, alive);
/* tkmap instance only needs to be kept around until the first write of a built-in topic (if none ever happens, it needn't be kept at all): afterward, the WHC of the local built-in topic writer will keep the entry alive. FIXME: the SPDP/SEPD ones currently use default sertopics instead of builtin sertopics, and so use different mappings and different instnace ids. No-one ever sees those ids, so it doesn't matter, but it would nicer if it could actually be the same one. FIXME: it would also be nicer if the local built-in topics and the SPDP/SEDP writers were the same, but I want the locally created endpoints visible in the built-in topics as well, and those don't exist in the discovery writers ... */
if (tk)
ddsi_tkmap_instance_unref (tk);
}
static void write_builtin_topic_local (const struct entity_common *e, nn_wctime_t timestamp, bool alive, struct ddsi_tkmap_instance *tk)
{
write_builtin_topic_any(e, timestamp, alive, ownvendorid, tk);
}
/* DELETED PARTICIPANTS --------------------------------------------- */
int deleted_participants_admin_init (void)
@ -359,6 +404,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{
struct participant *pp;
nn_guid_t subguid, group_guid;
struct ddsi_tkmap_instance *tk;
/* no reserved bits may be set */
assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0);
@ -401,7 +447,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
pp = os_malloc (sizeof (*pp));
entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ((flags & RTPS_PF_ONLY_LOCAL) != 0));
entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0), &tk);
pp->user_refc = 1;
pp->builtin_refc = 0;
pp->builtins_deleted = 0;
@ -580,6 +626,8 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
trigger_recv_threads ();
}
write_builtin_topic_local(&pp->e, now(), true, tk);
/* SPDP periodic broadcast uses the retransmit path, so the initial
publication must be done differently. Must be later than making
the participant globally visible, or the SPDP processing won't
@ -606,7 +654,6 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
tsched.v = (pp->lease_duration == T_NEVER) ? T_NEVER : 0;
pp->pmd_update_xevent = qxev_pmd_update (tsched, &pp->e.guid);
}
return 0;
}
@ -815,11 +862,7 @@ int delete_participant (const struct nn_guid *ppguid)
struct participant *pp;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
return ERR_UNKNOWN_ENTITY;
if (!(pp->e.onlylocal))
{
propagate_builtin_topic_cmparticipant(&(pp->e), pp->plist, now(), false);
propagate_builtin_topic_participant(&(pp->e), pp->plist, now(), false);
}
write_builtin_topic_local(&pp->e, now(), false, NULL);
remember_deleted_participant_guid (&pp->e.guid);
ephash_remove_participant_guid (pp);
gcreq_participant (pp);
@ -2333,10 +2376,11 @@ static void endpoint_common_init
enum entity_kind kind,
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp
struct participant *pp,
struct ddsi_tkmap_instance **tk
)
{
entity_common_init (e, guid, NULL, kind, pp->e.onlylocal);
entity_common_init (e, guid, NULL, kind, ownvendorid, pp->e.onlylocal, tk);
c->pp = ref_participant (pp, &e->guid);
if (group_guid)
{
@ -2563,6 +2607,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (guid) == NULL);
@ -2575,7 +2620,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
delete_participant won't interfere with our ability to address
the participant */
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp);
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, &tk);
os_condInit (&wr->throttle_cond, &wr->e.lock);
wr->seq = 0;
@ -2784,6 +2829,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
deleted while we do so */
match_writer_with_proxy_readers (wr, tnow);
match_writer_with_local_readers (wr, tnow);
write_builtin_topic_local(&wr->e, now(), true, tk);
sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER)
@ -2909,6 +2955,7 @@ int delete_writer_nolinger_locked (struct writer *wr)
{
DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid));
ASSERT_MUTEX_HELD (&wr->e.lock);
write_builtin_topic_local(&wr->e, now(), false, NULL);
local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr);
writer_set_state (wr, WRST_DELETING);
@ -3125,6 +3172,7 @@ static struct reader * new_reader_guid
struct reader * rd;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_reader_guid (guid) == NULL);
@ -3133,7 +3181,7 @@ static struct reader * new_reader_guid
new_reader_writer_common (guid, topic, xqos);
rd = os_malloc (sizeof (*rd));
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp);
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, &tk);
/* Copy QoS, merging in defaults */
rd->xqos = os_malloc (sizeof (*rd->xqos));
@ -3237,6 +3285,7 @@ static struct reader * new_reader_guid
ephash_insert_reader_guid (rd);
match_reader_with_proxy_writers (rd, tnow);
match_reader_with_local_writers (rd, tnow);
write_builtin_topic_local(&rd->e, now(), true, tk);
sedp_write_reader (rd);
return rd;
}
@ -3329,6 +3378,7 @@ int delete_reader (const struct nn_guid *guid)
(ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc);
}
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid));
write_builtin_topic_local(&rd->e, now(), false, NULL);
ephash_remove_reader_guid (rd);
gcreq_reader (rd);
return 0;
@ -3404,6 +3454,7 @@ void new_proxy_participant
runs on a single thread, it can't go wrong. FIXME, maybe? The
same holds for the other functions for creating entities. */
struct proxy_participant *proxypp;
struct ddsi_tkmap_instance *tk;
assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL);
@ -3413,7 +3464,7 @@ void new_proxy_participant
proxypp = os_malloc (sizeof (*proxypp));
entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, false);
entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, vendor, false, &tk);
proxypp->refc = 1;
proxypp->lease_expired = 0;
proxypp->vendor = vendor;
@ -3569,15 +3620,7 @@ void new_proxy_participant
if (proxypp->owns_lease)
lease_register (os_atomic_ldvoidp (&proxypp->lease));
if (proxypp->proxypp_have_spdp)
{
propagate_builtin_topic_participant(&(proxypp->e), proxypp->plist, timestamp, true);
if (proxypp->proxypp_have_cm)
{
propagate_builtin_topic_cmparticipant(&(proxypp->e), proxypp->plist, timestamp, true);
}
}
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, tk);
os_mutexUnlock (&proxypp->e.lock);
}
@ -3595,14 +3638,10 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, co
switch (source)
{
case UPD_PROXYPP_SPDP:
propagate_builtin_topic_participant(&(proxypp->e), proxypp->plist, timestamp, true);
if (!proxypp->proxypp_have_spdp && proxypp->proxypp_have_cm)
propagate_builtin_topic_cmparticipant(&(proxypp->e), proxypp->plist, timestamp, true);
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, NULL);
proxypp->proxypp_have_spdp = 1;
break;
case UPD_PROXYPP_CM:
if (proxypp->proxypp_have_spdp)
propagate_builtin_topic_cmparticipant(&(proxypp->e), proxypp->plist, timestamp, true);
proxypp->proxypp_have_cm = 1;
break;
}
@ -3848,8 +3887,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t
return ERR_UNKNOWN_ENTITY;
}
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
propagate_builtin_topic_cmparticipant(&(ppt->e), ppt->plist, timestamp, false);
propagate_builtin_topic_participant(&(ppt->e), ppt->plist, timestamp, false);
write_builtin_topic_any(&ppt->e, timestamp, false, ppt->vendor, NULL);
remember_deleted_participant_guid (&ppt->e.guid);
ephash_remove_proxy_participant_guid (ppt);
os_mutexUnlock (&gv.lock);
@ -3982,7 +4020,7 @@ static void proxy_endpoint_common_init
(
struct entity_common *e, struct proxy_endpoint_common *c,
enum entity_kind kind, const struct nn_guid *guid, struct proxy_participant *proxypp,
struct addrset *as, const nn_plist_t *plist
struct addrset *as, const nn_plist_t *plist, struct ddsi_tkmap_instance **tk
)
{
const char *name;
@ -3993,7 +4031,7 @@ static void proxy_endpoint_common_init
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME));
name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : "";
entity_common_init (e, guid, name, kind, false);
entity_common_init (e, guid, name, kind, proxypp->vendor, false, tk);
c->xqos = nn_xqos_dup (&plist->qos);
c->as = ref_addrset (as);
c->topic = NULL; /* set from first matching reader/writer */
@ -4027,6 +4065,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_writer *pwr;
int isreliable;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (guid) == NULL);
@ -4038,7 +4077,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
}
pwr = os_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist);
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist, &tk);
ut_avlInit (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0;
@ -4104,6 +4143,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
local_reader_ary_init (&pwr->rdary);
ephash_insert_proxy_writer_guid (pwr);
match_proxy_writer_with_readers (pwr, tnow);
write_builtin_topic_any(&pwr->e, timestamp, true, pwr->c.vendor, tk);
os_mutexLock (&pwr->e.lock);
pwr->local_matching_inprogress = 0;
@ -4235,6 +4275,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int
from removing themselves from the proxy writer's rdary[]. */
local_reader_ary_setinvalid (&pwr->rdary);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
write_builtin_topic_any(&pwr->e, timestamp, false, pwr->c.vendor, NULL);
ephash_remove_proxy_writer_guid (pwr);
os_mutexUnlock (&gv.lock);
gcreq_proxy_writer (pwr);
@ -4252,6 +4293,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_participant *proxypp;
struct proxy_reader *prd;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (!is_writer_entityid (guid->entityid));
@ -4264,7 +4306,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
}
prd = os_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist);
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist, &tk);
prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM
@ -4278,6 +4320,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
ut_avlInit (&prd_writers_treedef, &prd->writers);
ephash_insert_proxy_reader_guid (prd);
match_proxy_reader_with_writers (prd, tnow);
write_builtin_topic_any(&prd->e, timestamp, true, prd->c.vendor, tk);
return 0;
}
@ -4360,6 +4403,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int
DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n");
return ERR_UNKNOWN_ENTITY;
}
write_builtin_topic_any(&prd->e, timestamp, false, prd->c.vendor, NULL);
ephash_remove_proxy_reader_guid (prd);
os_mutexUnlock (&gv.lock);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");

View file

@ -56,6 +56,7 @@
#include "ddsi/ddsi_raweth.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__whc.h"
@ -783,12 +784,18 @@ static void make_special_topics (void)
{
gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
gv.builtin_participant_topic = new_sertopic_builtin (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
gv.builtin_reader_topic = new_sertopic_builtin (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
gv.builtin_writer_topic = new_sertopic_builtin (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
}
static void free_special_topics (void)
{
ddsi_sertopic_unref (gv.plist_topic);
ddsi_sertopic_unref (gv.rawcdr_topic);
ddsi_sertopic_unref (gv.builtin_participant_topic);
ddsi_sertopic_unref (gv.builtin_reader_topic);
ddsi_sertopic_unref (gv.builtin_writer_topic);
}
static int setup_and_start_recv_threads (void)