Eliminate domain-specific global variables
This commit moves all but a handful of the global variables into the domain object, in particular including the DDSI configuration, globals and all transport internal state. The goal of this commit is not to produce the nicest code possible, but to get a working version that can support multiple simultaneous domains. Various choices are driven by this desire and it is expected that some of the changes will have to be undone. (E.g., passing the DDSI globals into address set operations and locator printing because there is no other way to figure out what transport to use for a given locator; storing the transport pointer inside the locator would solve that.) Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
8a591fdc9b
commit
c1f3ad8a22
93 changed files with 2802 additions and 2915 deletions
|
@ -28,10 +28,6 @@ dds_entity_t dds__get_builtin_subscriber (dds_entity_t e);
|
|||
/* Checks whether the reader QoS is valid for use with built-in topic TOPIC */
|
||||
bool dds__validate_builtin_reader_qos (const dds_domain *dom, dds_entity_t topic, const dds_qos_t *qos);
|
||||
|
||||
/* Init/fini for builtin-topic support that is global across domains */
|
||||
void dds__builtin_init_global (void);
|
||||
void dds__builtin_fini_global (void);
|
||||
|
||||
void dds__builtin_init (struct dds_domain *dom);
|
||||
void dds__builtin_fini (struct dds_domain *dom);
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ struct dds_rhc;
|
|||
struct dds_reader;
|
||||
struct ddsi_sertopic;
|
||||
|
||||
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkmap *tkmap, const struct ddsi_sertopic *topic, bool xchecks);
|
||||
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
|
|
|
@ -33,15 +33,17 @@ enum ddsi_sertopic_builtintopic_type {
|
|||
DSBT_WRITER
|
||||
};
|
||||
|
||||
struct q_globals;
|
||||
struct ddsi_sertopic_builtintopic {
|
||||
struct ddsi_sertopic c;
|
||||
enum ddsi_sertopic_builtintopic_type type;
|
||||
struct q_globals *gv;
|
||||
};
|
||||
|
||||
extern const struct ddsi_sertopic_ops ddsi_sertopic_ops_builtintopic;
|
||||
extern const struct ddsi_serdata_ops ddsi_serdata_ops_builtintopic;
|
||||
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename);
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "dds/dds.h"
|
||||
#include "dds/ddsrt/sync.h"
|
||||
#include "dds/ddsi/q_rtps.h"
|
||||
#include "dds/ddsi/q_globals.h"
|
||||
#include "dds/ddsrt/avl.h"
|
||||
#include "dds/ddsi/ddsi_builtin_topic_if.h"
|
||||
#include "dds__handles.h"
|
||||
|
@ -97,15 +98,20 @@ typedef struct dds_domain {
|
|||
ddsrt_avl_node_t m_node;
|
||||
dds_domainid_t m_id;
|
||||
ddsrt_avl_tree_t m_topics;
|
||||
struct dds_entity *ppants;
|
||||
ddsrt_avl_tree_t m_ppants;
|
||||
uint32_t m_refc;
|
||||
struct cfgst *cfgst;
|
||||
|
||||
struct ddsi_sertopic *builtin_participant_topic;
|
||||
struct ddsi_sertopic *builtin_reader_topic;
|
||||
struct ddsi_sertopic *builtin_writer_topic;
|
||||
|
||||
struct local_orphan_writer *builtintopic_writer_participant;
|
||||
struct local_orphan_writer *builtintopic_writer_publications;
|
||||
struct local_orphan_writer *builtintopic_writer_subscriptions;
|
||||
|
||||
struct ddsi_builtin_topic_interface btif;
|
||||
struct q_globals gv;
|
||||
} dds_domain;
|
||||
|
||||
struct dds_entity;
|
||||
|
@ -242,6 +248,7 @@ typedef struct dds_writer {
|
|||
struct nn_xpack *m_xp;
|
||||
struct writer *m_wr;
|
||||
struct whc *m_whc; /* FIXME: ownership still with underlying DDSI writer (cos of DDSI built-in writers )*/
|
||||
bool whc_batch; /* FIXME: channels + latency budget */
|
||||
|
||||
/* Status metrics */
|
||||
|
||||
|
@ -311,10 +318,6 @@ typedef struct dds_globals {
|
|||
|
||||
uint32_t threadmon_count;
|
||||
struct ddsi_threadmon *threadmon;
|
||||
|
||||
struct ddsi_sertopic *builtin_participant_topic;
|
||||
struct ddsi_sertopic *builtin_reader_topic;
|
||||
struct ddsi_sertopic *builtin_writer_topic;
|
||||
} dds_globals;
|
||||
|
||||
DDS_EXPORT extern dds_globals dds_global;
|
||||
|
|
|
@ -18,7 +18,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct whc *whc_new (int is_transient_local, uint32_t hdepth, uint32_t tldepth);
|
||||
struct q_globals;
|
||||
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type);
|
||||
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef enum {
|
|||
|
||||
dds_return_t dds_write_impl (dds_writer *wr, const void *data, dds_time_t tstamp, dds_write_action action);
|
||||
dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time_t tstamp, dds_write_action action);
|
||||
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d);
|
||||
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d, bool flush);
|
||||
|
||||
#if defined (__cplusplus)
|
||||
}
|
||||
|
|
|
@ -54,13 +54,13 @@ dds_entity_t dds__get_builtin_topic (dds_entity_t entity, dds_entity_t topic)
|
|||
switch (topic)
|
||||
{
|
||||
case DDS_BUILTIN_TOPIC_DCPSPARTICIPANT:
|
||||
sertopic = dds_global.builtin_participant_topic;
|
||||
sertopic = e->m_domain->builtin_participant_topic;
|
||||
break;
|
||||
case DDS_BUILTIN_TOPIC_DCPSPUBLICATION:
|
||||
sertopic = dds_global.builtin_writer_topic;
|
||||
sertopic = e->m_domain->builtin_writer_topic;
|
||||
break;
|
||||
case DDS_BUILTIN_TOPIC_DCPSSUBSCRIPTION:
|
||||
sertopic = dds_global.builtin_reader_topic;
|
||||
sertopic = e->m_domain->builtin_reader_topic;
|
||||
break;
|
||||
default:
|
||||
assert (0);
|
||||
|
@ -165,14 +165,14 @@ static bool dds__builtin_is_visible (const nn_guid_t *guid, nn_vendorid_t vendor
|
|||
|
||||
static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn_guid *guid, void *vdomain)
|
||||
{
|
||||
struct dds_domain *domain = vdomain;
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
struct ddsi_serdata *sd;
|
||||
struct nn_keyhash kh;
|
||||
(void) vdomain;
|
||||
memcpy (&kh, guid, sizeof (kh));
|
||||
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
|
||||
sd = ddsi_serdata_from_keyhash (dds_global.builtin_participant_topic, &kh);
|
||||
tk = ddsi_tkmap_find (sd, false, true);
|
||||
sd = ddsi_serdata_from_keyhash (domain->builtin_participant_topic, &kh);
|
||||
tk = ddsi_tkmap_find (domain->gv.m_tkmap, sd, false, true);
|
||||
ddsi_serdata_unref (sd);
|
||||
return tk;
|
||||
}
|
||||
|
@ -180,6 +180,7 @@ static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn
|
|||
struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn_wctime_t timestamp, bool alive)
|
||||
{
|
||||
/* initialize to avoid gcc warning ultimately caused by C's horrible type system */
|
||||
struct dds_domain *dom = e->gv->builtin_topic_interface->arg;
|
||||
struct ddsi_sertopic *topic = NULL;
|
||||
struct ddsi_serdata *serdata;
|
||||
struct nn_keyhash keyhash;
|
||||
|
@ -187,15 +188,15 @@ struct ddsi_serdata *dds__builtin_make_sample (const struct entity_common *e, nn
|
|||
{
|
||||
case EK_PARTICIPANT:
|
||||
case EK_PROXY_PARTICIPANT:
|
||||
topic = dds_global.builtin_participant_topic;
|
||||
topic = dom->builtin_participant_topic;
|
||||
break;
|
||||
case EK_WRITER:
|
||||
case EK_PROXY_WRITER:
|
||||
topic = dds_global.builtin_writer_topic;
|
||||
topic = dom->builtin_writer_topic;
|
||||
break;
|
||||
case EK_READER:
|
||||
case EK_PROXY_READER:
|
||||
topic = dds_global.builtin_reader_topic;
|
||||
topic = dom->builtin_reader_topic;
|
||||
break;
|
||||
}
|
||||
assert (topic != NULL);
|
||||
|
@ -230,24 +231,10 @@ static void dds__builtin_write (const struct entity_common *e, nn_wctime_t times
|
|||
bwr = dom->builtintopic_writer_subscriptions;
|
||||
break;
|
||||
}
|
||||
dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata);
|
||||
dds_writecdr_impl_lowlevel (&bwr->wr, NULL, serdata, true);
|
||||
}
|
||||
}
|
||||
|
||||
void dds__builtin_init_global (void)
|
||||
{
|
||||
dds_global.builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant");
|
||||
dds_global.builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription");
|
||||
dds_global.builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication");
|
||||
}
|
||||
|
||||
void dds__builtin_fini_global (void)
|
||||
{
|
||||
ddsi_sertopic_unref (dds_global.builtin_participant_topic);
|
||||
ddsi_sertopic_unref (dds_global.builtin_reader_topic);
|
||||
ddsi_sertopic_unref (dds_global.builtin_writer_topic);
|
||||
}
|
||||
|
||||
void dds__builtin_init (struct dds_domain *dom)
|
||||
{
|
||||
dds_qos_t *qos = dds__create_builtin_qos ();
|
||||
|
@ -257,11 +244,16 @@ void dds__builtin_init (struct dds_domain *dom)
|
|||
dom->btif.builtintopic_is_builtintopic = dds__builtin_is_builtintopic;
|
||||
dom->btif.builtintopic_is_visible = dds__builtin_is_visible;
|
||||
dom->btif.builtintopic_write = dds__builtin_write;
|
||||
gv.builtin_topic_interface = &dom->btif;
|
||||
dom->gv.builtin_topic_interface = &dom->btif;
|
||||
|
||||
dom->builtintopic_writer_participant = new_local_orphan_writer (to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dds_global.builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT));
|
||||
dom->builtintopic_writer_publications = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dds_global.builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER));
|
||||
dom->builtintopic_writer_subscriptions = new_local_orphan_writer (to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dds_global.builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER));
|
||||
dom->builtin_participant_topic = new_sertopic_builtintopic (DSBT_PARTICIPANT, "DCPSParticipant", "org::eclipse::cyclonedds::builtin::DCPSParticipant", &dom->gv);
|
||||
dom->builtin_reader_topic = new_sertopic_builtintopic (DSBT_READER, "DCPSSubscription", "org::eclipse::cyclonedds::builtin::DCPSSubscription", &dom->gv);
|
||||
dom->builtin_writer_topic = new_sertopic_builtintopic (DSBT_WRITER, "DCPSPublication", "org::eclipse::cyclonedds::builtin::DCPSPublication", &dom->gv);
|
||||
|
||||
const struct ephash *gh = dom->gv.guid_hash;
|
||||
dom->builtintopic_writer_participant = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER), dom->builtin_participant_topic, qos, builtintopic_whc_new (DSBT_PARTICIPANT, gh));
|
||||
dom->builtintopic_writer_publications = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER), dom->builtin_writer_topic, qos, builtintopic_whc_new (DSBT_WRITER, gh));
|
||||
dom->builtintopic_writer_subscriptions = new_local_orphan_writer (&dom->gv, to_entityid (NN_ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER), dom->builtin_reader_topic, qos, builtintopic_whc_new (DSBT_READER, gh));
|
||||
|
||||
dds_delete_qos (qos);
|
||||
}
|
||||
|
@ -269,9 +261,13 @@ void dds__builtin_init (struct dds_domain *dom)
|
|||
void dds__builtin_fini (struct dds_domain *dom)
|
||||
{
|
||||
/* No more sources for builtin topic samples */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &dom->gv);
|
||||
delete_local_orphan_writer (dom->builtintopic_writer_participant);
|
||||
delete_local_orphan_writer (dom->builtintopic_writer_publications);
|
||||
delete_local_orphan_writer (dom->builtintopic_writer_subscriptions);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
ddsi_sertopic_unref (dom->builtin_participant_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_reader_topic);
|
||||
ddsi_sertopic_unref (dom->builtin_writer_topic);
|
||||
}
|
||||
|
|
|
@ -49,10 +49,10 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
domain->m_refc = 1;
|
||||
ddsrt_avl_init (&dds_topictree_def, &domain->m_topics);
|
||||
|
||||
gv.tstart = now ();
|
||||
domain->gv.tstart = now ();
|
||||
|
||||
(void) ddsrt_getenv ("CYCLONEDDS_URI", &uri);
|
||||
domain->cfgst = config_init (uri, &config);
|
||||
domain->cfgst = config_init (uri, &domain->gv.config);
|
||||
if (domain->cfgst == NULL)
|
||||
{
|
||||
DDS_LOG (DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri);
|
||||
|
@ -69,23 +69,23 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
ret = DDS_RETCODE_ERROR;
|
||||
goto fail_config_domainid;
|
||||
}
|
||||
else if (config.domainId.isdefault)
|
||||
else if (domain->gv.config.domainId.isdefault)
|
||||
{
|
||||
config.domainId.value = domain_id;
|
||||
domain->gv.config.domainId.value = domain_id;
|
||||
}
|
||||
else if (domain_id != config.domainId.value)
|
||||
else if (domain_id != domain->gv.config.domainId.value)
|
||||
{
|
||||
DDS_ERROR ("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain_id, config.domainId.value);
|
||||
DDS_ERROR ("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain_id, domain->gv.config.domainId.value);
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
goto fail_config_domainid;
|
||||
}
|
||||
}
|
||||
|
||||
/* FIXME: The config.domainId can change internally in DDSI. So, remember what the
|
||||
/* FIXME: The gv.config.domainId can change internally in DDSI. So, remember what the
|
||||
* main configured domain id is. */
|
||||
domain->m_id = config.domainId.value;
|
||||
domain->m_id = domain->gv.config.domainId.value;
|
||||
|
||||
if (rtps_config_prep (domain->cfgst) != 0)
|
||||
if (rtps_config_prep (&domain->gv, domain->cfgst) != 0)
|
||||
{
|
||||
DDS_LOG (DDS_LC_CONFIG, "Failed to configure RTPS\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
|
@ -93,11 +93,11 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
}
|
||||
|
||||
/* Start monitoring the liveliness of all threads. */
|
||||
if (config.liveliness_monitoring)
|
||||
if (domain->gv.config.liveliness_monitoring)
|
||||
{
|
||||
if (++dds_global.threadmon_count == 0)
|
||||
{
|
||||
dds_global.threadmon = ddsi_threadmon_new ();
|
||||
dds_global.threadmon = ddsi_threadmon_new (domain->gv.config.liveliness_monitoring_interval, domain->gv.config.noprogress_log_stacktraces);
|
||||
if (dds_global.threadmon == NULL)
|
||||
{
|
||||
DDS_ERROR ("Failed to create a thread liveliness monitor\n");
|
||||
|
@ -107,7 +107,7 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
}
|
||||
}
|
||||
|
||||
if (rtps_init () < 0)
|
||||
if (rtps_init (&domain->gv) < 0)
|
||||
{
|
||||
DDS_LOG (DDS_LC_CONFIG, "Failed to initialize RTPS\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
|
@ -116,16 +116,17 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
|
||||
dds__builtin_init (domain);
|
||||
|
||||
if (rtps_start () < 0)
|
||||
if (rtps_start (&domain->gv) < 0)
|
||||
{
|
||||
DDS_LOG (DDS_LC_CONFIG, "Failed to start RTPS\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
goto fail_rtps_start;
|
||||
}
|
||||
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
{
|
||||
if (ddsi_threadmon_start (dds_global.threadmon) < 0)
|
||||
const char *name = "threadmon";
|
||||
if (ddsi_threadmon_start (dds_global.threadmon, name, lookup_thread_properties (&domain->gv.config, name)) < 0)
|
||||
{
|
||||
DDS_ERROR ("Failed to start the thread liveliness monitor\n");
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
|
@ -137,31 +138,31 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
|
|||
|
||||
char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */
|
||||
char hostname[64];
|
||||
gv.default_plist_pp.process_id = (unsigned) ddsrt_getpid();
|
||||
gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID;
|
||||
gv.default_plist_pp.exec_name = dds_string_alloc(32);
|
||||
(void) snprintf (gv.default_plist_pp.exec_name, 32, "CycloneDDS: %u", gv.default_plist_pp.process_id);
|
||||
len = (uint32_t) (13 + strlen (gv.default_plist_pp.exec_name));
|
||||
gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME;
|
||||
domain->gv.default_plist_pp.process_id = (unsigned) ddsrt_getpid();
|
||||
domain->gv.default_plist_pp.present |= PP_PRISMTECH_PROCESS_ID;
|
||||
domain->gv.default_plist_pp.exec_name = dds_string_alloc(32);
|
||||
(void) snprintf (domain->gv.default_plist_pp.exec_name, 32, "CycloneDDS: %u", domain->gv.default_plist_pp.process_id);
|
||||
len = (uint32_t) (13 + strlen (domain->gv.default_plist_pp.exec_name));
|
||||
domain->gv.default_plist_pp.present |= PP_PRISMTECH_EXEC_NAME;
|
||||
if (ddsrt_gethostname (hostname, sizeof (hostname)) == DDS_RETCODE_OK)
|
||||
{
|
||||
gv.default_plist_pp.node_name = dds_string_dup (hostname);
|
||||
gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME;
|
||||
domain->gv.default_plist_pp.node_name = dds_string_dup (hostname);
|
||||
domain->gv.default_plist_pp.present |= PP_PRISMTECH_NODE_NAME;
|
||||
}
|
||||
gv.default_plist_pp.entity_name = dds_alloc (len);
|
||||
(void) snprintf (gv.default_plist_pp.entity_name, len, "%s<%u>", progname, gv.default_plist_pp.process_id);
|
||||
gv.default_plist_pp.present |= PP_ENTITY_NAME;
|
||||
domain->gv.default_plist_pp.entity_name = dds_alloc (len);
|
||||
(void) snprintf (domain->gv.default_plist_pp.entity_name, len, "%s<%u>", progname, domain->gv.default_plist_pp.process_id);
|
||||
domain->gv.default_plist_pp.present |= PP_ENTITY_NAME;
|
||||
|
||||
return DDS_RETCODE_OK;
|
||||
|
||||
fail_threadmon_start:
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
ddsi_threadmon_stop (dds_global.threadmon);
|
||||
rtps_stop ();
|
||||
rtps_stop (&domain->gv);
|
||||
fail_rtps_start:
|
||||
rtps_fini ();
|
||||
rtps_fini (&domain->gv);
|
||||
fail_rtps_init:
|
||||
if (config.liveliness_monitoring)
|
||||
if (domain->gv.config.liveliness_monitoring)
|
||||
{
|
||||
if (--dds_global.threadmon_count == 0)
|
||||
{
|
||||
|
@ -181,12 +182,12 @@ fail_config:
|
|||
|
||||
static void dds_domain_fini (struct dds_domain *domain)
|
||||
{
|
||||
if (config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
|
||||
ddsi_threadmon_stop (dds_global.threadmon);
|
||||
rtps_stop ();
|
||||
rtps_stop (&domain->gv);
|
||||
dds__builtin_fini (domain);
|
||||
rtps_fini ();
|
||||
if (config.liveliness_monitoring)
|
||||
rtps_fini (&domain->gv);
|
||||
if (domain->gv.config.liveliness_monitoring)
|
||||
{
|
||||
if (--dds_global.threadmon_count == 0)
|
||||
{
|
||||
|
@ -259,15 +260,73 @@ void dds_domain_free (dds_domain *domain)
|
|||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
if (--domain->m_refc != 0)
|
||||
{
|
||||
//fprintf(stderr, "domain_free %d %p refcount > 0\n", (int)domain->m_id, domain);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
}
|
||||
else
|
||||
{
|
||||
//fprintf(stderr, "domain_free %d %p\n", (int)domain->m_id, domain);
|
||||
ddsrt_avl_delete (&dds_domaintree_def, &dds_global.m_domains, domain);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
dds_domain_fini (domain);
|
||||
dds_free (domain);
|
||||
}
|
||||
}
|
||||
|
||||
#include "dds__entity.h"
|
||||
static void pushdown_set_batch (struct dds_entity *e, bool enable)
|
||||
{
|
||||
/* e is pinned, no locks held */
|
||||
dds_instance_handle_t last_iid = 0;
|
||||
struct dds_entity *c;
|
||||
ddsrt_mutex_lock (&e->m_mutex);
|
||||
while ((c = ddsrt_avl_lookup_succ (&dds_entity_children_td, &e->m_children, &last_iid)) != NULL)
|
||||
{
|
||||
struct dds_entity *x;
|
||||
last_iid = c->m_iid;
|
||||
if (dds_entity_pin (c->m_hdllink.hdl, &x) < 0)
|
||||
continue;
|
||||
assert (x == c);
|
||||
ddsrt_mutex_unlock (&e->m_mutex);
|
||||
if (c->m_kind == DDS_KIND_PARTICIPANT)
|
||||
pushdown_set_batch (c, enable);
|
||||
else if (c->m_kind == DDS_KIND_WRITER)
|
||||
{
|
||||
struct dds_writer *w = (struct dds_writer *) c;
|
||||
w->whc_batch = enable;
|
||||
}
|
||||
ddsrt_mutex_lock (&e->m_mutex);
|
||||
dds_entity_unpin (c);
|
||||
}
|
||||
ddsrt_mutex_unlock (&e->m_mutex);
|
||||
}
|
||||
|
||||
void dds_write_set_batch (bool enable)
|
||||
{
|
||||
/* FIXME: get channels + latency budget working and get rid of this; in the mean time, any ugly hack will do. */
|
||||
struct dds_domain *dom;
|
||||
dds_domainid_t last_id = -1;
|
||||
dds_init ();
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
while ((dom = ddsrt_avl_lookup_succ (&dds_domaintree_def, &dds_global.m_domains, &last_id)) != NULL)
|
||||
{
|
||||
last_id = dom->m_id;
|
||||
dom->gv.config.whc_batch = enable;
|
||||
|
||||
dds_instance_handle_t last_iid = 0;
|
||||
struct dds_entity *e;
|
||||
while (dom && (e = ddsrt_avl_lookup_succ (&dds_entity_children_td, &dom->m_ppants, &last_iid)) != NULL)
|
||||
{
|
||||
struct dds_entity *x;
|
||||
last_iid = e->m_iid;
|
||||
if (dds_entity_pin (e->m_hdllink.hdl, &x) < 0)
|
||||
continue;
|
||||
assert (x == e);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
pushdown_set_batch (e, enable);
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
dds_entity_unpin (e);
|
||||
dom = ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &last_id);
|
||||
}
|
||||
}
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
dds_fini ();
|
||||
}
|
||||
|
|
|
@ -35,8 +35,6 @@
|
|||
#define DOMAIN_ID_MIN 0
|
||||
#define DOMAIN_ID_MAX 230
|
||||
|
||||
struct q_globals gv;
|
||||
|
||||
dds_globals dds_global;
|
||||
|
||||
dds_return_t dds_init (void)
|
||||
|
@ -57,7 +55,6 @@ dds_return_t dds_init (void)
|
|||
thread_states_init_static ();
|
||||
thread_states_init (64);
|
||||
upgrade_main_thread ();
|
||||
dds__builtin_init_global ();
|
||||
|
||||
if (dds_handle_server_init () != DDS_RETCODE_OK)
|
||||
{
|
||||
|
@ -85,7 +82,6 @@ extern void dds_fini (void)
|
|||
if (--dds_global.m_init_count == 0)
|
||||
{
|
||||
dds_handle_server_fini ();
|
||||
dds__builtin_fini_global ();
|
||||
downgrade_main_thread ();
|
||||
thread_states_fini ();
|
||||
ddsi_iid_fini ();
|
||||
|
|
|
@ -41,16 +41,16 @@ dds_return_t dds_dispose_ih (dds_entity_t writer, dds_instance_handle_t handle)
|
|||
static struct ddsi_tkmap_instance *dds_instance_find (const dds_topic *topic, const void *data, const bool create)
|
||||
{
|
||||
struct ddsi_serdata *sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
|
||||
struct ddsi_tkmap_instance *inst = ddsi_tkmap_find (sd, false, create);
|
||||
struct ddsi_tkmap_instance *inst = ddsi_tkmap_find (topic->m_entity.m_domain->gv.m_tkmap, sd, false, create);
|
||||
ddsi_serdata_unref (sd);
|
||||
return inst;
|
||||
}
|
||||
|
||||
static void dds_instance_remove (const dds_topic *topic, const void *data, dds_instance_handle_t handle)
|
||||
static void dds_instance_remove (struct dds_domain *dom, const dds_topic *topic, const void *data, dds_instance_handle_t handle)
|
||||
{
|
||||
struct ddsi_tkmap_instance *inst;
|
||||
if (handle != DDS_HANDLE_NIL)
|
||||
inst = ddsi_tkmap_find_by_id (gv.m_tkmap, handle);
|
||||
inst = ddsi_tkmap_find_by_id (dom->gv.m_tkmap, handle);
|
||||
else
|
||||
{
|
||||
assert (data);
|
||||
|
@ -58,7 +58,7 @@ static void dds_instance_remove (const dds_topic *topic, const void *data, dds_i
|
|||
}
|
||||
if (inst)
|
||||
{
|
||||
ddsi_tkmap_instance_unref (inst);
|
||||
ddsi_tkmap_instance_unref (dom->gv.m_tkmap, inst);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,7 +74,7 @@ dds_return_t dds_register_instance (dds_entity_t writer, dds_instance_handle_t *
|
|||
if ((ret = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
struct ddsi_tkmap_instance * const inst = dds_instance_find (wr->m_topic, data, true);
|
||||
if (inst == NULL)
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
|
@ -115,10 +115,10 @@ dds_return_t dds_unregister_instance_ts (dds_entity_t writer, const void *data,
|
|||
if (wr->m_entity.m_qos)
|
||||
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
if (autodispose)
|
||||
{
|
||||
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
dds_instance_remove (wr->m_entity.m_domain, wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
action |= DDS_WR_DISPOSE_BIT;
|
||||
}
|
||||
ret = dds_write_impl (wr, data, timestamp, action);
|
||||
|
@ -142,20 +142,20 @@ dds_return_t dds_unregister_instance_ih_ts (dds_entity_t writer, dds_instance_ha
|
|||
if (wr->m_entity.m_qos)
|
||||
dds_qget_writer_data_lifecycle (wr->m_entity.m_qos, &autodispose);
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
if (autodispose)
|
||||
{
|
||||
dds_instance_remove (wr->m_topic, NULL, handle);
|
||||
dds_instance_remove (wr->m_entity.m_domain, wr->m_topic, NULL, handle);
|
||||
action |= DDS_WR_DISPOSE_BIT;
|
||||
}
|
||||
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) == NULL)
|
||||
if ((tk = ddsi_tkmap_find_by_id (wr->m_entity.m_domain->gv.m_tkmap, handle)) == NULL)
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
else
|
||||
{
|
||||
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
|
||||
void *sample = ddsi_sertopic_alloc_sample (tp);
|
||||
ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
ddsi_tkmap_instance_unref (wr->m_entity.m_domain->gv.m_tkmap, tk);
|
||||
ret = dds_write_impl (wr, sample, timestamp, action);
|
||||
ddsi_sertopic_free_sample (tp, sample, DDS_FREE_ALL);
|
||||
}
|
||||
|
@ -173,9 +173,9 @@ dds_return_t dds_writedispose_ts (dds_entity_t writer, const void *data, dds_tim
|
|||
if ((ret = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
if ((ret = dds_write_impl (wr, data, timestamp, DDS_WR_ACTION_WRITE_DISPOSE)) == DDS_RETCODE_OK)
|
||||
dds_instance_remove (wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
dds_instance_remove (wr->m_entity.m_domain, wr->m_topic, data, DDS_HANDLE_NIL);
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock (wr);
|
||||
return ret;
|
||||
|
@ -188,7 +188,7 @@ static dds_return_t dds_dispose_impl (dds_writer *wr, const void *data, dds_inst
|
|||
dds_return_t ret;
|
||||
assert (thread_is_awake ());
|
||||
if ((ret = dds_write_impl (wr, data, timestamp, DDS_WR_ACTION_DISPOSE)) == DDS_RETCODE_OK)
|
||||
dds_instance_remove (wr->m_topic, data, handle);
|
||||
dds_instance_remove (wr->m_entity.m_domain, wr->m_topic, data, handle);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -204,7 +204,7 @@ dds_return_t dds_dispose_ts (dds_entity_t writer, const void *data, dds_time_t t
|
|||
if ((ret = dds_writer_lock (writer, &wr)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
ret = dds_dispose_impl (wr, data, DDS_HANDLE_NIL, timestamp);
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock(wr);
|
||||
|
@ -221,15 +221,15 @@ dds_return_t dds_dispose_ih_ts (dds_entity_t writer, dds_instance_handle_t handl
|
|||
return ret;
|
||||
|
||||
struct ddsi_tkmap_instance *tk;
|
||||
thread_state_awake (ts1);
|
||||
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, handle)) == NULL)
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
if ((tk = ddsi_tkmap_find_by_id (wr->m_entity.m_domain->gv.m_tkmap, handle)) == NULL)
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
else
|
||||
{
|
||||
struct ddsi_sertopic *tp = wr->m_topic->m_stopic;
|
||||
void *sample = ddsi_sertopic_alloc_sample (tp);
|
||||
ddsi_serdata_topicless_to_sample (tp, tk->m_sample, sample, NULL, NULL);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
ddsi_tkmap_instance_unref (wr->m_entity.m_domain->gv.m_tkmap, tk);
|
||||
ret = dds_dispose_impl (wr, sample, handle, timestamp);
|
||||
ddsi_sertopic_free_sample (tp, sample, DDS_FREE_ALL);
|
||||
}
|
||||
|
@ -264,9 +264,9 @@ dds_instance_handle_t dds_lookup_instance (dds_entity_t entity, const void *data
|
|||
return DDS_HANDLE_NIL;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &w_or_r->m_domain->gv);
|
||||
sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
|
||||
ih = ddsi_tkmap_lookup (gv.m_tkmap, sd);
|
||||
ih = ddsi_tkmap_lookup (w_or_r->m_domain->gv.m_tkmap, sd);
|
||||
ddsi_serdata_unref (sd);
|
||||
thread_state_asleep (ts1);
|
||||
dds_entity_unlock (w_or_r);
|
||||
|
@ -308,14 +308,14 @@ dds_return_t dds_instance_get_key (dds_entity_t entity, dds_instance_handle_t ih
|
|||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1);
|
||||
if ((tk = ddsi_tkmap_find_by_id (gv.m_tkmap, ih)) == NULL)
|
||||
thread_state_awake (ts1, &e->m_domain->gv);
|
||||
if ((tk = ddsi_tkmap_find_by_id (e->m_domain->gv.m_tkmap, ih)) == NULL)
|
||||
ret = DDS_RETCODE_BAD_PARAMETER;
|
||||
else
|
||||
{
|
||||
ddsi_sertopic_zero_sample (topic->m_stopic, data);
|
||||
ddsi_serdata_topicless_to_sample (topic->m_stopic, tk->m_sample, data, NULL, NULL);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
ddsi_tkmap_instance_unref (e->m_domain->gv.m_tkmap, tk);
|
||||
ret = DDS_RETCODE_OK;
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
|
|
|
@ -33,18 +33,19 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
|
|||
return rc;
|
||||
else
|
||||
{
|
||||
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
|
||||
const int32_t nrds_max = (nrds > INT32_MAX) ? INT32_MAX : (int32_t) nrds;
|
||||
int32_t nrds_act = 0;
|
||||
ddsrt_avl_iter_t it;
|
||||
/* FIXME: this ought not be so tightly coupled to the lower layer */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &wr->m_entity.m_domain->gv);
|
||||
ddsrt_mutex_lock (&wr->m_wr->e.lock);
|
||||
for (const struct wr_prd_match *m = ddsrt_avl_iter_first (&wr_readers_treedef, &wr->m_wr->readers, &it);
|
||||
m != NULL;
|
||||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct proxy_reader *prd;
|
||||
if ((prd = ephash_lookup_proxy_reader_guid (&m->prd_guid)) != NULL)
|
||||
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
|
||||
{
|
||||
if (nrds_act < nrds_max)
|
||||
rds[nrds_act] = prd->e.iid;
|
||||
|
@ -56,7 +57,7 @@ dds_return_t dds_get_matched_subscriptions (dds_entity_t writer, dds_instance_ha
|
|||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct reader *rd;
|
||||
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
|
||||
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
|
||||
{
|
||||
if (nrds_act < nrds_max)
|
||||
rds[nrds_act] = rd->e.iid;
|
||||
|
@ -80,18 +81,19 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
|
|||
return rc;
|
||||
else
|
||||
{
|
||||
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
|
||||
const int32_t nwrs_max = (nwrs > INT32_MAX) ? INT32_MAX : (int32_t) nwrs;
|
||||
int32_t nwrs_act = 0;
|
||||
ddsrt_avl_iter_t it;
|
||||
/* FIXME: this ought not be so tightly coupled to the lower layer */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &rd->m_entity.m_domain->gv);
|
||||
ddsrt_mutex_lock (&rd->m_rd->e.lock);
|
||||
for (const struct rd_pwr_match *m = ddsrt_avl_iter_first (&rd_writers_treedef, &rd->m_rd->writers, &it);
|
||||
m != NULL;
|
||||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct proxy_writer *pwr;
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (&m->pwr_guid)) != NULL)
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
|
||||
{
|
||||
if (nwrs_act < nwrs_max)
|
||||
wrs[nwrs_act] = pwr->e.iid;
|
||||
|
@ -103,7 +105,7 @@ dds_return_t dds_get_matched_publications (dds_entity_t reader, dds_instance_han
|
|||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct writer *wr;
|
||||
if ((wr = ephash_lookup_writer_guid (&m->wr_guid)) != NULL)
|
||||
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
|
||||
{
|
||||
if (nwrs_act < nwrs_max)
|
||||
wrs[nwrs_act] = wr->e.iid;
|
||||
|
@ -142,17 +144,18 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
|
|||
return NULL;
|
||||
else
|
||||
{
|
||||
const struct ephash *gh = wr->m_entity.m_domain->gv.guid_hash;
|
||||
dds_builtintopic_endpoint_t *ret = NULL;
|
||||
ddsrt_avl_iter_t it;
|
||||
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &wr->m_entity.m_domain->gv);
|
||||
ddsrt_mutex_lock (&wr->m_wr->e.lock);
|
||||
for (const struct wr_prd_match *m = ddsrt_avl_iter_first (&wr_readers_treedef, &wr->m_wr->readers, &it);
|
||||
m != NULL && ret == NULL;
|
||||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct proxy_reader *prd;
|
||||
if ((prd = ephash_lookup_proxy_reader_guid (&m->prd_guid)) != NULL)
|
||||
if ((prd = ephash_lookup_proxy_reader_guid (gh, &m->prd_guid)) != NULL)
|
||||
{
|
||||
if (prd->e.iid == ih)
|
||||
ret = make_builtintopic_endpoint (&prd->e.guid, &prd->c.proxypp->e.guid, prd->c.proxypp->e.iid, prd->c.xqos);
|
||||
|
@ -163,7 +166,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_subscription_data (dds_entity_t wri
|
|||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct reader *rd;
|
||||
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
|
||||
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
|
||||
{
|
||||
if (rd->e.iid == ih)
|
||||
ret = make_builtintopic_endpoint (&rd->e.guid, &rd->c.pp->e.guid, rd->c.pp->e.iid, rd->xqos);
|
||||
|
@ -184,17 +187,18 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
|
|||
return NULL;
|
||||
else
|
||||
{
|
||||
const struct ephash *gh = rd->m_entity.m_domain->gv.guid_hash;
|
||||
dds_builtintopic_endpoint_t *ret = NULL;
|
||||
ddsrt_avl_iter_t it;
|
||||
/* FIXME: this ought not be so tightly coupled to the lower layer, and not be so inefficient besides */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &rd->m_entity.m_domain->gv);
|
||||
ddsrt_mutex_lock (&rd->m_rd->e.lock);
|
||||
for (const struct rd_pwr_match *m = ddsrt_avl_iter_first (&rd_writers_treedef, &rd->m_rd->writers, &it);
|
||||
m != NULL && ret == NULL;
|
||||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct proxy_writer *pwr;
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (&m->pwr_guid)) != NULL)
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (gh, &m->pwr_guid)) != NULL)
|
||||
{
|
||||
if (pwr->e.iid == ih)
|
||||
ret = make_builtintopic_endpoint (&pwr->e.guid, &pwr->c.proxypp->e.guid, pwr->c.proxypp->e.iid, pwr->c.xqos);
|
||||
|
@ -205,7 +209,7 @@ dds_builtintopic_endpoint_t *dds_get_matched_publication_data (dds_entity_t read
|
|||
m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct writer *wr;
|
||||
if ((wr = ephash_lookup_writer_guid (&m->wr_guid)) != NULL)
|
||||
if ((wr = ephash_lookup_writer_guid (gh, &m->wr_guid)) != NULL)
|
||||
{
|
||||
if (wr->e.iid == ih)
|
||||
ret = make_builtintopic_endpoint (&wr->e.guid, &wr->c.pp->e.guid, wr->c.pp->e.iid, wr->xqos);
|
||||
|
|
|
@ -36,26 +36,14 @@ static dds_return_t dds_participant_delete (dds_entity *e) ddsrt_nonnull_all;
|
|||
|
||||
static dds_return_t dds_participant_delete (dds_entity *e)
|
||||
{
|
||||
struct dds_domain *dom;
|
||||
dds_return_t ret;
|
||||
dds_entity *prev, *iter;
|
||||
assert (dds_entity_kind (e) == DDS_KIND_PARTICIPANT);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if ((ret = delete_participant (&e->m_guid)) < 0)
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if ((ret = delete_participant (&e->m_domain->gv, &e->m_guid)) < 0)
|
||||
DDS_ERROR ("dds_participant_delete: internal error %"PRId32"\n", ret);
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
dom = e->m_domain;
|
||||
for (iter = dom->ppants, prev = NULL; iter; prev = iter, iter = iter->m_next)
|
||||
{
|
||||
if (iter == e)
|
||||
break;
|
||||
}
|
||||
assert (iter);
|
||||
if (prev)
|
||||
prev->m_next = iter->m_next;
|
||||
else
|
||||
dom->ppants = iter->m_next;
|
||||
ddsrt_avl_delete (&dds_entity_children_td, &e->m_domain->m_ppants, e);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
|
@ -71,8 +59,8 @@ static dds_return_t dds_participant_qos_set (dds_entity *e, const dds_qos_t *qos
|
|||
if (enabled)
|
||||
{
|
||||
struct participant *pp;
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if ((pp = ephash_lookup_participant_guid (&e->m_guid)) != NULL)
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if ((pp = ephash_lookup_participant_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
|
||||
{
|
||||
nn_plist_t plist;
|
||||
nn_plist_init_empty (&plist);
|
||||
|
@ -111,7 +99,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
new_qos = dds_create_qos ();
|
||||
if (qos != NULL)
|
||||
nn_xqos_mergein_missing (new_qos, qos, DDS_PARTICIPANT_QOS_MASK);
|
||||
nn_xqos_mergein_missing (new_qos, &gv.default_plist_pp.qos, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (new_qos, &dom->gv.default_plist_pp.qos, ~(uint64_t)0);
|
||||
if ((ret = nn_xqos_valid (new_qos)) < 0)
|
||||
goto err_qos_validation;
|
||||
|
||||
|
@ -119,8 +107,8 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
nn_plist_init_empty (&plist);
|
||||
dds_merge_qos (&plist.qos, new_qos);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
ret = new_participant (&guid, 0, &plist);
|
||||
thread_state_awake (lookup_thread_state (), &dom->gv);
|
||||
ret = new_participant (&guid, &dom->gv, 0, &plist);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
nn_plist_fini (&plist);
|
||||
if (ret < 0)
|
||||
|
@ -134,14 +122,13 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
|
|||
goto err_entity_init;
|
||||
|
||||
pp->m_entity.m_guid = guid;
|
||||
pp->m_entity.m_iid = get_entity_instance_id (&guid);
|
||||
pp->m_entity.m_iid = get_entity_instance_id (&dom->gv, &guid);
|
||||
pp->m_entity.m_domain = dom;
|
||||
pp->m_builtin_subscriber = 0;
|
||||
|
||||
/* Add participant to extent */
|
||||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
pp->m_entity.m_next = pp->m_entity.m_domain->ppants;
|
||||
pp->m_entity.m_domain->ppants = &pp->m_entity;
|
||||
ddsrt_avl_insert (&dds_entity_children_td, &dom->m_ppants, &pp->m_entity);
|
||||
ddsrt_mutex_unlock (&dds_global.m_mutex);
|
||||
return ret;
|
||||
|
||||
|
@ -176,10 +163,11 @@ dds_entity_t dds_lookup_participant (dds_domainid_t domain_id, dds_entity_t *par
|
|||
ddsrt_mutex_lock (&dds_global.m_mutex);
|
||||
if ((dom = dds_domain_find_locked (domain_id)) != NULL)
|
||||
{
|
||||
for (dds_entity *iter = dom->ppants; iter; iter = iter->m_next)
|
||||
ddsrt_avl_iter_t it;
|
||||
for (dds_entity *e = ddsrt_avl_iter_first (&dds_entity_children_td, &dom->m_ppants, &it); e != NULL; e = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
if ((size_t) ret < size)
|
||||
participants[ret] = iter->m_hdllink.hdl;
|
||||
participants[ret] = e->m_hdllink.hdl;
|
||||
ret++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,21 +52,19 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
|
|||
dds_qos_t *new_qos;
|
||||
dds_return_t ret;
|
||||
|
||||
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
return ret;
|
||||
|
||||
new_qos = dds_create_qos ();
|
||||
if (qos)
|
||||
nn_xqos_mergein_missing (new_qos, qos, DDS_PUBLISHER_QOS_MASK);
|
||||
nn_xqos_mergein_missing (new_qos, &gv.default_xqos_pub, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (new_qos, &par->m_entity.m_domain->gv.default_xqos_pub, ~(uint64_t)0);
|
||||
if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
dds_delete_qos (new_qos);
|
||||
dds_participant_unlock (par);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((ret = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
|
||||
{
|
||||
dds_delete_qos (new_qos);
|
||||
return ret;
|
||||
}
|
||||
pub = dds_alloc (sizeof (*pub));
|
||||
hdl = dds_entity_init (&pub->m_entity, &par->m_entity, DDS_KIND_PUBLISHER, new_qos, listener, DDS_PUBLISHER_STATUS_MASK);
|
||||
pub->m_entity.m_iid = ddsi_iid_gen ();
|
||||
|
|
|
@ -43,27 +43,27 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
if (buf == NULL || si == NULL || maxs == 0 || bufsz == 0 || bufsz < maxs)
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
|
||||
if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
|
||||
goto fail_awake;
|
||||
goto fail;
|
||||
} else if (dds_entity_kind (entity) == DDS_KIND_READER) {
|
||||
rd = (dds_reader *) entity;
|
||||
cond = NULL;
|
||||
} else if (only_reader) {
|
||||
ret = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
goto fail_awake_pinned;
|
||||
goto fail_pinned;
|
||||
} else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) {
|
||||
ret = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
goto fail_awake_pinned;
|
||||
goto fail_pinned;
|
||||
} else {
|
||||
rd = (dds_reader *) entity->m_parent;
|
||||
cond = (dds_readcond *) entity;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1, &entity->m_domain->gv);
|
||||
|
||||
if (hand != DDS_HANDLE_NIL)
|
||||
{
|
||||
if (ddsi_tkmap_find_by_id (gv.m_tkmap, hand) == NULL) {
|
||||
if (ddsi_tkmap_find_by_id (entity->m_domain->gv.m_tkmap, hand) == NULL) {
|
||||
ret = DDS_RETCODE_PRECONDITION_NOT_MET;
|
||||
goto fail_awake_pinned;
|
||||
}
|
||||
|
@ -144,9 +144,10 @@ static dds_return_t dds_read_impl (bool take, dds_entity_t reader_or_condition,
|
|||
#undef NC_RESET_BUF
|
||||
|
||||
fail_awake_pinned:
|
||||
dds_entity_unpin (entity);
|
||||
fail_awake:
|
||||
thread_state_asleep (ts1);
|
||||
fail_pinned:
|
||||
dds_entity_unpin (entity);
|
||||
fail:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -164,20 +165,19 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
|
|||
assert (maxs > 0);
|
||||
(void)take;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
|
||||
if ((ret = dds_entity_pin (reader_or_condition, &entity)) < 0) {
|
||||
goto fail_awake;
|
||||
return ret;
|
||||
} else if (dds_entity_kind (entity) == DDS_KIND_READER) {
|
||||
rd = (dds_reader *) entity;
|
||||
} else if (dds_entity_kind (entity) != DDS_KIND_COND_READ && dds_entity_kind (entity) != DDS_KIND_COND_QUERY) {
|
||||
dds_entity_unpin (entity);
|
||||
ret = DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
goto fail_awake;
|
||||
return DDS_RETCODE_ILLEGAL_OPERATION;
|
||||
} else {
|
||||
rd = (dds_reader *) entity->m_parent;
|
||||
}
|
||||
|
||||
thread_state_awake (ts1, &entity->m_domain->gv);
|
||||
|
||||
/* read/take resets data available status -- must reset before reading because
|
||||
the actual writing is protected by RHC lock, not by rd->m_entity.m_lock */
|
||||
dds_entity_status_reset (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS);
|
||||
|
@ -188,8 +188,6 @@ static dds_return_t dds_readcdr_impl (bool take, dds_entity_t reader_or_conditio
|
|||
|
||||
ret = dds_rhc_takecdr (rd->m_rhc, lock, buf, si, maxs, mask & DDS_ANY_SAMPLE_STATE, mask & DDS_ANY_VIEW_STATE, mask & DDS_ANY_INSTANCE_STATE, hand);
|
||||
dds_entity_unpin (entity);
|
||||
|
||||
fail_awake:
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -45,8 +45,8 @@ static dds_return_t dds_reader_close (dds_entity *e) ddsrt_nonnull_all;
|
|||
static dds_return_t dds_reader_close (dds_entity *e)
|
||||
{
|
||||
dds_return_t ret = DDS_RETCODE_OK;
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if (delete_reader (&e->m_guid) != 0)
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if (delete_reader (&e->m_domain->gv, &e->m_guid) != 0)
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
return ret;
|
||||
|
@ -76,8 +76,8 @@ static dds_return_t dds_reader_qos_set (dds_entity *e, const dds_qos_t *qos, boo
|
|||
if (enabled)
|
||||
{
|
||||
struct reader *rd;
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if ((rd = ephash_lookup_reader_guid (&e->m_guid)) != NULL)
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if ((rd = ephash_lookup_reader_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
|
||||
update_reader_qos (rd, qos);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
}
|
||||
|
@ -347,6 +347,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
goto err_tp_lock;
|
||||
}
|
||||
assert (tp->m_stopic);
|
||||
/* FIXME: domain check */
|
||||
assert (sub->m_entity.m_domain == tp->m_entity.m_domain);
|
||||
|
||||
/* Merge qos from topic and subscriber, dds_copy_qos only fails when it is passed a null
|
||||
|
@ -358,7 +359,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
nn_xqos_mergein_missing (rqos, sub->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (rqos, &gv.default_xqos_rd, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0);
|
||||
|
||||
if ((ret = nn_xqos_valid (rqos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
|
@ -392,14 +393,14 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
|
|||
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&sub->m_entity.m_mutex);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
ret = new_reader (&rd->m_rd, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
|
||||
thread_state_awake (lookup_thread_state (), &sub->m_entity.m_domain->gv);
|
||||
ret = new_reader (&rd->m_rd, &rd->m_entity.m_domain->gv, &rd->m_entity.m_guid, NULL, &sub->m_entity.m_participant->m_guid, tp->m_stopic, rqos, &rd->m_rhc->common.rhc, dds_reader_status_cb, rd);
|
||||
ddsrt_mutex_lock (&sub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert (ret == DDS_RETCODE_OK); /* FIXME: can be out-of-resources at the very least */
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
rd->m_entity.m_iid = get_entity_instance_id (&rd->m_entity.m_guid);
|
||||
rd->m_entity.m_iid = get_entity_instance_id (&rd->m_entity.m_domain->gv, &rd->m_entity.m_guid);
|
||||
dds_entity_register_child (&sub->m_entity, &rd->m_entity);
|
||||
|
||||
dds_topic_unlock (tp);
|
||||
|
@ -460,7 +461,7 @@ void dds_reader_ddsi2direct (dds_entity_t entity, ddsi2direct_directread_cb_t cb
|
|||
pwrguid_next.entityid.u = (pwrguid_next.entityid.u & ~(uint32_t)0xff) | NN_ENTITYID_KIND_WRITER_NO_KEY;
|
||||
}
|
||||
ddsrt_mutex_unlock (&rd->e.lock);
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (&pwrguid)) != NULL)
|
||||
if ((pwr = ephash_lookup_proxy_writer_guid (dds_entity->m_domain->gv.guid_hash, &pwrguid)) != NULL)
|
||||
{
|
||||
ddsrt_mutex_lock (&pwr->e.lock);
|
||||
pwr->ddsi2direct_cb = cb;
|
||||
|
|
|
@ -302,8 +302,10 @@ struct dds_rhc_default {
|
|||
bool by_source_ordering; /* true if BY_SOURCE, false if BY_RECEPTION */
|
||||
bool exclusive_ownership; /* true if EXCLUSIVE, false if SHARED */
|
||||
bool reliable; /* true if reliability RELIABLE */
|
||||
bool xchecks; /* whether to do expensive checking if checking at all */
|
||||
|
||||
dds_reader *reader; /* reader */
|
||||
dds_reader *reader; /* reader -- may be NULL (used by rhc_torture) */
|
||||
struct ddsi_tkmap *tkmap; /* back pointer to tkmap */
|
||||
const struct ddsi_sertopic *topic; /* topic description */
|
||||
uint32_t history_depth; /* depth, 1 for KEEP_LAST_1, 2**32-1 for KEEP_ALL */
|
||||
|
||||
|
@ -521,7 +523,7 @@ static void remove_inst_from_nonempty_list (struct dds_rhc_default *rhc, struct
|
|||
rhc->n_nonempty_instances--;
|
||||
}
|
||||
|
||||
struct dds_rhc *dds_rhc_default_new (dds_reader *reader, const struct ddsi_sertopic *topic)
|
||||
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkmap *tkmap, const struct ddsi_sertopic *topic, bool xchecks)
|
||||
{
|
||||
struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc));
|
||||
memset (rhc, 0, sizeof (*rhc));
|
||||
|
@ -532,10 +534,17 @@ struct dds_rhc *dds_rhc_default_new (dds_reader *reader, const struct ddsi_serto
|
|||
rhc->instances = ddsrt_hh_new (1, instance_iid_hash, instance_iid_eq);
|
||||
rhc->topic = topic;
|
||||
rhc->reader = reader;
|
||||
rhc->tkmap = tkmap;
|
||||
rhc->xchecks = xchecks;
|
||||
|
||||
return &rhc->common;
|
||||
}
|
||||
|
||||
struct dds_rhc *dds_rhc_default_new (dds_reader *reader, const struct ddsi_sertopic *topic)
|
||||
{
|
||||
return dds_rhc_default_new_xchecks (reader, reader->m_entity.m_domain->gv.m_tkmap, topic, (reader->m_entity.m_domain->gv.config.enabled_xchecks & DDS_XCHECK_RHC) != 0);
|
||||
}
|
||||
|
||||
static void dds_rhc_default_set_qos (struct dds_rhc_default * rhc, const dds_qos_t * qos)
|
||||
{
|
||||
/* Set read related QoS */
|
||||
|
@ -635,10 +644,10 @@ static void inst_set_invsample (struct dds_rhc_default *rhc, struct rhc_instance
|
|||
}
|
||||
}
|
||||
|
||||
static void free_empty_instance (struct rhc_instance *inst)
|
||||
static void free_empty_instance (struct rhc_instance *inst, struct dds_rhc_default *rhc)
|
||||
{
|
||||
assert (inst_is_empty (inst));
|
||||
ddsi_tkmap_instance_unref (inst->tk);
|
||||
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
|
||||
ddsrt_free (inst);
|
||||
}
|
||||
|
||||
|
@ -667,7 +676,7 @@ static void free_instance_rhc_free (struct rhc_instance *inst, struct dds_rhc_de
|
|||
{
|
||||
remove_inst_from_nonempty_list (rhc, inst);
|
||||
}
|
||||
ddsi_tkmap_instance_unref (inst->tk);
|
||||
ddsi_tkmap_instance_unref (rhc->tkmap, inst->tk);
|
||||
ddsrt_free (inst);
|
||||
}
|
||||
|
||||
|
@ -926,7 +935,7 @@ static void drop_instance_noupdate_no_writers (struct dds_rhc_default *rhc, stru
|
|||
assert (ret);
|
||||
(void) ret;
|
||||
|
||||
free_empty_instance (inst);
|
||||
free_empty_instance (inst, rhc);
|
||||
}
|
||||
|
||||
static void dds_rhc_register (struct dds_rhc_default *rhc, struct rhc_instance *inst, uint64_t wr_iid, bool iid_update)
|
||||
|
@ -1259,7 +1268,7 @@ static rhc_store_result_t rhc_store_new_instance (struct rhc_instance **out_inst
|
|||
{
|
||||
if (!add_sample (rhc, inst, pwr_info, sample, cb_data, trig_qc))
|
||||
{
|
||||
free_empty_instance (inst);
|
||||
free_empty_instance (inst, rhc);
|
||||
return RHC_REJECTED;
|
||||
}
|
||||
}
|
||||
|
@ -2638,7 +2647,7 @@ static int dds_rhc_default_takecdr (struct dds_rhc_default *rhc, bool lock, stru
|
|||
#define CHECK_MAX_CONDS 64
|
||||
static int rhc_check_counts_locked (struct dds_rhc_default *rhc, bool check_conds, bool check_qcmask)
|
||||
{
|
||||
if (!(config.enabled_xchecks & DDS_XCHECK_RHC))
|
||||
if (!rhc->xchecks)
|
||||
return 1;
|
||||
|
||||
const uint32_t ncheck = rhc->nconds < CHECK_MAX_CONDS ? rhc->nconds : CHECK_MAX_CONDS;
|
||||
|
|
|
@ -131,7 +131,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
|
|||
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
|
||||
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
|
||||
/* keyhash must in host format (which the GUIDs always are internally) */
|
||||
const struct entity_common *entity = ephash_lookup_guid_untyped ((const nn_guid_t *) keyhash->value);
|
||||
const struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value);
|
||||
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
|
||||
memcpy (&d->key, keyhash->value, sizeof (d->key));
|
||||
if (entity)
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
|
||||
/* FIXME: sertopic /= ddstopic so a lot of stuff needs to be moved here from dds_topic.c and the free function needs to be implemented properly */
|
||||
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename)
|
||||
struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic_type type, const char *name, const char *typename, struct q_globals *gv)
|
||||
{
|
||||
struct ddsi_sertopic_builtintopic *tp = ddsrt_malloc (sizeof (*tp));
|
||||
tp->c.iid = ddsi_iid_gen();
|
||||
|
@ -40,6 +40,7 @@ struct ddsi_sertopic *new_sertopic_builtintopic (enum ddsi_sertopic_builtintopic
|
|||
tp->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (tp->c.serdata_ops);
|
||||
ddsrt_atomic_st32 (&tp->c.refc, 1);
|
||||
tp->type = type;
|
||||
tp->gv = gv;
|
||||
return &tp->c;
|
||||
}
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q
|
|||
new_qos = dds_create_qos ();
|
||||
if (qos)
|
||||
nn_xqos_mergein_missing (new_qos, qos, DDS_SUBSCRIBER_QOS_MASK);
|
||||
nn_xqos_mergein_missing (new_qos, &gv.default_xqos_sub, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (new_qos, &participant->m_entity.m_domain->gv.default_xqos_sub, ~(uint64_t)0);
|
||||
if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
|
||||
{
|
||||
dds_delete_qos (new_qos);
|
||||
|
|
|
@ -426,8 +426,8 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
|
|||
top->m_stopic = sertopic;
|
||||
|
||||
/* Publish Topic */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
ddsi_pp = ephash_lookup_participant_guid (&par->m_entity.m_guid);
|
||||
thread_state_awake (lookup_thread_state (), &par->m_entity.m_domain->gv);
|
||||
ddsi_pp = ephash_lookup_participant_guid (par->m_entity.m_domain->gv.guid_hash, &par->m_entity.m_guid);
|
||||
assert (ddsi_pp);
|
||||
if (sedp_plist)
|
||||
{
|
||||
|
@ -461,10 +461,15 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
nn_plist_t plist;
|
||||
dds_entity_t hdl;
|
||||
size_t keysz;
|
||||
struct dds_entity *ppent;
|
||||
dds_return_t ret;
|
||||
|
||||
if (desc == NULL || name == NULL || !is_valid_name (name))
|
||||
return DDS_RETCODE_BAD_PARAMETER;
|
||||
|
||||
if ((ret = dds_entity_pin (participant, &ppent)) < 0)
|
||||
return ret;
|
||||
|
||||
typename = desc->m_typename;
|
||||
keysz = strlen (name) + strlen (typename) + 2;
|
||||
key = dds_alloc (keysz);
|
||||
|
@ -481,7 +486,7 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey;
|
||||
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
|
||||
st->native_encoding_identifier = (DDSRT_ENDIAN == DDSRT_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
|
||||
|
||||
st->serpool = ppent->m_domain->gv.serpool;
|
||||
st->type = (void*) desc;
|
||||
st->nkeys = desc->m_nkeys;
|
||||
st->keys = desc->m_keys;
|
||||
|
@ -513,6 +518,7 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
|
|||
|
||||
hdl = dds_create_topic_arbitrary (participant, &st->c, qos, listener, &plist);
|
||||
ddsi_sertopic_unref (&st->c);
|
||||
dds_entity_unpin (ppent);
|
||||
nn_plist_fini (&plist);
|
||||
return hdl;
|
||||
}
|
||||
|
|
|
@ -76,8 +76,11 @@ struct whc_impl {
|
|||
uint32_t seq_size;
|
||||
size_t unacked_bytes;
|
||||
size_t sample_overhead;
|
||||
uint32_t fragment_size;
|
||||
uint64_t total_bytes; /* total number of bytes pushed in */
|
||||
unsigned is_transient_local: 1;
|
||||
unsigned xchecks: 1;
|
||||
struct ddsi_tkmap *tkmap;
|
||||
uint32_t hdepth; /* 0 = unlimited */
|
||||
uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
|
||||
uint32_t idxdepth; /* = max (hdepth, tldepth) */
|
||||
|
@ -264,7 +267,7 @@ static void check_whc (const struct whc_impl *whc)
|
|||
assert (whc->maxseq_node == whc_findmax_procedurally (whc));
|
||||
|
||||
#if !defined (NDEBUG)
|
||||
if (config.enabled_xchecks & DDS_XCHECK_WHC)
|
||||
if (whc->xchecks)
|
||||
{
|
||||
struct whc_intvnode *firstintv;
|
||||
struct whc_node *cur;
|
||||
|
@ -332,7 +335,7 @@ static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct dd
|
|||
} template;
|
||||
struct whc_idxnode *n;
|
||||
check_whc (whc);
|
||||
template.idxn.iid = ddsi_tkmap_lookup (gv.m_tkmap, serdata_key);
|
||||
template.idxn.iid = ddsi_tkmap_lookup (whc->tkmap, serdata_key);
|
||||
n = ddsrt_hh_lookup (whc->idx_hash, &template.idxn);
|
||||
if (n == NULL)
|
||||
return NULL;
|
||||
|
@ -343,7 +346,7 @@ static struct whc_node *whc_findkey (const struct whc_impl *whc, const struct dd
|
|||
}
|
||||
}
|
||||
|
||||
struct whc *whc_new (int is_transient_local, uint32_t hdepth, uint32_t tldepth)
|
||||
struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdepth, uint32_t tldepth)
|
||||
{
|
||||
size_t sample_overhead = 80; /* INFO_TS, DATA (estimate), inline QoS */
|
||||
struct whc_impl *whc;
|
||||
|
@ -355,6 +358,8 @@ struct whc *whc_new (int is_transient_local, uint32_t hdepth, uint32_t tldepth)
|
|||
whc->common.ops = &whc_ops;
|
||||
ddsrt_mutex_init (&whc->lock);
|
||||
whc->is_transient_local = is_transient_local ? 1 : 0;
|
||||
whc->xchecks = (gv->config.enabled_xchecks & DDS_XCHECK_WHC) != 0;
|
||||
whc->tkmap = gv->m_tkmap;
|
||||
whc->hdepth = hdepth;
|
||||
whc->tldepth = tldepth;
|
||||
whc->idxdepth = hdepth > tldepth ? hdepth : tldepth;
|
||||
|
@ -363,6 +368,7 @@ struct whc *whc_new (int is_transient_local, uint32_t hdepth, uint32_t tldepth)
|
|||
whc->unacked_bytes = 0;
|
||||
whc->total_bytes = 0;
|
||||
whc->sample_overhead = sample_overhead;
|
||||
whc->fragment_size = gv->config.fragment_size;
|
||||
#if USE_EHH
|
||||
whc->seq_hash = ddsrt_ehh_new (sizeof (struct whc_seq_entry), 32, whc_seq_entry_hash, whc_seq_entry_eq);
|
||||
#else
|
||||
|
@ -550,7 +556,7 @@ static void delete_one_sample_from_idx (struct whc_impl *whc, struct whc_node *w
|
|||
#endif
|
||||
if (!ddsrt_hh_remove (whc->idx_hash, idxn))
|
||||
assert (0);
|
||||
ddsi_tkmap_instance_unref (idxn->tk);
|
||||
ddsi_tkmap_instance_unref (whc->tkmap, idxn->tk);
|
||||
ddsrt_free (idxn);
|
||||
}
|
||||
whcn->idxnode = NULL;
|
||||
|
@ -647,7 +653,7 @@ static uint32_t whc_default_downgrade_to_volatile (struct whc *whc_generic, stru
|
|||
static size_t whcn_size (const struct whc_impl *whc, const struct whc_node *whcn)
|
||||
{
|
||||
size_t sz = ddsi_serdata_size (whcn->serdata);
|
||||
return sz + ((sz + config.fragment_size - 1) / config.fragment_size) * whc->sample_overhead;
|
||||
return sz + ((sz + whc->fragment_size - 1) / whc->fragment_size) * whc->sample_overhead;
|
||||
}
|
||||
|
||||
static void whc_delete_one_intv (struct whc_impl *whc, struct whc_intvnode **p_intv, struct whc_node **p_whcn)
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
struct bwhc {
|
||||
struct whc common;
|
||||
enum ddsi_sertopic_builtintopic_type type;
|
||||
const struct ephash *guid_hash;
|
||||
};
|
||||
|
||||
enum bwhc_iter_state {
|
||||
|
@ -63,7 +64,7 @@ static void bwhc_sample_iter_init (const struct whc *whc_generic, struct whc_sam
|
|||
static bool is_visible (const struct entity_common *e)
|
||||
{
|
||||
const nn_vendorid_t vendorid = get_entity_vendorid (e);
|
||||
return builtintopic_is_visible (gv.builtin_topic_interface, &e->guid, vendorid);
|
||||
return builtintopic_is_visible (e->gv->builtin_topic_interface, &e->guid, vendorid);
|
||||
}
|
||||
|
||||
static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, struct whc_borrowed_sample *sample)
|
||||
|
@ -91,7 +92,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
|
|||
case DSBT_READER: kind = EK_READER; break;
|
||||
}
|
||||
assert (whc->type == DSBT_PARTICIPANT || kind != EK_PARTICIPANT);
|
||||
ephash_enum_init (&it->it, kind);
|
||||
ephash_enum_init (&it->it, whc->guid_hash, kind);
|
||||
it->st = BIS_LOCAL;
|
||||
/* FALLS THROUGH */
|
||||
case BIS_LOCAL:
|
||||
|
@ -114,7 +115,7 @@ static bool bwhc_sample_iter_borrow_next (struct whc_sample_iter *opaque_it, str
|
|||
case DSBT_READER: kind = EK_PROXY_READER; break;
|
||||
}
|
||||
assert (kind != EK_PARTICIPANT);
|
||||
ephash_enum_init (&it->it, kind);
|
||||
ephash_enum_init (&it->it, whc->guid_hash, kind);
|
||||
it->st = BIS_PROXY;
|
||||
/* FALLS THROUGH */
|
||||
case BIS_PROXY:
|
||||
|
@ -191,10 +192,11 @@ static const struct whc_ops bwhc_ops = {
|
|||
.free = bwhc_free
|
||||
};
|
||||
|
||||
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type)
|
||||
struct whc *builtintopic_whc_new (enum ddsi_sertopic_builtintopic_type type, const struct ephash *guid_hash)
|
||||
{
|
||||
struct bwhc *whc = ddsrt_malloc (sizeof (*whc));
|
||||
whc->common.ops = &bwhc_ops;
|
||||
whc->type = type;
|
||||
whc->guid_hash = guid_hash;
|
||||
return (struct whc *) whc;
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "dds/ddsi/q_config.h"
|
||||
#include "dds/ddsi/q_entity.h"
|
||||
#include "dds/ddsi/q_radmin.h"
|
||||
#include "dds/ddsi/q_globals.h"
|
||||
|
||||
dds_return_t dds_write (dds_entity_t writer, const void *data)
|
||||
{
|
||||
|
@ -121,6 +122,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
ddsrt_avl_iter_t it;
|
||||
struct pwr_rd_match *m;
|
||||
struct proxy_writer_info pwr_info;
|
||||
const struct ephash *gh = wr->e.gv->guid_hash;
|
||||
dds_duration_t max_block_ms = wr->xqos->reliability.max_blocking_time;
|
||||
ddsrt_mutex_unlock (&wr->rdary.rdary_lock);
|
||||
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
|
||||
|
@ -128,7 +130,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
|
|||
for (m = ddsrt_avl_iter_first (&wr_local_readers_treedef, &wr->local_readers, &it); m != NULL; m = ddsrt_avl_iter_next (&it))
|
||||
{
|
||||
struct reader *rd;
|
||||
if ((rd = ephash_lookup_reader_guid (&m->rd_guid)) != NULL)
|
||||
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
|
||||
{
|
||||
DDS_TRACE ("reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
|
||||
/* Copied the return value ignore from DDSI deliver_user_data () function. */
|
||||
|
@ -159,7 +161,7 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
if (! wr->m_topic->filter_fn (data, wr->m_topic->filter_ctx))
|
||||
return DDS_RETCODE_OK;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
|
||||
/* Serialize and write data or key */
|
||||
d = ddsi_serdata_from_sample (ddsi_wr->topic, writekey ? SDK_KEY : SDK_DATA, data);
|
||||
|
@ -167,12 +169,12 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0));
|
||||
d->timestamp.v = tstamp;
|
||||
ddsi_serdata_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (wr->m_entity.m_domain->gv.m_tkmap, d);
|
||||
w_rc = write_sample_gc (ts1, wr->m_xp, ddsi_wr, d, tk);
|
||||
|
||||
if (w_rc >= 0) {
|
||||
/* Flush out write unless configured to batch */
|
||||
if (!config.whc_batch)
|
||||
if (!wr->whc_batch)
|
||||
nn_xpack_send (wr->m_xp, false);
|
||||
ret = DDS_RETCODE_OK;
|
||||
} else if (w_rc == DDS_RETCODE_TIMEOUT) {
|
||||
|
@ -185,25 +187,25 @@ dds_return_t dds_write_impl (dds_writer *wr, const void * data, dds_time_t tstam
|
|||
if (ret == DDS_RETCODE_OK)
|
||||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
ddsi_serdata_unref (d);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
ddsi_tkmap_instance_unref (wr->m_entity.m_domain->gv.m_tkmap, tk);
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d)
|
||||
dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack *xp, struct ddsi_serdata *d, bool flush)
|
||||
{
|
||||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
struct ddsi_tkmap_instance * tk;
|
||||
int ret = DDS_RETCODE_OK;
|
||||
int w_rc;
|
||||
|
||||
thread_state_awake (ts1);
|
||||
thread_state_awake (ts1, ddsi_wr->e.gv);
|
||||
ddsi_serdata_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (d);
|
||||
tk = ddsi_tkmap_lookup_instance_ref (ddsi_wr->e.gv->m_tkmap, d);
|
||||
w_rc = write_sample_gc (ts1, xp, ddsi_wr, d, tk);
|
||||
if (w_rc >= 0) {
|
||||
/* Flush out write unless configured to batch */
|
||||
if (!config.whc_batch && xp != NULL)
|
||||
if (flush && xp != NULL)
|
||||
nn_xpack_send (xp, false);
|
||||
ret = DDS_RETCODE_OK;
|
||||
} else if (w_rc == DDS_RETCODE_TIMEOUT) {
|
||||
|
@ -217,7 +219,7 @@ dds_return_t dds_writecdr_impl_lowlevel (struct writer *ddsi_wr, struct nn_xpack
|
|||
if (ret == DDS_RETCODE_OK)
|
||||
ret = deliver_locally (ddsi_wr, d, tk);
|
||||
ddsi_serdata_unref (d);
|
||||
ddsi_tkmap_instance_unref (tk);
|
||||
ddsi_tkmap_instance_unref (ddsi_wr->e.gv->m_tkmap, tk);
|
||||
thread_state_asleep (ts1);
|
||||
return ret;
|
||||
}
|
||||
|
@ -230,12 +232,7 @@ dds_return_t dds_writecdr_impl (dds_writer *wr, struct ddsi_serdata *d, dds_time
|
|||
d->statusinfo = (((action & DDS_WR_DISPOSE_BIT) ? NN_STATUSINFO_DISPOSE : 0) |
|
||||
((action & DDS_WR_UNREGISTER_BIT) ? NN_STATUSINFO_UNREGISTER : 0));
|
||||
d->timestamp.v = tstamp;
|
||||
return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d);
|
||||
}
|
||||
|
||||
void dds_write_set_batch (bool enable)
|
||||
{
|
||||
config.whc_batch = enable ? 1 : 0;
|
||||
return dds_writecdr_impl_lowlevel (wr->m_wr, wr->m_xp, d, !wr->whc_batch);
|
||||
}
|
||||
|
||||
void dds_write_flush (dds_entity_t writer)
|
||||
|
@ -243,11 +240,11 @@ void dds_write_flush (dds_entity_t writer)
|
|||
struct thread_state1 * const ts1 = lookup_thread_state ();
|
||||
dds_writer *wr;
|
||||
dds_return_t rc;
|
||||
thread_state_awake (ts1);
|
||||
if ((rc = dds_writer_lock (writer, &wr)) == DDS_RETCODE_OK)
|
||||
{
|
||||
thread_state_awake (ts1, &wr->m_entity.m_domain->gv);
|
||||
nn_xpack_send (wr->m_xp, true);
|
||||
thread_state_asleep (ts1);
|
||||
dds_writer_unlock (wr);
|
||||
}
|
||||
thread_state_asleep (ts1);
|
||||
}
|
||||
|
|
|
@ -172,9 +172,9 @@ static dds_return_t dds_writer_close (dds_entity *e)
|
|||
{
|
||||
dds_writer * const wr = (dds_writer *) e;
|
||||
dds_return_t ret;
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
nn_xpack_send (wr->m_xp, false);
|
||||
if ((ret = delete_writer (&e->m_guid)) < 0)
|
||||
if ((ret = delete_writer (&e->m_domain->gv, &e->m_guid)) < 0)
|
||||
ret = DDS_RETCODE_ERROR;
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
return ret;
|
||||
|
@ -187,7 +187,7 @@ static dds_return_t dds_writer_delete (dds_entity *e)
|
|||
dds_writer * const wr = (dds_writer *) e;
|
||||
dds_return_t ret;
|
||||
/* FIXME: not freeing WHC here because it is owned by the DDSI entity */
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
nn_xpack_free (wr->m_xp);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
if ((ret = dds_delete (wr->m_topic->m_entity.m_hdllink.hdl)) == DDS_RETCODE_OK)
|
||||
|
@ -205,15 +205,15 @@ static dds_return_t dds_writer_qos_set (dds_entity *e, const dds_qos_t *qos, boo
|
|||
if (enabled)
|
||||
{
|
||||
struct writer *wr;
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
if ((wr = ephash_lookup_writer_guid (&e->m_guid)) != NULL)
|
||||
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
|
||||
if ((wr = ephash_lookup_writer_guid (e->m_domain->gv.guid_hash, &e->m_guid)) != NULL)
|
||||
update_writer_qos (wr, qos);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
}
|
||||
return DDS_RETCODE_OK;
|
||||
}
|
||||
|
||||
static struct whc *make_whc (const dds_qos_t *qos)
|
||||
static struct whc *make_whc (struct dds_domain *dom, const dds_qos_t *qos)
|
||||
{
|
||||
bool handle_as_transient_local;
|
||||
uint32_t hdepth, tldepth;
|
||||
|
@ -234,7 +234,7 @@ static struct whc *make_whc (const dds_qos_t *qos)
|
|||
else
|
||||
tldepth = (unsigned) qos->durability_service.history.depth;
|
||||
}
|
||||
return whc_new (handle_as_transient_local, hdepth, tldepth);
|
||||
return whc_new (&dom->gv, handle_as_transient_local, hdepth, tldepth);
|
||||
}
|
||||
|
||||
const struct dds_entity_deriver dds_entity_deriver_writer = {
|
||||
|
@ -253,7 +253,6 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
dds_publisher *pub = NULL;
|
||||
dds_topic *tp;
|
||||
dds_entity_t publisher;
|
||||
ddsi_tran_conn_t conn = gv.data_conn_uc;
|
||||
|
||||
{
|
||||
dds_entity *p_or_p;
|
||||
|
@ -269,6 +268,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
if ((rc = dds_publisher_lock (publisher, &pub)) != DDS_RETCODE_OK)
|
||||
return rc;
|
||||
|
||||
ddsi_tran_conn_t conn = pub->m_entity.m_domain->gv.data_conn_uc;
|
||||
if (publisher != participant_or_publisher)
|
||||
pub->m_entity.m_flags |= DDS_ENTITY_IMPLICIT;
|
||||
|
||||
|
@ -286,7 +286,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
nn_xqos_mergein_missing (wqos, pub->m_entity.m_qos, ~(uint64_t)0);
|
||||
if (tp->m_entity.m_qos)
|
||||
nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (wqos, &gv.default_xqos_wr, ~(uint64_t)0);
|
||||
nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0);
|
||||
|
||||
if ((rc = nn_xqos_valid (wqos)) < 0)
|
||||
{
|
||||
|
@ -300,8 +300,9 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
|
||||
wr->m_topic = tp;
|
||||
dds_entity_add_ref_locked (&tp->m_entity);
|
||||
wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), config.xpack_send_async);
|
||||
wr->m_whc = make_whc (wqos);
|
||||
wr->m_xp = nn_xpack_new (conn, get_bandwidth_limit (wqos->transport_priority), pub->m_entity.m_domain->gv.config.xpack_send_async);
|
||||
wr->m_whc = make_whc (pub->m_entity.m_domain, wqos);
|
||||
wr->whc_batch = pub->m_entity.m_domain->gv.config.whc_batch;
|
||||
|
||||
/* Extra claim of this writer to make sure that the delete waits until DDSI
|
||||
* has deleted its writer as well. This can be known through the callback. */
|
||||
|
@ -310,14 +311,14 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
|
|||
ddsrt_mutex_unlock (&tp->m_entity.m_mutex);
|
||||
ddsrt_mutex_unlock (&pub->m_entity.m_mutex);
|
||||
|
||||
thread_state_awake (lookup_thread_state ());
|
||||
rc = new_writer (&wr->m_wr, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
thread_state_awake (lookup_thread_state (), &pub->m_entity.m_domain->gv);
|
||||
rc = new_writer (&wr->m_wr, &wr->m_entity.m_domain->gv, &wr->m_entity.m_guid, NULL, &pub->m_entity.m_participant->m_guid, tp->m_stopic, wqos, wr->m_whc, dds_writer_status_cb, wr);
|
||||
ddsrt_mutex_lock (&pub->m_entity.m_mutex);
|
||||
ddsrt_mutex_lock (&tp->m_entity.m_mutex);
|
||||
assert(rc == DDS_RETCODE_OK);
|
||||
thread_state_asleep (lookup_thread_state ());
|
||||
|
||||
wr->m_entity.m_iid = get_entity_instance_id (&wr->m_entity.m_guid);
|
||||
wr->m_entity.m_iid = get_entity_instance_id (&wr->m_entity.m_domain->gv, &wr->m_entity.m_guid);
|
||||
dds_entity_register_child (&pub->m_entity, &wr->m_entity);
|
||||
|
||||
dds_topic_unlock (tp);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue