Lock entity QoS while accessing it
A QoS change can happen at the same time that a new reader for a built-in topic is provisioned with historical data, and so cause reading in inconsistent QoS, use-after-free or other fun things. During QoS matching it is also necessary to guarantee the QoS doesn't change (QoS changes affecting matching will be supported at some point, and manipulating complex data structures where bitmasks determine which parts are defined while reading the same data concurrently is a recipe for disaster. Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
103210bf8e
commit
09e08f7778
3 changed files with 34 additions and 4 deletions
|
@ -131,11 +131,12 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
|
||||||
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
|
/* FIXME: not quite elegant to manage the creation of a serdata for a built-in topic via this function, but I also find it quite unelegant to let from_sample read straight from the underlying internal entity, and to_sample convert to the external format ... I could claim the internal entity is the "serialised form", but that forces wrapping it in a fragchain in one way or another, which, though possible, is also a bit lacking in elegance. */
|
||||||
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
|
const struct ddsi_sertopic_builtintopic *tp = (const struct ddsi_sertopic_builtintopic *)tpcmn;
|
||||||
/* keyhash must in host format (which the GUIDs always are internally) */
|
/* keyhash must in host format (which the GUIDs always are internally) */
|
||||||
const struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value);
|
struct entity_common *entity = ephash_lookup_guid_untyped (tp->gv->guid_hash, (const nn_guid_t *) keyhash->value);
|
||||||
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
|
struct ddsi_serdata_builtintopic *d = serdata_builtin_new(tp, entity ? SDK_DATA : SDK_KEY);
|
||||||
memcpy (&d->key, keyhash->value, sizeof (d->key));
|
memcpy (&d->key, keyhash->value, sizeof (d->key));
|
||||||
if (entity)
|
if (entity)
|
||||||
{
|
{
|
||||||
|
ddsrt_mutex_lock (&entity->qos_lock);
|
||||||
switch (entity->kind)
|
switch (entity->kind)
|
||||||
{
|
{
|
||||||
case EK_PARTICIPANT:
|
case EK_PARTICIPANT:
|
||||||
|
@ -163,6 +164,7 @@ static struct ddsi_serdata *ddsi_serdata_builtin_from_keyhash (const struct ddsi
|
||||||
from_entity_pwr (d, (const struct proxy_writer *) entity);
|
from_entity_pwr (d, (const struct proxy_writer *) entity);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
ddsrt_mutex_unlock (&entity->qos_lock);
|
||||||
}
|
}
|
||||||
return fix_serdata_builtin(d, tp->c.serdata_basehash);
|
return fix_serdata_builtin(d, tp->c.serdata_basehash);
|
||||||
}
|
}
|
||||||
|
|
|
@ -142,6 +142,17 @@ struct entity_common {
|
||||||
ddsrt_mutex_t lock;
|
ddsrt_mutex_t lock;
|
||||||
bool onlylocal;
|
bool onlylocal;
|
||||||
struct q_globals *gv;
|
struct q_globals *gv;
|
||||||
|
|
||||||
|
/* QoS changes always lock the entity itself, and additionally
|
||||||
|
(and within the scope of the entity lock) acquire qos_lock
|
||||||
|
while manipulating the QoS. So any thread that needs to read
|
||||||
|
the QoS without acquiring the entity's lock can still do so
|
||||||
|
(e.g., the materialisation of samples for built-in topics
|
||||||
|
when connecting a reader to a writer for a built-in topic).
|
||||||
|
|
||||||
|
qos_lock lock order across entities in is in increasing
|
||||||
|
order of entity addresses cast to uintptr_t. */
|
||||||
|
ddsrt_mutex_t qos_lock;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct local_reader_ary {
|
struct local_reader_ary {
|
||||||
|
|
|
@ -180,6 +180,7 @@ static void entity_common_init (struct entity_common *e, struct q_globals *gv, c
|
||||||
e->onlylocal = onlylocal;
|
e->onlylocal = onlylocal;
|
||||||
e->gv = gv;
|
e->gv = gv;
|
||||||
ddsrt_mutex_init (&e->lock);
|
ddsrt_mutex_init (&e->lock);
|
||||||
|
ddsrt_mutex_init (&e->qos_lock);
|
||||||
if (builtintopic_is_visible (gv->builtin_topic_interface, guid, vendorid))
|
if (builtintopic_is_visible (gv->builtin_topic_interface, guid, vendorid))
|
||||||
{
|
{
|
||||||
e->tk = builtintopic_get_tkmap_entry (gv->builtin_topic_interface, guid);
|
e->tk = builtintopic_get_tkmap_entry (gv->builtin_topic_interface, guid);
|
||||||
|
@ -197,6 +198,7 @@ static void entity_common_fini (struct entity_common *e)
|
||||||
if (e->tk)
|
if (e->tk)
|
||||||
ddsi_tkmap_instance_unref (e->gv->m_tkmap, e->tk);
|
ddsi_tkmap_instance_unref (e->gv->m_tkmap, e->tk);
|
||||||
ddsrt_free (e->name);
|
ddsrt_free (e->name);
|
||||||
|
ddsrt_mutex_destroy (&e->qos_lock);
|
||||||
ddsrt_mutex_destroy (&e->lock);
|
ddsrt_mutex_destroy (&e->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,8 +394,10 @@ static bool update_qos_locked (struct entity_common *e, dds_qos_t *ent_qos, cons
|
||||||
/* no change, or an as-yet unsupported one */
|
/* no change, or an as-yet unsupported one */
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
ddsrt_mutex_lock (&e->qos_lock);
|
||||||
nn_xqos_fini_mask (ent_qos, mask);
|
nn_xqos_fini_mask (ent_qos, mask);
|
||||||
nn_xqos_mergein_missing (ent_qos, xqos, mask);
|
nn_xqos_mergein_missing (ent_qos, xqos, mask);
|
||||||
|
ddsrt_mutex_unlock (&e->qos_lock);
|
||||||
builtintopic_write (e->gv->builtin_topic_interface, e, timestamp, true);
|
builtintopic_write (e->gv->builtin_topic_interface, e, timestamp, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -2088,6 +2092,19 @@ static void reader_qos_mismatch (struct reader * rd, dds_qos_policy_id_t reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool qos_match_p_lock (struct entity_common *ea, const dds_qos_t *a, struct entity_common *eb, const dds_qos_t *b, dds_qos_policy_id_t *reason)
|
||||||
|
{
|
||||||
|
assert (ea != eb);
|
||||||
|
ddsrt_mutex_t * const locks[] = { &ea->qos_lock, &eb->qos_lock, &ea->qos_lock };
|
||||||
|
const int shift = (uintptr_t) ea > (uintptr_t) eb;
|
||||||
|
for (int i = 0; i < 2; i++)
|
||||||
|
ddsrt_mutex_lock (locks[i + shift]);
|
||||||
|
bool ret = qos_match_p (a, b, reason);
|
||||||
|
for (int i = 0; i < 2; i++)
|
||||||
|
ddsrt_mutex_unlock (locks[i + shift]);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_reader *prd, nn_mtime_t tnow)
|
static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_reader *prd, nn_mtime_t tnow)
|
||||||
{
|
{
|
||||||
const int isb0 = (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0);
|
const int isb0 = (is_builtin_entityid (wr->e.guid.entityid, NN_VENDORID_ECLIPSE) != 0);
|
||||||
|
@ -2098,7 +2115,7 @@ static void connect_writer_with_proxy_reader (struct writer *wr, struct proxy_re
|
||||||
return;
|
return;
|
||||||
if (wr->e.onlylocal)
|
if (wr->e.onlylocal)
|
||||||
return;
|
return;
|
||||||
if (!isb0 && !qos_match_p (prd->c.xqos, wr->xqos, &reason))
|
if (!isb0 && !qos_match_p_lock (&prd->e, prd->c.xqos, &wr->e, wr->xqos, &reason))
|
||||||
{
|
{
|
||||||
writer_qos_mismatch (wr, reason);
|
writer_qos_mismatch (wr, reason);
|
||||||
return;
|
return;
|
||||||
|
@ -2117,7 +2134,7 @@ static void connect_proxy_writer_with_reader (struct proxy_writer *pwr, struct r
|
||||||
return;
|
return;
|
||||||
if (rd->e.onlylocal)
|
if (rd->e.onlylocal)
|
||||||
return;
|
return;
|
||||||
if (!isb0 && !qos_match_p (rd->xqos, pwr->c.xqos, &reason))
|
if (!isb0 && !qos_match_p_lock (&rd->e, rd->xqos, &pwr->e, pwr->c.xqos, &reason))
|
||||||
{
|
{
|
||||||
reader_qos_mismatch (rd, reason);
|
reader_qos_mismatch (rd, reason);
|
||||||
return;
|
return;
|
||||||
|
@ -2159,7 +2176,7 @@ static void connect_writer_with_reader (struct writer *wr, struct reader *rd, nn
|
||||||
return;
|
return;
|
||||||
if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos))
|
if (ignore_local_p (&wr->e.guid, &rd->e.guid, wr->xqos, rd->xqos))
|
||||||
return;
|
return;
|
||||||
if (!qos_match_p (rd->xqos, wr->xqos, &reason))
|
if (!qos_match_p_lock (&rd->e, rd->xqos, &wr->e, wr->xqos, &reason))
|
||||||
{
|
{
|
||||||
writer_qos_mismatch (wr, reason);
|
writer_qos_mismatch (wr, reason);
|
||||||
reader_qos_mismatch (rd, reason);
|
reader_qos_mismatch (rd, reason);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue