Merge pull request #89 from eboasson/builtintopics

Builtintopics Revisited
This commit is contained in:
eboasson 2019-01-15 11:46:48 +01:00 committed by GitHub
commit c10a7fab4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
34 changed files with 904 additions and 1002 deletions

View file

@ -53,11 +53,7 @@ if(CMAKE_SYSTEM_NAME STREQUAL "VxWorks")
endif() endif()
if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro")
add_definitions(-m64) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -m64 -xc99 -D__restrict=restrict")
add_definitions(-xc99)
add_definitions(-D__restrict=restrict)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_LINKER_FLAGS} -m64")
endif() endif()
# Conan # Conan

View file

@ -37,6 +37,9 @@ PREPEND(srcs_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds_subscriber.c dds_subscriber.c
dds_write.c dds_write.c
dds_whc.c dds_whc.c
dds_whc_builtintopic.c
dds_serdata_builtintopic.c
dds_sertopic_builtintopic.c
) )
PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/ddsc>$<INSTALL_INTERFACE:include/ddsc>" PREPEND(hdrs_public_ddsc "$<BUILD_INTERFACE:${CMAKE_CURRENT_LIST_DIR}/include/ddsc>$<INSTALL_INTERFACE:include/ddsc>"
@ -73,6 +76,8 @@ PREPEND(hdrs_private_ddsc "${CMAKE_CURRENT_LIST_DIR}/src"
dds__write.h dds__write.h
dds__writer.h dds__writer.h
dds__whc.h dds__whc.h
dds__whc_builtintopic.h
dds__serdata_builtintopic.h
) )
configure_file( configure_file(

View file

@ -1777,7 +1777,7 @@ dds_write_flush(
* @returns A dds_return_t indicating success or failure. * @returns A dds_return_t indicating success or failure.
*/ */
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) _Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER)
DDS_EXPORT int DDS_EXPORT dds_return_t
dds_writecdr( dds_writecdr(
dds_entity_t writer, dds_entity_t writer,
struct ddsi_serdata *serdata); struct ddsi_serdata *serdata);

View file

@ -13,49 +13,31 @@
#define _DDS_BUILTIN_H_ #define _DDS_BUILTIN_H_
#include "ddsi/q_time.h" #include "ddsi/q_time.h"
#include "ddsi/ddsi_serdata_builtin.h"
#if defined (__cplusplus) #if defined (__cplusplus)
extern "C" extern "C"
{ {
#endif #endif
/* Get actual topic in related participant related to topic 'id'. */ /* Get actual topic in related participant related to topic 'id'. */
_Must_inspect_result_ dds_entity_t dds_entity_t dds__get_builtin_topic ( dds_entity_t e, dds_entity_t topic);
dds__get_builtin_topic(
_In_ dds_entity_t e,
_In_ dds_entity_t topic);
/* Global publisher singleton (publishes only locally). */
_Must_inspect_result_ dds_entity_t
dds__get_builtin_publisher(
void);
/* Subscriber singleton within related participant. */ /* Subscriber singleton within related participant. */
_Must_inspect_result_ dds_entity_t dds_entity_t dds__get_builtin_subscriber(dds_entity_t e);
dds__get_builtin_subscriber(
_In_ dds_entity_t e);
/* Checks whether the reader QoS is valid for use with built-in topic TOPIC */ /* Checks whether the reader QoS is valid for use with built-in topic TOPIC */
bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos); bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos);
/* Initialization and finalize functions. */ struct entity_common;
void struct nn_guid;
dds__builtin_init( struct ddsi_tkmap_instance;
void);
void void dds__builtin_init (void);
dds__builtin_fini( void dds__builtin_fini (void);
void); bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid);
void struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
dds__builtin_write( void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
_In_ enum ddsi_sertopic_builtin_type type,
_In_ const nn_guid_t *guid,
_In_ nn_wctime_t timestamp,
_In_ bool alive);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -9,35 +9,33 @@
* *
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/ */
#ifndef DDSI_SERDATA_BUILTIN_H #ifndef DDSI_SERDATA_BUILTINTOPIC_H
#define DDSI_SERDATA_BUILTIN_H #define DDSI_SERDATA_BUILTINTOPIC_H
#include "os/os.h" #include "ddsi/q_xqos.h"
#include "util/ut_avl.h"
#include "ddsi/ddsi_serdata.h" #include "ddsi/ddsi_serdata.h"
#include "ddsi/ddsi_sertopic.h" #include "ddsi/ddsi_sertopic.h"
#include "ddsi/q_xqos.h"
struct ddsi_serdata_builtin { struct ddsi_serdata_builtintopic {
struct ddsi_serdata c; struct ddsi_serdata c;
nn_guid_t key; nn_guid_t key;
nn_xqos_t xqos; nn_xqos_t xqos;
}; };
enum ddsi_sertopic_builtin_type { enum ddsi_sertopic_builtintopic_type {
DSBT_PARTICIPANT, DSBT_PARTICIPANT,
DSBT_READER, DSBT_READER,
DSBT_WRITER DSBT_WRITER
}; };
struct ddsi_sertopic_builtin { struct ddsi_sertopic_builtintopic {
struct ddsi_sertopic c; struct ddsi_sertopic c;
enum ddsi_sertopic_builtin_type type; enum ddsi_sertopic_builtintopic_type type;
}; };
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin; extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic;
extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtin; extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic;
struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename); struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename);
#endif #endif

View file

@ -52,7 +52,7 @@ struct rhc;
* Obviously, it is encouraged to use condition variables and such. But * Obviously, it is encouraged to use condition variables and such. But
* sometimes it wouldn't make that much of a difference and taking the * sometimes it wouldn't make that much of a difference and taking the
* easy route is somewhat pragmatic. */ * easy route is somewhat pragmatic. */
#define DDS_HEADBANG_TIMEOUT_MS (10) #define DDS_HEADBANG_TIMEOUT (DDS_MSECS (10))
typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, const void *ctx); typedef bool (*dds_querycondition_filter_with_ctx_fn) (const void * sample, const void *ctx);

View file

@ -0,0 +1,28 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#ifndef DDS_WHC_BUILTINTOPIC_H
#define DDS_WHC_BUILTINTOPIC_H
#include "ddsi/q_whc.h"
#include "dds__serdata_builtintopic.h"
#if defined (__cplusplus)
extern "C" {
#endif
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type);
#if defined (__cplusplus)
}
#endif
#endif /* Q_WHC_H */

View file

@ -22,28 +22,16 @@ extern "C" {
struct ddsi_serdata; struct ddsi_serdata;
typedef enum typedef enum {
{
DDS_WR_ACTION_WRITE = 0, DDS_WR_ACTION_WRITE = 0,
DDS_WR_ACTION_WRITE_DISPOSE = DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_WRITE_DISPOSE = DDS_WR_DISPOSE_BIT,
DDS_WR_ACTION_DISPOSE = DDS_WR_KEY_BIT | DDS_WR_DISPOSE_BIT, DDS_WR_ACTION_DISPOSE = DDS_WR_KEY_BIT | DDS_WR_DISPOSE_BIT,
DDS_WR_ACTION_UNREGISTER = DDS_WR_KEY_BIT | DDS_WR_UNREGISTER_BIT DDS_WR_ACTION_UNREGISTER = DDS_WR_KEY_BIT | DDS_WR_UNREGISTER_BIT
} } dds_write_action;
dds_write_action;
int dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action);
dds_write_impl( dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action);
_In_ dds_writer *wr, dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d);
_In_ const void *data,
_In_ dds_time_t tstamp,
_In_ dds_write_action action);
int
dds_writecdr_impl(
_In_ dds_writer *wr,
_Inout_ struct ddsi_serdata *d,
_In_ dds_time_t tstamp,
_In_ dds_write_action action);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -24,48 +24,19 @@
#include "dds__subscriber.h" #include "dds__subscriber.h"
#include "dds__write.h" #include "dds__write.h"
#include "dds__writer.h" #include "dds__writer.h"
#include "dds__whc_builtintopic.h"
#include "dds__serdata_builtintopic.h"
#include "ddsi/q_qosmatch.h" #include "ddsi/q_qosmatch.h"
#include "ddsi/ddsi_serdata_builtin.h" #include "ddsi/ddsi_tkmap.h"
static dds_return_t static struct ddsi_sertopic *builtin_participant_topic;
dds__delete_builtin_participant( static struct ddsi_sertopic *builtin_reader_topic;
dds_entity *e); static struct ddsi_sertopic *builtin_writer_topic;
static struct local_orphan_writer *builtintopic_writer_participant;
static struct local_orphan_writer *builtintopic_writer_publications;
static struct local_orphan_writer *builtintopic_writer_subscriptions;
static _Must_inspect_result_ dds_entity_t static dds_qos_t *dds__create_builtin_qos (void)
dds__create_builtin_participant(
void);
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_publisher(
_In_ dds_entity_t participant);
static dds_entity_t
dds__create_builtin_writer(
_In_ dds_entity_t topic);
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_participant(
void);
static os_mutex g_builtin_mutex;
static os_atomic_uint32_t m_call_count = OS_ATOMIC_UINT32_INIT(0);
/* Singletons are used to publish builtin data locally. */
static dds_entity_t g_builtin_local_participant = 0;
static dds_entity_t g_builtin_local_publisher = 0;
static dds_entity_t g_builtin_local_writers[] = {
0, /* index DDS_BUILTIN_TOPIC_DCPSPARTICIPANT - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSTOPIC - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSPUBLICATION - DDS_KIND_INTERNAL - 1 */
0, /* index DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION - DDS_KIND_INTERNAL - 1 */
};
static _Must_inspect_result_ dds_qos_t *
dds__create_builtin_qos(
void)
{ {
const char *partition = "__BUILT-IN PARTITION__"; const char *partition = "__BUILT-IN PARTITION__";
dds_qos_t *qos = dds_create_qos (); dds_qos_t *qos = dds_create_qos ();
@ -76,95 +47,94 @@ dds__create_builtin_qos(
return qos; return qos;
} }
static dds_return_t void dds__builtin_init (void)
dds__delete_builtin_participant(
dds_entity *e)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
assert(e);
assert(thr);
assert(dds_entity_kind(e->m_hdl) == DDS_KIND_PARTICIPANT);
if (asleep) {
thread_state_awake(thr);
}
dds_domain_free(e->m_domain);
if (asleep) {
thread_state_asleep(thr);
}
return DDS_RETCODE_OK;
}
/*
* We don't use the 'normal' create participant.
*
* This way, the application is not able to access the local builtin writers.
* Also, we can indicate that it should be a 'local only' participant, which
* means that none of the entities under the hierarchy of this participant will
* be exposed to the outside world. This is what we want, because these builtin
* writers are only applicable to local user readers.
*/
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_participant(
void)
{
int q_rc;
nn_plist_t plist;
struct thread_state1 * thr;
bool asleep;
nn_guid_t guid;
dds_entity_t participant;
dds_participant *pp;
nn_plist_init_empty (&plist);
thr = lookup_thread_state ();
asleep = !vtime_awake_p (thr->vtime);
if (asleep) {
thread_state_awake (thr);
}
q_rc = new_participant (&guid, RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_ONLY_LOCAL, &plist);
if (asleep) {
thread_state_asleep (thr);
}
if (q_rc != 0) {
DDS_ERROR("Internal builtin error\n");
participant = DDS_ERRNO(DDS_RETCODE_ERROR);
goto fail;
}
pp = dds_alloc (sizeof (*pp));
participant = dds_entity_init (&pp->m_entity, NULL, DDS_KIND_PARTICIPANT, NULL, NULL, 0);
if (participant < 0) {
goto fail;
}
pp->m_entity.m_guid = guid;
pp->m_entity.m_domain = dds_domain_create (config.domainId.value);
pp->m_entity.m_domainid = config.domainId.value;
pp->m_entity.m_deriver.delete = dds__delete_builtin_participant;
fail:
return participant;
}
static _Must_inspect_result_ dds_entity_t
dds__create_builtin_publisher(
_In_ dds_entity_t participant)
{ {
dds_qos_t *qos = dds__create_builtin_qos (); dds_qos_t *qos = dds__create_builtin_qos ();
dds_entity_t pub = dds_create_publisher(participant, qos, NULL);
builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT));
builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER));
builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER));
dds_delete_qos (qos); dds_delete_qos (qos);
return pub;
} }
static _Must_inspect_result_ dds_entity_t void dds__builtin_fini (void)
dds__create_builtin_subscriber( {
_In_ dds_entity *participant) /* No more sources for builtin topic samples */
struct thread_state1 * const self = lookup_thread_state ();
thread_state_awake (self);
delete_local_orphan_writer (builtintopic_writer_participant);
delete_local_orphan_writer (builtintopic_writer_publications);
delete_local_orphan_writer (builtintopic_writer_subscriptions);
thread_state_asleep (self);
ddsi_sertopic_unref (builtin_participant_topic);
ddsi_sertopic_unref (builtin_reader_topic);
ddsi_sertopic_unref (builtin_writer_topic);
}
dds_entity_t dds__get_builtin_topic (dds_entity_t e, dds_entity_t topic)
{
dds_entity_t pp;
dds_entity_t tp;
if ((pp = dds_get_participant (e)) <= 0)
return pp;
struct ddsi_sertopic *sertopic;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = builtin_reader_topic;
} else {
assert (0);
return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
dds_qos_t *qos = dds__create_builtin_qos ();
tp = dds_create_topic_arbitrary (pp, sertopic, sertopic->name, qos, NULL, NULL);
dds_delete_qos (qos);
return tp;
}
static bool qos_has_resource_limits (const dds_qos_t *qos)
{
return (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED);
}
bool dds__validate_builtin_reader_qos (dds_entity_t topic, const dds_qos_t *qos)
{
if (qos == NULL)
/* default QoS inherited from topic is ok by definition */
return true;
else
{
/* failing writes on built-in topics are unwelcome complications, so we simply forbid the creation of
a reader matching a built-in topics writer that has resource limits */
struct local_orphan_writer *bwr;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
bwr = builtintopic_writer_participant;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
bwr = builtintopic_writer_publications;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
bwr = builtintopic_writer_subscriptions;
} else {
assert (0);
return false;
}
return qos_match_p (qos, bwr->wr.xqos) && !qos_has_resource_limits (qos);
}
}
static dds_entity_t dds__create_builtin_subscriber (dds_entity *participant)
{ {
dds_qos_t *qos = dds__create_builtin_qos (); dds_qos_t *qos = dds__create_builtin_qos ();
dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL); dds_entity_t sub = dds__create_subscriber_l (participant, qos, NULL);
@ -172,270 +142,98 @@ dds__create_builtin_subscriber(
return sub; return sub;
} }
static dds_entity_t dds_entity_t dds__get_builtin_subscriber (dds_entity_t e)
dds__create_builtin_writer(
_In_ dds_entity_t topic)
{
dds_entity_t wr;
dds_entity_t pub = dds__get_builtin_publisher();
if (pub > 0) {
dds_entity_t top = dds__get_builtin_topic(pub, topic);
if (top > 0) {
wr = dds_create_writer(pub, top, NULL, NULL);
(void)dds_delete(top);
} else {
wr = top;
}
} else {
wr = pub;
}
return wr;
}
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_participant(
void)
{
if (g_builtin_local_participant == 0) {
g_builtin_local_participant = dds__create_builtin_participant();
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPARTICIPANT);
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSPUBLICATION);
(void)dds__create_builtin_writer(DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION);
}
return g_builtin_local_participant;
}
_Must_inspect_result_ dds_entity_t
dds__get_builtin_publisher(
void)
{
if (g_builtin_local_publisher == 0) {
dds_entity_t par = dds__get_builtin_participant();
if (par > 0) {
g_builtin_local_publisher = dds__create_builtin_publisher(par);
}
}
return g_builtin_local_publisher;
}
_Must_inspect_result_ dds_entity_t
dds__get_builtin_subscriber(
_In_ dds_entity_t e)
{ {
dds_entity_t sub; dds_entity_t sub;
dds_return_t ret; dds_return_t ret;
dds_entity_t participant; dds_entity_t pp;
dds_participant *p; dds_participant *p;
dds_entity *part_entity; dds_entity *part_entity;
participant = dds_get_participant(e); if ((pp = dds_get_participant (e)) <= 0)
if (participant <= 0) { return pp;
/* error already in participant error; no need to repeat error */ if ((ret = dds_entity_lock (pp, DDS_KIND_PARTICIPANT, &part_entity)) < 0)
ret = participant; return ret;
goto error;
}
ret = dds_entity_lock(participant, DDS_KIND_PARTICIPANT, (dds_entity **)&part_entity);
if (ret != DDS_RETCODE_OK) {
goto error;
}
p = (dds_participant *) part_entity; p = (dds_participant *) part_entity;
if (p->m_builtin_subscriber <= 0) { if (p->m_builtin_subscriber <= 0) {
p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity); p->m_builtin_subscriber = dds__create_builtin_subscriber (part_entity);
} }
sub = p->m_builtin_subscriber; sub = p->m_builtin_subscriber;
dds_entity_unlock(part_entity); dds_entity_unlock(part_entity);
return sub; return sub;
/* Error handling */
error:
assert(ret < 0);
return ret;
} }
bool dds__builtin_is_visible (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid)
_Must_inspect_result_ dds_entity_t
dds__get_builtin_topic(
_In_ dds_entity_t e,
_In_ dds_entity_t topic)
{ {
dds_entity_t participant; return !(onlylocal || is_builtin_endpoint (entityid, vendorid));
dds_entity_t ret;
participant = dds_get_participant(e);
if (participant > 0) {
struct ddsi_sertopic *sertopic;
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = gv.builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = gv.builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = gv.builtin_reader_topic;
} else {
DDS_ERROR("Invalid builtin-topic handle(%d)\n", topic);
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err_invalid_topic;
} }
ret = dds_find_topic (participant, sertopic->name); struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid)
if (ret < 0 && dds_err_nr(ret) == DDS_RETCODE_PRECONDITION_NOT_MET) {
dds_qos_t *qos = dds__create_builtin_qos();
ret = dds_create_topic_arbitrary(participant, sertopic, sertopic->name, qos, NULL, NULL);
dds_delete_qos(qos);
}
} else {
/* Failed to get participant of provided entity */
ret = participant;
}
err_invalid_topic:
return ret;
}
static _Must_inspect_result_ dds_entity_t
dds__get_builtin_writer(
_In_ dds_entity_t topic)
{ {
dds_entity_t wr; struct ddsi_tkmap_instance *tk;
if ((topic >= DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) && (topic <= DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION)) { struct ddsi_serdata *sd;
int index = (int)(topic - DDS_KIND_INTERNAL - 1); struct nn_keyhash kh;
os_mutexLock(&g_builtin_mutex); memcpy (&kh, guid, sizeof (kh));
wr = g_builtin_local_writers[index]; /* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
if (wr == 0) { sd = ddsi_serdata_from_keyhash (builtin_participant_topic, &kh);
wr = dds__create_builtin_writer(topic); tk = ddsi_tkmap_find (sd, false, true);
if (wr > 0) { ddsi_serdata_unref (sd);
g_builtin_local_writers[index] = wr; return tk;
}
}
os_mutexUnlock(&g_builtin_mutex);
} else {
DDS_ERROR("Given topic is not a builtin topic\n");
wr = DDS_ERRNO(DDS_RETCODE_ERROR);
}
return wr;
} }
static dds_return_t struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive)
dds__builtin_write_int(
_In_ dds_entity_t topic,
_In_ const nn_guid_t *guid,
_In_ dds_time_t timestamp,
_In_ bool alive)
{ {
dds_return_t ret = DDS_RETCODE_OK; /* initialize to avoid gcc warning ultimately caused by C's horrible type system */
if (os_atomic_inc32_nv(&m_call_count) > 1) { struct ddsi_sertopic *topic = NULL;
dds_entity_t wr;
wr = dds__get_builtin_writer(topic);
if (wr > 0) {
struct ddsi_sertopic *sertopic;
struct ddsi_serdata *serdata; struct ddsi_serdata *serdata;
struct nn_keyhash keyhash; struct nn_keyhash keyhash;
struct dds_writer *wraddr; switch (e->kind)
if (topic == DDS_BUILTIN_TOPIC_DCPSPARTICIPANT) {
sertopic = gv.builtin_participant_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSPUBLICATION) {
sertopic = gv.builtin_writer_topic;
} else if (topic == DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION) {
sertopic = gv.builtin_reader_topic;
} else {
sertopic = NULL;
assert (0);
}
memcpy (&keyhash, guid, sizeof (keyhash));
serdata = ddsi_serdata_from_keyhash(sertopic, &keyhash);
ret = dds_writer_lock(wr, &wraddr);
if (ret == DDS_RETCODE_OK) {
ret = dds_writecdr_impl (wraddr, serdata, timestamp, alive ? 0 : (DDS_WR_DISPOSE_BIT | DDS_WR_UNREGISTER_BIT));
dds_writer_unlock(wraddr);
}
} else {
ret = wr;
}
}
os_atomic_dec32(&m_call_count);
return ret;
}
void
dds__builtin_write(
_In_ enum ddsi_sertopic_builtin_type type,
_In_ const nn_guid_t *guid,
_In_ nn_wctime_t timestamp,
_In_ bool alive)
{ {
/* initialize to avoid compiler warning ultimately caused by C's horrible type system */ case EK_PARTICIPANT:
dds_entity_t topic = 0; case EK_PROXY_PARTICIPANT:
switch (type) topic = builtin_participant_topic;
{
case DSBT_PARTICIPANT:
topic = DDS_BUILTIN_TOPIC_DCPSPARTICIPANT;
break; break;
case DSBT_WRITER: case EK_WRITER:
topic = DDS_BUILTIN_TOPIC_DCPSPUBLICATION; case EK_PROXY_WRITER:
topic = builtin_writer_topic;
break; break;
case DSBT_READER: case EK_READER:
topic = DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION; case EK_PROXY_READER:
topic = builtin_reader_topic;
break; break;
} }
assert(topic != 0); assert (topic != NULL);
(void)dds__builtin_write_int(topic, guid, timestamp.v, alive); memcpy (&keyhash, &e->guid, sizeof (keyhash));
serdata = ddsi_serdata_from_keyhash (topic, &keyhash);
serdata->timestamp = timestamp;
serdata->statusinfo = alive ? 0 : NN_STATUSINFO_DISPOSE | NN_STATUSINFO_UNREGISTER;
return serdata;
} }
bool dds__validate_builtin_reader_qos(dds_entity_t topic, const dds_qos_t *qos) void dds__builtin_write (const struct entity_common *e, nn_wctime_t timestamp, bool alive)
{ {
if (qos == NULL) { if (ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, get_entity_vendorid (e)))
/* default QoS inherited from topic is ok by definition */
return true;
} else {
dds_entity_t wr = dds__get_builtin_writer(topic);
dds_qos_t *wrqos = dds_create_qos();
dds_return_t ret = dds_get_qos(wr, wrqos);
bool match;
assert (ret == DDS_RETCODE_OK);
(void)ret;
if (!qos_match_p (qos, wrqos)) {
match = false;
} else if (qos->resource_limits.max_samples != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_instances != DDS_LENGTH_UNLIMITED ||
qos->resource_limits.max_samples_per_instance != DDS_LENGTH_UNLIMITED) {
/* this means a write on the built-in topic writer can't fail */
match = false;
} else {
match = true;
}
dds_delete_qos(wrqos);
return match;
}
}
void
dds__builtin_init(
void)
{ {
assert(os_atomic_ld32(&m_call_count) == 0); /* initialize to avoid gcc warning ultimately caused by C's horrible type system */
os_mutexInit(&g_builtin_mutex); struct local_orphan_writer *bwr = NULL;
os_atomic_inc32(&m_call_count); struct ddsi_serdata *serdata = dds__builtin_make_sample (e, timestamp, alive);
} assert (e->tk != NULL);
switch (e->kind)
void
dds__builtin_fini(
void)
{ {
assert(os_atomic_ld32(&m_call_count) > 0); case EK_PARTICIPANT:
while (os_atomic_dec32_nv(&m_call_count) > 0) { case EK_PROXY_PARTICIPANT:
os_atomic_inc32_nv(&m_call_count); bwr = builtintopic_writer_participant;
dds_sleepfor(DDS_MSECS(10)); break;
case EK_WRITER:
case EK_PROXY_WRITER:
bwr = builtintopic_writer_publications;
break;
case EK_READER:
case EK_PROXY_READER:
bwr = builtintopic_writer_subscriptions;
break;
}
dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata);
} }
(void)dds_delete(g_builtin_local_participant);
g_builtin_local_participant = 0;
g_builtin_local_publisher = 0;
memset(g_builtin_local_writers, 0, sizeof(g_builtin_local_writers));
os_mutexDestroy(&g_builtin_mutex);
} }

View file

@ -18,6 +18,7 @@
#include "dds__domain.h" #include "dds__domain.h"
#include "dds__err.h" #include "dds__err.h"
#include "dds__builtin.h" #include "dds__builtin.h"
#include "dds__whc_builtintopic.h"
#include "ddsi/ddsi_iid.h" #include "ddsi/ddsi_iid.h"
#include "ddsi/ddsi_tkmap.h" #include "ddsi/ddsi_tkmap.h"
#include "ddsi/ddsi_serdata.h" #include "ddsi/ddsi_serdata.h"
@ -106,8 +107,6 @@ dds_init(dds_domainid_t domain)
* main configured domain id is. */ * main configured domain id is. */
dds_global.m_default_domain = config.domainId.value; dds_global.m_default_domain = config.domainId.value;
dds__builtin_init();
if (rtps_config_prep(dds_cfgst) != 0) if (rtps_config_prep(dds_cfgst) != 0)
{ {
DDS_ERROR("Failed to configure RTPS\n"); DDS_ERROR("Failed to configure RTPS\n");
@ -138,6 +137,8 @@ dds_init(dds_domainid_t domain)
goto fail_rtps_init; goto fail_rtps_init;
} }
dds__builtin_init ();
if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0) if (gv.servicelease && nn_servicelease_start_renewing(gv.servicelease) < 0)
{ {
DDS_ERROR("Failed to start the servicelease\n"); DDS_ERROR("Failed to start the servicelease\n");
@ -172,7 +173,8 @@ skip:
fail_servicelease_start: fail_servicelease_start:
if (gv.servicelease) if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease); nn_servicelease_stop_renewing (gv.servicelease);
rtps_term (); rtps_stop ();
rtps_fini ();
fail_rtps_init: fail_rtps_init:
if (gv.servicelease) if (gv.servicelease)
{ {
@ -182,7 +184,6 @@ fail_rtps_init:
fail_servicelease_new: fail_servicelease_new:
thread_states_fini(); thread_states_fini();
fail_rtps_config: fail_rtps_config:
dds__builtin_fini();
fail_config_domainid: fail_config_domainid:
dds_global.m_default_domain = DDS_DOMAIN_DEFAULT; dds_global.m_default_domain = DDS_DOMAIN_DEFAULT;
config_fini (dds_cfgst); config_fini (dds_cfgst);
@ -206,11 +207,11 @@ extern void dds_fini (void)
dds_global.m_init_count--; dds_global.m_init_count--;
if (dds_global.m_init_count == 0) if (dds_global.m_init_count == 0)
{ {
dds__builtin_fini();
if (gv.servicelease) if (gv.servicelease)
nn_servicelease_stop_renewing (gv.servicelease); nn_servicelease_stop_renewing (gv.servicelease);
rtps_term (); rtps_stop ();
dds__builtin_fini ();
rtps_fini ();
if (gv.servicelease) if (gv.servicelease)
nn_servicelease_free (gv.servicelease); nn_servicelease_free (gv.servicelease);
gv.servicelease = NULL; gv.servicelease = NULL;
@ -247,7 +248,9 @@ void ddsi_plugin_init (void)
ddsi_plugin.init_fn = dds__init_plugin; ddsi_plugin.init_fn = dds__init_plugin;
ddsi_plugin.fini_fn = dds__fini_plugin; ddsi_plugin.fini_fn = dds__fini_plugin;
ddsi_plugin.builtin_write = dds__builtin_write; ddsi_plugin.builtintopic_is_visible = dds__builtin_is_visible;
ddsi_plugin.builtintopic_get_tkmap_entry = dds__builtin_get_tkmap_entry;
ddsi_plugin.builtintopic_write = dds__builtin_write;
ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free; ddsi_plugin.rhc_plugin.rhc_free_fn = dds_rhc_free;
ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini; ddsi_plugin.rhc_plugin.rhc_fini_fn = dds_rhc_fini;

View file

@ -18,6 +18,7 @@
#include "dds__domain.h" #include "dds__domain.h"
#include "dds__participant.h" #include "dds__participant.h"
#include "dds__err.h" #include "dds__err.h"
#include "dds__builtin.h"
#define DDS_PARTICIPANT_STATUS_MASK 0u #define DDS_PARTICIPANT_STATUS_MASK 0u

View file

@ -316,7 +316,7 @@ bool dds_qos_equal (
} else if (a == NULL || b == NULL) { } else if (a == NULL || b == NULL) {
return false; return false;
} else { } else {
return nn_xqos_delta(a, b, ~(uint64_t)0) != 0; return nn_xqos_delta(a, b, ~(uint64_t)0) == 0;
} }
} }

View file

@ -23,11 +23,10 @@
#include <string.h> #include <string.h>
#include "os/os.h" #include "os/os.h"
#include "dds__key.h" #include "dds__key.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__stream.h" #include "dds__stream.h"
#include "dds__serdata_builtintopic.h"
#include "ddsi/ddsi_tkmap.h"
#include "ddsi/q_entity.h" #include "ddsi/q_entity.h"
#include "ddsi/ddsi_serdata_builtin.h"
//#include "dds.h" /* FIXME: need the sample types of the built-in topics */
static const uint64_t unihashconsts[] = { static const uint64_t unihashconsts[] = {
UINT64_C (16292676669999574021), UINT64_C (16292676669999574021),
@ -46,7 +45,7 @@ static uint32_t hash_guid (const nn_guid_t *g)
>> 32); >> 32);
} }
static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d, uint32_t basehash) static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtintopic *d, uint32_t basehash)
{ {
d->c.hash = hash_guid (&d->key) ^ basehash; d->c.hash = hash_guid (&d->key) ^ basehash;
return &d->c; return &d->c;
@ -54,37 +53,37 @@ static struct ddsi_serdata *fix_serdata_builtin(struct ddsi_serdata_builtin *d,
static bool serdata_builtin_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn) static bool serdata_builtin_eqkey(const struct ddsi_serdata *acmn, const struct ddsi_serdata *bcmn)
{ {
const struct ddsi_serdata_builtin *a = (const struct ddsi_serdata_builtin *)acmn; const struct ddsi_serdata_builtintopic *a = (const struct ddsi_serdata_builtintopic *)acmn;
const struct ddsi_serdata_builtin *b = (const struct ddsi_serdata_builtin *)bcmn; const struct ddsi_serdata_builtintopic *b = (const struct ddsi_serdata_builtintopic *)bcmn;
return memcmp (&a->key, &b->key, sizeof (a->key)) == 0; return memcmp (&a->key, &b->key, sizeof (a->key)) == 0;
} }
static void serdata_builtin_free(struct ddsi_serdata *dcmn) static void serdata_builtin_free(struct ddsi_serdata *dcmn)
{ {
struct ddsi_serdata_builtin *d = (struct ddsi_serdata_builtin *)dcmn; struct ddsi_serdata_builtintopic *d = (struct ddsi_serdata_builtintopic *)dcmn;
if (d->c.kind == SDK_DATA) if (d->c.kind == SDK_DATA)
nn_xqos_fini (&d->xqos); nn_xqos_fini (&d->xqos);
os_free (d); os_free (d);
} }
static struct ddsi_serdata_builtin *serdata_builtin_new(const struct ddsi_sertopic_builtin *tp, enum ddsi_serdata_kind kind) static struct ddsi_serdata_builtintopic *serdata_builtin_new(const struct ddsi_sertopic_builtintopic *tp, enum ddsi_serdata_kind kind)
{ {
struct ddsi_serdata_builtin *d = os_malloc(sizeof (*d)); struct ddsi_serdata_builtintopic *d = os_malloc(sizeof (*d));
ddsi_serdata_init (&d->c, &tp->c, kind); ddsi_serdata_init (&d->c, &tp->c, kind);
return d; return d;
} }
static void from_entity_pp (struct ddsi_serdata_builtin *d, const struct participant *pp) static void from_entity_pp (struct ddsi_serdata_builtintopic *d, const struct participant *pp)
{ {
nn_xqos_copy(&d->xqos, &pp->plist->qos); nn_xqos_copy(&d->xqos, &pp->plist->qos);
} }
static void from_entity_proxypp (struct ddsi_serdata_builtin *d, const struct proxy_participant *proxypp) static void from_entity_proxypp (struct ddsi_serdata_builtintopic *d, const struct proxy_participant *proxypp)
{ {
nn_xqos_copy(&d->xqos, &proxypp->plist->qos); nn_xqos_copy(&d->xqos, &proxypp->plist->qos);
} }
static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const struct ddsi_sertopic *tp) static void set_topic_type_from_sertopic (struct ddsi_serdata_builtintopic *d, const struct ddsi_sertopic *tp)
{ {
if (!(d->xqos.present & QP_TOPIC_NAME)) if (!(d->xqos.present & QP_TOPIC_NAME))
{ {
@ -98,26 +97,26 @@ static void set_topic_type_from_sertopic (struct ddsi_serdata_builtin *d, const
} }
} }
static void from_entity_rd (struct ddsi_serdata_builtin *d, const struct reader *rd) static void from_entity_rd (struct ddsi_serdata_builtintopic *d, const struct reader *rd)
{ {
nn_xqos_copy(&d->xqos, rd->xqos); nn_xqos_copy(&d->xqos, rd->xqos);
set_topic_type_from_sertopic(d, rd->topic); set_topic_type_from_sertopic(d, rd->topic);
} }
static void from_entity_prd (struct ddsi_serdata_builtin *d, const struct proxy_reader *prd) static void from_entity_prd (struct ddsi_serdata_builtintopic *d, const struct proxy_reader *prd)
{ {
nn_xqos_copy(&d->xqos, prd->c.xqos); nn_xqos_copy(&d->xqos, prd->c.xqos);
assert (d->xqos.present & QP_TOPIC_NAME); assert (d->xqos.present & QP_TOPIC_NAME);
assert (d->xqos.present & QP_TYPE_NAME); assert (d->xqos.present & QP_TYPE_NAME);
} }
static void from_entity_wr (struct ddsi_serdata_builtin *d, const struct writer *wr) static void from_entity_wr (struct ddsi_serdata_builtintopic *d, const struct writer *wr)
{ {
nn_xqos_copy(&d->xqos, wr->xqos); nn_xqos_copy(&d->xqos, wr->xqos);
set_topic_type_from_sertopic(d, wr->topic); set_topic_type_from_sertopic(d, wr->topic);
} }
static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_writer *pwr) static void from_entity_pwr (struct ddsi_serdata_builtintopic *d, const struct proxy_writer *pwr)
{ {
nn_xqos_copy(&d->xqos, pwr->c.xqos); nn_xqos_copy(&d->xqos, pwr->c.xqos);
assert (d->xqos.present & QP_TOPIC_NAME); assert (d->xqos.present & QP_TOPIC_NAME);
@ -127,10 +126,10 @@ static void from_entity_pwr (struct ddsi_serdata_builtin *d, const struct proxy_
struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash) struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi_sertopic *tpcmn, const nn_keyhash_t *keyhash)
{ {
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */ /* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)tpcmn; const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
/* keyhash must in host format (which the GUIDs always are internally) */ /* keyhash must in host format (which the GUIDs always are internally) */
const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value); const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value);
struct ddsi_serdata_builtin *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
memcpy (&d->key, keyhash->value, sizeof (d->key)); memcpy (&d->key, keyhash->value, sizeof (d->key));
if (entity) if (entity)
{ {
@ -188,16 +187,25 @@ static char *dds_string_dup_reuse (char *old, const char *src)
static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src) static dds_qos_t *dds_qos_from_xqos_reuse (dds_qos_t *old, const nn_xqos_t *src)
{ {
if (old == NULL) if (old == NULL)
return nn_xqos_dup (src); {
old = os_malloc (sizeof (*old));
nn_xqos_init_empty (old);
old->present |= QP_TOPIC_NAME | QP_TYPE_NAME;
nn_xqos_mergein_missing (old, src);
old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME);
}
else else
{ {
nn_xqos_fini (old); nn_xqos_fini (old);
nn_xqos_init_empty (old);
old->present |= QP_TOPIC_NAME | QP_TYPE_NAME;
nn_xqos_mergein_missing (old, src); nn_xqos_mergein_missing (old, src);
old->present &= ~(QP_TOPIC_NAME | QP_TYPE_NAME);
}
return old; return old;
} }
}
static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_participant *sample) static bool to_sample_pp (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_participant *sample)
{ {
convkey (&sample->key, &d->key); convkey (&sample->key, &d->key);
if (d->c.kind == SDK_DATA) if (d->c.kind == SDK_DATA)
@ -207,7 +215,7 @@ static bool to_sample_pp (const struct ddsi_serdata_builtin *d, struct dds_built
return true; return true;
} }
static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds_builtintopic_endpoint *sample) static bool to_sample_endpoint (const struct ddsi_serdata_builtintopic *d, struct dds_builtintopic_endpoint *sample)
{ {
nn_guid_t ppguid; nn_guid_t ppguid;
convkey (&sample->key, &d->key); convkey (&sample->key, &d->key);
@ -227,8 +235,8 @@ static bool to_sample_endpoint (const struct ddsi_serdata_builtin *d, struct dds
static bool serdata_builtin_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim) static bool serdata_builtin_topicless_to_sample (const struct ddsi_sertopic *topic, const struct ddsi_serdata *serdata_common, void *sample, void **bufptr, void *buflim)
{ {
const struct ddsi_serdata_builtin *d = (const struct ddsi_serdata_builtin *)serdata_common; const struct ddsi_serdata_builtintopic *d = (const struct ddsi_serdata_builtintopic *)serdata_common;
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)topic; const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)topic;
if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */ if (bufptr) abort(); else { (void)buflim; } /* FIXME: haven't implemented that bit yet! */
/* FIXME: completing builtin topic support along these lines requires subscribers, publishers and topics to also become DDSI entities - which is probably a good thing anyway */ /* FIXME: completing builtin topic support along these lines requires subscribers, publishers and topics to also become DDSI entities - which is probably a good thing anyway */
switch (tp->type) switch (tp->type)
@ -270,7 +278,7 @@ static void serdata_builtin_to_ser_unref (struct ddsi_serdata *serdata_common, c
(void)serdata_common; (void)ref; (void)serdata_common; (void)ref;
} }
const struct ddsi_serdata_ops ddsi_serdata_ops_builtin = { const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic = {
.get_size = serdata_builtin_get_size, .get_size = serdata_builtin_get_size,
.eqkey = serdata_builtin_eqkey, .eqkey = serdata_builtin_eqkey,
.free = serdata_builtin_free, .free = serdata_builtin_free,

View file

@ -20,22 +20,22 @@
#include "ddsi/q_config.h" #include "ddsi/q_config.h"
#include "ddsi/q_freelist.h" #include "ddsi/q_freelist.h"
#include "ddsi/ddsi_sertopic.h" #include "ddsi/ddsi_sertopic.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsc/dds.h" #include "ddsc/dds.h"
#include "dds__serdata_builtintopic.h"
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */ /* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
struct ddsi_sertopic *new_sertopic_builtin (enum ddsi_sertopic_builtin_type type, const char *name, const char *typename) struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename)
{ {
struct ddsi_sertopic_builtin *tp = os_malloc (sizeof (*tp)); struct ddsi_sertopic_builtintopic *tp = os_malloc (sizeof (*tp));
tp->c.iid = ddsi_iid_gen(); tp->c.iid = ddsi_iid_gen();
tp->c.name = dds_string_dup (name); tp->c.name = dds_string_dup (name);
tp->c.typename = dds_string_dup (typename); tp->c.typename = dds_string_dup (typename);
const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.typename) + 1; const size_t name_typename_size = strlen (tp->c.name) + 1 + strlen (tp->c.typename) + 1;
tp->c.name_typename = dds_alloc (name_typename_size); tp->c.name_typename = dds_alloc (name_typename_size);
snprintf (tp->c.name_typename, name_typename_size, "%s/%s", tp->c.name, tp->c.typename); snprintf (tp->c.name_typename, name_typename_size, "%s/%s", tp->c.name, tp->c.typename);
tp->c.ops = &ddsi_sertopic_ops_builtin; tp->c.ops = &ddsi_sertopic_ops_builtintopic;
tp->c.serdata_ops = &ddsi_serdata_ops_builtin; tp->c.serdata_ops = &ddsi_serdata_ops_builtintopic;
tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops); tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops);
tp->c.status_cb = 0; tp->c.status_cb = 0;
tp->c.status_cb_entity = NULL; tp->c.status_cb_entity = NULL;
@ -66,7 +66,7 @@ static void free_endpoint (void *vsample)
sample->qos = NULL; sample->qos = NULL;
} }
static size_t get_size (enum ddsi_sertopic_builtin_type type) static size_t get_size (enum ddsi_sertopic_builtintopic_type type)
{ {
switch (type) switch (type)
{ {
@ -82,14 +82,14 @@ static size_t get_size (enum ddsi_sertopic_builtin_type type)
static void sertopic_builtin_zero_samples (const struct ddsi_sertopic *sertopic_common, void *samples, size_t count) static void sertopic_builtin_zero_samples (const struct ddsi_sertopic *sertopic_common, void *samples, size_t count)
{ {
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
size_t size = get_size (tp->type); size_t size = get_size (tp->type);
memset (samples, 0, size * count); memset (samples, 0, size * count);
} }
static void sertopic_builtin_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count) static void sertopic_builtin_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count)
{ {
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
const size_t size = get_size (tp->type); const size_t size = get_size (tp->type);
char *new = dds_realloc (old, size * count); char *new = dds_realloc (old, size * count);
if (new && count > oldcount) if (new && count > oldcount)
@ -105,7 +105,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_
{ {
if (count > 0) if (count > 0)
{ {
const struct ddsi_sertopic_builtin *tp = (const struct ddsi_sertopic_builtin *)sertopic_common; const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)sertopic_common;
const size_t size = get_size (tp->type); const size_t size = get_size (tp->type);
#ifndef NDEBUG #ifndef NDEBUG
for (size_t i = 0, off = 0; i < count; i++, off += size) for (size_t i = 0, off = 0; i < count; i++, off += size)
@ -139,7 +139,7 @@ static void sertopic_builtin_free_samples (const struct ddsi_sertopic *sertopic_
} }
} }
const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtin = { const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic = {
.deinit = sertopic_builtin_deinit, .deinit = sertopic_builtin_deinit,
.zero_samples = sertopic_builtin_zero_samples, .zero_samples = sertopic_builtin_zero_samples,
.realloc_samples = sertopic_builtin_realloc_samples, .realloc_samples = sertopic_builtin_realloc_samples,

View file

@ -316,8 +316,6 @@ static bool dupdef_qos_ok(const dds_qos_t *qos, const struct ddsi_sertopic *st)
static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b) static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b)
{ {
printf ("sertopic_equivalent %p %p (%s %s; %u %u; %p %p; %p %p)\n", (void*)a, (void*)b, a->name_typename, b->name_typename, a->serdata_basehash, b->serdata_basehash, (void *)a->ops, (void *)b->ops, (void *)a->serdata_ops, (void *)b->serdata_ops);
if (strcmp (a->name_typename, b->name_typename) != 0) if (strcmp (a->name_typename, b->name_typename) != 0)
return false; return false;
if (a->serdata_basehash != b->serdata_basehash) if (a->serdata_basehash != b->serdata_basehash)

View file

@ -0,0 +1,201 @@
/*
* Copyright(c) 2006 to 2018 ADLINK Technology Limited and others
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
* v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include "os/os.h"
#include "ddsi/ddsi_serdata.h"
#include "ddsi/q_unused.h"
#include "ddsi/q_config.h"
#include "ddsi/q_ephash.h"
#include "ddsi/q_entity.h"
#include "ddsi/ddsi_tkmap.h"
#include "dds__serdata_builtintopic.h"
#include "dds__whc_builtintopic.h"
#include "dds__builtin.h"
struct bwhc {
struct whc common;
enum ddsi_sertopic_builtintopic_type type;
};
enum bwhc_iter_state {
BIS_INIT_LOCAL,
BIS_LOCAL,
BIS_INIT_PROXY,
BIS_PROXY
};
struct bwhc_iter {
struct whc_sample_iter_base c;
enum bwhc_iter_state st;
bool have_sample;
struct ephash_enum it;
};
/* check that our definition of whc_sample_iter fits in the type that callers allocate */
struct bwhc_sample_iter_sizecheck {
char fits_in_generic_type[sizeof(struct bwhc_iter) <= sizeof(struct whc_sample_iter) ? 1 : -1];
};
static void bwhc_free (struct whc *whc_generic)
{
os_free (whc_generic);
}
static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sample_iter *opaque_it)
{
struct bwhc_iter *it = (struct bwhc_iter *) opaque_it;
it->c.whc = (struct whc *) whc_generic;
it->st = BIS_INIT_LOCAL;
it->have_sample = false;
}
static bool is_visible (const struct entity_common *e)
{
const nn_vendorid_t vendorid = get_entity_vendorid (e);
return ddsi_plugin.builtintopic_is_visible (e->guid.entityid, e->onlylocal, vendorid);
}
static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample)
{
struct bwhc_iter * const it = (struct bwhc_iter *) opaque_it;
struct bwhc * const whc = (struct bwhc *) it->c.whc;
enum entity_kind kind = EK_PARTICIPANT; /* pacify gcc */
struct entity_common *entity;
if (it->have_sample)
{
ddsi_serdata_unref (sample->serdata);
it->have_sample = false;
}
/* most fields really don't matter, so memset */
memset (sample, 0, sizeof (*sample));
switch (it->st)
{
case BIS_INIT_LOCAL:
switch (whc->type) {
case DSBT_PARTICIPANT: kind = EK_PARTICIPANT; break;
case DSBT_WRITER: kind = EK_WRITER; break;
case DSBT_READER: kind = EK_READER; break;
}
assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, kind);
it->st = BIS_LOCAL;
/* FALLS THROUGH */
case BIS_LOCAL:
while ((entity = ephash_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true);
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
it->st = BIS_INIT_PROXY;
}
/* FALLS THROUGH */
case BIS_INIT_PROXY:
switch (whc->type) {
case DSBT_PARTICIPANT: kind = EK_PROXY_PARTICIPANT; break;
case DSBT_WRITER: kind = EK_PROXY_WRITER; break;
case DSBT_READER: kind = EK_PROXY_READER; break;
}
assert (kind != EK_PARTICIPANT);
ephash_enum_init (&it->it, kind);
it->st = BIS_PROXY;
/* FALLS THROUGH */
case BIS_PROXY:
while ((entity = ephash_enum_next (&it->it)) != NULL)
if (is_visible (entity))
break;
if (entity) {
sample->serdata = dds__builtin_make_sample (entity, entity->tupdate, true);
it->have_sample = true;
return true;
} else {
ephash_enum_fini (&it->it);
return false;
}
}
assert (0);
return false;
}
static void bwhc_get_state (const struct whc *whc, struct whc_state *st)
{
(void)whc;
st->max_seq = -1;
st->min_seq = -1;
st->unacked_bytes = 0;
}
static int bwhc_insert (struct whc *whc, seqno_t max_drop_seq, seqno_t seq, struct nn_plist *plist, struct ddsi_serdata *serdata, struct ddsi_tkmap_instance *tk)
{
(void)whc;
(void)max_drop_seq;
(void)seq;
(void)serdata;
(void)tk;
if (plist)
os_free (plist);
return 0;
}
static unsigned bwhc_downgrade_to_volatile (struct whc *whc, struct whc_state *st)
{
(void)whc;
(void)st;
return 0;
}
static unsigned bwhc_remove_acked_messages (struct whc *whc, seqno_t max_drop_seq, struct whc_state *whcst, struct whc_node **deferred_free_list)
{
(void)whc;
(void)max_drop_seq;
(void)whcst;
*deferred_free_list = NULL;
return 0;
}
static void bwhc_free_deferred_free_list (struct whc *whc, struct whc_node *deferred_free_list)
{
(void)whc;
(void)deferred_free_list;
}
static const struct whc_ops bwhc_ops = {
.insert = bwhc_insert,
.remove_acked_messages = bwhc_remove_acked_messages,
.free_deferred_free_list = bwhc_free_deferred_free_list,
.get_state = bwhc_get_state,
.next_seq = 0,
.borrow_sample = 0,
.borrow_sample_key = 0,
.return_sample = 0,
.sample_iter_init = bwhc_sample_iter_init,
.sample_iter_borrow_next = bwhc_sample_iter_borrow_next,
.downgrade_to_volatile = bwhc_downgrade_to_volatile,
.free = bwhc_free
};
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type)
{
struct bwhc *whc = os_malloc (sizeof (*whc));
whc->common.ops = &bwhc_ops;
whc->type = type;
return (struct whc *) whc;
}

View file

@ -26,128 +26,95 @@
#include "ddsi/q_entity.h" #include "ddsi/q_entity.h"
#include "ddsi/q_radmin.h" #include "ddsi/q_radmin.h"
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) dds_return_t dds_write (dds_entity_t writer, const void *data)
dds_return_t
dds_write(
_In_ dds_entity_t writer,
_In_ const void *data)
{ {
dds_return_t ret; dds_return_t ret;
dds__retcode_t rc; dds__retcode_t rc;
dds_writer *wr; dds_writer *wr;
if (data != NULL) { if (data == NULL)
rc = dds_writer_lock(writer, &wr); return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
if (rc == DDS_RETCODE_OK) {
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
ret = dds_write_impl (wr, data, dds_time (), 0); ret = dds_write_impl (wr, data, dds_time (), 0);
dds_writer_unlock (wr); dds_writer_unlock (wr);
} else {
DDS_ERROR("Error occurred on locking entity\n");
ret = DDS_ERRNO(rc);
}
} else {
DDS_ERROR("No data buffer provided\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
return ret; return ret;
} }
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) dds_return_t dds_writecdr (dds_entity_t writer, struct ddsi_serdata *serdata)
int
dds_writecdr(
dds_entity_t writer,
struct ddsi_serdata *serdata)
{ {
dds_return_t ret; dds_return_t ret;
dds__retcode_t rc; dds__retcode_t rc;
dds_writer *wr; dds_writer *wr;
if (serdata != NULL) {
rc = dds_writer_lock(writer, &wr); if (serdata == NULL)
if (rc == DDS_RETCODE_OK) { return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
return DDS_ERRNO (rc);
ret = dds_writecdr_impl (wr, serdata, dds_time (), 0); ret = dds_writecdr_impl (wr, serdata, dds_time (), 0);
dds_writer_unlock (wr); dds_writer_unlock (wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
}
} else{
DDS_ERROR("Given cdr has NULL value\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
}
return ret; return ret;
} }
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) dds_return_t dds_write_ts (dds_entity_t writer, const void *data, dds_time_t timestamp)
dds_return_t
dds_write_ts(
_In_ dds_entity_t writer,
_In_ const void *data,
_In_ dds_time_t timestamp)
{ {
dds_return_t ret; dds_return_t ret;
dds__retcode_t rc; dds__retcode_t rc;
dds_writer *wr; dds_writer *wr;
if(data == NULL){ if (data == NULL || timestamp < 0)
DDS_ERROR("Argument data has NULL value\n"); return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err; if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
} return DDS_ERRNO (rc);
if(timestamp < 0){
DDS_ERROR("Argument timestamp has negative value\n");
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
goto err;
}
rc = dds_writer_lock(writer, &wr);
if (rc == DDS_RETCODE_OK) {
ret = dds_write_impl (wr, data, timestamp, 0); ret = dds_write_impl (wr, data, timestamp, 0);
dds_writer_unlock (wr); dds_writer_unlock (wr);
} else {
DDS_ERROR("Error occurred on locking writer\n");
ret = DDS_ERRNO(rc);
}
err:
return ret; return ret;
} }
static int static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *pwr_info, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk, dds_duration_t *max_block_ms)
deliver_locally( {
_In_ struct writer *wr, while (!(ddsi_plugin.rhc_plugin.rhc_store_fn) (rhc, pwr_info, payload, tk))
_In_ struct ddsi_serdata *payload, {
_In_ struct ddsi_tkmap_instance *tk) if (*max_block_ms > 0)
{
dds_sleepfor (DDS_HEADBANG_TIMEOUT);
*max_block_ms -= DDS_HEADBANG_TIMEOUT;
}
else
{
DDS_ERROR ("The writer could not deliver data on time, probably due to a local reader resources being full\n");
return DDS_ERRNO (DDS_RETCODE_TIMEOUT);
}
}
return DDS_RETCODE_OK;
}
static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *payload, struct ddsi_tkmap_instance *tk)
{ {
dds_return_t ret = DDS_RETCODE_OK; dds_return_t ret = DDS_RETCODE_OK;
os_mutexLock (&wr->rdary.rdary_lock); os_mutexLock (&wr->rdary.rdary_lock);
if (wr->rdary.fastpath_ok) { if (wr->rdary.fastpath_ok)
{
struct reader ** const rdary = wr->rdary.rdary; struct reader ** const rdary = wr->rdary.rdary;
if (rdary[0]) { if (rdary[0])
{
dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time);
struct proxy_writer_info pwr_info; struct proxy_writer_info pwr_info;
unsigned i; unsigned i;
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
for (i = 0; rdary[i]; i++) { for (i = 0; rdary[i]; i++) {
bool stored;
DDS_TRACE ("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid)); DDS_TRACE ("reader %x:%x:%x:%x\n", PGUID (rdary[i]->e.guid));
dds_duration_t max_block_ms = nn_from_ddsi_duration(wr->xqos->reliability.max_blocking_time) / DDS_NSECS_IN_MSEC; if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
do { break;
stored = (ddsi_plugin.rhc_plugin.rhc_store_fn) (rdary[i]->rhc, &pwr_info, payload, tk);
if (!stored) {
if (max_block_ms <= 0) {
DDS_ERROR("The writer could not deliver data on time, probably due to a local reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else {
dds_sleepfor(DDS_MSECS(DDS_HEADBANG_TIMEOUT_MS));
}
/* Decreasing the block time after the sleep, let's us possibly
* wait a bit too long. But that's preferable compared to waiting
* a bit too short. */
max_block_ms -= DDS_HEADBANG_TIMEOUT_MS;
}
} while ((!stored) && (ret == DDS_RETCODE_OK));
} }
} }
os_mutexUnlock (&wr->rdary.rdary_lock); os_mutexUnlock (&wr->rdary.rdary_lock);
} else { }
else
{
/* When deleting, pwr is no longer accessible via the hash /* When deleting, pwr is no longer accessible via the hash
tables, and consequently, a reader may be deleted without tables, and consequently, a reader may be deleted without
it being possible to remove it from rdary. The primary it being possible to remove it from rdary. The primary
@ -159,15 +126,19 @@ deliver_locally(
ut_avlIter_t it; ut_avlIter_t it;
struct pwr_rd_match *m; struct pwr_rd_match *m;
struct proxy_writer_info pwr_info; struct proxy_writer_info pwr_info;
dds_duration_t max_block_ms = nn_from_ddsi_duration (wr->xqos->reliability.max_blocking_time);
os_mutexUnlock (&wr->rdary.rdary_lock); os_mutexUnlock (&wr->rdary.rdary_lock);
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos); make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
os_mutexLock (&wr->e.lock); os_mutexLock (&wr->e.lock);
for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it)) { for (m = ut_avlIterFirst (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ut_avlIterNext (&it))
{
struct reader *rd; struct reader *rd;
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL) { if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
{
DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid)); DDS_TRACE("reader-via-guid %x:%x:%x:%x\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data() function. */ /* Copied the return value ignore from DDSI deliver_user_data() function. */
(void)(ddsi_plugin.rhc_plugin.rhc_store_fn) (rd->rhc, &pwr_info, payload, tk); if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
} }
} }
os_mutexUnlock (&wr->e.lock); os_mutexUnlock (&wr->e.lock);
@ -175,121 +146,83 @@ deliver_locally(
return ret; return ret;
} }
int dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstamp, dds_write_action action)
dds_write_impl(
_In_ dds_writer *wr,
_In_ const void * data,
_In_ dds_time_t tstamp,
_In_ dds_write_action action)
{ {
dds_return_t ret = DDS_RETCODE_OK;
int w_rc;
assert (wr);
assert (dds_entity_kind(((dds_entity*)wr)->m_hdl) == DDS_KIND_WRITER);
struct thread_state1 * const thr = lookup_thread_state (); struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime); const bool asleep = !vtime_awake_p (thr->vtime);
const bool writekey = action & DDS_WR_KEY_BIT; const bool writekey = action & DDS_WR_KEY_BIT;
dds_writer * writer = (dds_writer*) wr; struct writer *ddsi_wr = wr->m_wr;
struct writer * ddsi_wr = writer->m_wr;
struct ddsi_tkmap_instance *tk; struct ddsi_tkmap_instance *tk;
struct ddsi_serdata *d; struct ddsi_serdata *d;
dds_return_t ret = DDS_RETCODE_OK;
int w_rc;
if (data == NULL) { if (data == NULL)
{
DDS_ERROR("No data buffer provided\n"); DDS_ERROR("No data buffer provided\n");
return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER); return DDS_ERRNO (DDS_RETCODE_BAD_PARAMETER);
} }
/* Check for topic filter */ /* Check for topic filter */
if (wr->m_topic->filter_fn && ! writekey) { if (wr->m_topic->filter_fn && !writekey)
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) { if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx))
return DDS_RETCODE_OK; return DDS_RETCODE_OK;
}
}
if (asleep) { if (asleep)
thread_state_awake (thr); thread_state_awake (thr);
}
/* Serialize and write data or key */ /* Serialize and write data or key */
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data); d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0);
/* Set if disposing or unregistering */
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) |
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ;
d->timestamp.v = tstamp;
ddsi_serdata_ref(d);
tk = ddsi_tkmap_lookup_instance_ref(d);
w_rc = write_sample_gc (writer->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0) {
/* Flush out write unless configured to batch */
if (! config.whc_batch){
nn_xpack_send (writer->m_xp, false);
}
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO(DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR("Invalid data provided\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
} else {
DDS_ERROR("Internal error\n");
ret = DDS_ERRNO(DDS_RETCODE_ERROR);
}
if (ret == DDS_RETCODE_OK) {
ret = deliver_locally (ddsi_wr, d, tk);
}
ddsi_serdata_unref(d);
ddsi_tkmap_instance_unref(tk);
if (asleep) {
thread_state_asleep (thr);
}
return ret;
}
int
dds_writecdr_impl(
_In_ dds_writer *wr,
_Inout_ struct ddsi_serdata *d,
_In_ dds_time_t tstamp,
_In_ dds_write_action action)
{
int ret = DDS_RETCODE_OK;
int w_rc;
assert (wr);
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct writer * ddsi_wr = wr->m_wr;
struct ddsi_tkmap_instance * tk;
if (wr->m_topic->filter_fn) {
abort();
}
if (asleep) {
thread_state_awake (thr);
}
/* Set if disposing or unregistering */
d->statusinfo =
((action & DDS_WR_DISPOSE_BIT ) ? NN_STATUSINFO_DISPOSE : 0) |
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0) ;
d->timestamp.v = tstamp; d->timestamp.v = tstamp;
ddsi_serdata_ref (d); ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d); tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk); w_rc = write_sample_gc (wr->m_xp, ddsi_wr, d, tk);
if (w_rc >= 0)
{
/* Flush out write unless configured to batch */
if (!config.whc_batch)
nn_xpack_send (wr->m_xp, false);
ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n");
ret = DDS_ERRNO (DDS_RETCODE_TIMEOUT);
} else if (w_rc == ERR_INVALID_DATA) {
DDS_ERROR ("Invalid data provided\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
} else {
DDS_ERROR ("Internal error\n");
ret = DDS_ERRNO (DDS_RETCODE_ERROR);
}
if (ret == DDS_RETCODE_OK)
ret = deliver_locally (ddsi_wr, d, tk);
ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk);
if (asleep)
thread_state_asleep (thr);
return ret;
}
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d)
{
struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime);
struct ddsi_tkmap_instance * tk;
int ret = DDS_RETCODE_OK;
int w_rc;
if (asleep)
thread_state_awake (thr);
ddsi_serdata_ref (d);
tk = ddsi_tkmap_lookup_instance_ref (d);
w_rc = write_sample_gc (xp, ddsi_wr, d, tk);
if (w_rc >= 0) { if (w_rc >= 0) {
/* Flush out write unless configured to batch */ /* Flush out write unless configured to batch */
if (! config.whc_batch) { if (!config.whc_batch && xp != NULL)
nn_xpack_send (wr->m_xp, false); nn_xpack_send (xp, false);
}
ret = DDS_RETCODE_OK; ret = DDS_RETCODE_OK;
} else if (w_rc == ERR_TIMEOUT) { } else if (w_rc == ERR_TIMEOUT) {
DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n"); DDS_ERROR ("The writer could not deliver data on time, probably due to a reader resources being full\n");
@ -302,51 +235,50 @@ dds_writecdr_impl(
ret = DDS_ERRNO (DDS_RETCODE_ERROR); ret = DDS_ERRNO (DDS_RETCODE_ERROR);
} }
if (ret == DDS_RETCODE_OK) { if (ret == DDS_RETCODE_OK)
ret = deliver_locally (ddsi_wr, d, tk); ret = deliver_locally (ddsi_wr, d, tk);
}
ddsi_serdata_unref (d); ddsi_serdata_unref (d);
ddsi_tkmap_instance_unref (tk); ddsi_tkmap_instance_unref (tk);
if (asleep) { if (asleep)
thread_state_asleep (thr); thread_state_asleep (thr);
}
return ret; return ret;
} }
void dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action)
dds_write_set_batch( {
bool enable) if (wr->m_topic->filter_fn)
abort ();
/* Set if disposing or unregistering */
d->statusinfo = ((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) | ((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0);
d->timestamp.v = tstamp;
return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d);
}
void dds_write_set_batch (bool enable)
{ {
config.whc_batch = enable ? 1 : 0; config.whc_batch = enable ? 1 : 0;
} }
_Pre_satisfies_((writer & DDS_ENTITY_KIND_MASK) == DDS_KIND_WRITER) void dds_write_flush (dds_entity_t writer)
void
dds_write_flush(
dds_entity_t writer)
{ {
dds__retcode_t rc;
struct thread_state1 * const thr = lookup_thread_state (); struct thread_state1 * const thr = lookup_thread_state ();
const bool asleep = !vtime_awake_p (thr->vtime); const bool asleep = !vtime_awake_p (thr->vtime);
dds_writer *wr; dds_writer *wr;
dds__retcode_t rc;
if (asleep) { if (asleep)
thread_state_awake (thr); thread_state_awake (thr);
} if ((rc = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
rc = dds_writer_lock(writer, &wr); DDS_ERROR ("Error occurred on locking writer\n");
if (rc == DDS_RETCODE_OK) { else
{
nn_xpack_send (wr->m_xp, true); nn_xpack_send (wr->m_xp, true);
dds_writer_unlock (wr); dds_writer_unlock (wr);
} else{
DDS_ERROR("Error occurred on locking writer\n");
} }
if (asleep) { if (asleep)
thread_state_asleep (thr); thread_state_asleep (thr);
}
return; return;
} }

View file

@ -147,20 +147,21 @@ check_default_qos_of_builtin_entity(dds_entity_t entity)
CU_Test(ddsc_builtin_topics, availability_builtin_topics, .init = setup, .fini = teardown) CU_Test(ddsc_builtin_topics, availability_builtin_topics, .init = setup, .fini = teardown)
{ {
/* FIXME: Successful lookup doesn't rhyme with them not being returned when looking at the children of the participant ... */
dds_entity_t topic; dds_entity_t topic;
topic = dds_find_topic(g_participant, "DCPSParticipant"); topic = dds_find_topic(g_participant, "DCPSParticipant");
CU_ASSERT_FATAL(topic > 0); CU_ASSERT_FATAL(topic < 0);
dds_delete(topic); //dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSTopic"); topic = dds_find_topic(g_participant, "DCPSTopic");
CU_ASSERT_FATAL(topic < 0); CU_ASSERT_FATAL(topic < 0);
//TODO CHAM-347: dds_delete(topic); //dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSSubscription"); topic = dds_find_topic(g_participant, "DCPSSubscription");
CU_ASSERT_FATAL(topic > 0); CU_ASSERT_FATAL(topic < 0);
dds_delete(topic); //dds_delete(topic);
topic = dds_find_topic(g_participant, "DCPSPublication"); topic = dds_find_topic(g_participant, "DCPSPublication");
CU_ASSERT_FATAL(topic > 0); CU_ASSERT_FATAL(topic < 0);
dds_delete(topic); //dds_delete(topic);
} }
CU_Test(ddsc_builtin_topics, read_publication_data, .init = setup, .fini = teardown) CU_Test(ddsc_builtin_topics, read_publication_data, .init = setup, .fini = teardown)

View file

@ -20,10 +20,8 @@ PREPEND(srcs_ddsi "${CMAKE_CURRENT_LIST_DIR}/src"
ddsi_mcgroup.c ddsi_mcgroup.c
ddsi_serdata.c ddsi_serdata.c
ddsi_serdata_default.c ddsi_serdata_default.c
ddsi_serdata_builtin.c
ddsi_sertopic.c ddsi_sertopic.c
ddsi_sertopic_default.c ddsi_sertopic_default.c
ddsi_sertopic_builtin.c
ddsi_rhc_plugin.c ddsi_rhc_plugin.c
ddsi_iid.c ddsi_iid.c
ddsi_tkmap.c ddsi_tkmap.c
@ -76,7 +74,6 @@ PREPEND(hdrs_private_ddsi "${CMAKE_CURRENT_LIST_DIR}/include/ddsi"
ddsi_serdata.h ddsi_serdata.h
ddsi_sertopic.h ddsi_sertopic.h
ddsi_serdata_default.h ddsi_serdata_default.h
ddsi_serdata_builtin.h
ddsi_rhc_plugin.h ddsi_rhc_plugin.h
ddsi_iid.h ddsi_iid.h
ddsi_tkmap.h ddsi_tkmap.h

View file

@ -22,7 +22,6 @@
#include "ddsi/q_xqos.h" #include "ddsi/q_xqos.h"
#include "ddsi/ddsi_tran.h" #include "ddsi/ddsi_tran.h"
#include "ddsi/q_feature_check.h" #include "ddsi/q_feature_check.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_rhc_plugin.h" #include "ddsi/ddsi_rhc_plugin.h"
#if defined (__cplusplus) #if defined (__cplusplus)
@ -412,7 +411,10 @@ struct ddsi_plugin
{ {
int (*init_fn) (void); int (*init_fn) (void);
void (*fini_fn) (void); void (*fini_fn) (void);
void (*builtin_write) (enum ddsi_sertopic_builtin_type type, const nn_guid_t *guid, nn_wctime_t timestamp, bool alive);
bool (*builtintopic_is_visible) (nn_entityid_t entityid, bool onlylocal, nn_vendorid_t vendorid);
struct ddsi_tkmap_instance * (*builtintopic_get_tkmap_entry) (const struct nn_guid *guid);
void (*builtintopic_write) (const struct entity_common *e, nn_wctime_t timestamp, bool alive);
/* Read cache */ /* Read cache */
struct ddsi_rhc_plugin rhc_plugin; struct ddsi_rhc_plugin rhc_plugin;

View file

@ -127,12 +127,15 @@ struct pwr_rd_match {
struct nn_rsample_info; struct nn_rsample_info;
struct nn_rdata; struct nn_rdata;
struct ddsi_tkmap_instance;
struct entity_common { struct entity_common {
enum entity_kind kind; enum entity_kind kind;
nn_guid_t guid; nn_guid_t guid;
nn_wctime_t tupdate; /* timestamp of last update */
char *name; char *name;
uint64_t iid; uint64_t iid;
struct ddsi_tkmap_instance *tk;
os_mutex lock; os_mutex lock;
bool onlylocal; bool onlylocal;
}; };
@ -390,8 +393,11 @@ int is_deleted_participant_guid (const struct nn_guid *guid, unsigned for_what);
nn_entityid_t to_entityid (unsigned u); nn_entityid_t to_entityid (unsigned u);
int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid); int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid);
int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid);
bool is_local_orphan_endpoint (const struct entity_common *e);
int is_writer_entityid (nn_entityid_t id); int is_writer_entityid (nn_entityid_t id);
int is_reader_entityid (nn_entityid_t id); int is_reader_entityid (nn_entityid_t id);
nn_vendorid_t get_entity_vendorid (const struct entity_common *e);
int pp_allocate_entityid (nn_entityid_t *id, unsigned kind, struct participant *pp); int pp_allocate_entityid (nn_entityid_t *id, unsigned kind, struct participant *pp);
void pp_release_entityid(struct participant *pp, nn_entityid_t id); void pp_release_entityid(struct participant *pp, nn_entityid_t id);
@ -492,6 +498,12 @@ int delete_writer_nolinger_locked (struct writer *wr);
int delete_reader (const struct nn_guid *guid); int delete_reader (const struct nn_guid *guid);
uint64_t reader_instance_id (const struct nn_guid *guid); uint64_t reader_instance_id (const struct nn_guid *guid);
struct local_orphan_writer {
struct writer wr;
};
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc);
void delete_local_orphan_writer (struct local_orphan_writer *wr);
/* To create or delete a new proxy participant: "guid" MUST have the /* To create or delete a new proxy participant: "guid" MUST have the
pre-defined participant entity id. Unlike delete_participant(), pre-defined participant entity id. Unlike delete_participant(),
deleting a proxy participant will automatically delete all its deleting a proxy participant will automatically delete all its

View file

@ -43,6 +43,7 @@ struct gcreq {
}; };
struct gcreq_queue *gcreq_queue_new (void); struct gcreq_queue *gcreq_queue_new (void);
void gcreq_queue_drain (struct gcreq_queue *q);
void gcreq_queue_free (struct gcreq_queue *q); void gcreq_queue_free (struct gcreq_queue *q);
struct gcreq *gcreq_new (struct gcreq_queue *gcreq_queue, gcreq_cb_t cb); struct gcreq *gcreq_new (struct gcreq_queue *gcreq_queue, gcreq_cb_t cb);

View file

@ -281,11 +281,6 @@ struct q_globals {
struct ddsi_sertopic *plist_topic; /* used for all discovery data */ struct ddsi_sertopic *plist_topic; /* used for all discovery data */
struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */ struct ddsi_sertopic *rawcdr_topic; /* used for participant message data */
/* Sertopics for built-in topics -- FIXME: these really have little to do with topics, but everything with topic types, in other words, they are the type supports in DDS ... so a bit of refactoring is required */
struct ddsi_sertopic *builtin_participant_topic;
struct ddsi_sertopic *builtin_reader_topic;
struct ddsi_sertopic *builtin_writer_topic;
/* Network ID needed by v_groupWrite -- FIXME: might as well pass it /* Network ID needed by v_groupWrite -- FIXME: might as well pass it
to the receive thread instead of making it global (and that would to the receive thread instead of making it global (and that would
remove the need to include kernelModule.h) */ remove the need to include kernelModule.h) */

View file

@ -80,7 +80,8 @@ int rtps_config_prep (struct cfgst *cfgst);
int rtps_config_open (void); int rtps_config_open (void);
int rtps_init (void); int rtps_init (void);
void ddsi_plugin_init (void); void ddsi_plugin_init (void);
void rtps_term (void); void rtps_stop (void);
void rtps_fini (void);
#if defined (__cplusplus) #if defined (__cplusplus)
} }

View file

@ -42,7 +42,7 @@ struct whc_state {
an iter on the stack without specifying an implementation. If future changes or an iter on the stack without specifying an implementation. If future changes or
implementations require more, these can be adjusted. An implementation should check implementations require more, these can be adjusted. An implementation should check
things fit at compile time. */ things fit at compile time. */
#define WHC_SAMPLE_ITER_SIZE (1*sizeof(void *)) #define WHC_SAMPLE_ITER_SIZE (7 * sizeof(void *))
struct whc_sample_iter_base { struct whc_sample_iter_base {
struct whc *whc; struct whc *whc;
}; };

View file

@ -64,8 +64,13 @@ static ssize_t ddsi_udp_conn_read (ddsi_tran_conn_t conn, unsigned char * buf, s
msghdr.msg_namelen = srclen; msghdr.msg_namelen = srclen;
msghdr.msg_iov = &msg_iov; msghdr.msg_iov = &msg_iov;
msghdr.msg_iovlen = 1; msghdr.msg_iovlen = 1;
#if !defined(__sun) || defined(_XPG4_2)
msghdr.msg_control = NULL; msghdr.msg_control = NULL;
msghdr.msg_controllen = 0; msghdr.msg_controllen = 0;
#else
msghdr.msg_accrights = NULL;
msghdr.msg_accrightslen = 0;
#endif
do { do {
ret = recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0); ret = recvmsg(((ddsi_udp_conn_t) conn)->m_sock, &msghdr, 0);
@ -117,8 +122,13 @@ static ssize_t ddsi_udp_conn_write (ddsi_tran_conn_t conn, const nn_locator_t *d
set_msghdr_iov (&msg, (os_iovec_t *) iov, niov); set_msghdr_iov (&msg, (os_iovec_t *) iov, niov);
msg.msg_name = &dstaddr; msg.msg_name = &dstaddr;
msg.msg_namelen = (socklen_t) os_sockaddr_get_size((os_sockaddr *) &dstaddr); msg.msg_namelen = (socklen_t) os_sockaddr_get_size((os_sockaddr *) &dstaddr);
#if !defined(__sun) || defined(_XPG4_2)
msg.msg_control = NULL; msg.msg_control = NULL;
msg.msg_controllen = 0; msg.msg_controllen = 0;
#else
msg.msg_accrights = NULL;
msg.msg_accrightslen = 0;
#endif
#if SYSDEPS_MSGHDR_FLAGS #if SYSDEPS_MSGHDR_FLAGS
msg.msg_flags = (int) flags; msg.msg_flags = (int) flags;
#else #else

View file

@ -149,39 +149,41 @@ int is_builtin_entityid (nn_entityid_t id, nn_vendorid_t vendorid)
} }
} }
static int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid) int is_builtin_endpoint (nn_entityid_t id, nn_vendorid_t vendorid)
{ {
return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT; return is_builtin_entityid (id, vendorid) && id.u != NN_ENTITYID_PARTICIPANT;
} }
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_vendorid_t vendorid, bool onlylocal, struct ddsi_tkmap_instance **tk) bool is_local_orphan_endpoint (const struct entity_common *e)
{
return (e->guid.prefix.u[0] == 0 && e->guid.prefix.u[1] == 0 && e->guid.prefix.u[2] == 0 &&
is_builtin_endpoint (e->guid.entityid, ownvendorid));
}
static void entity_common_init (struct entity_common *e, const struct nn_guid *guid, const char *name, enum entity_kind kind, nn_wctime_t tcreate, nn_vendorid_t vendorid, bool onlylocal)
{ {
e->guid = *guid; e->guid = *guid;
e->kind = kind; e->kind = kind;
e->tupdate = tcreate;
e->name = os_strdup (name ? name : ""); e->name = os_strdup (name ? name : "");
e->onlylocal = onlylocal; e->onlylocal = onlylocal;
os_mutexInit (&e->lock); os_mutexInit (&e->lock);
if (onlylocal || is_builtin_endpoint (guid->entityid, vendorid)) if (ddsi_plugin.builtintopic_is_visible (guid->entityid, onlylocal, vendorid))
{ {
e->iid = ddsi_iid_gen (); e->tk = ddsi_plugin.builtintopic_get_tkmap_entry (guid);
*tk = NULL; e->iid = e->tk->m_iid;
} }
else else
{ {
struct ddsi_serdata *sd; e->tk = NULL;
struct nn_keyhash kh; e->iid = ddsi_iid_gen ();
memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (gv.builtin_participant_topic, &kh);
/* FIXME: this makes the iid for a reincarnation of a proxy entity dependent on whether an application reader kept the corresponding built-in topic instance around, it may be attractive to reconsider and guarantee a new iid in these cases, at least for the publication handle */
*tk = ddsi_tkmap_find(sd, false, true);
ddsi_serdata_unref (sd);
e->iid = (*tk)->m_iid;
} }
} }
static void entity_common_fini (struct entity_common *e) static void entity_common_fini (struct entity_common *e)
{ {
if (e->tk)
ddsi_tkmap_instance_unref (e->tk);
os_free (e->name); os_free (e->name);
os_mutexDestroy (&e->lock); os_mutexDestroy (&e->lock);
} }
@ -238,38 +240,27 @@ void local_reader_ary_setinvalid (struct local_reader_ary *x)
os_mutexUnlock (&x->rdary_lock); os_mutexUnlock (&x->rdary_lock);
} }
static void write_builtin_topic_any (const struct entity_common *e, nn_wctime_t timestamp, bool alive, nn_vendorid_t vendorid, struct ddsi_tkmap_instance *tk) nn_vendorid_t get_entity_vendorid (const struct entity_common *e)
{ {
if (!(e->onlylocal || is_builtin_endpoint(e->guid.entityid, vendorid)))
{
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */
enum ddsi_sertopic_builtin_type type = DSBT_PARTICIPANT;
switch (e->kind) switch (e->kind)
{ {
case EK_PARTICIPANT: case EK_PARTICIPANT:
case EK_PROXY_PARTICIPANT:
type = DSBT_PARTICIPANT;
break;
case EK_READER: case EK_READER:
case EK_PROXY_READER:
type = DSBT_READER;
break;
case EK_WRITER: case EK_WRITER:
return (nn_vendorid_t) MY_VENDOR_ID;
break;
case EK_PROXY_PARTICIPANT:
return ((const struct proxy_participant *) e)->vendor;
break;
case EK_PROXY_READER:
return ((const struct proxy_reader *) e)->c.vendor;
break;
case EK_PROXY_WRITER: case EK_PROXY_WRITER:
type = DSBT_WRITER; return ((const struct proxy_writer *) e)->c.vendor;
break; break;
} }
assert(type != DSBT_PARTICIPANT || (e->kind == EK_PARTICIPANT || e->kind == EK_PROXY_PARTICIPANT)); assert (0);
ddsi_plugin.builtin_write (type, &e->guid, timestamp, alive); return (nn_vendorid_t) NN_VENDORID_UNKNOWN;
}
/* tkmap instance only needs to be kept around until the first write of a built-in topic (if none ever happens, it needn't be kept at all): afterward, the WHC of the local built-in topic writer will keep the entry alive. FIXME: the SPDP/SEPD ones currently use default sertopics instead of builtin sertopics, and so use different mappings and different instnace ids. No-one ever sees those ids, so it doesn't matter, but it would nicer if it could actually be the same one. FIXME: it would also be nicer if the local built-in topics and the SPDP/SEDP writers were the same, but I want the locally created endpoints visible in the built-in topics as well, and those don't exist in the discovery writers ... */
if (tk)
ddsi_tkmap_instance_unref (tk);
}
static void write_builtin_topic_local (const struct entity_common *e, nn_wctime_t timestamp, bool alive, struct ddsi_tkmap_instance *tk)
{
write_builtin_topic_any(e, timestamp, alive, ownvendorid, tk);
} }
/* DELETED PARTICIPANTS --------------------------------------------- */ /* DELETED PARTICIPANTS --------------------------------------------- */
@ -408,7 +399,6 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
{ {
struct participant *pp; struct participant *pp;
nn_guid_t subguid, group_guid; nn_guid_t subguid, group_guid;
struct ddsi_tkmap_instance *tk;
/* no reserved bits may be set */ /* no reserved bits may be set */
assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0); assert ((flags & ~(RTPS_PF_NO_BUILTIN_READERS | RTPS_PF_NO_BUILTIN_WRITERS | RTPS_PF_PRIVILEGED_PP | RTPS_PF_IS_DDSI2_PP | RTPS_PF_ONLY_LOCAL)) == 0);
@ -451,7 +441,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
pp = os_malloc (sizeof (*pp)); pp = os_malloc (sizeof (*pp));
entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0), &tk); entity_common_init (&pp->e, ppguid, "", EK_PARTICIPANT, now (), ownvendorid, ((flags & RTPS_PF_ONLY_LOCAL) != 0));
pp->user_refc = 1; pp->user_refc = 1;
pp->builtin_refc = 0; pp->builtin_refc = 0;
pp->builtins_deleted = 0; pp->builtins_deleted = 0;
@ -630,7 +620,7 @@ int new_participant_guid (const nn_guid_t *ppguid, unsigned flags, const nn_plis
trigger_recv_threads (); trigger_recv_threads ();
} }
write_builtin_topic_local(&pp->e, now(), true, tk); ddsi_plugin.builtintopic_write (&pp->e, now(), true);
/* SPDP periodic broadcast uses the retransmit path, so the initial /* SPDP periodic broadcast uses the retransmit path, so the initial
publication must be done differently. Must be later than making publication must be done differently. Must be later than making
@ -866,7 +856,7 @@ int delete_participant (const struct nn_guid *ppguid)
struct participant *pp; struct participant *pp;
if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL) if ((pp = ephash_lookup_participant_guid (ppguid)) == NULL)
return ERR_UNKNOWN_ENTITY; return ERR_UNKNOWN_ENTITY;
write_builtin_topic_local(&pp->e, now(), false, NULL); ddsi_plugin.builtintopic_write (&pp->e, now(), false);
remember_deleted_participant_guid (&pp->e.guid); remember_deleted_participant_guid (&pp->e.guid);
ephash_remove_participant_guid (pp); ephash_remove_participant_guid (pp);
gcreq_participant (pp); gcreq_participant (pp);
@ -2077,7 +2067,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn
{ {
int32_t reason; int32_t reason;
(void)tnow; (void)tnow;
if (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid)) if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, ownvendorid) || is_builtin_entityid (rd->e.guid.entityid, ownvendorid)))
return; return;
if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0) if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0)
{ {
@ -2295,7 +2285,7 @@ static void generic_do_local_match (struct entity_common *e, nn_mtime_t tnow)
struct ephash_enum est; struct ephash_enum est;
struct entity_common *em; struct entity_common *em;
enum entity_kind mkind; enum entity_kind mkind;
if (is_builtin_entityid (e->guid.entityid, ownvendorid)) if (is_builtin_entityid (e->guid.entityid, ownvendorid) && !is_local_orphan_endpoint (e))
/* never a need for local matches on discovery endpoints */ /* never a need for local matches on discovery endpoints */
return; return;
mkind = generic_do_local_match_mkind(e->kind); mkind = generic_do_local_match_mkind(e->kind);
@ -2374,34 +2364,27 @@ static void new_reader_writer_common (const struct nn_guid *guid, const struct d
topic ? topic->typename : "(null)"); topic ? topic->typename : "(null)");
} }
static void endpoint_common_init static void endpoint_common_init (struct entity_common *e, struct endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp)
(
struct entity_common *e,
struct endpoint_common *c,
enum entity_kind kind,
const struct nn_guid *guid,
const struct nn_guid *group_guid,
struct participant *pp,
struct ddsi_tkmap_instance **tk
)
{ {
entity_common_init (e, guid, NULL, kind, ownvendorid, pp->e.onlylocal, tk); entity_common_init (e, guid, NULL, kind, now (), ownvendorid, pp->e.onlylocal);
c->pp = ref_participant (pp, &e->guid); c->pp = ref_participant (pp, &e->guid);
if (group_guid) if (group_guid)
{
c->group_guid = *group_guid; c->group_guid = *group_guid;
}
else else
{
memset (&c->group_guid, 0, sizeof (c->group_guid)); memset (&c->group_guid, 0, sizeof (c->group_guid));
} }
}
static void endpoint_common_fini (struct entity_common *e, struct endpoint_common *c) static void endpoint_common_fini (struct entity_common *e, struct endpoint_common *c)
{ {
if (!is_builtin_entityid(e->guid.entityid, ownvendorid)) if (!is_builtin_entityid(e->guid.entityid, ownvendorid))
pp_release_entityid(c->pp, e->guid.entityid); pp_release_entityid(c->pp, e->guid.entityid);
if (c->pp)
unref_participant (c->pp, &e->guid); unref_participant (c->pp, &e->guid);
else
{
/* only for the (almost pseudo) writers used for generating the built-in topics */
assert (is_local_orphan_endpoint (e));
}
entity_common_fini (e); entity_common_fini (e);
} }
@ -2608,25 +2591,8 @@ unsigned remove_acked_messages (struct writer *wr, struct whc_state *whcst, stru
return n; return n;
} }
static struct writer * new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity) static void new_writer_guid_common_init (struct writer *wr, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void * status_entity)
{ {
struct writer *wr;
nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (guid, topic, xqos);
wr = os_malloc (sizeof (*wr));
/* want a pointer to the participant so that a parallel call to
delete_participant won't interfere with our ability to address
the participant */
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp, &tk);
os_condInit (&wr->throttle_cond, &wr->e.lock); os_condInit (&wr->throttle_cond, &wr->e.lock);
wr->seq = 0; wr->seq = 0;
wr->cs_seq = 0; wr->cs_seq = 0;
@ -2658,12 +2624,10 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
assert (wr->xqos->aliased == 0); assert (wr->xqos->aliased == 0);
set_topic_type_name (wr->xqos, topic); set_topic_type_name (wr->xqos, topic);
if (dds_get_log_mask() & DDS_LC_DISCOVERY)
{
DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid)); DDS_LOG(DDS_LC_DISCOVERY, "WRITER %x:%x:%x:%x QOS={", PGUID (wr->e.guid));
nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos); nn_log_xqos (DDS_LC_DISCOVERY, wr->xqos);
DDS_LOG(DDS_LC_DISCOVERY, "}\n"); DDS_LOG(DDS_LC_DISCOVERY, "}\n");
}
assert (wr->xqos->present & QP_RELIABILITY); assert (wr->xqos->present & QP_RELIABILITY);
wr->reliable = (wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS); wr->reliable = (wr->xqos->reliability.kind != NN_BEST_EFFORT_RELIABILITY_QOS);
assert (wr->xqos->present & QP_DURABILITY); assert (wr->xqos->present & QP_DURABILITY);
@ -2818,6 +2782,26 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
ut_avlInit (&wr_local_readers_treedef, &wr->local_readers); ut_avlInit (&wr_local_readers_treedef, &wr->local_readers);
local_reader_ary_init (&wr->rdary); local_reader_ary_init (&wr->rdary);
}
static struct writer *new_writer_guid (const struct nn_guid *guid, const struct nn_guid *group_guid, struct participant *pp, const struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc, status_cb_t status_cb, void *status_entity)
{
struct writer *wr;
nn_mtime_t tnow = now_mt ();
assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_writer_guid (guid) == NULL);
assert (memcmp (&guid->prefix, &pp->e.guid.prefix, sizeof (guid->prefix)) == 0);
new_reader_writer_common (guid, topic, xqos);
wr = os_malloc (sizeof (*wr));
/* want a pointer to the participant so that a parallel call to
delete_participant won't interfere with our ability to address
the participant */
endpoint_common_init (&wr->e, &wr->c, EK_WRITER, guid, group_guid, pp);
new_writer_guid_common_init(wr, topic, xqos, whc, status_cb, status_entity);
/* guid_hash needed for protocol handling, so add it before we send /* guid_hash needed for protocol handling, so add it before we send
out our first message. Also: needed for matching, and swapping out our first message. Also: needed for matching, and swapping
@ -2834,7 +2818,7 @@ static struct writer * new_writer_guid (const struct nn_guid *guid, const struct
deleted while we do so */ deleted while we do so */
match_writer_with_proxy_readers (wr, tnow); match_writer_with_proxy_readers (wr, tnow);
match_writer_with_local_readers (wr, tnow); match_writer_with_local_readers (wr, tnow);
write_builtin_topic_local(&wr->e, now(), true, tk); ddsi_plugin.builtintopic_write (&wr->e, now(), true);
sedp_write_writer (wr); sedp_write_writer (wr);
if (wr->lease_duration != T_NEVER) if (wr->lease_duration != T_NEVER)
@ -2866,6 +2850,29 @@ struct writer * new_writer (struct nn_guid *wrguid, const struct nn_guid *group_
return wr; return wr;
} }
struct local_orphan_writer *new_local_orphan_writer (nn_entityid_t entityid, struct ddsi_sertopic *topic, const struct nn_xqos *xqos, struct whc *whc)
{
nn_guid_t guid;
struct local_orphan_writer *lowr;
struct writer *wr;
nn_mtime_t tnow = now_mt ();
DDS_LOG(DDS_LC_DISCOVERY, "new_local_orphan_writer(%s/%s)\n", topic->name, topic->typename);
lowr = os_malloc (sizeof (*lowr));
wr = &lowr->wr;
memset (&guid.prefix, 0, sizeof (guid.prefix));
guid.entityid = entityid;
entity_common_init (&wr->e, &guid, NULL, EK_WRITER, now (), ownvendorid, true);
wr->c.pp = NULL;
memset (&wr->c.group_guid, 0, sizeof (wr->c.group_guid));
new_writer_guid_common_init (wr, topic, xqos, whc, 0, NULL);
ephash_insert_writer_guid (wr);
match_writer_with_local_readers (wr, tnow);
ddsi_plugin.builtintopic_write (&wr->e, now(), true);
return lowr;
}
static void gc_delete_writer (struct gcreq *gcreq) static void gc_delete_writer (struct gcreq *gcreq)
{ {
struct writer *wr = gcreq->arg; struct writer *wr = gcreq->arg;
@ -2960,7 +2967,7 @@ int delete_writer_nolinger_locked (struct writer *wr)
{ {
DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_writer_nolinger(guid %x:%x:%x:%x) ...\n", PGUID (wr->e.guid));
ASSERT_MUTEX_HELD (&wr->e.lock); ASSERT_MUTEX_HELD (&wr->e.lock);
write_builtin_topic_local(&wr->e, now(), false, NULL); ddsi_plugin.builtintopic_write (&wr->e, now(), false);
local_reader_ary_setinvalid (&wr->rdary); local_reader_ary_setinvalid (&wr->rdary);
ephash_remove_writer_guid (wr); ephash_remove_writer_guid (wr);
writer_set_state (wr, WRST_DELETING); writer_set_state (wr, WRST_DELETING);
@ -2990,6 +2997,13 @@ int delete_writer_nolinger (const struct nn_guid *guid)
return 0; return 0;
} }
void delete_local_orphan_writer (struct local_orphan_writer *lowr)
{
os_mutexLock (&lowr->wr.e.lock);
delete_writer_nolinger_locked (&lowr->wr);
os_mutexUnlock (&lowr->wr.e.lock);
}
int delete_writer (const struct nn_guid *guid) int delete_writer (const struct nn_guid *guid)
{ {
struct writer *wr; struct writer *wr;
@ -3177,7 +3191,6 @@ static struct reader * new_reader_guid
struct reader * rd; struct reader * rd;
nn_mtime_t tnow = now_mt (); nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
assert (!is_writer_entityid (guid->entityid)); assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_reader_guid (guid) == NULL); assert (ephash_lookup_reader_guid (guid) == NULL);
@ -3186,7 +3199,7 @@ static struct reader * new_reader_guid
new_reader_writer_common (guid, topic, xqos); new_reader_writer_common (guid, topic, xqos);
rd = os_malloc (sizeof (*rd)); rd = os_malloc (sizeof (*rd));
endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp, &tk); endpoint_common_init (&rd->e, &rd->c, EK_READER, guid, group_guid, pp);
/* Copy QoS, merging in defaults */ /* Copy QoS, merging in defaults */
rd->xqos = os_malloc (sizeof (*rd->xqos)); rd->xqos = os_malloc (sizeof (*rd->xqos));
@ -3290,7 +3303,7 @@ static struct reader * new_reader_guid
ephash_insert_reader_guid (rd); ephash_insert_reader_guid (rd);
match_reader_with_proxy_writers (rd, tnow); match_reader_with_proxy_writers (rd, tnow);
match_reader_with_local_writers (rd, tnow); match_reader_with_local_writers (rd, tnow);
write_builtin_topic_local(&rd->e, now(), true, tk); ddsi_plugin.builtintopic_write (&rd->e, now(), true);
sedp_write_reader (rd); sedp_write_reader (rd);
return rd; return rd;
} }
@ -3383,7 +3396,7 @@ int delete_reader (const struct nn_guid *guid)
(ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc); (ddsi_plugin.rhc_plugin.rhc_fini_fn) (rd->rhc);
} }
DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_reader_guid(guid %x:%x:%x:%x) ...\n", PGUID (*guid));
write_builtin_topic_local(&rd->e, now(), false, NULL); ddsi_plugin.builtintopic_write (&rd->e, now(), false);
ephash_remove_reader_guid (rd); ephash_remove_reader_guid (rd);
gcreq_reader (rd); gcreq_reader (rd);
return 0; return 0;
@ -3459,7 +3472,6 @@ void new_proxy_participant
runs on a single thread, it can't go wrong. FIXME, maybe? The runs on a single thread, it can't go wrong. FIXME, maybe? The
same holds for the other functions for creating entities. */ same holds for the other functions for creating entities. */
struct proxy_participant *proxypp; struct proxy_participant *proxypp;
struct ddsi_tkmap_instance *tk;
assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT); assert (ppguid->entityid.u == NN_ENTITYID_PARTICIPANT);
assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL); assert (ephash_lookup_proxy_participant_guid (ppguid) == NULL);
@ -3469,7 +3481,7 @@ void new_proxy_participant
proxypp = os_malloc (sizeof (*proxypp)); proxypp = os_malloc (sizeof (*proxypp));
entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, vendor, false, &tk); entity_common_init (&proxypp->e, ppguid, "", EK_PROXY_PARTICIPANT, timestamp, vendor, false);
proxypp->refc = 1; proxypp->refc = 1;
proxypp->lease_expired = 0; proxypp->lease_expired = 0;
proxypp->vendor = vendor; proxypp->vendor = vendor;
@ -3625,7 +3637,7 @@ void new_proxy_participant
if (proxypp->owns_lease) if (proxypp->owns_lease)
lease_register (os_atomic_ldvoidp (&proxypp->lease)); lease_register (os_atomic_ldvoidp (&proxypp->lease));
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, tk); ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true);
os_mutexUnlock (&proxypp->e.lock); os_mutexUnlock (&proxypp->e.lock);
} }
@ -3643,7 +3655,7 @@ int update_proxy_participant_plist_locked (struct proxy_participant *proxypp, co
switch (source) switch (source)
{ {
case UPD_PROXYPP_SPDP: case UPD_PROXYPP_SPDP:
write_builtin_topic_any(&proxypp->e, timestamp, true, proxypp->vendor, NULL); ddsi_plugin.builtintopic_write (&proxypp->e, timestamp, true);
proxypp->proxypp_have_spdp = 1; proxypp->proxypp_have_spdp = 1;
break; break;
case UPD_PROXYPP_CM: case UPD_PROXYPP_CM:
@ -3892,7 +3904,7 @@ int delete_proxy_participant_by_guid (const struct nn_guid * guid, nn_wctime_t t
return ERR_UNKNOWN_ENTITY; return ERR_UNKNOWN_ENTITY;
} }
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
write_builtin_topic_any(&ppt->e, timestamp, false, ppt->vendor, NULL); ddsi_plugin.builtintopic_write (&ppt->e, timestamp, false);
remember_deleted_participant_guid (&ppt->e.guid); remember_deleted_participant_guid (&ppt->e.guid);
ephash_remove_proxy_participant_guid (ppt); ephash_remove_proxy_participant_guid (ppt);
os_mutexUnlock (&gv.lock); os_mutexUnlock (&gv.lock);
@ -4021,12 +4033,7 @@ void delete_proxy_group (const nn_guid_t *guid, nn_wctime_t timestamp, int isimp
/* PROXY-ENDPOINT --------------------------------------------------- */ /* PROXY-ENDPOINT --------------------------------------------------- */
static void proxy_endpoint_common_init static void proxy_endpoint_common_init (struct entity_common *e, struct proxy_endpoint_common *c, enum entity_kind kind, const struct nn_guid *guid, nn_wctime_t tcreate, struct proxy_participant *proxypp, struct addrset *as, const nn_plist_t *plist)
(
struct entity_common *e, struct proxy_endpoint_common *c,
enum entity_kind kind, const struct nn_guid *guid, struct proxy_participant *proxypp,
struct addrset *as, const nn_plist_t *plist, struct ddsi_tkmap_instance **tk
)
{ {
const char *name; const char *name;
@ -4036,7 +4043,7 @@ static void proxy_endpoint_common_init
assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME)); assert ((plist->qos.present & (QP_TOPIC_NAME | QP_TYPE_NAME)) == (QP_TOPIC_NAME | QP_TYPE_NAME));
name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : ""; name = (plist->present & PP_ENTITY_NAME) ? plist->entity_name : "";
entity_common_init (e, guid, name, kind, proxypp->vendor, false, tk); entity_common_init (e, guid, name, kind, tcreate, proxypp->vendor, false);
c->xqos = nn_xqos_dup (&plist->qos); c->xqos = nn_xqos_dup (&plist->qos);
c->as = ref_addrset (as); c->as = ref_addrset (as);
c->topic = NULL; /* set from first matching reader/writer */ c->topic = NULL; /* set from first matching reader/writer */
@ -4071,8 +4078,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_writer *pwr; struct proxy_writer *pwr;
int isreliable; int isreliable;
nn_mtime_t tnow = now_mt (); nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (is_writer_entityid (guid->entityid)); assert (is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_writer_guid (guid) == NULL); assert (ephash_lookup_proxy_writer_guid (guid) == NULL);
@ -4083,7 +4089,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
} }
pwr = os_malloc (sizeof (*pwr)); pwr = os_malloc (sizeof (*pwr));
proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, proxypp, as, plist, &tk); proxy_endpoint_common_init (&pwr->e, &pwr->c, EK_PROXY_WRITER, guid, timestamp, proxypp, as, plist);
ut_avlInit (&pwr_readers_treedef, &pwr->readers); ut_avlInit (&pwr_readers_treedef, &pwr->readers);
pwr->n_reliable_readers = 0; pwr->n_reliable_readers = 0;
@ -4149,7 +4155,7 @@ int new_proxy_writer (const struct nn_guid *ppguid, const struct nn_guid *guid,
local_reader_ary_init (&pwr->rdary); local_reader_ary_init (&pwr->rdary);
ephash_insert_proxy_writer_guid (pwr); ephash_insert_proxy_writer_guid (pwr);
match_proxy_writer_with_readers (pwr, tnow); match_proxy_writer_with_readers (pwr, tnow);
write_builtin_topic_any(&pwr->e, timestamp, true, pwr->c.vendor, tk); ddsi_plugin.builtintopic_write (&pwr->e, timestamp, true);
os_mutexLock (&pwr->e.lock); os_mutexLock (&pwr->e.lock);
pwr->local_matching_inprogress = 0; pwr->local_matching_inprogress = 0;
@ -4265,7 +4271,6 @@ static void gc_delete_proxy_writer (struct gcreq *gcreq)
int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit)
{ {
struct proxy_writer *pwr; struct proxy_writer *pwr;
(void)timestamp;
(void)isimplicit; (void)isimplicit;
DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_writer (%x:%x:%x:%x) ", PGUID (*guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_writer (%x:%x:%x:%x) ", PGUID (*guid));
os_mutexLock (&gv.lock); os_mutexLock (&gv.lock);
@ -4281,7 +4286,7 @@ int delete_proxy_writer (const struct nn_guid *guid, nn_wctime_t timestamp, int
from removing themselves from the proxy writer's rdary[]. */ from removing themselves from the proxy writer's rdary[]. */
local_reader_ary_setinvalid (&pwr->rdary); local_reader_ary_setinvalid (&pwr->rdary);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");
write_builtin_topic_any(&pwr->e, timestamp, false, pwr->c.vendor, NULL); ddsi_plugin.builtintopic_write (&pwr->e, timestamp, false);
ephash_remove_proxy_writer_guid (pwr); ephash_remove_proxy_writer_guid (pwr);
os_mutexUnlock (&gv.lock); os_mutexUnlock (&gv.lock);
gcreq_proxy_writer (pwr); gcreq_proxy_writer (pwr);
@ -4299,8 +4304,6 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
struct proxy_participant *proxypp; struct proxy_participant *proxypp;
struct proxy_reader *prd; struct proxy_reader *prd;
nn_mtime_t tnow = now_mt (); nn_mtime_t tnow = now_mt ();
struct ddsi_tkmap_instance *tk;
(void)timestamp;
assert (!is_writer_entityid (guid->entityid)); assert (!is_writer_entityid (guid->entityid));
assert (ephash_lookup_proxy_reader_guid (guid) == NULL); assert (ephash_lookup_proxy_reader_guid (guid) == NULL);
@ -4312,7 +4315,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
} }
prd = os_malloc (sizeof (*prd)); prd = os_malloc (sizeof (*prd));
proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, proxypp, as, plist, &tk); proxy_endpoint_common_init (&prd->e, &prd->c, EK_PROXY_READER, guid, timestamp, proxypp, as, plist);
prd->deleting = 0; prd->deleting = 0;
#ifdef DDSI_INCLUDE_SSM #ifdef DDSI_INCLUDE_SSM
@ -4326,7 +4329,7 @@ int new_proxy_reader (const struct nn_guid *ppguid, const struct nn_guid *guid,
ut_avlInit (&prd_writers_treedef, &prd->writers); ut_avlInit (&prd_writers_treedef, &prd->writers);
ephash_insert_proxy_reader_guid (prd); ephash_insert_proxy_reader_guid (prd);
match_proxy_reader_with_writers (prd, tnow); match_proxy_reader_with_writers (prd, tnow);
write_builtin_topic_any(&prd->e, timestamp, true, prd->c.vendor, tk); ddsi_plugin.builtintopic_write (&prd->e, timestamp, true);
return 0; return 0;
} }
@ -4399,7 +4402,6 @@ static void gc_delete_proxy_reader (struct gcreq *gcreq)
int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit) int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int isimplicit)
{ {
struct proxy_reader *prd; struct proxy_reader *prd;
(void)timestamp;
(void)isimplicit; (void)isimplicit;
DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_reader (%x:%x:%x:%x) ", PGUID (*guid)); DDS_LOG(DDS_LC_DISCOVERY, "delete_proxy_reader (%x:%x:%x:%x) ", PGUID (*guid));
os_mutexLock (&gv.lock); os_mutexLock (&gv.lock);
@ -4409,7 +4411,7 @@ int delete_proxy_reader (const struct nn_guid *guid, nn_wctime_t timestamp, int
DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n"); DDS_LOG(DDS_LC_DISCOVERY, "- unknown\n");
return ERR_UNKNOWN_ENTITY; return ERR_UNKNOWN_ENTITY;
} }
write_builtin_topic_any(&prd->e, timestamp, false, prd->c.vendor, NULL); ddsi_plugin.builtintopic_write (&prd->e, timestamp, false);
ephash_remove_proxy_reader_guid (prd); ephash_remove_proxy_reader_guid (prd);
os_mutexUnlock (&gv.lock); os_mutexUnlock (&gv.lock);
DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n"); DDS_LOG(DDS_LC_DISCOVERY, "- deleting\n");

View file

@ -180,6 +180,14 @@ struct gcreq_queue *gcreq_queue_new (void)
return q; return q;
} }
void gcreq_queue_drain (struct gcreq_queue *q)
{
os_mutexLock (&q->lock);
while (q->count != 0)
os_condWait (&q->cond, &q->lock);
os_mutexUnlock (&q->lock);
}
void gcreq_queue_free (struct gcreq_queue *q) void gcreq_queue_free (struct gcreq_queue *q)
{ {
struct gcreq *gcreq; struct gcreq *gcreq;
@ -191,7 +199,8 @@ void gcreq_queue_free (struct gcreq_queue *q)
os_mutexLock (&q->lock); os_mutexLock (&q->lock);
q->terminate = 1; q->terminate = 1;
/* Wait until there is only request in existence, the one we just /* Wait until there is only request in existence, the one we just
allocated. Then we know the gc system is quiet. */ allocated (this is also why we can't use "drain" here). Then
we know the gc system is quiet. */
while (q->count != 1) while (q->count != 1)
os_condWait (&q->cond, &q->lock); os_condWait (&q->cond, &q->lock);
os_mutexUnlock (&q->lock); os_mutexUnlock (&q->lock);
@ -227,7 +236,7 @@ void gcreq_free (struct gcreq *gcreq)
struct gcreq_queue *gcreq_queue = gcreq->queue; struct gcreq_queue *gcreq_queue = gcreq->queue;
os_mutexLock (&gcreq_queue->lock); os_mutexLock (&gcreq_queue->lock);
--gcreq_queue->count; --gcreq_queue->count;
if (gcreq_queue->terminate && gcreq_queue->count <= 1) if (gcreq_queue->count <= 1)
os_condBroadcast (&gcreq_queue->cond); os_condBroadcast (&gcreq_queue->cond);
os_mutexUnlock (&gcreq_queue->lock); os_mutexUnlock (&gcreq_queue->lock);
os_free (gcreq); os_free (gcreq);

View file

@ -54,7 +54,6 @@
#include "ddsi/ddsi_raweth.h" #include "ddsi/ddsi_raweth.h"
#include "ddsi/ddsi_mcgroup.h" #include "ddsi/ddsi_mcgroup.h"
#include "ddsi/ddsi_serdata_default.h" #include "ddsi/ddsi_serdata_default.h"
#include "ddsi/ddsi_serdata_builtin.h"
#include "ddsi/ddsi_tkmap.h" #include "ddsi/ddsi_tkmap.h"
#include "dds__whc.h" #include "dds__whc.h"
@ -772,18 +771,12 @@ static void make_special_topics (void)
{ {
gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist); gv.plist_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? PL_CDR_LE : PL_CDR_BE, &ddsi_serdata_ops_plist);
gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr); gv.rawcdr_topic = make_special_topic (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE, &ddsi_serdata_ops_rawcdr);
gv.builtin_participant_topic = new_sertopic_builtin (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
gv.builtin_reader_topic = new_sertopic_builtin (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
gv.builtin_writer_topic = new_sertopic_builtin (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
} }
static void free_special_topics (void) static void free_special_topics (void)
{ {
ddsi_sertopic_unref (gv.plist_topic); ddsi_sertopic_unref (gv.plist_topic);
ddsi_sertopic_unref (gv.rawcdr_topic); ddsi_sertopic_unref (gv.rawcdr_topic);
ddsi_sertopic_unref (gv.builtin_participant_topic);
ddsi_sertopic_unref (gv.builtin_reader_topic);
ddsi_sertopic_unref (gv.builtin_writer_topic);
} }
static int setup_and_start_recv_threads (void) static int setup_and_start_recv_threads (void)
@ -1408,7 +1401,7 @@ static void builtins_dqueue_ready_cb (void *varg)
os_mutexUnlock (&arg->lock); os_mutexUnlock (&arg->lock);
} }
void rtps_term (void) void rtps_stop (void)
{ {
struct thread_state1 *self = lookup_thread_state (); struct thread_state1 *self = lookup_thread_state ();
#ifdef DDSI_INCLUDE_NETWORK_CHANNELS #ifdef DDSI_INCLUDE_NETWORK_CHANNELS
@ -1527,17 +1520,26 @@ void rtps_term (void)
} }
/* Wait until all participants are really gone => by then we can be /* Wait until all participants are really gone => by then we can be
certain that no new GC requests will be added */ certain that no new GC requests will be added, short of what we
do here */
os_mutexLock (&gv.participant_set_lock); os_mutexLock (&gv.participant_set_lock);
while (gv.nparticipants > 0) while (gv.nparticipants > 0)
os_condWait (&gv.participant_set_cond, &gv.participant_set_lock); os_condWait (&gv.participant_set_cond, &gv.participant_set_lock);
os_mutexUnlock (&gv.participant_set_lock); os_mutexUnlock (&gv.participant_set_lock);
/* Wait until no more GC requests are outstanding -- not really
necessary, but it allows us to claim the stack is quiescent
at this point */
gcreq_queue_drain (gv.gcreq_queue);
/* Clean up privileged_pp -- it must be NULL now (all participants /* Clean up privileged_pp -- it must be NULL now (all participants
are gone), but the lock still needs to be destroyed */ are gone), but the lock still needs to be destroyed */
assert (gv.privileged_pp == NULL); assert (gv.privileged_pp == NULL);
os_mutexDestroy (&gv.privileged_pp_lock); os_mutexDestroy (&gv.privileged_pp_lock);
}
void rtps_fini (void)
{
/* Shut down the GC system -- no new requests will be added */ /* Shut down the GC system -- no new requests will be added */
gcreq_queue_free (gv.gcreq_queue); gcreq_queue_free (gv.gcreq_queue);

View file

@ -893,7 +893,7 @@ static int insert_sample_in_whc (struct writer *wr, seqno_t seq, struct nn_plist
res = 1; res = 1;
#ifndef NDEBUG #ifndef NDEBUG
if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) if (wr->e.guid.entityid.u == NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER && !is_local_orphan_endpoint (&wr->e))
{ {
struct whc_state whcst; struct whc_state whcst;
whc_get_state(wr->whc, &whcst); whc_get_state(wr->whc, &whcst);

View file

@ -95,7 +95,7 @@ endif()
if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro") if(${CMAKE_C_COMPILER_ID} STREQUAL "SunPro")
target_link_libraries(OSAPI INTERFACE -lsocket -lnsl) target_link_libraries(OSAPI INTERFACE -lsocket -lnsl)
target_compile_definitions(OSAPI PRIVATE -KPIC) add_definitions(-KPIC)
endif() endif()
# Determine if platform is big or little endian. # Determine if platform is big or little endian.

View file

@ -74,8 +74,6 @@ extern "C" {
typedef size_t os_iov_len_t; typedef size_t os_iov_len_t;
#if defined(__sun) && !defined(_XPG4_2) #if defined(__sun) && !defined(_XPG4_2)
#define msg_accrights msg_control
#define msg_accrightslen msg_controllen
#define OS_MSGHDR_FLAGS 0 #define OS_MSGHDR_FLAGS 0
#else #else
#define OS_MSGHDR_FLAGS 1 #define OS_MSGHDR_FLAGS 1

View file

@ -190,72 +190,6 @@ CU_Test(os_mutex, basic)
printf("Ending os_mutex_basic\n"); printf("Ending os_mutex_basic\n");
} }
#define RUNTIME_SEC (4)
#define NUM_THREADS (8)
#define OS_STRESS_STOP (0)
#define OS_STRESS_GO (1)
#define THREAD_NAME_LEN (8)
struct os_mutex_stress {
os_threadId tid;
os_mutex m;
os_atomic_uint32_t * flag;
char name[THREAD_NAME_LEN];
};
static uint32_t
os_mutex_init_thr(
void *args)
{
struct os_mutex_stress *state = (struct os_mutex_stress *)args;
os_result r;
uint32_t iterations = 0;
do {
os_mutexInit(&state->m);
r = os_mutexLock_s(&state->m); /* Use the mutex to check that all is OK. */
CU_ASSERT_EQUAL(r, os_resultSuccess); /* Failure can't be forced. */
os_mutexUnlock(&state->m);
os_mutexDestroy(&state->m);
iterations++;
} while ( os_atomic_ld32(state->flag) != OS_STRESS_STOP && r == os_resultSuccess);
printf("%s <%"PRIxMAX">: Performed %u iterations. Stopping now.\n", state->name, os_threadIdToInteger(os_threadIdSelf()), iterations);
return r != os_resultSuccess; /* Return true on faulure */
}
CU_Test(os_mutex, init_stress)
{
struct os_mutex_stress threads[NUM_THREADS];
os_threadAttr tattr;
unsigned i;
os_atomic_uint32_t flag = OS_ATOMIC_UINT32_INIT(OS_STRESS_GO);
os_time runtime = { .tv_sec = RUNTIME_SEC, .tv_nsec = 0 };
printf("Starting os_mutex_init_stress\n");
os_threadAttrInit(&tattr);
for ( i = 0; i < NUM_THREADS; i++ ) {
(void) snprintf(&threads[i].name[0], THREAD_NAME_LEN, "thr%u", i);
threads[i].flag = &flag;
os_threadCreate(&threads[i].tid, threads[i].name, &tattr, &os_mutex_init_thr, &threads[i]);
printf("main <%"PRIxMAX">: Started thread '%s' with thread-id %" PRIxMAX "\n", os_threadIdToInteger(os_threadIdSelf()), threads[i].name, os_threadIdToInteger(threads[i].tid));
}
printf("main <%"PRIxMAX">: Test will run for ~%ds with %d threads\n", os_threadIdToInteger(os_threadIdSelf()), RUNTIME_SEC, NUM_THREADS);
os_nanoSleep(runtime);
os_atomic_st32(&flag, OS_STRESS_STOP);
for ( ; i != 0; i-- ) {
uint32_t thread_failed;
os_threadWaitExit(threads[i - 1].tid, &thread_failed);
printf("main <%"PRIxMAX">: Thread %s <%" PRIxMAX "> stopped with result %s.\n", os_threadIdToInteger(os_threadIdSelf()), threads[i - 1].name, os_threadIdToInteger(threads[i - 1].tid), thread_failed ? "FAILED" : "PASS");
CU_ASSERT_FALSE(thread_failed);
}
printf("Ending os_mutex_init_stress\n");
}
CU_Test(os_mutex, lock, false) CU_Test(os_mutex, lock, false)
{ {
/* Test critical section access with locking and PRIVATE scope */ /* Test critical section access with locking and PRIVATE scope */

View file

@ -192,22 +192,22 @@ void qp_duration_qos (const dds_qos_t *q, FILE *fp, const char *what, bool (*qge
void qp_lifespan (const dds_qos_t *q, FILE *fp) void qp_lifespan (const dds_qos_t *q, FILE *fp)
{ {
qp_duration_qos (q, fp, "lifespan: duration = ", dds_qget_lifespan); qp_duration_qos (q, fp, "lifespan: duration", dds_qget_lifespan);
} }
void qp_deadline (const dds_qos_t *q, FILE *fp) void qp_deadline (const dds_qos_t *q, FILE *fp)
{ {
qp_duration_qos (q, fp, "deadline: period = ", dds_qget_deadline); qp_duration_qos (q, fp, "deadline: period", dds_qget_deadline);
} }
void qp_latency_budget (const dds_qos_t *q, FILE *fp) void qp_latency_budget (const dds_qos_t *q, FILE *fp)
{ {
qp_duration_qos (q, fp, "latency_budget: duration = ", dds_qget_latency_budget); qp_duration_qos (q, fp, "latency_budget: duration", dds_qget_latency_budget);
} }
void qp_time_based_filter (const dds_qos_t *q, FILE *fp) void qp_time_based_filter (const dds_qos_t *q, FILE *fp)
{ {
qp_duration_qos (q, fp, "time_based_filter: minimum_separation = ", dds_qget_time_based_filter); qp_duration_qos (q, fp, "time_based_filter: minimum_separation", dds_qget_time_based_filter);
} }
void qp_ownership (const dds_qos_t *q, FILE *fp) void qp_ownership (const dds_qos_t *q, FILE *fp)