extend sertopic interface and move the concept of a type descriptor to just the sertopic definition
Signed-off-by: Erik Boasson <eb@ilities.com>
This commit is contained in:
parent
c169df6227
commit
e631567c35
11 changed files with 269 additions and 124 deletions
|
@ -947,7 +947,7 @@ dds_lookup_participant(
|
||||||
_In_ size_t size);
|
_In_ size_t size);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Creates a new topic.
|
* @brief Creates a new topic with default type handling.
|
||||||
*
|
*
|
||||||
* The type name for the topic is taken from the generated descriptor. Topic
|
* The type name for the topic is taken from the generated descriptor. Topic
|
||||||
* matching is done on a combination of topic name and type name.
|
* matching is done on a combination of topic name and type name.
|
||||||
|
@ -975,6 +975,39 @@ dds_create_topic(
|
||||||
_In_opt_ const dds_qos_t *qos,
|
_In_opt_ const dds_qos_t *qos,
|
||||||
_In_opt_ const dds_listener_t *listener);
|
_In_opt_ const dds_listener_t *listener);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Creates a new topic with arbitrary type handling.
|
||||||
|
*
|
||||||
|
* The type name for the topic is taken from the provided "sertopic" object. Topic
|
||||||
|
* matching is done on a combination of topic name and type name.
|
||||||
|
*
|
||||||
|
* @param[in] participant Participant on which to create the topic.
|
||||||
|
* @param[in] sertopic Internal description of the topic type.
|
||||||
|
* @param[in] name Name of the topic.
|
||||||
|
* @param[in] qos QoS to set on the new topic (can be NULL).
|
||||||
|
* @param[in] listener Any listener functions associated with the new topic (can be NULL).
|
||||||
|
* @param[in] sedp_plist Topic description to be published as part of discovery (if NULL, not published).
|
||||||
|
*
|
||||||
|
* @returns A valid topic handle or an error code.
|
||||||
|
*
|
||||||
|
* @retval >=0
|
||||||
|
* A valid topic handle.
|
||||||
|
* @retval DDS_RETCODE_BAD_PARAMETER
|
||||||
|
* Either participant, descriptor, name or qos is invalid.
|
||||||
|
*/
|
||||||
|
/* TODO: Check list of retcodes is complete. */
|
||||||
|
struct ddsi_sertopic;
|
||||||
|
struct nn_plist;
|
||||||
|
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
|
||||||
|
DDS_EXPORT dds_entity_t
|
||||||
|
dds_create_topic_arbitrary (
|
||||||
|
_In_ dds_entity_t participant,
|
||||||
|
_In_ struct ddsi_sertopic *sertopic,
|
||||||
|
_In_z_ const char *name,
|
||||||
|
_In_opt_ const dds_qos_t *qos,
|
||||||
|
_In_opt_ const dds_listener_t *listener,
|
||||||
|
_In_opt_ const struct nn_plist *sedp_plist);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Finds a named topic.
|
* @brief Finds a named topic.
|
||||||
*
|
*
|
||||||
|
|
|
@ -169,7 +169,7 @@ typedef struct dds_reader
|
||||||
struct reader * m_rd;
|
struct reader * m_rd;
|
||||||
bool m_data_on_readers;
|
bool m_data_on_readers;
|
||||||
bool m_loan_out;
|
bool m_loan_out;
|
||||||
char * m_loan;
|
void * m_loan;
|
||||||
uint32_t m_loan_size;
|
uint32_t m_loan_size;
|
||||||
|
|
||||||
/* Status metrics */
|
/* Status metrics */
|
||||||
|
@ -209,7 +209,6 @@ typedef struct dds_topic
|
||||||
{
|
{
|
||||||
struct dds_entity m_entity;
|
struct dds_entity m_entity;
|
||||||
struct ddsi_sertopic * m_stopic;
|
struct ddsi_sertopic * m_stopic;
|
||||||
const dds_topic_descriptor_t * m_descriptor;
|
|
||||||
|
|
||||||
dds_topic_intern_filter_fn filter_fn;
|
dds_topic_intern_filter_fn filter_fn;
|
||||||
void * filter_ctx;
|
void * filter_ctx;
|
||||||
|
|
|
@ -262,15 +262,14 @@ dds_unregister_instance_ih_ts(
|
||||||
|
|
||||||
map = gv.m_tkmap;
|
map = gv.m_tkmap;
|
||||||
topic = dds_instance_info((dds_entity*)wr);
|
topic = dds_instance_info((dds_entity*)wr);
|
||||||
sample = dds_alloc (topic->m_descriptor->m_size);
|
sample = ddsi_sertopic_alloc_sample (topic->m_stopic);
|
||||||
if (ddsi_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
|
if (ddsi_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
|
||||||
ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action);
|
ret = dds_write_impl ((dds_writer*)wr, sample, timestamp, action);
|
||||||
} else{
|
} else{
|
||||||
DDS_ERROR("No instance related with the provided handle is found\n");
|
DDS_ERROR("No instance related with the provided handle is found\n");
|
||||||
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||||
}
|
}
|
||||||
dds_sample_free (sample, topic->m_descriptor, DDS_FREE_ALL);
|
ddsi_sertopic_free_sample (topic->m_stopic, sample, DDS_FREE_ALL);
|
||||||
|
|
||||||
dds_entity_unlock(wr);
|
dds_entity_unlock(wr);
|
||||||
err:
|
err:
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -356,14 +355,14 @@ dds_dispose_ih_ts(
|
||||||
if (rc == DDS_RETCODE_OK) {
|
if (rc == DDS_RETCODE_OK) {
|
||||||
struct ddsi_tkmap *map = gv.m_tkmap;
|
struct ddsi_tkmap *map = gv.m_tkmap;
|
||||||
const dds_topic *topic = dds_instance_info((dds_entity*)wr);
|
const dds_topic *topic = dds_instance_info((dds_entity*)wr);
|
||||||
void *sample = dds_alloc (topic->m_descriptor->m_size);
|
void *sample = ddsi_sertopic_alloc_sample (topic->m_stopic);
|
||||||
if (ddsi_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
|
if (ddsi_tkmap_get_key (map, topic->m_stopic, handle, sample)) {
|
||||||
ret = dds_dispose_impl(wr, sample, handle, timestamp);
|
ret = dds_dispose_impl(wr, sample, handle, timestamp);
|
||||||
} else {
|
} else {
|
||||||
DDS_ERROR("No instance related with the provided handle is found\n");
|
DDS_ERROR("No instance related with the provided handle is found\n");
|
||||||
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
ret = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||||
}
|
}
|
||||||
dds_free(sample);
|
ddsi_sertopic_free_sample (topic->m_stopic, sample, DDS_FREE_ALL);
|
||||||
dds_writer_unlock(wr);
|
dds_writer_unlock(wr);
|
||||||
} else {
|
} else {
|
||||||
DDS_ERROR("Error occurred on locking writer\n");
|
DDS_ERROR("Error occurred on locking writer\n");
|
||||||
|
@ -424,8 +423,7 @@ dds_instance_get_key(
|
||||||
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
ret = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||||
goto err;
|
goto err;
|
||||||
}
|
}
|
||||||
memset (data, 0, topic->m_descriptor->m_size);
|
ddsi_sertopic_zero_sample (topic->m_stopic, data);
|
||||||
|
|
||||||
if (ddsi_tkmap_get_key (map, topic->m_stopic, inst, data)) {
|
if (ddsi_tkmap_get_key (map, topic->m_stopic, inst, data)) {
|
||||||
ret = DDS_RETCODE_OK;
|
ret = DDS_RETCODE_OK;
|
||||||
} else{
|
} else{
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "ddsi/q_thread.h"
|
#include "ddsi/q_thread.h"
|
||||||
#include "ddsi/q_ephash.h"
|
#include "ddsi/q_ephash.h"
|
||||||
#include "ddsi/q_entity.h"
|
#include "ddsi/q_entity.h"
|
||||||
|
#include "ddsi/ddsi_sertopic.h"
|
||||||
|
|
||||||
|
|
||||||
static _Check_return_ dds__retcode_t
|
static _Check_return_ dds__retcode_t
|
||||||
|
@ -93,7 +94,6 @@ dds_read_impl(
|
||||||
_In_ bool lock,
|
_In_ bool lock,
|
||||||
_In_ bool only_reader)
|
_In_ bool only_reader)
|
||||||
{
|
{
|
||||||
uint32_t i;
|
|
||||||
dds_return_t ret = DDS_RETCODE_OK;
|
dds_return_t ret = DDS_RETCODE_OK;
|
||||||
dds__retcode_t rc;
|
dds__retcode_t rc;
|
||||||
struct dds_reader * rd;
|
struct dds_reader * rd;
|
||||||
|
@ -146,28 +146,24 @@ dds_read_impl(
|
||||||
}
|
}
|
||||||
/* Allocate samples if not provided (assuming all or none provided) */
|
/* Allocate samples if not provided (assuming all or none provided) */
|
||||||
if (buf[0] == NULL) {
|
if (buf[0] == NULL) {
|
||||||
char * loan;
|
|
||||||
const size_t sz = rd->m_topic->m_descriptor->m_size;
|
|
||||||
const uint32_t loan_size = (uint32_t) (sz * maxs);
|
|
||||||
/* Allocate, use or reallocate loan cached on reader */
|
/* Allocate, use or reallocate loan cached on reader */
|
||||||
if (rd->m_loan_out) {
|
if (rd->m_loan_out) {
|
||||||
loan = dds_alloc (loan_size);
|
ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, NULL, 0, maxs);
|
||||||
} else {
|
} else {
|
||||||
if (rd->m_loan) {
|
if (rd->m_loan) {
|
||||||
if (rd->m_loan_size < loan_size) {
|
if (rd->m_loan_size < maxs) {
|
||||||
rd->m_loan = dds_realloc_zero (rd->m_loan, loan_size);
|
ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, rd->m_loan, rd->m_loan_size, maxs);
|
||||||
rd->m_loan_size = loan_size;
|
rd->m_loan = buf[0];
|
||||||
|
rd->m_loan_size = maxs;
|
||||||
|
} else {
|
||||||
|
buf[0] = rd->m_loan;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
rd->m_loan = dds_alloc (loan_size);
|
ddsi_sertopic_realloc_samples (buf, rd->m_topic->m_stopic, NULL, 0, maxs);
|
||||||
rd->m_loan_size = loan_size;
|
rd->m_loan = buf[0];
|
||||||
|
rd->m_loan_size = maxs;
|
||||||
}
|
}
|
||||||
loan = rd->m_loan;
|
rd->m_loan_out = true;
|
||||||
rd->m_loan_out = true;
|
|
||||||
}
|
|
||||||
for (i = 0; i < maxs; i++) {
|
|
||||||
buf[i] = loan;
|
|
||||||
loan += sz;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (take) {
|
if (take) {
|
||||||
|
@ -759,7 +755,7 @@ dds_return_loan(
|
||||||
_In_ int32_t bufsz)
|
_In_ int32_t bufsz)
|
||||||
{
|
{
|
||||||
dds__retcode_t rc;
|
dds__retcode_t rc;
|
||||||
const dds_topic_descriptor_t * desc;
|
const struct ddsi_sertopic *st;
|
||||||
dds_reader *rd;
|
dds_reader *rd;
|
||||||
dds_readcond *cond;
|
dds_readcond *cond;
|
||||||
dds_return_t ret = DDS_RETCODE_OK;
|
dds_return_t ret = DDS_RETCODE_OK;
|
||||||
|
@ -781,20 +777,16 @@ dds_return_loan(
|
||||||
ret = DDS_ERRNO(rc);
|
ret = DDS_ERRNO(rc);
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
desc = rd->m_topic->m_descriptor;
|
st = rd->m_topic->m_stopic;
|
||||||
|
|
||||||
/* Only free sample contents if they have been allocated */
|
for (int32_t i = 0; i < bufsz; i++) {
|
||||||
if (desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE) {
|
ddsi_sertopic_free_sample (st, buf[i], DDS_FREE_CONTENTS);
|
||||||
int32_t i = 0;
|
|
||||||
for (i = 0; i < bufsz; i++) {
|
|
||||||
dds_sample_free(buf[i], desc, DDS_FREE_CONTENTS);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If possible return loan buffer to reader */
|
/* If possible return loan buffer to reader */
|
||||||
if (rd->m_loan != 0 && (buf[0] == rd->m_loan)) {
|
if (rd->m_loan != 0 && (buf[0] == rd->m_loan)) {
|
||||||
rd->m_loan_out = false;
|
rd->m_loan_out = false;
|
||||||
memset (rd->m_loan, 0, rd->m_loan_size);
|
ddsi_sertopic_zero_samples (st, rd->m_loan, rd->m_loan_size);
|
||||||
buf[0] = NULL;
|
buf[0] = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -668,8 +668,7 @@ static bool content_filter_accepts (const struct ddsi_sertopic *sertopic, const
|
||||||
const struct dds_topic *tp = sertopic->status_cb_entity;
|
const struct dds_topic *tp = sertopic->status_cb_entity;
|
||||||
if (tp->filter_fn)
|
if (tp->filter_fn)
|
||||||
{
|
{
|
||||||
const dds_topic_descriptor_t * desc = tp->m_descriptor;
|
char *tmp = ddsi_sertopic_alloc_sample (tp->m_stopic);
|
||||||
char *tmp = dds_alloc (desc->m_size);
|
|
||||||
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
||||||
ret = (tp->filter_fn) (tmp, tp->filter_ctx);
|
ret = (tp->filter_fn) (tmp, tp->filter_ctx);
|
||||||
ddsi_sertopic_free_sample (tp->m_stopic, tmp, DDS_FREE_ALL);
|
ddsi_sertopic_free_sample (tp->m_stopic, tmp, DDS_FREE_ALL);
|
||||||
|
@ -2168,7 +2167,6 @@ static bool update_conditions_locked
|
||||||
dds_readcond * iter;
|
dds_readcond * iter;
|
||||||
int m_pre;
|
int m_pre;
|
||||||
int m_post;
|
int m_post;
|
||||||
const struct dds_topic_descriptor *desc = rhc->topic->status_cb_entity->m_descriptor;
|
|
||||||
char *tmp = NULL;
|
char *tmp = NULL;
|
||||||
|
|
||||||
DDS_TRACE("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n",
|
DDS_TRACE("update_conditions_locked(%p) - inst %u nonempty %u disp %u nowr %u new %u samples %u read %u\n",
|
||||||
|
@ -2214,8 +2212,7 @@ static bool update_conditions_locked
|
||||||
{
|
{
|
||||||
if (sample && tmp == NULL && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
|
if (sample && tmp == NULL && (dds_entity_kind(iter->m_entity.m_hdl) == DDS_KIND_COND_QUERY))
|
||||||
{
|
{
|
||||||
tmp = os_malloc (desc->m_size);
|
tmp = ddsi_sertopic_alloc_sample (rhc->topic);
|
||||||
memset (tmp, 0, desc->m_size);
|
|
||||||
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
ddsi_serdata_to_sample (sample, tmp, NULL, NULL);
|
||||||
}
|
}
|
||||||
if
|
if
|
||||||
|
@ -2251,8 +2248,7 @@ static bool update_conditions_locked
|
||||||
|
|
||||||
if (tmp)
|
if (tmp)
|
||||||
{
|
{
|
||||||
ddsi_sertopic_free_sample (rhc->topic, tmp, DDS_FREE_CONTENTS);
|
ddsi_sertopic_free_sample (rhc->topic, tmp, DDS_FREE_ALL);
|
||||||
os_free (tmp);
|
|
||||||
}
|
}
|
||||||
return trigger;
|
return trigger;
|
||||||
}
|
}
|
||||||
|
|
|
@ -322,31 +322,42 @@ static bool dupdef_qos_ok(const dds_qos_t *qos, const struct ddsi_sertopic *st)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool sertopic_equivalent (const struct ddsi_sertopic *a, const struct ddsi_sertopic *b)
|
||||||
|
{
|
||||||
|
printf ("sertopic_equivalent %p %p (%s %s; %u %u; %p %p; %p %p)\n", a, b, a->name_typename, b->name_typename, a->serdata_basehash, b->serdata_basehash, a->ops, b->ops, a->serdata_ops, b->serdata_ops);
|
||||||
|
|
||||||
|
if (strcmp (a->name_typename, b->name_typename) != 0)
|
||||||
|
return false;
|
||||||
|
if (a->serdata_basehash != b->serdata_basehash)
|
||||||
|
return false;
|
||||||
|
if (a->ops != b->ops)
|
||||||
|
return false;
|
||||||
|
if (a->serdata_ops != b->serdata_ops)
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
|
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
|
||||||
DDS_EXPORT dds_entity_t
|
DDS_EXPORT dds_entity_t
|
||||||
dds_create_topic(
|
dds_create_topic_arbitrary (
|
||||||
_In_ dds_entity_t participant,
|
_In_ dds_entity_t participant,
|
||||||
_In_ const dds_topic_descriptor_t *desc,
|
_In_ struct ddsi_sertopic *sertopic,
|
||||||
_In_z_ const char *name,
|
_In_z_ const char *name,
|
||||||
_In_opt_ const dds_qos_t *qos,
|
_In_opt_ const dds_qos_t *qos,
|
||||||
_In_opt_ const dds_listener_t *listener)
|
_In_opt_ const dds_listener_t *listener,
|
||||||
|
_In_opt_ const nn_plist_t *sedp_plist)
|
||||||
{
|
{
|
||||||
char *key = NULL;
|
|
||||||
struct ddsi_sertopic *stgeneric;
|
struct ddsi_sertopic *stgeneric;
|
||||||
struct ddsi_sertopic_default *st;
|
|
||||||
const char *typename;
|
|
||||||
dds__retcode_t rc;
|
dds__retcode_t rc;
|
||||||
dds_entity *par;
|
dds_entity *par;
|
||||||
dds_topic *top;
|
dds_topic *top;
|
||||||
dds_qos_t *new_qos = NULL;
|
dds_qos_t *new_qos = NULL;
|
||||||
nn_plist_t plist;
|
|
||||||
dds_entity_t hdl;
|
dds_entity_t hdl;
|
||||||
struct participant *ddsi_pp;
|
struct participant *ddsi_pp;
|
||||||
struct thread_state1 *const thr = lookup_thread_state ();
|
struct thread_state1 *const thr = lookup_thread_state ();
|
||||||
const bool asleep = !vtime_awake_p (thr->vtime);
|
const bool asleep = !vtime_awake_p (thr->vtime);
|
||||||
uint32_t index;
|
|
||||||
|
|
||||||
if (desc == NULL){
|
if (sertopic == NULL){
|
||||||
DDS_ERROR("Topic description is NULL\n");
|
DDS_ERROR("Topic description is NULL\n");
|
||||||
hdl = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
hdl = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||||
goto bad_param_err;
|
goto bad_param_err;
|
||||||
|
@ -384,8 +395,7 @@ dds_create_topic(
|
||||||
/* Check if topic already exists with same name */
|
/* Check if topic already exists with same name */
|
||||||
os_mutexLock (&dds_global.m_mutex);
|
os_mutexLock (&dds_global.m_mutex);
|
||||||
if ((stgeneric = dds_topic_lookup_locked (par->m_domain, name)) != NULL) {
|
if ((stgeneric = dds_topic_lookup_locked (par->m_domain, name)) != NULL) {
|
||||||
st = (struct ddsi_sertopic_default *)stgeneric;
|
if (!sertopic_equivalent (stgeneric,sertopic)) {
|
||||||
if (st->type != desc) {
|
|
||||||
/* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
|
/* FIXME: should copy the type, perhaps? but then the pointers will no longer be the same */
|
||||||
DDS_ERROR("Create topic with mismatching type\n");
|
DDS_ERROR("Create topic with mismatching type\n");
|
||||||
hdl = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
hdl = DDS_ERRNO(DDS_RETCODE_PRECONDITION_NOT_MET);
|
||||||
|
@ -394,17 +404,11 @@ dds_create_topic(
|
||||||
DDS_ERROR("Create topic with mismatching qos\n");
|
DDS_ERROR("Create topic with mismatching qos\n");
|
||||||
hdl = DDS_ERRNO(DDS_RETCODE_INCONSISTENT_POLICY);
|
hdl = DDS_ERRNO(DDS_RETCODE_INCONSISTENT_POLICY);
|
||||||
} else {
|
} else {
|
||||||
dds_entity_add_ref (&st->c.status_cb_entity->m_entity);
|
dds_entity_add_ref (&stgeneric->status_cb_entity->m_entity);
|
||||||
hdl = st->c.status_cb_entity->m_entity.m_hdl;
|
hdl = stgeneric->status_cb_entity->m_entity.m_hdl;
|
||||||
}
|
}
|
||||||
os_mutexUnlock (&dds_global.m_mutex);
|
os_mutexUnlock (&dds_global.m_mutex);
|
||||||
} else {
|
} else {
|
||||||
typename = desc->m_typename;
|
|
||||||
key = (char*) dds_alloc (strlen (name) + strlen (typename) + 2);
|
|
||||||
strcpy (key, name);
|
|
||||||
strcat (key, "/");
|
|
||||||
strcat (key, typename);
|
|
||||||
|
|
||||||
if (qos) {
|
if (qos) {
|
||||||
new_qos = dds_create_qos();
|
new_qos = dds_create_qos();
|
||||||
/* Only returns failure when one of the qos args is NULL, which
|
/* Only returns failure when one of the qos args is NULL, which
|
||||||
|
@ -414,77 +418,29 @@ dds_create_topic(
|
||||||
|
|
||||||
/* Create topic */
|
/* Create topic */
|
||||||
top = dds_alloc (sizeof (*top));
|
top = dds_alloc (sizeof (*top));
|
||||||
top->m_descriptor = desc;
|
|
||||||
hdl = dds_entity_init (&top->m_entity, par, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
|
hdl = dds_entity_init (&top->m_entity, par, DDS_KIND_TOPIC, new_qos, listener, DDS_TOPIC_STATUS_MASK);
|
||||||
top->m_entity.m_deriver.delete = dds_topic_delete;
|
top->m_entity.m_deriver.delete = dds_topic_delete;
|
||||||
top->m_entity.m_deriver.set_qos = dds_topic_qos_set;
|
top->m_entity.m_deriver.set_qos = dds_topic_qos_set;
|
||||||
top->m_entity.m_deriver.validate_status = dds_topic_status_validate;
|
top->m_entity.m_deriver.validate_status = dds_topic_status_validate;
|
||||||
|
top->m_stopic = ddsi_sertopic_ref (sertopic);
|
||||||
st = dds_alloc (sizeof (*st));
|
sertopic->status_cb_entity = top;
|
||||||
|
|
||||||
os_atomic_st32 (&st->c.refc, 1);
|
|
||||||
st->c.iid = ddsi_iid_gen ();
|
|
||||||
st->c.status_cb = dds_topic_status_cb;
|
|
||||||
st->c.status_cb_entity = top;
|
|
||||||
st->c.name_typename = key;
|
|
||||||
st->c.name = dds_alloc (strlen (name) + 1);
|
|
||||||
strcpy (st->c.name, name);
|
|
||||||
st->c.typename = dds_alloc (strlen (typename) + 1);
|
|
||||||
strcpy (st->c.typename, typename);
|
|
||||||
st->c.ops = &ddsi_sertopic_ops_default;
|
|
||||||
st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey;
|
|
||||||
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
|
|
||||||
st->native_encoding_identifier = (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
|
|
||||||
|
|
||||||
st->type = (void*) desc;
|
|
||||||
st->nkeys = desc->m_nkeys;
|
|
||||||
st->keys = desc->m_keys;
|
|
||||||
|
|
||||||
/* Check if topic cannot be optimised (memcpy marshal) */
|
|
||||||
|
|
||||||
if ((desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE) == 0) {
|
|
||||||
st->opt_size = dds_stream_check_optimize (desc);
|
|
||||||
}
|
|
||||||
top->m_stopic = &st->c;
|
|
||||||
|
|
||||||
/* Add topic to extent */
|
/* Add topic to extent */
|
||||||
dds_topic_add_locked (par->m_domainid, &st->c);
|
dds_topic_add_locked (par->m_domainid, sertopic);
|
||||||
os_mutexUnlock (&dds_global.m_mutex);
|
os_mutexUnlock (&dds_global.m_mutex);
|
||||||
|
|
||||||
nn_plist_init_empty (&plist);
|
|
||||||
if (new_qos) {
|
|
||||||
dds_merge_qos (&plist.qos, new_qos);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Set Topic meta data (for SEDP publication) */
|
|
||||||
plist.qos.topic_name = dds_string_dup (st->c.name);
|
|
||||||
plist.qos.type_name = dds_string_dup (st->c.typename);
|
|
||||||
plist.qos.present |= (QP_TOPIC_NAME | QP_TYPE_NAME);
|
|
||||||
if (desc->m_meta) {
|
|
||||||
plist.type_description = dds_string_dup (desc->m_meta);
|
|
||||||
plist.present |= PP_PRISMTECH_TYPE_DESCRIPTION;
|
|
||||||
}
|
|
||||||
if (desc->m_nkeys) {
|
|
||||||
plist.qos.present |= QP_PRISMTECH_SUBSCRIPTION_KEYS;
|
|
||||||
plist.qos.subscription_keys.use_key_list = 1;
|
|
||||||
plist.qos.subscription_keys.key_list.n = desc->m_nkeys;
|
|
||||||
plist.qos.subscription_keys.key_list.strs = dds_alloc (desc->m_nkeys * sizeof (char*));
|
|
||||||
for (index = 0; index < desc->m_nkeys; index++) {
|
|
||||||
plist.qos.subscription_keys.key_list.strs[index] = dds_string_dup (desc->m_keys[index].m_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Publish Topic */
|
/* Publish Topic */
|
||||||
if (asleep) {
|
if (asleep) {
|
||||||
thread_state_awake (thr);
|
thread_state_awake (thr);
|
||||||
}
|
}
|
||||||
ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
|
ddsi_pp = ephash_lookup_participant_guid (&par->m_guid);
|
||||||
assert (ddsi_pp);
|
assert (ddsi_pp);
|
||||||
sedp_write_topic (ddsi_pp, &plist);
|
if (sedp_plist) {
|
||||||
|
sedp_write_topic (ddsi_pp, sedp_plist);
|
||||||
|
}
|
||||||
if (asleep) {
|
if (asleep) {
|
||||||
thread_state_asleep (thr);
|
thread_state_asleep (thr);
|
||||||
}
|
}
|
||||||
nn_plist_fini (&plist);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qos_err:
|
qos_err:
|
||||||
|
@ -494,6 +450,103 @@ bad_param_err:
|
||||||
return hdl;
|
return hdl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_Pre_satisfies_((participant & DDS_ENTITY_KIND_MASK) == DDS_KIND_PARTICIPANT)
|
||||||
|
DDS_EXPORT dds_entity_t
|
||||||
|
dds_create_topic(
|
||||||
|
_In_ dds_entity_t participant,
|
||||||
|
_In_ const dds_topic_descriptor_t *desc,
|
||||||
|
_In_z_ const char *name,
|
||||||
|
_In_opt_ const dds_qos_t *qos,
|
||||||
|
_In_opt_ const dds_listener_t *listener)
|
||||||
|
{
|
||||||
|
char *key = NULL;
|
||||||
|
struct ddsi_sertopic_default *st;
|
||||||
|
const char *typename;
|
||||||
|
dds_qos_t *new_qos = NULL;
|
||||||
|
nn_plist_t plist;
|
||||||
|
dds_entity_t hdl;
|
||||||
|
uint32_t index;
|
||||||
|
|
||||||
|
if (desc == NULL){
|
||||||
|
DDS_ERROR("Topic description is NULL");
|
||||||
|
hdl = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||||
|
goto bad_param_err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (name == NULL) {
|
||||||
|
DDS_ERROR("Topic name is NULL");
|
||||||
|
hdl = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||||
|
goto bad_param_err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!is_valid_name(name)) {
|
||||||
|
DDS_ERROR("Topic name contains characters that are not allowed.");
|
||||||
|
hdl = DDS_ERRNO(DDS_RETCODE_BAD_PARAMETER);
|
||||||
|
goto bad_param_err;
|
||||||
|
}
|
||||||
|
|
||||||
|
typename = desc->m_typename;
|
||||||
|
key = (char*) dds_alloc (strlen (name) + strlen (typename) + 2);
|
||||||
|
strcpy (key, name);
|
||||||
|
strcat (key, "/");
|
||||||
|
strcat (key, typename);
|
||||||
|
|
||||||
|
st = dds_alloc (sizeof (*st));
|
||||||
|
|
||||||
|
os_atomic_st32 (&st->c.refc, 1);
|
||||||
|
st->c.iid = ddsi_iid_gen ();
|
||||||
|
st->c.status_cb = dds_topic_status_cb;
|
||||||
|
st->c.status_cb_entity = NULL; /* set by dds_create_topic_arbitrary */
|
||||||
|
st->c.name_typename = key;
|
||||||
|
st->c.name = dds_alloc (strlen (name) + 1);
|
||||||
|
strcpy (st->c.name, name);
|
||||||
|
st->c.typename = dds_alloc (strlen (typename) + 1);
|
||||||
|
strcpy (st->c.typename, typename);
|
||||||
|
st->c.ops = &ddsi_sertopic_ops_default;
|
||||||
|
st->c.serdata_ops = desc->m_nkeys ? &ddsi_serdata_ops_cdr : &ddsi_serdata_ops_cdr_nokey;
|
||||||
|
st->c.serdata_basehash = ddsi_sertopic_compute_serdata_basehash (st->c.serdata_ops);
|
||||||
|
st->native_encoding_identifier = (PLATFORM_IS_LITTLE_ENDIAN ? CDR_LE : CDR_BE);
|
||||||
|
|
||||||
|
st->type = (void*) desc;
|
||||||
|
st->nkeys = desc->m_nkeys;
|
||||||
|
st->keys = desc->m_keys;
|
||||||
|
|
||||||
|
/* Check if topic cannot be optimised (memcpy marshal) */
|
||||||
|
if ((desc->m_flagset & DDS_TOPIC_NO_OPTIMIZE) == 0) {
|
||||||
|
st->opt_size = dds_stream_check_optimize (desc);
|
||||||
|
}
|
||||||
|
|
||||||
|
nn_plist_init_empty (&plist);
|
||||||
|
if (new_qos) {
|
||||||
|
dds_merge_qos (&plist.qos, new_qos);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set Topic meta data (for SEDP publication) */
|
||||||
|
plist.qos.topic_name = dds_string_dup (st->c.name);
|
||||||
|
plist.qos.type_name = dds_string_dup (st->c.typename);
|
||||||
|
plist.qos.present |= (QP_TOPIC_NAME | QP_TYPE_NAME);
|
||||||
|
if (desc->m_meta) {
|
||||||
|
plist.type_description = dds_string_dup (desc->m_meta);
|
||||||
|
plist.present |= PP_PRISMTECH_TYPE_DESCRIPTION;
|
||||||
|
}
|
||||||
|
if (desc->m_nkeys) {
|
||||||
|
plist.qos.present |= QP_PRISMTECH_SUBSCRIPTION_KEYS;
|
||||||
|
plist.qos.subscription_keys.use_key_list = 1;
|
||||||
|
plist.qos.subscription_keys.key_list.n = desc->m_nkeys;
|
||||||
|
plist.qos.subscription_keys.key_list.strs = dds_alloc (desc->m_nkeys * sizeof (char*));
|
||||||
|
for (index = 0; index < desc->m_nkeys; index++) {
|
||||||
|
plist.qos.subscription_keys.key_list.strs[index] = dds_string_dup (desc->m_keys[index].m_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hdl = dds_create_topic_arbitrary(participant, &st->c, name, qos, listener, &plist);
|
||||||
|
ddsi_sertopic_unref (&st->c);
|
||||||
|
nn_plist_fini (&plist);
|
||||||
|
|
||||||
|
bad_param_err:
|
||||||
|
return hdl;
|
||||||
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
dds_topic_chaining_filter(
|
dds_topic_chaining_filter(
|
||||||
const void *sample,
|
const void *sample,
|
||||||
|
|
|
@ -9,8 +9,8 @@
|
||||||
*
|
*
|
||||||
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
|
||||||
*/
|
*/
|
||||||
#ifndef DDSI_SER_H
|
#ifndef DDSI_SERDATA_DEFAULT_H
|
||||||
#define DDSI_SER_H
|
#define DDSI_SERDATA_DEFAULT_H
|
||||||
|
|
||||||
#include "os/os.h"
|
#include "os/os.h"
|
||||||
#include "ddsi/q_plist.h" /* for nn_prismtech_writer_info */
|
#include "ddsi/q_plist.h" /* for nn_prismtech_writer_info */
|
||||||
|
|
|
@ -41,11 +41,19 @@ struct ddsi_sertopic {
|
||||||
typedef void (*ddsi_sertopic_deinit_t) (struct ddsi_sertopic *tp);
|
typedef void (*ddsi_sertopic_deinit_t) (struct ddsi_sertopic *tp);
|
||||||
|
|
||||||
/* Release any memory allocated by ddsi_sertopic_to_sample */
|
/* Release any memory allocated by ddsi_sertopic_to_sample */
|
||||||
typedef void (*ddsi_sertopic_free_sample_t) (const struct ddsi_sertopic *d, void *sample, dds_free_op_t op);
|
typedef void (*ddsi_sertopic_zero_samples_t) (const struct ddsi_sertopic *d, void *samples, size_t count);
|
||||||
|
|
||||||
|
/* Release any memory allocated by ddsi_sertopic_to_sample */
|
||||||
|
typedef void (*ddsi_sertopic_realloc_samples_t) (void **ptrs, const struct ddsi_sertopic *d, void *old, size_t oldcount, size_t count);
|
||||||
|
|
||||||
|
/* Release any memory allocated by ddsi_sertopic_to_sample (also undo sertopic_alloc_sample if "op" so requests) */
|
||||||
|
typedef void (*ddsi_sertopic_free_samples_t) (const struct ddsi_sertopic *d, void **ptrs, size_t count, dds_free_op_t op);
|
||||||
|
|
||||||
struct ddsi_sertopic_ops {
|
struct ddsi_sertopic_ops {
|
||||||
ddsi_sertopic_deinit_t deinit;
|
ddsi_sertopic_deinit_t deinit;
|
||||||
ddsi_sertopic_free_sample_t free_sample;
|
ddsi_sertopic_zero_samples_t zero_samples;
|
||||||
|
ddsi_sertopic_realloc_samples_t realloc_samples;
|
||||||
|
ddsi_sertopic_free_samples_t free_samples;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
|
struct ddsi_sertopic *ddsi_sertopic_ref (const struct ddsi_sertopic *tp);
|
||||||
|
@ -55,8 +63,26 @@ uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *
|
||||||
inline void ddsi_sertopic_deinit (struct ddsi_sertopic *tp) {
|
inline void ddsi_sertopic_deinit (struct ddsi_sertopic *tp) {
|
||||||
tp->ops->deinit (tp);
|
tp->ops->deinit (tp);
|
||||||
}
|
}
|
||||||
|
inline void ddsi_sertopic_zero_samples (const struct ddsi_sertopic *tp, void *samples, size_t count) {
|
||||||
|
tp->ops->zero_samples (tp, samples, count);
|
||||||
|
}
|
||||||
|
inline void ddsi_sertopic_realloc_samples (void **ptrs, const struct ddsi_sertopic *tp, void *old, size_t oldcount, size_t count)
|
||||||
|
{
|
||||||
|
tp->ops->realloc_samples (ptrs, tp, old, oldcount, count);
|
||||||
|
}
|
||||||
|
inline void ddsi_sertopic_free_samples (const struct ddsi_sertopic *tp, void **ptrs, size_t count, dds_free_op_t op) {
|
||||||
|
tp->ops->free_samples (tp, ptrs, count, op);
|
||||||
|
}
|
||||||
|
inline void ddsi_sertopic_zero_sample (const struct ddsi_sertopic *tp, void *sample) {
|
||||||
|
ddsi_sertopic_zero_samples (tp, sample, 1);
|
||||||
|
}
|
||||||
|
inline void *ddsi_sertopic_alloc_sample (const struct ddsi_sertopic *tp) {
|
||||||
|
void *ptr;
|
||||||
|
ddsi_sertopic_realloc_samples (&ptr, tp, NULL, 0, 1);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
inline void ddsi_sertopic_free_sample (const struct ddsi_sertopic *tp, void *sample, dds_free_op_t op) {
|
inline void ddsi_sertopic_free_sample (const struct ddsi_sertopic *tp, void *sample, dds_free_op_t op) {
|
||||||
tp->ops->free_sample (tp, sample, op);
|
ddsi_sertopic_free_samples (tp, &sample, 1, op);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -61,4 +61,9 @@ uint32_t ddsi_sertopic_compute_serdata_basehash (const struct ddsi_serdata_ops *
|
||||||
}
|
}
|
||||||
|
|
||||||
extern inline void ddsi_sertopic_deinit (struct ddsi_sertopic *tp);
|
extern inline void ddsi_sertopic_deinit (struct ddsi_sertopic *tp);
|
||||||
|
extern inline void ddsi_sertopic_zero_samples (const struct ddsi_sertopic *tp, void *samples, size_t count);
|
||||||
|
extern inline void ddsi_sertopic_realloc_samples (void **ptrs, const struct ddsi_sertopic *tp, void *old, size_t oldcount, size_t count);
|
||||||
|
extern inline void ddsi_sertopic_free_samples (const struct ddsi_sertopic *tp, void **ptrs, size_t count, dds_free_op_t op);
|
||||||
|
extern inline void ddsi_sertopic_zero_sample (const struct ddsi_sertopic *tp, void *sample);
|
||||||
extern inline void ddsi_sertopic_free_sample (const struct ddsi_sertopic *tp, void *sample, dds_free_op_t op);
|
extern inline void ddsi_sertopic_free_sample (const struct ddsi_sertopic *tp, void *sample, dds_free_op_t op);
|
||||||
|
extern inline void *ddsi_sertopic_alloc_sample (const struct ddsi_sertopic *tp);
|
||||||
|
|
|
@ -30,13 +30,56 @@ static void sertopic_default_deinit (struct ddsi_sertopic *tp)
|
||||||
(void)tp;
|
(void)tp;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void sertopic_default_free_sample (const struct ddsi_sertopic *sertopic_common, void *sample, dds_free_op_t op)
|
static void sertopic_default_zero_samples (const struct ddsi_sertopic *sertopic_common, void *sample, size_t count)
|
||||||
{
|
{
|
||||||
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)sertopic_common;
|
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)sertopic_common;
|
||||||
dds_sample_free (sample, tp->type, op);
|
memset (sample, 0, tp->type->m_size * count);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void sertopic_default_realloc_samples (void **ptrs, const struct ddsi_sertopic *sertopic_common, void *old, size_t oldcount, size_t count)
|
||||||
|
{
|
||||||
|
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)sertopic_common;
|
||||||
|
const size_t size = tp->type->m_size;
|
||||||
|
char *new = dds_realloc (old, size * count);
|
||||||
|
if (new && count > oldcount)
|
||||||
|
memset (new + size * oldcount, 0, size * (count - oldcount));
|
||||||
|
for (size_t i = 0; i < count; i++)
|
||||||
|
{
|
||||||
|
void *ptr = (char *) new + i * size;
|
||||||
|
ptrs[i] = ptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void sertopic_default_free_samples (const struct ddsi_sertopic *sertopic_common, void **ptrs, size_t count, dds_free_op_t op)
|
||||||
|
{
|
||||||
|
if (count > 0)
|
||||||
|
{
|
||||||
|
const struct ddsi_sertopic_default *tp = (const struct ddsi_sertopic_default *)sertopic_common;
|
||||||
|
const struct dds_topic_descriptor *type = tp->type;
|
||||||
|
const size_t size = type->m_size;
|
||||||
|
#ifndef NDEBUG
|
||||||
|
for (size_t i = 0, off = 0; i < count; i++, off += size)
|
||||||
|
assert ((char *)ptrs[i] == (char *)ptrs[0] + off);
|
||||||
|
#endif
|
||||||
|
if (type->m_flagset & DDS_TOPIC_NO_OPTIMIZE)
|
||||||
|
{
|
||||||
|
char *ptr = ptrs[0];
|
||||||
|
for (size_t i = 0; i < count; i++)
|
||||||
|
{
|
||||||
|
dds_sample_free (ptr, type, DDS_FREE_CONTENTS);
|
||||||
|
ptr += size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (op & DDS_FREE_ALL_BIT)
|
||||||
|
{
|
||||||
|
dds_free (ptrs[0]);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const struct ddsi_sertopic_ops ddsi_sertopic_ops_default = {
|
const struct ddsi_sertopic_ops ddsi_sertopic_ops_default = {
|
||||||
.deinit = sertopic_default_deinit,
|
.deinit = sertopic_default_deinit,
|
||||||
.free_sample = sertopic_default_free_sample
|
.zero_samples = sertopic_default_zero_samples,
|
||||||
|
.realloc_samples = sertopic_default_realloc_samples,
|
||||||
|
.free_samples = sertopic_default_free_samples
|
||||||
};
|
};
|
||||||
|
|
|
@ -1040,7 +1040,7 @@ int rtps_init (void)
|
||||||
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_rd, &gv.default_xqos_rd);
|
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_rd, &gv.default_xqos_rd);
|
||||||
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_wr, &gv.default_xqos_wr);
|
make_builtin_endpoint_xqos (&gv.builtin_endpoint_xqos_wr, &gv.default_xqos_wr);
|
||||||
|
|
||||||
make_special_topics (); /* FIXME: leaking these for now */
|
make_special_topics ();
|
||||||
|
|
||||||
os_mutexInit (&gv.participant_set_lock);
|
os_mutexInit (&gv.participant_set_lock);
|
||||||
os_condInit (&gv.participant_set_cond, &gv.participant_set_lock);
|
os_condInit (&gv.participant_set_cond, &gv.participant_set_lock);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue