diff --git a/src/core/ddsc/src/dds__types.h b/src/core/ddsc/src/dds__types.h index 4b4cb03..457a1b9 100644 --- a/src/core/ddsc/src/dds__types.h +++ b/src/core/ddsc/src/dds__types.h @@ -247,6 +247,7 @@ typedef struct dds_readcond struct { dds_querycondition_filter_fn m_filter; + unsigned m_index; /* index in RHC condition masks */ } m_query; } dds_readcond; diff --git a/src/core/ddsc/src/dds_readcond.c b/src/core/ddsc/src/dds_readcond.c index 3af397c..503e072 100644 --- a/src/core/ddsc/src/dds_readcond.c +++ b/src/core/ddsc/src/dds_readcond.c @@ -45,6 +45,7 @@ dds_create_readcond( cond->m_rd_guid = rd->m_entity.m_guid; if (kind == DDS_KIND_COND_QUERY) { cond->m_query.m_filter = filter; + cond->m_query.m_index = UINT_MAX; } dds_rhc_add_readcondition (cond); return cond; diff --git a/src/core/ddsc/src/dds_rhc.c b/src/core/ddsc/src/dds_rhc.c index b50fb89..2c704d5 100644 --- a/src/core/ddsc/src/dds_rhc.c +++ b/src/core/ddsc/src/dds_rhc.c @@ -152,6 +152,9 @@ even when generating an invalid sample for an unregister message using the tkmap data. */ +typedef uint32_t querycond_mask_t; +#define MAX_ATTACHED_QUERYCONDS (CHAR_BIT * sizeof (querycond_mask_t)) + #define TRACE(...) DDS_LOG(DDS_LC_RHC, __VA_ARGS__) /****************************** @@ -299,6 +302,7 @@ struct rhc os_mutex lock; dds_readcond * conds; /* List of associated read conditions */ uint32_t nconds; /* Number of associated read conditions */ + uint32_t nqconds; /* Number of associated query conditions */ }; struct trigger_info @@ -2129,17 +2133,20 @@ static uint32_t rhc_get_cond_trigger (struct rhc_instance * const inst, const dd return m ? 1 : 0; } -void dds_rhc_add_readcondition (dds_readcond * cond) +void dds_rhc_add_readcondition (dds_readcond *cond) { /* On the assumption that a readcondition will be attached to a waitset for nearly all of its life, we keep track of all readconditions on a reader in one set, without distinguishing between those attached to a waitset or not. */ - struct rhc * rhc = cond->m_rhc; + struct rhc *rhc = cond->m_rhc; struct ut_hhIter iter; struct rhc_instance * inst; + assert ((dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_READ && cond->m_query.m_filter == 0) || + (dds_entity_kind (&cond->m_entity) == DDS_KIND_COND_QUERY && cond->m_query.m_filter != 0)); + cond->m_qminv = qmask_from_dcpsquery (cond->m_sample_states, cond->m_view_states, cond->m_instance_states); os_mutexLock (&rhc->lock); @@ -2153,8 +2160,29 @@ void dds_rhc_add_readcondition (dds_readcond * cond) } } } + cond->m_rhc_next = rhc->conds; rhc->nconds++; + if (cond->m_query.m_filter) + { + /* Attaching a query condition means selecting an available index in the bitmasks of matching conditions, clearing the bit in all the masks & evaluating the condition on all samples. For the clearing and the setting of the instance mask, we need to scan all instances, not just the non-empty ones. */ + dds_readcond *rc; + querycond_mask_t inv_qcmask = ~(querycond_mask_t)0; + assert (cond->m_query.m_index == UINT_MAX); + for (rc = rhc->conds; rc != NULL; rc = rc->m_rhc_next) + if (rc->m_query.m_filter != 0) + { + assert (rc->m_query.m_index < MAX_ATTACHED_QUERYCONDS); + inv_qcmask &= ~((querycond_mask_t)1 << rc->m_query.m_index); + } + if (inv_qcmask == 0) abort (); /* FIXME: must return an error instead of crashing */ + unsigned index = 0; + while (!(inv_qcmask & ((querycond_mask_t)1 << index))) + index++; + cond->m_query.m_index = index; + + rhc->nqconds++; + } rhc->conds = cond; TRACE ("add_readcondition(%p, %x, %x, %x) => %p qminv %x ; rhc %u conds\n", @@ -2164,31 +2192,20 @@ void dds_rhc_add_readcondition (dds_readcond * cond) os_mutexUnlock (&rhc->lock); } -void dds_rhc_remove_readcondition (dds_readcond * cond) +void dds_rhc_remove_readcondition (dds_readcond *cond) { - struct rhc * rhc = cond->m_rhc; - dds_readcond * iter; - dds_readcond * prev = NULL; - + struct rhc *rhc = cond->m_rhc; + dds_readcond **ptr; os_mutexLock (&rhc->lock); - iter = rhc->conds; - while (iter) + ptr = &rhc->conds; + while (*ptr != cond) + ptr = &(*ptr)->m_rhc_next; + *ptr = (*ptr)->m_rhc_next; + rhc->nconds--; + if (cond->m_query.m_filter) { - if (iter == cond) - { - if (prev) - { - prev->m_rhc_next = iter->m_rhc_next; - } - else - { - rhc->conds = iter->m_rhc_next; - } - rhc->nconds--; - break; - } - prev = iter; - iter = iter->m_rhc_next; + rhc->nqconds--; + cond->m_query.m_index = UINT_MAX; } os_mutexUnlock (&rhc->lock); }