rework built-in topics again

Move details of built-in topics out of the DDSI core (so the only hooks
remain).  For this, rtps_term had to be split, so now it is "stop"
followed by "fini".

Add a notion of local writers that are not bound to a participant ("local
orphans"), so that the local built-in topic writers can be created during
initialization.  This eliminates the "builtin" participant.  This
uncovered in inconsistency in the unit tests: on the one hand, a newly
created participant is expected to have no child entities; on the other
hand, the built-in topics were expected to be returned by find_topic ...
This inconsistency has been resolved by creating them lazily and
accepting that find_topic can't return them until they have been
created.  Special code was in place in dds_create_reader anyway, so it
is not expected to have any real consequence for applications.

Use a special WHC implementation that regenerates the data on the fly
using the internal discovery tables of DDSI, so that the samples are only
stored by readers.  This eliminates the memory overhead of that existed
previously when the WHC of the writers stored the data.

No longer return topic name and type name in the built-in topics, they
have been extracted already and are not accessible through the normal
interface but do cause problems when comparing QoS.

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-01-06 13:10:24 +01:00
parent d6dcb0558d
commit 66076817e1
26 changed files with 876 additions and 919 deletions

View file

@ -37,6 +37,9 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_subscriber.c
dds_write.c
dds_whc.c
dds_whc_builtintopic.c
dds_serdata_builtintopic.c
dds_sertopic_builtintopic.c
)
PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/ddsc>$<INSTALL_INTERFACE:include/ddsc>"
@ -73,6 +76,8 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__write.h
dds__writer.h
dds__whc.h
dds__whc_builtintopic.h
dds__serdata_builtintopic.h
)
configure_file(

View file

@ -1777,7 +1777,7 @@ dds_write_flush(
* @returns A dds_return_t indicating success or failure.
*/
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
DDS_EXPORT int
DDS_EXPORT dds_return_t
dds_writecdr(
dds_entity_t writer,
struct ddsi_serdata *serdata);

View file

@ -13,49 +13,31 @@
#define _DDS_BUILTIN_H_
#include "ddsi/q_time.h"
#include "ddsi/ddsi_serdata_builtin.h"
#if defined (__cplusplus)
extern "C"
{
#endif
/* Get actual topic in related participant related to topic 'id'. */
_Must_inspect_result_ dds_entity_t
dds__get_builtin_topic(
_In_ dds_entity_t e,
_In_ dds_entity_t topic);
/* Global publisher singleton (publishes only locally). */
_Must_inspect_result_ dds_entity_t
dds__get_builtin_publisher(
void);
dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic);
/* Subscriber singleton within related participant. */
_Must_inspect_result_ dds_entity_t
dds__get_builtin_subscriber(
_In_ dds_entity_t e);
dds_entity_t dds__get_builtin_subscriber(dds_entity_t e);
/* Checks whether the reader QoS is valid for use with built-in topic TOPIC */
bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos);
/* Initialization and finalize functions. */
void
dds__builtin_init(
void);
struct entity_common;
struct nn_guid;
struct ddsi_tkmap_instance;
void
dds__builtin_fini(
void);
void
dds__builtin_write(
_In_ enum ddsi_sertopic_builtin_type type,
_In_ const nn_guid_t *guid,
_In_ nn_wctime_t timestamp,
_In_ bool alive);
void dds__builtin_init (void);
void dds__builtin_fini (void);
bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid);
struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
#if defined (__cplusplus)
}

View file

@ -9,35 +9,33 @@
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDSI_SERDATA_BUILTIN_H
#define DDSI_SERDATA_BUILTIN_H
#ifndef DDSI_SERDATA_BUILTINTOPIC_H
#define DDSI_SERDATA_BUILTINTOPIC_H
#include "os/os.h"
#include "util/ut_avl.h"
#include "ddsi/q_xqos.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsi/q_xqos.h"
struct ddsi_serdata_builtin {
struct ddsi_serdata_builtintopic {
struct ddsi_serdata c;
nn_guid_t key;
nn_xqos_t xqos;
};
enum ddsi_sertopic_builtin_type {
enum ddsi_sertopic_builtintopic_type {
DSBT_PARTICIPANT,
DSBT_READER,
DSBT_WRITER
};
struct ddsi_sertopic_builtin {
struct ddsi_sertopic_builtintopic {
struct ddsi_sertopic c;
enum ddsi_sertopic_builtin_type type;
enum ddsi_sertopic_builtintopic_type type;
};
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtin;
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic;
struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename);
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename);
#endif

View file

@ -52,7 +52,7 @@ struct rhc;
* Obviously, it is encouraged to use condition variables and such. But
* sometimes it wouldn't make that much of a difference and taking the
* easy route is somewhat pragmatic. */
#define DDS_HEADBANG_TIMEOUT_MS (10)
#define DDS_HEADBANG_TIMEOUT (DDS_MSECS (10))
typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, const void *ctx);

View file

@ -0,0 +1,28 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDS_WHC_BUILTINTOPIC_H
#define DDS_WHC_BUILTINTOPIC_H
#include "ddsi/q_whc.h"
#include "dds__serdata_builtintopic.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type);
#if defined (__cplusplus)
}
#endif
#endif /* Q_WHC_H */

View file

@ -22,28 +22,16 @@ extern "C" {
struct ddsi_serdata;
typedef enum
{
typedef enum {
DDS_WR_ACTION_WRITE = 0,
DDS_WR_ACTION_WRITE_DISPOSE = DDS_WR_DISPOSE_BIT,
DDS_WR_ACTION_DISPOSE = DDS_WR_KEY_BIT | DDS_WR_DISPOSE_BIT,
DDS_WR_ACTION_UNREGISTER = DDS_WR_KEY_BIT | DDS_WR_UNREGISTER_BIT
}
dds_write_action;
} dds_write_action;
int
dds_write_impl(
_In_ dds_writer *wr,
_In_ const void *data,
_In_ dds_time_t tstamp,
_In_ dds_write_action action);
int
dds_writecdr_impl(
_In_ dds_writer *wr,
_Inout_ struct ddsi_serdata *d,
_In_ dds_time_t tstamp,
_In_ dds_write_action action);
dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action);
dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action);
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d);
#if defined (__cplusplus)
}

View file

@ -24,418 +24,216 @@
#include "dds__subscriber.h"
#include "dds__write.h"
#include "dds__writer.h"
#include "dds__whc_builtintopic.h"
#include "dds__serdata_builtintopic.h"
#include "ddsi/q_qosmatch.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_tkmap.h"
static dds_return_t
dds__delete_builtin_participant(
dds_entity *e);
static struct ddsi_sertopic *builtin_participant_topic;
static struct ddsi_sertopic *builtin_reader_topic;
static struct ddsi_sertopic *builtin_writer_topic;
static struct local_orphan_writer *builtintopic_writer_participant;
static struct local_orphan_writer *builtintopic_writer_publications;
static struct local_orphan_writer *builtintopic_writer_subscriptions;
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_participant(
void);
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_publisher(
_In_ dds_entity_t participant);
static dds_entity_t
dds__create_builtin_writer(
_In_ dds_entity_t topic);
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_participant(
void);
static os_mutex g_builtin_mutex;
static os_atomic_uint32_t m_call_count = OS_ATOMIC_UINT32_INIT(0);
/* Singletons are used to publish builtin data locally. */
static dds_entity_t g_builtin_local_participant = 0;
static dds_entity_t g_builtin_local_publisher = 0;
static dds_entity_t g_builtin_local_writers[] = {
0, /* index DDS_BUILTIN_TOPIC_DCPSPARTICIPANT - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSTOPIC - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSPUBLICATION - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION - DDS_KIND_INTERNAL - 1 */
};
static _Must_inspect_result_ dds_qos_t *
dds__create_builtin_qos(
void)
static dds_qos_t *dds__create_builtin_qos (void)
{
const char *partition = "__BUILT-IN PARTITION__";
dds_qos_t *qos = dds_create_qos();
dds_qset_durability(qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_presentation(qos, DDS_PRESENTATION_TOPIC, false, false);
dds_qset_reliability(qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100));
dds_qset_partition(qos, 1, &partition);
return qos;
const char *partition = "__BUILT-IN PARTITION__";
dds_qos_t *qos = dds_create_qos ();
dds_qset_durability (qos, DDS_DURABILITY_TRANSIENT_LOCAL);
dds_qset_presentation (qos, DDS_PRESENTATION_TOPIC, false, false);
dds_qset_reliability (qos, DDS_RELIABILITY_RELIABLE, DDS_MSECS(100));
dds_qset_partition (qos, 1, &partition);
return qos;
}
static dds_return_t
dds__delete_builtin_participant(
dds_entity *e)
void dds__builtin_init (void)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_qos_t *qos = dds__create_builtin_qos ();
assert(e);
assert(thr);
assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT);
builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
if (asleep) {
thread_state_awake(thr);
}
dds_domain_free(e->m_domain);
if (asleep) {
thread_state_asleep(thr);
}
builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT));
builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER));
builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER));
return DDS_RETCODE_OK;
dds_delete_qos (qos);
}
/*
* We don't use the 'normal' create participant.
*
* This way, the application is not able to access the local builtin writers.
* Also, we can indicate that it should be a 'local only' participant, which
* means that none of the entities under the hierarchy of this participant will
* be exposed to the outside world. This is what we want, because these builtin
* writers are only applicable to local user readers.
*/
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_participant(
void)
void dds__builtin_fini (void)
{
int q_rc;
nn_plist_t plist;
struct thread_state1 * thr;
bool asleep;
nn_guid_t guid;
dds_entity_t participant;
dds_participant *pp;
/* No more sources for builtin topic samples */
struct thread_state1 * const self = lookup_thread_state ();
thread_state_awake (self);
delete_local_orphan_writer (builtintopic_writer_participant);
delete_local_orphan_writer (builtintopic_writer_publications);
delete_local_orphan_writer (builtintopic_writer_subscriptions);
thread_state_asleep (self);
nn_plist_init_empty (&plist);
thr = lookup_thread_state ();
asleep = !vtime_awake_p (thr->vtime);
if (asleep) {
thread_state_awake (thr);
}
q_rc = new_participant (&guid, RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_ONLY_LOCAL, &plist);
if (asleep) {
thread_state_asleep (thr);
}
if (q_rc != 0) {
DDS_ERROR("Internal builtin error\n");
participant = DDS_ERRNO(DDS_RETCODE_ERROR);
goto fail;
}
pp = dds_alloc (sizeof (*pp));
participant = dds_entity_init (&pp->m_entity, NULL, DDS_KIND_PARTICIPANT, NULL, NULL, 0);
if (participant < 0) {
goto fail;
}
pp->m_entity.m_guid = guid;
pp->m_entity.m_domain = dds_domain_create (config.domainId.value);
pp->m_entity.m_domainid = config.domainId.value;
pp->m_entity.m_deriver.delete = dds__delete_builtin_participant;
fail:
return participant;
ddsi_sertopic_unref (builtin_participant_topic);
ddsi_sertopic_unref (builtin_reader_topic);
ddsi_sertopic_unref (builtin_writer_topic);
}
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_publisher(
_In_ dds_entity_t participant)
dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic)
{
dds_qos_t *qos = dds__create_builtin_qos();
dds_entity_t pub = dds_create_publisher(participant, qos, NULL);
dds_delete_qos(qos);
return pub;
dds_entity_t pp;
dds_entity_t tp;
if ((pp = dds_get_participant (e)) <= 0)
return pp;
struct ddsi_sertopic *sertopic;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = builtin_reader_topic;
} else {
assert (0);
return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
dds_qos_t *qos = dds__create_builtin_qos ();
tp = dds_create_topic_arbitrary (pp, sertopic, sertopic->name, qos, NULL, NULL);
dds_delete_qos (qos);
return tp;
}
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_subscriber(
_In_ dds_entity *participant)
static bool qos_has_resource_limits (const dds_qos_t *qos)
{
dds_qos_t *qos = dds__create_builtin_qos();
dds_entity_t sub = dds__create_subscriber_l(participant, qos, NULL);
dds_delete_qos(qos);
return sub;
return (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED);
}
static dds_entity_t
dds__create_builtin_writer(
_In_ dds_entity_t topic)
bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos)
{
dds_entity_t wr;
dds_entity_t pub = dds__get_builtin_publisher();
if (pub > 0) {
dds_entity_t top = dds__get_builtin_topic(pub, topic);
if (top > 0) {
wr = dds_create_writer(pub, top, NULL, NULL);
(void)dds_delete(top);
} else {
wr = top;
}
if (qos == NULL)
/* default QoS inherited from topic is ok by definition */
return true;
else
{
/* failing writes on built-in topics are unwelcome complications, so we simply forbid the creation of
a reader matching a built-in topics writer that has resource limits */
struct local_orphan_writer *bwr;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
bwr = builtintopic_writer_participant;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
bwr = builtintopic_writer_publications;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
bwr = builtintopic_writer_subscriptions;
} else {
wr = pub;
assert (0);
return false;
}
return wr;
return qos_match_p (qos, bwr->wr.xqos) && !qos_has_resource_limits (qos);
}
}
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_participant(
void)
static dds_entity_t dds__create_builtin_subscriber (dds_entity *participant)
{
if (g_builtin_local_participant == 0) {
g_builtin_local_participant = dds__create_builtin_participant();
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPARTICIPANT);
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPUBLICATION);
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION);
}
return g_builtin_local_participant;
dds_qos_t *qos = dds__create_builtin_qos ();
dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL);
dds_delete_qos (qos);
return sub;
}
_Must_inspect_result_ dds_entity_t
dds__get_builtin_publisher(
void)
dds_entity_t dds__get_builtin_subscriber (dds_entity_t e)
{
if (g_builtin_local_publisher == 0) {
dds_entity_t par = dds__get_builtin_participant();
if (par > 0) {
g_builtin_local_publisher = dds__create_builtin_publisher(par);
}
}
return g_builtin_local_publisher;
}
dds_entity_t sub;
dds_return_t ret;
dds_entity_t pp;
dds_participant *p;
dds_entity *part_entity;
_Must_inspect_result_ dds_entity_t
dds__get_builtin_subscriber(
_In_ dds_entity_t e)
{
dds_entity_t sub;
dds_return_t ret;
dds_entity_t participant;
dds_participant *p;
dds_entity *part_entity;
participant = dds_get_participant(e);
if (participant <= 0) {
/* error already in participant error; no need to repeat error */
ret = participant;
goto error;
}
ret = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, (dds_entity **)&part_entity);
if (ret != DDS_RETCODE_OK) {
goto error;
}
p = (dds_participant *)part_entity;
if(p->m_builtin_subscriber <= 0) {
p->m_builtin_subscriber = dds__create_builtin_subscriber(part_entity);
}
sub = p->m_builtin_subscriber;
dds_entity_unlock(part_entity);
return sub;
/* Error handling */
error:
assert(ret < 0);
if ((pp = dds_get_participant (e)) <= 0)
return pp;
if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0)
return ret;
p = (dds_participant *) part_entity;
if (p->m_builtin_subscriber <= 0) {
p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity);
}
sub = p->m_builtin_subscriber;
dds_entity_unlock(part_entity);
return sub;
}
_Must_inspect_result_ dds_entity_t
dds__get_builtin_topic(
_In_ dds_entity_t e,
_In_ dds_entity_t topic)
bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid)
{
dds_entity_t participant;
dds_entity_t ret;
participant = dds_get_participant(e);
if (participant > 0) {
struct ddsi_sertopic *sertopic;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = gv.builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = gv.builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = gv.builtin_reader_topic;
} else {
DDS_ERROR("Invalid builtin-topic handle(%d)\n", topic);
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err_invalid_topic;
}
ret = dds_find_topic (participant, sertopic->name);
if (ret < 0 && dds_err_nr(ret) == DDS_RETCODE_PRECONDITION_NOT_MET) {
dds_qos_t *qos = dds__create_builtin_qos();
ret = dds_create_topic_arbitrary(participant, sertopic, sertopic->name, qos, NULL, NULL);
dds_delete_qos(qos);
}
} else {
/* Failed to get participant of provided entity */
ret = participant;
}
err_invalid_topic:
return ret;
return !(onlylocal || is_builtin_endpoint (entityid, vendorid));
}
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_writer(
_In_ dds_entity_t topic)
struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid)
{
dds_entity_t wr;
if ((topic >= DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) && (topic <= DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION)) {
int index = (int)(topic - DDS_KIND_INTERNAL - 1);
os_mutexLock(&g_builtin_mutex);
wr = g_builtin_local_writers[index];
if (wr == 0) {
wr = dds__create_builtin_writer(topic);
if (wr > 0) {
g_builtin_local_writers[index] = wr;
}
}
os_mutexUnlock(&g_builtin_mutex);
} else {
DDS_ERROR("Given topic is not a builtin topic\n");
wr = DDS_ERRNO(DDS_RETCODE_ERROR);
}
return wr;
struct ddsi_tkmap_instance *tk;
struct ddsi_serdata *sd;
struct nn_keyhash kh;
memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh);
tk = ddsi_tkmap_find (sd, false, true);
ddsi_serdata_unref (sd);
return tk;
}
static dds_return_t
dds__builtin_write_int(
_In_ dds_entity_t topic,
_In_ const nn_guid_t *guid,
_In_ dds_time_t timestamp,
_In_ bool alive)
struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive)
{
dds_return_t ret = DDS_RETCODE_OK;
if (os_atomic_inc32_nv(&m_call_count) > 1) {
dds_entity_t wr;
wr = dds__get_builtin_writer(topic);
if (wr > 0) {
struct ddsi_sertopic *sertopic;
struct ddsi_serdata *serdata;
struct nn_keyhash keyhash;
struct dds_writer *wraddr;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = gv.builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = gv.builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = gv.builtin_reader_topic;
} else {
sertopic = NULL;
assert (0);
}
memcpy (&keyhash, guid, sizeof (keyhash));
serdata = ddsi_serdata_from_keyhash(sertopic, &keyhash);
ret = dds_writer_lock(wr, &wraddr);
if (ret == DDS_RETCODE_OK) {
ret = dds_writecdr_impl (wraddr, serdata, timestamp, alive ? 0 : (DDS_WR_DISPOSE_BIT | DDS_WR_UNREGISTER_BIT));
dds_writer_unlock(wraddr);
}
} else {
ret = wr;
}
}
os_atomic_dec32(&m_call_count);
return ret;
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */
struct ddsi_sertopic *topic = NULL;
struct ddsi_serdata *serdata;
struct nn_keyhash keyhash;
switch (e->kind)
{
case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT:
topic = builtin_participant_topic;
break;
case EK_WRITER:
case EK_PROXY_WRITER:
topic = builtin_writer_topic;
break;
case EK_READER:
case EK_PROXY_READER:
topic = builtin_reader_topic;
break;
}
assert (topic != NULL);
memcpy (&keyhash, &e->guid, sizeof (keyhash));
serdata = ddsi_serdata_from_keyhash (topic, &keyhash);
serdata->timestamp = timestamp;
serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
return serdata;
}
void
dds__builtin_write(
_In_ enum ddsi_sertopic_builtin_type type,
_In_ const nn_guid_t *guid,
_In_ nn_wctime_t timestamp,
_In_ bool alive)
void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive)
{
/* initialize to avoid compiler warning ultimately caused by C's horrible type system */
dds_entity_t topic = 0;
switch (type)
if (ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, get_entity_vendorid (e)))
{
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */
struct local_orphan_writer *bwr = NULL;
struct ddsi_serdata *serdata = dds__builtin_make_sample (e, timestamp, alive);
assert (e->tk != NULL);
switch (e->kind)
{
case DSBT_PARTICIPANT:
topic = DDS_BUILTIN_TOPIC_DCPSPARTICIPANT;
break;
case DSBT_WRITER:
topic = DDS_BUILTIN_TOPIC_DCPSPUBLICATION;
break;
case DSBT_READER:
topic = DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION;
break;
case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT:
bwr = builtintopic_writer_participant;
break;
case EK_WRITER:
case EK_PROXY_WRITER:
bwr = builtintopic_writer_publications;
break;
case EK_READER:
case EK_PROXY_READER:
bwr = builtintopic_writer_subscriptions;
break;
}
assert(topic != 0);
(void)dds__builtin_write_int(topic, guid, timestamp.v, alive);
}
bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos)
{
if (qos == NULL) {
/* default QoS inherited from topic is ok by definition */
return true;
} else {
dds_entity_t wr = dds__get_builtin_writer(topic);
dds_qos_t *wrqos = dds_create_qos();
dds_return_t ret = dds_get_qos(wr, wrqos);
bool match;
assert (ret == DDS_RETCODE_OK);
(void)ret;
if (!qos_match_p (qos, wrqos)) {
match = false;
} else if (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED) {
/* this means a write on the built-in topic writer can't fail */
match = false;
} else {
match = true;
}
dds_delete_qos(wrqos);
return match;
}
}
void
dds__builtin_init(
void)
{
assert(os_atomic_ld32(&m_call_count) == 0);
os_mutexInit(&g_builtin_mutex);
os_atomic_inc32(&m_call_count);
}
void
dds__builtin_fini(
void)
{
assert(os_atomic_ld32(&m_call_count) > 0);
while (os_atomic_dec32_nv(&m_call_count) > 0) {
os_atomic_inc32_nv(&m_call_count);
dds_sleepfor(DDS_MSECS(10));
}
(void)dds_delete(g_builtin_local_participant);
g_builtin_local_participant = 0;
g_builtin_local_publisher = 0;
memset(g_builtin_local_writers, 0, sizeof(g_builtin_local_writers));
os_mutexDestroy(&g_builtin_mutex);
dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata);
}
}

View file

@ -18,6 +18,7 @@
#include "dds__domain.h"
#include "dds__err.h"
#include "dds__builtin.h"
#include "dds__whc_builtintopic.h"
#include "ddsi/ddsi_iid.h"
#include "ddsi/ddsi_tkmap.h"
#include "ddsi/ddsi_serdata.h"
@ -106,8 +107,6 @@ dds_init(dds_domainid_t domain)
* main configured domain id is. */
dds_global.m_default_domain = config.domainId.value;
dds__builtin_init();
if (rtps_config_prep(dds_cfgst) != 0)
{
DDS_ERROR("Failed to configure RTPS\n");
@ -138,6 +137,8 @@ dds_init(dds_domainid_t domain)
goto fail_rtps_init;
}
dds__builtin_init ();
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
{
DDS_ERROR("Failed to start the servicelease\n");
@ -172,7 +173,8 @@ skip:
fail_servicelease_start:
if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease);
rtps_term ();
rtps_stop ();
rtps_fini ();
fail_rtps_init:
if (gv.servicelease)
{
@ -182,7 +184,6 @@ fail_rtps_init:
fail_servicelease_new:
thread_states_fini();
fail_rtps_config:
dds__builtin_fini();
fail_config_domainid:
dds_global.m_default_domain = DDS_DOMAIN_DEFAULT;
config_fini (dds_cfgst);
@ -206,11 +207,11 @@ extern void dds_fini (void)
dds_global.m_init_count--;
if (dds_global.m_init_count == 0)
{
dds__builtin_fini();
if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease);
rtps_term ();
rtps_stop ();
dds__builtin_fini ();
rtps_fini ();
if (gv.servicelease)
nn_servicelease_free (gv.servicelease);
gv.servicelease = NULL;
@ -247,7 +248,9 @@ void ddsi_plugin_init (void)
ddsi_plugin.init_fn = dds__init_plugin;
ddsi_plugin.fini_fn = dds__fini_plugin;
ddsi_plugin.builtin_write = dds__builtin_write;
ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible;
ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry;
ddsi_plugin.builtintopic_write = dds__builtin_write;
ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini;

View file

@ -18,6 +18,7 @@
#include "dds__domain.h"
#include "dds__participant.h"
#include "dds__err.h"
#include "dds__builtin.h"
#define DDS_PARTICIPANT_STATUS_MASK 0u

View file

@ -23,11 +23,10 @@
#include <string.h>
#include "os/os.h"
#include "dds__key.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__stream.h"
#include "dds__serdata_builtintopic.h"
#include "ddsi/ddsi_tkmap.h"
#include "ddsi/q_entity.h"
#include "ddsi/ddsi_serdata_builtin.h"
//#include "dds.h" /* FIXME: need the sample types of the built-in topics */
static const uint64_t unihashconsts[] = {
UINT64_C (16292676669999574021),
@ -46,7 +45,7 @@ static uint32_t hash_guid (const nn_guid_t *g)
>> 32);
}
static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, uint32_t basehash)
static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtintopic *d, uint32_t basehash)
{
d->c.hash = hash_guid (&d->key) ^ basehash;
return &d->c;
@ -54,37 +53,37 @@ static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d,
static bool serdata_builtin_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{
const struct ddsi_serdata_builtin *a = (const struct ddsi_serdata_builtin *)acmn;
const struct ddsi_serdata_builtin *b = (const struct ddsi_serdata_builtin *)bcmn;
const struct ddsi_serdata_builtintopic *a = (const struct ddsi_serdata_builtintopic *)acmn;
const struct ddsi_serdata_builtintopic *b = (const struct ddsi_serdata_builtintopic *)bcmn;
return memcmp (&a->key, &b->key, sizeof (a->key)) == 0;
}
static void serdata_builtin_free(struct ddsi_serdata *dcmn)
{
struct ddsi_serdata_builtin *d = (struct ddsi_serdata_builtin *)dcmn;
struct ddsi_serdata_builtintopic *d = (struct ddsi_serdata_builtintopic *)dcmn;
if (d->c.kind == SDK_DATA)
nn_xqos_fini (&d->xqos);
os_free (d);
}
static struct ddsi_serdata_builtin *serdata_builtin_new(const struct ddsi_sertopic_builtin *tp, enum ddsi_serdata_kind kind)
static struct ddsi_serdata_builtintopic *serdata_builtin_new(const struct ddsi_sertopic_builtintopic *tp, enum ddsi_serdata_kind kind)
{
struct ddsi_serdata_builtin *d = os_malloc(sizeof (*d));
struct ddsi_serdata_builtintopic *d = os_malloc(sizeof (*d));
ddsi_serdata_init (&d->c, &tp->c, kind);
return d;
}
static void from_entity_pp (struct ddsi_serdata_builtin *d, const struct participant *pp)
static void from_entity_pp (struct ddsi_serdata_builtintopic *d, const struct participant *pp)
{
nn_xqos_copy(&d->xqos, &pp->plist->qos);
}
static void from_entity_proxypp (struct ddsi_serdata_builtin *d, const struct proxy_participant *proxypp)
static void from_entity_proxypp (struct ddsi_serdata_builtintopic *d, const struct proxy_participant *proxypp)
{
nn_xqos_copy(&d->xqos, &proxypp->plist->qos);
}
static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const struct ddsi_sertopic *tp)
static void set_topic_type_from_sertopic (struct ddsi_serdata_builtintopic *d, const struct ddsi_sertopic *tp)
{
if (!(d->xqos.present & QP_TOPIC_NAME))
{
@ -98,26 +97,26 @@ static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const
}
}
static void from_entity_rd (struct ddsi_serdata_builtin *d, const struct reader *rd)
static void from_entity_rd (struct ddsi_serdata_builtintopic *d, const struct reader *rd)
{
nn_xqos_copy(&d->xqos, rd->xqos);
set_topic_type_from_sertopic(d, rd->topic);
}
static void from_entity_prd (struct ddsi_serdata_builtin *d, const struct proxy_reader *prd)
static void from_entity_prd (struct ddsi_serdata_builtintopic *d, const struct proxy_reader *prd)
{
nn_xqos_copy(&d->xqos, prd->c.xqos);
assert (d->xqos.present & QP_TOPIC_NAME);
assert (d->xqos.present & QP_TYPE_NAME);
}
static void from_entity_wr (struct ddsi_serdata_builtin *d, const struct writer *wr)
static void from_entity_wr (struct ddsi_serdata_builtintopic *d, const struct writer *wr)
{
nn_xqos_copy(&d->xqos, wr->xqos);
set_topic_type_from_sertopic(d, wr->topic);
}
static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_writer *pwr)
static void from_entity_pwr (struct ddsi_serdata_builtintopic *d, const struct proxy_writer *pwr)
{
nn_xqos_copy(&d->xqos, pwr->c.xqos);
assert (d->xqos.present & QP_TOPIC_NAME);
@ -127,10 +126,10 @@ static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_
struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)tpcmn;
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
/* keyhash must in host format (which the GUIDs always are internally) */
const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value);
struct ddsi_serdata_builtin *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
memcpy (&d->key, keyhash->value, sizeof (d->key));
if (entity)
{
@ -206,7 +205,7 @@ static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src)
return old;
}
static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_participant *sample)
static bool to_sample_pp (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_participant *sample)
{
convkey (&sample->key, &d->key);
if (d->c.kind == SDK_DATA)
@ -216,7 +215,7 @@ static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_built
return true;
}
static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_endpoint *sample)
static bool to_sample_endpoint (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_endpoint *sample)
{
nn_guid_t ppguid;
convkey (&sample->key, &d->key);
@ -236,8 +235,8 @@ static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds
static bool serdata_builtin_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{
const struct ddsi_serdata_builtin *d = (const struct ddsi_serdata_builtin *)serdata_common;
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)topic;
const struct ddsi_serdata_builtintopic *d = (const struct ddsi_serdata_builtintopic *)serdata_common;
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)topic;
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
/* FIXME: completing builtin topic support along these lines requires subscribers, publishers and topics to also become DDSI entities - which is probably a good thing anyway */
switch (tp->type)
@ -279,7 +278,7 @@ static void serdata_builtin_to_ser_unref (struct ddsi_serdata *serdata_common, c
(void)serdata_common; (void)ref;
}
const struct ddsi_serdata_ops ddsi_serdata_ops_builtin = {
const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.get_size = serdata_builtin_get_size,
.eqkey = serdata_builtin_eqkey,
.free = serdata_builtin_free,

View file

@ -20,22 +20,22 @@
#include "ddsi/q_config.h"
#include "ddsi/q_freelist.h"
#include "ddsi/ddsi_sertopic.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsc/dds.h"
#include "dds__serdata_builtintopic.h"
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename)
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename)
{
struct ddsi_sertopic_builtin *tp = os_malloc (sizeof (*tp));
struct ddsi_sertopic_builtintopic *tp = os_malloc (sizeof (*tp));
tp->c.iid = ddsi_iid_gen();
tp->c.name = dds_string_dup (name);
tp->c.typename = dds_string_dup (typename);
const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.typename) + 1;
tp->c.name_typename = dds_alloc (name_typename_size);
snprintf (tp->c.name_typename, name_typename_size, "%s/%s", tp->c.name, tp->c.typename);
tp->c.ops = &ddsi_sertopic_ops_builtin;
tp->c.serdata_ops = &ddsi_serdata_ops_builtin;
tp->c.ops = &ddsi_sertopic_ops_builtintopic;
tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic;
tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops);
tp->c.status_cb = 0;
tp->c.status_cb_entity = NULL;
@ -66,7 +66,7 @@ static void free_endpoint (void *vsample)
sample->qos = NULL;
}
static size_t get_size (enum ddsi_sertopic_builtin_type type)
static size_t get_size (enum ddsi_sertopic_builtintopic_type type)
{
switch (type)
{
@ -82,14 +82,14 @@ static size_t get_size (enum ddsi_sertopic_builtin_type type)
static void sertopic_builtin_zero_samples (const struct ddsi_sertopic *sertopic_common, void *samples, size_t count)
{
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common;
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
size_t size = get_size (tp->type);
memset (samples, 0, size * count);
}
static void sertopic_builtin_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count)
{
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common;
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
const size_t size = get_size (tp->type);
char *new = dds_realloc (old, size * count);
if (new && count > oldcount)
@ -105,7 +105,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_
{
if (count > 0)
{
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common;
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
const size_t size = get_size (tp->type);
#ifndef NDEBUG
for (size_t i = 0, off = 0; i < count; i++, off += size)
@ -139,7 +139,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_
}
}
const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin = {
const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = {
.deinit = sertopic_builtin_deinit,
.zero_samples = sertopic_builtin_zero_samples,
.realloc_samples = sertopic_builtin_realloc_samples,

View file

@ -0,0 +1,201 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_config.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_entity.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__serdata_builtintopic.h"
#include "dds__whc_builtintopic.h"
#include "dds__builtin.h"
struct bwhc {
struct whc common;
enum ddsi_sertopic_builtintopic_type type;
};
enum bwhc_iter_state {
BIS_INIT_LOCAL,
BIS_LOCAL,
BIS_INIT_PROXY,
BIS_PROXY
};
struct bwhc_iter {
struct whc_sample_iter_base c;
enum bwhc_iter_state st;
bool have_sample;
struct ephash_enum it;
};
/* check that our definition of whc_sample_iter fits in the type that callers allocate */
struct bwhc_sample_iter_sizecheck {
char fits_in_generic_type[sizeof(struct bwhc_iter) <= sizeof(struct whc_sample_iter) ? 1 : -1];
};
static void bwhc_free (struct whc *whc_generic)
{
os_free (whc_generic);
}
static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it)
{
struct bwhc_iter *it = (struct bwhc_iter *) opaque_it;
it->c.whc = (struct whc *) whc_generic;
it->st = BIS_INIT_LOCAL;
it->have_sample = false;
}
static bool is_visible (const struct entity_common *e)
{
const nn_vendorid_t vendorid = get_entity_vendorid (e);
return ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, vendorid);
}
static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample)
{
struct bwhc_iter * const it = (struct bwhc_iter *) opaque_it;
struct bwhc * const whc = (struct bwhc *) it->c.whc;
enum entity_kind kind = EK_PARTICIPANT; /* pacify gcc */
struct entity_common *entity;
if (it->have_sample)
{
ddsi_serdata_unref (sample->serdata);
it->have_sample = false;
}
/* most fields really don't matter, so memset */
memset (sample, 0, sizeof (*sample));
switch (it->st)
{
case BIS_INIT_LOCAL:
switch (whc->type) {
case DSBT_PARTICIPANT: kind = EK_PARTICIPANT; break;
case DSBT_WRITER: kind = EK_WRITER; break;
case DSBT_READER: kind = EK_READER; break;
}
assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, kind);
it->st = BIS_LOCAL;
/* FALLS THROUGH */
case BIS_LOCAL:
while ((entity = ephash_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true);
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
it->st = BIS_INIT_PROXY;
}
/* FALLS THROUGH */
case BIS_INIT_PROXY:
switch (whc->type) {
case DSBT_PARTICIPANT: kind = EK_PROXY_PARTICIPANT; break;
case DSBT_WRITER: kind = EK_PROXY_WRITER; break;
case DSBT_READER: kind = EK_PROXY_READER; break;
}
assert (kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, kind);
it->st = BIS_PROXY;
/* FALLS THROUGH */
case BIS_PROXY:
while ((entity = ephash_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true);
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
return false;
}
}
assert (0);
return false;
}
static void bwhc_get_state (const struct whc *whc, struct whc_state *st)
{
(void)whc;
st->max_seq = -1;
st->min_seq = -1;
st->unacked_bytes = 0;
}
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
(void)whc;
(void)max_drop_seq;
(void)seq;
(void)serdata;
(void)tk;
if (plist)
os_free (plist);
return 0;
}
static unsigned bwhc_downgrade_to_volatile (struct whc *whc, struct whc_state *st)
{
(void)whc;
(void)st;
return 0;
}
static unsigned bwhc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list)
{
(void)whc;
(void)max_drop_seq;
(void)whcst;
*deferred_free_list = NULL;
return 0;
}
static void bwhc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list)
{
(void)whc;
(void)deferred_free_list;
}
static const struct whc_ops bwhc_ops = {
.insert = bwhc_insert,
.remove_acked_messages = bwhc_remove_acked_messages,
.free_deferred_free_list = bwhc_free_deferred_free_list,
.get_state = bwhc_get_state,
.next_seq = 0,
.borrow_sample = 0,
.borrow_sample_key = 0,
.return_sample = 0,
.sample_iter_init = bwhc_sample_iter_init,
.sample_iter_borrow_next = bwhc_sample_iter_borrow_next,
.downgrade_to_volatile = bwhc_downgrade_to_volatile,
.free = bwhc_free
};
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type)
{
struct bwhc *whc = os_malloc (sizeof (*whc));
whc->common.ops = &bwhc_ops;
whc->type = type;
return (struct whc *) whc;
}

View file

@ -26,327 +26,259 @@
#include "ddsi/q_entity.h"
#include "ddsi/q_radmin.h"
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
dds_return_t
dds_write(
_In_ dds_entity_t writer,
_In_ const void *data)
dds_return_t dds_write (dds_entity_t writer, const void *data)
{
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
if (data != NULL) {
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
ret = dds_write_impl(wr, data, dds_time(), 0);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking entity\n");
ret = DDS_ERRNO(rc);
}
} else {
DDS_ERROR("No data buffer provided\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
if (data == NULL)
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
return ret;
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
ret = dds_write_impl (wr, data, dds_time (), 0);
dds_writer_unlock (wr);
return ret;
}
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
int
dds_writecdr(
dds_entity_t writer,
struct ddsi_serdata *serdata)
dds_return_t dds_writecdr (dds_entity_t writer, struct ddsi_serdata *serdata)
{
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
if (serdata != NULL) {
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
ret = dds_writecdr_impl (wr, serdata, dds_time (), 0);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
}
} else{
DDS_ERROR("Given cdr has NULL value\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
return ret;
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
if (serdata == NULL)
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
ret = dds_writecdr_impl (wr, serdata, dds_time (), 0);
dds_writer_unlock (wr);
return ret;
}
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
dds_return_t
dds_write_ts(
_In_ dds_entity_t writer,
_In_ const void *data,
_In_ dds_time_t timestamp)
dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t timestamp)
{
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
dds_return_t ret;
dds__retcode_t rc;
dds_writer *wr;
if(data == NULL){
DDS_ERROR("Argument data has NULL value\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
if(timestamp < 0){
DDS_ERROR("Argument timestamp has negative value\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
ret = dds_write_impl(wr, data, timestamp, 0);
dds_writer_unlock(wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
}
err:
return ret;
if (data == NULL || timestamp < 0)
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
ret = dds_write_impl (wr, data, timestamp, 0);
dds_writer_unlock (wr);
return ret;
}
static int
deliver_locally(
_In_ struct writer *wr,
_In_ struct ddsi_serdata *payload,
_In_ struct ddsi_tkmap_instance *tk)
static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
{
dds_return_t ret = DDS_RETCODE_OK;
os_mutexLock (&wr->rdary.rdary_lock);
if (wr->rdary.fastpath_ok) {
struct reader ** const rdary = wr->rdary.rdary;
if (rdary[0]) {
struct proxy_writer_info pwr_info;
unsigned i;
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
for (i = 0; rdary[i]; i++) {
bool stored;
DDS_TRACE("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid));
dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC;
do {
stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk);
if (!stored) {
if (max_block_ms <= 0) {
DDS_ERROR("The writer could not deliver data on time, probably due to a local reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else {
dds_sleepfor(DDS_MSECS(DDS_HEADBANG_TIMEOUT_MS));
}
/* Decreasing the block time after the sleep, let's us possibly
* wait a bit too long. But that's preferable compared to waiting
* a bit too short. */
max_block_ms -= DDS_HEADBANG_TIMEOUT_MS;
}
} while ((!stored) && (ret == DDS_RETCODE_OK));
}
}
os_mutexUnlock (&wr->rdary.rdary_lock);
} else {
/* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary
reason rdary exists is to avoid locking the proxy writer
but this is less of an issue when we are deleting it, so
we fall back to using the GUIDs so that we can deliver all
samples we received from it. As writer being deleted any
reliable samples that are rejected are simply discarded. */
ut_avlIter_t it;
struct pwr_rd_match *m;
struct proxy_writer_info pwr_info;
os_mutexUnlock (&wr->rdary.rdary_lock);
make_proxy_writer_info(&pwr_info, &wr->e, wr->xqos);
os_mutexLock (&wr->e.lock);
for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) {
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) {
DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data() function. */
(void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk);
}
}
os_mutexUnlock (&wr->e.lock);
while (!(ddsi_plugin.rhc_plugin.rhc_store_fn) (rhc, pwr_info, payload, tk))
{
if (*max_block_ms > 0)
{
dds_sleepfor (DDS_HEADBANG_TIMEOUT);
*max_block_ms -= DDS_HEADBANG_TIMEOUT;
}
return ret;
else
{
DDS_ERROR ("The writer could not deliver data on time, probably due to a local reader resources being full\n");
return DDS_ERRNO (DDS_RETCODE_TIMEOUT);
}
}
return DDS_RETCODE_OK;
}
int
dds_write_impl(
_In_ dds_writer *wr,
_In_ const void * data,
_In_ dds_time_t tstamp,
_In_ dds_write_action action)
static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk)
{
dds_return_t ret = DDS_RETCODE_OK;
int w_rc;
assert (wr);
assert (dds_entity_kind(((dds_entity*)wr)->m_hdl) == DDS_KIND_WRITER);
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
const bool writekey = action & DDS_WR_KEY_BIT;
dds_writer * writer = (dds_writer*) wr;
struct writer * ddsi_wr = writer->m_wr;
struct ddsi_tkmap_instance * tk;
struct ddsi_serdata *d;
if (data == NULL) {
DDS_ERROR("No data buffer provided\n");
return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
dds_return_t ret = DDS_RETCODE_OK;
os_mutexLock (&wr->rdary.rdary_lock);
if (wr->rdary.fastpath_ok)
{
struct reader ** const rdary = wr->rdary.rdary;
if (rdary[0])
{
dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time);
struct proxy_writer_info pwr_info;
unsigned i;
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
for (i = 0; rdary[i]; i++) {
DDS_TRACE ("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid));
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
}
}
/* Check for topic filter */
if (wr->m_topic->filter_fn && ! writekey) {
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) {
return DDS_RETCODE_OK;
}
os_mutexUnlock (&wr->rdary.rdary_lock);
}
else
{
/* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary
reason rdary exists is to avoid locking the proxy writer
but this is less of an issue when we are deleting it, so
we fall back to using the GUIDs so that we can deliver all
samples we received from it. As writer being deleted any
reliable samples that are rejected are simply discarded. */
ut_avlIter_t it;
struct pwr_rd_match *m;
struct proxy_writer_info pwr_info;
dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time);
os_mutexUnlock (&wr->rdary.rdary_lock);
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
os_mutexLock (&wr->e.lock);
for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it))
{
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
{
DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data() function. */
if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
}
}
if (asleep) {
thread_state_awake (thr);
}
/* Serialize and write data or key */
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
/* Set if disposing or unregistering */
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) |
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ;
d->timestamp.v = tstamp;
ddsi_serdata_ref(d);
tk = ddsi_tkmap_lookup_instance_ref(d);
w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0) {
/* Flush out write unless configured to batch */
if (! config.whc_batch){
nn_xpack_send (writer->m_xp, false);
}
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR("Invalid data provided\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
} else {
DDS_ERROR("Internal error\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
}
if (ret == DDS_RETCODE_OK) {
ret = deliver_locally (ddsi_wr, d, tk);
}
ddsi_serdata_unref(d);
ddsi_tkmap_instance_unref(tk);
if (asleep) {
thread_state_asleep (thr);
}
return ret;
os_mutexUnlock (&wr->e.lock);
}
return ret;
}
int
dds_writecdr_impl(
_In_ dds_writer *wr,
_Inout_ struct ddsi_serdata *d,
_In_ dds_time_t tstamp,
_In_ dds_write_action action)
dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action)
{
int ret = DDS_RETCODE_OK;
int w_rc;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
const bool writekey = action & DDS_WR_KEY_BIT;
struct writer *ddsi_wr = wr->m_wr;
struct ddsi_tkmap_instance *tk;
struct ddsi_serdata *d;
dds_return_t ret = DDS_RETCODE_OK;
int w_rc;
assert (wr);
if (data == NULL)
{
DDS_ERROR("No data buffer provided\n");
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
}
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct writer * ddsi_wr = wr->m_wr;
struct ddsi_tkmap_instance * tk;
/* Check for topic filter */
if (wr->m_topic->filter_fn && !writekey)
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx))
return DDS_RETCODE_OK;
if (wr->m_topic->filter_fn) {
abort();
}
if (asleep)
thread_state_awake (thr);
if (asleep) {
thread_state_awake (thr);
}
/* Serialize and write data or key */
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0);
d->timestamp.v = tstamp;
ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
/* Set if disposing or unregistering */
d->statusinfo =
((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) |
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ;
d->timestamp.v = tstamp;
ddsi_serdata_ref(d);
tk = ddsi_tkmap_lookup_instance_ref(d);
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0) {
/* Flush out write unless configured to batch */
if (! config.whc_batch) {
nn_xpack_send (wr->m_xp, false);
}
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR("Invalid data provided\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
} else {
DDS_ERROR("Internal error\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
}
if (w_rc >= 0)
{
/* Flush out write unless configured to batch */
if (!config.whc_batch)
nn_xpack_send (wr->m_xp, false);
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO (DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR ("Invalid data provided\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
} else {
DDS_ERROR ("Internal error\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
}
if (ret == DDS_RETCODE_OK)
ret = deliver_locally (ddsi_wr, d, tk);
ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk);
if (ret == DDS_RETCODE_OK) {
ret = deliver_locally (ddsi_wr, d, tk);
}
ddsi_serdata_unref(d);
ddsi_tkmap_instance_unref(tk);
if (asleep) {
thread_state_asleep (thr);
}
return ret;
if (asleep)
thread_state_asleep (thr);
return ret;
}
void
dds_write_set_batch(
bool enable)
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d)
{
config.whc_batch = enable ? 1 : 0;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct ddsi_tkmap_instance * tk;
int ret = DDS_RETCODE_OK;
int w_rc;
if (asleep)
thread_state_awake (thr);
ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (xp, ddsi_wr, d, tk);
if (w_rc >= 0) {
/* Flush out write unless configured to batch */
if (!config.whc_batch && xp != NULL)
nn_xpack_send (xp, false);
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR ("Invalid data provided\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
} else {
DDS_ERROR ("Internal error\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
}
if (ret == DDS_RETCODE_OK)
ret = deliver_locally (ddsi_wr, d, tk);
ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk);
if (asleep)
thread_state_asleep (thr);
return ret;
}
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
void
dds_write_flush(
dds_entity_t writer)
dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action)
{
dds__retcode_t rc;
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_writer *wr;
if (asleep) {
thread_state_awake (thr);
}
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
nn_xpack_send (wr->m_xp, true);
dds_writer_unlock(wr);
} else{
DDS_ERROR("Error occurred on locking writer\n");
}
if (asleep) {
thread_state_asleep (thr);
}
return ;
if (wr->m_topic->filter_fn)
abort ();
/* Set if disposing or unregistering */
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0);
d->timestamp.v = tstamp;
return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d);
}
void dds_write_set_batch (bool enable)
{
config.whc_batch = enable ? 1 : 0;
}
void dds_write_flush (dds_entity_t writer)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
dds_writer *wr;
dds__retcode_t rc;
if (asleep)
thread_state_awake (thr);
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
DDS_ERROR ("Error occurred on locking writer\n");
else
{
nn_xpack_send (wr->m_xp, true);
dds_writer_unlock (wr);
}
if (asleep)
thread_state_asleep (thr);
return;
}

View file

@ -147,20 +147,21 @@ check_default_qos_of_builtin_entity(dds_entity_t entity)
CU_Test(ddsc_builtin_topics, availability_builtin_topics, .init = setup, .fini = teardown)
{
/* FIXME: Successful lookup doesn't rhyme with them not being returned when looking at the children of the participant ... */
dds_entity_t topic;
topic = dds_find_topic(g_participant, "DCPSParticipant");
CU_ASSERT_FATAL(topic > 0);
dds_delete(topic);
CU_ASSERT_FATAL(topic < 0);
//dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSTopic");
CU_ASSERT_FATAL(topic < 0);
//TODO CHAM-347: dds_delete(topic);
//dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSSubscription");
CU_ASSERT_FATAL(topic > 0);
dds_delete(topic);
CU_ASSERT_FATAL(topic < 0);
//dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSPublication");
CU_ASSERT_FATAL(topic > 0);
dds_delete(topic);
CU_ASSERT_FATAL(topic < 0);
//dds_delete(topic);
}
CU_Test(ddsc_builtin_topics, read_publication_data, .init = setup, .fini = teardown)

View file

@ -20,10 +20,8 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_mcgroup.c
ddsi_serdata.c
ddsi_serdata_default.c
ddsi_serdata_builtin.c
ddsi_sertopic.c
ddsi_sertopic_default.c
ddsi_sertopic_builtin.c
ddsi_rhc_plugin.c
ddsi_iid.c
ddsi_tkmap.c
@ -76,7 +74,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_serdata.h
ddsi_sertopic.h
ddsi_serdata_default.h
ddsi_serdata_builtin.h
ddsi_rhc_plugin.h
ddsi_iid.h
ddsi_tkmap.h

View file

@ -22,7 +22,6 @@
#include "ddsi/q_xqos.h"
#include "ddsi/ddsi_tran.h"
#include "ddsi/q_feature_check.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_rhc_plugin.h"
#if defined (__cplusplus)
@ -412,7 +411,10 @@ struct ddsi_plugin
{
int (*init_fn) (void);
void (*fini_fn) (void);
void (*builtin_write) (enum ddsi_sertopic_builtin_type type, const nn_guid_t *guid, nn_wctime_t timestamp, bool alive);
bool (*builtintopic_is_visible) (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid);
void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
/* Read cache */
struct ddsi_rhc_plugin rhc_plugin;

View file

@ -127,12 +127,15 @@ struct pwr_rd_match {
struct nn_rsample_info;
struct nn_rdata;
struct ddsi_tkmap_instance;
struct entity_common {
enum entity_kind kind;
nn_guid_t guid;
nn_wctime_t tupdate; /* timestamp of last update */
char *name;
uint64_t iid;
struct ddsi_tkmap_instance *tk;
os_mutex lock;
bool onlylocal;
};
@ -390,8 +393,11 @@ int is_deleted_participant_guid (const struct nn_guid *guid, unsigned for_what);
nn_entityid_t to_entityid (unsigned u);
int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid);
int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid);
bool is_local_orphan_endpoint (const struct entity_common *e);
int is_writer_entityid (nn_entityid_t id);
int is_reader_entityid (nn_entityid_t id);
nn_vendorid_t get_entity_vendorid (const struct entity_common *e);
int pp_allocate_entityid (nn_entityid_t *id, unsigned kind, struct participant *pp);
void pp_release_entityid(struct participant *pp, nn_entityid_t id);
@ -473,9 +479,9 @@ struct writer *get_builtin_writer (const struct participant *pp, unsigned entity
GUID "ppguid". May return NULL if participant unknown or
writer/reader already known. */
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg);
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg);
struct reader * new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void * status_cb_arg);
struct reader *new_reader (struct nn_guid *rdguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct rhc * rhc, status_cb_t status_cb, void *status_cb_arg);
struct whc_node;
struct whc_state;
@ -492,6 +498,12 @@ int delete_writer_nolinger_locked (struct writer *wr);
int delete_reader (const struct nn_guid *guid);
uint64_t reader_instance_id (const struct nn_guid *guid);
struct local_orphan_writer {
struct writer wr;
};
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc);
void delete_local_orphan_writer (struct local_orphan_writer *wr);
/* To create or delete a new proxy participant: "guid" MUST have the
pre-defined participant entity id. Unlike delete_participant(),
deleting a proxy participant will automatically delete all its

View file

@ -43,6 +43,7 @@ struct gcreq {
};
struct gcreq_queue *gcreq_queue_new (void);
void gcreq_queue_drain (struct gcreq_queue *q);
void gcreq_queue_free (struct gcreq_queue *q);
struct gcreq *gcreq_new (struct gcreq_queue *gcreq_queue, gcreq_cb_t cb);

View file

@ -281,11 +281,6 @@ struct q_globals {
struct ddsi_sertopic *plist_topic; /* used for all discovery data */
struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */
/* Sertopics for built-in topics -- FIXME: these really have little to do with topics, but everything with topic types, in other words, they are the type supports in DDS ... so a bit of refactoring is required */
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
/* Network ID needed by v_groupWrite -- FIXME: might as well pass it
to the receive thread instead of making it global (and that would
remove the need to include kernelModule.h) */

View file

@ -80,7 +80,8 @@ int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void);
int rtps_init (void);
void ddsi_plugin_init (void);
void rtps_term (void);
void rtps_stop (void);
void rtps_fini (void);
#if defined (__cplusplus)
}

View file

@ -42,7 +42,7 @@ struct whc_state {
an iter on the stack without specifying an implementation. If future changes or
implementations require more, these can be adjusted. An implementation should check
things fit at compile time. */
#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *))
#define WHC_SAMPLE_ITER_SIZE (7 * sizeof(void *))
struct whc_sample_iter_base {
struct whc *whc;
};

View file

@ -149,39 +149,41 @@ int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid)
}
}
static int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid)
int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid)
{
return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT;
}
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_vendorid_t vendorid, bool onlylocal, struct ddsi_tkmap_instance **tk)
bool is_local_orphan_endpoint (const struct entity_common *e)
{
return (e->guid.prefix.u[0] == 0 && e->guid.prefix.u[1] == 0 && e->guid.prefix.u[2] == 0 &&
is_builtin_endpoint (e->guid.entityid, ownvendorid));
}
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_wctime_t tcreate, nn_vendorid_t vendorid, bool onlylocal)
{
e->guid = *guid;
e->kind = kind;
e->tupdate = tcreate;
e->name = os_strdup (name ? name : "");
e->onlylocal = onlylocal;
os_mutexInit (&e->lock);
if (onlylocal || is_builtin_endpoint (guid->entityid, vendorid))
if (ddsi_plugin.builtintopic_is_visible (guid->entityid, onlylocal, vendorid))
{
e->iid = ddsi_iid_gen ();
*tk = NULL;
e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid);
e->iid = e->tk->m_iid;
}
else
{
struct ddsi_serdata *sd;
struct nn_keyhash kh;
memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (gv.builtin_participant_topic, &kh);
/* FIXME: this makes the iid for a reincarnation of a proxy entity dependent on whether an application reader kept the corresponding built-in topic instance around, it may be attractive to reconsider and guarantee a new iid in these cases, at least for the publication handle */
*tk = ddsi_tkmap_find(sd, false, true);
ddsi_serdata_unref (sd);
e->iid = (*tk)->m_iid;
e->tk = NULL;
e->iid = ddsi_iid_gen ();
}
}
static void entity_common_fini (struct entity_common *e)
{
if (e->tk)
ddsi_tkmap_instance_unref (e->tk);
os_free (e->name);
os_mutexDestroy (&e->lock);
}
@ -238,38 +240,27 @@ void local_reader_ary_setinvalid (struct local_reader_ary *x)
os_mutexUnlock (&x->rdary_lock);
}
static void write_builtin_topic_any (const struct entity_common *e, nn_wctime_t timestamp, bool alive, nn_vendorid_t vendorid, struct ddsi_tkmap_instance *tk)
nn_vendorid_t get_entity_vendorid (const struct entity_common *e)
{
if (!(e->onlylocal || is_builtin_endpoint(e->guid.entityid, vendorid)))
switch (e->kind)
{
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */
enum ddsi_sertopic_builtin_type type = DSBT_PARTICIPANT;
switch (e->kind)
{
case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT:
type = DSBT_PARTICIPANT;
break;
case EK_READER:
case EK_PROXY_READER:
type = DSBT_READER;
break;
case EK_WRITER:
case EK_PROXY_WRITER:
type = DSBT_WRITER;
break;
}
assert(type != DSBT_PARTICIPANT || (e->kind == EK_PARTICIPANT || e->kind == EK_PROXY_PARTICIPANT));
ddsi_plugin.builtin_write (type, &e->guid, timestamp, alive);
case EK_PARTICIPANT:
case EK_READER:
case EK_WRITER:
return (nn_vendorid_t) MY_VENDOR_ID;
break;
case EK_PROXY_PARTICIPANT:
return ((const struct proxy_participant *) e)->vendor;
break;
case EK_PROXY_READER:
return ((const struct proxy_reader *) e)->c.vendor;
break;
case EK_PROXY_WRITER:
return ((const struct proxy_writer *) e)->c.vendor;
break;
}
/* tkmap instance only needs to be kept around until the first write of a built-in topic (if none ever happens, it needn't be kept at all): afterward, the WHC of the local built-in topic writer will keep the entry alive. FIXME: the SPDP/SEPD ones currently use default sertopics instead of builtin sertopics, and so use different mappings and different instnace ids. No-one ever sees those ids, so it doesn't matter, but it would nicer if it could actually be the same one. FIXME: it would also be nicer if the local built-in topics and the SPDP/SEDP writers were the same, but I want the locally created endpoints visible in the built-in topics as well, and those don't exist in the discovery writers ... */
if (tk)
ddsi_tkmap_instance_unref (tk);
}
static void write_builtin_topic_local (const struct entity_common *e, nn_wctime_t timestamp, bool alive, struct ddsi_tkmap_instance *tk)
{
write_builtin_topic_any(e, timestamp, alive, ownvendorid, tk);
assert (0);
return (nn_vendorid_t) NN_VENDORID_UNKNOWN;
}
/* DELETED PARTICIPANTS --------------------------------------------- */
@ -408,7 +399,6 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{
struct participant *pp;
nn_guid_t subguid, group_guid;
struct ddsi_tkmap_instance *tk;
/* no reserved bits may be set */
assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0);
@ -451,7 +441,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
pp = os_malloc (sizeof (*pp));
entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0), &tk);
entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, now (), ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0));
pp->user_refc = 1;
pp->builtin_refc = 0;
pp->builtins_deleted = 0;
@ -630,7 +620,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
trigger_recv_threads ();
}
write_builtin_topic_local(&pp->e, now(), true, tk);
ddsi_plugin.builtintopic_write (&pp->e, now(), true);
/* SPDP periodic broadcast uses the retransmit path, so the initial
publication must be done differently. Must be later than making
@ -866,7 +856,7 @@ int delete_participant (const struct nn_guid *ppguid)
struct participant *pp;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
return ERR_UNKNOWN_ENTITY;
write_builtin_topic_local(&pp->e, now(), false, NULL);
ddsi_plugin.builtintopic_write (&pp->e, now(), false);
remember_deleted_participant_guid (&pp->e.guid);
ephash_remove_participant_guid (pp);
gcreq_participant (pp);
@ -2077,7 +2067,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn
{
int32_t reason;
(void)tnow;
if (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid))
if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid)))
return;
if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0)
{
@ -2295,7 +2285,7 @@ static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow)
struct ephash_enum est;
struct entity_common *em;
enum entity_kind mkind;
if (is_builtin_entityid (e->guid.entityid, ownvendorid))
if (is_builtin_entityid (e->guid.entityid, ownvendorid) && !is_local_orphan_endpoint (e))
/* never a need for local matches on discovery endpoints */
return;
mkind = generic_do_local_match_mkind(e->kind);
@ -2374,34 +2364,27 @@ static void new_reader_writer_common (const struct nn_guid *guid, const struct d
topic ? topic->typename : "(null)");
}
static void endpoint_common_init
(
struct entity_common *e,
struct endpoint_common *c,
enum entity_kind kind,
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
struct ddsi_tkmap_instance **tk
)
static void endpoint_common_init (struct entity_common *e, struct endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp)
{
entity_common_init (e, guid, NULL, kind, ownvendorid, pp->e.onlylocal, tk);
entity_common_init (e, guid, NULL, kind, now (), ownvendorid, pp->e.onlylocal);
c->pp = ref_participant (pp, &e->guid);
if (group_guid)
{
c->group_guid = *group_guid;
}
else
{
memset (&c->group_guid, 0, sizeof (c->group_guid));
}
}
static void endpoint_common_fini (struct entity_common *e, struct endpoint_common *c)
{
if (!is_builtin_entityid(e->guid.entityid, ownvendorid))
pp_release_entityid(c->pp, e->guid.entityid);
unref_participant (c->pp, &e->guid);
if (c->pp)
unref_participant (c->pp, &e->guid);
else
{
/* only for the (almost pseudo) writers used for generating the built-in topics */
assert (is_local_orphan_endpoint (e));
}
entity_common_fini (e);
}
@ -2608,25 +2591,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru
return n;
}
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (guid, topic, xqos);
wr = os_malloc (sizeof (*wr));
/* want a pointer to the participant so that a parallel call to
delete_participant won't interfere with our ability to address
the participant */
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, &tk);
os_condInit (&wr->throttle_cond, &wr->e.lock);
wr->seq = 0;
wr->cs_seq = 0;
@ -2658,12 +2624,10 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
assert (wr->xqos->aliased == 0);
set_topic_type_name (wr->xqos, topic);
if (dds_get_log_mask() & DDS_LC_DISCOVERY)
{
DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid));
nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos);
DDS_LOG(DDS_LC_DISCOVERY, "}\n");
}
DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid));
nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos);
DDS_LOG(DDS_LC_DISCOVERY, "}\n");
assert (wr->xqos->present & QP_RELIABILITY);
wr->reliable = (wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
assert (wr->xqos->present & QP_DURABILITY);
@ -2818,23 +2782,43 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
ut_avlInit (&wr_local_readers_treedef, &wr->local_readers);
local_reader_ary_init (&wr->rdary);
}
static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (guid, topic, xqos);
wr = os_malloc (sizeof (*wr));
/* want a pointer to the participant so that a parallel call to
delete_participant won't interfere with our ability to address
the participant */
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp);
new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity);
/* guid_hash needed for protocol handling, so add it before we send
out our first message. Also: needed for matching, and swapping
the order if hash insert & matching creates a window during which
neither of two endpoints being created in parallel can discover
the other. */
out our first message. Also: needed for matching, and swapping
the order if hash insert & matching creates a window during which
neither of two endpoints being created in parallel can discover
the other. */
ephash_insert_writer_guid (wr);
/* once it exists, match it with proxy writers and broadcast
existence (I don't think it matters much what the order of these
two is, but it seems likely that match-then-broadcast has a
slightly lower likelihood that a response from a proxy reader
gets dropped) -- but note that without adding a lock it might be
deleted while we do so */
existence (I don't think it matters much what the order of these
two is, but it seems likely that match-then-broadcast has a
slightly lower likelihood that a response from a proxy reader
gets dropped) -- but note that without adding a lock it might be
deleted while we do so */
match_writer_with_proxy_readers (wr, tnow);
match_writer_with_local_readers (wr, tnow);
write_builtin_topic_local(&wr->e, now(), true, tk);
ddsi_plugin.builtintopic_write (&wr->e, now(), true);
sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER)
@ -2846,7 +2830,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
return wr;
}
struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void * status_cb_arg)
struct writer *new_writer (struct nn_guid *wrguid, const struct nn_guid *group_guid, const struct nn_guid *ppguid, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc * whc, status_cb_t status_cb, void *status_cb_arg)
{
struct participant *pp;
struct writer * wr;
@ -2866,6 +2850,29 @@ struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_
return wr;
}
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc)
{
nn_guid_t guid;
struct local_orphan_writer *lowr;
struct writer *wr;
nn_mtime_t tnow = now_mt ();
DDS_LOG(DDS_LC_DISCOVERY, "new_local_orphan_writer(%s/%s)\n", topic->name, topic->typename);
lowr = os_malloc (sizeof (*lowr));
wr = &lowr->wr;
memset (&guid.prefix, 0, sizeof (guid.prefix));
guid.entityid = entityid;
entity_common_init (&wr->e, &guid, NULL, EK_WRITER, now (), ownvendorid, true);
wr->c.pp = NULL;
memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid));
new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL);
ephash_insert_writer_guid (wr);
match_writer_with_local_readers (wr, tnow);
ddsi_plugin.builtintopic_write (&wr->e, now(), true);
return lowr;
}
static void gc_delete_writer (struct gcreq *gcreq)
{
struct writer *wr = gcreq->arg;
@ -2960,7 +2967,7 @@ int delete_writer_nolinger_locked (struct writer *wr)
{
DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid));
ASSERT_MUTEX_HELD (&wr->e.lock);
write_builtin_topic_local(&wr->e, now(), false, NULL);
ddsi_plugin.builtintopic_write (&wr->e, now(), false);
local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr);
writer_set_state (wr, WRST_DELETING);
@ -2990,6 +2997,13 @@ int delete_writer_nolinger (const struct nn_guid *guid)
return 0;
}
void delete_local_orphan_writer (struct local_orphan_writer *lowr)
{
os_mutexLock (&lowr->wr.e.lock);
delete_writer_nolinger_locked (&lowr->wr);
os_mutexUnlock (&lowr->wr.e.lock);
}
int delete_writer (const struct nn_guid *guid)
{
struct writer *wr;
@ -3177,7 +3191,6 @@ static struct reader * new_reader_guid
struct reader * rd;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_reader_guid (guid) == NULL);
@ -3186,7 +3199,7 @@ static struct reader * new_reader_guid
new_reader_writer_common (guid, topic, xqos);
rd = os_malloc (sizeof (*rd));
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, &tk);
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp);
/* Copy QoS, merging in defaults */
rd->xqos = os_malloc (sizeof (*rd->xqos));
@ -3290,7 +3303,7 @@ static struct reader * new_reader_guid
ephash_insert_reader_guid (rd);
match_reader_with_proxy_writers (rd, tnow);
match_reader_with_local_writers (rd, tnow);
write_builtin_topic_local(&rd->e, now(), true, tk);
ddsi_plugin.builtintopic_write (&rd->e, now(), true);
sedp_write_reader (rd);
return rd;
}
@ -3383,7 +3396,7 @@ int delete_reader (const struct nn_guid *guid)
(ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc);
}
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid));
write_builtin_topic_local(&rd->e, now(), false, NULL);
ddsi_plugin.builtintopic_write (&rd->e, now(), false);
ephash_remove_reader_guid (rd);
gcreq_reader (rd);
return 0;
@ -3459,7 +3472,6 @@ void new_proxy_participant
runs on a single thread, it can't go wrong. FIXME, maybe? The
same holds for the other functions for creating entities. */
struct proxy_participant *proxypp;
struct ddsi_tkmap_instance *tk;
assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL);
@ -3469,7 +3481,7 @@ void new_proxy_participant
proxypp = os_malloc (sizeof (*proxypp));
entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, vendor, false, &tk);
entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false);
proxypp->refc = 1;
proxypp->lease_expired = 0;
proxypp->vendor = vendor;
@ -3625,7 +3637,7 @@ void new_proxy_participant
if (proxypp->owns_lease)
lease_register (os_atomic_ldvoidp (&proxypp->lease));
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, tk);
ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true);
os_mutexUnlock (&proxypp->e.lock);
}
@ -3643,7 +3655,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, co
switch (source)
{
case UPD_PROXYPP_SPDP:
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, NULL);
ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true);
proxypp->proxypp_have_spdp = 1;
break;
case UPD_PROXYPP_CM:
@ -3892,7 +3904,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t
return ERR_UNKNOWN_ENTITY;
}
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
write_builtin_topic_any(&ppt->e, timestamp, false, ppt->vendor, NULL);
ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false);
remember_deleted_participant_guid (&ppt->e.guid);
ephash_remove_proxy_participant_guid (ppt);
os_mutexUnlock (&gv.lock);
@ -4021,12 +4033,7 @@ void delete_proxy_group (const nn_guid_t *guid, nn_wctime_t timestamp, int isimp
/* PROXY-ENDPOINT --------------------------------------------------- */
static void proxy_endpoint_common_init
(
struct entity_common *e, struct proxy_endpoint_common *c,
enum entity_kind kind, const struct nn_guid *guid, struct proxy_participant *proxypp,
struct addrset *as, const nn_plist_t *plist, struct ddsi_tkmap_instance **tk
)
static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
{
const char *name;
@ -4036,7 +4043,7 @@ static void proxy_endpoint_common_init
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME));
name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : "";
entity_common_init (e, guid, name, kind, proxypp->vendor, false, tk);
entity_common_init (e, guid, name, kind, tcreate, proxypp->vendor, false);
c->xqos = nn_xqos_dup (&plist->qos);
c->as = ref_addrset (as);
c->topic = NULL; /* set from first matching reader/writer */
@ -4071,8 +4078,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_writer *pwr;
int isreliable;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (guid) == NULL);
@ -4083,7 +4089,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
}
pwr = os_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist, &tk);
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist);
ut_avlInit (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0;
@ -4149,7 +4155,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
local_reader_ary_init (&pwr->rdary);
ephash_insert_proxy_writer_guid (pwr);
match_proxy_writer_with_readers (pwr, tnow);
write_builtin_topic_any(&pwr->e, timestamp, true, pwr->c.vendor, tk);
ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true);
os_mutexLock (&pwr->e.lock);
pwr->local_matching_inprogress = 0;
@ -4265,7 +4271,6 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit)
{
struct proxy_writer *pwr;
(void)timestamp;
(void)isimplicit;
DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_writer (%x:%x:%x:%x) ", PGUID (*guid));
os_mutexLock (&gv.lock);
@ -4281,7 +4286,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int
from removing themselves from the proxy writer's rdary[]. */
local_reader_ary_setinvalid (&pwr->rdary);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
write_builtin_topic_any(&pwr->e, timestamp, false, pwr->c.vendor, NULL);
ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (pwr);
os_mutexUnlock (&gv.lock);
gcreq_proxy_writer (pwr);
@ -4299,8 +4304,6 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_participant *proxypp;
struct proxy_reader *prd;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (guid) == NULL);
@ -4312,7 +4315,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
}
prd = os_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist, &tk);
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist);
prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM
@ -4326,7 +4329,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
ut_avlInit (&prd_writers_treedef, &prd->writers);
ephash_insert_proxy_reader_guid (prd);
match_proxy_reader_with_writers (prd, tnow);
write_builtin_topic_any(&prd->e, timestamp, true, prd->c.vendor, tk);
ddsi_plugin.builtintopic_write (&prd->e, timestamp, true);
return 0;
}
@ -4399,7 +4402,6 @@ static void gc_delete_proxy_reader (struct gcreq *gcreq)
int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit)
{
struct proxy_reader *prd;
(void)timestamp;
(void)isimplicit;
DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_reader (%x:%x:%x:%x) ", PGUID (*guid));
os_mutexLock (&gv.lock);
@ -4409,7 +4411,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int
DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n");
return ERR_UNKNOWN_ENTITY;
}
write_builtin_topic_any(&prd->e, timestamp, false, prd->c.vendor, NULL);
ddsi_plugin.builtintopic_write (&prd->e, timestamp, false);
ephash_remove_proxy_reader_guid (prd);
os_mutexUnlock (&gv.lock);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");

View file

@ -180,6 +180,14 @@ struct gcreq_queue *gcreq_queue_new (void)
return q;
}
void gcreq_queue_drain (struct gcreq_queue *q)
{
os_mutexLock (&q->lock);
while (q->count != 0)
os_condWait (&q->cond, &q->lock);
os_mutexUnlock (&q->lock);
}
void gcreq_queue_free (struct gcreq_queue *q)
{
struct gcreq *gcreq;
@ -191,7 +199,8 @@ void gcreq_queue_free (struct gcreq_queue *q)
os_mutexLock (&q->lock);
q->terminate = 1;
/* Wait until there is only request in existence, the one we just
allocated. Then we know the gc system is quiet. */
allocated (this is also why we can't use "drain" here). Then
we know the gc system is quiet. */
while (q->count != 1)
os_condWait (&q->cond, &q->lock);
os_mutexUnlock (&q->lock);
@ -227,7 +236,7 @@ void gcreq_free (struct gcreq *gcreq)
struct gcreq_queue *gcreq_queue = gcreq->queue;
os_mutexLock (&gcreq_queue->lock);
--gcreq_queue->count;
if (gcreq_queue->terminate && gcreq_queue->count <= 1)
if (gcreq_queue->count <= 1)
os_condBroadcast (&gcreq_queue->cond);
os_mutexUnlock (&gcreq_queue->lock);
os_free (gcreq);

View file

@ -54,7 +54,6 @@
#include "ddsi/ddsi_raweth.h"
#include "ddsi/ddsi_mcgroup.h"
#include "ddsi/ddsi_serdata_default.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__whc.h"
@ -772,18 +771,12 @@ static void make_special_topics (void)
{
gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
gv.builtin_participant_topic = new_sertopic_builtin (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
gv.builtin_reader_topic = new_sertopic_builtin (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
gv.builtin_writer_topic = new_sertopic_builtin (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
}
static void free_special_topics (void)
{
ddsi_sertopic_unref (gv.plist_topic);
ddsi_sertopic_unref (gv.rawcdr_topic);
ddsi_sertopic_unref (gv.builtin_participant_topic);
ddsi_sertopic_unref (gv.builtin_reader_topic);
ddsi_sertopic_unref (gv.builtin_writer_topic);
}
static int setup_and_start_recv_threads (void)
@ -1408,7 +1401,7 @@ static void builtins_dqueue_ready_cb (void *varg)
os_mutexUnlock (&arg->lock);
}
void rtps_term (void)
void rtps_stop (void)
{
struct thread_state1 *self = lookup_thread_state ();
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS
@ -1527,17 +1520,26 @@ void rtps_term (void)
}
/* Wait until all participants are really gone => by then we can be
certain that no new GC requests will be added */
certain that no new GC requests will be added, short of what we
do here */
os_mutexLock (&gv.participant_set_lock);
while (gv.nparticipants > 0)
os_condWait (&gv.participant_set_cond, &gv.participant_set_lock);
os_mutexUnlock (&gv.participant_set_lock);
/* Wait until no more GC requests are outstanding -- not really
necessary, but it allows us to claim the stack is quiescent
at this point */
gcreq_queue_drain (gv.gcreq_queue);
/* Clean up privileged_pp -- it must be NULL now (all participants
are gone), but the lock still needs to be destroyed */
assert (gv.privileged_pp == NULL);
os_mutexDestroy (&gv.privileged_pp_lock);
}
void rtps_fini (void)
{
/* Shut down the GC system -- no new requests will be added */
gcreq_queue_free (gv.gcreq_queue);

View file

@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
res = 1;
#ifndef NDEBUG
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER)
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER && !is_local_orphan_endpoint (&wr->e))
{
struct whc_state whcst;
whc_get_state(wr->whc, &whcst);