add back in the broken filter and query condition support
the implementation was and is terrible, but without it too many tests fail Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
791a0efe7e
commit
e841e4bf94
6 changed files with 37 additions and 71 deletions
|
@ -200,12 +200,20 @@ typedef struct dds_writer
|
||||||
}
|
}
|
||||||
dds_writer;
|
dds_writer;
|
||||||
|
|
||||||
|
#ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||||
|
#define DDS_TOPIC_INTERN_FILTER_FN_DEFINED
|
||||||
|
typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx);
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct dds_topic
|
typedef struct dds_topic
|
||||||
{
|
{
|
||||||
struct dds_entity m_entity;
|
struct dds_entity m_entity;
|
||||||
struct ddsi_sertopic * m_stopic;
|
struct ddsi_sertopic * m_stopic;
|
||||||
const dds_topic_descriptor_t * m_descriptor;
|
const dds_topic_descriptor_t * m_descriptor;
|
||||||
|
|
||||||
|
dds_topic_intern_filter_fn filter_fn;
|
||||||
|
void * filter_ctx;
|
||||||
|
|
||||||
/* Status metrics */
|
/* Status metrics */
|
||||||
|
|
||||||
dds_inconsistent_topic_status_t m_inconsistent_topic_status;
|
dds_inconsistent_topic_status_t m_inconsistent_topic_status;
|
||||||
|
|
|
@ -27,11 +27,9 @@ dds_create_querycondition(
|
||||||
_In_ uint32_t mask,
|
_In_ uint32_t mask,
|
||||||
_In_ dds_querycondition_filter_fn filter)
|
_In_ dds_querycondition_filter_fn filter)
|
||||||
{
|
{
|
||||||
dds_entity_t topic;
|
|
||||||
dds_entity_t hdl;
|
dds_entity_t hdl;
|
||||||
dds__retcode_t rc;
|
dds__retcode_t rc;
|
||||||
dds_reader *r;
|
dds_reader *r;
|
||||||
dds_topic *t;
|
|
||||||
|
|
||||||
DDS_REPORT_STACK();
|
DDS_REPORT_STACK();
|
||||||
|
|
||||||
|
@ -41,21 +39,7 @@ dds_create_querycondition(
|
||||||
assert(cond);
|
assert(cond);
|
||||||
hdl = cond->m_entity.m_hdl;
|
hdl = cond->m_entity.m_hdl;
|
||||||
cond->m_query.m_filter = filter;
|
cond->m_query.m_filter = filter;
|
||||||
topic = r->m_topic->m_entity.m_hdl;
|
|
||||||
dds_reader_unlock(r);
|
dds_reader_unlock(r);
|
||||||
rc = dds_topic_lock(topic, &t);
|
|
||||||
if (rc == DDS_RETCODE_OK) {
|
|
||||||
abort();
|
|
||||||
#if 0
|
|
||||||
if (t->m_stopic->filter_sample == NULL) {
|
|
||||||
t->m_stopic->filter_sample = dds_alloc(t->m_descriptor->m_size);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
dds_topic_unlock(t);
|
|
||||||
} else {
|
|
||||||
(void)dds_delete(hdl);
|
|
||||||
hdl = DDS_ERRNO(rc, "Error occurred on locking topic");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
hdl = DDS_ERRNO(rc, "Error occurred on locking reader");
|
hdl = DDS_ERRNO(rc, "Error occurred on locking reader");
|
||||||
}
|
}
|
||||||
|
|
|
@ -663,16 +663,19 @@ static bool add_sample
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool content_filter_accepts (const struct ddsi_sertopic * topic, const struct ddsi_serdata *sample)
|
static bool content_filter_accepts (const struct ddsi_sertopic * sertopic, const struct ddsi_serdata *sample)
|
||||||
{
|
{
|
||||||
bool ret = true;
|
bool ret = true;
|
||||||
#if 0 /* FIXME: content filter */
|
const struct dds_topic *tp = sertopic->status_cb_entity;
|
||||||
if (topic->filter_fn)
|
if (tp->filter_fn)
|
||||||
{
|
{
|
||||||
deserialize_into ((char*) topic->filter_sample, sample);
|
const dds_topic_descriptor_t * desc = tp->m_descriptor;
|
||||||
ret = (topic->filter_fn) (topic->filter_sample, topic->filter_ctx);
|
char tmp[desc->m_size];
|
||||||
|
memset (tmp, 0, sizeof (tmp));
|
||||||
|
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
||||||
|
ret = (tp->filter_fn) (tmp, tp->filter_ctx);
|
||||||
|
dds_sample_free(tmp, desc, DDS_FREE_CONTENTS_BIT);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2179,9 +2182,9 @@ static bool update_conditions_locked
|
||||||
dds_readcond * iter;
|
dds_readcond * iter;
|
||||||
int m_pre;
|
int m_pre;
|
||||||
int m_post;
|
int m_post;
|
||||||
#if 0 /* FIXME: content filter, query cond */
|
bool deserialised = (rhc->topic->status_cb_entity->filter_fn != 0);
|
||||||
bool deserialised = (rhc->topic->filter_fn != NULL);
|
const struct dds_topic_descriptor *desc = rhc->topic->status_cb_entity->m_descriptor;
|
||||||
#endif
|
char tmp[desc->m_size];
|
||||||
|
|
||||||
TRACE (("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n",
|
TRACE (("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n",
|
||||||
(void *) rhc, rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed,
|
(void *) rhc, rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed,
|
||||||
|
@ -2224,17 +2227,17 @@ static bool update_conditions_locked
|
||||||
}
|
}
|
||||||
else if (m_pre < m_post)
|
else if (m_pre < m_post)
|
||||||
{
|
{
|
||||||
#if 0 /* FIXME: content filter, query cond */
|
|
||||||
if (sample && !deserialised && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
|
if (sample && !deserialised && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
|
||||||
{
|
{
|
||||||
deserialize_into ((char*)rhc->topic->filter_sample, sample);
|
memset (tmp, 0, sizeof (tmp));
|
||||||
|
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
||||||
deserialised = true;
|
deserialised = true;
|
||||||
}
|
}
|
||||||
if
|
if
|
||||||
(
|
(
|
||||||
(sample == NULL)
|
(sample == NULL)
|
||||||
|| (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|
|| (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY)
|
||||||
|| (iter->m_query.m_filter != NULL && iter->m_query.m_filter (rhc->topic->filter_sample))
|
|| (iter->m_query.m_filter != NULL && iter->m_query.m_filter (tmp))
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
TRACE (("now matches"));
|
TRACE (("now matches"));
|
||||||
|
@ -2244,18 +2247,6 @@ static bool update_conditions_locked
|
||||||
trigger = true;
|
trigger = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
assert (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY);
|
|
||||||
if (sample == NULL)
|
|
||||||
{
|
|
||||||
TRACE (("now matches"));
|
|
||||||
if (iter->m_entity.m_trigger++ == 0)
|
|
||||||
{
|
|
||||||
TRACE ((" (cond now triggers)"));
|
|
||||||
trigger = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -2273,6 +2264,8 @@ static bool update_conditions_locked
|
||||||
iter = iter->m_rhc_next;
|
iter = iter->m_rhc_next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (deserialised)
|
||||||
|
dds_sample_free (tmp, desc, DDS_FREE_CONTENTS_BIT);
|
||||||
return trigger;
|
return trigger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -502,28 +502,20 @@ dds_topic_mod_filter(
|
||||||
void **ctx,
|
void **ctx,
|
||||||
bool set)
|
bool set)
|
||||||
{
|
{
|
||||||
#if 0 /* FIXME: content filter */
|
|
||||||
dds_topic *t;
|
dds_topic *t;
|
||||||
if (dds_topic_lock(topic, &t) == DDS_RETCODE_OK) {
|
if (dds_topic_lock(topic, &t) == DDS_RETCODE_OK) {
|
||||||
if (set) {
|
if (set) {
|
||||||
t->m_stopic->filter_fn = *filter;
|
t->filter_fn = *filter;
|
||||||
t->m_stopic->filter_ctx = *ctx;
|
t->filter_ctx = *ctx;
|
||||||
|
|
||||||
/* Create sample for read filtering */
|
|
||||||
|
|
||||||
if (t->m_stopic->filter_sample == NULL) {
|
|
||||||
t->m_stopic->filter_sample = dds_alloc (t->m_descriptor->m_size);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
*filter = t->m_stopic->filter_fn;
|
*filter = t->filter_fn;
|
||||||
*ctx = t->m_stopic->filter_ctx;
|
*ctx = t->filter_ctx;
|
||||||
}
|
}
|
||||||
dds_topic_unlock(t);
|
dds_topic_unlock(t);
|
||||||
} else {
|
} else {
|
||||||
*filter = 0;
|
*filter = 0;
|
||||||
*ctx = NULL;
|
*ctx = NULL;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_Pre_satisfies_((topic & DDS_ENTITY_KIND_MASK) == DDS_KIND_TOPIC)
|
_Pre_satisfies_((topic & DDS_ENTITY_KIND_MASK) == DDS_KIND_TOPIC)
|
||||||
|
@ -545,8 +537,7 @@ dds_topic_get_filter(
|
||||||
dds_topic_intern_filter_fn filter;
|
dds_topic_intern_filter_fn filter;
|
||||||
void *ctx;
|
void *ctx;
|
||||||
dds_topic_mod_filter (topic, &filter, &ctx, false);
|
dds_topic_mod_filter (topic, &filter, &ctx, false);
|
||||||
return
|
return (filter == dds_topic_chaining_filter) ? (dds_topic_filter_fn)ctx : 0;
|
||||||
(filter == dds_topic_chaining_filter) ? (dds_topic_filter_fn)ctx : 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|
|
@ -198,14 +198,12 @@ dds_write_impl(
|
||||||
return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER, "No data buffer provided");
|
return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER, "No data buffer provided");
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0 /* FIXME: content filter */
|
|
||||||
/* Check for topic filter */
|
/* Check for topic filter */
|
||||||
if (ddsi_wr->topic->filter_fn && ! writekey) {
|
if (wr->m_topic->filter_fn && ! writekey) {
|
||||||
if (!(ddsi_wr->topic->filter_fn) (data, ddsi_wr->topic->filter_ctx)) {
|
if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) {
|
||||||
return DDS_RECTODE_OK;
|
return DDS_RETCODE_OK;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
if (asleep) {
|
if (asleep) {
|
||||||
thread_state_awake (thr);
|
thread_state_awake (thr);
|
||||||
|
@ -265,12 +263,9 @@ dds_writecdr_impl(
|
||||||
struct writer * ddsi_wr = wr->m_wr;
|
struct writer * ddsi_wr = wr->m_wr;
|
||||||
struct tkmap_instance * tk;
|
struct tkmap_instance * tk;
|
||||||
|
|
||||||
#if 0 /* FIXME: content filter */
|
if (wr->m_topic->filter_fn) {
|
||||||
/* Check for topic filter */
|
|
||||||
if (ddsi_wr->topic->filter_fn && ! writekey) {
|
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
if (asleep) {
|
if (asleep) {
|
||||||
thread_state_awake (thr);
|
thread_state_awake (thr);
|
||||||
|
|
|
@ -78,11 +78,6 @@ static size_t alignup_size (size_t x, size_t a)
|
||||||
return (x+m) & ~m;
|
return (x+m) & ~m;
|
||||||
}
|
}
|
||||||
|
|
||||||
static size_t alignup4 (size_t x)
|
|
||||||
{
|
|
||||||
return alignup_size (x, 4);
|
|
||||||
}
|
|
||||||
|
|
||||||
void * ddsi_serstate_append (struct serstate * st, size_t n)
|
void * ddsi_serstate_append (struct serstate * st, size_t n)
|
||||||
{
|
{
|
||||||
char *p;
|
char *p;
|
||||||
|
@ -378,7 +373,7 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi
|
||||||
|
|
||||||
/* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */
|
/* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */
|
||||||
assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t));
|
assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t));
|
||||||
assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup4 (keysize) + sizeof (nn_parameter_t));
|
assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup_size (keysize, 4) + sizeof (nn_parameter_t));
|
||||||
return fix_serdata_default (d, tp->c.iid);
|
return fix_serdata_default (d, tp->c.iid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,7 +399,7 @@ static void serdata_default_to_ser (const struct ddsi_serdata *serdata_common, s
|
||||||
{
|
{
|
||||||
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
|
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
|
||||||
assert (off < d->pos + sizeof(struct CDRHeader));
|
assert (off < d->pos + sizeof(struct CDRHeader));
|
||||||
assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off);
|
assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off);
|
||||||
/* FIXME: maybe I should pull the header out ... */
|
/* FIXME: maybe I should pull the header out ... */
|
||||||
memcpy (buf, (char *)&d->hdr + off, sz);
|
memcpy (buf, (char *)&d->hdr + off, sz);
|
||||||
}
|
}
|
||||||
|
@ -413,7 +408,7 @@ static struct ddsi_serdata *serdata_default_to_ser_ref (const struct ddsi_serdat
|
||||||
{
|
{
|
||||||
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
|
const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common;
|
||||||
assert (off < d->pos + sizeof(struct CDRHeader));
|
assert (off < d->pos + sizeof(struct CDRHeader));
|
||||||
assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off);
|
assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off);
|
||||||
ref->iov_base = (char *)&d->hdr + off;
|
ref->iov_base = (char *)&d->hdr + off;
|
||||||
ref->iov_len = sz;
|
ref->iov_len = sz;
|
||||||
return ddsi_serdata_ref(serdata_common);
|
return ddsi_serdata_ref(serdata_common);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue