From e841e4bf9428820a65024abd57f1fb767b168fa0 Mon Sep 17 00:00:00 2001 From: Erik Boasson Date: Fri, 26 Oct 2018 20:05:18 +0800 Subject: [PATCH] add back in the broken filter and query condition support the implementation was and is terrible, but without it too many tests fail Signed-off-by: Erik Boasson --- src/core/ddsc/src/dds__types.h | 8 +++++ src/core/ddsc/src/dds_querycond.c | 16 --------- src/core/ddsc/src/dds_rhc.c | 41 ++++++++++-------------- src/core/ddsc/src/dds_topic.c | 19 +++-------- src/core/ddsc/src/dds_write.c | 13 +++----- src/core/ddsi/src/ddsi_serdata_default.c | 11 ++----- 6 files changed, 37 insertions(+), 71 deletions(-) diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 07d73c6..4821c76 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -200,12 +200,20 @@ typedef struct dds_writer } dds_writer; +#ifndef DDS_TOPIC_INTERN_FILTER_FN_DEFINED +#define DDS_TOPIC_INTERN_FILTER_FN_DEFINED +typedef bool (*dds_topic_intern_filter_fn) (const void * sample, void *ctx); +#endif + typedef struct dds_topic { struct dds_entity m_entity; struct ddsi_sertopic * m_stopic; const dds_topic_descriptor_t * m_descriptor; + dds_topic_intern_filter_fn filter_fn; + void * filter_ctx; + /* Status metrics */ dds_inconsistent_topic_status_t m_inconsistent_topic_status; diff --git a/src/core/ddsc/src/dds_querycond.c b/src/core/ddsc/src/dds_querycond.c index 7551539..efc2d41 100644 --- a/src/core/ddsc/src/dds_querycond.c +++ b/src/core/ddsc/src/dds_querycond.c @@ -27,11 +27,9 @@ dds_create_querycondition( _In_ uint32_t mask, _In_ dds_querycondition_filter_fn filter) { - dds_entity_t topic; dds_entity_t hdl; dds__retcode_t rc; dds_reader *r; - dds_topic *t; DDS_REPORT_STACK(); @@ -41,21 +39,7 @@ dds_create_querycondition( assert(cond); hdl = cond->m_entity.m_hdl; cond->m_query.m_filter = filter; - topic = r->m_topic->m_entity.m_hdl; dds_reader_unlock(r); - rc = dds_topic_lock(topic, &t); - if (rc == DDS_RETCODE_OK) { - abort(); -#if 0 - if (t->m_stopic->filter_sample == NULL) { - t->m_stopic->filter_sample = dds_alloc(t->m_descriptor->m_size); - } -#endif - dds_topic_unlock(t); - } else { - (void)dds_delete(hdl); - hdl = DDS_ERRNO(rc, "Error occurred on locking topic"); - } } else { hdl = DDS_ERRNO(rc, "Error occurred on locking reader"); } diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index bea7ec7..9e8f587 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -663,16 +663,19 @@ static bool add_sample return true; } -static bool content_filter_accepts (const struct ddsi_sertopic * topic, const struct ddsi_serdata *sample) +static bool content_filter_accepts (const struct ddsi_sertopic * sertopic, const struct ddsi_serdata *sample) { bool ret = true; -#if 0 /* FIXME: content filter */ - if (topic->filter_fn) + const struct dds_topic *tp = sertopic->status_cb_entity; + if (tp->filter_fn) { - deserialize_into ((char*) topic->filter_sample, sample); - ret = (topic->filter_fn) (topic->filter_sample, topic->filter_ctx); + const dds_topic_descriptor_t * desc = tp->m_descriptor; + char tmp[desc->m_size]; + memset (tmp, 0, sizeof (tmp)); + ddsi_serdata_to_sample (sample, tmp, NULL, NULL); + ret = (tp->filter_fn) (tmp, tp->filter_ctx); + dds_sample_free(tmp, desc, DDS_FREE_CONTENTS_BIT); } -#endif return ret; } @@ -2179,9 +2182,9 @@ static bool update_conditions_locked dds_readcond * iter; int m_pre; int m_post; -#if 0 /* FIXME: content filter, query cond */ - bool deserialised = (rhc->topic->filter_fn != NULL); -#endif + bool deserialised = (rhc->topic->status_cb_entity->filter_fn != 0); + const struct dds_topic_descriptor *desc = rhc->topic->status_cb_entity->m_descriptor; + char tmp[desc->m_size]; TRACE (("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n", (void *) rhc, rhc->n_instances, rhc->n_nonempty_instances, rhc->n_not_alive_disposed, @@ -2224,17 +2227,17 @@ static bool update_conditions_locked } else if (m_pre < m_post) { -#if 0 /* FIXME: content filter, query cond */ if (sample && !deserialised && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY)) { - deserialize_into ((char*)rhc->topic->filter_sample, sample); + memset (tmp, 0, sizeof (tmp)); + ddsi_serdata_to_sample (sample, tmp, NULL, NULL); deserialised = true; } if ( (sample == NULL) || (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY) - || (iter->m_query.m_filter != NULL && iter->m_query.m_filter (rhc->topic->filter_sample)) + || (iter->m_query.m_filter != NULL && iter->m_query.m_filter (tmp)) ) { TRACE (("now matches")); @@ -2244,18 +2247,6 @@ static bool update_conditions_locked trigger = true; } } -#else - assert (dds_entity_kind(iter->m_entity.m_hdl) != DDS_KIND_COND_QUERY); - if (sample == NULL) - { - TRACE (("now matches")); - if (iter->m_entity.m_trigger++ == 0) - { - TRACE ((" (cond now triggers)")); - trigger = true; - } - } -#endif } else { @@ -2273,6 +2264,8 @@ static bool update_conditions_locked iter = iter->m_rhc_next; } + if (deserialised) + dds_sample_free (tmp, desc, DDS_FREE_CONTENTS_BIT); return trigger; } diff --git a/src/core/ddsc/src/dds_topic.c b/src/core/ddsc/src/dds_topic.c index bc79f26..a1684f5 100644 --- a/src/core/ddsc/src/dds_topic.c +++ b/src/core/ddsc/src/dds_topic.c @@ -502,28 +502,20 @@ dds_topic_mod_filter( void **ctx, bool set) { -#if 0 /* FIXME: content filter */ dds_topic *t; if (dds_topic_lock(topic, &t) == DDS_RETCODE_OK) { if (set) { - t->m_stopic->filter_fn = *filter; - t->m_stopic->filter_ctx = *ctx; - - /* Create sample for read filtering */ - - if (t->m_stopic->filter_sample == NULL) { - t->m_stopic->filter_sample = dds_alloc (t->m_descriptor->m_size); - } + t->filter_fn = *filter; + t->filter_ctx = *ctx; } else { - *filter = t->m_stopic->filter_fn; - *ctx = t->m_stopic->filter_ctx; + *filter = t->filter_fn; + *ctx = t->filter_ctx; } dds_topic_unlock(t); } else { *filter = 0; *ctx = NULL; } -#endif } _Pre_satisfies_((topic & DDS_ENTITY_KIND_MASK) == DDS_KIND_TOPIC) @@ -545,8 +537,7 @@ dds_topic_get_filter( dds_topic_intern_filter_fn filter; void *ctx; dds_topic_mod_filter (topic, &filter, &ctx, false); - return - (filter == dds_topic_chaining_filter) ? (dds_topic_filter_fn)ctx : 0; + return (filter == dds_topic_chaining_filter) ? (dds_topic_filter_fn)ctx : 0; } void diff --git a/src/core/ddsc/src/dds_write.c b/src/core/ddsc/src/dds_write.c index 545a061..50a8fd1 100644 --- a/src/core/ddsc/src/dds_write.c +++ b/src/core/ddsc/src/dds_write.c @@ -198,14 +198,12 @@ dds_write_impl( return DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER, "No data buffer provided"); } -#if 0 /* FIXME: content filter */ /* Check for topic filter */ - if (ddsi_wr->topic->filter_fn && ! writekey) { - if (!(ddsi_wr->topic->filter_fn) (data, ddsi_wr->topic->filter_ctx)) { - return DDS_RECTODE_OK; + if (wr->m_topic->filter_fn && ! writekey) { + if (!(wr->m_topic->filter_fn) (data, wr->m_topic->filter_ctx)) { + return DDS_RETCODE_OK; } } -#endif if (asleep) { thread_state_awake (thr); @@ -265,12 +263,9 @@ dds_writecdr_impl( struct writer * ddsi_wr = wr->m_wr; struct tkmap_instance * tk; -#if 0 /* FIXME: content filter */ - /* Check for topic filter */ - if (ddsi_wr->topic->filter_fn && ! writekey) { + if (wr->m_topic->filter_fn) { abort(); } -#endif if (asleep) { thread_state_awake (thr); diff --git a/src/core/ddsi/src/ddsi_serdata_default.c b/src/core/ddsi/src/ddsi_serdata_default.c index 4bb6012..e050c8a 100644 --- a/src/core/ddsi/src/ddsi_serdata_default.c +++ b/src/core/ddsi/src/ddsi_serdata_default.c @@ -78,11 +78,6 @@ static size_t alignup_size (size_t x, size_t a) return (x+m) & ~m; } -static size_t alignup4 (size_t x) -{ - return alignup_size (x, 4); -} - void * ddsi_serstate_append (struct serstate * st, size_t n) { char *p; @@ -378,7 +373,7 @@ static struct ddsi_serdata *serdata_default_from_sample_plist (const struct ddsi /* if we're it is supposed to be just a key, rawkey must be be the first field and followed only by a sentinel */ assert (kind != SDK_KEY || rawkey == (const unsigned char *)sample->blob + sizeof (nn_parameter_t)); - assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup4 (keysize) + sizeof (nn_parameter_t)); + assert (kind != SDK_KEY || sample->size == sizeof (nn_parameter_t) + alignup_size (keysize, 4) + sizeof (nn_parameter_t)); return fix_serdata_default (d, tp->c.iid); } @@ -404,7 +399,7 @@ static void serdata_default_to_ser (const struct ddsi_serdata *serdata_common, s { const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; assert (off < d->pos + sizeof(struct CDRHeader)); - assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off); + assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); /* FIXME: maybe I should pull the header out ... */ memcpy (buf, (char *)&d->hdr + off, sz); } @@ -413,7 +408,7 @@ static struct ddsi_serdata *serdata_default_to_ser_ref (const struct ddsi_serdat { const struct ddsi_serdata_default *d = (const struct ddsi_serdata_default *)serdata_common; assert (off < d->pos + sizeof(struct CDRHeader)); - assert (sz <= alignup4 (d->pos + sizeof(struct CDRHeader)) - off); + assert (sz <= alignup_size (d->pos + sizeof(struct CDRHeader), 4) - off); ref->iov_base = (char *)&d->hdr + off; ref->iov_len = sz; return ddsi_serdata_ref(serdata_common);