eliminate support for multiple serstatepools: never used and a source of race conditions during shutdown
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
520ca47938
commit
ed551eecc1
9 changed files with 27 additions and 27 deletions
|
@ -70,7 +70,7 @@ dds_instance_find(
|
||||||
_In_ const void *data,
|
_In_ const void *data,
|
||||||
_In_ const bool create)
|
_In_ const bool create)
|
||||||
{
|
{
|
||||||
serdata_t sd = serialize_key (gv.serpool, topic->m_stopic, data);
|
serdata_t sd = serialize_key (topic->m_stopic, data);
|
||||||
struct tkmap_instance * inst = dds_tkmap_find (sd, false, create);
|
struct tkmap_instance * inst = dds_tkmap_find (sd, false, create);
|
||||||
ddsi_serdata_unref (sd);
|
ddsi_serdata_unref (sd);
|
||||||
return inst;
|
return inst;
|
||||||
|
@ -419,7 +419,7 @@ dds_instance_lookup(
|
||||||
|
|
||||||
topic = dds_instance_info_by_hdl (entity);
|
topic = dds_instance_info_by_hdl (entity);
|
||||||
if (topic) {
|
if (topic) {
|
||||||
sd = serialize_key (gv.serpool, topic->m_stopic, data);
|
sd = serialize_key (topic->m_stopic, data);
|
||||||
ih = dds_tkmap_lookup (map, sd);
|
ih = dds_tkmap_lookup (map, sd);
|
||||||
ddsi_serdata_unref (sd);
|
ddsi_serdata_unref (sd);
|
||||||
ret = DDS_RETCODE_OK;
|
ret = DDS_RETCODE_OK;
|
||||||
|
|
|
@ -212,9 +212,9 @@ dds_write_impl(
|
||||||
|
|
||||||
/* Serialize and write data or key */
|
/* Serialize and write data or key */
|
||||||
if (writekey) {
|
if (writekey) {
|
||||||
d = serialize_key (gv.serpool, ddsi_wr->topic, data);
|
d = serialize_key (ddsi_wr->topic, data);
|
||||||
} else {
|
} else {
|
||||||
d = serialize (gv.serpool, ddsi_wr->topic, data);
|
d = serialize (ddsi_wr->topic, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Set if disposing or unregistering */
|
/* Set if disposing or unregistering */
|
||||||
|
@ -287,7 +287,7 @@ dds_writecdr_impl(
|
||||||
|
|
||||||
/* Serialize and write data or key */
|
/* Serialize and write data or key */
|
||||||
{
|
{
|
||||||
serstate_t st = ddsi_serstate_new (gv.serpool, ddsi_wr->topic);
|
serstate_t st = ddsi_serstate_new (ddsi_wr->topic);
|
||||||
dds_stream_t is;
|
dds_stream_t is;
|
||||||
ddsi_serstate_append_blob(st, 1, sz, cdr);
|
ddsi_serstate_append_blob(st, 1, sz, cdr);
|
||||||
d = ddsi_serstate_fix(st);
|
d = ddsi_serstate_fix(st);
|
||||||
|
|
|
@ -22,8 +22,8 @@ extern "C" {
|
||||||
int serdata_cmp (const struct serdata * a, const struct serdata * b);
|
int serdata_cmp (const struct serdata * a, const struct serdata * b);
|
||||||
uint32_t serdata_hash (const struct serdata *a);
|
uint32_t serdata_hash (const struct serdata *a);
|
||||||
|
|
||||||
serdata_t serialize (serstatepool_t pool, const struct sertopic * tp, const void * sample);
|
serdata_t serialize (const struct sertopic * tp, const void * sample);
|
||||||
serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample);
|
serdata_t serialize_key (const struct sertopic * tp, const void * sample);
|
||||||
|
|
||||||
void deserialize_into (void *sample, const struct serdata *serdata);
|
void deserialize_into (void *sample, const struct serdata *serdata);
|
||||||
void free_deserialized (const struct serdata *serdata, void *vx);
|
void free_deserialized (const struct serdata *serdata, void *vx);
|
||||||
|
|
|
@ -18,10 +18,10 @@
|
||||||
#include "ddsi/q_bswap.h"
|
#include "ddsi/q_bswap.h"
|
||||||
#include "q__osplser.h"
|
#include "q__osplser.h"
|
||||||
|
|
||||||
serdata_t serialize (serstatepool_t pool, const struct sertopic * tp, const void * sample)
|
serdata_t serialize (const struct sertopic * tp, const void * sample)
|
||||||
{
|
{
|
||||||
dds_stream_t os;
|
dds_stream_t os;
|
||||||
serstate_t st = ddsi_serstate_new (pool, tp);
|
serstate_t st = ddsi_serstate_new (tp);
|
||||||
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
|
dds_key_gen ((const dds_topic_descriptor_t*) tp->type, &st->data->v.keyhash, (char*) sample);
|
||||||
dds_stream_from_serstate (&os, st);
|
dds_stream_from_serstate (&os, st);
|
||||||
dds_stream_write_sample (&os, sample, tp);
|
dds_stream_write_sample (&os, sample, tp);
|
||||||
|
@ -58,12 +58,12 @@ int serdata_cmp (const struct serdata *a, const struct serdata *b)
|
||||||
return memcmp (a->v.keyhash.m_hash, b->v.keyhash.m_hash, 16);
|
return memcmp (a->v.keyhash.m_hash, b->v.keyhash.m_hash, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
serdata_t serialize_key (serstatepool_t pool, const struct sertopic * tp, const void * sample)
|
serdata_t serialize_key (const struct sertopic * tp, const void * sample)
|
||||||
{
|
{
|
||||||
serdata_t sd;
|
serdata_t sd;
|
||||||
dds_stream_t os;
|
dds_stream_t os;
|
||||||
dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type;
|
dds_topic_descriptor_t * desc = (dds_topic_descriptor_t*) tp->type;
|
||||||
serstate_t st = ddsi_serstate_new (pool, tp);
|
serstate_t st = ddsi_serstate_new (tp);
|
||||||
dds_key_gen (desc, &st->data->v.keyhash, (char*) sample);
|
dds_key_gen (desc, &st->data->v.keyhash, (char*) sample);
|
||||||
dds_stream_from_serstate (&os, st);
|
dds_stream_from_serstate (&os, st);
|
||||||
dds_stream_write_key (&os, sample, desc);
|
dds_stream_write_key (&os, sample, desc);
|
||||||
|
|
|
@ -180,7 +180,7 @@ OSAPI_EXPORT void ddsi_serstate_set_msginfo
|
||||||
serstate_t st, unsigned statusinfo, nn_wctime_t timestamp,
|
serstate_t st, unsigned statusinfo, nn_wctime_t timestamp,
|
||||||
void * dummy
|
void * dummy
|
||||||
);
|
);
|
||||||
OSAPI_EXPORT serstate_t ddsi_serstate_new (serstatepool_t pool, const struct sertopic * topic);
|
OSAPI_EXPORT serstate_t ddsi_serstate_new (const struct sertopic * topic);
|
||||||
OSAPI_EXPORT serdata_t ddsi_serstate_fix (serstate_t st);
|
OSAPI_EXPORT serdata_t ddsi_serstate_fix (serstate_t st);
|
||||||
nn_mtime_t ddsi_serstate_twrite (const struct serstate *serstate);
|
nn_mtime_t ddsi_serstate_twrite (const struct serstate *serstate);
|
||||||
void ddsi_serstate_set_twrite (struct serstate *serstate, nn_mtime_t twrite);
|
void ddsi_serstate_set_twrite (struct serstate *serstate, nn_mtime_t twrite);
|
||||||
|
|
|
@ -55,8 +55,8 @@ static void serstate_free_wrap (void *elem)
|
||||||
|
|
||||||
void ddsi_serstatepool_free (serstatepool_t pool)
|
void ddsi_serstatepool_free (serstatepool_t pool)
|
||||||
{
|
{
|
||||||
nn_freelist_fini (&pool->freelist, serstate_free_wrap);
|
|
||||||
TRACE (("ddsi_serstatepool_free(%p)\n", pool));
|
TRACE (("ddsi_serstatepool_free(%p)\n", pool));
|
||||||
|
nn_freelist_fini (&pool->freelist, serstate_free_wrap);
|
||||||
os_free (pool);
|
os_free (pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,13 +86,13 @@ void ddsi_serdata_set_twrite (serdata_t serdata, nn_mtime_t twrite)
|
||||||
ddsi_serstate_set_twrite (serdata->v.st, twrite);
|
ddsi_serstate_set_twrite (serdata->v.st, twrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
serstate_t ddsi_serstate_new (serstatepool_t pool, const struct sertopic * topic)
|
serstate_t ddsi_serstate_new (const struct sertopic * topic)
|
||||||
{
|
{
|
||||||
serstate_t st;
|
serstate_t st;
|
||||||
if ((st = nn_freelist_pop (&pool->freelist)) != NULL)
|
if ((st = nn_freelist_pop (&gv.serpool->freelist)) != NULL)
|
||||||
serstate_init (st, topic);
|
serstate_init (st, topic);
|
||||||
else
|
else
|
||||||
st = serstate_allocnew (pool, topic);
|
st = serstate_allocnew (gv.serpool, topic);
|
||||||
return st;
|
return st;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -320,7 +320,7 @@ int spdp_write (struct participant *pp)
|
||||||
|
|
||||||
/* A NULL topic implies a parameter list, now that we do PMD through
|
/* A NULL topic implies a parameter list, now that we do PMD through
|
||||||
the serializer */
|
the serializer */
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (pp->e.guid);
|
kh = nn_hton_guid (pp->e.guid);
|
||||||
|
@ -357,7 +357,7 @@ int spdp_dispose_unregister (struct participant *pp)
|
||||||
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
|
nn_plist_addtomsg (mpayload, &ps, ~(uint64_t)0, ~(uint64_t)0);
|
||||||
nn_xmsg_addpar_sentinel (mpayload);
|
nn_xmsg_addpar_sentinel (mpayload);
|
||||||
|
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (pp->e.guid);
|
kh = nn_hton_guid (pp->e.guid);
|
||||||
|
@ -968,7 +968,7 @@ static int sedp_write_endpoint
|
||||||
|
|
||||||
/* Then we take the payload from the message and turn it into a
|
/* Then we take the payload from the message and turn it into a
|
||||||
serdata, and then we can write it as normal data */
|
serdata, and then we can write it as normal data */
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (*epguid);
|
kh = nn_hton_guid (*epguid);
|
||||||
|
@ -1430,7 +1430,7 @@ int sedp_write_topic (struct participant *pp, const struct nn_plist *datap)
|
||||||
nn_plist_addtomsg (mpayload, datap, ~(uint64_t)0, delta);
|
nn_plist_addtomsg (mpayload, datap, ~(uint64_t)0, delta);
|
||||||
nn_xmsg_addpar_sentinel (mpayload);
|
nn_xmsg_addpar_sentinel (mpayload);
|
||||||
|
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
|
|
||||||
|
@ -1500,7 +1500,7 @@ int sedp_write_cm_participant (struct participant *pp, int alive)
|
||||||
|
|
||||||
/* Then we take the payload from the message and turn it into a
|
/* Then we take the payload from the message and turn it into a
|
||||||
serdata, and then we can write it as normal data */
|
serdata, and then we can write it as normal data */
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (pp->e.guid);
|
kh = nn_hton_guid (pp->e.guid);
|
||||||
|
@ -1610,7 +1610,7 @@ int sedp_write_cm_publisher (const struct nn_plist *datap, int alive)
|
||||||
|
|
||||||
/* Then we take the payload from the message and turn it into a
|
/* Then we take the payload from the message and turn it into a
|
||||||
serdata, and then we can write it as normal data */
|
serdata, and then we can write it as normal data */
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (datap->group_guid);
|
kh = nn_hton_guid (datap->group_guid);
|
||||||
|
@ -1665,7 +1665,7 @@ int sedp_write_cm_subscriber (const struct nn_plist *datap, int alive)
|
||||||
|
|
||||||
/* Then we take the payload from the message and turn it into a
|
/* Then we take the payload from the message and turn it into a
|
||||||
serdata, and then we can write it as normal data */
|
serdata, and then we can write it as normal data */
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
payload_blob = nn_xmsg_payload (&payload_sz, mpayload);
|
||||||
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
ddsi_serstate_append_blob (serstate, 4, payload_sz, payload_blob);
|
||||||
kh = nn_hton_guid (datap->group_guid);
|
kh = nn_hton_guid (datap->group_guid);
|
||||||
|
|
|
@ -1783,7 +1783,7 @@ static serstate_t make_raw_serstate
|
||||||
unsigned statusinfo, nn_wctime_t tstamp
|
unsigned statusinfo, nn_wctime_t tstamp
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
serstate_t st = ddsi_serstate_new (gv.serpool, topic);
|
serstate_t st = ddsi_serstate_new (topic);
|
||||||
ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL);
|
ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL);
|
||||||
st->kind = justkey ? STK_KEY : STK_DATA;
|
st->kind = justkey ? STK_KEY : STK_DATA;
|
||||||
/* the CDR header is always fully contained in the first fragment
|
/* the CDR header is always fully contained in the first fragment
|
||||||
|
@ -1882,7 +1882,7 @@ static serdata_t extract_sample_from_data
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
serstate_t st;
|
serstate_t st;
|
||||||
st = ddsi_serstate_new (gv.serpool, topic);
|
st = ddsi_serstate_new (topic);
|
||||||
ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL);
|
ddsi_serstate_set_msginfo (st, statusinfo, tstamp, NULL);
|
||||||
st->kind = STK_KEY;
|
st->kind = STK_KEY;
|
||||||
ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value);
|
ddsi_serstate_append_blob (st, 1, sizeof (qos->keyhash), qos->keyhash.value);
|
||||||
|
|
|
@ -1001,7 +1001,7 @@ static void handle_xevk_spdp (UNUSED_ARG (struct nn_xpack *xp), struct xevent *e
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Look up data in (transient-local) WHC by key value */
|
/* Look up data in (transient-local) WHC by key value */
|
||||||
if ((st = ddsi_serstate_new (gv.serpool, NULL)) == NULL)
|
if ((st = ddsi_serstate_new (NULL)) == NULL)
|
||||||
{
|
{
|
||||||
TRACE (("xmit spdp: skip %x:%x:%x:%x: out of memory\n", PGUID (ev->u.spdp.pp_guid)));
|
TRACE (("xmit spdp: skip %x:%x:%x:%x: out of memory\n", PGUID (ev->u.spdp.pp_guid)));
|
||||||
goto skip;
|
goto skip;
|
||||||
|
@ -1120,7 +1120,7 @@ static void write_pmd_message (struct nn_xpack *xp, struct participant *pp, unsi
|
||||||
u.pmd.length = PMD_DATA_LENGTH;
|
u.pmd.length = PMD_DATA_LENGTH;
|
||||||
memset (u.pmd.value, 0, u.pmd.length);
|
memset (u.pmd.value, 0, u.pmd.length);
|
||||||
|
|
||||||
serstate = ddsi_serstate_new (gv.serpool, NULL);
|
serstate = ddsi_serstate_new (NULL);
|
||||||
ddsi_serstate_append_blob (serstate, 4, sizeof (u.pad), &u.pmd);
|
ddsi_serstate_append_blob (serstate, 4, sizeof (u.pad), &u.pmd);
|
||||||
serstate_set_key (serstate, 0, &u.pmd);
|
serstate_set_key (serstate, 0, &u.pmd);
|
||||||
ddsi_serstate_set_msginfo (serstate, 0, now (), NULL);
|
ddsi_serstate_set_msginfo (serstate, 0, now (), NULL);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue