add QoS to ignore local readers/writers (#78)
Adds a new "ignorelocal" QoS to the readers/writers to ignore local matching readers/writers, with three settings: * DDS_IGNORELOCAL_NONE: default * DDS_IGNORELOCAL_PARTICIPANT: ignores readers/writers in the same participant * DDS_IGNORELOCAL_PROCESS: ignores readers/writers in the same process These can be set/got using dds_qset_ignorelocal and dds_qget_ignorelocal. If a matching reader or writer is ignored because of this setting, it is as-if that reader or writer doesn't exist. No traffic will be generated or data retained on its behalf. There are no consequences for interoperability as this is (by definition) a local affair. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
a6b5229510
commit
4778d6c5df
5 changed files with 103 additions and 0 deletions
|
@ -136,6 +136,16 @@ typedef enum dds_presentation_access_scope_kind
|
||||||
}
|
}
|
||||||
dds_presentation_access_scope_kind_t;
|
dds_presentation_access_scope_kind_t;
|
||||||
|
|
||||||
|
/** Ignore-local QoS: Applies to DataReader, DataWriter */
|
||||||
|
typedef enum dds_ignorelocal_kind
|
||||||
|
{
|
||||||
|
DDS_IGNORELOCAL_NONE,
|
||||||
|
DDS_IGNORELOCAL_PARTICIPANT,
|
||||||
|
DDS_IGNORELOCAL_PROCESS
|
||||||
|
}
|
||||||
|
dds_ignorelocal_kind_t;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Allocate memory and initialize default QoS-policies
|
* @brief Allocate memory and initialize default QoS-policies
|
||||||
*
|
*
|
||||||
|
@ -465,6 +475,16 @@ dds_qset_durability_service (
|
||||||
int32_t max_instances,
|
int32_t max_instances,
|
||||||
int32_t max_samples_per_instance);
|
int32_t max_samples_per_instance);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Set the ignore-local policy of a qos structure
|
||||||
|
*
|
||||||
|
* @param[in,out] qos - Pointer to a dds_qos_t structure that will store the policy
|
||||||
|
* @param[in] ignore - True if readers and writers owned by the same participant should be ignored
|
||||||
|
*/
|
||||||
|
DDS_EXPORT void dds_qset_ignorelocal (
|
||||||
|
dds_qos_t * __restrict qos,
|
||||||
|
dds_ignorelocal_kind_t ignore);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the userdata from a qos structure
|
* @brief Get the userdata from a qos structure
|
||||||
*
|
*
|
||||||
|
@ -753,6 +773,19 @@ dds_qget_durability_service (
|
||||||
int32_t *max_instances,
|
int32_t *max_instances,
|
||||||
int32_t *max_samples_per_instance);
|
int32_t *max_samples_per_instance);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the ignore-local qos policy
|
||||||
|
*
|
||||||
|
* @param[in] qos - Pointer to a dds_qos_t structure storing the policy
|
||||||
|
* @param[in,out] ignore - Pointer that will store whether to ignore readers/writers owned by the same participant (optional)
|
||||||
|
*
|
||||||
|
* @returns - false iff any of the arguments is invalid or the qos is not present in the qos object
|
||||||
|
*/
|
||||||
|
DDS_EXPORT bool
|
||||||
|
dds_qget_ignorelocal (
|
||||||
|
const dds_qos_t * __restrict qos,
|
||||||
|
dds_ignorelocal_kind_t *ignore);
|
||||||
|
|
||||||
#if defined (__cplusplus)
|
#if defined (__cplusplus)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -658,6 +658,16 @@ void dds_qset_durability_service
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dds_qset_ignorelocal (dds_qos_t * __restrict qos, dds_ignorelocal_kind_t ignore)
|
||||||
|
{
|
||||||
|
if (qos) {
|
||||||
|
qos->ignorelocal.value = (nn_ignorelocal_kind_t) ignore;
|
||||||
|
qos->present |= QP_CYCLONE_IGNORELOCAL;
|
||||||
|
} else {
|
||||||
|
DDS_ERROR("Argument QoS is NULL\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool dds_qget_userdata (const dds_qos_t * __restrict qos, void **value, size_t *sz)
|
bool dds_qget_userdata (const dds_qos_t * __restrict qos, void **value, size_t *sz)
|
||||||
{
|
{
|
||||||
if (!qos || !(qos->present & QP_USER_DATA)) {
|
if (!qos || !(qos->present & QP_USER_DATA)) {
|
||||||
|
@ -931,3 +941,14 @@ bool dds_qget_durability_service (const dds_qos_t * __restrict qos, dds_duration
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool dds_qget_ignorelocal (const dds_qos_t * __restrict qos, dds_ignorelocal_kind_t *ignore)
|
||||||
|
{
|
||||||
|
if (!qos || !(qos->present & QP_CYCLONE_IGNORELOCAL)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (ignore) {
|
||||||
|
*ignore = (dds_ignorelocal_kind_t) qos->ignorelocal.value;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
|
@ -205,6 +205,16 @@ typedef struct nn_share_qospolicy {
|
||||||
char *name;
|
char *name;
|
||||||
} nn_share_qospolicy_t;
|
} nn_share_qospolicy_t;
|
||||||
|
|
||||||
|
typedef enum nn_ignorelocal_kind {
|
||||||
|
NN_NONE_IGNORELOCAL_QOS,
|
||||||
|
NN_PARTICIPANT_IGNORELOCAL_QOS,
|
||||||
|
NN_PROCESS_IGNORELOCAL_QOS
|
||||||
|
} nn_ignorelocal_kind_t;
|
||||||
|
|
||||||
|
typedef struct nn_ignorelocal_qospolicy {
|
||||||
|
nn_ignorelocal_kind_t value;
|
||||||
|
} nn_ignorelocal_qospolicy_t;
|
||||||
|
|
||||||
/***/
|
/***/
|
||||||
|
|
||||||
/* Qos Present bit indices */
|
/* Qos Present bit indices */
|
||||||
|
@ -237,6 +247,7 @@ typedef struct nn_share_qospolicy {
|
||||||
#define QP_PRISMTECH_ENTITY_FACTORY ((uint64_t)1 << 27)
|
#define QP_PRISMTECH_ENTITY_FACTORY ((uint64_t)1 << 27)
|
||||||
#define QP_PRISMTECH_SYNCHRONOUS_ENDPOINT ((uint64_t)1 << 28)
|
#define QP_PRISMTECH_SYNCHRONOUS_ENDPOINT ((uint64_t)1 << 28)
|
||||||
#define QP_RTI_TYPECODE ((uint64_t)1 << 29)
|
#define QP_RTI_TYPECODE ((uint64_t)1 << 29)
|
||||||
|
#define QP_CYCLONE_IGNORELOCAL ((uint64_t)1 << 30)
|
||||||
|
|
||||||
/* Partition QoS is not RxO according to the specification (DDS 1.2,
|
/* Partition QoS is not RxO according to the specification (DDS 1.2,
|
||||||
section 7.1.3), but communication will not take place unless it
|
section 7.1.3), but communication will not take place unless it
|
||||||
|
@ -290,6 +301,7 @@ typedef struct nn_xqos {
|
||||||
/*x xR*/nn_reader_lifespan_qospolicy_t reader_lifespan;
|
/*x xR*/nn_reader_lifespan_qospolicy_t reader_lifespan;
|
||||||
/*x xR*/nn_share_qospolicy_t share;
|
/*x xR*/nn_share_qospolicy_t share;
|
||||||
/*xxx */nn_synchronous_endpoint_qospolicy_t synchronous_endpoint;
|
/*xxx */nn_synchronous_endpoint_qospolicy_t synchronous_endpoint;
|
||||||
|
/* x */nn_ignorelocal_qospolicy_t ignorelocal;
|
||||||
|
|
||||||
/* X*/nn_octetseq_t rti_typecode;
|
/* X*/nn_octetseq_t rti_typecode;
|
||||||
} nn_xqos_t;
|
} nn_xqos_t;
|
||||||
|
|
|
@ -2058,12 +2058,39 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
|
||||||
proxy_writer_add_connection (pwr, rd, tnow, init_count);
|
proxy_writer_add_connection (pwr, rd, tnow, init_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool ignore_local_p (const nn_guid_t *guid1, const nn_guid_t *guid2, const struct nn_xqos *xqos1, const struct nn_xqos *xqos2)
|
||||||
|
{
|
||||||
|
assert (xqos1->present & QP_CYCLONE_IGNORELOCAL);
|
||||||
|
assert (xqos2->present & QP_CYCLONE_IGNORELOCAL);
|
||||||
|
switch (xqos1->ignorelocal.value)
|
||||||
|
{
|
||||||
|
case NN_NONE_IGNORELOCAL_QOS:
|
||||||
|
break;
|
||||||
|
case NN_PARTICIPANT_IGNORELOCAL_QOS:
|
||||||
|
return memcmp (&guid1->prefix, &guid2->prefix, sizeof (guid1->prefix)) == 0;
|
||||||
|
case NN_PROCESS_IGNORELOCAL_QOS:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
switch (xqos2->ignorelocal.value)
|
||||||
|
{
|
||||||
|
case NN_NONE_IGNORELOCAL_QOS:
|
||||||
|
break;
|
||||||
|
case NN_PARTICIPANT_IGNORELOCAL_QOS:
|
||||||
|
return memcmp (&guid1->prefix, &guid2->prefix, sizeof (guid1->prefix)) == 0;
|
||||||
|
case NN_PROCESS_IGNORELOCAL_QOS:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn_mtime_t tnow)
|
static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn_mtime_t tnow)
|
||||||
{
|
{
|
||||||
int32_t reason;
|
int32_t reason;
|
||||||
(void)tnow;
|
(void)tnow;
|
||||||
if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE)))
|
if (!is_local_orphan_endpoint (&wr->e) && (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) || is_builtin_entityid (rd->e.guid.entityid, NN_VENDORID_ECLIPSE)))
|
||||||
return;
|
return;
|
||||||
|
if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos))
|
||||||
|
return;
|
||||||
if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0)
|
if ((reason = qos_match_p (rd->xqos, wr->xqos)) >= 0)
|
||||||
{
|
{
|
||||||
writer_qos_mismatch (wr, (uint32_t)reason);
|
writer_qos_mismatch (wr, (uint32_t)reason);
|
||||||
|
|
|
@ -2656,6 +2656,9 @@ static void xqos_init_default_common (nn_xqos_t *xqos)
|
||||||
|
|
||||||
xqos->present |= QP_PRISMTECH_SYNCHRONOUS_ENDPOINT;
|
xqos->present |= QP_PRISMTECH_SYNCHRONOUS_ENDPOINT;
|
||||||
xqos->synchronous_endpoint.value = 0;
|
xqos->synchronous_endpoint.value = 0;
|
||||||
|
|
||||||
|
xqos->present |= QP_CYCLONE_IGNORELOCAL;
|
||||||
|
xqos->ignorelocal.value = NN_NONE_IGNORELOCAL_QOS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void nn_xqos_init_default_reader (nn_xqos_t *xqos)
|
void nn_xqos_init_default_reader (nn_xqos_t *xqos)
|
||||||
|
@ -2810,6 +2813,7 @@ void nn_xqos_mergein_missing (nn_xqos_t *a, const nn_xqos_t *b)
|
||||||
CQ (PRISMTECH_READER_LIFESPAN, reader_lifespan);
|
CQ (PRISMTECH_READER_LIFESPAN, reader_lifespan);
|
||||||
CQ (PRISMTECH_ENTITY_FACTORY, entity_factory);
|
CQ (PRISMTECH_ENTITY_FACTORY, entity_factory);
|
||||||
CQ (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint);
|
CQ (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint);
|
||||||
|
CQ (CYCLONE_IGNORELOCAL, ignorelocal);
|
||||||
#undef CQ
|
#undef CQ
|
||||||
|
|
||||||
/* For allocated ones it is Not strictly necessary to use tmp, as
|
/* For allocated ones it is Not strictly necessary to use tmp, as
|
||||||
|
@ -3197,6 +3201,10 @@ uint64_t nn_xqos_delta (const nn_xqos_t *a, const nn_xqos_t *b, uint64_t mask)
|
||||||
if (octetseqs_differ (&a->rti_typecode, &b->rti_typecode))
|
if (octetseqs_differ (&a->rti_typecode, &b->rti_typecode))
|
||||||
delta |= QP_RTI_TYPECODE;
|
delta |= QP_RTI_TYPECODE;
|
||||||
}
|
}
|
||||||
|
if (check & QP_CYCLONE_IGNORELOCAL) {
|
||||||
|
if (a->ignorelocal.value != b->ignorelocal.value)
|
||||||
|
delta |= QP_CYCLONE_IGNORELOCAL;
|
||||||
|
}
|
||||||
return delta;
|
return delta;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3257,6 +3265,7 @@ void nn_xqos_addtomsg (struct nn_xmsg *m, const nn_xqos_t *xqos, uint64_t wanted
|
||||||
SIMPLE (PRISMTECH_ENTITY_FACTORY, entity_factory);
|
SIMPLE (PRISMTECH_ENTITY_FACTORY, entity_factory);
|
||||||
SIMPLE (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint);
|
SIMPLE (PRISMTECH_SYNCHRONOUS_ENDPOINT, synchronous_endpoint);
|
||||||
FUNC_BY_REF (RTI_TYPECODE, rti_typecode, octetseq);
|
FUNC_BY_REF (RTI_TYPECODE, rti_typecode, octetseq);
|
||||||
|
/* CYCLONE_IGNORELOCAL is not visible on the wire */
|
||||||
#undef FUNC_BY_REF
|
#undef FUNC_BY_REF
|
||||||
#undef FUNC_BY_VAL
|
#undef FUNC_BY_VAL
|
||||||
#undef SIMPLE
|
#undef SIMPLE
|
||||||
|
@ -3467,6 +3476,7 @@ void nn_log_xqos (uint32_t cat, const nn_xqos_t *xqos)
|
||||||
log_octetseq (cat, xqos->rti_typecode.length, xqos->rti_typecode.value);
|
log_octetseq (cat, xqos->rti_typecode.length, xqos->rti_typecode.value);
|
||||||
DDS_LOG(cat, ">");
|
DDS_LOG(cat, ">");
|
||||||
});
|
});
|
||||||
|
DO (CYCLONE_IGNORELOCAL, { LOGB1 ("ignorelocal=%u", xqos->ignorelocal.value); });
|
||||||
|
|
||||||
#undef PRINTARG_DUR
|
#undef PRINTARG_DUR
|
||||||
#undef FMT_DUR
|
#undef FMT_DUR
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue