diff --git a/src/core/ddsc/src/dds_serdata_builtintopic.c b/src/core/ddsc/src/dds_serdata_builtintopic.c index 689abe9..1a97a40 100644 --- a/src/core/ddsc/src/dds_serdata_builtintopic.c +++ b/src/core/ddsc/src/dds_serdata_builtintopic.c @@ -131,11 +131,12 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi /* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */ const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn; /* keyhash must in host format (which the GUIDs always are internally) */ - const struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value); + struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value); struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY); memcpy (&d->key, keyhash->value, sizeof (d->key)); if (entity) { + ddsrt_mutex_lock (&entity->qos_lock); switch (entity->kind) { case EK_PARTICIPANT: @@ -163,6 +164,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi from_entity_pwr (d, (const struct proxy_writer *) entity); break; } + ddsrt_mutex_unlock (&entity->qos_lock); } return fix_serdata_builtin(d, tp->c.serdata_basehash); } diff --git a/src/core/ddsi/include/dds/ddsi/q_entity.h b/src/core/ddsi/include/dds/ddsi/q_entity.h index 0acd3da..3aba1f7 100644 --- a/src/core/ddsi/include/dds/ddsi/q_entity.h +++ b/src/core/ddsi/include/dds/ddsi/q_entity.h @@ -142,6 +142,17 @@ struct entity_common { ddsrt_mutex_t lock; bool onlylocal; struct q_globals *gv; + + /* QoS changes always lock the entity itself, and additionally + (and within the scope of the entity lock) acquire qos_lock + while manipulating the QoS. So any thread that needs to read + the QoS without acquiring the entity's lock can still do so + (e.g., the materialisation of samples for built-in topics + when connecting a reader to a writer for a built-in topic). + + qos_lock lock order across entities in is in increasing + order of entity addresses cast to uintptr_t. */ + ddsrt_mutex_t qos_lock; }; struct local_reader_ary { diff --git a/src/core/ddsi/src/q_entity.c b/src/core/ddsi/src/q_entity.c index da19224..57eb62f 100644 --- a/src/core/ddsi/src/q_entity.c +++ b/src/core/ddsi/src/q_entity.c @@ -180,6 +180,7 @@ static void entity_common_init (struct entity_common *e, struct q_globals *gv, c e->onlylocal = onlylocal; e->gv = gv; ddsrt_mutex_init (&e->lock); + ddsrt_mutex_init (&e->qos_lock); if (builtintopic_is_visible (gv->builtin_topic_interface, guid, vendorid)) { e->tk = builtintopic_get_tkmap_entry (gv->builtin_topic_interface, guid); @@ -197,6 +198,7 @@ static void entity_common_fini (struct entity_common *e) if (e->tk) ddsi_tkmap_instance_unref (e->gv->m_tkmap, e->tk); ddsrt_free (e->name); + ddsrt_mutex_destroy (&e->qos_lock); ddsrt_mutex_destroy (&e->lock); } @@ -392,8 +394,10 @@ static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, cons /* no change, or an as-yet unsupported one */ return false; + ddsrt_mutex_lock (&e->qos_lock); nn_xqos_fini_mask (ent_qos, mask); nn_xqos_mergein_missing (ent_qos, xqos, mask); + ddsrt_mutex_unlock (&e->qos_lock); builtintopic_write (e->gv->builtin_topic_interface, e, timestamp, true); return true; } @@ -2088,6 +2092,19 @@ static void reader_qos_mismatch (struct reader * rd, dds_qos_policy_id_t reason) } } +static bool qos_match_p_lock (struct entity_common *ea, const dds_qos_t *a, struct entity_common *eb, const dds_qos_t *b, dds_qos_policy_id_t *reason) +{ + assert (ea != eb); + ddsrt_mutex_t * const locks[] = { &ea->qos_lock, &eb->qos_lock, &ea->qos_lock }; + const int shift = (uintptr_t) ea > (uintptr_t) eb; + for (int i = 0; i < 2; i++) + ddsrt_mutex_lock (locks[i + shift]); + bool ret = qos_match_p (a, b, reason); + for (int i = 0; i < 2; i++) + ddsrt_mutex_unlock (locks[i + shift]); + return ret; +} + static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_reader *prd, nn_mtime_t tnow) { const int isb0 = (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0); @@ -2098,7 +2115,7 @@ static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_re return; if (wr->e.onlylocal) return; - if (!isb0 && !qos_match_p (prd->c.xqos, wr->xqos, &reason)) + if (!isb0 && !qos_match_p_lock (&prd->e, prd->c.xqos, &wr->e, wr->xqos, &reason)) { writer_qos_mismatch (wr, reason); return; @@ -2117,7 +2134,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r return; if (rd->e.onlylocal) return; - if (!isb0 && !qos_match_p (rd->xqos, pwr->c.xqos, &reason)) + if (!isb0 && !qos_match_p_lock (&rd->e, rd->xqos, &pwr->e, pwr->c.xqos, &reason)) { reader_qos_mismatch (rd, reason); return; @@ -2159,7 +2176,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn return; if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos)) return; - if (!qos_match_p (rd->xqos, wr->xqos, &reason)) + if (!qos_match_p_lock (&rd->e, rd->xqos, &wr->e, wr->xqos, &reason)) { writer_qos_mismatch (wr, reason); reader_qos_mismatch (rd, reason);