Make logging config per-domain

Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
Erik Boasson 2019-07-17 22:03:19 +02:00 committed by eboasson
parent 7190bb3d3e
commit 966ec0dda7
70 changed files with 2052 additions and 1718 deletions

View file

@ -92,7 +92,7 @@ dds_topic_descriptor_t;
#define DDS_ANY_STATE (DDS_ANY_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE)
#define DDS_DOMAIN_DEFAULT -1
#define DDS_DOMAIN_DEFAULT ((uint32_t) 0xffffffffu)
#define DDS_HANDLE_NIL 0
#define DDS_ENTITY_NIL 0
@ -113,7 +113,7 @@ typedef enum dds_entity_kind
/* Handles are opaque pointers to implementation types */
typedef uint64_t dds_instance_handle_t;
typedef int32_t dds_domainid_t;
typedef uint32_t dds_domainid_t;
/* Topic encoding instruction types */

View file

@ -19,8 +19,9 @@ extern "C" {
struct dds_rhc;
struct dds_reader;
struct ddsi_sertopic;
struct q_globals;
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkmap *tkmap, const struct ddsi_sertopic *topic, bool xchecks);
DDS_EXPORT struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks);
DDS_EXPORT struct dds_rhc *dds_rhc_default_new (struct dds_reader *reader, const struct ddsi_sertopic *topic);
#if defined (__cplusplus)

View file

@ -315,7 +315,6 @@ typedef struct dds_globals {
int32_t m_init_count;
ddsrt_avl_tree_t m_domains;
ddsrt_mutex_t m_mutex;
uint32_t threadmon_count;
struct ddsi_threadmon *threadmon;
} dds_globals;

View file

@ -87,7 +87,7 @@ void dds_string_free (char * str)
dds_free (str);
}
void dds_sample_free_contents (char * data, const uint32_t * ops)
void dds_sample_free_contents (char *data, const uint32_t * ops)
{
uint32_t op;
uint32_t type;

View file

@ -172,7 +172,7 @@ static struct ddsi_tkmap_instance *dds__builtin_get_tkmap_entry (const struct nn
memcpy (&kh, guid, sizeof (kh));
/* any random builtin topic will do (provided it has a GUID for a key), because what matters is the "class" of the topic, not the actual topic; also, this is called early in the initialisation of the entity with this GUID, which simply causes serdata_from_keyhash to create a key-only serdata because the key lookup fails. */
sd = ddsi_serdata_from_keyhash (domain->builtin_participant_topic, &kh);
tk = ddsi_tkmap_find (domain->gv.m_tkmap, sd, false, true);
tk = ddsi_tkmap_find (domain->gv.m_tkmap, sd, true);
ddsi_serdata_unref (sd);
return tk;
}

View file

@ -52,10 +52,10 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
domain->gv.tstart = now ();
(void) ddsrt_getenv ("CYCLONEDDS_URI", &uri);
domain->cfgst = config_init (uri, &domain->gv.config);
domain->cfgst = config_init (uri, &domain->gv.config, domain_id);
if (domain->cfgst == NULL)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to parse configuration XML file %s\n", uri);
DDS_ILOG (DDS_LC_CONFIG, domain_id, "Failed to parse configuration XML file %s\n", uri);
ret = DDS_RETCODE_ERROR;
goto fail_config;
}
@ -63,9 +63,9 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
/* if a domain id was explicitly given, check & fix up the configuration */
if (domain_id != DDS_DOMAIN_DEFAULT)
{
if (domain_id < 0 || domain_id > 230)
if (domain_id > 230)
{
DDS_ERROR ("requested domain id %"PRId32" is out of range\n", domain_id);
DDS_ILOG (DDS_LC_ERROR, domain_id, "requested domain id %"PRIu32" is out of range\n", domain_id);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
@ -75,7 +75,7 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
}
else if (domain_id != domain->gv.config.domainId.value)
{
DDS_ERROR ("requested domain id %"PRId32" is inconsistent with configured value %"PRId32"\n", domain_id, domain->gv.config.domainId.value);
DDS_ILOG (DDS_LC_ERROR, domain_id, "requested domain id %"PRIu32" is inconsistent with configured value %"PRIu32"\n", domain_id, domain->gv.config.domainId.value);
ret = DDS_RETCODE_ERROR;
goto fail_config_domainid;
}
@ -87,33 +87,42 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
if (rtps_config_prep (&domain->gv, domain->cfgst) != 0)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to configure RTPS\n");
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to configure RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_config;
}
/* Start monitoring the liveliness of all threads. */
if (rtps_init (&domain->gv) < 0)
{
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to initialize RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_init;
}
/* Start monitoring the liveliness of threads if this is the first
domain to configured to do so. */
if (domain->gv.config.liveliness_monitoring)
{
if (++dds_global.threadmon_count == 0)
if (dds_global.threadmon_count++ == 0)
{
dds_global.threadmon = ddsi_threadmon_new (domain->gv.config.liveliness_monitoring_interval, domain->gv.config.noprogress_log_stacktraces);
/* FIXME: configure settings */
dds_global.threadmon = ddsi_threadmon_new (DDS_MSECS (333), true);
if (dds_global.threadmon == NULL)
{
DDS_ERROR ("Failed to create a thread liveliness monitor\n");
DDS_ILOG (DDS_LC_CONFIG, domain->m_id, "Failed to create a thread liveliness monitor\n");
ret = DDS_RETCODE_OUT_OF_RESOURCES;
goto fail_threadmon_new;
}
/* FIXME: thread properties */
if (ddsi_threadmon_start (dds_global.threadmon, "threadmon") < 0)
{
DDS_ILOG (DDS_LC_ERROR, domain->m_id, "Failed to start the thread liveliness monitor\n");
ret = DDS_RETCODE_ERROR;
goto fail_threadmon_start;
}
}
}
if (rtps_init (&domain->gv) < 0)
{
DDS_LOG (DDS_LC_CONFIG, "Failed to initialize RTPS\n");
ret = DDS_RETCODE_ERROR;
goto fail_rtps_init;
}
dds__builtin_init (domain);
if (rtps_start (&domain->gv) < 0)
@ -123,17 +132,6 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
goto fail_rtps_start;
}
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
{
const char *name = "threadmon";
if (ddsi_threadmon_start (dds_global.threadmon, name, lookup_thread_properties (&domain->gv.config, name)) < 0)
{
DDS_ERROR ("Failed to start the thread liveliness monitor\n");
ret = DDS_RETCODE_ERROR;
goto fail_threadmon_start;
}
}
/* Set additional default participant properties */
char progname[50] = "UNKNOWN"; /* FIXME: once retrieving process names is back in */
@ -153,26 +151,23 @@ static dds_return_t dds_domain_init (dds_domain *domain, dds_domainid_t domain_i
(void) snprintf (domain->gv.default_plist_pp.entity_name, len, "%s<%u>", progname, domain->gv.default_plist_pp.process_id);
domain->gv.default_plist_pp.present |= PP_ENTITY_NAME;
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_register_domain (dds_global.threadmon, &domain->gv);
return DDS_RETCODE_OK;
fail_threadmon_start:
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
ddsi_threadmon_stop (dds_global.threadmon);
rtps_stop (&domain->gv);
fail_rtps_start:
rtps_fini (&domain->gv);
fail_rtps_init:
if (domain->gv.config.liveliness_monitoring)
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
ddsi_threadmon_stop (dds_global.threadmon);
fail_threadmon_start:
if (domain->gv.config.liveliness_monitoring && --dds_global.threadmon_count == 0)
{
if (--dds_global.threadmon_count == 0)
{
ddsi_threadmon_free (dds_global.threadmon);
dds_global.threadmon = NULL;
}
ddsi_threadmon_free (dds_global.threadmon);
dds_global.threadmon = NULL;
}
fail_threadmon_new:
downgrade_main_thread ();
thread_states_fini();
rtps_fini (&domain->gv);
fail_rtps_init:
fail_rtps_config:
fail_config_domainid:
config_fini (domain->cfgst);
@ -182,19 +177,22 @@ fail_config:
static void dds_domain_fini (struct dds_domain *domain)
{
if (domain->gv.config.liveliness_monitoring && dds_global.threadmon_count == 1)
ddsi_threadmon_stop (dds_global.threadmon);
rtps_stop (&domain->gv);
dds__builtin_fini (domain);
rtps_fini (&domain->gv);
if (domain->gv.config.liveliness_monitoring)
ddsi_threadmon_unregister_domain (dds_global.threadmon, &domain->gv);
rtps_fini (&domain->gv);
ddsrt_mutex_lock (&dds_global.m_mutex);
if (domain->gv.config.liveliness_monitoring && --dds_global.threadmon_count == 0)
{
if (--dds_global.threadmon_count == 0)
{
ddsi_threadmon_free (dds_global.threadmon);
dds_global.threadmon = NULL;
}
ddsi_threadmon_stop (dds_global.threadmon);
ddsi_threadmon_free (dds_global.threadmon);
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
config_fini (domain->cfgst);
}
@ -208,9 +206,6 @@ dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
struct dds_domain *dom = NULL;
dds_return_t ret;
if (id != DDS_DOMAIN_DEFAULT && (id < 0 || id > 230))
return DDS_RETCODE_BAD_PARAMETER;
ddsrt_mutex_lock (&dds_global.m_mutex);
/* FIXME: hack around default domain ids, not yet being able to handle multiple domains simultaneously */
@ -246,7 +241,7 @@ dds_return_t dds_domain_create (dds_domain **domain_out, dds_domainid_t id)
}
break;
case DDS_RETCODE_PRECONDITION_NOT_MET:
DDS_ERROR("Inconsistent domain configuration detected: domain on configuration: %"PRId32", domain %"PRId32"\n", dom->m_id, id);
DDS_ILOG (DDS_LC_ERROR, id, "Inconsistent domain configuration detected: domain on configuration: %"PRIu32", domain %"PRIu32"\n", dom->m_id, id);
break;
}
ddsrt_mutex_unlock (&dds_global.m_mutex);
@ -301,12 +296,14 @@ void dds_write_set_batch (bool enable)
{
/* FIXME: get channels + latency budget working and get rid of this; in the mean time, any ugly hack will do. */
struct dds_domain *dom;
dds_domainid_t last_id = -1;
dds_domainid_t next_id = 0;
dds_init ();
ddsrt_mutex_lock (&dds_global.m_mutex);
while ((dom = ddsrt_avl_lookup_succ (&dds_domaintree_def, &dds_global.m_domains, &last_id)) != NULL)
while ((dom = ddsrt_avl_lookup_succ_eq (&dds_domaintree_def, &dds_global.m_domains, &next_id)) != NULL)
{
last_id = dom->m_id;
/* Must be sure that the compiler doesn't reload curr_id from dom->m_id */
dds_domainid_t curr_id = *((volatile dds_domainid_t *) &dom->m_id);
next_id = curr_id + 1;
dom->gv.config.whc_batch = enable;
dds_instance_handle_t last_iid = 0;
@ -322,7 +319,7 @@ void dds_write_set_batch (bool enable)
pushdown_set_batch (e, enable);
ddsrt_mutex_lock (&dds_global.m_mutex);
dds_entity_unpin (e);
dom = ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &last_id);
dom = ddsrt_avl_lookup (&dds_domaintree_def, &dds_global.m_domains, &curr_id);
}
}
ddsrt_mutex_unlock (&dds_global.m_mutex);

View file

@ -201,10 +201,7 @@ dds_return_t dds_delete_impl (dds_entity_t entity, bool keep_if_explicit)
dds_return_t rc;
if ((rc = dds_entity_pin (entity, &e)) < 0)
{
DDS_TRACE ("dds_delete_impl: error on locking entity %"PRIu32" keep_if_explicit %d\n", entity, (int) keep_if_explicit);
return rc;
}
ddsrt_mutex_lock (&e->m_mutex);
if (keep_if_explicit == true && (e->m_flags & DDS_ENTITY_IMPLICIT) == 0)
@ -460,7 +457,7 @@ static dds_return_t dds_set_qos_locked_impl (dds_entity *e, const dds_qos_t *qos
dds_qos_t *newqos = dds_create_qos ();
nn_xqos_mergein_missing (newqos, qos, mask);
nn_xqos_mergein_missing (newqos, e->m_qos, ~(uint64_t)0);
if ((ret = nn_xqos_valid (newqos)) != DDS_RETCODE_OK)
if ((ret = nn_xqos_valid (&e->m_domain->gv.logconfig, newqos)) != DDS_RETCODE_OK)
; /* oops ... invalid or inconsistent */
else if (!(e->m_flags & DDS_ENTITY_ENABLED))
; /* do as you please while the entity is not enabled (perhaps we should even allow invalid ones?) */
@ -823,16 +820,16 @@ dds_return_t dds_enable (dds_entity_t entity)
dds_entity *e;
dds_return_t rc;
if ((rc = dds_entity_lock(entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
if ((rc = dds_entity_lock (entity, DDS_KIND_DONTCARE, &e)) != DDS_RETCODE_OK)
return rc;
if ((e->m_flags & DDS_ENTITY_ENABLED) == 0)
{
/* TODO: Really enable. */
e->m_flags |= DDS_ENTITY_ENABLED;
DDS_ERROR ("Delayed entity enabling is not supported\n");
DDS_CERROR (&e->m_domain->gv.logconfig, "Delayed entity enabling is not supported\n");
}
dds_entity_unlock(e);
dds_entity_unlock (e);
return DDS_RETCODE_OK;
}

View file

@ -58,7 +58,7 @@ dds_return_t dds_init (void)
if (dds_handle_server_init () != DDS_RETCODE_OK)
{
DDS_ERROR("Failed to initialize internal handle server\n");
DDS_ERROR ("Failed to initialize internal handle server\n");
ret = DDS_RETCODE_ERROR;
goto fail_handleserver;
}

View file

@ -41,7 +41,7 @@ dds_return_t dds_dispose_ih (dds_entity_t writer, dds_instance_handle_t handle)
static struct ddsi_tkmap_instance *dds_instance_find (const dds_topic *topic, const void *data, const bool create)
{
struct ddsi_serdata *sd = ddsi_serdata_from_sample (topic->m_stopic, SDK_KEY, data);
struct ddsi_tkmap_instance *inst = ddsi_tkmap_find (topic->m_entity.m_domain->gv.m_tkmap, sd, false, create);
struct ddsi_tkmap_instance *inst = ddsi_tkmap_find (topic->m_entity.m_domain->gv.m_tkmap, sd, create);
ddsi_serdata_unref (sd);
return inst;
}

View file

@ -41,7 +41,7 @@ static dds_return_t dds_participant_delete (dds_entity *e)
thread_state_awake (lookup_thread_state (), &e->m_domain->gv);
if ((ret = delete_participant (&e->m_domain->gv, &e->m_guid)) < 0)
DDS_ERROR ("dds_participant_delete: internal error %"PRId32"\n", ret);
DDS_CERROR (&e->m_domain->gv.logconfig, "dds_participant_delete: internal error %"PRId32"\n", ret);
ddsrt_mutex_lock (&dds_global.m_mutex);
ddsrt_avl_delete (&dds_entity_children_td, &e->m_domain->m_ppants, e);
ddsrt_mutex_unlock (&dds_global.m_mutex);
@ -100,7 +100,7 @@ dds_entity_t dds_create_participant (const dds_domainid_t domain, const dds_qos_
if (qos != NULL)
nn_xqos_mergein_missing (new_qos, qos, DDS_PARTICIPANT_QOS_MASK);
nn_xqos_mergein_missing (new_qos, &dom->gv.default_plist_pp.qos, ~(uint64_t)0);
if ((ret = nn_xqos_valid (new_qos)) < 0)
if ((ret = nn_xqos_valid (&dom->gv.logconfig, new_qos)) < 0)
goto err_qos_validation;
/* Translate qos */

View file

@ -59,7 +59,7 @@ dds_entity_t dds_create_publisher (dds_entity_t participant, const dds_qos_t *qo
if (qos)
nn_xqos_mergein_missing (new_qos, qos, DDS_PUBLISHER_QOS_MASK);
nn_xqos_mergein_missing (new_qos, &par->m_entity.m_domain->gv.default_xqos_pub, ~(uint64_t)0);
if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
if ((ret = nn_xqos_valid (&par->m_entity.m_domain->gv.logconfig, new_qos)) != DDS_RETCODE_OK)
{
dds_participant_unlock (par);
return ret;

View file

@ -361,7 +361,7 @@ dds_entity_t dds_create_reader (dds_entity_t participant_or_subscriber, dds_enti
nn_xqos_mergein_missing (rqos, tp->m_entity.m_qos, ~(uint64_t)0);
nn_xqos_mergein_missing (rqos, &sub->m_entity.m_domain->gv.default_xqos_rd, ~(uint64_t)0);
if ((ret = nn_xqos_valid (rqos)) != DDS_RETCODE_OK)
if ((ret = nn_xqos_valid (&sub->m_entity.m_domain->gv.logconfig, rqos)) != DDS_RETCODE_OK)
{
dds_delete_qos (rqos);
reader = ret;

View file

@ -159,7 +159,7 @@
#define INCLUDE_TRACE 1
#if INCLUDE_TRACE
#define TRACE(...) DDS_LOG(DDS_LC_RHC, __VA_ARGS__)
#define TRACE(...) DDS_CLOG (DDS_LC_RHC, &rhc->gv->logconfig, __VA_ARGS__)
#else
#define TRACE(...) ((void)0)
#endif
@ -306,6 +306,7 @@ struct dds_rhc_default {
dds_reader *reader; /* reader -- may be NULL (used by rhc_torture) */
struct ddsi_tkmap *tkmap; /* back pointer to tkmap */
struct q_globals *gv; /* globals -- so far only for log config */
const struct ddsi_sertopic *topic; /* topic description */
uint32_t history_depth; /* depth, 1 for KEEP_LAST_1, 2**32-1 for KEEP_ALL */
@ -523,7 +524,7 @@ static void remove_inst_from_nonempty_list (struct dds_rhc_default *rhc, struct
rhc->n_nonempty_instances--;
}
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkmap *tkmap, const struct ddsi_sertopic *topic, bool xchecks)
struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct q_globals *gv, const struct ddsi_sertopic *topic, bool xchecks)
{
struct dds_rhc_default *rhc = ddsrt_malloc (sizeof (*rhc));
memset (rhc, 0, sizeof (*rhc));
@ -534,7 +535,8 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkm
rhc->instances = ddsrt_hh_new (1, instance_iid_hash, instance_iid_eq);
rhc->topic = topic;
rhc->reader = reader;
rhc->tkmap = tkmap;
rhc->tkmap = gv->m_tkmap;
rhc->gv = gv;
rhc->xchecks = xchecks;
return &rhc->common;
@ -542,7 +544,7 @@ struct dds_rhc *dds_rhc_default_new_xchecks (dds_reader *reader, struct ddsi_tkm
struct dds_rhc *dds_rhc_default_new (dds_reader *reader, const struct ddsi_sertopic *topic)
{
return dds_rhc_default_new_xchecks (reader, reader->m_entity.m_domain->gv.m_tkmap, topic, (reader->m_entity.m_domain->gv.config.enabled_xchecks & DDS_XCHECK_RHC) != 0);
return dds_rhc_default_new_xchecks (reader, &reader->m_entity.m_domain->gv, topic, (reader->m_entity.m_domain->gv.config.enabled_xchecks & DDS_XCHECK_RHC) != 0);
}
static void dds_rhc_default_set_qos (struct dds_rhc_default * rhc, const dds_qos_t * qos)

View file

@ -252,9 +252,7 @@ static size_t dds_stream_check_optimize1 (const dds_topic_descriptor_t * __restr
size_t dds_stream_check_optimize (const dds_topic_descriptor_t * __restrict desc)
{
const size_t size = dds_stream_check_optimize1 (desc);
DDS_TRACE ("Marshalling for type: %s is %soptimised\n", desc->m_typename, size ? "" : "not ");
return size;
return dds_stream_check_optimize1 (desc);
}
static char *dds_stream_reuse_string (dds_istream_t * __restrict is, char * __restrict str, const uint32_t bound)

View file

@ -56,7 +56,7 @@ dds_entity_t dds__create_subscriber_l (dds_participant *participant, const dds_q
if (qos)
nn_xqos_mergein_missing (new_qos, qos, DDS_SUBSCRIBER_QOS_MASK);
nn_xqos_mergein_missing (new_qos, &participant->m_entity.m_domain->gv.default_xqos_sub, ~(uint64_t)0);
if ((ret = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
if ((ret = nn_xqos_valid (&participant->m_entity.m_domain->gv.logconfig, new_qos)) != DDS_RETCODE_OK)
{
dds_delete_qos (new_qos);
return ret;

View file

@ -291,6 +291,12 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
if (sertopic == NULL)
return DDS_RETCODE_BAD_PARAMETER;
/* Claim participant handle so we can be sure the handle will not be
reused if we temporarily unlock the participant to check the an
existing topic's compatibility */
if ((rc = dds_entity_pin (participant, &par_ent)) < 0)
return rc;
new_qos = dds_create_qos ();
if (qos)
nn_xqos_mergein_missing (new_qos, qos, DDS_TOPIC_QOS_MASK);
@ -305,15 +311,9 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
* Leaving the topic QoS sparse means a default-default topic QoS of
* best-effort will do "the right thing" and let a writer still default to
* reliable ... (and keep behaviour unchanged) */
if ((rc = nn_xqos_valid (new_qos)) != DDS_RETCODE_OK)
if ((rc = nn_xqos_valid (&par_ent->m_domain->gv.logconfig, new_qos)) != DDS_RETCODE_OK)
goto err_invalid_qos;
/* Claim participant handle so we can be sure the handle will not be
reused if we temporarily unlock the participant to check the an
existing topic's compatibility */
if ((rc = dds_entity_pin (participant, &par_ent)) < 0)
goto err_claim_participant;
/* FIXME: just mutex_lock ought to be good enough, but there is the
pesky "closed" check still ... */
if ((rc = dds_participant_lock (participant, &par)) != DDS_RETCODE_OK)
@ -446,10 +446,9 @@ dds_entity_t dds_create_topic_arbitrary (dds_entity_t participant, struct ddsi_s
err_sertopic_reuse:
dds_participant_unlock (par);
err_lock_participant:
dds_entity_unpin (par_ent);
err_claim_participant:
err_invalid_qos:
dds_delete_qos (new_qos);
dds_entity_unpin (par_ent);
return rc;
}
@ -494,6 +493,7 @@ dds_entity_t dds_create_topic (dds_entity_t participant, const dds_topic_descrip
/* Check if topic cannot be optimised (memcpy marshal) */
if (!(desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE)) {
st->opt_size = dds_stream_check_optimize (desc);
DDS_CTRACE (&ppent->m_domain->gv.logconfig, "Marshalling for type: %s is %soptimised\n", desc->m_typename, st->opt_size ? "" : "not ");
}
nn_plist_init_empty (&plist);

View file

@ -80,6 +80,7 @@ struct whc_impl {
uint64_t total_bytes; /* total number of bytes pushed in */
unsigned is_transient_local: 1;
unsigned xchecks: 1;
struct q_globals *gv;
struct ddsi_tkmap *tkmap;
uint32_t hdepth; /* 0 = unlimited */
uint32_t tldepth; /* 0 = disabled/unlimited (no need to maintain an index if KEEP_ALL <=> is_transient_local + tldepth=0) */
@ -138,7 +139,7 @@ static bool whc_default_sample_iter_borrow_next (struct whc_sample_iter *opaque_
static void whc_default_free (struct whc *whc);
static const ddsrt_avl_treedef_t whc_seq_treedef =
DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct whc_intvnode, avlnode), offsetof (struct whc_intvnode, min), compare_seq, 0);
DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct whc_intvnode, avlnode), offsetof (struct whc_intvnode, min), compare_seq, 0);
static const struct whc_ops whc_ops = {
.insert = whc_default_insert,
@ -155,6 +156,8 @@ static const struct whc_ops whc_ops = {
.free = whc_default_free
};
#define TRACE(...) DDS_CLOG (DDS_LC_WHC, &whc->gv->logconfig, __VA_ARGS__)
/* Number of instantiated WHCs and a global freelist for WHC nodes that gets
initialized lazily and cleaned up automatically when the last WHC is freed.
Protected by dds_global.m_mutex.
@ -359,6 +362,7 @@ struct whc *whc_new (struct q_globals *gv, int is_transient_local, uint32_t hdep
ddsrt_mutex_init (&whc->lock);
whc->is_transient_local = is_transient_local ? 1 : 0;
whc->xchecks = (gv->config.enabled_xchecks & DDS_XCHECK_WHC) != 0;
whc->gv = gv;
whc->tkmap = gv->m_tkmap;
whc->hdepth = hdepth;
whc->tldepth = tldepth;
@ -572,7 +576,7 @@ static void free_one_instance_from_idx (struct whc_impl *whc, seqno_t max_drop_s
oldn->idxnode = NULL;
if (oldn->seq <= max_drop_seq)
{
DDS_LOG (DDS_LC_WHC, " prune tl whcn %p\n", (void *)oldn);
TRACE (" prune tl whcn %p\n", (void *)oldn);
assert (oldn != whc->maxseq_node);
whc_delete_one (whc, oldn);
}
@ -886,7 +890,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
if (whc->is_transient_local && whc->tldepth == 0)
{
/* KEEP_ALL on transient local, so we can never ever delete anything */
DDS_LOG (DDS_LC_WHC, " KEEP_ALL transient-local: do nothing\n");
TRACE (" KEEP_ALL transient-local: do nothing\n");
*deferred_free_list = NULL;
return 0;
}
@ -896,11 +900,11 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
prev_seq = whcn ? whcn->prev_seq : NULL;
while (whcn && whcn->seq <= max_drop_seq)
{
DDS_LOG (DDS_LC_WHC, " whcn %p %"PRId64, (void *) whcn, whcn->seq);
TRACE (" whcn %p %"PRId64, (void *) whcn, whcn->seq);
if (whcn_in_tlidx (whc, whcn->idxnode, whcn->idxnode_pos))
{
/* quickly skip over samples in tlidx */
DDS_LOG (DDS_LC_WHC, " tl:keep");
TRACE (" tl:keep");
if (whcn->unacked)
{
assert (whc->unacked_bytes >= whcn->size);
@ -918,13 +922,13 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
}
else
{
DDS_LOG (DDS_LC_WHC, " delete");
TRACE (" delete");
last_to_free->next_seq = whcn;
last_to_free = last_to_free->next_seq;
whc_delete_one_intv (whc, &intv, &whcn);
ndropped++;
}
DDS_LOG (DDS_LC_WHC, "\n");
TRACE ("\n");
}
if (prev_seq)
prev_seq->next_seq = whcn;
@ -941,7 +945,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
if (whc->tldepth > 0 && whc->idxdepth > whc->tldepth)
{
assert (whc->hdepth == whc->idxdepth);
DDS_LOG (DDS_LC_WHC, " idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->idxdepth, whc->tldepth);
TRACE (" idxdepth %"PRIu32" > tldepth %"PRIu32" > 0 -- must prune\n", whc->idxdepth, whc->tldepth);
/* Do a second pass over the sequence number range we just processed: this time we only
encounter samples that were retained because of the transient-local durability setting
@ -952,14 +956,14 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
struct whc_idxnode * const idxn = whcn->idxnode;
uint32_t cnt, idx;
DDS_LOG (DDS_LC_WHC, " whcn %p %"PRId64" idxn %p prune_seq %"PRId64":", (void *) whcn, whcn->seq, (void *) idxn, idxn->prune_seq);
TRACE (" whcn %p %"PRId64" idxn %p prune_seq %"PRId64":", (void *) whcn, whcn->seq, (void *) idxn, idxn->prune_seq);
assert (whcn_in_tlidx (whc, idxn, whcn->idxnode_pos));
assert (idxn->prune_seq <= max_drop_seq);
if (idxn->prune_seq == max_drop_seq)
{
DDS_LOG (DDS_LC_WHC, " already pruned\n");
TRACE (" already pruned\n");
whcn = whcn->next_seq;
continue;
}
@ -987,7 +991,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
whcn_template.serdata = ddsi_serdata_ref (oldn->serdata);
assert (oldn->seq < whcn->seq);
#endif
DDS_LOG (DDS_LC_WHC, " del %p %"PRId64, (void *) oldn, oldn->seq);
TRACE (" del %p %"PRId64, (void *) oldn, oldn->seq);
whc_delete_one (whc, oldn);
#ifndef NDEBUG
assert (ddsrt_hh_lookup (whc->idx_hash, &template) == idxn);
@ -995,7 +999,7 @@ static uint32_t whc_default_remove_acked_messages_full (struct whc_impl *whc, se
#endif
}
}
DDS_LOG (DDS_LC_WHC, "\n");
TRACE ("\n");
whcn = whcn->next_seq;
}
}
@ -1018,13 +1022,13 @@ static uint32_t whc_default_remove_acked_messages (struct whc *whc_generic, seqn
assert (max_drop_seq < MAX_SEQ_NUMBER);
assert (max_drop_seq >= whc->max_drop_seq);
if (dds_get_log_mask () & DDS_LC_WHC)
if (whc->gv->logconfig.c.mask & DDS_LC_WHC)
{
struct whc_state tmp;
get_state_locked (whc, &tmp);
DDS_LOG (DDS_LC_WHC, "whc_default_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq);
DDS_LOG (DDS_LC_WHC, " whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
TRACE ("whc_default_remove_acked_messages(%p max_drop_seq %"PRId64")\n", (void *)whc, max_drop_seq);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
tmp.min_seq, tmp.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
}
check_whc (whc);
@ -1112,14 +1116,14 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
ddsrt_mutex_lock (&whc->lock);
check_whc (whc);
if (dds_get_log_mask () & DDS_LC_WHC)
if (whc->gv->logconfig.c.mask & DDS_LC_WHC)
{
struct whc_state whcst;
get_state_locked (whc, &whcst);
DDS_LOG (DDS_LC_WHC, "whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *)whc, max_drop_seq, seq, (void*)plist, (void*)serdata, serdata->hash);
DDS_LOG (DDS_LC_WHC, " whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
TRACE ("whc_default_insert(%p max_drop_seq %"PRId64" seq %"PRId64" plist %p serdata %p:%"PRIx32")\n",
(void *) whc, max_drop_seq, seq, (void *) plist, (void *) serdata, serdata->hash);
TRACE (" whc: [%"PRId64",%"PRId64"] max_drop_seq %"PRId64" h %"PRIu32" tl %"PRIu32"\n",
whcst.min_seq, whcst.max_seq, whc->max_drop_seq, whc->hdepth, whc->tldepth);
}
assert (max_drop_seq < MAX_SEQ_NUMBER);
@ -1133,12 +1137,12 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
/* Always insert in seq admin */
newn = whc_default_insert_seq (whc, max_drop_seq, seq, plist, serdata);
DDS_LOG (DDS_LC_WHC, " whcn %p:", (void*)newn);
TRACE (" whcn %p:", (void*)newn);
/* Special case of empty data (such as commit messages) can't go into index, and if we're not maintaining an index, we're done, too */
if (serdata->kind == SDK_EMPTY || whc->idxdepth == 0)
{
DDS_LOG (DDS_LC_WHC, " empty or no hist\n");
TRACE (" empty or no hist\n");
ddsrt_mutex_unlock (&whc->lock);
return 0;
}
@ -1147,15 +1151,15 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
if ((idxn = ddsrt_hh_lookup (whc->idx_hash, &template)) != NULL)
{
/* Unregisters cause deleting of index entry, non-unregister of adding/overwriting in history */
DDS_LOG (DDS_LC_WHC, " idxn %p", (void *)idxn);
TRACE (" idxn %p", (void *)idxn);
if (serdata->statusinfo & NN_STATUSINFO_UNREGISTER)
{
DDS_LOG (DDS_LC_WHC, " unreg:delete\n");
TRACE (" unreg:delete\n");
delete_one_instance_from_idx (whc, max_drop_seq, idxn);
if (newn->seq <= max_drop_seq)
{
struct whc_node *prev_seq = newn->prev_seq;
DDS_LOG (DDS_LC_WHC, " unreg:seq <= max_drop_seq: delete newn\n");
TRACE (" unreg:seq <= max_drop_seq: delete newn\n");
whc_delete_one (whc, newn);
whc->maxseq_node = prev_seq;
}
@ -1167,7 +1171,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
idxn->headidx = 0;
if ((oldn = idxn->hist[idxn->headidx]) != NULL)
{
DDS_LOG (DDS_LC_WHC, " overwrite whcn %p", (void *)oldn);
TRACE (" overwrite whcn %p", (void *)oldn);
oldn->idxnode = NULL;
}
idxn->hist[idxn->headidx] = newn;
@ -1176,7 +1180,7 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
if (oldn && (whc->hdepth > 0 || oldn->seq <= max_drop_seq))
{
DDS_LOG (DDS_LC_WHC, " prune whcn %p", (void *)oldn);
TRACE (" prune whcn %p", (void *)oldn);
assert (oldn != whc->maxseq_node);
whc_delete_one (whc, oldn);
}
@ -1192,22 +1196,22 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
pos -= whc->idxdepth;
if ((oldn = idxn->hist[pos]) != NULL)
{
DDS_LOG (DDS_LC_WHC, " prune tl whcn %p", (void *)oldn);
TRACE (" prune tl whcn %p", (void *)oldn);
assert (oldn != whc->maxseq_node);
whc_delete_one (whc, oldn);
}
}
DDS_LOG (DDS_LC_WHC, "\n");
TRACE ("\n");
}
}
else
{
DDS_LOG (DDS_LC_WHC, " newkey");
TRACE (" newkey");
/* Ignore unregisters, but insert everything else */
if (!(serdata->statusinfo & NN_STATUSINFO_UNREGISTER))
{
idxn = ddsrt_malloc (sizeof (*idxn) + whc->idxdepth * sizeof (idxn->hist[0]));
DDS_LOG (DDS_LC_WHC, " idxn %p", (void *)idxn);
TRACE (" idxn %p", (void *)idxn);
ddsi_tkmap_instance_ref (tk);
idxn->iid = tk->m_iid;
idxn->tk = tk;
@ -1223,16 +1227,16 @@ static int whc_default_insert (struct whc *whc_generic, seqno_t max_drop_seq, se
}
else
{
DDS_LOG (DDS_LC_WHC, " unreg:skip");
TRACE (" unreg:skip");
if (newn->seq <= max_drop_seq)
{
struct whc_node *prev_seq = newn->prev_seq;
DDS_LOG (DDS_LC_WHC, " unreg:seq <= max_drop_seq: delete newn\n");
TRACE (" unreg:seq <= max_drop_seq: delete newn\n");
whc_delete_one (whc, newn);
whc->maxseq_node = prev_seq;
}
}
DDS_LOG (DDS_LC_WHC, "\n");
TRACE ("\n");
}
ddsrt_mutex_unlock (&whc->lock);
return 0;

View file

@ -82,7 +82,6 @@ static dds_return_t try_store (struct rhc *rhc, const struct proxy_writer_info *
}
else
{
DDS_ERROR ("The writer could not deliver data on time, probably due to a local reader resources being full\n");
return DDS_RETCODE_TIMEOUT;
}
}
@ -102,7 +101,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
struct proxy_writer_info pwr_info;
make_proxy_writer_info (&pwr_info, &wr->e, wr->xqos);
for (uint32_t i = 0; rdary[i]; i++) {
DDS_TRACE ("reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
DDS_CTRACE (&wr->e.gv->logconfig, "reader "PGUIDFMT"\n", PGUID (rdary[i]->e.guid));
if ((ret = try_store (rdary[i]->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
}
@ -132,7 +131,7 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
struct reader *rd;
if ((rd = ephash_lookup_reader_guid (gh, &m->rd_guid)) != NULL)
{
DDS_TRACE ("reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
DDS_CTRACE (&wr->e.gv->logconfig, "reader-via-guid "PGUIDFMT"\n", PGUID (rd->e.guid));
/* Copied the return value ignore from DDSI deliver_user_data () function. */
if ((ret = try_store (rd->rhc, &pwr_info, payload, tk, &max_block_ms)) != DDS_RETCODE_OK)
break;
@ -140,6 +139,11 @@ static dds_return_t deliver_locally (struct writer *wr, struct ddsi_serdata *pay
}
ddsrt_mutex_unlock (&wr->e.lock);
}
if (ret == DDS_RETCODE_TIMEOUT)
{
DDS_CERROR (&wr->e.gv->logconfig, "The writer could not deliver data on time, probably due to a local reader resources being full\n");
}
return ret;
}

View file

@ -288,7 +288,7 @@ dds_entity_t dds_create_writer (dds_entity_t participant_or_publisher, dds_entit
nn_xqos_mergein_missing (wqos, tp->m_entity.m_qos, ~(uint64_t)0);
nn_xqos_mergein_missing (wqos, &pub->m_entity.m_domain->gv.default_xqos_wr, ~(uint64_t)0);
if ((rc = nn_xqos_valid (wqos)) < 0)
if ((rc = nn_xqos_valid (&pub->m_entity.m_domain->gv.logconfig, wqos)) < 0)
{
dds_delete_qos(wqos);
goto err_bad_qos;

View file

@ -336,7 +336,7 @@ CU_Test(ddsc_entity, get_entities, .init = create_entity, .fini = delete_entity)
CU_Test(ddsc_entity, get_domainid, .init = create_entity, .fini = delete_entity)
{
dds_return_t status;
dds_domainid_t id = -1;
dds_domainid_t id = DDS_DOMAIN_DEFAULT;
/* Check getting ID with bad parameters. */
status = dds_get_domainid (0, NULL);
@ -349,7 +349,7 @@ CU_Test(ddsc_entity, get_domainid, .init = create_entity, .fini = delete_entity)
/* Get and check the domain id. */
status = dds_get_domainid (entity, &id);
CU_ASSERT_EQUAL_FATAL(status, DDS_RETCODE_OK);
CU_ASSERT_FATAL(id != -1);
CU_ASSERT_FATAL(id != DDS_DOMAIN_DEFAULT);
}
CU_Test(ddsc_entity, delete, .init = create_entity)

View file

@ -42,17 +42,13 @@ CU_Test(ddsc_participant, create_and_delete) {
/* Test for creating participant with no configuration file */
CU_Test(ddsc_participant, create_with_no_conf_no_env)
{
dds_entity_t participant, participant2, participant3;
dds_entity_t participant2, participant3;
dds_return_t status;
dds_domainid_t domain_id;
dds_domainid_t valid_domain=3;
ddsrt_unsetenv(DDS_PROJECT_NAME_NOSPACE_CAPS"_URI");
//invalid domain
participant = dds_create_participant (-2, NULL, NULL);
CU_ASSERT_FATAL(participant < 0);
//valid specific domain value
participant2 = dds_create_participant (valid_domain, NULL, NULL);
CU_ASSERT_FATAL(participant2 > 0);